1
2
3 """This will behave badly under isinstance().
4 if isinstance(x,y) probably not isinstance(remote_x, y).
5 Also, dicts are passed by value instead of by reference,
6 so if you remotely modify a dict, you will not see the modification
7 locally.
8 """
9
10
11
12 import weakref
13 import types
14 import sys
15 import cPickle
16 import g_threading as T
17 import imp
18 import StringIO
19 import time
20 import traceback
21 import g_pipe
22 import g_mailbox as GM
23 import os
24
25 __version__ = '0.23'
26 DEBUG = 2
27
28 LOGFILE = './g_proxy.client'
29
30
31 Id = id
32
33 TRANSPORTABLE_TYPES = (
34
35 int, float, complex, tuple, str, long,
36 types.NoneType, types.BooleanType,
37
38 types.ClassType, types.InstanceType, frozenset,
39 set,
40
41 dict
42 )
43 TRANSPORTABLE_TYPES_O = frozenset( [ Id(q) for q in TRANSPORTABLE_TYPES] + [Id(object)] )
44
45 NEVER, ALWAYS, MAYBE = 0, 1, 2
46 _Names = ['INITIALIZATION', 'GET_CLASSINFO', 'R_TEST', 'RM_STORE',
47 'SETA', 'SETAP',
48 'DELA', 'GETA', 'RCALL', 'RMCALL', 'IMPORT_MOD', 'GLOBALS',
49 'STDOUT', 'STDERR', 'VALUE', 'EXCEPTION',
50 'QUIT_1', 'QUIT_2', 'ABORT'
51 ]
52
53 NO_CI_ATTR = frozenset(['__dict__', '__class__', '__weakref__', '__init__',
54 '__getattribute__', '__setattr__', '__delattr__',
55 '__del__',
56 '__new__'
57 ]
58 )
59
60
64
65 _loglock = T.Lock()
66 _logfd = None
80
81
84
87
89 whatami = 'I am the class'
90
92 self.x = x
93 self.whatami = 'I am the instance'
94
96 return self.x == other.x and hasattr(self, '__eq__') and hasattr(other, '__eq__')
97
101
102 assert test1(3) == test1(3)
103
105 whatami = 'I am the class'
106
108 self.x = x
109 self.whatami = 'I am the instance'
110
112 return self.x == other.x and hasattr(self, '__eq__') and hasattr(other, '__eq__')
113
117 assert test1classic(3) == test1classic(3)
118
119
120 TEST = tuple( [
121 (None, False, 1, 2.0, frozenset([33,11]), (6,), 'seven', {8:9}, {9:{}}),
122 complex(3.0, 4.0),
123 ( set([10]), ),
124 test1(3),
125 test1classic(3),
126 "end of equality test",
127
128
129 test0(),
130 test0classic(),
131 sys,
132 (Exception(), Exception(5))
133 ])
134
135
138
141
142
143 if DEBUG:
145 return '%s%d' % (str(type(o)), Id(o))
147 if s == '':
148 return None
149 return s
150
151
152
156 if isinstance(s, proxy_object):
157 return '<REMOTE %s>' % id(s)
158 return str(s)
159
160 for tmp in _Names:
161 globals()[tmp] = tmp
162 del tmp
163 else:
165 return '%d%d' % (Id(type(o)), Id(o))
167 if s == '':
168 return None
169 return s
174 for (i, tmp) in enumerate(_Names):
175 globals()[tmp] = i + 100
176 del tmp
177 del i
178 del _Names
179
180
183 self.store = storetype()
184 self.lock = T.Lock(verbose=DEBUG>1, name='object_store')
185
186
def put(self, xid, x):
    """Store ``x`` under the key ``xid`` and return ``xid``.

    The store is updated while ``self.lock`` is held.  The lock is released
    in a ``finally`` clause so that a failing insert (for example a
    weak-value store handed an object that cannot be weakly referenced)
    propagates its exception without leaving the lock held, which would
    block every later user of this store.
    """
    self.lock.acquire()
    try:
        self.store[xid] = x
    finally:
        self.lock.release()
    return xid
195
197 return self.put(Oid(x), x)
198
199
205
def get(self, xid):
    """Return the object stored under ``xid``.

    The lookup runs with ``self.lock`` held.  The lock is always released,
    including when ``xid`` is absent and the store's ``KeyError`` propagates
    to the caller.
    """
    self.lock.acquire()
    try:
        return self.store[xid]
    finally:
        self.lock.release()
