Package gmisclib :: Module spread_jobs
[frames | no frames]

Source Code for Module gmisclib.spread_jobs

  1  #! python 
  2   
  3  """ 
  4  A module that starts a bunch of subprocesses and distributes 
  5  work amongst them, then collects the results. 
  6   
  7  Subprocesses must follow a protocol: they must listen for commands 
  8  on the standard input (commands are encoded with C{cPickle}), 
  9  and they must produce C{cPickle}d tuples on their standard output. 
 10  NOTE: THEY CANNOT PRINT ANYTHING ELSE!  (But it's OK for subprocesses 
 11  to produce stuff on the standard error output.) 
 12   
 13  PROTOCOL: 
 14   
 15          1. Execing state: C{spread_jobs} execs a group of subprocesses.  You have 
 16          full control of the argument list. 
 17   
 18          2. Preparation state: C{spread_jobs} will send a list of C{cPickled} things, 
 19          one at a time to the subprocess.   No explicit terminator 
 20          is added, so the subprocess must either know how many things 
 21          are coming or the list should contain some terminator. 
 22          (E.g. the last item of the list could be L{None}, and the 
 23          subprocess would wait for it.) 
 24          In this state, the subprocess must not produce anything on 
 25          the standard output. 
 26   
 27          3. Running state: C{spread_jobs} will send one C{cPickled} thing to the 
 28          subprocess and then wait for a C{cPickled} C{tuple} to come 
 29          back. 
 30   
 31          The request to the subprocess is a C{tuple(int, arbitrary)}. 
 32          The L{int} is a task-ID number which must be returned with the answer. 
 33          The C{arbitrary} is whatever information the subprocess needs to do its job. 
 34   
 35          The subprocess responds with a 3-L{tuple}.   The first element of the 
 36          tuple is either: 
 37           
 38                  - An instance of L{TooBusy}.  This causes the main process to 
 39                          put the task back on the queue and ignore this subprocess for 
 40                          a while.  The second element is printed; the third is ignored. 
 41                  - An instance of L{RemoteException}.   This leads to termination of 
 42                          the job and causes an exception to be raised on the main thread. 
 43                          The other two elements of the tuple are printed. 
 44                  - Anything else.   In that case, the first element is returned on the 
 45                          output list and the other two elements are printed. 
 46   
 47          The subprocess loops in the running state. 
 48          Normally, it should terminate when its standard input is closed. 
 49          (It can terminate itself if it wishes by simply closing the standard output and exiting.) 
 50   
 51          4. Shutdown state: The subprocess can produce anything it wants.   This will 
 52          be collected up and returned to the caller of C{spread_jobs.main}. 
 53   
 54          You can use this to run certain normal linux commands by not sending anything 
 55          in the preparatory state or the running state.  You will then be handed 
 56          the standard output as a list of strings. 
 57   
 58  Normally, the action happens in the running state. 
 59   
 60  Normally, the subprocess looks much like this:: 
 61   
 62          import cPickle 
 63          while True: 
 64                  try: 
 65                          control = cPickle.load(stdin) 
 66                  except EOFError: 
 67                          break 
 68                  d, so, se = compute(control) 
 69                  cPickle.dump((d, so, se), stdout, cPickle.HIGHEST_PROTOCOL) 
 70                  stdout.flush() 
 71          stdout.close() 
 72   
 73  @sort: main, replace, Replace, append 
 74  """ 
 75   
 76   
 77  from __future__ import with_statement 
 78   
 79  import re 
 80  import sys 
 81  import math 
 82  import time 
 83  import random 
 84  import cPickle as CP 
 85  import threading 
 86  import subprocess 
 87  import StringIO 
 88   
 89  from gmisclib import die 
 90  from gmisclib import gpkmisc 
 91  from gmisclib import dictops 
 92  from gyropy import g_mailbox as MB 
 93  # from gyropy import g_mailbox as MB 
 94   
 95   
class notComputed(object):
	"""Marker for results that were never produced.

	The class object itself is used as the value (no instances are made):
	L{main} pre-fills its result list with this marker and later checks
	each slot with an C{is} test before filling it in.
	"""
	pass
