1
2
3 """This script runs a bunch of processes in parallel.
4 You tell it what to run on the standard input.
5 It spawns off a specified number of subprocesses, and hands the jobs
6 to the subprocesses.
7
8 Usage: run_several.py [-v] Ncpu Oom_adjust <list_of_commands >log_of_outputs
9
10 Flags: C{-v} means "be more verbose".
11
12 C{Ncpu} is the maximum number of cores to devote to running these tasks
13 (you can enter zero to have it use all the cores of the local machine),
or C{+N} or C{-N} to have it use C{N} more (or fewer) cores than the machine
has. Further, C{xF} and C{*F} multiply the number of cores by a factor C{F}.
16 (NB: The C{+}, C{-}, C{x}, C{*} forms always leave at least one core running,
17 no matter what C{N} is or how few cores you have.)
18 All this can be useful if you want to avoid loading the machine too heavily,
19 or if you want to boost the loading because your jobs are limited by disk seek time.
20
21 C{Oom_adjust} is a number given to Linux's OOM killer. When
22 the system starts running out of memory, the OOM killer kills processes.
23 Positive numbers (e.g. 3) make it more likely that a process will be
24 killed. So, if your subtasks are likely to use too much memory
25 and are not critical, set Oom_adjust positive.
26
27 The commands in list_of_commands are just piped into Bash's standard input
28 one at a time. Each line is (typically) processed by a different
29 instance of bash, so don't try any multi-line commands.
30 The resulting standard outputs and standard errors are kept separate
and each process's outputs are printed as a lump when it completes.
32 Stderr from a subprocess comes out as stdout from run_several.py;
33 various blocks of output are separated by lines with hash marks and
34 strings of "----" (see write_msgs()).
35
36 Note that the processes may finish in any arbitrary order. The integer
37 on the stdout and stderr separator lines gives you the (zero-based) line number
in the input file. Run_several.py exits with a non-zero status as soon as it
notices that a subprocess has failed; commands not yet started are never run,
and commands already running are not killed.
40
41 Example 1::
42
43 $ echo "pwd" | run_several.py 0 0
44 # command 0: pwd
45 # stdout 0 ----------------(exited with 0)
46 /home/gpk/speechresearch/gmisclib/bin
47 # stderr 0 ----------------
48 $
49
50 Example 2::
51
52 $ { echo uname -m; echo uname -p; echo uname -o;} | run_several.py 0 0
53 # command 0: uname -m
54 # stdout 0 ----------------(exited with 0)
55 x86_64
56 # stderr 0 ----------------
57 # command 1: uname -p
58 # stdout 1 ----------------(exited with 0)
59 unknown
60 # stderr 1 ----------------
61 # command 2: uname -o
62 # stdout 2 ----------------(exited with 0)
63 GNU/Linux
64 # stderr 2 ----------------
65 $
66 """
67
68 import sys
69 import time
70 import tempfile
71 import subprocess
72 from gmisclib import die
73 from gmisclib import system_load as SL
74
75
76
# Base time unit, in seconds.  The sleeps in this script are expressed as
# multiples of it: the pause between the first start-ups, the back-off while
# the machine is too busy, and the retry delay when setting the OOM adjustment.
DT = 1.0
78
def write_msgs(p, exitstatus):
    """This produces the output.

    Print everything captured from one finished subprocess, as a single
    lump on this script's stdout::

        # command I: COMMANDLINE
        # stdout I ----------------(exited with STATUS)
        ...captured stdout...
        # stderr I ----------------
        ...captured stderr...

    A newline is appended to captured output that lacks one, so that every
    separator line starts at the beginning of a line.  The two capture files
    are closed afterwards, even if writing fails.

    @param p: the finished C{subprocess.Popen} object, carrying the extra
        attributes that run_processes() attaches: C{x_i} (zero-based input
        line number), C{commandline}, and the seekable files C{x_stdout} and
        C{x_stderr} that received the child's output.
    @param exitstatus: the child's exit status, as returned by C{p.poll()}
        or C{p.wait()}.
    """
    try:
        sys.stdout.write('# command %d: %s\n' % (p.x_i, p.commandline))
        sys.stdout.write('# stdout %d ----------------(exited with %d)\n'
                         % (p.x_i, exitstatus))
        _copy_to_stdout(p.x_stdout)
        sys.stdout.write('# stderr %d ----------------\n' % p.x_i)
        _copy_to_stdout(p.x_stderr)
        sys.stdout.flush()
    finally:
        p.x_stderr.close()
        p.x_stdout.close()