213
214
216 """This is only called for the rowlp case,
217 and the object store is a WeakDictionary, so
218 if we encounter an object, we know that something
219 else references it. Consequently, we cannot
220 depend on it being deleted and calling its __del__ item
221 any time soon.
222 """
223 while self.store:
224 self.lock.acquire()
225 oid, v = self.store.popitem()
226 try:
227 object.__getattribute__(v, 'conn')
228 except AttributeError:
229 pass
230 else:
231 object.__setattr__(v, 'conn', None)
232 self.lock.release()
233
234
239
240
241
243 ty, ob, trb = sys.exc_info()
244 if ty is None:
245 log('Whoops: deal_with_arbitrary_exception', (ty, ob))
246 tmp = tuple(traceback.format_tb(trb))
247 del trb
248 log('INFO: deal_with... %s.%s' % (ty.__module__, ty.__name__), *(ob.args))
249 log('INFO: deal_with... traceback:', *tuple(tmp))
250 c.send(EXCEPTION, boxid, (ty.__name__, ty.__module__,
251 Oid(ty), ob.args, tmp
252 ),
253 proxy=NEVER
254 )
255 del ob
256
257
258
260 IDLE = 0
261 BUSY = 1
262 STARTING = 2
263
265 self.lock = T.Condition(verbose=DEBUG>1, name='threadstate')
266 self.state = {}
267
268 self.expected_stops = 0
269
271 self.lock.acquire()
272 self.state[me] = self.IDLE
273 prinfo('to_idle: %d %s' % (id(me), me))
274 self._print()
275 self.lock.notify()
276 self.lock.release()
277
279 self.lock.acquire()
280 prinfo('to_busy: %d %s' % (id(me), me))
281 self.state[me] = self.BUSY
282 self._print()
283 self.lock.notify()
284 self.lock.release()
285
287 self.lock.acquire()
288 prinfo('to_stopped: %d %s' % (id(me), me))
289
290 ime = me
291
292
293
294
295
296 if self.expected_stops > 0:
297 self.expected_stops -= 1
298 del self.state[ime]
299 self._print()
300 self.lock.notify()
301 self.lock.release()
302
304 self.lock.acquire()
305 prinfo('to_starting: %d %s' % (id(me), me))
306 self.state[me] = self.STARTING
307 self._print()
308 self.lock.notify()
309 self.lock.release()
310
312 """Call within lock."""
313 prinfo('threadstate:')
314 m = {self.IDLE: 'idle', self.BUSY: 'busy', self.STARTING: 'starting'}
315 for (k, v) in self.state.items():
316 prinfo('\t%d : %s' % (k, m[v]))
317
319 prinfo('anticpate_stop')
320 self.lock.acquire()
321 self.expected_stops += 1
322 self.lock.release()
323
325 self.lock.acquire()
326 n = 0
327 for s in self.state.values():
328 if s == self.BUSY:
329 n += 1
330 self.lock.release()
331 return n
332
334 self.lock.acquire()
335 n = -self.expected_stops
336 for s in self.state.values():
337 if s == self.IDLE:
338 n += 1
339 self.lock.release()
340 return max(0, n)
341
342
344 self.lock.acquire()
345 tmp = self.state.keys()
346 self.lock.release()
347 return tmp
348
350 self.lock.acquire()
351 tmp = []
352 for (idme,s) in self.state.items():
353 if s == self.BUSY:
354 tmp.append(idme)
355 self.lock.release()
356 return tmp
357
359 """Called from main thread."""
360 self.lock.acquire()
361
362
363
364 while len(self.state) > 0:
365 prinfo('WUID: waiting: %s' % str(self.state))
366 self.lock.wait()
367
368
369
370 self.lock.release()
371
373 """Called from main thread."""
374 pass
375
376
378 me = T.get_ident()
379 prinfo('thread_within_listen starting')
380 while True:
381 threadstate.to_idle(me)
382 try:
383
384 info = inbox.get()
385 threadstate.to_busy(me)
386
387 except GM.EOF:
388
389 threadstate.to_stopped(me)
390 return
391 except:
392 threadstate.to_stopped(me)
393 raise
394
395
396
397
398 op, boxid, interior = info
399 arg = c.pickler.load(interior)
400 prinfo('TWI %s %s %s' % (S(op), S(boxid), S(arg)))
401
402 if op == INITIALIZATION:
403 v, oOidlist = arg
404 assert v == __version__
405 assert c.remote_special_objs is None
406 c.remote_special_objs = dict(zip(oOidlist,
407 [ proxy_object, type ]
408 ))
409 c.s_listen_ready.put(None)
410 elif op == QUIT_1:
411 qc = 0
412 while c._gc is not None and qc < 4:
413 c._gc.collect()
414 time.sleep(0.1)
415
416 if threadstate.n_busy() <= 1:
417 qc += 1
418 else:
419 prinfo('Busy - waiting: %s' % S(threadstate.busylist()))
420 c.send(QUIT_2, None, None)
421 c.reason = 'QUIT'
422
423 elif op == STDOUT:
424 if c.stdout is not None:
425 c.stdout.fprint(arg)
426 elif op == STDERR:
427 if c.stderr is not None:
428 c.stderr.fprint(arg)
429
430 elif op == VALUE:
431
432 mbox = c.box.find(boxid, 'value')
433
434 mbox.put( (op, arg) )
435 elif op == EXCEPTION:
436 mbox = c.box.find(boxid, 'Exception')
437 mbox.put( (op, arg) )
438
439
440
441 elif op == RM_STORE:
442 c.lowrp.rm(arg)
443 elif op == GETA:
444 objid, k = arg
445 try:
446 value = getattr(c.lowrp.get(objid), k)
447 except:
448 deal_with_arbitrary_exception(c, boxid)
449 else:
450 c.send(VALUE, boxid, value)
451
452 elif op == R_TEST:
453 c.send(VALUE, boxid, TEST[arg])
454 elif op == GET_CLASSINFO:
455 clobj = c.lo_clo.get(arg)
456 try:
457 bs = clobj.__bases__
458 except AttributeError:
459 bs = None
460 if bs is not None:
461 bs = tuple( [ c.lo_clo.putid(b) for b in bs ] )
462 try:
463 dct = tuple( clobj.__dict__.items() )
464 except AttributeError:
465 dct = None
466 if dct is not None:
467 dct = tuple( [ (k, callable(v))
468 for (k,v) in getattr(clobj, '__dict__', {}).items()
469 if k not in NO_CI_ATTR
470 ] )
471 tmp = (clobj.__name__, bs, dct)
472 c.send(VALUE, boxid, tmp, proxy=NEVER)
473 elif op in (SETA, SETAP):
474 objid, attr, value = arg
475 if op == SETAP:
476 value = c.lowrp.get(value)
477 try:
478 setattr(c.lowrp.get(objid), attr, value)
479 except:
480 deal_with_arbitrary_exception(c, boxid)
481 else:
482 c.send(VALUE, boxid, None, proxy=NEVER)
483 elif op == DELA:
484 objid, attr = arg
485 try:
486 delattr(c.lowrp.get(objid), attr)
487 except:
488 deal_with_arbitrary_exception(c, boxid)
489 else:
490 c.send(VALUE, boxid, None, proxy=NEVER)
491 elif op in (RCALL, RMCALL):
492 if op == RCALL:
493 objid, args, kwpairs = arg
494 obj = c.lowrp.get(objid)
495 else:
496 objid, aname, args, kwpairs = arg
497 obj = getattr(c.lowrp.get(objid), aname)
498
499 kwargs = dict(kwpairs)
500
501 assert isinstance(args, tuple)
502 assert isinstance(kwargs, dict)
503 assert hasattr(obj, '__call__')
504 try:
505 value = obj(*args, **kwargs)
506 except:
507 log('R[M]CALL %s obj=%s' % (str(op), str(obj)), str(args), str(kwargs))
508 deal_with_arbitrary_exception(c, boxid)
509 else:
510 c.send(VALUE, boxid, value)
511 elif op == IMPORT_MOD:
512 name, path = arg
513 prinfo('import mod in twi: %s %s' % (S(name), S(path)))
514 try:
515 value = import_mod(name, path)
516 except:
517 deal_with_arbitrary_exception(c, boxid)
518 else:
519 prinfo('imported module %s' % S(value))
520 c.send(VALUE, boxid, value, proxy=ALWAYS)
521 elif op == GLOBALS:
522 value = globals()
523 c.send(VALUE, boxid, value, proxy=ALWAYS)
524 else:
525 threadstate.to_stopped(me)
526 raise ProtocolError, "recv op=%s" % op
527
528
529
530
531
532
533
535 try:
536 thread_within_listen(mbox, threadstate, conn)
537 except GM.EOF:
538 pass
539 except:
540 log('Exception in thread_within_listen:', traceback.format_exc())
541 sys.exit(2)
542
543
544
546 """This buffers text, transmitting it in units of lines.
547 That's important at the other end if there are multiple
548 connections, because fancyprint adds a prefix at the
549 beginning of each write, assuming it is the beginning
550 of each line."""
551
553 log('iograbber.__init__')
554 self.name = name
555 self.c = None
556 self.tmp = []
557 self.tinl = []
558 self.lock = T.Lock(verbose=DEBUG>1, name='io_grabber')
559
561 log('iograbber.connect')
562 self.c = c
563 self.lock.acquire()
564 self._dump()
565 self.lock.release()
566
567
571
572
574 log('writelines:', x)
575 if isinstance(x, str):
576 x = (x,)
577 self.lock.acquire()
578 for s in x:
579 if s:
580 try:
581 i = s.rindex('\n')
582 self.tmp.append( s[:i+1] )
583 self.tinl.append( len(self.tmp) )
584 if i+1 < len(s):
585 self.tmp.append( s[i+1:] )
586 except ValueError:
587 self.tmp.append(s)
588 if self.c is not None:
589 self._dump()
590 self.lock.release()
591
592
594 log('_dump:')
595 j = 0
596 for i in self.tinl:
597 try:
598 jis = ''.join(self.tmp[j:i])
599 self.c.send(self.name, None, (jis,), proxy=NEVER)
600 j = i
601 except IOError:
602 pass
603 self.tinl = []
604
605
607 log('_flush:')
608 if self.c is not None and self.tmp:
609 try:
610 self.c.send(self.name, None, tuple(self.tmp), proxy=NEVER)
611 except IOError:
612 pass
613 self.tmp = []
614 self.tinl = []
615
616
619
620
621
622
623
626 self.name = name
627 self.rstderr = sys.stderr
628
633
635 self.rstderr.write('E')
636 self.rstderr.write(self.name)
637 self.rstderr.write(':')
638 self.rstderr.write(s)
639 self.rstderr.flush()
640
643
644
646 """Import a module from the specified path, or, failing that,
647 look in sys.path then for a builtin.
648 If path is None, only look in sys.path and builtins.
649 If path is an array containing None, replace the None with sys.path.
650 """
651
652
653 if path is None:
654 pth = None
655 else:
656 pth = []
657 for d in path:
658 if d is None:
659 pth.extend(sys.path)
660 else:
661 pth.append( d )
662 fd = None
663 imp.acquire_lock()
664 try:
665 fd, pn, desc = imp.find_module(name, pth)
666 except ImportError:
667 try:
668 fd, pn, desc = imp.find_module(name, None)
669 except:
670 imp.release_lock()
671 raise
672
673 if name in sys.modules:
674 if hasattr(sys.modules[name], '__file__'):
675 if os.path.dirname(sys.modules[name].__file__) == os.path.dirname(pn):
676 imp.release_lock()
677 return sys.modules[name]
678 else:
679 imp.release_lock()
680 return sys.modules[name]
681
682 try:
683 pymod = imp.load_module(name, fd, pn, desc)
684 finally:
685 if fd:
686 fd.close()
687 imp.release_lock()
688 return pymod
689
690 assert import_mod('sys', None).path
691
692
694 """This reads answers and puts them in the correct mailboxes."""
695 prinfo('Starting s_listen')
696
697 MAXIDLE = 6
698 twibox = GM.mailbox()
699 threadstate = C_threadstate()
700
701 c.send(INITIALIZATION, None,
702 (__version__,
703 (Oid(object), Oid(type))
704 )
705 )
706 c.reason = None
707 while True:
708 prinfo('s_listen LOOP')
709 try:
710 op, boxid, interior = cPickle.load(c.rfd)
711 except cPickle.UnpicklingError:
712 prinfo('Unpickling Error - bad instruction from other side')
713 c.reason = 'ERROR: Unpickling'
714 break
715 except EOFError:
716 prinfo('EOF in s_listen')
717 c.reason = 'EOF'
718 break
719 prinfo('s_listen op=%s, boxid=%s' % (S(op), S(boxid)))
720
721 if op in (ABORT, QUIT_2):
722 c.reason = op
723 break
724
725 n_idle = threadstate.n_idle()
726 if n_idle == 0:
727 prinfo('s_listen: add one more thread')
728 t = T.start_new_thread(thread_within_listen_wrapper,
729 (twibox, threadstate, c)
730 )
731 GM.Lockstate.setname(me=t, name='twl%d' % t)
732
733
734
735
736 threadstate.to_starting(t)
737
738
739 elif n_idle > MAXIDLE:
740 prinfo('s_listen: shutting down one thread')
741
742 threadstate.anticipate_stop()
743 twibox.putclose(for_get=GM.EOF(), ngx=1)
744
745 twibox.put( ( op, boxid, interior) )
746 threadstate.reap_stopped_threads()
747
748 prinfo('s_listen shutdown')
749 twibox.putclose()
750 del twibox
751 prinfo('waiting for twi_threads')
752 threadstate.wait_until_all_done()
753 prinfo('twi_threads joined')
754 c.close_wfd()
755 c.lowrp.clear()
756 c.rowlp.disconnect()
757 c.class_of_inst.disconnect()
758 c.lo_clo.disconnect()
759 prinfo('S_listen DONE - putting reason=%s' % c.reason)
760
761
762
764 try:
765 s_listen(c)
766 s_listen_done.put('OK')
767 except:
768 log('Exception in s_listen', traceback.format_exc())
769 s_listen_done.put('exception')
770 sys.exit(2)
771
772
775 self.lock = T.Lock(verbose=DEBUG>1, name='dropsite')
776 self.boxes = {}
777
def make(self, dbg=''):
    """Create a new wait-box, register it, and return ``(wbox, boxid)``.

    ``boxid`` is the string ``'<dbg>#<Id(wbox)>'``; ``dbg`` only serves as
    a readable tag inside that id.

    The box and its id are built before the lock is taken, and the insert
    is wrapped in ``try/finally``, so an exception (for instance a ``dbg``
    value that does not fit the ``%s#%d`` format) can no longer leave
    ``self.lock`` held.
    """
    wbox = GM.waitbox()
    boxid = '%s#%d' % (dbg, Id(wbox))
    self.lock.acquire()
    try:
        self.boxes[boxid] = wbox
    finally:
        self.lock.release()
    return (wbox, boxid)
