Package gmisclib :: Module mcmc_cooperate
[frames | no frames]

Source Code for Module gmisclib.mcmc_cooperate

  1  # -*- coding: utf-8 -*- 
  2   
  3  import socket 
  4  import random 
  5  import hashlib 
  6  import cPickle 
  7   
  8  import g_encode 
  9   
 10   
 11   
class Barrier(object):
    """Hierarchical identifier of a synchronisation point.

    A barrier wraps a tuple of non-negative integers and prints as a dotted
    string such as C{"1.3.0"}.  Barriers compare like the tuples they wrap
    (lexicographically), so C{Barrier(1) < Barrier(1, 3) < Barrier(2)}.
    """

    def __init__(self, *x):
        """Construct a barrier from integers or from one dotted string.

        @param x: either the components themselves (C{Barrier(1, 3)}) or a
            single string in the format produced by C{repr} (C{Barrier("1.3")}).
        @raise ValueError: if any component is negative, or if a string
            component cannot be converted by C{int}.
        """
        if len(x) == 1 and isinstance(x[0], str):
            self.x = tuple(int(q) for q in x[0].split('.'))
        else:
            self.x = tuple(x)
        for component in self.x:
            # Zero is a legal component (e.g. Barrier(2, 0, 1)); only values
            # that fail ">= 0" are rejected.
            if not (component >= 0):
                raise ValueError("Negative component of Barrier: %s" % (component,))

    def __cmp__(self, other):
        """Three-way comparison of the component tuples: returns -1, 0 or 1."""
        return (self.x > other.x) - (self.x < other.x)

    # Rich comparisons are spelled out so that ordering does not depend on
    # __cmp__ alone, and so that comparing with a non-Barrier is well defined.
    def __eq__(self, other):
        if not isinstance(other, Barrier):
            return NotImplemented
        return self.x == other.x

    def __ne__(self, other):
        if not isinstance(other, Barrier):
            return NotImplemented
        return self.x != other.x

    def __lt__(self, other):
        if not isinstance(other, Barrier):
            return NotImplemented
        return self.x < other.x

    def __le__(self, other):
        if not isinstance(other, Barrier):
            return NotImplemented
        return self.x <= other.x

    def __gt__(self, other):
        if not isinstance(other, Barrier):
            return NotImplemented
        return self.x > other.x

    def __ge__(self, other):
        if not isinstance(other, Barrier):
            return NotImplemented
        return self.x >= other.x

    def __hash__(self):
        """Hash consistently with equality.

        Note that C{+=} changes the components in place, so a barrier must not
        be modified while it is used as a dictionary key or set member.
        """
        return hash(self.x)

    def __iadd__(self, other):
        """Add C{other} component-wise, in place.

        The shorter of the two barriers is treated as if padded with zeros,
        so the result has the length of the longer one.

        @return: C{self}
        """
        if len(other.x) > len(self.x):
            longer, shorter = other.x, self.x
        else:
            longer, shorter = self.x, other.x
        summed = list(longer)
        for i, value in enumerate(shorter):
            summed[i] += value
        self.x = tuple(summed)
        return self

    def __repr__(self):
        """Return the dotted form, e.g. C{"2.0.1"}; accepted back by the constructor."""
        return '.'.join(str(q) for q in self.x)

    def deepen(self, v):
        """Return a new barrier one level deeper: the same components followed by C{v}."""
        return Barrier(*(self.x + (v,)))


# Import-time sanity checks of the ordering that the rest of the module relies on.
assert Barrier(12) == Barrier(12)
assert Barrier(11) > Barrier(10)
assert Barrier(1, 3) > Barrier(1, 1)
assert Barrier(1, 3) > Barrier(1)
assert Barrier(2) > Barrier(1, 3)
assert Barrier(2, 1) > Barrier(1, 3)
class Oops(Exception):
    """Base class for the errors raised by this module.

    It adds nothing to C{Exception}; all arguments are passed straight through.
    """

    def __init__(self, *s):
        super(Oops, self).__init__(*s)
57
class LateToBarrier(Oops):
    """Raised by C{connection.barrier} when the reply to a barrier request is not C{"OK"}."""

    def __init__(self, *s):
        super(LateToBarrier, self).__init__(*s)


