1 """This is designed to let you do asynchronous I/O conveniently.
2 """
3
4 from __future__ import with_statement
5
6 import sys
7 import threading
8 import die
9 import system_load as SL
10 from gyropy import g_mailbox
11
# Module-level alias for threading.Lock.
Lock = threading.Lock
16 self.ex_type, self.ex_value, self.ex_traceback = x
17
19 return "CaughtException: %s" % str(self.ex_value)
20
22 raise self.ex_type, self.ex_value, self.ex_traceback
23
24 get = reraise
25
def __init__(self, tag=None, group=None, target=None, name=None, args=(), kwargs=None):
    """Set up a thread that carries a caller-supplied tag and a result slot.

    @param tag: arbitrary value stored on the thread as C{self.tag}.
    @param group: passed through to C{threading.Thread.__init__}.
    @param target: callable passed through to C{threading.Thread.__init__}.
    @param name: thread name, passed through to C{threading.Thread.__init__}.
    @param args: positional arguments for C{target}.
    @param kwargs: keyword arguments for C{target}, or C{None} for none.
    """
    # kwargs defaults to None instead of a {} literal: a literal default is
    # created once and would be shared by every thread built without
    # explicit kwargs.  threading.Thread substitutes a fresh dict for None.
    threading.Thread.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs)
    # Result slot: nothing stored yet, and rvset records that fact so a
    # stored None can be told apart from "no result so far".
    self.rv = None
    self.rvset = False
    self.tag = tag
33
35 try:
36 self.rv = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
37 except:
38 self.rv = CaughtException(sys.exc_info())
39 self.rvset = True
40
45
48 EOF = g_mailbox.EOF
49 CaughtException = CaughtException
50
52 if n is None:
53 n = SL.ncpu()
54 assert n > 0
55 self.iqueue = g_mailbox.listbox()
56 self.t = []
57 for i in range(n):
58 tt = threading.Thread(target=self._target, args=(self.iqueue,) )
59 tt.start()
60 self.t.append(tt)
61
64
def startRV(self, target=None, args=None, kwargs=None):
    """Queue a call whose outcome is delivered through a fresh waitbox.

    @param target: the callable to run; must not be C{None}.
    @param args: positional arguments for C{target}, or C{None}.
    @param kwargs: keyword arguments for C{target}, or C{None}.
    @return: the C{g_mailbox.waitbox} that was queued together with the job.
    """
    assert target is not None
    result_box = g_mailbox.waitbox()
    job = (result_box, target, args, kwargs)
    self.iqueue.put(job)
    return result_box
70
def startNR(self, target=None, args=None, kwargs=None):
    """Queue a call whose return value is not collected.

    @param target: the callable to run; must not be C{None}.
    @param args: positional arguments for C{target}, or C{None}.
    @param kwargs: keyword arguments for C{target}, or C{None}.
    """
    assert target is not None
    # A None in the first slot marks a job that has no result box.
    job = (None, target, args, kwargs)
    self.iqueue.put(job)
74
def startQ(self, queue, tag, target=None, args=None, kwargs=None):
    """Queue a call whose result is delivered to C{queue} as C{(tag, value)}.

    @param queue: object with a C{put} method; it is queued in the
        result-box slot of the job.
    @param tag: value paired with the result so the receiver can tell
        which call it came from.
    @param target: the callable to run; must not be C{None}.
    @param args: positional arguments for C{target}, or C{None}.
    @param kwargs: keyword arguments for C{target}, or C{None}.
    """
    # Same precondition as startRV and startNR.
    assert target is not None

    # The wrapper is queued with the caller's args unchanged, so it must
    # take the real callable from the enclosing scope.  Expecting it as a
    # leading positional argument would make the first user argument be
    # called instead.
    def wrapped_target(*call_args, **call_kwargs):
        return (tag, target(*call_args, **call_kwargs))

    self.iqueue.put((queue, wrapped_target, args, kwargs))