785
786
def find(self, boxid, dbg=None):
    """For internal use only.

    Remove the wait-box registered under ``boxid`` and return it.

    ``dbg`` must be supplied (anything other than None); it is checked by
    an assertion and otherwise unused.

    Raises ``KeyError`` when ``boxid`` is not registered.  The lock is
    released in a ``finally`` clause so that such a lookup failure does not
    leave ``self.lock`` held and stall every later ``make``/``find``.
    """
    assert dbg is not None
    self.lock.acquire()
    try:
        return self.boxes.pop(boxid)
    finally:
        self.lock.release()
796
798 self.lock.acquire()
799 tmp = len(self.boxes)
800 self.lock.release()
801 return tmp
802
803
804
805
def __init__(self, fd, prefix='', lock=None):
    """Remember the output stream, the per-write prefix and the lock.

    fd     -- stream object written to later (may be None).
    prefix -- string stored as ``self.prefix``.
    lock   -- lock to use; when None a private ``T.Lock`` is created.
    """
    if lock is not None:
        # Caller supplied a lock (e.g. one shared between several printers).
        self.lock = lock
    else:
        self.lock = T.Lock(verbose=DEBUG>1, name='fancyprint')
    self.fd = fd
    self.prefix = prefix
814
816 assert isinstance(s, tuple)
817 self.lock.acquire()
818 if self.fd is not None:
819 for ls in s:
820 self.fd.writelines( (self.prefix, ls) )
821 self.fd.flush()
822 self.lock.release()
823
824
826
827 op, rv = mbox.get()
828 if op == EXCEPTION:
829 ex_name, ex_mod, ex_clid, ex_args, trb = rv
830 log('get_raise: trb: %s.%s' % (ex_mod, ex_name), *tuple(trb))
831 tmp = construct_enhanced_exception(conn, ex_name, ex_mod,
832 ex_clid, ex_args, trb)
833 log('get_raise: c_e_e done', str(tmp), tmp.args, tmp.remote_traceback)
834 raise tmp
835 elif op == VALUE:
836 pass
837 else:
838 log('get_raise: unknown op: %s' % str(op))
839 raise RuntimeError, 'bad op in get_raise'
840 return rv
841
842
845 assert isinstance(conn, Connection)
846 self.conn = conn
847 self.check = hash(self)
848
def dump(self, obj, proxy=NEVER):
    """Pickle ``obj`` into a string and return it.

    ``proxy`` selects the hook installed as the pickler's
    ``persistent_id``:
      ALWAYS -- ``self.oid``
      MAYBE  -- ``self.persistent_id``
      any other value (the default NEVER) -- no hook is installed.
    """
    hook = None
    if proxy == ALWAYS:
        hook = self.oid
    elif proxy == MAYBE:
        hook = self.persistent_id

    sink = StringIO.StringIO()
    packer = cPickle.Pickler(sink, cPickle.HIGHEST_PROTOCOL)
    if hook is not None:
        packer.persistent_id = hook
    packer.dump(obj)
    return sink.getvalue()
