Module run_several
[frames] | [no frames]

Source Code for Module run_several

  1  #! python 
  2   
  3  """This script runs a bunch of processes in parallel. 
  4  You tell it what to run on the standard input. 
  5  It spawns off a specified number of subprocesses, and hands the jobs 
  6  to the subprocesses. 
  7   
  8  Usage: run_several.py [-v] Ncpu Oom_adjust <list_of_commands >log_of_outputs 
  9   
 10  Flags: C{-v} means "be more verbose". 
 11   
 12  C{Ncpu} is the maximum number of cores to devote to running these tasks 
 13  (you can enter zero to have it use all the cores of the local machine), 
 14  or C{+N} or C{-N} to have it use C{N} more (or fewer) cores than the machine 
 15  has.  Further, C{xF} or C{*F} multiplies the machine's core count by the factor C{F}. 
 16  (NB: The C{+}, C{-}, C{x}, C{*} forms always allow at least one process to run, 
 17  no matter what C{N} or C{F} is or how few cores you have.) 
 18  All this can be useful if you want to avoid loading the machine too heavily, 
 19  or if you want to boost the loading because your jobs are limited by disk seek time. 
 20   
 21  C{Oom_adjust} is a number given to Linux's OOM killer (via /proc/<pid>/oom_adj).   When 
 22  the system starts running out of memory, the OOM killer kills processes. 
 23  Positive numbers (e.g. 3) make it more likely that a process will be 
 24  killed.   So, if your subtasks are likely to use too much memory 
 25  and are not critical, set Oom_adjust positive. 
 26   
 27  Each line of list_of_commands is piped into the standard input of its own 
 28  instance of bash, so don't try any multi-line commands: the pieces would 
 29  run separately, in different shells. 
 30  The resulting standard outputs and standard errors are kept separate, 
 31  and each process's outputs are printed as a lump when it completes. 
 32  Stderr from a subprocess comes out as stdout from run_several.py; 
 33  various blocks of output are separated by lines with hash marks and 
 34  strings of "----" (see write_msgs()). 
 35   
 36  Note that the processes may finish in any order.   The integer 
 37  on the stdout and stderr separator lines gives you the (zero-based) line number 
 38  in the input file.    Run_several.py exits with a non-zero status as soon as it 
 39  notices that a subprocess has failed; it starts no further commands, but does not kill those already running. 
 40   
 41  Example 1:: 
 42   
 43          $ echo "pwd" | run_several.py 0 0 
 44          # command 0: pwd 
 45          # stdout 0 ----------------(exited with 0) 
 46          /home/gpk/speechresearch/gmisclib/bin 
 47          # stderr 0 ---------------- 
 48          $ 
 49   
 50  Example 2:: 
 51   
 52          $ { echo uname -m; echo uname -p; echo uname -o;} | run_several.py 0 0 
 53          # command 0: uname -m 
 54          # stdout 0 ----------------(exited with 0) 
 55          x86_64 
 56          # stderr 0 ---------------- 
 57          # command 1: uname -p 
 58          # stdout 1 ----------------(exited with 0) 
 59          unknown 
 60          # stderr 1 ---------------- 
 61          # command 2: uname -o 
 62          # stdout 2 ----------------(exited with 0) 
 63          GNU/Linux 
 64          # stderr 2 ---------------- 
 65          $  
 66  """ 
 67   
 68  import sys 
 69  import time 
 70  import tempfile 
 71  import subprocess 
 72  from gmisclib import die 
 73  from gmisclib import system_load as SL 
 74   
 75  #: Basic time step (seconds): scales the back-off while waiting for the system to become 
 76  #: less heavily loaded, the pause between the first few process starts, and set_oom() retries. 
 77  DT = 1.0        # Seconds 
 78   
def write_msgs(p, exitstatus):
    """Print the command, exit status and captured output of a finished job.

    Everything goes to C{sys.stdout} as one lump, laid out as::

        # command <i>: <command line>
        # stdout <i> ----------------(exited with <exitstatus>)
        <captured stdout>
        # stderr <i> ----------------
        <captured stderr>

    where C{<i>} is the job's zero-based input line number.  The job's
    capture files are closed afterwards, even if writing fails.  For the
    C{tempfile.TemporaryFile} objects that L{run_processes} attaches,
    closing them also deletes them.

    @param p: a C{subprocess.Popen} object carrying the extra attributes
        C{x_i}, C{commandline}, C{x_stdout} and C{x_stderr}.
    @param exitstatus: the job's exit status.
    @type exitstatus: int
    """
    try:
        out = sys.stdout
        out.write('# command %d: %s\n' % (p.x_i, p.commandline))
        out.write('# stdout %d ----------------(exited with %d)\n' % (p.x_i, exitstatus))
        out.write(_read_all(p.x_stdout))
        out.write('# stderr %d ----------------\n' % p.x_i)
        out.write(_read_all(p.x_stderr))
        out.flush()
    finally:
        p.x_stderr.close()
        p.x_stdout.close()