# Encoder applied to every field that connection sends (fwd) or receives (back).
# NOTE(review): presumably the characters matched by the regex -- anything outside
# the listed set -- are the ones that get escaped; confirm against g_encode.
encoder = g_encode.encoder(regex=r"""[^a-zA-Z0-9<>?,./:";'{}[\]!@$^&*()_+=|\\-]""")
class connection(object):
    """Client end of a line-based text protocol for coordinating cooperating processes.

    Every request is one line of space-separated fields, each passed through the
    encoder C{e}.  Every reply is one such line whose first field is a status
    word; a first field of C{"Fail"} is turned into an L{Oops} by L{recv}.

    Commands whose name starts with C{"*"} are written without reading a reply
    in this class.  NOTE(review): presumably the server does not answer them --
    confirm against the server implementation.
    """
    e = encoder

    def __init__(self, host, port, Key, jobid):
        """Connect to the server and register this process under C{jobid}.

        @param host: server host name or address
        @param port: server TCP port
        @param Key: shared secret; only its SHA-1 digest, mixed with the job id
            and a random seed, is sent
        @param jobid: identifier of the job this process belongs to
        @raise Oops: if the server reports a different version than
            C{__version__}, or rejects or fails to answer the connect request
        """
        # my_id stays None until registration succeeds, so that __del__ does not
        # try to "leave" a job that was never joined.
        self.my_id = None
        self.host = host
        self.port = port
        self.jobid = jobid
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))
        self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
        # The handle is closed explicitly instead of being left to the
        # garbage collector.
        with open("/dev/urandom", "rb") as rnd:
            seed = hash(rnd.read(16))
        x = hashlib.sha1('%s:%s:%s' % (Key, self.jobid, seed)).hexdigest()
        self.sf = self.sock.makefile("w", 4096)
        self.rf = self.sock.makefile("r", 4096)
        # Done with an explicit test rather than "assert": under "python -O" an
        # assert would be removed together with the version request itself.
        server_version = self.version()[1]
        if not (server_version == __version__):
            raise Oops("Version mismatch: server=%s client=%s"
                       % (server_version, __version__))
        self.send("connect", jobid, seed, x)
        self.flush()
        self.my_id = self.recv()[1]

    def send(self, *s):
        """Write one request line made of the encoded string forms of C{s} (not flushed)."""
        assert s
        self.sf.write(' '.join([self.e.fwd(str(q)) for q in s]) + '\n')

    def flush(self):
        """Flush the buffered requests to the socket."""
        self.sf.flush()

    def recv(self):
        """Read one reply line and return its decoded fields.

        @return: list of fields; the first one is the status word
        @raise Oops: if the line is empty, or if the status word is C{"Fail"}
            (the remaining fields become the exception arguments)
        """
        fields = [self.e.back(q) for q in self.rf.readline().strip().split()]
        if not fields:
            raise Oops("Empty response")
        if fields[0] == "Fail":
            raise Oops(*(fields[1:]))
        return fields

    def _request(self, *s):
        """Send one command, flush, and return the decoded reply."""
        self.send(*s)
        self.flush()
        return self.recv()

    def _publish(self, key, value, barrier, nmin):
        """Queue "barrier .0", "set_info key value", "barrier .1" without flushing."""
        self.send("*barrier", self.jobid, self.my_id, barrier.deepen(0), nmin)
        self.send("*set_info", self.jobid, self.my_id, key, value)
        self.send("*barrier", self.jobid, self.my_id, barrier.deepen(1), nmin)

    def _close_transport(self):
        """Best-effort close of both file wrappers and the socket."""
        for handle in (self.sf, self.rf, self.sock):
            try:
                handle.close()
            except (socket.error, IOError):
                # The link is already broken; there is nothing left to release.
                pass

    def list_ops(self):
        """@return: the fields after the status word of the reply to C{list_ops}"""
        return self._request("list_ops", self.jobid, self.my_id)[1:]

    def version(self):
        """@return: the fields after the status word of the reply to C{version},
            each converted to a L{Barrier}
        """
        return [Barrier(q) for q in self._request("version")[1:]]

    def rank(self):
        """@return: number_of_processes, rank_of_this_process
        @rtype: (int, int)
        """
        a = self._request("rank", self.jobid, self.my_id)
        return (int(a[1]), int(a[2]))

    def close(self):
        """Leave the job and release the socket.

        Does nothing if this connection is not registered (never connected,
        or already closed).
        """
        if self.my_id is None:
            return
        try:
            self._request("leave", self.jobid, self.my_id)
        finally:
            # Whatever the server answered, this connection is finished.
            self.my_id = None
            self._close_transport()

    def __del__(self):
        if self.my_id is not None:
            self.close()

    def barrier(self, b, nmin=0, exc=True):
        """Send a C{barrier} request for C{b} and interpret the reply.

        @param b: the barrier (sent in its string form)
        @param nmin: sent to the server unchanged
        @param exc: if true, a reply other than C{"OK"} raises L{LateToBarrier}
        @return: C{None} if the reply is C{"OK"}; otherwise (only when C{exc}
            is false) the L{Barrier} named in the reply
        @raise LateToBarrier: if C{exc} is true and the reply is not C{"OK"}
        """
        x = self._request("barrier", self.jobid, self.my_id, b, nmin)
        if x[0] == "OK":
            return None
        if exc:
            raise LateToBarrier(x[1])
        return Barrier(x[1])

    def swap_vec(self, logp, v):
        """Hand C{(logp, v)} to the server and fetch a vector back.

        @param v: object with a C{shape} attribute (presumably a numpy array --
            confirm with callers); it is sent pickled
        @return: (logp, vector)
        """
        size = v.shape[0]
        self.send("*set_vector", self.jobid, self.my_id, size, logp, cPickle.dumps(v))
        self.send("get_vector", self.jobid, self.my_id)
        self.flush()
        lp, vec = self.recv()[1:]
        # NOTE(security): this unpickles bytes received over the network, and
        # unpickling can execute arbitrary code.  Use only with a trusted server.
        return (float(lp), cPickle.loads(vec))

    def set(self, *kv):
        """Send C{set_info} with alternating keys and values: C{set(k1, v1, k2, v2, ...)}."""
        assert len(kv) >= 2 and len(kv) % 2 == 0
        self._request("set_info", self.jobid, self.my_id, *kv)

    def get_list(self, key):
        """@return: the fields after the status word of the reply to C{get_info_list}"""
        return self._request("get_info_list", self.jobid, self.my_id, key)[1:]

    def get_combined(self, key, operation):
        """Request C{get_info_combined} for C{key} and convert the answer.

        @param operation: name of the combining operation; must be a key of C{Unpack}
        @return: the second reply field, converted by C{Unpack[operation]}
        """
        reply = self._request("get_info_combined", self.jobid, self.my_id, key, operation)
        return Unpack[operation](reply[1])

    def spread(self, key, value, barrier, nmin=0):
        """Set C{key} to C{value} between two sub-barriers, then list the values of C{key}.

        The sub-barriers are C{barrier.deepen(0)} and C{barrier.deepen(1)}.

        @return: the fields after the status word of the C{get_info_list} reply
        """
        self._publish(key, value, barrier, nmin)
        return self._request("get_info_list", self.jobid, self.my_id, key)[1:]

    def get_consensus(self, key, value, barrier, operation, nmin=0):
        """Set C{key} to C{value} between two sub-barriers, then request the combined value.

        Unlike L{get_combined}, the answer is returned as received (a string);
        it is not converted through C{Unpack}.

        @return: the second field of the C{get_info_combined} reply
        """
        self._publish(key, value, barrier, nmin)
        return self._request("get_info_combined", self.jobid, self.my_id, key, operation)[1]