class NoResponse(RuntimeError):
	"""Error class for a slave that fails to answer.
	(Defined for use by callers; nothing in this file raises it.)
	"""
	def __init__(self, *s):
		super(NoResponse, self).__init__(*s)
class RemoteException(Exception):
	"""An exception that corresponds to one raised by a subprocess.
	This is raised in the parent process.
	"""
	def __init__(self, *s):
		super(RemoteException, self).__init__(*s)
		# Both fields are filled in later by the worker thread that
		# received the exception (see _ThreadJob.run).
		self.index = ''		# identifies which task failed, e.g. "index=7"
		self.comment = ''	# truncated stdout/stderr from the failed task

	def __repr__(self):
		args_text = repr(self.args)
		return '<%s.RemoteException: %s>' % (__name__, args_text)

	def raise_me(self):
		"""Raise this very exception object in the caller's thread."""
		raise self
class TooBusy(object):
	"""Message object a slave sends instead of an answer when its machine is
	overloaded: the master requeues the task and leaves this slave idle
	for a while (see L{_ThreadJob.run}).
	"""
	def __init__(self, delay):
		# Suggested back-off (seconds); the master feeds it through
		# delay_sanitize()/compute_delay() before sleeping.
		self.delay = delay
class PastPerformance(dictops.dict_of_averages):
	"""This class keeps track of which machines are more and less successful.
	It is normally used as a key, to sort machines into order.
	"""

	def __init__(self):
		# NOTE(review): the body of __init__ was lost in the rendered source;
		# assumed to be just the base-class initializer -- verify against VCS.
		dictops.dict_of_averages.__init__(self)

	def add_many(self, kvpairs):
		"""Record a batch of (key, usefulness) observations."""
		for (key, value) in kvpairs:
			self.add(key, value)

	def __call__(self, x):
		"""Score one argument list C{x}: average the stored usefulness of each
		of its tokens (unknown tokens count as a neutral 0.5) and negate, so
		that sorting by this key puts historically better machines first.
		"""
		total = 0.5
		weight = 1
		for token in x:
			try:
				sm, wts = self.get_both(str(token))
			except KeyError:
				sm, wts = 0.5, 1.0
			weight += wts
			total += sm
		return -total/weight
class CannotCreateConnection(Exception):
	"""Raised when a L{Connection} cannot start its slave process."""
	def __init__(self, *s):
		super(CannotCreateConnection, self).__init__(*s)
class Connection(object):
	"""This class represents a connection from the master process down to one of the slaves.
	It also keeps track of how often the slave reports that it is too busy to work.

	This is an abstract base class: subclasses (e.g. L{Connection_subprocess})
	supply the transport by implementing L{send}, L{get}, L{close} and L{wait},
	and override the C{*Error} class attributes with the exception types that
	their transport can actually raise.
	"""

	BUSYFAC1 = 0.85	#: Weight of the old value in the usefulness running average.
	BUSYFAC2 = 1-BUSYFAC1	#: Weight of the newest observation.
	ConnectError = ()	#: Exceptions that setting up the connection can raise.
	SendError = (IOError, ValueError)	#: Exceptions that L{send} can raise.
	GetError = (EOFError,)	#: Exceptions that L{get} can raise.

	def __init__(self, arglist):
		"""
		@param arglist: an argument list to execute to start a subprocess.
		@type arglist: a sequence of anything that can be converted to strings.
		@note: This is where the arglist is finally converted to a list of strings.
		@except OSError: when connection cannot be set up.
		"""
		self.arglist = [str(q) for q in arglist]
		self.lock = threading.Lock()	#: Serializes access to self.uness.
		self.uness = 0.0	#: How useful is this thread?  A running average in [0, 1].

	def send(self, todo):
		"""Send one task to the slave.
		@except IOError: Trouble sending.
		"""
		# NotImplementedError is a subclass of RuntimeError, so any caller
		# that caught the old RuntimeError still works.
		raise NotImplementedError("Virtual Method")

	def get(self):
		"""@return: (answer, standard_output, standard_error) or None.
		@except EOFError: No data available from slave.
		"""
		raise NotImplementedError("Virtual Method")

	def close(self):
		"""Closes the channel to the slave."""
		raise NotImplementedError("Virtual Method")

	def wait(self):
		"""Waits for the slave to terminate and closes the channel from the slave.
		@return: any final output.
		@rtype: list(str)
		"""
		raise NotImplementedError("Virtual Method")

	def argstring(self):
		"""The slave's argument list as one space-joined string (for log messages)."""
		return ' '.join(self.arglist)

	def usefulness(self):
		"""Thread-safe read of the usefulness estimate.
		@rtype: float in [0, 1]
		"""
		with self.lock:
			assert 0 <= self.uness <= 1.0
			return self.uness

	def I_am_working(self, now):
		"""Fold one observation into the usefulness running average.
		@param now: 1 when the slave just did work, 0 when it refused (TooBusy).
		"""
		with self.lock:
			self.uness = self.uness*self.BUSYFAC1 + self.BUSYFAC2*now

	def mystate(self, state):
		"""Reset the usefulness estimate on a state change: neutral (0.5) when
		entering the "running" state, dead (0.0) for any terminal state.
		"""
		with self.lock:
			if state == "running":
				self.uness = 0.5
			else:
				self.uness = 0.0

	def performance(self):
		"""Yield (argument, usefulness) pairs, one per arglist element, suitable
		for feeding to L{PastPerformance.add_many}.
		"""
		u = self.usefulness()
		for arg in self.arglist:
			yield (arg, u)
