Module spread_jobs

A module that starts a bunch of subprocesses and distributes work amongst them, then collects the results.

Subprocesses must follow a protocol: they must listen for commands on the standard input (commands are encoded with cPickle), and they must produce cPickled tuples on their standard output. NOTE: THEY CANNOT PRINT ANYTHING ELSE! (But it's OK for subprocesses to produce stuff on the standard error output.)

PROTOCOL:

  1. Execcing state: spread_jobs execs a group of subprocesses. You have full control of the argument list.
  2. Preparation state: spread_jobs will send a list of cPickled things, one at a time, to the subprocess. No explicit terminator is added, so either the subprocess must know how many things are coming or the list must contain its own terminator. (E.g. the last item of the list could be None, and the subprocess would wait for it.) In this state, the subprocess must not produce anything on the standard output.
  3. Running state: spread_jobs will send one cPickled thing to the subprocess and then wait for a cPickled tuple to come back.

    The request to the subprocess is a tuple(int, arbitrary). The int is a task-ID number which must be returned with the answer. The arbitrary is whatever information the subprocess needs to do its job.

    The subprocess responds with a 3-tuple. The first element of the tuple is either:

    The subprocess loops in the running state. Normally, it should terminate when its standard input is closed. (It can terminate itself if it wishes by simply closing the standard output and exiting.)

  4. Shutdown state: The subprocess can produce anything it wants. This will be collected up and returned to the caller of spread_jobs.main.

    You can use this to run ordinary Linux commands by sending nothing in the preparation state or the running state. You are then handed the standard output as a list of strings.
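
As a rough illustration of the preparation state, the sketch below reads pickled items until a None terminator arrives, then hands over to the running state. It is an assumption-laden example, not code from spread_jobs: read_preparation is a hypothetical helper, an in-memory buffer stands in for the subprocess's standard input, and the standard pickle module is used in place of cPickle so that the sketch also runs on Python 3.

```python
import io
import pickle  # the documented module uses cPickle on Python 2


def read_preparation(stream):
    """Read pickled items from stream until a None terminator arrives.

    Hypothetical helper: returns the items received before the None.
    """
    items = []
    while True:
        item = pickle.load(stream)
        if item is None:
            return items
        items.append(item)


# Simulate what the parent might write: preparation items ending in None,
# followed by the first running-state request, a (task-ID, payload) tuple.
pipe = io.BytesIO()
for thing in ["config", {"alpha": 0.5}, None]:
    pickle.dump(thing, pipe, pickle.HIGHEST_PROTOCOL)
pickle.dump((0, "first task"), pipe, pickle.HIGHEST_PROTOCOL)
pipe.seek(0)

setup = read_preparation(pipe)      # consumes everything up to the None
first_request = pickle.load(pipe)   # the running state starts here
```

Because the protocol adds no terminator of its own, the sender and the subprocess have to agree on one in advance; None only works if it is never a legitimate preparation item.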

Normally, the action happens in the running state.

Normally, the subprocess looks much like this:

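       import sys
       import cPickle as CP
       stdin, stdout = sys.stdin, sys.stdout
       while True:
               try:
                       # Block until the next pickled request arrives.
                       control = CP.load(stdin)
               except EOFError:
                       # Standard input was closed: no more work.
                       break
               # compute() is supplied by you; it returns the 3-tuple response.
               d, so, se = compute(control)
               CP.dump((d, so, se), stdout, CP.HIGHEST_PROTOCOL)
               stdout.flush()
       stdout.close()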
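
A loop like this can be exercised without spawning a process by passing the streams in as arguments. The following is only a sketch under stated assumptions: serve and square are hypothetical names, in-memory buffers stand in for the pipes, pickle replaces cPickle so that it runs on Python 3, and the layout of the 3-tuple returned by square is a placeholder rather than the layout spread_jobs expects.

```python
import io
import pickle  # cPickle on Python 2


def serve(stdin, stdout, compute):
    """Answer pickled requests from stdin until it reaches end of file."""
    while True:
        try:
            request = pickle.load(stdin)
        except EOFError:
            break  # input exhausted: leave the running state
        pickle.dump(compute(request), stdout, pickle.HIGHEST_PROTOCOL)
        stdout.flush()


def square(request):
    # Hypothetical compute(): the 3-tuple layout here is a placeholder.
    task_id, value = request
    return (task_id, value * value, "")


# Parent side: queue up two requests, each a (task-ID, payload) tuple.
to_worker = io.BytesIO()
for request in [(0, 3), (1, 4)]:
    pickle.dump(request, to_worker, pickle.HIGHEST_PROTOCOL)
to_worker.seek(0)

# Worker side: run the loop against the in-memory streams.
from_worker = io.BytesIO()
serve(to_worker, from_worker, square)

# Parent side again: read the two answers back in order.
from_worker.seek(0)
answers = [pickle.load(from_worker) for _ in range(2)]
```

Keeping the streams as parameters means the same loop can run against sys.stdin and sys.stdout in a real subprocess and against buffers in a test.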
Classes
  notComputed
A singleton marker for values that haven't been computed.
  NoResponse
  RemoteException
An exception that corresponds to one raised by a subprocess.
  TooBusy
  PastPerformance
This class keeps track of which machines are more and less successful.
  CannotCreateConnection
  Connection
This class represents a connection from the master process down to one of the slaves.
  Connection_subprocess
This is a Connection via stdin/stdout to a subprocess.
  workers_c
This creates a group of worker threads that take tasks from the iqueue and put the answers on the oqueue.
  unpickled_pseudofile
For testing.
Functions
tuple(list(whatever), list(list(str)))
main(todo, list_of_args, connection_factory=<class 'gmisclib.spread_jobs.Connection_subprocess'>, stdin=None, verbose=False, timing_callback=None, tail_callback=None, past_performance={})
Pass a bunch of work to other processes.
replace(list_of_lists, *fr)
Replace(list_of_lists, pat, length, replacement)
append(list_of_lists, *a)
delay_sanitize(x)
test_worker(x)
test_(script)
one_shot_test(input)
Variables
  __package__ = 'gmisclib'

Imports: re, sys, math, time, random, CP, threading, subprocess, StringIO, die, gpkmisc, dictops, MB


Function Details

main(todo, list_of_args, connection_factory=<class 'gmisclib.spread_jobs.Connection_subprocess'>, stdin=None, verbose=False, timing_callback=None, tail_callback=None, past_performance={})

Pass a bunch of work to other processes.

Parameters:
  • stdin (list(whatever)) - a list of stuff to send to the other processes before the computation is properly commenced.
  • todo (sequence(whatever)) - a sequence of tasks to do
  • list_of_args (list(list(str)))
  • past_performance (None or PastPerformance) - a PastPerformance object if you want the system to remember which machines were more or less successful last time and to start jobs on the more successful machines first; None if you don't want any memory. The default is to have memory.
Returns: tuple(list(whatever), list(list(str)))
A 2-tuple. The first item is a list of the results produced by the other processes. Items in the returned list correspond to items in the todo list. Each result is what a process writes, pickled, to its standard output after a chunk of data is fed into its standard input. The second item is a list of the remaining outputs, as read by file.readlines(), one per process.
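
One way to picture how answers can be matched back to the todo list is to use the task-ID as an index into a pre-filled result list. This sketch is not the implementation of main: collect and NOT_COMPUTED are hypothetical stand-ins (the latter loosely modelled on the notComputed marker listed above), and it assumes each answer has already been reduced to a (task-ID, value) pair.

```python
class _NotComputed(object):
    """Stand-in marker for results that have not arrived yet."""

    def __repr__(self):
        return "<not computed>"


NOT_COMPUTED = _NotComputed()


def collect(n_tasks, answers):
    """Place each (task_id, value) answer at the index given by its task-ID."""
    results = [NOT_COMPUTED] * n_tasks
    for task_id, value in answers:
        results[task_id] = value
    return results


todo = ["a", "bb", "ccc"]
# Answers may arrive out of order, and some may never arrive at all.
arrived = [(2, 3), (0, 1)]
results = collect(len(todo), arrived)
```

Slots that never receive an answer keep the marker, so a caller can tell a missing result apart from a legitimate value such as None.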