183 184
def test0():
    """Client-side smoke test: set a key, meet barriers 2, 3 and 4, check the listed values.

    Needs a server reachable at C{test_args}; meant to be run in several
    threads at once through C{test_many}.
    """
    import time

    def pause():
        # Random delay so that concurrent clients reach each step at different times.
        time.sleep(random.expovariate(1.0))

    conn = connection(*test_args)
    conn.set("a", "A")
    # conn.barrier(Barrier(1))   (disabled in the original)
    pause()
    conn.set("a", "B")
    pause()
    conn.barrier(Barrier(2))
    pause()
    for x in conn.get_list("a"):
        assert x == "B"
    # NOTE(review): the listing lost its indentation; this pause is assumed to
    # follow the loop rather than belong to it.
    pause()
    conn.barrier(Barrier(3))
    conn.barrier(Barrier(4))
    conn.set("a", "C")
    conn.close()
    print("Test0 OK")
203 204
def test0s():
    """Client-side smoke test of C{spread} below an already passed barrier.

    Needs a server reachable at C{test_args}; meant to be run in several
    threads at once through C{test_many}.
    """
    import time

    def pause():
        # Random delay so that concurrent clients reach each step at different times.
        time.sleep(random.expovariate(1.0))

    conn = connection(*test_args)
    conn.set("a", "A")
    pause()
    conn.barrier(Barrier(1))
    print("Barrier 1s")
    pause()
    conn.barrier(Barrier(1, 3))
    print("Barrier1.3s")
    pause()
    tmp = conn.spread("a", "x", Barrier(1, 3))
    # Label and value are formatted into one string: a multi-item print emits
    # them as separate writes, which interleave when several threads run this.
    print("test.spread 0s tmp= %s" % (tmp,))
    for x in conn.get_list("a"):
        assert x == "x"
    # NOTE(review): the listing lost its indentation; this pause is assumed to
    # follow the loop rather than belong to it.
    pause()
    conn.barrier(Barrier(3))
    conn.set("a", "B")
    conn.close()
    print("Test0s OK")