class Connection_subprocess(Connection):
	"""This is a L{Connection} via stdin/stdout to a subprocess."""
	GetError = (EOFError, CP.UnpicklingError, ValueError)	# get() can raise these.
	ConnectError = (OSError,)	# subprocess.Popen can raise this.

	def __init__(self, arglist):
		"""Start the slave process with pipes on its stdin and stdout;
		its stderr is shared with the master's stderr.
		@except CannotCreateConnection: when the process cannot be started.
		"""
		Connection.__init__(self, arglist)
		try:
			self.p = subprocess.Popen(self.arglist, stdin=subprocess.PIPE,
							stdout=subprocess.PIPE, stderr=sys.stderr,
							close_fds=True
							)
		except self.ConnectError, ex:
			raise CannotCreateConnection(str(ex), *(ex.args + ("subprocess.Popen",) + tuple(self.arglist)))

	def send(self, todo):
		"""Pickle one item down the slave's stdin.
		@except IOError: trouble writing to the pipe.
		"""
		CP.dump(todo, self.p.stdin)
		self.p.stdin.flush()

	def get(self):
		"""Read one pickled response from the slave's stdout.
		@return: whatever tuple the slave pickled.
		@except cPickle.UnpicklingError: re-raised after logging diagnostics.
		@except EOFError: the slave closed its stdout.
		"""
		while True:
			try:
				rv = CP.load(self.p.stdout)
				return rv
			except CP.UnpicklingError, y:
				# The slave printed something that isn't pickle data; log
				# what we can see of it, then propagate the error.
				die.warn("spread_jobs: Junk response: %s" % repr(y))
				die.info("spread_jobs: Junk remainder: %s" % self.p.stdout.readline())
				die.info("spread_jobs: Junk arglist: %s" % self.argstring())
				raise
		return None	# Unreachable (loop always returns or raises); kept from the original.

	def close1(self):
		"""Close the slave's stdin; a protocol-following slave then terminates."""
		self.p.stdin.close()

	def close2(self):
		"""Drain the slave's remaining (shutdown-state) stdout, close it, and
		reap the process.
		@return: the leftover stdout lines.
		@rtype: list(str)
		"""
		tmp = self.p.stdout.readlines()
		self.p.stdout.close()
		self.p.wait()
		self.p = None	# Drop the Popen so the process can't be touched again.
		return tmp
def delay_sanitize(x):
	"""Coerce C{x} to a float and clip it into the range [0.01, 1000.0] seconds."""
	capped = min(1000.0, float(x))
	return max(0.01, capped)