def _read_all(f):
    """Rewind C{f} and return its whole content as a native string.

    TemporaryFile objects are opened in binary mode, so on Python 3 the
    content is bytes and must be decoded before writing to text stdout.
    On Python 2 it is already C{str} and is returned unchanged.
    """
    f.seek(0)
    data = f.read()
    if not isinstance(data, str):
        data = data.decode('utf-8', 'replace')
    return data
92 93
def get_ncpu(cpuinfo="/proc/cpuinfo"):
    """Count the CPUs listed in a Linux C{/proc/cpuinfo}-style file.

    Every line that starts with C{processor} counts as one CPU.

    @param cpuinfo: path of the file to read (default C{/proc/cpuinfo}).
    @return: the number of CPUs found, always > 0.
    @raise AssertionError: if no C{processor} lines are found.
    @raise IOError: if the file cannot be read.
    """
    # Use "with" so the file handle is closed promptly, not left to the GC.
    with open(cpuinfo, "r") as fd:
        n = sum(1 for line in fd if line.startswith('processor'))
    assert n > 0, "no 'processor' lines found in %s" % cpuinfo
    return n
101 102
def wait_until_unloaded():
    """Wait a while until the system becomes less loaded.

    The system counts as busy while C{SL.get_loadavg()} is at least twice
    C{SL.get_ncpu()}, or while C{SL.mem_pressure()} is at least 0.8.
    While it is busy, this logs a "too busy" message via C{die.info} and
    sleeps, waiting longer each time (0, 2, 4, ... times L{DT}).

    It won't wait forever, though: after 20 busy checks it returns anyway.
    """
    ncpu = SL.get_ncpu()
    for attempt in range(20):
        load = SL.get_loadavg()
        mem = SL.mem_pressure()
        if load < 2*ncpu and mem < 0.8:
            return
        # Report exactly the numbers that were just tested, so the log
        # explains the decision instead of showing a fresh re-read.
        die.info('# too busy: %.1f/%d %.2f' % (load, ncpu, mem))
        time.sleep(2*attempt*DT)
115 116
def set_oom(pid, ooma, ntries=6):
    """Set the Linux OOM-killer adjustment of process C{pid} to C{ooma}.

    C{ooma} is written to C{/proc/<pid>/oom_adj}.  Failed writes are
    retried, up to C{ntries} attempts in all, sleeping a little longer
    after each failure (C{DT/10}, C{2*DT/10}, ...).  The original
    warning calls this case a "slow start": presumably a freshly spawned
    process's C{/proc} entry is not always writable at once.  Errors
    that retrying cannot fix (permission denied, invalid value) stop
    the retries immediately.

    A warning is issued via C{die.warn} in either of two cases: the value
    could not be set at all, or more than three attempts failed before
    it succeeded.  A failed write never raises.

    NOTE: newer Linux kernels deprecate C{oom_adj} in favour of
    C{oom_score_adj}; this function still writes C{oom_adj}.

    @param pid: process ID of the target process.
    @type pid: int
    @param ooma: adjustment value to write.
    @type ooma: int
    @param ntries: maximum number of write attempts (at least 1 is made).
    @type ntries: int
    """
    import errno
    # Retrying cannot help with these.
    permanent = (errno.EACCES, errno.EPERM, errno.EINVAL)
    ntries = max(1, ntries)
    path = '/proc/%d/oom_adj' % pid
    nfail = 0
    last_err = None
    while nfail < ntries:
        try:
            # "with" ensures the buffered write is flushed (and any error
            # raised) here, rather than silently in a destructor.
            with open(path, 'w') as fd:
                fd.write('%d\n' % ooma)
        except (IOError, OSError) as x:
            last_err = x
            nfail += 1
            if x.errno in permanent:
                break
            if nfail < ntries:
                time.sleep(nfail*DT/10.0)
        else:
            break
    if last_err is not None and (nfail >= ntries or last_err.errno in permanent):
        die.warn("Cannot set oom_adj for process %d: %s" % (pid, last_err))
    elif nfail > 3:
        die.warn("Slow start for process %d: writing to /proc/%d/oom_adj: %s" % (pid, pid, last_err))
