Package gyropy :: Module g_mailbox
[frames] | no frames]

Source Code for Module gyropy.g_mailbox

  1  from __future__ import with_statement 
  2   
  3  """This module contains several variants on mailboxes. 
  4  A mailbox is a way to pass information from one thread to another. 
  5  You put something into a mailbox; eventually the other thread takes 
  6  it out. 
  7   
  8  These mailboxes  also have mechanisms so that you can shut down 
  9  a mailbox.   You can "close" it so that the other side will 
 10  get an exception when it tries to read or write. 
 11   
 12  A typical server looks like this:: 
 13   
 14          mbox = mailbox() 
 15   
 16          ... fork the client thread ... 
 17   
 18          while True: 
 19                  try: 
 20                          x = mbox.get() 
 21                  except g_mailbox.EOF: 
 22                          break 
 23                  serve(x) 
 24   
 25  A typical client:: 
 26   
 27          mbox = mailbox() 
 28   
 29          ... fork the server thread ... 
 30   
 31          for task in tasklist: 
 32                  mbox.put(task) 
 33          mbox.putclose() # This causes the next "get" to raise an EOF. 
 34  """ 
 35   
 36  import threading as T 
 37  import sys 
 38  Lock = T.Lock 
 39  Condition = T.Condition 
 40   
 41  DEBUG = False 
 42  # DEBUG = True 
 43   
 44  if __debug__: 
 45          _prinfo_lock = T.Lock() 
46 - def prinfo(*s):
47 global _prinfo_lock 48 if not DEBUG: 49 return 50 with _prinfo_lock: 51 sys.stderr.flush() 52 sys.stdout.flush() 53 tid = '???' 54 sys.stderr.writelines([tid] + [ str(q) for q in s] + ['\n']) 55 sys.stderr.flush()
56 else:
57 - def prinfo(*s):
58 pass
59 60 61
62 -def _Raise(x):
63 if isinstance(x, (Exception, SystemExit, KeyboardInterrupt)): 64 # print 'Exception', x 65 raise x 66 elif isinstance(x, tuple): 67 # print 'tuple', x 68 if len(x) == 1: 69 raise x[0] 70 elif len(x) == 2: 71 raise x[0], x[1] 72 elif len(x) == 3: 73 raise x[0], x[1], x[2] 74 raise ValueError, "_Raise needs an exception or a tuple: got %s: %s" % (type(x), str(x))
75 76
class EOF(Exception):
    """Exception used by the mailbox classes in this module, both as the
    default "closed" exception and as a unique marker object.

    It accepts the same arguments as C{Exception} and adds nothing to it.
    """