class _ThreadJob(threading.Thread):
	"""One worker thread inside the master process: it owns a L{Connection}
	to a single slave and shuttles tasks from C{iqueue} to the slave and
	answers back onto C{oqueue}.
	"""

	def __init__(self, iqueue, oqueue, p, stdin, solock, wss):
		"""@param stdin: something to send at the start of the subprocess
			to get it going.  This is before the main processing starts.
		@type stdin: any iterable that yields something that can be
			given to L{cPickle.dumps}.
		@param p: The process to run.  It's already been started,
			but no input/output has occurred.
		@type p: L{Connection}
		@param solock: a lock to serialize the standard output
		@type solock: threading.Lock
		@param iqueue: queue of (task-ID, task) pairs still to be done.
		@param oqueue: queue receiving (task-ID, t_start, t_end, answer) tuples.
		@param wss: the L{workers_c} group this thread belongs to; used to
			count how many workers are still active.
		"""
		threading.Thread.__init__(self, name='spread_jobs%s' % id(self))
		self.iqueue = iqueue
		self.oqueue = oqueue
		self.wss = wss
		assert isinstance(p, Connection)
		self.p = p
		self.solock = solock
		# Preparation state: send the start-up data synchronously, before
		# the thread is started.  On failure we just log and close the
		# slave's stdin; run() will then find the slave dead.
		try:
			for x in stdin:
				self.p.send(x)
		except self.p.SendError, x:
			die.info("I/O error in thread start-up: %s" % str(x))
			self.p.close1()

	def want_shutdown(self):
		"""Decide whether this (currently TooBusy) worker should give up for good.
		@rtype: bool
		"""
		# If it weren't for the fact that we use a maillist, and we start
		# with it filled, this could lead to premature termination of some
		# workers.
		#
		# Note that it is a mistake to shut down threads that aren't TooBusy.
		# If you're in a situation where some machines are heavily loaded and
		# never do any work, you will suffer severely if you accidentally shut
		# down the last thread that is actually doing any work.
		na, nlive = self.wss.num_active()
		return (na > 3+1.3*(len(self.iqueue)+1) and
			nlive*self.p.usefulness() < na
			)

	def compute_delay(self, qdelay, delta_t):
		"""The reason we have the dependence on nw and nq is that we want to
		shorten delays as the queue empties.  Basically, we don't want any
		processes sleeping when the queue is empty.  That would just waste
		time to no purpose.

		The reason we have the dependence on delta_t is that we want to limit
		the number of CPU cycles that are wasted in polling other machines to
		see if they are too busy.

		@param qdelay: the delay the slave suggested (L{TooBusy.delay}).
		@param delta_t: how long the last send/receive round-trip took.
		@return: how long to sleep before retrying, in seconds.
		@rtype: float
		"""
		nw = self.wss.num_active()[0]
		nq = len(self.iqueue)
		delay = delay_sanitize(qdelay)
		fac1 = math.exp(1.0-self.p.usefulness())	# Useless threads sleep longer.
		fac2 = max(0.1, float(nq+1)/float(nq+nw+1))	# When there are too many workers, sleep short.
		delay *= min(fac1, fac2)
		return math.sqrt(delta_t*delay) + delay

	def run(self):
		# Running state: loop taking tasks until the iqueue reports EOF or
		# the slave breaks.  On any transport failure the current task is
		# put back on the queue for some other worker.
		self.p.mystate("running")
		while True:
			try:
				i, todo = self.iqueue.get()
			except MB.EOF:
				self.p.mystate("iqueue EOF")
				break
			t0 = time.time()
			try:
				self.p.send(todo)
			except self.p.SendError, x:
				die.warn("IO Error on send task %d to worker: %s" % (i, str(x)))
				self.iqueue.put((i, todo))	# Requeue for another worker.
				self.p.mystate("SendError")
				break
			try:
				q, so, se = self.p.get()
			except self.p.GetError, x:
				die.warn("Exception %s when trying to read worker %s" % (x, self.p.argstring()))
				self.iqueue.put((i, todo))	# Requeue for another worker.
				self.p.mystate("BadRead")
				break
			t2 = time.time()
			if isinstance(q, TooBusy):
				# The slave refused the task; requeue it and either give up
				# on this slave entirely or back off for a while.
				self.iqueue.put((i, todo))
				if self.want_shutdown():
					die.info("TooBusy: giving up on %s" % str(so))
					self.p.mystate("giving up")
					break
				else:
					tsleep = self.compute_delay(q.delay, t2-t0)
					die.info("TooBusy: sleeping %.3f for %s" % (tsleep, str(so)))
					self.p.I_am_working(0)
					time.sleep(tsleep)
					continue
			self.p.I_am_working(1)
			self.oqueue.put((i, t0, t2, q))
			with self.solock:	# Serialize stdout/stderr among worker threads.
				if so:
					sys.stdout.writelines('#slot so%d ------\n' % i)
					sys.stdout.writelines(so)
					sys.stdout.flush()
				if se:
					sys.stderr.writelines('#slot se%d ------\n' % i)
					sys.stderr.writelines(se)
					sys.stderr.flush()
				if isinstance(q, RemoteException):
					die.info("Remote Exception info: %s" % str(q.args))
					die.warn("Exception from remote job (index=%d): %s" % (i, str(q)))
					q.index = "index=%d" % i
					q.comment = "so=%s # se=%s" % (gpkmisc.truncate(';'.join(so), 40),
									gpkmisc.truncate(';'.join(se), 40)
									)
					# Eat all remaining jobs: mark them notComputed so that
					# main() can still collect one answer per task and finish.
					while True:
						try:
							j, todo = self.iqueue.get()
						except MB.EOF:
							self.p.mystate("RemoteException")
							break
						self.oqueue.put( (j, t0, t2, notComputed) )
		self.p.close1()

	def join(self, timeout=None):
		"""Collect the slave's leftover (shutdown-state) output and join the thread.
		@return: the leftover output lines from the slave.
		@rtype: list(str)
		"""
		tmp = self.p.close2()
		threading.Thread.join(self, timeout=timeout)
		self.wss = None	# This breaks a loop of references.
		return tmp