def _copy_to_stdout(f):
    """Rewind file C{f} and copy all of its contents to C{sys.stdout}."""
    f.seek(0)
    data = f.read()
    if not isinstance(data, str):
        # Python 3: TemporaryFile() defaults to binary mode, but sys.stdout is
        # a text stream.  (Under Python 2, bytes is str: nothing to decode.)
        data = data.decode('utf-8', 'replace')
    if data and not data.endswith('\n'):
        # Keep the following separator line from being glued onto this output.
        data += '\n'
    sys.stdout.write(data)
92
93
def get_ncpu(cpuinfo="/proc/cpuinfo"):
    """Count the processors listed in a Linux C{/proc/cpuinfo}-style file.

    Every line that starts with C{processor} counts as one CPU.

    @param cpuinfo: path of the file to read; defaults to the live
        C{/proc/cpuinfo}.
    @return: the number of C{processor} lines found.
    @raise AssertionError: if there are none (checked with C{assert}, so the
        check is skipped under C{python -O}).
    """
    with open(cpuinfo, "r") as f:
        n = sum(1 for line in f if line.startswith('processor'))
    assert n > 0
    return n
101
102
def wait_until_unloaded():
    """Wait a while until the system becomes less loaded.
    It won't wait forever, though.

    The machine counts as not too busy when C{SL.get_loadavg()} is below
    twice C{SL.get_ncpu()} and C{SL.mem_pressure()} is below 0.8.  Each
    time it is too busy, the sampled values are logged via C{die.info} and
    the function sleeps C{2*k*DT} seconds (k = 0, 1, ..., 19).  After 20
    busy checks (about C{380*DT} seconds of sleeping) it returns anyway.
    """
    ncpu = SL.get_ncpu()
    for attempt in range(20):
        # Sample each quantity once, so the log shows exactly what was tested.
        load = SL.get_loadavg()
        mem = SL.mem_pressure()
        if load < 2*ncpu and mem < 0.8:
            return
        # NOTE(review): mem_pressure() is tested without arguments; if it
        # accepts an ncpu argument that changes its meaning, confirm which
        # form the 0.8 threshold was intended for.
        die.info('# too busy: %.1f/%d %.2f' % (load, ncpu, mem))
        time.sleep(2*attempt*DT)
115
116
def set_oom(pid, ooma):
    """Set the OOM-killer adjustment of process C{pid} to C{ooma}.

    Writes C{ooma} to C{/proc/PID/oom_adj}.  A failed attempt is retried, up
    to 6 attempts in all; after each failure it sleeps C{n*DT/10} seconds,
    where C{n} is the number of earlier failures.  I/O errors are never
    raised.  If more than 3 attempts failed before one succeeded, a
    "Slow start" warning is issued via C{die.warn}.  If all 6 failed, a
    different warning is issued and the adjustment is left unset.
    """
    failures = 0      # failed attempts so far
    last_err = None   # most recent IOError, reported in the warning
    while failures < 6:
        try:
            # "with" closes (and therefore flushes) the file inside the try,
            # so an error reported when the buffered write reaches the kernel
            # is caught here instead of surfacing, and being ignored, when the
            # unreferenced file object is garbage-collected.
            with open('/proc/%d/oom_adj' % pid, 'w') as f:
                f.write('%d\n' % ooma)
        except IOError as x:
            last_err = x
            time.sleep(failures*DT/10.0)
            failures += 1
        else:
            break
    if failures >= 6:
        die.warn("Could not set OOM adjustment for process %d: writing to /proc/%d/oom_adj: %s"
                 % (pid, pid, last_err))
    elif failures > 3:
        die.warn("Slow start for process %d: writing to /proc/%d/oom_adj: %s"
                 % (pid, pid, last_err))
