66from threading import Lock
77
88from util import (
9- SyncQueue ,
109 AsyncQueue ,
1110 DummyLock
1211 )
1918
2019from graph import Graph
2120from channel import (
22- Channel ,
21+ mkchannel ,
2322 WChannel ,
23+ SerialWChannel ,
2424 RChannel
2525 )
2626
@@ -329,7 +329,8 @@ def _remove_task_if_orphaned(self, task):
329329
330330 #{ Interface
331331 def size (self ):
332- """:return: amount of workers in the pool"""
332+ """:return: amount of workers in the pool
333+ :note: method is not threadsafe !"""
333334 return self ._num_workers
334335
335336 def set_size (self , size = 0 ):
@@ -339,7 +340,9 @@ def set_size(self, size=0):
339340
340341 :return: self
341342 :param size: if 0, the pool will do all work itself in the calling thread,
342- otherwise the work will be distributed among the given amount of threads
343+ otherwise the work will be distributed among the given amount of threads.
344+ If the size is 0, newly added tasks will use channels which are NOT
345+ threadsafe to optimize item throughput.
343346
344347 :note: currently NOT threadsafe !"""
345348 assert size > - 1 , "Size cannot be negative"
@@ -437,17 +440,29 @@ def add_task(self, task):
437440 the task will be considered orphaned and will be deleted on the next
438441 occasion."""
439442 # create a write channel for it
440- wc , rc = Channel ()
441- rc = RPoolChannel (wc , task , self )
442- task .set_wc (wc )
443+ wctype = WChannel
443444
444445 self ._taskgraph_lock .acquire ()
445446 try :
446447 self ._taskorder_cache .clear ()
447448 self ._tasks .add_node (task )
449+
450+ # fix locks - in serial mode, the task does not need real locks
451+ # Additionally, use a non-threadsafe queue
452+ # This brings about 15% more performance, but sacrifices thread-safety
453+ # when reading from multiple threads.
454+ if self .size () == 0 :
455+ task ._slock = DummyLock ()
456+ wctype = SerialWChannel
457+ # END improve locks
458+
459+ # setup the tasks channel
460+ wc = wctype ()
461+ rc = RPoolChannel (wc , task , self )
462+ task .set_wc (wc )
448463 finally :
449464 self ._taskgraph_lock .release ()
450- # END sync task addition
465+ # END sync task addition
451466
452467 # If the input channel is one of our read channels, we add the relation
453468 if isinstance (task , InputChannelTask ):
@@ -462,11 +477,6 @@ def add_task(self, task):
462477 # END add task relation
463478 # END handle input channels for connections
464479
465- # fix locks - in serial mode, the task does not need real locks
466- if self .size () == 0 :
467- task ._slock = DummyLock ()
468- # END improve locks
469-
470480 return rc
471481
472482 #} END interface
0 commit comments