79
80 @classmethod
82 while True:
83 try:
84 wbox, target, args, kwargs = iqueue.get()
85 except cls.EOF:
86 break
87 if kwargs is None:
88 kwargs = {}
89 if args is None:
90 args = ()
91
92 try:
93 if wbox is not None:
94 wbox.put(target(*args, **kwargs))
95 else:
96 target(*args, **kwargs)
97 except:
98 if wbox is not None:
99 wbox.put( cls.CaughtException(sys.exc_info()) )
100 else:
101 die.catch("Uncaught exception in Thread_pool thread")
102
104 if self.t:
105 self.iqueue.close()
106 while self.t:
107 tmp = self.t.pop(0)
108 tmp.join()
109
112
117 """Pick a ripe thread for joining.
118 @type tlist: list(L{threading.Thread})
119 @param tlist: a list of threads.
120 @return: a thread that is either finished and ready to be joined,
121 or (if none are ready yet) the oldest thread.
122 @rtype: L{threading.Thread}
123 """
124 for (i, t) in enumerate(tlist):
125 if not t.isAlive():
126 return tlist.pop(i)
127 return tlist.pop(0)
128
131 """Here, you have a pool of n threads. You can call
132 C{x.start(function, args, kwargs)} to start something running
133 (perhaps a write to a file), and then go off and compute
134 something else while the I/O completes.
135 If you need to wait for completion, call C{x.join()}.
136
137 This class is not designed to return values from the function.
138 """
139
142
143
144 - def start(self, fn, args=None, kwargs=None):
146
147
148
149
# Thread_poolNR is a second name bound to the same class as Thread_poolW.
Thread_poolNR = Thread_poolW
157
160 """Here, you have a pool of n threads. You can call
C{x.start(tag, function, args, kwargs)} to start something running
162 (perhaps a write to a file), and then go off and compute
163 something else while the I/O completes.
164 If you need to wait for completion, call C{x.join()}.
165
166 This class is designed to return values from the function via
167 C{get()} or C{getany()}.
168 """
169
171 Thread_pool.__init__(self, n)
172 self.lock = threading.Lock()
173 self.oqueue = g_mailbox.mailbox()
174 self.tags = {}
175
176
def start(self, tag, fn, args=None, kwargs=None):
    """Submit C{fn} to the pool and file its result box under C{tag}.

    @param tag: key under which the pending result is stored in C{self.tags}.
    @param fn: the callable to run.
    @param args: positional arguments for C{fn}, or C{None}.
    @param kwargs: keyword arguments for C{fn}, or C{None}.
    """
    with self.lock:
        pending = self.startRV(target=fn, args=args, kwargs=kwargs)
        self.tags[tag] = pending
180
181
183 """
184 @return: the first available answer, as a (tag, value) pair. The tag was set
185 in the call to C{start} and the value is the result of the computation.
If the computation raised an exception, C{getany} will return a CaughtException
187 object instead.
188 @rtype: a tuple(whatever, whatever) pair from a computation or a L{CaughtException}
189 """
190 with self.lock:
191 for tag in self.tags.keys():
192 if self.tags[tag]:
193 return (tag, self.tags.pop(tag).get())
194
195
196 tag, v = self.tags.popitem()
197 return (tag, v.get())
198
199
def get(self, tag):
    """
    Remove the pending result registered under C{tag} and return its value.

    @param tag: the tag that was passed to C{start}.
    @return: whatever the stored result box's C{get()} returns.
    @rtype: whatever or a L{CaughtException}
    @raise KeyError: if nothing is pending under C{tag}.
    """
    # Only the dictionary update needs the lock.  The box's get() presumably
    # blocks until a worker delivers the result (TODO confirm against
    # g_mailbox).  Holding the lock across that wait would stall every
    # concurrent start()/get() call, and would deadlock if the running task
    # itself submits work to this pool.
    with self.lock:
        box = self.tags.pop(tag)
    return box.get()
206
207
209 """Is there an answer ready?"""
210 with self.lock:
211 for (k, v) in self.tags.items():
212 if len(v):
213 return True
214 return False
215
216
217
218
219
220
221
222
223
224
225
def pairmap(fn, inlist, *args, **kwargs):
    """Apply C{fn} to every item of C{inlist} on a thread pool.

    Each item C{x} is submitted as C{fn(x, *args, **kwargs)}.  Results are
    yielded as C{(x, value)} pairs in whatever order the pool hands them
    back, which need not be the input order.

    @param fn: the callable to apply.
    @param inlist: iterable of inputs; duplicates and unhashable items are fine.
    @param args: extra positional arguments passed to every call.
    @param kwargs: extra keyword arguments passed to every call.  The
        special key C{_poolsize} is consumed here as the pool size and is
        not passed on to C{fn}.
    @return: a generator of C{(input, value)} pairs, where C{value} is
        whatever the pool's C{getany()} reports for that call.
    """
    # pop, not get: otherwise _poolsize would also reach fn as a keyword
    # argument it never asked for.
    tp = Thread_poolR(kwargs.pop('_poolsize', None))
    # Jobs are tagged by position rather than by the input itself, so equal
    # inputs cannot overwrite each other's pending result and inputs need
    # not be hashable.  The map gives back the input for each finished tag.
    outstanding = {}
    for (j, xi) in enumerate(inlist):
        outstanding[j] = xi
        tp.start(j, fn, (xi,) + args, kwargs)
        # Once more jobs are in flight than len(tp), collect one result per
        # new submission so the backlog stays bounded.
        if len(outstanding) > len(tp):
            (done, value) = tp.getany()
            yield (outstanding.pop(done), value)
    # Drain whatever is still in flight.
    while outstanding:
        (done, value) = tp.getany()
        yield (outstanding.pop(done), value)
239
242 q = list(pairmap(lambda x:x, range(10)))
243 assert len(q)==10
244 seen = set()
245 for (i,j) in q:
246 assert i==j
247 seen.add(i)
248 assert sorted(seen)==range(10)
249
250
# Run the module's self-test when executed as a script.
if __name__ == '__main__':
    testmap()
253