129
130
def run_processes(fd, np, ooma, verbose):
    """Run each line of C{fd} as a bash command, at most C{np} at a time.

    Each line is fed to the standard input of a fresh C{bash}.  Its stdout
    and stderr go to temporary files, which write_msgs() prints once that
    process has finished.  Before each start, the function waits (for a
    bounded time) for the machine to be lightly loaded, and the child's OOM
    adjustment is then set to C{ooma}.  Finished processes are reported in
    whatever order they are found to have finished.

    As soon as a process is seen to have exited with a non-zero status, its
    output is printed and the script exits via C{sys.exit} with that status.
    Processes still running at that moment are not killed.

    @param fd: iterable of command lines (typically C{sys.stdin}).
    @param np: maximum number of commands running at once (values below 1
        are treated as 1).
    @param ooma: value written to each child's C{/proc/PID/oom_adj}.
    @param verbose: if true, log process starts and ends via C{die.info}.
    """
    np = max(1, np)
    running = []
    for (i, line) in enumerate(fd):
        # Wait for a free slot.  Polling every child, instead of blocking in
        # wait() on the oldest one, lets whichever child finishes first free
        # its slot, so one long job cannot leave the other cores idle.
        _reap_until_fewer_than(running, np, verbose)
        stderr = tempfile.TemporaryFile(prefix="stderr")
        stdout = tempfile.TemporaryFile(prefix="stdout")
        wait_until_unloaded()
        if verbose:
            die.info("[Start] %s" % line.strip())
        p = subprocess.Popen(['bash'], stderr=stderr, stdout=stdout,
                             stdin=subprocess.PIPE)
        # Extra attributes read back by write_msgs() when p has finished.
        p.commandline = line.strip()
        p.x_stderr = stderr
        p.x_stdout = stdout
        p.x_i = i
        set_oom(p.pid, ooma)
        if not isinstance(line, bytes):
            # Text-mode input (Python 3): the stdin pipe expects bytes.
            # Under Python 2, bytes is str and the line passes unchanged.
            line = line.encode('utf-8')
        p.stdin.write(line)
        p.stdin.close()
        running.append(p)
        if np > 1 and i < np:
            # Pause DT seconds after each of the first np start-ups;
            # presumably so the load average can catch up before the next
            # wait_until_unloaded() check -- TODO confirm intent.
            time.sleep(DT)
    # No more input: wait until nothing is left running.
    _reap_until_fewer_than(running, 1, verbose)


def _finish(p, es, verbose):
    """Report finished process C{p} (exit status C{es}); exit if it failed."""
    if verbose:
        die.info("[End] %d" % es)
    write_msgs(p, es)
    if es != 0:
        sys.exit(es)


def _reap_finished(running, verbose):
    """Report and remove, in list order, each process in C{running} that has exited."""
    j = 0
    while j < len(running):
        es = running[j].poll()
        if es is None:
            j += 1
        else:
            _finish(running.pop(j), es, verbose)


def _reap_until_fewer_than(running, limit, verbose):
    """Reap finished processes, polling every C{DT/10} seconds, until fewer
    than C{limit} remain in C{running}."""
    _reap_finished(running, verbose)
    while len(running) >= limit:
        time.sleep(DT/10.0)
        _reap_finished(running, verbose)
182
183
def parse_NP(s, ncpu=None):
    """Turn the C{Ncpu} command-line argument into a number of processes.

    Accepted forms (C{C} is the machine's CPU count):
      - C{+N} or C{-N}: C{C+N} or C{C-N}, but at least 1;
      - C{xF} or C{*F}: C{C*F} rounded to an integer, but at least 1;
      - C{0}: C{C};
      - any other C{N}: exactly C{N}.

    @param s: the argument string.
    @param ncpu: the machine's CPU count.  If C{None} (the default), it is
        looked up with C{SL.get_ncpu()}, and only when the form needs it.
    @return: the number of processes to run in parallel.
    @raise ValueError: if C{s} is malformed or out of range.  Range checks
        raise explicitly rather than using assert, which C{python -O} strips.
    """
    def cpus():
        if ncpu is None:
            return SL.get_ncpu()
        return ncpu

    if s.startswith(('+', '-')):
        delta = int(s[1:])
        if not -100 <= delta < 1000:
            raise ValueError("CPU offset out of range in %r" % s)
        if s.startswith('-'):
            delta = -delta
        return max(1, cpus() + delta)
    if s.startswith(('*', 'x')):
        fac = float(s[1:])
        if not 0 <= fac < 10:
            raise ValueError("CPU factor out of range in %r" % s)
        return max(1, int(round(cpus()*fac)))
    n = int(s)
    if not 0 <= n < 10000:
        raise ValueError("process count out of range: %r" % s)
    if n == 0:
        return cpus()
    return n
200
201
if __name__ == '__main__':
    usage = ("Usage: run_several.py [-v] Ncpu Oom_adjust"
             " <list_of_commands >log_of_outputs")
    args = sys.argv[1:]
    verbose = bool(args) and args[0] == '-v'
    if verbose:
        args.pop(0)
    if len(args) < 2:
        sys.exit(usage)

    try:
        NP = parse_NP(args[0])
        oom_adj = int(args[1])
    except ValueError as x:
        sys.exit("run_several.py: bad argument (%s)\n%s" % (x, usage))
    # Validate explicitly: assert statements vanish under python -O.
    if not 0 < NP < 100:
        sys.exit("run_several.py: Ncpu must give 1 to 99 processes, not %d\n%s"
                 % (NP, usage))
    # /proc/PID/oom_adj only accepts -16..15, plus -17 (disable OOM killing);
    # the kernel rejects any other value.
    if not -17 <= oom_adj <= 15:
        sys.exit("run_several.py: Oom_adjust must be in -17..15, not %d\n%s"
                 % (oom_adj, usage))
    run_processes(sys.stdin, NP, oom_adj, verbose)
214