From e609c627ed88456f4416597d68c484a17642b2d3 Mon Sep 17 00:00:00 2001 From: Sean Zimmermann Date: Tue, 3 Apr 2018 19:16:33 -0700 Subject: [PATCH 1/7] Fixes issue 24882 --- Lib/concurrent/futures/thread.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2af31a106dd914..5d692883dcf987 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -80,7 +80,14 @@ def _worker(executor_reference, work_queue, initializer, initargs): work_item.run() # Delete references to object. See issue16284 del work_item + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._increase_idle_count() + del executor continue + executor = executor_reference() # Exit if: # - The interpreter is shutting down OR @@ -133,6 +140,8 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._max_workers = max_workers self._work_queue = queue.SimpleQueue() + self._idle_lock = threading.Lock() + self._idle_count = 0 self._threads = set() self._broken = False self._shutdown = False @@ -178,12 +187,17 @@ def submit(*args, **kwargs): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + #if idle threads are available, don't spin new threads + with self._idle_lock: + if self._idle_count > 0: + self._idle_count -= 1 + return + # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. + num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = '%s_%d' % (self._thread_name_prefix or self, @@ -198,6 +212,10 @@ def weakref_cb(_, q=self._work_queue): self._threads.add(t) _threads_queues[t] = self._work_queue + def _increase_idle_count(self): + with self._idle_lock: + self._idle_count += 1 + def _initializer_failed(self): with self._shutdown_lock: self._broken = ('A thread initializer failed, the thread pool ' From 3fa038e66e688c07a469cafc31fafaafe735a2ce Mon Sep 17 00:00:00 2001 From: Sean Zimmermann Date: Wed, 4 Apr 2018 14:57:55 -0700 Subject: [PATCH 2/7] Add news file entry for change. --- .../NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst diff --git a/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst new file mode 100644 index 00000000000000..8c418824a99d96 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-04-04-14-54-30.bpo-24882.urybpa.rst @@ -0,0 +1 @@ +Change ThreadPoolExecutor to use existing idle threads before spinning up new ones. \ No newline at end of file From 6c954330235e3cefecf2d65c6768fdff8d92c9ca Mon Sep 17 00:00:00 2001 From: Sean Zimmermann Date: Wed, 4 Apr 2018 15:29:38 -0700 Subject: [PATCH 3/7] Change test_concurrent_futures.ThreadPoolShutdownTest Adjust the shutdown test so that, after submitting three jobs to the executor, the test checks for less than three threads, instead of looking for exactly three threads. If idle threads are being recycled properly, then we should have less than three threads. --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 3c963dff1db2c9..c51642be297471 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -349,7 +349,7 @@ def test_threads_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) self.executor.submit(mul, 3, 14) - self.assertEqual(len(self.executor._threads), 3) + self.assertTrue(len(self.executor._threads) < 3) self.executor.shutdown() for t in self.executor._threads: t.join() From eb4591120eb110024fb8ad0dd811c2e509eab5df Mon Sep 17 00:00:00 2001 From: "Sean Zimmermann (SURFACE)" Date: Mon, 20 May 2019 15:54:36 -0700 Subject: [PATCH 4/7] Switched idle count to semaphor, Updated tests As suggested by reviewer tomMoral, swapped lock-protected counter with a semaphore to track the number of unused threads. Adjusted test_threads_terminate to wait for completiton of the previous future before submitting a new one (and checking the number of threads used). Also added a new test to confirm the thread pool can be saturated. --- Lib/concurrent/futures/thread.py | 15 ++++----------- Lib/test/test_concurrent_futures.py | 12 ++++++++---- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 5d692883dcf987..64d045fe4e33c6 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -84,7 +84,7 @@ def _worker(executor_reference, work_queue, initializer, initargs): # attempt to increment idle count executor = executor_reference() if executor is not None: - executor._increase_idle_count() + executor._idle_semaphore.release() del executor continue @@ -140,8 +140,7 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._max_workers = max_workers self._work_queue = queue.SimpleQueue() - self._idle_lock = threading.Lock() - self._idle_count = 0 + self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False self._shutdown = False @@ -188,10 +187,8 @@ def submit(*args, **kwargs): def _adjust_thread_count(self): #if idle threads are available, don't spin new threads - with self._idle_lock: - if self._idle_count > 0: - self._idle_count -= 1 - return + if self._idle_semaphore.acquire(timeout=0): + return # When the executor gets lost, the weakref callback will wake up # the worker threads. @@ -212,10 +209,6 @@ def weakref_cb(_, q=self._work_queue): self._threads.add(t) _threads_queues[t] = self._work_queue - def _increase_idle_count(self): - with self._idle_lock: - self._idle_count += 1 - def _initializer_failed(self): with self._shutdown_lock: self._broken = ('A thread initializer failed, the thread pool ' diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c51642be297471..dffaec09673956 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -346,10 +346,10 @@ def _prime_executor(self): pass def test_threads_terminate(self): - self.executor.submit(mul, 21, 2) - self.executor.submit(mul, 6, 7) - self.executor.submit(mul, 3, 14) - self.assertTrue(len(self.executor._threads) < 3) + self.executor.submit(mul, 21, 2).result() + self.executor.submit(mul, 6, 7).result() + self.executor.submit(mul, 3, 14).result() + self.assertTrue(len(self.executor._threads) == 1) self.executor.shutdown() for t in self.executor._threads: t.join() @@ -753,6 +753,10 @@ def test_default_workers(self): self.assertEqual(executor._max_workers, (os.cpu_count() or 1) * 5) + def test_saturation(self): + list(self.executor.map(lambda x: mul(0, x), range(100 * self.executor._max_workers))) + self.assertEqual(len(self.executor._threads), self.executor._max_workers) + class ProcessPoolExecutorTest(ExecutorTest): From ea0de69ad856f7e26bd0355a8515771e1ef587ce Mon Sep 17 00:00:00 2001 From: "Sean Zimmermann (SURFACE)" Date: Tue, 21 May 2019 15:27:46 -0700 Subject: [PATCH 5/7] Updates tests as requested by pitrou. --- Lib/concurrent/futures/thread.py | 2 +- Lib/test/test_concurrent_futures.py | 34 ++++++++++++++++++++++++----- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 64d045fe4e33c6..ad6b4c20b56681 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -186,7 +186,7 @@ def submit(*args, **kwargs): submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): - #if idle threads are available, don't spin new threads + # if idle threads are available, don't spin new threads if self._idle_semaphore.acquire(timeout=0): return diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index dffaec09673956..c84661e32c6a3a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -346,10 +346,15 @@ def _prime_executor(self): pass def test_threads_terminate(self): - self.executor.submit(mul, 21, 2).result() - self.executor.submit(mul, 6, 7).result() - self.executor.submit(mul, 3, 14).result() - self.assertTrue(len(self.executor._threads) == 1) + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(3): + self.executor.submit(acquire_lock, sem) + self.assertEqual(len(self.executor._threads), 3) + for i in range(3): + sem.release() self.executor.shutdown() for t in self.executor._threads: t.join() @@ -754,8 +759,25 @@ def test_default_workers(self): (os.cpu_count() or 1) * 5) def test_saturation(self): - list(self.executor.map(lambda x: mul(0, x), range(100 * self.executor._max_workers))) - self.assertEqual(len(self.executor._threads), self.executor._max_workers) + executor = self.executor_type() + def acquire_lock(lock): + lock.acquire() + + sem = threading.Semaphore(0) + for i in range(100 * executor._max_workers): + executor.submit(acquire_lock, sem) + self.assertEqual(len(executor._threads), executor._max_workers) + for i in range(100 * executor._max_workers): + sem.release() + executor.shutdown(wait=True) + + def test_idle_thread_reuse(self): + executor = self.executor_type() + executor.submit(mul, 21, 2).result() + executor.submit(mul, 6, 7).result() + executor.submit(mul, 3, 14).result() + self.assertEqual(len(executor._threads), 1) + executor.shutdown(wait=True) class ProcessPoolExecutorTest(ExecutorTest): From 3c99c9c7ddaa5d14d07f253e01a4b6bbd55bc559 Mon Sep 17 00:00:00 2001 From: "Sean Zimmermann (SURFACE)" Date: Tue, 21 May 2019 15:50:44 -0700 Subject: [PATCH 6/7] Correct minor whitespace error. --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c84661e32c6a3a..0bfa202242f0d4 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -770,7 +770,7 @@ def acquire_lock(lock): for i in range(100 * executor._max_workers): sem.release() executor.shutdown(wait=True) - + def test_idle_thread_reuse(self): executor = self.executor_type() executor.submit(mul, 21, 2).result() From 753527655ecf5e7763895afcb2ea3bbea16e0ec7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 22 May 2019 22:58:34 +0200 Subject: [PATCH 7/7] Make test_saturation faster --- Lib/test/test_concurrent_futures.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 0bfa202242f0d4..b63f7d1bbe3477 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -759,15 +759,15 @@ def test_default_workers(self): (os.cpu_count() or 1) * 5) def test_saturation(self): - executor = self.executor_type() + executor = self.executor_type(4) def acquire_lock(lock): lock.acquire() sem = threading.Semaphore(0) - for i in range(100 * executor._max_workers): + for i in range(15 * executor._max_workers): executor.submit(acquire_lock, sem) self.assertEqual(len(executor._threads), executor._max_workers) - for i in range(100 * executor._max_workers): + for i in range(15 * executor._max_workers): sem.release() executor.shutdown(wait=True)