129 130
def run_processes(fd, np, ooma, verbose):
    """Run each line of C{fd} as a bash command, keeping up to C{np} running.

    Each input line is fed to the standard input of its own C{bash}
    process (see L{_start_process}).  When C{np} processes are already
    running, this polls them and starts the next job as soon as I{any}
    one finishes, not only when the oldest one does.

    As each process finishes, its output is printed by L{write_msgs}.  If
    a process exits with a non-zero status, C{sys.exit} is called with
    that status at once.  No further commands are started, and processes
    that are still running are not killed.

    @param fd: iterable of command lines (e.g. C{sys.stdin}).
    @param np: maximum number of simultaneous processes; values below 1
        are treated as 1.
    @param ooma: OOM-killer adjustment passed to L{set_oom} for each process.
    @param verbose: if true, log the start and end of each process.
    """
    np = max(1, np)
    poll_interval = DT / 10.0
    running = []
    for (i, line) in enumerate(fd):
        _reap_finished(running, verbose)
        while len(running) >= np:
            time.sleep(poll_interval)
            _reap_finished(running, verbose)
        running.append(_start_process(i, line, ooma, verbose))
        if np > 1 and i < np:
            # Don't start the first few processes exactly at the same time.
            # The delays let the memory statistics and system load catch up.
            # This can be important for wait_until_unloaded().
            time.sleep(DT)
    while running:
        _reap_finished(running, verbose)
        if running:
            time.sleep(poll_interval)


def _finish_process(p, es, verbose):
    """Report finished process C{p} (exit status C{es}); exit the program if it failed."""
    if verbose:
        die.info("[End] %d" % es)
    write_msgs(p, es)
    if es != 0:
        sys.exit(es)


def _reap_finished(running, verbose):
    """Report and remove, in place, every process in C{running} that has finished."""
    still_running = []
    for p in running:
        es = p.poll()
        if es is None:
            still_running.append(p)
        else:
            _finish_process(p, es, verbose)
    running[:] = still_running


def _start_process(i, line, ooma, verbose):
    """Start bash on input line number C{i}; return its C{Popen} object.

    The object is given the extra attributes that L{write_msgs} reads:
    C{commandline}, C{x_stdout}, C{x_stderr} and C{x_i}.
    """
    stderr = tempfile.TemporaryFile(prefix="stderr")
    stdout = tempfile.TemporaryFile(prefix="stdout")
    wait_until_unloaded()
    if verbose:
        die.info("[Start] %s" % line.strip())
    p = subprocess.Popen(['bash'], stderr=stderr, stdout=stdout, stdin=subprocess.PIPE)
    p.commandline = line.strip()
    p.x_stderr = stderr
    p.x_stdout = stdout
    p.x_i = i
    # Adjust the OOM score before the command is written to bash's stdin.
    set_oom(p.pid, ooma)
    if not isinstance(line, bytes):
        # The pipe is binary; Python 3 text lines must be encoded first
        # (on Python 2 a plain str is already bytes and is left alone).
        line = line.encode('utf-8')
    p.stdin.write(line)
    p.stdin.close()
    return p
182 183
def parse_NP(s):
    """Turn the C{Ncpu} command-line argument into a number of processes.

    Accepted forms:
      - C{+N} / C{-N}: the machine's core count plus / minus C{N}
        (never less than 1);
      - C{xF} / C{*F}: the core count times the factor C{F}, rounded
        (never less than 1);
      - C{0}: the machine's core count;
      - any other non-negative integer C{N}: exactly C{N}.

    The core count comes from C{SL.get_ncpu()}.  Out-of-range values trip
    an C{assert}.  Unparseable numbers raise C{ValueError}.
    """
    sign, body = s[:1], s[1:]
    if sign in ('+', '-'):
        offset = int(body)
        assert -100 <= offset < 1000
        return max(1, SL.get_ncpu() + (-offset if sign == '-' else offset))
    if sign in ('*', 'x'):
        factor = float(body)
        assert 0 <= factor < 10
        return max(1, int(round(factor * SL.get_ncpu())))
    count = int(s)
    assert 0 <= count < 10000
    return count if count else SL.get_ncpu()
if __name__ == '__main__':
    # Command-line entry point: run_several.py [-v] Ncpu Oom_adjust
    # Bad arguments get a short message on stderr instead of a traceback.
    # Explicit checks are used rather than assert, which -O would strip.
    args = sys.argv[1:]
    verbose = bool(args) and args[0] == '-v'
    if verbose:
        args.pop(0)
    if len(args) < 2:
        sys.stderr.write('Usage: run_several.py [-v] Ncpu Oom_adjust <list_of_commands >log_of_outputs\n')
        sys.exit(2)

    try:
        NP = parse_NP(args[0])
    except (ValueError, AssertionError):
        sys.exit('run_several.py: bad Ncpu argument: %r' % args[0])
    if not 0 < NP < 100:
        sys.exit('run_several.py: Ncpu must give between 1 and 99 processes, not %d' % NP)

    try:
        oom_adj = int(args[1])
    except ValueError:
        sys.exit('run_several.py: bad Oom_adjust argument: %r' % args[1])
    if not -32 < oom_adj < 32:
        sys.exit('run_several.py: Oom_adjust must be between -31 and 31, not %d' % oom_adj)

    run_processes(sys.stdin, NP, oom_adj, verbose)