858
859
861 assert hash(self) == self.check
862 if isinstance(obj, TRANSPORTABLE_TYPES):
863 rv = None
864 elif Id(obj) in TRANSPORTABLE_TYPES_O:
865 rv = None
866 else:
867 rv = self.oid(obj)
868 prinfo('persistent_id (pid) for [%s]%s = %s' % (S(obj), S(type(obj)), S(rv)))
869 return rv
870
871
def oid(self, obj):
    """Register ``obj`` with the connection and return its wire id.

    ``obj`` is put into ``self.conn.lowrp``; if it has a ``__class__``
    attribute, that class is put into ``self.conn.lo_clo``.  The result is
    the string ``'<object id>~<class id>'``, where the class id is empty
    when ``obj`` has no ``__class__``.
    """
    obj_key = self.conn.lowrp.putid(obj)
    klass = getattr(obj, '__class__', None)
    if klass is None:
        class_key = ''
    else:
        class_key = self.conn.lo_clo.putid(klass)
    token = '%s~%s' % (str(obj_key), str(class_key))
    prinfo('persistent_id (oid) for [%s]%s = %s' % (S(obj), S(type(obj)), S(token)))
    return token
885
886
894
895
897 prinfo('beginning persistent_load on %s' % S(oidstr))
898 assert hash(self) == self.check
899 oid, cloid = oidstr.split('~')
900 return proxy_factory(diO(oid), diO(cloid), self.conn)
901
902
903
904
906 """You need to call close()."""
907 sys = sys
908 try:
909 import gc as _gc
910 except ImportError:
911 _gc = None
912
def __init__(self, wfd, rfd, stderr, stdout):
    """Set up the connection state and start the listener thread.

    wfd, rfd       -- file objects stored as ``self.wfd`` / ``self.rfd``.
    stderr, stdout -- objects stored as ``self.stderr`` / ``self.stdout``.

    The call does not return until something has been put into
    ``self.s_listen_ready``.
    """
    self.stderr = stderr
    self.stdout = stdout
    self.pickler = pickler(self)

    # lowrp keeps its entries in a plain dict (strong references) ...
    self.lowrp = object_store(dict)
    # ... while the remaining stores hold their values only weakly.
    weak_values = weakref.WeakValueDictionary
    self.rowlp = object_store(weak_values)
    self.class_of_inst = object_store(weak_values)
    self.lo_clo = object_store(weak_values)

    self.qmlock = T.Lock(verbose=DEBUG>1, name='Connection.qmlock')
    self.remote_special_objs = None
    self.box = dropsite()
    self.wfd = wfd
    self.rfd = rfd
    self.slock = T.Lock(verbose=DEBUG>1, name='Connection.slock')
    self.closed = False
    self.reason = None
    self.s_listen_ready = GM.waitbox()
    self.s_listen_done = GM.waitbox()

    # Every attribute above must exist before the listener starts,
    # because the new thread receives this object as its argument.
    listener = T.start_new_thread(s_listen_wrapper, (self, self.s_listen_done))
    GM.Lockstate.setname(me=listener, name='s_listen_wrapper')

    # Block until the ready box is filled.
    # NOTE(review): presumably signalled once the peer's INITIALIZATION
    # message has been processed -- confirm against the listener code.
    self.s_listen_ready.get()