# Module-level default performance memory, shared by successive calls to main()
# so that later runs start jobs on historically successful machines first.
_past_performance = PastPerformance()
def main(todo, list_of_args, connection_factory=Connection_subprocess,
		stdin=None, verbose=False, timing_callback=None, tail_callback=None,
		past_performance=_past_performance):
	"""Pass a bunch of work to other processes.
	@param stdin: a list of stuff to send to the other processes before the computation is
		properly commenced.
	@type stdin: list(whatever)
	@param todo: a sequence of tasks to do
	@type todo: sequence(whatever)
	@param list_of_args: one argument list per worker process to start.
	@type list_of_args: list(list(str))
	@param connection_factory: called with each argument list to create the
		channel to a slave; defaults to L{Connection_subprocess}.
	@param timing_callback: if given, called as C{timing_callback(t_start, t_end)}
		for each completed task.
	@param tail_callback: if given, called (from L{workers_c.join}) once per worker
		with its argument list and its leftover (shutdown-state) output lines.
	@param past_performance: a L{PastPerformance} object if you want the system to remember which
		machines were more/less successful last time and to start jobs on the more successful
		machines first.  L{None} if you don't want any memory.  The default is to have memory.
	@type past_performance: None or L{PastPerformance}.
	@rtype: list(whatever)
	@return: a list of the results produced by the other processes.
		Items in the returned list correspond to items in the todo list.
		These are the stuff that comes out, pickled, on the standard
		output after each chunk of data is fed into the standard input.
		An item may be the L{notComputed} marker if its task was abandoned
		after a L{RemoteException}.  (The per-process leftover output is
		NOT returned; collect it via C{tail_callback}.)
	"""
	if stdin is None:
		stdin = []
	solock = threading.Lock()
	iqueue = MB.maillist(enumerate(todo))	# Tasks are tagged with their index.
	ntodo = len(iqueue)
	oqueue = MB.mailbox()
	ths = workers_c(connection_factory, list_of_args, iqueue, oqueue, stdin, solock,
			tail_callback=tail_callback, verbose=verbose,
			past_performance=past_performance
			)
	if verbose:
		die.info("%d jobs started" % len(ths))
	if not ths:
		raise RuntimeError, "No subprocesses started."
	oi = 0
	rv = [notComputed] * ntodo
	# Collect exactly one answer per task; worker threads guarantee that even
	# abandoned tasks produce a notComputed placeholder on the oqueue.
	while oi < ntodo:
		try:
			i, ts, te, ans = oqueue.get()
		except MB.EOF:
			# The oqueue closed before every task was answered.
			raise RuntimeError, "whoops"
		if timing_callback:
			timing_callback(ts, te)
		assert rv[i] is notComputed	# Each slot must be filled exactly once.
		rv[i] = ans
		oi += 1
	iqueue.putclose()	# Unblocks any workers still waiting for tasks.
	oqueue.putclose()
	if verbose:
		die.info("Joining %d jobs" % len(ths))
	ths.join()
	if past_performance is not None:
		ths.pass_performance(past_performance)
	return rv
