Package gyropy :: Module g_proxy
[frames] | no frames]

Source Code for Module gyropy.g_proxy

   1  #!/usr/bin/env python 
   2   
   3  """This will behave badly under isinstance(). 
   4  if isinstance(x,y) probably not isinstance(remote_x, y). 
   5  Also, dicts are passed by value instead of by reference, 
   6  so if you remotely modify a dict, you will not see the modification 
   7  locally. 
   8  """ 
   9   
  10   
  11  # import exceptions 
  12  import weakref 
  13  import types 
  14  import sys 
  15  import cPickle 
  16  import g_threading as T 
  17  import imp 
  18  import StringIO 
  19  import time 
  20  import traceback 
  21  import g_pipe 
  22  import g_mailbox as GM 
  23  import os 
  24   
  25  __version__ = '0.23' 
  26  DEBUG = 2 
  27  # DEBUG = False 
  28  LOGFILE = './g_proxy.client' 
  29   
  30   
  31  Id = id 
  32   
  33  TRANSPORTABLE_TYPES = ( # We transport only immutable types. 
  34                  # int, float, complex, list, tuple, str, 
  35                  int, float, complex, tuple, str, long, 
  36                  types.NoneType, types.BooleanType, 
  37                  # dict, types.ClassType, types.InstanceType, set, frozenset 
  38                  types.ClassType, types.InstanceType, frozenset, 
  39                  set,    # This ought not to be transported, but it fails the proxy 
  40                          # equality test.    This indicates some kind of bug. 
  41                  dict    # Dict needs to be here to make cPickle.load() work. 
  42                  ) 
  43  TRANSPORTABLE_TYPES_O = frozenset( [ Id(q) for q in TRANSPORTABLE_TYPES] + [Id(object)] ) 
  44   
  45  NEVER, ALWAYS, MAYBE = 0, 1, 2 
  46  _Names = ['INITIALIZATION', 'GET_CLASSINFO', 'R_TEST', 'RM_STORE', 
  47          'SETA', 'SETAP', 
  48          'DELA', 'GETA', 'RCALL', 'RMCALL', 'IMPORT_MOD', 'GLOBALS', 
  49          'STDOUT', 'STDERR', 'VALUE', 'EXCEPTION', 
  50          'QUIT_1', 'QUIT_2', 'ABORT' 
  51          ] 
  52   
  53  NO_CI_ATTR = frozenset(['__dict__', '__class__', '__weakref__', '__init__', 
  54                          '__getattribute__', '__setattr__', '__delattr__', 
  55                          '__del__', 
  56                          '__new__' 
  57                          ] 
  58                  ) 
  59   
  60   
