Package gmisclib :: Module g_pipe
[frames | no frames]

Source Code for Module gmisclib.g_pipe

  1  """A multithreaded version of os.popen2().  Note that the argument list 
  2  isn't quite the same.""" 
  3   
  4   
  5  import subprocess as S 
  6   
  7   
  8   
  9   
class pfd(object):
    """Pseudo-file descriptor.

    Just like a FD, except that it waits for the process at the end
    of the pipe to terminate when you close it.

    Attributes:
        fd: the wrapped stream (``p.stdout``) that all reads are served from.
        p: the process object; ``close()`` calls its ``wait()``.
        mode: always ``"r"``.
        closed: True once ``close()`` has been called.
    """

    def __init__(self, p):
        """Wrap the standard output of process *p*.

        *p* must provide ``stdout``, ``wait()`` and ``returncode``.

        Raises:
            ValueError: if *p* is None.
        """
        # Validate before touching p.stdout; a check made after the
        # dereference could never fire.
        if p is None:
            raise ValueError("pfd needs a process object, not None")
        self.fd = p.stdout
        self.p = p
        self.mode = "r"
        self.closed = False

    # The read-side methods below delegate to the wrapped stream and pass
    # their optional arguments straight through, so callers may still give
    # a size/sizehint exactly as they could to the stream itself.

    def read(self, *args):
        """Read from the pipe; same arguments as the stream's ``read``."""
        return self.fd.read(*args)

    def readline(self, *args):
        """Read one line; same arguments as the stream's ``readline``."""
        return self.fd.readline(*args)

    def readlines(self, *args):
        """Read all remaining lines into a list."""
        return self.fd.readlines(*args)

    def __next__(self):
        """Return the next line, raising StopIteration at end of stream."""
        # The builtin next() works for both Python 2 files (.next) and
        # Python 3 streams (.__next__).
        return next(self.fd)

    next = __next__  # Python 2 spelling of the iterator protocol

    def __iter__(self):
        """Return the wrapped stream's own line iterator."""
        return iter(self.fd)

    def flush(self):
        """Do nothing: the stream is read-only, so there is nothing to flush."""
        return

    def fileno(self):
        """Return the OS-level file descriptor of the wrapped stream."""
        return self.fd.fileno()

    def xreadlines(self, sizehint=-1):
        """Return an iterator over the remaining lines.

        *sizehint* is accepted for backward compatibility and ignored;
        the wrapped stream's iterator is returned directly.
        """
        return iter(self.fd)

    def close(self):
        """Close the pipe, wait for the process, and return its exit status.

        Safe to call more than once: later calls do no work and return the
        same exit status.
        """
        if not self.closed:
            self.closed = True
            try:
                self.fd.close()
            finally:
                # Reap the child even if closing the stream failed.
                self.p.wait()
        return self.p.returncode

    def __enter__(self):
        """Allow use in a ``with`` statement; returns self."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the pipe on leaving the ``with`` block; never suppresses errors."""
        self.close()

    def __del__(self):
        """Close on garbage collection if the object was fully constructed."""
        # __init__ may have raised before self.p was assigned; in that
        # case there is nothing to close.
        if getattr(self, "p", None) is not None:
            self.close()
73 74
def popen2(path, args, bufsize=0):
    """Fork off a process and return its input and output file objects.

    Args:
        path: the program to execute (passed to ``Popen`` as ``executable``).
        args: the full argument list for the program, including the
            conventional ``argv[0]`` entry.
        bufsize: buffering for the pipes, forwarded to ``Popen``.  The
            default of 0 requests unbuffered pipes.

    Returns:
        A tuple ``(stdin, stdout)``: ``stdin`` is the child's standard
        input pipe, and ``stdout`` is a ``pfd`` wrapping the child's
        standard output, so closing it also waits for the child to exit.
    """
    # bufsize is forwarded explicitly; otherwise the parameter would be
    # silently ignored and Popen's own default would apply.
    p = S.Popen(args, executable=path, bufsize=bufsize,
                stdin=S.PIPE, stdout=S.PIPE, close_fds=True)
    return (p.stdin, pfd(p))
86 87
def test():
    """Round-trip two lines through ``sed`` and check output and exit status.

    Requires a ``sed`` executable to be available.  Raises AssertionError
    if the filtered output or the exit status is not what is expected.
    """
    si, so = popen2("sed", ["sed", "-e", "s/or/er/"])
    # The pipes carry bytes.  Bytes literals are the same as plain strings
    # on Python 2 and are required for the write/compare on Python 3.
    si.write(b"hello, world!\nsecond line\n")
    # Closing stdin signals end-of-input so sed can finish.
    si.close()
    assert so.readline() == b"hello, werld!\n"
    rest = so.readlines()
    assert len(rest) == 1
    assert rest[0] == b"second line\n"
    # close() waits for sed and hands back its exit status.
    status = so.close()
    assert status == 0, "tmp=%s" % status
if __name__ == '__main__':
    # Run the self-test two times in a row.
    # NOTE(review): presumably the repeat checks that a second pipe can be
    # opened after the first was closed -- confirm.
    for _ in range(2):
        test()