492 493 494
495 -class workers_c(object):
496 """This creates a group of worker threads that take tasks from the iqueue and put the 497 answers on the oqueue. 498 """
499 - def __init__(self, connection_factory, list_of_args, iqueue, oqueue, stdin, solock, 500 verbose=False, tail_callback=None, past_performance=None):
501 502 self.tail_callback = tail_callback 503 self.args = list_of_args 504 self.ths = [] 505 self.verbose = verbose 506 507 for args in sorted(list_of_args, key=past_performance): 508 # There seems to be an issue with forking simultaneously 509 # from many threads. Since the file descriptors are 510 # shared, various things get closed that should be left 511 # open and vice versa. See http://psf.upfronthosting.co.za/roundup/tracker/issue2320 512 # and http://bugs.python.org/issue7213 . 513 # So, we fork in the main thread and pass the process to the new thread. 514 if self.verbose: 515 die.info("Args= %s" % str(args)) 516 try: 517 p = connection_factory(args) 518 except CannotCreateConnection, x: 519 die.warn("Cannot create connection: %s on %s" % (x, args)) 520 continue 521 t = _ThreadJob(iqueue, oqueue, p, stdin, solock, self) 522 self.ths.append(t) 523 t.start()
524
525 - def join(self):
526 nj = 0 527 for t in self.ths: 528 oo = t.join() 529 nj += 1 530 if self.tail_callback: 531 self.tail_callback(t.arglist, oo) 532 if self.verbose: 533 die.info("Joined %d jobs" % nj)
534
535 - def pass_performance(self, x):
536 x.clear() 537 for t in self.ths: 538 x.add_many(t.p.performance())
539
540 - def __len__(self):
541 return len(self.ths)
542 # This doesn't count any workers that have terminated! 543 544
545 - def num_active(self):
546 """@return: total usefulness of all workers and the number of live workers 547 @rtype: (float, int) 548 """ 549 EPS = 1e-6 550 na = 0.0 551 nlive = 0 552 # print 'act:', ','.join(['%.3f' % q.p.usefulness() for q in self.ths]) 553 for t in self.ths: 554 tmp = t.p.usefulness() 555 if tmp > EPS: 556 # We don't count threads that have been shut down, because 557 # they have tmp==0 exactly. 558 na += tmp 559 nlive += 1 560 return (na, nlive)
561 562 563
def test_worker(x):
	"""Slave side of the self-test: speaks the spread_jobs pickle protocol on
	stdin/stdout and logs progress on stderr.  Randomly dies at start-up,
	reports L{TooBusy}, or sleeps, to exercise the master's error paths.
	"""
	if x > 0 and random.random()<0.3:
		# Simulate a worker that dies immediately (all but worker 0 may).
		sys.exit(random.randrange(2))
	sys.stderr.write('test_worker starting\n')
	while True:
		sys.stderr.write('test_worker loop\n')
		try:
			control = CP.load(sys.stdin)
		except EOFError:
			sys.stderr.write('test_worker got EOF\n')
			break
		if random.random() < 0.1:
			# Simulate an overloaded machine.
			sys.stderr.write("Sending TooBusy")
			CP.dump((TooBusy(0.1), None, None), sys.stdout, CP.HIGHEST_PROTOCOL)
			sys.stdout.flush()
			continue
		if random.random() < 0.5:
			# Simulate a slow machine.
			time.sleep(random.expovariate(30.0))
		sys.stderr.write('test worker control=%s\n' % control)
		if control is None:
			sys.stderr.write('test_worker got stop\n')
			break
		sys.stderr.write('test_worker dump %s\n' % control)
		CP.dump((control, ['stdout:%s\n' % control], ['stderr\n']), sys.stdout, CP.HIGHEST_PROTOCOL)
		sys.stdout.flush()
	sys.stderr.write('test_worker finished\n')
	sys.stdout.close()