225
def test1():
    """Client-side walk through most of the protocol: rank, set/get, nested barriers,
    late arrival at a barrier, C{spread} and C{get_consensus}.

    Needs a server reachable at C{test_args}.  Every protocol call is made
    outside of the C{assert} that checks it, so that running with C{-O}
    (which removes asserts) still sends the same requests and keeps
    concurrent clients in step.
    """
    import time

    def pause():
        # Short random delay so that concurrent clients drift apart between steps.
        time.sleep(random.expovariate(100.0))

    conn = connection(*test_args)
    # One formatted string instead of a multi-item print, so that output from
    # concurrent threads is less likely to interleave.
    print("rank= %s" % (conn.rank(),))
    conn.set("key", "value")
    tmp = conn.get_list("key")
    assert "value" in tmp
    pause()
    conn.set("k2", "2", "k", "wahoonie")
    conn.barrier(Barrier(1))
    print('Barrier 1')
    pause()
    conn.barrier(Barrier(1, 1))
    pause()
    conn.set("k2", "1", "k", "no_value")
    conn.barrier(Barrier(2))
    print('Barrier 2')
    pause()
    conn.barrier(Barrier(2, 0, 1))
    tmp = conn.get_list("k2")
    assert tmp[0] == "1", "tmp=%s" % tmp
    conn.barrier(Barrier(2, 1))
    print('Barrier 2.1')
    conn.set("k2", "2", "k", "wahoonie")
    pause()
    conn.barrier(Barrier(2, 1, 1))
    tmp = conn.get_list("k")
    assert tmp[0] == "wahoonie", "tmp=%s" % tmp
    conn.set("k3", "x y")
    tmp = conn.get_list("k3")
    assert "x y" in tmp
    pause()
    combined = conn.get_combined("k2", 'float_median')
    assert combined == 2.0
    conn.barrier(Barrier(3))
    pause()
    conn.barrier(Barrier(3))
    pause()
    # Arriving at a barrier that is already behind: with exc=False the reply
    # names the current barrier instead of raising LateToBarrier.
    late = conn.barrier(Barrier(2), exc=False)
    assert late == Barrier(3)
    pause()
    late = conn.barrier(Barrier(1), exc=False)
    assert late == Barrier(3)
    pause()
    reached = conn.barrier(Barrier(3))
    assert reached is None
    pause()
    reached = conn.barrier(Barrier(4))
    assert reached is None
    tmp = conn.spread("k3", "x", Barrier(6))
    assert tmp[0] == "x"
    pause()
    consensus = conn.get_consensus("k3", "a", Barrier(6, 3), "string_median")
    assert consensus == "a"
    conn.barrier(Barrier(7))
    print("barrier 7")
    tmp = conn.spread("k3", "x", Barrier(7, 3))
    assert tmp[0] == "x", "tmp= %s" % tmp
    pause()
    conn.close()
277
278 -def test_many(n, fcn):
279 import threading 280 t = [] 281 for i in range(n): 282 t.append( threading.Thread(target=fcn) ) 283 for tt in t: 284 tt.start() 285 for tt in t: 286 tt.join()
# Version of the protocol spoken by this client; connection.__init__ compares it
# with the version reported by the server.
__version__ = Barrier(0, 1, 0)
def op_string_median(tmp):
    """Return the median of a collection of strings as a one-element list.

    The input is sorted into a new list; the caller's sequence is left
    untouched, and any iterable is accepted.  For an even number of items
    one of the two middle items is picked at random.

    @param tmp: the items to combine
    @return: C{[median]}
    @raise IndexError: if C{tmp} is empty
    """
    ordered = sorted(tmp)
    n = len(ordered)
    if n % 2 == 1:
        return [ordered[n // 2]]
    return [random.choice([ordered[(n - 1) // 2], ordered[n // 2]])]
298 299
def op_float_median(tmp):
    """Return the median of items convertible to C{float}, as a one-element list.

    For an even number of items the mean of the two middle values is used.

    NOTE(review): if an item cannot be converted, a string starting with
    C{"Bad data: "} is returned instead of a list -- callers have to cope
    with both shapes.

    @raise IndexError: if C{tmp} is empty
    """
    try:
        values = sorted(float(q) for q in tmp)
    except ValueError as x:
        return "Bad data: %s" % str(x)
    n = len(values)
    upper = n // 2
    if n % 2 == 1:
        return [values[upper]]
    lower = (n - 1) // 2
    return [0.5 * (values[lower] + values[upper])]


# Combining operations by name.
Ops = {
    'string_median': op_string_median,
    'float_median': op_float_median,
}

# Converters that connection.get_combined applies to the combined value it receives.
Unpack = {'float_median': float, 'string_median': str}

# Arguments for connection() used by the test functions: host, port, Key, jobid.
test_args = ('localhost', 8487, "K", "job")

if __name__ == '__main__':
    # All of these need a server listening at test_args.
    test_many(2, test0s)
    test_many(2, test0)
    test1()
    print('Test1 OK')
    test_many(2, test1)
    test_many(11, test0s)
    test_many(11, test0)
    test_many(11, test1)