1 from __future__ import with_statement
2
3 """This module contains several variants on mailboxes.
4 A mailbox is a way to pass information from one thread to another.
5 You put something into a mailbox; eventually the other thread takes
6 it out.
7
8 These mailboxes also have mechanisms so that you can shut down
9 a mailbox. You can "close" it so that the other side will
10 get an exception when it tries to read or write.
11
12 A typical server looks like this::
13
14 mbox = mailbox()
15
16 ... fork the client thread ...
17
18 while True:
19 try:
20 x = mbox.get()
21 except g_mailgox.EOF:
22 break
23 serve(x)
24
25 A typical client::
26
27 mbox = mailbox()
28
29 ... fork the server thread ...
30
31 for task in tasklist:
32 mbox.put(task)
33 mbox.putclose() # This causes the next "get" to raise an EOF.
34 """
35
36 import threading as T
37 import sys
38 Lock = T.Lock
39 Condition = T.Condition
40
41 DEBUG = False
42
43
44 if __debug__:
45 _prinfo_lock = T.Lock()
56 else:
59
60
61
63 if isinstance(x, (Exception, SystemExit, KeyboardInterrupt)):
64
65 raise x
66 elif isinstance(x, tuple):
67
68 if len(x) == 1:
69 raise x[0]
70 elif len(x) == 2:
71 raise x[0], x[1]
72 elif len(x) == 3:
73 raise x[0], x[1], x[2]
74 raise ValueError, "_Raise needs an exception or a tuple: got %s: %s" % (type(x), str(x))
75
76
77 -class EOF(Exception):
80
81
83 _empty = EOF()
84 eof = EOF()
85 ignore = EOF()
86
88 self.lock = T.Lock()
89 self.gc = T.Condition(self.lock)
90 self.pc = T.Condition(self.lock)
91 self.v = self._empty
92 self.psui = None
93 self.gsui = None
94 self.nps = -1
95 self.gps = -1
96
97
98 - def close(self, for_put=eof, for_get=eof, ngx=-1, npx=-1):
99 """Close the connection.
100 It causes other threads that use the mailbox to raise an exception,
101 unless you give it self._ignore as an argument.
102 Npx and ngx specify how many threads will commit suicide;
103 zero means none; setting either to 1 is a useful way to kill
104 a single thread.
105
106 Legal values for for_put and for_get are any exception,
107 anything returned from sys.exc_info, self.ignore and None.
108 - None => Do not raise an exception;
109 - self.ignore => Do not change whatever exception (or None)
110 is already stored.
111 """
112 with self.lock:
113 self.__set_psui(for_put, npx)
114 self.__set_gsui(for_get, ngx)
115
116
118 """@requires: Called under lock.
119 @note: suicide never happens when the mailbox is full.
120 """
121 if self.v is self._empty and self.gsui is not None:
122 tmpe = self.gsui
123 if self.ngs > 0:
124 self.ngs -= 1
125 if self.ngs == 0:
126 self.gsui = None
127 self.gc.notify()
128 _Raise(tmpe)
129
131 """@requires: Called under lock."""
132 if self.psui is not None:
133 tmpe = self.psui
134 if self.nps > 0:
135 self.nps -= 1
136 if self.nps == 0:
137 self.psui = None
138 self.pc.notify()
139 _Raise(tmpe)
140
141
143 """@requires: Called under lock."""
144 if for_put is not self.ignore and self.psui is None:
145 if npx == 0:
146 self.psui = None
147 else:
148 self.psui = for_put
149 self.pc.notifyAll()
150 self.nps = npx
151
152
154 """@requires: Called under lock."""
155 if for_get is not self.ignore and self.gsui is None:
156 if ngx == 0:
157 self.gsui = None
158 else:
159 self.gsui = for_get
160 self.gc.notifyAll()
161 self.ngs = ngx
162
163
165 with self.lock:
166 tmp = int( self.v is self._empty )
167 return tmp
168
170 with self.lock:
171 tmp = self.v is self._empty
172 return tmp
173
174
176 """Put a value into the mailbox. If one of the close()
177 functions has been used to set an exception on for_put,
178 it will raise the exception instead.
179 """
180 assert v is not self._empty
181 if __debug__:
182 prinfo('ready to put %s' % str(v))
183 with self.pc:
184 while self.v is not self._empty and self.psui is None:
185 self.pc.wait()
186 self.__psuicide()
187 self.v = v
188 self.gc.notify()
189 if __debug__:
190 prinfo('put done')
191
192
194 """Close the connection.
195 This is normally called by the producer of data
196 in place of put(). It can cause other threads to raise an exception.
197
198 This call acts like a
199 put() in that it will block until the mailbox
200 is emptied. It does not fill the mailbox, though.
201 Following this call:
202 - get() will not block (it will raise an exception
203 unless for_get is self.ignore or npx==0),
204 - put() will not block (it will raise an exception if
205 for_put is not self.ignore and npx!=0),
206 - likewise putclose() and getclose().
207
208 It will raise an exception (unless defer_ex is True)
209 if some other thread has closed
210 first and set a for_put exception. However, it will
211 set a for_get exception (if requested) before expiring.
212 """
213
214 with self.pc:
215 while self.v is not self._empty and self.psui is None:
216 self.pc.wait()
217 self.__set_gsui(for_get, ngx)
218 if not defer_ex:
219 self.__psuicide()
220
221 self.__set_psui(for_put, npx)
222 self.pc.notify()
223
224
226 """Raise any residual exceptions.
227 This is normally called by the producer of data
228
229 This call acts like a
230 put() in that it will block until the mailbox
231 is emptied or until an exception is waiting.
232 """
233
234 with self.pc:
235 while self.v is not self._empty and self.psui is None:
236 self.pc.wait()
237 self.__psuicide()
238 self.pc.notify()
239
240
242 """Get an item from the mailbox. Possibly, raise an exception if L{close} or L{putclose} has been called on the mailbox.
243 @note: Suicide has a lower priority than processing data.
244 So, if there is data in the mailbox and putclose() has
245 been called, one call to get() will succeed then
246 the second call will raise an exception.
247 @note: If the mailbox is empty, and if a previous call has specified
248 an exception to be raised by readers, raise it.
249 """
250 if __debug__:
251 prinfo('ready to get acquire')
252 with self.gc:
253 while self.v is self._empty and self.gsui is None:
254 self.gc.wait()
255 tmp = self.v
256 self.__gsuicide()
257 self.v = self._empty
258 self.gc.notify()
259
260
261
262 self.pc.notify()
263 if __debug__:
264 prinfo('get done=%s' % str(tmp))
265 return tmp
266
267
269 """Close the connection.
270 This is normally called by the consumer of data
271 in place of get().
272 It causes other threads to raise an exception.
273 It will raise an exception if some other thread has closed
274 the mailbox first.
275 Note that if there is something in the mailbox, it will
276 empty and return the value.
277 @note: If the mailbox is empty, and if a previous call has specified
278 an exception to be raised by readers, raise it.
279 """
280 with self.gc:
281 while self.v is self._empty and self.gsui is None:
282 self.gc.wait()
283 tmp = self.v
284 self.__set_psui(for_put, npx)
285 if not defer_ex:
286 self.__gsuicide()
287 self.v = self._empty
288 self.pc.notify()
289 self.__set_gsui(for_get, ngx)
290 return tmp
291
292
293
294 if __debug__ and DEBUG:
296
297
298
299 if self.v is not self._empty and not self.gsui and not self.psui:
300 prinfo('Mailbox deleted while non-empty')
301
302
304 """This is a stripped down mailbox that is good for one message
305 between one sender and one reciever.
306 It is an error to call put() or get() or putclose() more than once.
307
308 The normal usage is that you create the mailbox, spawn a thread, and hand the mailbox to the new thread.
309 Then, one side calls L{get} on the mailbox to wait for an answer.
310 The other side computes something, then calls L{put}.
311 The waitbox is then thrown away.
312 """
313 _empty = EOF()
314 _was_closed = EOF()
315 _has_been_used = EOF()
316
318 self.get_lock = T.Lock()
319 self.get_lock.acquire()
320 self.v = self._empty
321 self.sui = None
322
324 with self.get_lock:
325 tmp = self.v is self._empty
326 return tmp
327
329 """Close the connection.
330 This is normally called by the producer of data in place of L{put}().
331 It causes other threads to raise an exception.
332 """
333 self.sui = to_be_raised
334 self.get_lock.release()
335
336
338 self.v = v
339 self.get_lock.release()
340
341
343 """Get an object from the mailbox. This will normally block if the mailbox is empty.
344 @note: Suicide has a lower priority than processing data.
345 So, if there is data in the mailbox and L{putclose}() has
346 been called, one call to C{get()} will succeed;
347 the second call will raise an exception.
348 @note: If the mailbox is empty, and if a previous call has specified
349 an exception to be raised by readers, raise it.
350 """
351 self.get_lock.acquire()
352 self.get_lock = None
353 if self.sui is not None:
354 _Raise(self.sui)
355 self.sui = self._has_been_used
356 return self.v
357
358
359 if __debug__ and DEBUG:
361 if self.v is not self._empty and not self.sui:
362 prinfo('Mailbox deleted while non-empty')
363
364
365
366
367
369 """Like a mailbox, but produces infinite copies of whatever value
370 was put in there last. It is created with a value inside.
371 @note: This needs revision to more closely follow mailbox's methods.
372 """
373
374 _empty = EOF()
375
376 - def __init__(self, value=None, suicide=None):
377 self.value = value
378 self.suicide = suicide
379 self.lock = T.Lock()
380
381
383 with self.lock:
384 if self.suicide is None:
385 self.suicide = e_to_raise
386
387
389 with self.lock:
390 if self.suicide is not None:
391 tmp = self.suicide
392 self.lock.release()
393 _Raise(tmp)
394 return 1
395
396
398 with self.lock:
399 self.suicide = e_to_raise
400
401
403 with self.lock:
404 if self.suicide is not None:
405 tmp = self.suicide
406 self.lock.release()
407 _Raise(tmp)
408 self.value = v
409
410
412 """Get an object from the mailbox. This will normally block if the mailbox is empty.
413 @note: If the mailbox is empty, and if a previous call has specified
414 an exception to be raised by readers, raise it.
415 """
416 with self.lock:
417 if self.suicide is not None:
418 tmp = self.suicide
419 self.lock.release()
420 _Raise(tmp)
421 tmp = self.value
422 return tmp
423
424
426 """A mailbox with an unbounded queue."""
427 eof = EOF()
428 ignore = EOF()
429
431 self.lock = T.Lock()
432 self.gc = T.Condition(self.lock)
433 self.pc = self.lock
434 if initial:
435 self.v = list(initial)
436 else:
437 self.v = []
438 self.psui = None
439 self.gsui = None
440 self.nps = -1
441 self.gps = -1
442
443
444 - def close(self, for_put=eof, for_get=eof, ngx=-1, npx=-1):
445 """Close the connection.
446 It causes other threads to raise an exception,
447 unless you give it self._ignore as an argument.
448 Npx and ngx specify how many threads will commit suicide;
449 zero means none; setting either to 1 is a useful way to kill
450 a single thread.
451
452 Legal values for for_put and for_get are any exception,
453 anything returned from sys.exc_info, self.ignore and None.
454 - None => Do not raise an exception;
455 - self.ignore => Do not change whatever exception (or None)
456 is already stored.
457 """
458 with self.lock:
459 self.__set_psui(for_put, npx)
460 self.__set_gsui(for_get, ngx)
461
462
464 """@requires: Called under lock.
465 @note: Suicide never happens when the mailbox is full.
466 """
467 if (not self.v) and (self.gsui is not None):
468 tmpe = self.gsui
469 if self.ngs > 0:
470 self.ngs -= 1
471 if self.ngs == 0:
472 self.gsui = None
473 self.gc.notify()
474 _Raise(tmpe)
475
477 """@requires: Called under lock."""
478 if self.psui is not None:
479 tmpe = self.psui
480 if self.nps > 0:
481 self.nps -= 1
482 if self.nps == 0:
483 self.psui = None
484 _Raise(tmpe)
485
486
488 """@requires: Called under lock."""
489 if for_put is not self.ignore and self.psui is None:
490 if npx == 0:
491 self.psui = None
492 else:
493 self.psui = for_put
494 self.nps = npx
495
496
498 """@requires: Called under lock."""
499 if for_get is not self.ignore and self.gsui is None:
500 if ngx == 0:
501 self.gsui = None
502 else:
503 self.gsui = for_get
504 self.gc.notifyAll()
505 self.ngs = ngx
506
507
509 with self.lock:
510 tmp = len(self.v)
511 return tmp
512
513
515 """Put a value into the mailbox. If one of the close()
516 functions has been used to set an exception on for_put,
517 it will raise the exception instead.
518 """
519 if __debug__:
520 prinfo('ready to put %s' % str(v))
521 with self.pc:
522 self.__psuicide()
523 self.v.append(v)
524 self.gc.notify()
525 if __debug__:
526 prinfo('put done')
527
528
530 """Close the connection.
531 This is normally called by the producer of data
532 in place of put(). It can cause other threads to raise an exception.
533
534 It does not fill the mailbox.
535 Following this call,
536 - get() will not block (it will raise an exception
537 unless for_get is self.ignore or npx==0),
538 - put() will not block (it will raise an exception if
539 for_put is not self.ignore and npx!=0),
540 - likewise putclose() and getclose().
541
542 It will raise an exception (unless defer_ex is True)
543 if some other thread has closed
544 first and set a for_put exception. However, it will
545 set a for_get exception (if requested) before expiring.
546 """
547
548 with self.pc:
549 self.__set_gsui(for_get, ngx)
550 if not defer_ex:
551 self.__psuicide()
552
553 self.__set_psui(for_put, npx)
554
555
557 """Raise any residual exceptions.
558 This is normally called by the producer of data
559 """
560
561 with self.pc:
562 self.__psuicide()
563
564
566 """Get an object from the mailbox. This will normally block if the mailbox is empty.
567 @note: Suicide has a lower priority than processing data.
568 So, if there is data in the mailbox and putclose() has
569 been called, one call to get() will succeed;
570 the second call will raise an exception.
571 @note: If the mailbox is empty, and if a previous call has specified
572 an exception to be raised by readers, raise it.
573 """
574 if __debug__:
575 prinfo('ready to get acquire')
576 with self.gc:
577 while (not self.v) and self.gsui is None:
578 self.gc.wait()
579 if self.v:
580 tmp = self.v.pop(0)
581 else:
582 self.__gsuicide()
583 self.gc.notify()
584
585
586
587 if __debug__:
588 prinfo('get done=%s' % str(tmp))
589 return tmp
590
591
593 """Get an object from the mailbox and close the mailbox.
594 This is normally called by the consumer of data
595 in place of get().
596 It causes other threads to raise an exception.
597 @note: It will raise an exception if some other thread has closed the mailbox first.
598 """
599 with self.gc:
600 while (not self.v) and self.gsui is None:
601 self.gc.wait()
602 tmp = self.v.pop(0)
603 self.__set_psui(for_put, npx)
604 if (not self.v) and (not defer_ex):
605 self.__gsuicide()
606 self.__set_gsui(for_get, ngx)
607 return tmp
608
609
610
611 if __debug__ and DEBUG:
613
614
615
616 if self.v and not self.gsui and not self.psui:
617 prinfo('Mailbox deleted while non-empty')
618
619
620 maillist = listbox
621
622
624 for i in range(n):
625 m.put(i)
626 if m.get() != i:
627 print 'Tmp != i'
628 sys.exit(1)
629 prinfo('OK1')
630 m.putclose(KeyError())
631 try:
632 m.get()
633 print 'Whoops1'
634 sys.exit(1)
635 except KeyError:
636 pass
637 prinfo('OK2')
638
640 m.put(0)
641 if m.get() != 0:
642 print 'm.get() != 0'
643 sys.exit(1)
644 m.putclose(KeyError())
645 m.put(1)
646
647
648 if m.get() != 1:
649 print 'm.get() != 1'
650 sys.exit(1)
651 try:
652 m.get()
653 prinfo('Whoops1')
654 sys.exit(1)
655 except KeyError:
656 pass
657
658 m.putclose(for_put=EOF())
659 prinfo('OK3')
660 try:
661 m.put(0)
662 prinfo('Whoops2')
663 sys.exit(3)
664 except EOF:
665 pass
666 else:
667 prinfo('No exception 2')
668 sys.exit(4)
669 prinfo('OK4')
670
671
672 testmt_waiting = 0
685
686 m1 = mailbox()
687 m2 = mailbox()
688 m3 = mailbox()
689 m4 = mailbox()
690 prinfo('ok1')
691 for i in range(10):
692 t = T.Thread(target=innertest, args=(m1,m2,m3,m4))
693 t.start()
694 prinfo('ok2')
695 for i in range(10):
696 m1.put(i)
697 prinfo('ok3 %d' % i)
698 prinfo('ok4, %d' % m2.get())
699 if testmt_waiting != 10:
700 print 'testmt_waiting != 10'
701 sys.exit(1)
702 for i in range(10):
703 m3.put(i)
704 prinfo('ok5 %d' % i)
705 prinfo('ok6 %d' % m4.get())
706 if testmt_waiting != 0:
707 print 'Waiting=%d' % testmt_waiting
708 sys.exit(1)
709
710 if __name__ == '__main__':
711 test1(mailbox(), 30)
712 test2(mailbox())
713 testmt()
714 for i in range(5,100,20):
715 for sci in range(0, 100, 10):
716 sys.setcheckinterval(sci)
717 test1(mailbox(), i)
718 test2(mailbox())
719 testmt()
720 test1(maillist(), 30)
721 test2(maillist())
722 print "OK"
723