80 81
class mailbox(object):
    """A mailbox with room for exactly one value.

    L{put} blocks while the slot is occupied and L{get} blocks while it
    is empty.  L{close}, L{putclose} and L{getclose} store exceptions
    which later calls raise instead of blocking.

    @cvar _empty: Marker kept in C{self.v} while the slot is empty.
    @cvar eof: Default exception stored when the mailbox is closed.
    @cvar ignore: Pass as C{for_put} / C{for_get} to leave the stored
            exception untouched.
    @ivar v: The stored value, or C{_empty}.
    @ivar psui: Exception to raise in putters, or None.
    @ivar gsui: Exception to raise in getters, or None.
    @ivar nps: How many more putters may raise C{psui}; negative means
            no limit.
    @ivar ngs: How many more getters may raise C{gsui}; negative means
            no limit.
    """
    _empty = EOF()
    eof = EOF()
    ignore = EOF()

    def __init__(self):
        self.lock = T.Lock()
        self.gc = T.Condition(self.lock)   # getters wait on this
        self.pc = T.Condition(self.lock)   # putters wait on this
        self.v = self._empty
        self.psui = None
        self.gsui = None
        self.nps = -1
        # This counter used to be initialised under the name "gps",
        # which no method reads; every method uses "ngs".
        self.ngs = -1

    def close(self, for_put=eof, for_get=eof, ngx=-1, npx=-1):
        """Close the connection.
        It causes other threads that use the mailbox to raise an exception,
        unless you give it self.ignore as an argument.
        Npx and ngx specify how many threads will commit suicide;
        zero means none; setting either to 1 is a useful way to kill
        a single thread.

        Legal values for for_put and for_get are any exception,
        anything returned from sys.exc_info, self.ignore and None.
                - None => Do not raise an exception;
                - self.ignore => Do not change whatever exception (or None)
                        is already stored.

        An exception that is already stored is never replaced.
        """
        with self.lock:
            self.__set_psui(for_put, npx)
            self.__set_gsui(for_get, ngx)

    def __gsuicide(self):
        """Raise the stored get-exception if the mailbox is empty.
        @requires: Called under lock.
        @note: suicide never happens when the mailbox is full.
        """
        if self.v is self._empty and self.gsui is not None:
            tmpe = self.gsui
            if self.ngs > 0:
                self.ngs -= 1
                if self.ngs == 0:
                    self.gsui = None
            self.gc.notify()    # Necessary for getclose(); get() case.
            _Raise(tmpe)

    def __psuicide(self):
        """Raise the stored put-exception, if there is one.
        @requires: Called under lock.
        """
        if self.psui is not None:
            tmpe = self.psui
            if self.nps > 0:
                self.nps -= 1
                if self.nps == 0:
                    self.psui = None
            self.pc.notify()    # Necessary for putclose(); put() case.
            _Raise(tmpe)

    def __set_psui(self, for_put, npx):
        """Store the put-exception unless one is already stored.
        @requires: Called under lock.
        """
        if for_put is not self.ignore and self.psui is None:
            if npx == 0:
                self.psui = None
            else:
                self.psui = for_put
                self.pc.notifyAll()     # wake putters so they can raise
            self.nps = npx

    def __set_gsui(self, for_get, ngx):
        """Store the get-exception unless one is already stored.
        @requires: Called under lock.
        """
        if for_get is not self.ignore and self.gsui is None:
            if ngx == 0:
                self.gsui = None
            else:
                self.gsui = for_get
                self.gc.notifyAll()     # wake getters so they can raise
            self.ngs = ngx

    def __len__(self):
        """Return 1 if the mailbox holds a value, 0 if it is empty."""
        with self.lock:
            return int(self.v is not self._empty)

    def __nonzero__(self):
        """Return True if the mailbox holds a value."""
        with self.lock:
            return self.v is not self._empty

    __bool__ = __nonzero__      # Python 3 spelling of __nonzero__

    def put(self, v):
        """Put a value into the mailbox.  If one of the close()
        functions has been used to set an exception on for_put,
        it will raise the exception instead.
        Blocks while the mailbox is full and no put-exception is stored.
        """
        assert v is not self._empty
        if __debug__:
            prinfo('ready to put %s' % str(v))
        with self.pc:
            while self.v is not self._empty and self.psui is None:
                self.pc.wait()
            self.__psuicide()
            self.v = v
            self.gc.notify()
        if __debug__:
            prinfo('put done')

    def putclose(self, for_get=eof, for_put=ignore, ngx=-1, npx=-1, defer_ex=False):
        """Close the connection.
        This is normally called by the producer of data
        in place of put().  It can cause other threads to raise an exception.

        This call acts like a
        put() in that it will block until the mailbox
        is emptied.   It does not fill the mailbox, though.
        Following this call:
                - get() will not block (it will raise an exception
                        unless for_get is self.ignore or ngx==0),
                - put() will not block (it will raise an exception if
                        for_put is not self.ignore and npx!=0),
                - likewise putclose() and getclose().

        It will raise an exception (unless defer_ex is True)
        if some other thread has closed
        first and set a for_put exception.   However, it will
        set a for_get exception (if requested) before expiring.
        """
        with self.pc:
            while self.v is not self._empty and self.psui is None:
                self.pc.wait()
            self.__set_gsui(for_get, ngx)
            if not defer_ex:
                self.__psuicide()
            # self.v remains empty
            self.__set_psui(for_put, npx)
            self.pc.notify()    # A put can follow a putclose()

    def putraise(self):
        """Raise any residual exceptions.
        This is normally called by the producer of data

        This call acts like a
        put() in that it will block until the mailbox
        is emptied or until an exception is waiting.
        """
        with self.pc:
            while self.v is not self._empty and self.psui is None:
                self.pc.wait()
            self.__psuicide()
            self.pc.notify()    # A put or putclose can follow a putraise()

    def get(self):
        """Get an item from the mailbox.  Possibly, raise an exception
        if L{close} or L{putclose} has been called on the mailbox.
        @note: Suicide has a lower priority than processing data.
                So, if there is data in the mailbox and putclose() has
                been called, one call to get() will succeed then
                the second call will raise an exception.
        @note: If the mailbox is empty, and if a previous call has specified
                an exception to be raised by readers, raise it.
        """
        if __debug__:
            prinfo('ready to get acquire')
        with self.gc:
            while self.v is self._empty and self.gsui is None:
                self.gc.wait()
            tmp = self.v
            self.__gsuicide()
            self.v = self._empty
            self.gc.notify()    # Even though we empty the mailbox, we
                                # need to unblock the next call to get()
                                # because it might need to suicide
                                # (if a suitable exception is set).
            self.pc.notify()
        if __debug__:
            prinfo('get done=%s' % str(tmp))
        return tmp

    def getclose(self, for_put=eof, for_get=ignore, npx=-1, ngx=-1, defer_ex=False):
        """Close the connection.
        This is normally called by the consumer of data
        in place of get().
        It causes other threads to raise an exception.
        It will raise an exception if some other thread has closed
        the mailbox first.
        Note that if there is something in the mailbox, it will
        empty and return the value.
        @note: If the mailbox is empty, and if a previous call has specified
                an exception to be raised by readers, raise it
                (unless defer_ex is True, in which case the
                empty marker itself is returned).
        """
        with self.gc:
            while self.v is self._empty and self.gsui is None:
                self.gc.wait()
            tmp = self.v
            self.__set_psui(for_put, npx)
            if not defer_ex:
                self.__gsuicide()
            self.v = self._empty
            self.pc.notify()
            self.__set_gsui(for_get, ngx)
        return tmp

    if __debug__ and DEBUG:
        def __del__(self):
            # We don't need to acquire or release any locks, because we only
            # get to __del__ if there is nothing holding a reference
            # to this object, so that nothing can be waiting on a lock.
            if self.v is not self._empty and not self.gsui and not self.psui:
                prinfo('Mailbox deleted while non-empty')
