Package gyropy :: Module g_remote_exec
[frames] | no frames]

Source Code for Module gyropy.g_remote_exec

  1  #!/usr/bin/env python 
  2   
  3  import sys 
  4  import imp 
  5  import cPickle 
  6  from gmisclib import g_pipe 
  7  from gmisclib import load_mod 
  8  import traceback 
  9  from gmisclib import die 
 10   
 11  PROTOCOL = cPickle.HIGHEST_PROTOCOL 
 12   
class RemoteException(Exception):
    """Exception used to report, on the calling side, a failure that
    happened while a remote function was running.

    All positional arguments are handed unchanged to Exception, so they
    are available afterwards as ``args``.
    """

    def __init__(self, *s):
        super(RemoteException, self).__init__(*s)
16
def make_remote_exception(name, mod, args, trtext):
    """Build a RemoteException that describes another exception.

    name   -- name of the original exception class
    mod    -- name of the module that defines that class
    args   -- arguments of the original exception; each one is converted
              with str() before being stored
    trtext -- traceback text, stored as given

    Returns the new RemoteException (it is not raised here).
    """
    printable_args = []
    for item in args:
        printable_args.append(str(item))
    origin = (mod, name)
    return RemoteException(
        "Remote Exception=", origin,
        "args=", printable_args,
        "traceback=", trtext
    )
22 23
def deal_with_arbitrary_exception():
    """Turn the exception currently being handled into a RemoteException.

    Must be called from inside an ``except`` clause, because the
    exception is taken from sys.exc_info().

    Returns a RemoteException built by make_remote_exception() from the
    exception's class name, its module name, its arguments and the
    formatted traceback lines.
    """
    ty, ob, trb = sys.exc_info()
    tmp = tuple(traceback.format_tb(trb))
    # Drop the traceback reference as soon as it has been formatted.
    del trb
    log( "DWAE, construcing remote exception" )
    # The caller uses a catch-all handler, so the exception value is not
    # guaranteed to carry an ``args`` attribute (e.g. instances of
    # old-style classes); fall back to an empty tuple rather than
    # failing while reporting the failure.
    exc_args = getattr(ob, 'args', ())
    return make_remote_exception(ty.__name__, ty.__module__,
                                 exc_args, tmp
                                 )
32 33
class io_grabber(object):
    """Minimal file-like object that collects everything written to it.

    Each chunk passed to write() or writelines() is kept, in order, in
    the list ``store`` until get_io() hands the list over.

    The original author's note says the text is meant to be transmitted
    in units of lines, because fancyprint at the other end adds a prefix
    at the beginning of each write, assuming it is the beginning of a line.
    NOTE(review): this class stores chunks exactly as written and does
    not itself split or join them at line boundaries.
    """

    def __init__(self):
        self.store = list()

    def write(self, s):
        """Remember one chunk of text."""
        self.store += [s]

    def writelines(self, x):
        """Remember every item of the iterable x."""
        self.store += x

    def flush(self):
        """Do nothing; present only for file-object compatibility."""

    def close(self):
        """Do nothing; present only for file-object compatibility."""

    def get_io(self):
        """Return the list of collected chunks and start a fresh, empty one."""
        collected, self.store = self.store, []
        return collected
60 61 62
class response(object):
    """Outcome of one remote call, as sent back to the caller.

    ety -- true when ``val`` is an exception to be raised by the caller
           instead of a normal return value
    val -- the return value, or the exception object
    out -- sequence of strings the caller writes to its stdout
    err -- sequence of strings the caller writes to its stderr
    """

    def __init__(self, ety, val, out, err):
        self.ety, self.val = ety, val
        self.out, self.err = out, err
# Cache of objects already looked up, keyed by (mname, path, fname).
_ffc = {}