948
949
def send(self, op, boxid, args, proxy=MAYBE):
    """Serialize one message and write it to ``self.wfd``.

    ``args`` is pickled by ``self.pickler.dump`` with the given ``proxy``
    mode; the triple ``(op, boxid, <pickled args>)`` is then pickled with
    ``cPickle.dumps``.  Both steps happen before ``self.slock`` is taken;
    only the write and flush run under the lock.
    """
    frame = cPickle.dumps((op, boxid, self.pickler.dump(args, proxy=proxy)))

    self.slock.acquire()
    try:
        self.wfd.write(frame)
        self.wfd.flush()
    finally:
        self.slock.release()
961
962
964 """Called as part of the QUIT sequence."""
965 self.slock.acquire()
966 try:
967 self.wfd.close()
968 finally:
969 self.slock.release()
970 self.wfd = None
971
973 try:
974 self.send(RM_STORE, None, oid)
975 except IOError:
976
977
978
979
980
981 prinfo('rm from store - IO error')
982 pass
983
985 mbox, boxid = self.box.make('remote_test%d' % i)
986 self.send(R_TEST, boxid, i)
987 return get_raise(mbox, self)
988
990
991 mbox, boxid = self.box.make('remote_setattr:%s' % k)
992 try:
993 self.send(SETA, boxid, (objid, str(k), v))
994 except cPickle.PicklingError:
995 self.send(SETAP, boxid, (objid, str(k), self.lowrp.putid(v)))
996 get_raise(mbox, self)
997
999
1000 mbox, boxid = self.box.make('remote_delattr:%s' % k)
1001 self.send(DELA, boxid, (objid, str(k)))
1002 get_raise(mbox, self)
1003
1005 prinfo('remote_getattr: %s . %s' % (S(objid), S(k)))
1006 mbox, boxid = self.box.make('remote_getattr:%s' % S(k))
1007 self.send(GETA, boxid, (objid, str(k)))
1008 return get_raise(mbox, self)
1009
1010
1012 prinfo('remote_call')
1013 mbox, boxid = self.box.make('remote_call:%s(...)' % objid)
1014 kwtuple = tuple(kw.items())
1015 prinfo('kwtuple (RC)= %s' % S(kwtuple))
1016 self.send(RCALL, boxid, (objid, a, kwtuple ))
1017 return get_raise(mbox, self)
1018
1020 prinfo('remote_call_method: %s . %s(...)' % (objid, aname))
1021 mbox, boxid = self.box.make('remote_call_method:%s.%s(...)' % (objid, aname))
1022 kwtuple = tuple(kw.items())
1023 prinfo('kwtuple (RCM)= %s' % S(kwtuple))
1024 self.send(RMCALL, boxid, (objid, str(aname), a, kwtuple ))
1025 prinfo('RCM sent')
1026 return get_raise(mbox, self)
1027
1029 """Import a module from the specified path, or, failing that,
1030 look in sys.path then for a builtin.
1031 If path is None, only look in the remote machine's sys.path and builtins.
1032 If path is an array containing None, replace the None with the remote machine's sys.path.
1033 """
1034
1035 mbox, boxid = self.box.make('remote_import_mod:%s' % name)
1036 if path is not None:
1037 path = tuple(path)
1038 self.send(IMPORT_MOD, boxid, (str(name), path))
1039
1040 return get_raise(mbox, self)
1041
1043 mbox, boxid = self.box.make('remote_globals')
1044 self.send(GLOBALS, boxid, None)
1045
1046 return get_raise(mbox, self)
1047
1049 """Gets information about obj.__class__."""
1050 mbox, boxid = self.box.make('remote_get_classinfo:%s' % cloid)
1051 self.send(GET_CLASSINFO, boxid, cloid)
1052 prinfo('Waiting for response (rci) on %s' % S(boxid))
1053 return get_raise(mbox, self)
1054
1056 prinfo('Connection.serve()')
1057 self.s_listen_done.get()
1058
1059 prinfo('Connection.serve() joined')
1060
1061
1062
1063 if self.reason is not None and self.reason.startswith('ERROR:'):
1064 self.send(ABORT, None, self.reason)
1065 self.terminate()
1066
1068 self.closed = True
1069 if self._gc is not None:
1070 self._gc.collect()
1071 time.sleep(0.1)
1072 self.send(QUIT_1, None, None)
1073 prinfo('Connection.close ready for join')
1074
1075 self.s_listen_done.get()
1076 prinfo('Connection.close() joined.')
1077 self.terminate()
1078
1080 self.closed = True
1081 for t in T.enumerate():
1082 prinfo('Thread name=%s' % t.getName())
1083 del self.stderr
1084 del self.stdout
1085 del self.pickler
1086 del self.lowrp
1087 del self.rowlp
1088 del self.remote_special_objs
1089 del self.box
1090 del self.wfd
1091 del self.rfd
1092 if self._gc is not None:
1093 self._gc.collect()
1094
1095
1096
1098 if not self.closed:
1099 self.close()
1100
1101
1102
1103
1104
1105
1106
1114 return tmp
1115
1116
1122 def fset(self, v):
1123 oid = object.__getattribute__(self, 'oid')
1124 conn = object.__getattribute__(self, 'conn')
1125 return conn.remote_setattr(oid, onm, v)
1126 def fdel(self):
1127 oid = object.__getattribute__(self, 'oid')
1128 conn = object.__getattribute__(self, 'conn')
1129 return conn.remote_delattr(oid, onm)
1130 return property(fget, fset, fdel)
1131
1132
1133
1161
1162
1164 assert isinstance(conn, Connection)
1165 NO_REM_GET = ('__class__',)
1166 nm, bsidlist, ocd_items = conn.remote_get_classinfo(ocid)
1167 b = [ cached_class_u_p_i(bsid, conn) for bsid in bsidlist ]
1168 prinfo('Proxyclass: bases done: %s' % S(b))
1169 d = {}
1170 for (onm, is_callable) in ocd_items:
1171 prinfo('d_item %s = %s' % (S(onm), S(is_callable)))
1172 if is_callable:
1173 d[onm] = transform_callable(onm)
1174 else:
1175 d[onm] = create_property(onm)
1176 prinfo('Proxyclass: transforms/properties done')
1177
1178 def __init__(self, oid, conn):
1179
1180
1181 prinfo('proxyclass __init__ %s' % S(oid))
1182 assert isinstance(conn, Connection)
1183 object.__setattr__(self, 'oid', oid)
1184 object.__setattr__(self, 'conn', conn)
1185
1186 def __setattr__(self, k, v):
1187 oid = object.__getattribute__(self, 'oid')
1188 conn = object.__getattribute__(self, 'conn')
1189 conn.remote_setattr(oid, k, v)
1190
1191 def __delattr__(self, k):
1192 oid = object.__getattribute__(self, 'oid')
1193 conn = object.__getattribute__(self, 'conn')
1194 conn.remote_delattr(oid, k)
1195
1196 def __getattribute__(self, k):
1197 if k in NO_REM_GET:
1198 return object.__getattribute__(self, k)
1199 oid = object.__getattribute__(self, 'oid')
1200 conn = object.__getattribute__(self, 'conn')
1201 return conn.remote_getattr(oid, k)
1202
1203 def __del__(self):
1204 oid = object.__getattribute__(self, 'oid')
1205 conn = object.__getattribute__(self, 'conn')
1206 if conn is not None:
1207
1208
1209
1210 conn.remote_remove_from_store( oid )
1211
1212 d['__init__'] = __init__
1213 d['__setattr__'] = __setattr__
1214 d['__delattr__'] = __delattr__
1215 d['__getattribute__'] = __getattribute__
1216 d['__del__'] = __del__
1217 return type(nm, tuple(b), d)
1218
1219
1220
1222 nm = ex_nm + '_proxy_enhanced'
1223 log('Making class: %s.%s' % (ex_mod, nm))
1224 basemod = import_mod(ex_mod, sys.path)
1225 b = ( getattr(basemod, ex_nm), )
1226
1227 def __init__(self, args, trb):
1228 log('initializing base %s' % b[0].__name__, *args)
1229 b[0].__init__(self, *args)
1230 self.remote_traceback = trb
1231 log('initialized base %s' % b[0].__name__)
1232
1233 d = { '__init__': __init__ }
1234 if getattr(b[0], '__class__', None) is type(b[0]):
1235 tmp = type(nm, b, d)
1236 else:
1237 tmp = (types.ClassType)(nm, b, d)
1238 log('Constructed enh_exc_class', type(tmp))
1239 return tmp
1240
1241
1243 assert isinstance(conn, Connection)
1244 try:
1245 enh_cl = conn.class_of_inst.get(ex_clid)
1246 except KeyError:
1247 enh_cl = enh_exc_class_maker(ex_nm, ex_mod)
1248 conn.class_of_inst.put(ex_clid, enh_cl)
1249 return enh_cl(ex_args, trb)
1250
1251
1253 cl = enh_exc_class_maker('KeyError', 'exceptions')
1254 q = cl(('test',), ['r',])
1255 try:
1256 raise q
1257 except KeyError, x:
1258 assert x.args[0]=='test' and x.remote_traceback[0]=='r'
1259 else:
1260 raise AssertionError, "Failure of enh_exc_class_maker"
1261
1262
1263 test_ee()
1264
1265
1267 assert isinstance(conn, Connection)
1268 try:
1269 tmp = conn.remote_special_objs[ocid]
1270 prinfo('From Cache: %s' % S(tmp))
1271 return tmp
1272 except KeyError:
1273 pass
1274 try:
1275 return conn.class_of_inst.get(ocid)
1276 except KeyError:
1277 tmp = class_under_proxied_instance(ocid, conn)
1278 conn.class_of_inst.put(ocid, tmp)
1279 return tmp
1280
1281
1283 assert isinstance(conn, Connection)
1284 prinfo('Proxy factory %s' % S(oid))
1285 try:
1286 tmp = conn.rowlp.get(oid)
1287 prinfo('Proxy object found in rowlp cache: %s' % S(tmp))
1288 return tmp
1289 except KeyError:
1290 pass
1291 if conn.remote_special_objs.get(cloid, None) is type:
1292
1293
1294
1295
1296
1297
1298 return proxy_to_a_class(oid, conn)
1299
1300 prinfo('proxy_factory step 2')
1301 prinfo('Proxy Factory cloid=%s' % S(cloid))
1302 assert cloid is not None
1303 tmp = cached_class_u_p_i(cloid, conn)
1304 prinfo('proxyclass made: %s %s' % (S(tmp), S(tmp.__class__.__name__)))
1305 prinfo('dir(tmp)= %s' % S(dir(tmp)))
1306 prinfo('tmp.__class__= %s' % S(tmp.__class__))
1307 prinfo('tmp.__dict__= %s' %
1308 ' '.join('%s=%s' % (S(k),S(v)) for (k,v) in tmp.__dict__.items())
1309 )
1310 prinfo('tmp.__class__ is callable? %s' % S(callable(tmp.__class__)))
1311 ins = tmp(oid, conn)
1312 prinfo('proxy instance made: %s' % S(tmp))
1313 conn.rowlp.put(oid, ins)
1314 return ins
1315
1316
1317
1318
1320 try:
1321 import gc as _gc
1322 except ImportError:
1323 _gc = None
1324
def __init__(self, path, arglist, cname=None, printlock=None):
    """Start the child via ``g_pipe.popen2`` and open a Connection to it.

    path, arglist -- passed unchanged to ``g_pipe.popen2``.
    cname         -- optional name; when given, forwarded output is
                     prefixed with ``E:<cname>:`` / ``O:<cname>:``.
    printlock     -- lock handed to both ``fancyprint`` wrappers.
    """
    if cname is None:
        # An empty prefix is what fancyprint uses by default.
        err_prefix = out_prefix = ''
    else:
        err_prefix = 'E:%s:' % cname
        out_prefix = 'O:%s:' % cname
    stderr = fancyprint(sys.stderr, err_prefix, lock=printlock)
    stdout = fancyprint(sys.stdout, out_prefix, lock=printlock)
    self.wfd, self.rfd = g_pipe.popen2(path, arglist)
    self.c = Connection(self.wfd, self.rfd, stderr=stderr, stdout=stdout)