301 302
class waitbox(object):
    """This is a stripped down mailbox that is good for one message
    between one sender and one receiver.
    It is an error to call put() or get() or putclose() more than once.

    The normal usage is that you create the mailbox, spawn a thread,
    and hand the mailbox to the new thread.
    Then, one side calls L{get} on the mailbox to wait for an answer.
    The other side computes something, then calls L{put}.
    The waitbox is then thrown away.

    @ivar get_lock: Created locked; released by L{put} or L{putclose} to
            let L{get} proceed.  Set to None once L{get} has acquired it.
    @ivar v: The value handed over, or C{_empty}.
    @ivar sui: Exception that L{get} raises, or None.
    """
    _empty = EOF()
    _was_closed = EOF()
    _has_been_used = EOF()

    def __init__(self):
        self.get_lock = T.Lock()
        self.get_lock.acquire()
        self.v = self._empty
        self.sui = None

    def __nonzero__(self):
        """Return True once a value has been put into the box.

        This does not touch get_lock: the lock is held from construction
        until put()/putclose(), and is None after get(), so taking it
        here would block or fail.
        """
        return self.v is not self._empty

    __bool__ = __nonzero__      # Python 3 spelling of __nonzero__

    def putclose(self, to_be_raised=_was_closed):
        """Close the connection.
        This is normally called by the producer of data in place of L{put}().
        It causes the thread that calls L{get} to raise C{to_be_raised}.
        """
        self.sui = to_be_raised
        self.get_lock.release()

    def put(self, v):
        """Store C{v} and release the thread waiting in L{get}."""
        self.v = v
        self.get_lock.release()

    def get(self):
        """Get the object from the waitbox.  This blocks until L{put}
        or L{putclose} has been called.
        @note: If L{putclose} was called, raise the exception it stored.
        @note: The first successful call returns the value; any further
                call raises C{_has_been_used} instead of blocking.
        """
        pending_lock = self.get_lock
        if pending_lock is not None:
            pending_lock.acquire()
            # The lock is never released again; drop it so that a repeated
            # get() falls through to the stored exception instead of
            # blocking forever.
            self.get_lock = None
        if self.sui is not None:
            _Raise(self.sui)
        self.sui = self._has_been_used
        return self.v

    if __debug__ and DEBUG:
        def __del__(self):
            # We don't need to unlock the lock, because we only
            # get to __del__ if there is nothing holding a reference
            # to this object, so that nothing can be waiting on a lock.
            if self.v is not self._empty and not self.sui:
                prinfo('Mailbox deleted while non-empty')