def _find_func(mname, path, fname):
    """Look up an object inside a module, caching the result.

    mname -- module name, passed to load_mod.load()
    path  -- search path, passed to load_mod.load(); it is part of the
             cache key and therefore has to be hashable
    fname -- either a single attribute name, or a tuple of attribute
             names that are followed one after another starting at the
             module

    Returns the object found.  Failures of load_mod.load() or getattr()
    propagate to the caller and nothing is cached in that case.
    """
    key = (mname, path, fname)
    if key in _ffc:
        return _ffc[key]
    target = load_mod.load(mname, path)
    # Treat a plain name as a one-step attribute chain.
    names = fname if isinstance(fname, tuple) else (fname,)
    for attr in names:
        target = getattr(target, attr)
    _ffc[key] = target
    return target
86 87
class request(object):
    """Description of one function call to be carried out remotely.

    path  -- module search path handed to _find_func()
    mname -- name of the module that holds the function
    fname -- attribute name, or tuple of attribute names, of the function
    a     -- positional arguments for the call
    kw    -- keyword arguments for the call
    """

    def __init__(self, path, mname, fname, a, kw):
        self.path = path
        self.mname = mname
        self.fname = fname
        self.a = a
        self.kw = kw

    def call(self):
        """Locate the function, run it, and wrap the outcome in a response.

        Returns a response whose ``ety`` is False and whose ``val`` is
        the function's return value, or, if anything went wrong, a
        response whose ``ety`` is True and whose ``val`` is the
        RemoteException produced by deal_with_arbitrary_exception().
        The ``out`` and ``err`` lists of the response are always empty.
        """
        try:
            # The lookup is inside the try block on purpose: a module
            # that cannot be loaded or a missing attribute is reported
            # to the caller like any other failure, instead of escaping
            # from call() and terminating the server loop.
            f = _find_func(self.mname, self.path, self.fname)
            v = f(*(self.a), **(self.kw))
        except:
            # Deliberate catch-all: whatever the remote code raises is
            # converted and shipped back rather than handled here.
            return response(True, deal_with_arbitrary_exception(), [], [])
        return response(False, v, [], [])
106 107
def set_remote_path(pa):
    """Replace sys.path with the entries of pa.

    pa -- None (nothing is changed), or an iterable of path entries.
          An entry that is None stands for "everything currently in
          sys.path" and is expanded in place.

    Returns None when pa is None, otherwise the new list that was
    installed as sys.path.

    The import lock is held while the path is built and installed, and
    it is released even if iterating over pa fails; in that case
    sys.path is left untouched.
    """
    if pa is None:
        return None
    imp.acquire_lock()
    try:
        pth = []
        for pc in pa:
            if pc is None:
                pth.extend(sys.path)
            else:
                pth.append(pc)
        sys.path = pth
    finally:
        # Always give the import lock back, otherwise a bad argument
        # would leave it held.
        imp.release_lock()
    return pth
121 122
def echo(s):
    """Return the argument unchanged."""
    return s
125 126
class remote_fcn(object):
    """Callable stand-in for a function that is run through a connection.

    connection -- object whose call(pypath, mname, fname, a, kw) method
                  performs the call and returns a response-like object
    pypath     -- None, or an iterable of path entries (stored as a tuple)
    mname      -- name of the module that holds the function
    fname      -- a str, or a tuple of attribute names
    """

    def __init__(self, connection, pypath, mname, fname):
        self.connection = connection
        self.pypath = None if pypath is None else tuple(pypath)
        self.mname = mname
        assert isinstance(fname, (str, tuple))
        self.fname = fname if isinstance(fname, str) else tuple(fname)

    def __call__(self, *a, **kw):
        """Perform the call and return its result.

        The ``out`` and ``err`` text of the reply is written to
        sys.stdout and sys.stderr first.  If the reply is flagged as an
        exception (``ety``), its value is raised instead of returned.
        """
        reply = self.connection.call(self.pypath, self.mname, self.fname, a, kw)
        stdout = sys.stdout
        stdout.writelines(reply.out)
        stdout.flush()
        stderr = sys.stderr
        stderr.writelines(reply.err)
        stderr.flush()
        if not reply.ety:
            return reply.val
        raise reply.val