593 -def test_(script):
594 for np in range(1, 5): 595 print 'NP=%d' % np 596 for i in range(1, 6): 597 print 'ntasks=%d' % (i*5) 598 x = ['a', 'b', 'c', 'd', 'e'] * i 599 arglist = [ ['python', script, 'worker', str(j)] for j in range(np) ] 600 y = list(main(x, arglist, verbose=True)) 601 assert x == y
602 603 604
class unpickled_pseudofile(StringIO.StringIO):
	"""For testing.  A StringIO that, when closed, rewinds itself, unpickles
	every (d, so, se) record written to it, and prints them on stdout.
	"""

	def __init__(self):
		StringIO.StringIO.__init__(self)

	def close(self):
		self.seek(0, 0)
		while True:
			try:
				result, out_lines, err_lines = CP.load(self)
			except EOFError:
				break
			sys.stdout.write('STDOUT:\n')
			sys.stdout.writelines(out_lines)
			sys.stdout.write('STDERR:\n')
			sys.stdout.writelines(err_lines)
			sys.stdout.write('d=%s\n' % str(result))
			sys.stdout.flush()
625 626
def one_shot_test(input):
	"""Build a (stdin, stdout) pair for testing a single task: the returned
	C{stdin} holds one pickled copy of C{input}, rewound and ready to read;
	the returned C{stdout} is an L{unpickled_pseudofile} that prints whatever
	gets written to it when it is closed.
	"""
	fake_in = StringIO.StringIO()
	CP.dump(input, fake_in)
	fake_in.flush()
	fake_in.seek(0, 0)
	fake_out = unpickled_pseudofile()
	return (fake_in, fake_out)
634 635 636
def replace(list_of_lists, *fr):
	"""Apply regex substitutions to every string in a list of argument lists.
	@param fr: alternating (pattern, replacement) pairs, e.g.
		C{replace(lol, pat1, repl1, pat2, repl2)}; each pair is applied
		in order to every item.
	@return: a new list of lists; the input is not modified.
	"""
	pairs = []
	rest = fr
	while rest:
		pairs.append( (re.compile(rest[0]), rest[1]) )
		rest = rest[2:]
	out = []
	for row in list_of_lists:
		assert isinstance(row, (tuple, list)), "List of lists contains %s within %s" % (repr(row), list_of_lists)
		newrow = []
		for item in row:
			for (find, repl) in pairs:
				assert isinstance(item, str), "whoops! t=%s" % str(item)
				item = find.sub(repl, item)
			newrow.append(item)
		out.append(newrow)
	return out
654 655
def Replace(list_of_lists, pat, length, replacement):
	"""In each argument list, repeatedly locate the first item whose start
	matches C{pat} and splice C{replacement} in place of the C{length} items
	beginning there.  Repeats until nothing matches; beware a replacement
	that itself matches C{pat} -- that never terminates.
	@return: a new list of lists; the input is not modified.
	"""
	assert isinstance(replacement, list)
	assert length > 0
	cpat = re.compile(pat)
	out = []
	for row in list_of_lists:
		assert isinstance(row, (tuple, list)), "List of lists contains %s within %s" % (repr(row), list_of_lists)
		work = list(row)
		while work:
			hit = None
			for (idx, item) in enumerate(work):
				if cpat.match(item):
					hit = idx
					break
			if hit is None:
				break
			work[hit:hit+length] = list(replacement)
		out.append(work)
	return out
676 677
def append(list_of_lists, *a):
	"""Append the extra arguments C{a} to the end of every argument list.
	@return: a new list; each row becomes a tuple of the original items
		followed by C{a}.
	"""
	return [tuple(row) + a for row in list_of_lists]
# Self-test entry point: "script.py worker N" runs the slave side
# (test_worker); any other invocation runs the master-side test driver
# against this same file.
if __name__ == '__main__':
	if len(sys.argv)==3 and sys.argv[1] == 'worker':
		test_worker(int(sys.argv[2]))
	else:
		test_(sys.argv[0])