class repbox(object):
    """Like a mailbox, but produces infinite copies of whatever value
    was put in there last.  It is created with a value inside.
    @note: This needs revision to more closely follow mailbox's methods.

    @ivar value: The value handed out by L{get}.
    @ivar suicide: Exception raised by L{get}, L{put} and C{len()}
            once set, or None.
    """

    _empty = EOF()

    def __init__(self, value=None, suicide=None):
        self.value = value
        self.suicide = suicide
        self.lock = T.Lock()

    def close(self, e_to_raise=_empty):
        """Store C{e_to_raise} unless an exception is already stored."""
        with self.lock:
            if self.suicide is None:
                self.suicide = e_to_raise

    def __len__(self):
        """Return 1, or raise the stored exception if there is one."""
        with self.lock:
            pending = self.suicide
        # Raise only after the with-block has released the lock.  Releasing
        # by hand inside the block made the block's own release fail, and
        # that failure replaced the intended exception.
        if pending is not None:
            _Raise(pending)
        return 1

    def closenow(self, e_to_raise=_empty):
        """Store C{e_to_raise}, replacing any exception already stored."""
        with self.lock:
            self.suicide = e_to_raise

    def put(self, v):
        """Replace the stored value with C{v}, or raise the stored
        exception (leaving the value unchanged) if there is one.
        """
        with self.lock:
            pending = self.suicide
            if pending is None:
                self.value = v
        if pending is not None:
            _Raise(pending)

    def get(self):
        """Return the stored value without removing it.  This never blocks.
        @note: If a previous call has stored an exception, raise it instead.
        """
        with self.lock:
            pending = self.suicide
            current = self.value
        if pending is not None:
            _Raise(pending)
        return current