61 -class ProtocolError(Exception):
62 - def __init__(self, *a):
63 Exception.__init__(self, *a)
64 65 _loglock = T.Lock() 66 _logfd = None
67 -def log(*args):
68 global _logfd 69 global _loglock 70 if _logfd is None: 71 return 72 _loglock.acquire() 73 for (i,a) in enumerate(args): 74 if i == 0: 75 _logfd.writelines( [str(a), '\n'] ) 76 else: 77 _logfd.writelines( ['\t', str(a), '\n'] ) 78 _logfd.flush() 79 _loglock.release()
80 81
82 -class test0(object):
83 pass
84
85 -class test0classic:
86 pass
87
88 -class test1(object):
89 whatami = 'I am the class' 90
91 - def __init__(self, x):
92 self.x = x 93 self.whatami = 'I am the instance'
94
95 - def __eq__(self, other):
96 return self.x == other.x and hasattr(self, '__eq__') and hasattr(other, '__eq__')
97
98 - def w(self):
99 prinfo('executing test1.w()') 100 return self.whatami
101 102 assert test1(3) == test1(3) 103
104 -class test1classic:
105 whatami = 'I am the class' 106
107 - def __init__(self, x):
108 self.x = x 109 self.whatami = 'I am the instance'
110
111 - def __eq__(self, other):
112 return self.x == other.x and hasattr(self, '__eq__') and hasattr(other, '__eq__')
113
114 - def w(self):
115 prinfo('executing test1.w()') 116 return self.whatami
117 assert test1classic(3) == test1classic(3) 118 119 120 TEST = tuple( [ 121 (None, False, 1, 2.0, frozenset([33,11]), (6,), 'seven', {8:9}, {9:{}}), 122 complex(3.0, 4.0), 123 ( set([10]), ), 124 test1(3), 125 test1classic(3), 126 "end of equality test", 127 # Up to here, the local and remote test cases 128 # should return equality. 129 test0(), 130 test0classic(), 131 sys, 132 (Exception(), Exception(5)) 133 ]) 134 135
136 -class proxy_object(object):
137 pass
138
139 -def Prinfo(s):
140 log(s)
141 142 143 if DEBUG:
144 - def Oid(o):
145 return '%s%d' % (str(type(o)), Id(o))
146 - def diO(s):
147 if s == '': 148 return None 149 return s
150 # def prinfo(s): 151 # sys.stderr.writelines([s, '\n']) 152 # sys.stderr.flush()
153 - def prinfo(s):
154 log(s)
155 - def S(s):
156 if isinstance(s, proxy_object): 157 return '<REMOTE %s>' % id(s) 158 return str(s)
159 160 for tmp in _Names: 161 globals()[tmp] = tmp 162 del tmp 163 else:
164 - def Oid(o):
165 return '%d%d' % (Id(type(o)), Id(o))
166 - def diO(s):
167 if s == '': 168 return None 169 return s
170 - def prinfo(s):
171 pass
172 - def S(s):
173 pass
174 for (i, tmp) in enumerate(_Names): 175 globals()[tmp] = i + 100 176 del tmp 177 del i 178 del _Names 179 180
181 -class object_store(object):
182 - def __init__(self, storetype=dict):
183 self.store = storetype() 184 self.lock = T.Lock(verbose=DEBUG>1, name='object_store')
185 # prinfo('OS:%d__init__: locked: %s' % (Id(self), self.lock.locked())) 186
187 - def put(self, xid, x):
188 # prinfo('OS%d before put lock: %s' % (Id(self), self.lock.locked())) 189 self.lock.acquire() 190 self.store[xid] = x 191 # prinfo('OS%d in put lock' % Id(self)) 192 self.lock.release() 193 # prinfo('OS%d past put lock: locked: %s' % (Id(self), self.lock.locked())) 194 return xid
195
196 - def putid(self, x):
197 return self.put(Oid(x), x)
198 199
200 - def rm(self, oid):
201 self.lock.acquire() 202 del self.store[oid] 203 # prinfo('OS in rm lock') 204 self.lock.release()
205
206 - def get(self, xid):
207 self.lock.acquire() 208 try: 209 tmp = self.store[xid] 210 finally: 211 self.lock.release() 212 return tmp
213 214
215 - def disconnect(self):
216 """This is only called for the rowlp case, 217 and the object store is a WeakDictionary, so 218 if we encounter an object, we know that something 219 else references it. Consequently, we cannot 220 depend on it being deleted and calling its __del__ item 221 any time soon. 222 """ 223 while self.store: 224 self.lock.acquire() 225 oid, v = self.store.popitem() 226 try: 227 object.__getattribute__(v, 'conn') 228 except AttributeError: 229 pass 230 else: 231 object.__setattr__(v, 'conn', None) 232 self.lock.release()
233 234
235 - def clear(self):
236 self.lock.acquire() 237 self.store.clear() 238 self.lock.release()
239 240 241
242 -def deal_with_arbitrary_exception(c, boxid):
243 ty, ob, trb = sys.exc_info() 244 if ty is None: 245 log('Whoops: deal_with_arbitrary_exception', (ty, ob)) 246 tmp = tuple(traceback.format_tb(trb)) 247 del trb 248 log('INFO: deal_with... %s.%s' % (ty.__module__, ty.__name__), *(ob.args)) 249 log('INFO: deal_with... traceback:', *tuple(tmp)) 250 c.send(EXCEPTION, boxid, (ty.__name__, ty.__module__, 251 Oid(ty), ob.args, tmp 252 ), 253 proxy=NEVER 254 ) 255 del ob
256 257 258
259 -class C_threadstate(object):
260 IDLE = 0 261 BUSY = 1 262 STARTING = 2 263
264 - def __init__(self):
265 self.lock = T.Condition(verbose=DEBUG>1, name='threadstate') 266 self.state = {} 267 # self.terminated = [] 268 self.expected_stops = 0
269
270 - def to_idle(self, me):
271 self.lock.acquire() 272 self.state[me] = self.IDLE 273 prinfo('to_idle: %d %s' % (id(me), me)) 274 self._print() 275 self.lock.notify() 276 self.lock.release()
277
278 - def to_busy(self, me):
279 self.lock.acquire() 280 prinfo('to_busy: %d %s' % (id(me), me)) 281 self.state[me] = self.BUSY 282 self._print() 283 self.lock.notify() 284 self.lock.release()
285
286 - def to_stopped(self, me):
287 self.lock.acquire() 288 prinfo('to_stopped: %d %s' % (id(me), me)) 289 # self.terminated.append(me) 290 ime = me 291 # if ime in self.state: 292 # if self.expected_stops > 0: 293 # self.expected_stops -= 1 294 # del self.state[ime] 295 # self.lock.notify() 296 if self.expected_stops > 0: 297 self.expected_stops -= 1 298 del self.state[ime] 299 self._print() 300 self.lock.notify() 301 self.lock.release()
302
303 - def to_starting(self, me):
304 self.lock.acquire() 305 prinfo('to_starting: %d %s' % (id(me), me)) 306 self.state[me] = self.STARTING 307 self._print() 308 self.lock.notify() 309 self.lock.release()
310
311 - def _print(self):
312 """Call within lock.""" 313 prinfo('threadstate:') 314 m = {self.IDLE: 'idle', self.BUSY: 'busy', self.STARTING: 'starting'} 315 for (k, v) in self.state.items(): 316 prinfo('\t%d : %s' % (k, m[v]))
317
318 - def anticipate_stop(self):
319 prinfo('anticpate_stop') 320 self.lock.acquire() 321 self.expected_stops += 1 322 self.lock.release()
323
324 - def n_busy(self):
325 self.lock.acquire() 326 n = 0 327 for s in self.state.values(): 328 if s == self.BUSY: 329 n += 1 330 self.lock.release() 331 return n
332
333 - def n_idle(self):
334 self.lock.acquire() 335 n = -self.expected_stops 336 for s in self.state.values(): 337 if s == self.IDLE: 338 n += 1 339 self.lock.release() 340 return max(0, n)
341 342
343 - def list(self):
344 self.lock.acquire() 345 tmp = self.state.keys() 346 self.lock.release() 347 return tmp
348
349 - def busylist(self):
350 self.lock.acquire() 351 tmp = [] 352 for (idme,s) in self.state.items(): 353 if s == self.BUSY: 354 tmp.append(idme) 355 self.lock.release() 356 return tmp
357
358 - def wait_until_all_done(self):
359 """Called from main thread.""" 360 self.lock.acquire() 361 # for thr in self.terminated: 362 # prinfo('WUID: joining1 %d %s' % (id(thr), thr)) 363 # thr.join() 364 while len(self.state) > 0: 365 prinfo('WUID: waiting: %s' % str(self.state)) 366 self.lock.wait() 367 # for thr in self.terminated: 368 # prinfo('WUID: joining2 %d %s' % (id(thr), thr)) 369 # thr.join() 370 self.lock.release()
371
372 - def reap_stopped_threads(self):
373 """Called from main thread.""" 374 pass
375 376
377 -def thread_within_listen(inbox, threadstate, c):
378 me = T.get_ident() 379 prinfo('thread_within_listen starting') 380 while True: 381 threadstate.to_idle(me) 382 try: 383 # prinfo('twi: ready for inbox.get') 384 info = inbox.get() 385 threadstate.to_busy(me) 386 # prinfo('twi: inbox.gotten') 387 except GM.EOF: 388 # prinfo('twibox.get got a EOF: %d %s' % (threadstate.n_idle(), threadstate.list())) 389 threadstate.to_stopped(me) 390 return 391 except: 392 threadstate.to_stopped(me) 393 raise 394 395 # if info is None: 396 # break # Shut a thread down per request. 397 398 op, boxid, interior = info 399 arg = c.pickler.load(interior) 400 prinfo('TWI %s %s %s' % (S(op), S(boxid), S(arg))) 401 402 if op == INITIALIZATION: 403 v, oOidlist = arg 404 assert v == __version__ 405 assert c.remote_special_objs is None 406 c.remote_special_objs = dict(zip(oOidlist, 407 [ proxy_object, type ] 408 )) 409 c.s_listen_ready.put(None) 410 elif op == QUIT_1: 411 qc = 0 412 while c._gc is not None and qc < 4: 413 c._gc.collect() # See what we can shake loose. 414 time.sleep(0.1) # Wait for it to have an effect on the other side, 415 # and maybe shake loose something there. 416 if threadstate.n_busy() <= 1: # Wait till things quiet down. 417 qc += 1 418 else: 419 prinfo('Busy - waiting: %s' % S(threadstate.busylist())) 420 c.send(QUIT_2, None, None) 421 c.reason = 'QUIT' 422 423 elif op == STDOUT: 424 if c.stdout is not None: 425 c.stdout.fprint(arg) 426 elif op == STDERR: 427 if c.stderr is not None: 428 c.stderr.fprint(arg) 429 430 elif op == VALUE: 431 # prinfo('value ready to find') 432 mbox = c.box.find(boxid, 'value') 433 # prinfo('value ready to put') 434 mbox.put( (op, arg) ) 435 elif op == EXCEPTION: 436 mbox = c.box.find(boxid, 'Exception') 437 mbox.put( (op, arg) ) 438 439 440 # These are basically server calls below: 441 elif op == RM_STORE: 442 c.lowrp.rm(arg) 443 elif op == GETA: 444 objid, k = arg 445 try: 446 value = getattr(c.lowrp.get(objid), k) 447 except: 448 deal_with_arbitrary_exception(c, boxid) 449 else: 450 c.send(VALUE, boxid, value) 451 452 elif op == R_TEST: 453 c.send(VALUE, boxid, TEST[arg]) 454 elif op == GET_CLASSINFO: 455 clobj = c.lo_clo.get(arg) 456 try: 457 bs = clobj.__bases__ 458 except AttributeError: 459 bs = None 460 if bs is not None: 461 bs = tuple( [ c.lo_clo.putid(b) for b in bs ] ) 462 try: 463 dct = tuple( clobj.__dict__.items() ) 464 except AttributeError: 465 dct = None 466 if dct is not None: 467 dct = tuple( [ (k, callable(v)) 468 for (k,v) in getattr(clobj, '__dict__', {}).items() 469 if k not in NO_CI_ATTR 470 ] ) 471 tmp = (clobj.__name__, bs, dct) 472 c.send(VALUE, boxid, tmp, proxy=NEVER) 473 elif op in (SETA, SETAP): 474 objid, attr, value = arg 475 if op == SETAP: 476 value = c.lowrp.get(value) 477 try: 478 setattr(c.lowrp.get(objid), attr, value) 479 except: 480 deal_with_arbitrary_exception(c, boxid) 481 else: 482 c.send(VALUE, boxid, None, proxy=NEVER) 483 elif op == DELA: 484 objid, attr = arg 485 try: 486 delattr(c.lowrp.get(objid), attr) 487 except: 488 deal_with_arbitrary_exception(c, boxid) 489 else: 490 c.send(VALUE, boxid, None, proxy=NEVER) 491 elif op in (RCALL, RMCALL): 492 if op == RCALL: 493 objid, args, kwpairs = arg 494 obj = c.lowrp.get(objid) 495 else: 496 objid, aname, args, kwpairs = arg 497 obj = getattr(c.lowrp.get(objid), aname) 498 499 kwargs = dict(kwpairs) 500 501 assert isinstance(args, tuple) 502 assert isinstance(kwargs, dict) 503 assert hasattr(obj, '__call__') 504 try: 505 value = obj(*args, **kwargs) 506 except: 507 log('R[M]CALL %s obj=%s' % (str(op), str(obj)), str(args), str(kwargs)) 508 deal_with_arbitrary_exception(c, boxid) 509 else: 510 c.send(VALUE, boxid, value) 511 elif op == IMPORT_MOD: 512 name, path = arg 513 prinfo('import mod in twi: %s %s' % (S(name), S(path))) 514 try: 515 value = import_mod(name, path) 516 except: 517 deal_with_arbitrary_exception(c, boxid) 518 else: 519 prinfo('imported module %s' % S(value)) 520 c.send(VALUE, boxid, value, proxy=ALWAYS) 521 elif op == GLOBALS: 522 value = globals() 523 c.send(VALUE, boxid, value, proxy=ALWAYS) 524 else: 525 threadstate.to_stopped(me) 526 raise ProtocolError, "recv op=%s" % op
527 528 # prinfo('TWI END %s, %d still busy' % (S(boxid), threadstate.n_busy())) 529 530 531 532 533
534 -def thread_within_listen_wrapper(mbox, threadstate, conn):
535 try: 536 thread_within_listen(mbox, threadstate, conn) 537 except GM.EOF: 538 pass 539 except: 540 log('Exception in thread_within_listen:', traceback.format_exc()) 541 sys.exit(2)
542 543 544
545 -class io_grabber(object):
546 """This buffers text, transmitting it in units of lines. 547 That's important at the other end if there are multiple 548 connections, because fancyprint adds a prefix at the 549 beginning of each write, assuming it is the beginning 550 of each line.""" 551
552 - def __init__(self, name):
553 log('iograbber.__init__') 554 self.name = name 555 self.c = None 556 self.tmp = [] 557 self.tinl = [] 558 self.lock = T.Lock(verbose=DEBUG>1, name='io_grabber')
559
560 - def connect(self, c):
561 log('iograbber.connect') 562 self.c = c 563 self.lock.acquire() 564 self._dump() 565 self.lock.release()
566 567
568 - def write(self, s):
569 log('write:', s) 570 return self.writelines( (s,) )
571 572
573 - def writelines(self, x):
574 log('writelines:', x) 575 if isinstance(x, str): 576 x = (x,) 577 self.lock.acquire() 578 for s in x: 579 if s: 580 try: 581 i = s.rindex('\n') 582 self.tmp.append( s[:i+1] ) 583 self.tinl.append( len(self.tmp) ) 584 if i+1 < len(s): 585 self.tmp.append( s[i+1:] ) 586 except ValueError: 587 self.tmp.append(s) 588 if self.c is not None: 589 self._dump() 590 self.lock.release()
591 592
593 - def _dump(self):
594 log('_dump:') 595 j = 0 596 for i in self.tinl: 597 try: 598 jis = ''.join(self.tmp[j:i]) 599 self.c.send(self.name, None, (jis,), proxy=NEVER) 600 j = i 601 except IOError: 602 pass 603 self.tinl = []
604 605
606 - def flush(self):
607 log('_flush:') 608 if self.c is not None and self.tmp: 609 try: 610 self.c.send(self.name, None, tuple(self.tmp), proxy=NEVER) 611 except IOError: 612 pass 613 self.tmp = [] 614 self.tinl = []
615 616
617 - def close(self):
618 self.flush()
619 620 621 622 623
624 -class io_labeler(object):
625 - def __init__(self, name):
626 self.name = name 627 self.rstderr = sys.stderr
628
629 - def writelines(self, x):
630 self.rstderr.writelines( ('E', self.name, ':',) ) 631 self.rstderr.writelines(x) 632 self.rstderr.flush()
633
634 - def write(self, s):
635 self.rstderr.write('E') 636 self.rstderr.write(self.name) 637 self.rstderr.write(':') 638 self.rstderr.write(s) 639 self.rstderr.flush()
640
641 - def flush(self):
642 pass
643 644
645 -def import_mod(name, path):
646 """Import a module from the specified path, or, failing that, 647 look in sys.path then for a builtin. 648 If path is None, only look in sys.path and builtins. 649 If path is an array containing None, replace the None with sys.path. 650 """ 651 # print '#LM name=', name, 'path=', path 652 # print 'MODULES=', sys.modules.keys() 653 if path is None: 654 pth = None 655 else: 656 pth = [] 657 for d in path: 658 if d is None: 659 pth.extend(sys.path) 660 else: 661 pth.append( d ) 662 fd = None 663 imp.acquire_lock() 664 try: 665 fd, pn, desc = imp.find_module(name, pth) 666 except ImportError: 667 try: 668 fd, pn, desc = imp.find_module(name, None) 669 except: 670 imp.release_lock() 671 raise 672 673 if name in sys.modules: 674 if hasattr(sys.modules[name], '__file__'): 675 if os.path.dirname(sys.modules[name].__file__) == os.path.dirname(pn): 676 imp.release_lock() 677 return sys.modules[name] 678 else: 679 imp.release_lock() 680 return sys.modules[name] 681 682 try: 683 pymod = imp.load_module(name, fd, pn, desc) 684 finally: 685 if fd: 686 fd.close() 687 imp.release_lock() 688 return pymod
689 690 assert import_mod('sys', None).path 691 692
693 -def s_listen(c): # Run in separate thread
694 """This reads answers and puts them in the correct mailboxes.""" 695 prinfo('Starting s_listen') 696 # TWI_thread_name = 'Twi-thread%d' % id(c) # Must be unique! 697 MAXIDLE = 6 698 twibox = GM.mailbox() 699 threadstate = C_threadstate() 700 701 c.send(INITIALIZATION, None, 702 (__version__, 703 (Oid(object), Oid(type)) 704 ) 705 ) 706 c.reason = None 707 while True: 708 prinfo('s_listen LOOP') 709 try: 710 op, boxid, interior = cPickle.load(c.rfd) 711 except cPickle.UnpicklingError: 712 prinfo('Unpickling Error - bad instruction from other side') 713 c.reason = 'ERROR: Unpickling' 714 break 715 except EOFError: 716 prinfo('EOF in s_listen') 717 c.reason = 'EOF' 718 break 719 prinfo('s_listen op=%s, boxid=%s' % (S(op), S(boxid))) 720 721 if op in (ABORT, QUIT_2): 722 c.reason = op 723 break 724 725 n_idle = threadstate.n_idle() 726 if n_idle == 0: 727 prinfo('s_listen: add one more thread') 728 t = T.start_new_thread(thread_within_listen_wrapper, 729 (twibox, threadstate, c) 730 ) 731 GM.Lockstate.setname(me=t, name='twl%d' % t) 732 # t = T.Thread( target=thread_within_listen_wrapper, 733 # args=(twibox, threadstate, c), 734 # name=TWI_thread_name 735 # ) 736 threadstate.to_starting(t) 737 # t.setDaemon(True) # KLUGE 738 # t.start() 739 elif n_idle > MAXIDLE: 740 prinfo('s_listen: shutting down one thread') 741 # twibox.put(None) 742 threadstate.anticipate_stop() 743 twibox.putclose(for_get=GM.EOF(), ngx=1) 744 # prinfo('s_listen: ready to twibox.put: idle=%d' % threadstate.n_idle()) 745 twibox.put( ( op, boxid, interior) ) 746 threadstate.reap_stopped_threads() 747 # prinfo('s_listen<') 748 prinfo('s_listen shutdown') 749 twibox.putclose() 750 del twibox 751 prinfo('waiting for twi_threads') 752 threadstate.wait_until_all_done() 753 prinfo('twi_threads joined') 754 c.close_wfd() 755 c.lowrp.clear() 756 c.rowlp.disconnect() 757 c.class_of_inst.disconnect() 758 c.lo_clo.disconnect() 759 prinfo('S_listen DONE - putting reason=%s' % c.reason) 760 761 762
763 -def s_listen_wrapper(c, s_listen_done): # Run in separate thread
764 try: 765 s_listen(c) 766 s_listen_done.put('OK') 767 except: 768 log('Exception in s_listen', traceback.format_exc()) 769 s_listen_done.put('exception') 770 sys.exit(2) 771 772
773 -class dropsite(object):
774 - def __init__(self):
775 self.lock = T.Lock(verbose=DEBUG>1, name='dropsite') 776 self.boxes = {}
777
778 - def make(self, dbg=''):
779 self.lock.acquire() 780 wbox = GM.waitbox() 781 boxid = '%s#%d' % (dbg, Id(wbox)) 782 self.boxes[boxid] = wbox 783 self.lock.release() 784 return (wbox, boxid)
785 786
787 - def find(self, boxid, dbg=None):
788 """For internal use only.""" 789 assert dbg is not None 790 # prinfo('dropsite: pre get %s %s' % (boxid, dbg)) 791 self.lock.acquire() 792 tmp = self.boxes.pop(boxid) 793 self.lock.release() 794 # prinfo('dropsite: gotten %s %s' % (boxid, dbg)) 795 return tmp
796
797 - def count(self):
798 self.lock.acquire() 799 tmp = len(self.boxes) 800 self.lock.release() 801 return tmp
802 803 804 805
806 -class fancyprint(object):
807 - def __init__(self, fd, prefix='', lock=None):
808 if lock is None: 809 self.lock = T.Lock(verbose=DEBUG>1, name='fancyprint') 810 else: 811 self.lock = lock 812 self.fd = fd 813 self.prefix = prefix
814
815 - def fprint(self, s):
816 assert isinstance(s, tuple) 817 self.lock.acquire() 818 if self.fd is not None: 819 for ls in s: 820 self.fd.writelines( (self.prefix, ls) ) 821 self.fd.flush() 822 self.lock.release()
823 824
825 -def get_raise(mbox, conn):
826 # prinfo('get_raise waiting for get') 827 op, rv = mbox.get() 828 if op == EXCEPTION: 829 ex_name, ex_mod, ex_clid, ex_args, trb = rv 830 log('get_raise: trb: %s.%s' % (ex_mod, ex_name), *tuple(trb)) 831 tmp = construct_enhanced_exception(conn, ex_name, ex_mod, 832 ex_clid, ex_args, trb) 833 log('get_raise: c_e_e done', str(tmp), tmp.args, tmp.remote_traceback) 834 raise tmp 835 elif op == VALUE: 836 pass 837 else: 838 log('get_raise: unknown op: %s' % str(op)) 839 raise RuntimeError, 'bad op in get_raise' 840 return rv
841 842
843 -class pickler(object):
844 - def __init__(self, conn):
845 assert isinstance(conn, Connection) 846 self.conn = conn 847 self.check = hash(self)
848
849 - def dump(self, obj, proxy=NEVER):
850 buf = StringIO.StringIO() 851 p = cPickle.Pickler(buf, cPickle.HIGHEST_PROTOCOL) 852 if proxy == ALWAYS: 853 p.persistent_id = self.oid 854 elif proxy == MAYBE: 855 p.persistent_id = self.persistent_id 856 p.dump(obj) 857 return buf.getvalue()
858 859
860 - def persistent_id(self, obj):
861 assert hash(self) == self.check 862 if isinstance(obj, TRANSPORTABLE_TYPES): 863 rv = None 864 elif Id(obj) in TRANSPORTABLE_TYPES_O: 865 rv = None 866 else: 867 rv = self.oid(obj) 868 prinfo('persistent_id (pid) for [%s]%s = %s' % (S(obj), S(type(obj)), S(rv))) 869 return rv
870 871
872 - def oid(self, obj):
873 tmp = self.conn.lowrp.putid(obj) 874 try: 875 clobj = obj.__class__ 876 except AttributeError: 877 clobj = None 878 if clobj is not None: 879 cloid = self.conn.lo_clo.putid(clobj) 880 else: 881 cloid = '' 882 rv = '%s~%s' % (str(tmp), str(cloid)) 883 prinfo('persistent_id (oid) for [%s]%s = %s' % (S(obj), S(type(obj)), S(rv))) 884 return rv
885 886
887 - def load(self, s):
888 prinfo('unpickle.load') 889 up = cPickle.Unpickler(StringIO.StringIO(s)) 890 up.persistent_load = self.persistent_load 891 tmp = up.load() 892 prinfo('unpickled') 893 return tmp
894 895
896 - def persistent_load(self, oidstr):
897 prinfo('beginning persistent_load on %s' % S(oidstr)) 898 assert hash(self) == self.check 899 oid, cloid = oidstr.split('~') 900 return proxy_factory(diO(oid), diO(cloid), self.conn)
901 902 903 904
905 -class Connection(object):
906 """You need to call close().""" 907 sys = sys 908 try: 909 import gc as _gc 910 except ImportError: 911 _gc = None 912
913 - def __init__(self, wfd, rfd, stderr, stdout):
914 self.stderr = stderr # used in thread_within_listen 915 self.stdout = stdout # used in thread_within_listen 916 self.pickler = pickler(self) 917 918 # local objects with remote proxies: 919 self.lowrp = object_store(dict) 920 # remote objects with local proxies: 921 self.rowlp = object_store(weakref.WeakValueDictionary) 922 self.class_of_inst = object_store(weakref.WeakValueDictionary) 923 self.lo_clo = object_store(weakref.WeakValueDictionary) 924 925 self.qmlock = T.Lock(verbose=DEBUG>1, name='Connection.qmlock') 926 self.remote_special_objs = None # will be replaced 927 self.box = dropsite() 928 self.wfd = wfd 929 self.rfd = rfd 930 self.slock = T.Lock(verbose=DEBUG>1, name='Connection.slock') 931 self.closed = False 932 self.reason = None 933 self.s_listen_ready = GM.waitbox() 934 self.s_listen_done = GM.waitbox() 935 # self.s_listen = T.Thread(target=s_listen_wrapper, 936 # args= (self,), 937 # name='s_listen_thread' 938 # ) 939 940 t = T.start_new_thread(s_listen_wrapper, (self, self.s_listen_done)) 941 GM.Lockstate.setname(me=t, name='s_listen_wrapper') 942 943 # self.s_listen.setDaemon(True) # KLUGE 944 # Start the listener: 945 # self.s_listen.start() 946 # ...and wait until the listener is running. 947 self.s_listen_ready.get()
948 949
950 - def send(self, op, boxid, args, proxy=MAYBE):
951 # prinfo('c.send( op=%s id=%s ...)' % (op, boxid)) 952 interior = self.pickler.dump( args, proxy=proxy ) 953 tmp = cPickle.dumps( (op, boxid, interior) ) 954 # prinfo('Pickle done in send') 955 self.slock.acquire() 956 try: 957 self.wfd.write(tmp) 958 self.wfd.flush() 959 finally: 960 self.slock.release()
961 # prinfo('send done (%s %s)' % (op, boxid)) 962
963 - def close_wfd(self):
964 """Called as part of the QUIT sequence.""" 965 self.slock.acquire() 966 try: 967 self.wfd.close() 968 finally: 969 self.slock.release() 970 self.wfd = None
971
972 - def remote_remove_from_store(self, oid):
973 try: 974 self.send(RM_STORE, None, oid) 975 except IOError: 976 # These IO errors happen in the shutdown process 977 # when the file descriptors get closed before 978 # all the proxies are deleted. They are of no 979 # importance, as the cached objects on the other 980 # side will be deleted anyway. 981 prinfo('rm from store - IO error') 982 pass
983
984 - def remote_test(self, i):
985 mbox, boxid = self.box.make('remote_test%d' % i) 986 self.send(R_TEST, boxid, i) 987 return get_raise(mbox, self)
988
989 - def remote_setattr(self, objid, k, v):
990 # prinfo('remote_setattr') 991 mbox, boxid = self.box.make('remote_setattr:%s' % k) 992 try: 993 self.send(SETA, boxid, (objid, str(k), v)) 994 except cPickle.PicklingError: 995 self.send(SETAP, boxid, (objid, str(k), self.lowrp.putid(v))) 996 get_raise(mbox, self)
997
998 - def remote_delattr(self, objid, k):
999 # prinfo('remote_delattr') 1000 mbox, boxid = self.box.make('remote_delattr:%s' % k) 1001 self.send(DELA, boxid, (objid, str(k))) 1002 get_raise(mbox, self)
1003
1004 - def remote_getattr(self, objid, k):
1005 prinfo('remote_getattr: %s . %s' % (S(objid), S(k))) 1006 mbox, boxid = self.box.make('remote_getattr:%s' % S(k)) 1007 self.send(GETA, boxid, (objid, str(k))) 1008 return get_raise(mbox, self)
1009 1010
1011 - def remote_call(self, objid, a, kw):
1012 prinfo('remote_call') 1013 mbox, boxid = self.box.make('remote_call:%s(...)' % objid) 1014 kwtuple = tuple(kw.items()) 1015 prinfo('kwtuple (RC)= %s' % S(kwtuple)) 1016 self.send(RCALL, boxid, (objid, a, kwtuple )) 1017 return get_raise(mbox, self)
1018
1019 - def remote_call_method(self, objid, aname, a, kw):
1020 prinfo('remote_call_method: %s . %s(...)' % (objid, aname)) 1021 mbox, boxid = self.box.make('remote_call_method:%s.%s(...)' % (objid, aname)) 1022 kwtuple = tuple(kw.items()) 1023 prinfo('kwtuple (RCM)= %s' % S(kwtuple)) 1024 self.send(RMCALL, boxid, (objid, str(aname), a, kwtuple )) 1025 prinfo('RCM sent') 1026 return get_raise(mbox, self)
1027
1028 - def remote_import_mod(self, name, path):
1029 """Import a module from the specified path, or, failing that, 1030 look in sys.path then for a builtin. 1031 If path is None, only look in the remote machine's sys.path and builtins. 1032 If path is an array containing None, replace the None with the remote machine's sys.path. 1033 """ 1034 # prinfo('remote_import_mod %s %s' % (name, path)) 1035 mbox, boxid = self.box.make('remote_import_mod:%s' % name) 1036 if path is not None: 1037 path = tuple(path) 1038 self.send(IMPORT_MOD, boxid, (str(name), path)) 1039 # prinfo('Waiting for response (rim) on %s' % boxid) 1040 return get_raise(mbox, self)
1041
1042 - def remote_globals(self):
1043 mbox, boxid = self.box.make('remote_globals') 1044 self.send(GLOBALS, boxid, None) 1045 # prinfo('Waiting for response (rg) on %s' % boxid) 1046 return get_raise(mbox, self)
1047
1048 - def remote_get_classinfo(self, cloid):
1049 """Gets information about obj.__class__.""" 1050 mbox, boxid = self.box.make('remote_get_classinfo:%s' % cloid) 1051 self.send(GET_CLASSINFO, boxid, cloid) 1052 prinfo('Waiting for response (rci) on %s' % S(boxid)) 1053 return get_raise(mbox, self)
1054
1055 - def serve(self):
1056 prinfo('Connection.serve()') 1057 self.s_listen_done.get() 1058 # self.s_listen.join() 1059 prinfo('Connection.serve() joined') 1060 # The other side normally initiates the termination with a 1061 # QUIT_1 or an ABORT. 1062 # We initiate it only on error. 1063 if self.reason is not None and self.reason.startswith('ERROR:'): 1064 self.send(ABORT, None, self.reason) 1065 self.terminate()
1066
1067 - def close(self):
1068 self.closed = True 1069 if self._gc is not None: 1070 self._gc.collect() 1071 time.sleep(0.1) 1072 self.send(QUIT_1, None, None) 1073 prinfo('Connection.close ready for join') 1074 # self.s_listen.join() 1075 self.s_listen_done.get() 1076 prinfo('Connection.close() joined.') 1077 self.terminate()
1078
1079 - def terminate(self):
1080 self.closed = True 1081 for t in T.enumerate(): 1082 prinfo('Thread name=%s' % t.getName()) 1083 del self.stderr 1084 del self.stdout 1085 del self.pickler 1086 del self.lowrp 1087 del self.rowlp 1088 del self.remote_special_objs 1089 del self.box 1090 del self.wfd 1091 del self.rfd 1092 if self._gc is not None: 1093 self._gc.collect()
1094 1095 1096
1097 - def __del__(self):
1098 if not self.closed: 1099 self.close()
1100 1101 1102 1103 1104 1105 1106
1107 -def transform_callable(onm):
1108 def tmp(self, *a, **kw): 1109 # prinfo('Transformed %s function' % onm) 1110 oid = object.__getattribute__(self, 'oid') 1111 conn = object.__getattribute__(self, 'conn') 1112 assert isinstance(conn, Connection) 1113 return conn.remote_call_method(oid, onm, a, kw)
1114 return tmp 1115 1116
1117 -def create_property(onm):
1118 def fget(self): 1119 oid = object.__getattribute__(self, 'oid') 1120 conn = object.__getattribute__(self, 'conn') 1121 return conn.remote_getattr(oid, onm)
1122 def fset(self, v): 1123 oid = object.__getattribute__(self, 'oid') 1124 conn = object.__getattribute__(self, 'conn') 1125 return conn.remote_setattr(oid, onm, v) 1126 def fdel(self): 1127 oid = object.__getattribute__(self, 'oid') 1128 conn = object.__getattribute__(self, 'conn') 1129 return conn.remote_delattr(oid, onm) 1130 return property(fget, fset, fdel) 1131 1132 1133
1134 -class proxy_to_a_class(proxy_object):
1135 - def __init__(self, oid, conn):
1136 object.__setattr__(self, 'oid', oid) 1137 object.__setattr__(self, 'conn', conn)
1138
1139 - def __setattr__(self, k, v):
1140 oid = object.__getattribute__(self, 'oid') 1141 conn = object.__getattribute__(self, 'conn') 1142 conn.remote_setattr(oid, k, v)
1143
1144 - def __delattr__(self, k):
1145 oid = object.__getattribute__(self, 'oid') 1146 conn = object.__getattribute__(self, 'conn') 1147 conn.remote_delattr(oid, k)
1148
1149 - def __getattribute__(self, k):
1150 if k == '__class__': 1151 return object.__getattribute__(self, k) 1152 oid = object.__getattribute__(self, 'oid') 1153 conn = object.__getattribute__(self, 'conn') 1154 return conn.remote_getattr(oid, k)
1155 1156
1157 - def __call__(self, *a, **kw):
1158 oid = object.__getattribute__(self, 'oid') 1159 conn = object.__getattribute__(self, 'conn') 1160 return conn.remote_call(oid, a, kw)
1161 1162
1163 -def class_under_proxied_instance(ocid, conn):
1164 assert isinstance(conn, Connection) 1165 NO_REM_GET = ('__class__',) 1166 nm, bsidlist, ocd_items = conn.remote_get_classinfo(ocid) 1167 b = [ cached_class_u_p_i(bsid, conn) for bsid in bsidlist ] 1168 prinfo('Proxyclass: bases done: %s' % S(b)) 1169 d = {} 1170 for (onm, is_callable) in ocd_items: 1171 prinfo('d_item %s = %s' % (S(onm), S(is_callable))) 1172 if is_callable: 1173 d[onm] = transform_callable(onm) 1174 else: 1175 d[onm] = create_property(onm) 1176 prinfo('Proxyclass: transforms/properties done') 1177 1178 def __init__(self, oid, conn): 1179 # proxyclass.__init__. Oid here is only the id of the object itself, 1180 # not the (object-id,class-id) tuple produced by Oid(). 1181 prinfo('proxyclass __init__ %s' % S(oid)) 1182 assert isinstance(conn, Connection) 1183 object.__setattr__(self, 'oid', oid) 1184 object.__setattr__(self, 'conn', conn)
1185 1186 def __setattr__(self, k, v): 1187 oid = object.__getattribute__(self, 'oid') 1188 conn = object.__getattribute__(self, 'conn') 1189 conn.remote_setattr(oid, k, v) 1190 1191 def __delattr__(self, k): 1192 oid = object.__getattribute__(self, 'oid') 1193 conn = object.__getattribute__(self, 'conn') 1194 conn.remote_delattr(oid, k) 1195 1196 def __getattribute__(self, k): 1197 if k in NO_REM_GET: 1198 return object.__getattribute__(self, k) 1199 oid = object.__getattribute__(self, 'oid') 1200 conn = object.__getattribute__(self, 'conn') 1201 return conn.remote_getattr(oid, k) 1202 1203 def __del__(self): 1204 oid = object.__getattribute__(self, 'oid') 1205 conn = object.__getattribute__(self, 'conn') 1206 if conn is not None: 1207 # conn is None after disconnection. 1208 # It is of no consequence, because the cached object 1209 # is being deleted on the other side anyway. 1210 conn.remote_remove_from_store( oid ) 1211 1212 d['__init__'] = __init__ 1213 d['__setattr__'] = __setattr__ 1214 d['__delattr__'] = __delattr__ 1215 d['__getattribute__'] = __getattribute__ 1216 d['__del__'] = __del__ 1217 return type(nm, tuple(b), d) 1218 1219 1220
1221 -def enh_exc_class_maker(ex_nm, ex_mod):
1222 nm = ex_nm + '_proxy_enhanced' 1223 log('Making class: %s.%s' % (ex_mod, nm)) 1224 basemod = import_mod(ex_mod, sys.path) 1225 b = ( getattr(basemod, ex_nm), ) 1226 1227 def __init__(self, args, trb): 1228 log('initializing base %s' % b[0].__name__, *args) 1229 b[0].__init__(self, *args) 1230 self.remote_traceback = trb 1231 log('initialized base %s' % b[0].__name__)
1232 1233 d = { '__init__': __init__ } 1234 if getattr(b[0], '__class__', None) is type(b[0]): # New style class 1235 tmp = type(nm, b, d) 1236 else: 1237 tmp = (types.ClassType)(nm, b, d) 1238 log('Constructed enh_exc_class', type(tmp)) 1239 return tmp 1240 1241
1242 -def construct_enhanced_exception(conn, ex_nm, ex_mod, ex_clid, ex_args, trb):
1243 assert isinstance(conn, Connection) 1244 try: 1245 enh_cl = conn.class_of_inst.get(ex_clid) 1246 except KeyError: 1247 enh_cl = enh_exc_class_maker(ex_nm, ex_mod) 1248 conn.class_of_inst.put(ex_clid, enh_cl) 1249 return enh_cl(ex_args, trb)
1250 1251
1252 -def test_ee():
1253 cl = enh_exc_class_maker('KeyError', 'exceptions') 1254 q = cl(('test',), ['r',]) 1255 try: 1256 raise q 1257 except KeyError, x: 1258 assert x.args[0]=='test' and x.remote_traceback[0]=='r' 1259 else: 1260 raise AssertionError, "Failure of enh_exc_class_maker"
1261 1262 1263 test_ee() 1264 1265
1266 -def cached_class_u_p_i(ocid, conn):
1267 assert isinstance(conn, Connection) 1268 try: 1269 tmp = conn.remote_special_objs[ocid] 1270 prinfo('From Cache: %s' % S(tmp)) 1271 return tmp 1272 except KeyError: 1273 pass 1274 try: 1275 return conn.class_of_inst.get(ocid) 1276 except KeyError: 1277 tmp = class_under_proxied_instance(ocid, conn) 1278 conn.class_of_inst.put(ocid, tmp) 1279 return tmp
1280 1281
1282 -def proxy_factory(oid, cloid, conn):
1283 assert isinstance(conn, Connection) 1284 prinfo('Proxy factory %s' % S(oid)) 1285 try: 1286 tmp = conn.rowlp.get(oid) 1287 prinfo('Proxy object found in rowlp cache: %s' % S(tmp)) 1288 return tmp 1289 except KeyError: 1290 pass 1291 if conn.remote_special_objs.get(cloid, None) is type: 1292 # We construct a class object, not an instance. 1293 # Don't use cached_proxyclass, because that may find an 1294 # proxy for this class that was created to underly an 1295 # proxy for instance of the class, 1296 # rather than a direct proxy for the class. 1297 # The two things are not the same! 1298 return proxy_to_a_class(oid, conn) 1299 1300 prinfo('proxy_factory step 2') 1301 prinfo('Proxy Factory cloid=%s' % S(cloid)) 1302 assert cloid is not None 1303 tmp = cached_class_u_p_i(cloid, conn) 1304 prinfo('proxyclass made: %s %s' % (S(tmp), S(tmp.__class__.__name__))) 1305 prinfo('dir(tmp)= %s' % S(dir(tmp))) 1306 prinfo('tmp.__class__= %s' % S(tmp.__class__)) 1307 prinfo('tmp.__dict__= %s' % 1308 ' '.join('%s=%s' % (S(k),S(v)) for (k,v) in tmp.__dict__.items()) 1309 ) 1310 prinfo('tmp.__class__ is callable? %s' % S(callable(tmp.__class__))) 1311 ins = tmp(oid, conn) 1312 prinfo('proxy instance made: %s' % S(tmp)) 1313 conn.rowlp.put(oid, ins) 1314 return ins
1315 1316 1317 1318
1319 -class remote_exec(object):
1320 try: 1321 import gc as _gc 1322 except ImportError: 1323 _gc = None 1324
1325 - def __init__(self, path, arglist, cname=None, printlock=None):
1326 if cname is not None: 1327 stderr = fancyprint(sys.stderr, 'E:%s:' % cname, lock=printlock) 1328 stdout = fancyprint(sys.stdout, 'O:%s:' % cname, lock=printlock) 1329 else: 1330 stderr = fancyprint(sys.stderr, lock=printlock) 1331 stdout = fancyprint(sys.stdout, lock=printlock) 1332 self.wfd, self.rfd = g_pipe.popen2(path, arglist) 1333 self.c = Connection(self.wfd, self.rfd, 1334 stderr=stderr, stdout=stdout)
1335
1336 - def import_mod(self, name, path=None):
1337 """Import a module from the specified path, or, failing that, 1338 look in sys.path then for a builtin. 1339 If path is None, only look in the remote machine's sys.path and builtins. 1340 If path is an array containing None, replace the None with the remote machine's sys.path. 1341 The default is None. 1342 """ 1343 return self.c.remote_import_mod(name, path)
1344
1345 - def test(self, i):
1346 return self.c.remote_test(i)
1347
1348 - def globals(self):
1349 return self.c.remote_globals()
1350
1351 - def close(self):
1352 if self.c is None: 1353 return 1354 prinfo( "Closing connection for remote exec" ) 1355 self.c.close() 1356 prinfo( "Closing remote exec - c closed" ) 1357 self.wfd.close() 1358 prinfo( "Closing remote exec - wfd closed" ) 1359 self.rfd.close() 1360 prinfo( "Closing remote exec - rfd closed" ) 1361 self.c = None 1362 if self._gc is not None: 1363 self._gc.collect()
1364 1365
1366 - def __del__(self):
1367 if self.c is not None: 1368 self.close()
1369 1370
1371 -def test_minimal():
1372 prinfo( '%TEST_MINIMAL %%%%%%%%%%%%%%%%%START') 1373 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1374 prinfo( '%%%%%%%%%%%%%%%%%%MID') 1375 re.close() 1376 prinfo( '%%%%%%%%%%%%%%%%%%END')
1377 1378
1379 -def test_simpleclass():
1380 prinfo( '%TEST_SIMPLECLASS ++++++++++++++++++++++++++' ) 1381 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1382 for (itest, t) in enumerate(TEST): 1383 if isinstance(t, test1): 1384 break 1385 pt1 = re.test(itest) 1386 prinfo( 'pt1.dict= %s' % repr( pt1.__dict__ )) 1387 prinfo( 'pt1.cl= %s' % repr( pt1.__class__)) 1388 prinfo( 'pt1.cl.d = %s' % repr( pt1.__class__.__dict__)) 1389 prinfo( 'pt1.ga = %s' % repr( pt1.__class__.__dict__['__getattribute__'])) 1390 prinfo( 'pt1.whatami=%s' % pt1.whatami) 1391 prinfo( '+++++++++++++++---------------') 1392 prinfo( 'pt1.w()= %s' % pt1.w() ) 1393 prinfo( '+++++++++++++++---------------********') 1394 prinfo( 'pt1.x= %s' % pt1.x ) 1395 assert pt1.x == TEST[itest].x 1396 prinfo('-----------2') 1397 assert TEST[itest] == pt1 1398 assert pt1 == TEST[itest] 1399 assert pt1.whatami == TEST[itest].whatami 1400 assert pt1.w() == TEST[itest].w() 1401 assert isinstance(pt1, proxy_object) 1402 assert pt1.__class__.__name__ == TEST[itest].__class__.__name__ 1403 re.close()
1404 1405
1406 -def test_minimalclass():
1407 prinfo( '%TEST_MINIMALCLASS +++++++++++++++++++++++++++' ) 1408 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1409 for (i, t) in enumerate(TEST): 1410 if isinstance(t, test0): 1411 pt1 = re.test(i) 1412 assert isinstance(pt1, proxy_object) 1413 prinfo( 'names: %s vs. %s' % (pt1.__class__.__name__, t.__class__.__name__)) 1414 assert pt1.__class__.__name__ == t.__class__.__name__ 1415 if isinstance(t, test0classic): 1416 pt1 = re.test(i) 1417 assert pt1.__dict__ == t.__dict__ 1418 assert pt1.__class__.__name__ == t.__class__.__name__ 1419 re.close()
1420 1421
1422 -def test_EQ():
1423 prinfo( '%TEST_EQ QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ') 1424 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1425 for (i, t) in enumerate(TEST): 1426 prinfo( 'TEST EQ %d' % i ) 1427 if isinstance(t, test0): 1428 break 1429 tmp = re.test(i) 1430 assert tmp == t 1431 prinfo('Passed test %d' % i) 1432 re.close()
1433
1434 -def test_sys():
1435 prinfo( '%TEST_SYS QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ') 1436 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1437 tmp = re.import_mod('sys') 1438 assert isinstance(sys.path, types.ListType) 1439 assert len(tmp.path) > 0 1440 assert len(sys.path) > 0 1441 ltp = len(tmp.path) 1442 tmp.path.append('.') 1443 tmp.path.insert(0, '.') 1444 assert len(tmp.path) == ltp+2 1445 assert tmp.path[0] == tmp.path[-1] 1446 assert 'sys' in tmp.modules 1447 # assert 'sys' in sys.modules 1448 # assert 'threading' in tmp.modules 1449 # assert 'threading' in sys.modules 1450 print tmp 1451 print 'R:', tmp.argv, sys.argv 1452 assert tmp.argv[-1].split('/')[-1] == 'g_proxy.py', 'tmp.argv= %s' % str(tmp.argv) 1453 assert tmp.argv != sys.argv 1454 re.close()
1455
1456 -def test_builtins():
1457 prinfo( '%TEST_BUILTINS QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ') 1458 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1459 tmp = re.import_mod('__builtin__') 1460 print tmp, tmp.__name__ 1461 q1 = getattr(tmp, 'abs') 1462 q = tmp.abs 1463 assert q is q1 1464 print q 1465 assert q(3) == 3 1466 assert q(-3) == 3 1467 re.close()
1468 1469
1470 -def test_exit():
1471 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1472 raise SystemExit() 1473 re.close()
1474 1475
1476 -def test_globals():
1477 prinfo('%TEST_GLOBALS*******************************') 1478 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1479 tmp = re.globals() 1480 tmp['x'] = 3 1481 assert tmp['x'] == 3 1482 foo = test0() 1483 tmp['x'] = foo 1484 prinfo( 'Type(foo)= %s; type(tmp[x])=%s' % (str(type(foo)), str(type(tmp['x'])))) 1485 # assert type(foo) == type(tmp['x']) 1486 re.close()
1487 1488
1489 -def test_file():
1490 prinfo('%TEST_FILE $$$$$$$$$$$$$$$$$$') 1491 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1492 tmp = re.import_mod('__builtin__') 1493 # print 'tmp=', tmp 1494 # print 'tmp.__class__=', tmp.__class__ 1495 # print 'tmp.__class__.__dict__=', tmp.__class__.__dict__ 1496 # print 'tcdk=', tmp.__class__.__dict__.keys() 1497 # print 'dir(tmp)=', dir(tmp) 1498 rfd = tmp.open('/dev/null', 'r') 1499 assert rfd.readline() == '' 1500 opn = tmp.open 1501 rfd = opn('/dev/null', 'r') 1502 assert rfd.readline() == '' 1503 wfd = opn('/tmp/fooproxytest', 'w') 1504 wfd.write('xyzzy\n') 1505 wfd.close() 1506 rfd = opn('/tmp/fooproxytest', 'r') 1507 assert rfd.readline() == 'xyzzy\n' 1508 re.close()
1509 1510
1511 -def test_numpy():
1512 prinfo('%TEST_numpy !+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!') 1513 import numpy 1514 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1515 q = numpy.array((6,7,8)) 1516 tmp = re.import_mod('numpy') 1517 print tmp.sum(q) 1518 print tmp.array((1,2,3))[1] 1519 re.close()
1520
1521 -def test_exception():
1522 prinfo('%TEST_EXCEPTION !!!!!!!!!!!!!!!!!!!') 1523 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] ) 1524 tmp = re.import_mod('sys') 1525 try: 1526 sys.foobar 1527 except AttributeError, x: 1528 prinfo('OK: x=%s' % str(x)) 1529 else: 1530 raise AssertionError, 'Wrong exception!' 1531 del tmp 1532 prinfo('-o-o-o-o-o-o-o-o-o') 1533 rmath = re.import_mod('math') 1534 try: 1535 rmath.sqrt(-1) 1536 except ValueError, x: 1537 assert 'domain' in str(x) 1538 else: 1539 raise AssertionError, 'Bad Exception!' 1540 re.close() 1541 prinfo('-c-c-c-c-c-c-c-c-c')
1542 1543
1544 -def server(name):
1545 sys.stdout.flush() 1546 sys.stderr.flush() 1547 1548 tmp = sys.stdout 1549 if DEBUG: 1550 # sys.stdout = sys.stderr 1551 sys.stdout = open('foo.out', 'w') 1552 sys.stderr = open('foo.err', 'w') 1553 # sys.stderr = io_labeler('stderr') 1554 else: 1555 sys.stdout = io_grabber(STDOUT) 1556 # sys.stderr = open('foo.err', 'w') 1557 sys.stderr = io_grabber(STDERR) 1558 assert sys.stdout is not tmp 1559 c = Connection(sys.stdout, sys.stdin, stdout=None, stderr=None) 1560 if isinstance(sys.stdout, io_grabber): 1561 sys.stdout.connect(c) 1562 if isinstance(sys.stderr, io_grabber): 1563 sys.stderr.connect(c) 1564 1565 prinfo('Connection started [main thread]') 1566 c.serve() 1567 prinfo('Connection served [main thread]') 1568 sys.stdout.close()
1569 1570 1571 if __name__ == '__main__': 1572 arglist = sys.argv[1:] 1573 while arglist and arglist[0].startswith('-'): 1574 arg = arglist.pop(0) 1575 if arg == '--': 1576 break 1577 elif arg == '-test': 1578 _logfd = open('g_proxy.client', 'w') 1579 # test_numpy() 1580 # sys.exit(0) 1581 # test_exception() 1582 test_minimal() 1583 test_minimalclass() 1584 sys.setcheckinterval(1) 1585 test_simpleclass() 1586 sys.setcheckinterval(3) 1587 test_simpleclass() 1588 sys.setcheckinterval(9) 1589 test_simpleclass() 1590 sys.setcheckinterval(27) 1591 test_simpleclass() 1592 sys.setcheckinterval(81) 1593 test_simpleclass() 1594 sys.setcheckinterval(300) 1595 test_simpleclass() 1596 sys.setcheckinterval(1000) 1597 test_simpleclass() 1598 test_EQ() 1599 test_builtins() 1600 test_sys() 1601 # test_exit() 1602 test_globals() 1603 test_file() 1604 test_exception() 1605 sys.exit(0) 1606 else: 1607 print 'Unrecognized flag: %s' % arg 1608 sys.exit(1) 1609 if arglist: 1610 name = arglist[0] 1611 _logfd = open('g_proxy.client', 'w') 1612 else: 1613 name = 'server' 1614 _logfd = open('g_proxy.server', 'w') 1615 server(name) 1616 else: 1617 _logfd = open('g_proxy.client', 'w') 1618