44 Full
55 )
66
7- from util import AsyncQueue
7+ from util import (
8+ AsyncQueue ,
9+ DummyLock
10+ )
11+
812from time import time
913import sys
1014
@@ -23,12 +27,9 @@ class Channel(object):
2327
2428 def __new__ (cls , * args ):
2529 if cls is Channel :
26- max_items = 0
27- if len (args ) == 1 :
28- max_items = args [0 ]
29- if len (args ) > 1 :
30- raise ValueError ("Specify not more than the number of items the channel should take" )
31- wc = WChannel (max_items )
30+ if len (args ) > 0 :
31+ raise ValueError ("Cannot take any arguments when creating a new channel" )
32+ wc = WChannel ()
3233 rc = RChannel (wc )
3334 return wc , rc
3435 # END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
3940 """The write end of a channel"""
4041 __slots__ = ('_closed' , '_queue' )
4142
42- def __init__ (self , max_items = 0 ):
43+ def __init__ (self ):
4344 """initialize this instance, able to hold max_items at once
4445 Write calls will block if the channel is full, until someone reads from it"""
4546 self ._closed = False
46- self ._queue = AsyncQueue (max_items )
47+ self ._queue = AsyncQueue ()
4748
4849
4950 #{ Interface
@@ -74,7 +75,21 @@ def size(self):
7475 def close (self ):
7576 """Close the channel. Multiple close calls on a closed channel are no
7677 an error"""
78+ mutex = self ._queue .mutex
79+ mutex .acquire ()
80+ # this is atomic already, due to the GIL - no need to get the queue's mutex
81+ print "channel.close()"
7782 self ._closed = True
83+ # now make sure that the people waiting for an item are released now
84+ # As we it could be that some readers are already on their way to initiate
85+ # a blocking get, we must make sure that locks never block before that happens
86+
87+ # now we are the only one accessing the queue, so change it
88+ self ._queue .mutex = DummyLock ()
89+ print self ._queue .not_empty ._waiters
90+ self ._queue .not_empty .notify_all ()
91+ print self ._queue .not_empty ._waiters
92+ mutex .release ()
7893
7994 @property
8095 def closed (self ):
@@ -134,58 +149,47 @@ def read(self, count=0, block=True, timeout=None):
134149 pass
135150 # END handle exceptions
136151 else :
137- # if we have really bad timing, the source of the channel
138- # marks itself closed, but before setting it, the thread
139- # switches to us. We read it, read False, and try to fetch
140- # something, and never return. The whole closed channel thing
141- # is not atomic ( of course )
142- # This is why we never block for long, to get a chance to recheck
143- # for closed channels.
144- # We blend this into the timeout of the user
145- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
146- wc = self ._wc
147- timeout = (timeout is None and sys .maxint ) or timeout # make sure we can compute with it
148- assert timeout != 0.0 , "shouldn't block if timeout is 0" # okay safe
149- if timeout and ourtimeout > timeout :
150- ourtimeout = timeout
151- # END setup timeout
152-
153152 # to get everything into one loop, we set the count accordingly
154153 if count == 0 :
155154 count = sys .maxint
156155 # END handle count
157156
157+ endtime = sys .maxint # allows timeout for whole operation
158+ if timeout is not None :
159+ endtime = time () + timeout
160+ # could be improved by a separate: no-endtime branch, saving the time calls
158161 for i in xrange (count ):
159- have_timeout = False
160- st = time ()
161- while True :
162+ try :
163+ print "about to read" , i , count , block , timeout
164+ out .append (queue .get (block , timeout ))
165+ print "got one"
166+ except Empty :
167+ pass
168+ # END ignore empty
169+
170+ # if we have been unblocked because the closed state changed
171+ # in the meanwhile, stop trying
172+ # NOTE: must NOT cache _wc
173+ if self ._wc .closed :
174+ # its racing time - all threads waiting for the queue
175+ # are awake now, and we actually can't be sure its empty
176+ # Hence we pop it empty without blocking, getting as much
177+ # as we can. This effectively lets us race ( with mutexes )
178+ # of the other threads.
179+ print "stopped because it was closed"
162180 try :
163- if wc .closed :
164- have_timeout = True
165- # its about the 'in the meanwhile' :) - get everything
166- # we can in non-blocking mode. This will raise
167- try :
168- while True :
169- out .append (queue .get (False ))
170- # END until it raises Empty
171- except Empty :
172- break
173- # END finally, out of here
174- # END don't continue on closed channels
175-
176- # END abort reading if it was closed ( in the meanwhile )
177- out .append (queue .get (block , ourtimeout ))
178- break # breakout right away
181+ while True :
182+ out .append (queue .get (False ))
183+ # END pop it empty
179184 except Empty :
180- if timeout - (time () - st ) <= 0 :
181- # hitting timeout
182- have_timeout = True
183- break
184- # END abort if the user wants no more time spent here
185- # END handle timeout
186- # END endless timer loop
187- if have_timeout :
185+ pass
186+ # END ignore emptyness, we have all
187+
188188 break
189+ # END handle cloased
190+
191+ if time () >= endtime :
192+ break
189193 # END stop on timeout
190194 # END for each item
191195 # END handle blocking
0 commit comments