class listbox(object):
    """A mailbox with an unbounded queue.

    L{put} never blocks; L{get} blocks while the queue is empty and no
    get-exception is stored.

    @cvar eof: Default exception stored when the mailbox is closed.
    @cvar ignore: Pass as C{for_put} / C{for_get} to leave the stored
            exception untouched.
    @ivar v: List of queued values, oldest first.
    @ivar psui: Exception to raise in putters, or None.
    @ivar gsui: Exception to raise in getters, or None.
    @ivar nps: How many more putters may raise C{psui}; negative means
            no limit.
    @ivar ngs: How many more getters may raise C{gsui}; negative means
            no limit.
    """
    eof = EOF()
    ignore = EOF()

    def __init__(self, initial=None):
        self.lock = T.Lock()
        self.gc = T.Condition(self.lock)    # getters wait on this
        self.pc = self.lock                 # putters never wait
        if initial:
            self.v = list(initial)
        else:
            self.v = []
        self.psui = None
        self.gsui = None
        self.nps = -1
        # This counter used to be initialised under the name "gps",
        # which no method reads; every method uses "ngs".
        self.ngs = -1

    def close(self, for_put=eof, for_get=eof, ngx=-1, npx=-1):
        """Close the connection.
        It causes other threads to raise an exception,
        unless you give it self.ignore as an argument.
        Npx and ngx specify how many threads will commit suicide;
        zero means none; setting either to 1 is a useful way to kill
        a single thread.

        Legal values for for_put and for_get are any exception,
        anything returned from sys.exc_info, self.ignore and None.
                - None => Do not raise an exception;
                - self.ignore => Do not change whatever exception (or None)
                        is already stored.

        An exception that is already stored is never replaced.
        """
        with self.lock:
            self.__set_psui(for_put, npx)
            self.__set_gsui(for_get, ngx)

    def __gsuicide(self):
        """Raise the stored get-exception if the queue is empty.
        @requires: Called under lock.
        @note: Suicide never happens when the mailbox is full.
        """
        if (not self.v) and (self.gsui is not None):
            tmpe = self.gsui
            if self.ngs > 0:
                self.ngs -= 1
                if self.ngs == 0:
                    self.gsui = None
            self.gc.notify()    # Necessary for getclose(); get() case.
            _Raise(tmpe)

    def __psuicide(self):
        """Raise the stored put-exception, if there is one.
        @requires: Called under lock.
        """
        if self.psui is not None:
            tmpe = self.psui
            if self.nps > 0:
                self.nps -= 1
                if self.nps == 0:
                    self.psui = None
            _Raise(tmpe)

    def __set_psui(self, for_put, npx):
        """Store the put-exception unless one is already stored.
        @requires: Called under lock.
        """
        if for_put is not self.ignore and self.psui is None:
            if npx == 0:
                self.psui = None
            else:
                self.psui = for_put
            self.nps = npx

    def __set_gsui(self, for_get, ngx):
        """Store the get-exception unless one is already stored.
        @requires: Called under lock.
        """
        if for_get is not self.ignore and self.gsui is None:
            if ngx == 0:
                self.gsui = None
            else:
                self.gsui = for_get
                self.gc.notifyAll()     # wake getters so they can raise
            self.ngs = ngx

    def __len__(self):
        """Return the number of queued values."""
        with self.lock:
            return len(self.v)

    def put(self, v):
        """Put a value into the mailbox.  If one of the close()
        functions has been used to set an exception on for_put,
        it will raise the exception instead.
        """
        if __debug__:
            prinfo('ready to put %s' % str(v))
        with self.pc:
            self.__psuicide()
            self.v.append(v)
            self.gc.notify()
        if __debug__:
            prinfo('put done')

    def putclose(self, for_get=eof, for_put=ignore, ngx=-1, npx=-1, defer_ex=False):
        """Close the connection.
        This is normally called by the producer of data
        in place of put().  It can cause other threads to raise an exception.

        It does not fill the mailbox.
        Following this call,
                - get() will not block (it will raise an exception
                        unless for_get is self.ignore or ngx==0),
                - put() will not block (it will raise an exception if
                        for_put is not self.ignore and npx!=0),
                - likewise putclose() and getclose().

        It will raise an exception (unless defer_ex is True)
        if some other thread has closed
        first and set a for_put exception.   However, it will
        set a for_get exception (if requested) before expiring.
        """
        with self.pc:
            self.__set_gsui(for_get, ngx)
            if not defer_ex:
                self.__psuicide()
            self.__set_psui(for_put, npx)

    def putraise(self):
        """Raise any residual exceptions.
        This is normally called by the producer of data
        """
        with self.pc:
            self.__psuicide()

    def get(self):
        """Get an object from the mailbox.  This will normally block
        if the mailbox is empty.
        @note: Suicide has a lower priority than processing data.
                So, if there is data in the mailbox and putclose() has
                been called, one call to get() will succeed;
                the second call will raise an exception.
        @note: If the mailbox is empty, and if a previous call has specified
                an exception to be raised by readers, raise it.
        """
        if __debug__:
            prinfo('ready to get acquire')
        with self.gc:
            while (not self.v) and self.gsui is None:
                self.gc.wait()
            if self.v:
                tmp = self.v.pop(0)
            else:
                # Empty, so the loop ended because gsui is set:
                # this call always raises.
                self.__gsuicide()
            self.gc.notify()    # Even if we empty the mailbox, we
                                # need to unblock the next call to get()
                                # because it might need to suicide
                                # (if a suitable exception is set).
        if __debug__:
            prinfo('get done=%s' % str(tmp))
        return tmp

    def getclose(self, for_put=eof, for_get=ignore, npx=-1, ngx=-1, defer_ex=False):
        """Get an object from the mailbox and close the mailbox.
        This is normally called by the consumer of data
        in place of get().
        It causes other threads to raise an exception.
        @note: Queued data has priority: if there is something in the
                mailbox, the oldest value is removed and returned.
        @note: If the mailbox is empty because some other thread has
                closed it first, raise the stored exception; when
                defer_ex is True, return C{self.eof} as a marker instead.
        """
        with self.gc:
            while (not self.v) and self.gsui is None:
                self.gc.wait()
            had_data = bool(self.v)
            if had_data:
                tmp = self.v.pop(0)
            else:
                # Popping here would raise IndexError on the empty list.
                tmp = self.eof
            self.__set_psui(for_put, npx)
            # Only raise when nothing was taken; raising after a successful
            # pop would silently drop the value just removed.
            if (not had_data) and (not defer_ex):
                self.__gsuicide()
            self.__set_gsui(for_get, ngx)
        return tmp

    if __debug__ and DEBUG:
        def __del__(self):
            # We don't need to acquire or release any locks, because we only
            # get to __del__ if there is nothing holding a reference
            # to this object, so that nothing can be waiting on a lock.
            if self.v and not self.gsui and not self.psui:
                prinfo('Mailbox deleted while non-empty')