1335
1337 """Import a module from the specified path, or, failing that,
1338 look in sys.path then for a builtin.
1339 If path is None, only look in the remote machine's sys.path and builtins.
1340 If path is an array containing None, replace the None with the remote machine's sys.path.
1341 The default is None.
1342 """
1343 return self.c.remote_import_mod(name, path)
1344
1345 - def test(self, i):
1347
1350
1352 if self.c is None:
1353 return
1354 prinfo( "Closing connection for remote exec" )
1355 self.c.close()
1356 prinfo( "Closing remote exec - c closed" )
1357 self.wfd.close()
1358 prinfo( "Closing remote exec - wfd closed" )
1359 self.rfd.close()
1360 prinfo( "Closing remote exec - rfd closed" )
1361 self.c = None
1362 if self._gc is not None:
1363 self._gc.collect()
1364
1365
1367 if self.c is not None:
1368 self.close()
1369
1370
1372 prinfo( '%TEST_MINIMAL %%%%%%%%%%%%%%%%%START')
1373 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1374 prinfo( '%%%%%%%%%%%%%%%%%%MID')
1375 re.close()
1376 prinfo( '%%%%%%%%%%%%%%%%%%END')
1377
1378
1380 prinfo( '%TEST_SIMPLECLASS ++++++++++++++++++++++++++' )
1381 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1382 for (itest, t) in enumerate(TEST):
1383 if isinstance(t, test1):
1384 break
1385 pt1 = re.test(itest)
1386 prinfo( 'pt1.dict= %s' % repr( pt1.__dict__ ))
1387 prinfo( 'pt1.cl= %s' % repr( pt1.__class__))
1388 prinfo( 'pt1.cl.d = %s' % repr( pt1.__class__.__dict__))
1389 prinfo( 'pt1.ga = %s' % repr( pt1.__class__.__dict__['__getattribute__']))
1390 prinfo( 'pt1.whatami=%s' % pt1.whatami)
1391 prinfo( '+++++++++++++++---------------')
1392 prinfo( 'pt1.w()= %s' % pt1.w() )
1393 prinfo( '+++++++++++++++---------------********')
1394 prinfo( 'pt1.x= %s' % pt1.x )
1395 assert pt1.x == TEST[itest].x
1396 prinfo('-----------2')
1397 assert TEST[itest] == pt1
1398 assert pt1 == TEST[itest]
1399 assert pt1.whatami == TEST[itest].whatami
1400 assert pt1.w() == TEST[itest].w()
1401 assert isinstance(pt1, proxy_object)
1402 assert pt1.__class__.__name__ == TEST[itest].__class__.__name__
1403 re.close()
1404
1405
1407 prinfo( '%TEST_MINIMALCLASS +++++++++++++++++++++++++++' )
1408 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1409 for (i, t) in enumerate(TEST):
1410 if isinstance(t, test0):
1411 pt1 = re.test(i)
1412 assert isinstance(pt1, proxy_object)
1413 prinfo( 'names: %s vs. %s' % (pt1.__class__.__name__, t.__class__.__name__))
1414 assert pt1.__class__.__name__ == t.__class__.__name__
1415 if isinstance(t, test0classic):
1416 pt1 = re.test(i)
1417 assert pt1.__dict__ == t.__dict__
1418 assert pt1.__class__.__name__ == t.__class__.__name__
1419 re.close()
1420
1421
1423 prinfo( '%TEST_EQ QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ')
1424 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1425 for (i, t) in enumerate(TEST):
1426 prinfo( 'TEST EQ %d' % i )
1427 if isinstance(t, test0):
1428 break
1429 tmp = re.test(i)
1430 assert tmp == t
1431 prinfo('Passed test %d' % i)
1432 re.close()
1433
1435 prinfo( '%TEST_SYS QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ')
1436 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1437 tmp = re.import_mod('sys')
1438 assert isinstance(sys.path, types.ListType)
1439 assert len(tmp.path) > 0
1440 assert len(sys.path) > 0
1441 ltp = len(tmp.path)
1442 tmp.path.append('.')
1443 tmp.path.insert(0, '.')
1444 assert len(tmp.path) == ltp+2
1445 assert tmp.path[0] == tmp.path[-1]
1446 assert 'sys' in tmp.modules
1447
1448
1449
1450 print tmp
1451 print 'R:', tmp.argv, sys.argv
1452 assert tmp.argv[-1].split('/')[-1] == 'g_proxy.py', 'tmp.argv= %s' % str(tmp.argv)
1453 assert tmp.argv != sys.argv
1454 re.close()
1455
1457 prinfo( '%TEST_BUILTINS QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ')
1458 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1459 tmp = re.import_mod('__builtin__')
1460 print tmp, tmp.__name__
1461 q1 = getattr(tmp, 'abs')
1462 q = tmp.abs
1463 assert q is q1
1464 print q
1465 assert q(3) == 3
1466 assert q(-3) == 3
1467 re.close()
1468
1469
1471 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1472 raise SystemExit()
1473 re.close()
1474
1475
1477 prinfo('%TEST_GLOBALS*******************************')
1478 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1479 tmp = re.globals()
1480 tmp['x'] = 3
1481 assert tmp['x'] == 3
1482 foo = test0()
1483 tmp['x'] = foo
1484 prinfo( 'Type(foo)= %s; type(tmp[x])=%s' % (str(type(foo)), str(type(tmp['x']))))
1485
1486 re.close()
1487
1488
1490 prinfo('%TEST_FILE $$$$$$$$$$$$$$$$$$')
1491 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1492 tmp = re.import_mod('__builtin__')
1493
1494
1495
1496
1497
1498 rfd = tmp.open('/dev/null', 'r')
1499 assert rfd.readline() == ''
1500 opn = tmp.open
1501 rfd = opn('/dev/null', 'r')
1502 assert rfd.readline() == ''
1503 wfd = opn('/tmp/fooproxytest', 'w')
1504 wfd.write('xyzzy\n')
1505 wfd.close()
1506 rfd = opn('/tmp/fooproxytest', 'r')
1507 assert rfd.readline() == 'xyzzy\n'
1508 re.close()
1509
1510
1512 prinfo('%TEST_numpy !+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!+!')
1513 import numpy
1514 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1515 q = numpy.array((6,7,8))
1516 tmp = re.import_mod('numpy')
1517 print tmp.sum(q)
1518 print tmp.array((1,2,3))[1]
1519 re.close()
1520
1522 prinfo('%TEST_EXCEPTION !!!!!!!!!!!!!!!!!!!')
1523 re = remote_exec(sys.executable, ['python', '-m', 'g_proxy'] )
1524 tmp = re.import_mod('sys')
1525 try:
1526 sys.foobar
1527 except AttributeError, x:
1528 prinfo('OK: x=%s' % str(x))
1529 else:
1530 raise AssertionError, 'Wrong exception!'
1531 del tmp
1532 prinfo('-o-o-o-o-o-o-o-o-o')
1533 rmath = re.import_mod('math')
1534 try:
1535 rmath.sqrt(-1)
1536 except ValueError, x:
1537 assert 'domain' in str(x)
1538 else:
1539 raise AssertionError, 'Bad Exception!'
1540 re.close()
1541 prinfo('-c-c-c-c-c-c-c-c-c')
1542
1543
1569
1570
if __name__ == '__main__':
    # Script entry point.  Leading '-' arguments are flags:
    #   --     stop flag processing
    #   -test  run the self-tests and exit with status 0
    # Any other flag is reported and the process exits with status 1.
    # Without -test, a server is started, named after the first remaining
    # argument (or 'server' when there is none).
    arglist = sys.argv[1:]
    while arglist and arglist[0].startswith('-'):
        arg = arglist.pop(0)
        if arg == '--':
            break
        if arg != '-test':
            print('Unrecognized flag: %s' % arg)
            sys.exit(1)

        _logfd = open('g_proxy.client', 'w')
        test_minimal()
        test_minimalclass()
        # Repeat the same test under different interpreter check intervals.
        for interval in (1, 3, 9, 27, 81, 300, 1000):
            sys.setcheckinterval(interval)
            test_simpleclass()
        test_EQ()
        test_builtins()
        test_sys()
        test_globals()
        test_file()
        test_exception()
        sys.exit(0)

    if arglist:
        name = arglist[0]
        _logfd = open('g_proxy.client', 'w')
    else:
        name = 'server'
        _logfd = open('g_proxy.server', 'w')
    server(name)
else:
    # Imported as a module: log to the client log file.
    _logfd = open('g_proxy.client', 'w')
1618