1
2
3 import socket
4 import random
5 import hashlib
6 import cPickle
7
8 import g_encode
9
10
11
14 """Constructs a barrier from a list of integers.
15 """
16 if len(x)==1 and isinstance(x[0], str):
17 self.x = tuple([int(q) for q in x[0].split('.')])
18 else:
19 self.x = tuple(x)
20 for tmp in self.x:
21 if not (tmp >= 0):
22 raise ValueError, "Nonpositive component of Barrier: %s" % tmp
23
25 return cmp(self.x, other.x)
26
27
29 if len(other.x) > len(self.x):
30 xl = list(other.x)
31 for (i, x) in enumerate(self.x):
32 xl[i] += x
33 else:
34 xl = list(self.x)
35 for (i, x) in enumerate(other.x):
36 xl[i] += x
37 self.x = tuple(xl)
38 return self
39
41 return '.'.join([str(q) for q in self.x])
42
45
46 assert Barrier(12) == Barrier(12)
47 assert Barrier(11) > Barrier(10)
48 assert Barrier(1, 3) > Barrier(1, 1)
49 assert Barrier(1, 3) > Barrier(1)
50 assert Barrier(2) > Barrier(1, 3)
51 assert Barrier(2, 1) > Barrier(1, 3)
52
53
54 -class Oops(Exception):
57
61
62
63 encoder = g_encode.encoder(regex=r"""[^a-zA-Z0-9<>?,./:";'{}[\]!@$^&*()_+=|\\-]""")
64
65
67 e = encoder
68
69 - def __init__(self, host, port, Key, jobid):
70 self.my_id = None
71 self.host = host
72 self.port = port
73 self.jobid = jobid
74 self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
75 self.sock.connect((self.host, self.port))
76 self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
77 seed = hash(open("/dev/urandom", "r").read(16))
78 x = hashlib.sha1('%s:%s:%s' % (Key, self.jobid, seed)).hexdigest()
79 self.sf = self.sock.makefile("w", 4096)
80 self.rf = self.sock.makefile("r", 4096)
81 assert self.version()[1] == __version__
82 self.send("connect", jobid, seed, x)
83 self.flush()
84 self.my_id = self.recv()[1]
85
87 assert s
88 self.sf.write(' '.join([self.e.fwd(str(q)) for q in s]) + '\n')
89
92
94 tmp = [self.e.back(q) for q in self.rf.readline().strip().split()]
95 if not tmp:
96 raise Oops("Empty response")
97 if tmp[0] == "Fail":
98 raise Oops(*(tmp[1:]))
99 return tmp
100
102 self.send("list_ops", self.jobid, self.my_id)
103 self.flush()
104 return self.recv()[1:]
105
110
112 """@return: number_of_processes, rank_of_this_process
113 @rtype: (int, int)
114 """
115 self.send("rank", self.jobid, self.my_id)
116 self.flush()
117 a = self.recv()
118 return (int(a[1]), int(a[2]))
119
121 self.send("leave", self.jobid, self.my_id)
122 self.flush()
123 self.recv()
124 self.my_id = None
125
127 if self.my_id is not None:
128 self.close()
129
130 - def barrier(self, b, nmin=0, exc=True):
131 self.send("barrier", self.jobid, self.my_id, b, nmin)
132 self.flush()
133 x = self.recv()
134 if exc and x[0]!="OK":
135 raise LateToBarrier(x[1])
136 if x[0]=="OK":
137 return None
138 return Barrier(x[1])
139
141 """@return: (logp, vector)
142 """
143 size = v.shape[0]
144 self.send("*set_vector", self.jobid, self.my_id, size, logp, cPickle.dumps(v))
145 self.send("get_vector", self.jobid, self.my_id)
146 self.flush()
147 lp, v = self.recv()[1:]
148 return (float(lp), cPickle.loads(v))
149
150 - def set(self, *kv):
151 assert len(kv)>=2 and len(kv)%2 == 0
152 self.send("set_info", self.jobid, self.my_id, *kv)
153 self.flush()
154 self.recv()
155
157 self.send("get_info_list", self.jobid, self.my_id, key)
158 self.flush()
159 return self.recv()[1:]
160
162 self.send("get_info_combined", self.jobid, self.my_id, key, operation)
163 self.flush()
164 return Unpack[operation](self.recv()[1])
165
166 - def spread(self, key, value, barrier, nmin=0):
167 self.send("*barrier", self.jobid, self.my_id, barrier.deepen(0), nmin)
168 self.send("*set_info", self.jobid, self.my_id, key, value)
169 self.send("*barrier", self.jobid, self.my_id, barrier.deepen(1), nmin)
170 self.send("get_info_list", self.jobid, self.my_id, key)
171 self.flush()
172 tmp = self.recv()
173 return tmp[1:]
174
175
176 - def get_consensus(self, key, value, barrier, operation, nmin=0):
177 self.send("*barrier", self.jobid, self.my_id, barrier.deepen(0), nmin)
178 self.send("*set_info", self.jobid, self.my_id, key, value)
179 self.send("*barrier", self.jobid, self.my_id, barrier.deepen(1), nmin)
180 self.send("get_info_combined", self.jobid, self.my_id, key, operation)
181 self.flush()
182 return self.recv()[1]
183
184
203
204
225
227 import time
228 test = connection(*test_args)
229 print "rank=", test.rank()
230 test.set("key", "value")
231 assert "value" in test.get_list("key")
232 time.sleep(random.expovariate(100.0))
233 test.set("k2", "2", "k", "wahoonie")
234 test.barrier(Barrier(1))
235 print 'Barrier 1'
236 time.sleep(random.expovariate(100.0))
237 test.barrier(Barrier(1, 1))
238 time.sleep(random.expovariate(100.0))
239 test.set("k2", "1", "k", "no_value")
240 test.barrier(Barrier(2))
241 print 'Barrier 2'
242 time.sleep(random.expovariate(100.0))
243 test.barrier(Barrier(2, 0, 1))
244 tmp = test.get_list("k2")
245 assert tmp[0] == "1", "tmp=%s" % tmp
246 test.barrier(Barrier(2, 1))
247 print 'Barrier 2.1'
248 test.set("k2", "2", "k", "wahoonie")
249 time.sleep(random.expovariate(100.0))
250 test.barrier(Barrier(2, 1, 1))
251 tmp = test.get_list("k")
252 assert tmp[0] == "wahoonie", "tmp=%s" % tmp
253 test.set("k3", "x y")
254 assert "x y" in test.get_list("k3")
255 time.sleep(random.expovariate(100.0))
256 assert test.get_combined("k2", 'float_median') == 2.0
257 test.barrier(Barrier(3))
258 time.sleep(random.expovariate(100.0))
259 test.barrier(Barrier(3))
260 time.sleep(random.expovariate(100.0))
261 assert test.barrier(Barrier(2), exc=False) == Barrier(3)
262 time.sleep(random.expovariate(100.0))
263 assert test.barrier(Barrier(1), exc=False) == Barrier(3)
264 time.sleep(random.expovariate(100.0))
265 assert test.barrier(Barrier(3)) is None
266 time.sleep(random.expovariate(100.0))
267 assert test.barrier(Barrier(4)) is None
268 assert test.spread("k3", "x", Barrier(6))[0] == "x"
269 time.sleep(random.expovariate(100.0))
270 assert test.get_consensus("k3", "a", Barrier(6, 3), "string_median") == "a"
271 test.barrier(Barrier(7))
272 print "barrier 7"
273 tmp = test.spread("k3", "x", Barrier(7, 3))
274 assert tmp[0] == "x", "tmp= %s" % tmp
275 time.sleep(random.expovariate(100.0))
276 test.close()
277
279 import threading
280 t = []
281 for i in range(n):
282 t.append( threading.Thread(target=fcn) )
283 for tt in t:
284 tt.start()
285 for tt in t:
286 tt.join()
287
288
289 __version__ = Barrier(0, 1, 0)
290
291
298
299
309
310
311 Ops = {'string_median': op_string_median,
312 'float_median': op_float_median
313 }
314 Unpack = {'float_median': float, 'string_median': str}
315
316 test_args = ('localhost', 8487, "K", "job")
317
318 if __name__ == '__main__':
319 test_many(2, test0s)
320
321 test_many(2, test0)
322
323 test1()
324 print 'Test1 OK'
325 test_many(2, test1)
326 test_many(11, test0s)
327 test_many(11, test0)
328 test_many(11, test1)
329
330