# Alternative name for listbox.
maillist = listbox
def test1(m, n=1000):
    """Single-threaded check of mailbox C{m}.

    Puts and immediately gets back C{n} integers, then closes the
    mailbox with a KeyError and checks that the next get() raises it.
    Prints a message and exits with status 1 on any mismatch.
    """
    for expected in range(n):
        m.put(expected)
        received = m.get()
        if received != expected:
            print('Tmp != i')
            sys.exit(1)
    prinfo('OK1')
    m.putclose(KeyError())
    try:
        m.get()
    except KeyError:
        pass    # Got EOF
    else:
        print('Whoops1')
        sys.exit(1)
    prinfo('OK2')
638
639 -def test2(m):
640 m.put(0) 641 if m.get() != 0: 642 print 'm.get() != 0' 643 sys.exit(1) 644 m.putclose(KeyError()) 645 m.put(1) 646 # Get will not raise an exception here because 647 # there is something in the mailbox. 648 if m.get() != 1: 649 print 'm.get() != 1' 650 sys.exit(1) 651 try: 652 m.get() 653 prinfo('Whoops1') 654 sys.exit(1) 655 except KeyError: 656 pass # Got EOF 657 658 m.putclose(for_put=EOF()) 659 prinfo('OK3') 660 try: 661 m.put(0) 662 prinfo('Whoops2') 663 sys.exit(3) 664 except EOF: 665 pass # Got EOF 666 else: 667 prinfo('No exception 2') 668 sys.exit(4) 669 prinfo('OK4')
# Number of worker threads currently between their m2.put() and m3.get().
testmt_waiting = 0


def testmt():
    """Multi-threaded check: ten worker threads each relay one value
    from m1 to m2 and then one value from m3 to m4, while this thread
    feeds m1/m3 and drains m2/m4.
    Exits with status 1 if the count of waiting workers is wrong.
    """

    def innertest(m1, m2, m3, m4):
        global testmt_waiting
        i = m1.get()
        prinfo('innertest: Got m1=%d' % i)
        testmt_waiting += 1
        prinfo('innertest: putting m2=%d' % i)
        m2.put(i)
        i = m3.get()
        testmt_waiting -= 1
        m4.put(i)

    m1, m2, m3, m4 = mailbox(), mailbox(), mailbox(), mailbox()
    prinfo('ok1')
    for _ in range(10):
        worker = T.Thread(target=innertest, args=(m1, m2, m3, m4))
        worker.start()
    prinfo('ok2')
    for i in range(10):
        m1.put(i)
        prinfo('ok3 %d' % i)
        # The get() is part of the protocol, not just of the trace
        # message: it runs even when tracing is off.
        prinfo('ok4, %d' % m2.get())
    if testmt_waiting != 10:
        print('testmt_waiting != 10')
        sys.exit(1)
    for i in range(10):
        m3.put(i)
        prinfo('ok5 %d' % i)
        prinfo('ok6 %d' % m4.get())
    if testmt_waiting != 0:
        print('Waiting=%d' % testmt_waiting)
        sys.exit(1)


if __name__ == '__main__':
    test1(mailbox(), 30)
    test2(mailbox())
    testmt()
    # Repeat with different message counts and interpreter check intervals.
    for i in range(5, 100, 20):
        for sci in range(0, 100, 10):
            sys.setcheckinterval(sci)
            test1(mailbox(), i)
            test2(mailbox())
            testmt()
    test1(maillist(), 30)
    test2(maillist())
    print("OK")