154 155
class connection(object):
    """A pair of pipes over which pickled requests are sent and pickled
    responses are read back.

    execpath, arglist -- passed unchanged to g_pipe.popen2(), which
                         supplies the write and read file objects
                         (presumably by starting a subprocess -- confirm
                         against g_pipe).
    """

    def __init__(self, execpath, arglist):
        # Stay marked as closed until both pipes exist, so that __del__
        # has nothing to do if popen2() raises.
        self.closed = True
        self.wfd, self.rfd = g_pipe.popen2(execpath, arglist)
        self.closed = False

    def call(self, pypath, mname, fname, a, kw):
        """Send one request and return the response read back.

        The request is pickled to the write pipe, the pipe is flushed,
        and then exactly one pickled object is read from the read pipe.
        """
        cPickle.dump(request(pypath, mname, fname, a, kw), self.wfd, PROTOCOL)
        self.wfd.flush()
        # NOTE(review): unpickling can execute arbitrary code, so the
        # process at the other end of the pipe has to be trusted.
        return cPickle.load(self.rfd)

    def close(self):
        """Close both pipes.  Calling it again is harmless."""
        if getattr(self, 'closed', True):
            return
        # Mark first, so that a failure below is not retried by __del__.
        self.closed = True
        try:
            self.wfd.close()
        finally:
            # The read side is closed even if closing the write side failed.
            self.rfd.close()

    def __del__(self):
        self.close()
183
def log(*s):
    """Debug hook that is currently switched off.

    All arguments are ignored and None is returned.  To trace the
    protocol, write str(s) followed by a newline to sys.stderr here.
    """
    return None
188 189
def server():
    """Serve pickled requests read from stdin until end of file.

    For every object unpickled from sys.stdin, its call() method is run
    and the result is pickled to the original stdout, which is flushed
    after each reply.  sys.stdout is rebound to sys.stderr beforehand,
    so ordinary prints from the called code do not end up in the reply
    stream.  The rebinding is not undone when the loop ends.
    """
    log('Server starting')
    reply_channel = sys.stdout
    sys.stdout = sys.stderr
    while True:
        log('serverloop')
        try:
            # NOTE(review): unpickling can execute arbitrary code, so
            # whatever feeds stdin has to be trusted.
            incoming = cPickle.load(sys.stdin)
            log('loaded')
        except EOFError:
            # The other side closed the pipe: normal shutdown.
            log("EOF error")
            break
        log('call')
        outcome = incoming.call()
        log('return')
        cPickle.dump(outcome, reply_channel, PROTOCOL)
        reply_channel.flush()
        log('loopend')
214 215
216 -def test_rf2(i):
217 return [8] * i
218
def test():
    """Self-test: open a connection to a freshly started server and
    exercise remote calls through it.

    Checks math.sqrt for 0..999, checks that sqrt(-1) surfaces as a
    RemoteException, sets the remote path, and checks the lengths of the
    lists returned by test_rf2.  Any failed check ends in die.die().
    """
    link = connection(None, ['python', '-c', 'import gyropy; gyropy.g_remote_exec.server()'])
    remote_sqrt = remote_fcn(link, None, 'math', 'sqrt')
    remote_list = remote_fcn(link, None, 'gyropy', ('g_remote_exec', 'test_rf2'))

    for i in range(1000):
        if abs(remote_sqrt(i)**2 - i) > 0.001:
            die.die("Wrong answer")

    # A failure on the remote side has to come back as RemoteException.
    raised = False
    try:
        remote_sqrt(-1)
    except RemoteException:
        raised = True
    if not raised:
        die.die("No exception!")

    remote_set_path = remote_fcn(link, None, "gyropy", ("g_remote_exec", "set_remote_path"))
    new_path = remote_set_path([None, "/tmp"])
    if new_path[-1] != "/tmp":
        die.die("Bad set remote path")

    for i in range(98, 1000, 3):
        if len(remote_list(i)) != i:
            die.die("Wrong answer")

    link.close()
# Script entry point: "-test" as first argument runs the self-test ten
# times and exits; otherwise the process becomes a server.
if __name__ == '__main__':
    if sys.argv[1:2] == ['-test']:
        for attempt in range(10):
            test()
        print('OK')
        sys.exit(0)
    server()