Package gyropy :: Module g_pipe
[frames] | no frames]

Source Code for Module gyropy.g_pipe

  1  """A multithreaded version of os.popen2().  Note that the argument list 
  2  isn't quite the same.""" 
  3   
  4   
  5  import subprocess as S 
  6   
  7   
  8   
  9   
class pfd(object):
    """Pseudo-file descriptor.

    Wraps the stdout pipe of a child process.  It is used just like the
    underlying file object, except that closing it also waits for the
    process at the other end of the pipe to terminate.

    Attributes:
        fd     -- the wrapped file object (``p.stdout``).
        p      -- the process object whose stdout is wrapped.
        mode   -- always ``"r"``; the wrapper is read-only.
        closed -- True once close() has run.
    """

    def __init__(self, p):
        """Wrap the stdout pipe of process *p*.

        p must expose ``stdout``, ``wait()`` and ``returncode`` (for
        example a ``subprocess.Popen`` created with ``stdout=PIPE``).
        """
        self.fd = p.stdout
        self.p = p
        self.mode = "r"
        self.closed = False
        # The file-like API is provided by the methods below rather than by
        # copying bound methods of self.fd onto the instance: instance-level
        # special methods such as __iter__ are ignored by iter()/for, and
        # looking up self.fd.next fails on streams that only have __next__.

    def read(self, size=-1):
        """Read up to *size* units from the pipe (everything if negative)."""
        return self.fd.read(size)

    def readline(self, size=-1):
        """Read one line from the pipe, limited to *size* if non-negative."""
        return self.fd.readline(size)

    def readlines(self, sizehint=-1):
        """Return the remaining lines of the pipe as a list."""
        return self.fd.readlines(sizehint)

    def __iter__(self):
        """Iterate over the remaining lines of the pipe."""
        return iter(self.fd)

    def __next__(self):
        """Return the next line; raises StopIteration at end of stream."""
        return next(self.fd)

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def flush(self):
        """Do nothing: the wrapper is read-only, so nothing is buffered."""
        return

    def fileno(self):
        """Return the file descriptor number of the wrapped pipe."""
        return self.fd.fileno()

    def xreadlines(self, sizehint=-1):
        """Return an iterator over the remaining lines.

        *sizehint* is accepted for interface compatibility and ignored;
        the underlying ``xreadlines()`` takes no argument (and does not
        exist on newer file objects), so plain iteration is used instead.
        """
        return iter(self.fd)

    def close(self):
        """Close the pipe and wait for the child process to exit.

        Safe to call more than once; the pipe is closed and the process
        is waited for only on the first call.

        Returns the ``returncode`` attribute of the process.
        """
        if not self.closed:
            self.fd.close()
            self.closed = True
            self.p.wait()
        return self.p.returncode

    def __enter__(self):
        """Support ``with pfd(...) as f:``; returns the wrapper itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the pipe on leaving the ``with`` block."""
        self.close()

    def __del__(self):
        """Close the pipe (and reap the child) when garbage-collected."""
        # 'closed' is missing if __init__ raised before setting it; treat
        # such a half-built instance as already closed.
        if not getattr(self, "closed", True):
            self.close()
72 73
def popen2(path, args, bufsize=0):
    """Start a child process and return its input and output pipes.

    path    -- program to run; passed to subprocess.Popen as ``executable``.
    args    -- the argument list for the child, passed to subprocess.Popen
               unchanged (the caller supplies argv[0] as its first item).
    bufsize -- buffering for the pipes, forwarded to subprocess.Popen.

    Returns a tuple ``(child_stdin, child_stdout)``.  child_stdin is the
    plain pipe object from Popen; child_stdout is a pfd (defined above),
    so closing it also waits for the child to terminate.
    """
    p = S.Popen(args, executable=path, bufsize=bufsize,
                stdin=S.PIPE, stdout=S.PIPE, close_fds=True)
    return (p.stdin, pfd(p))
85 86
def test():
    """Self-test: pipe two lines through ``sed`` and check the results.

    Requires a ``sed`` executable on the PATH.  Checks that the
    substitution is applied to the first matching line, that the second
    line passes through unchanged, and that closing the output side
    reports the child's exit status.
    """
    si, so = popen2("sed", ["sed", "-e", "s/or/er/"])
    # The pipes carry bytes; bytes literals compare equal to str on
    # Python 2 and are required on Python 3.
    si.write(b"hello, world!\nsecond line\n")
    si.close()
    assert so.readline() == b"hello, werld!\n"
    a = so.readlines()
    assert len(a) == 1
    assert a[0] == b"second line\n"
    tmp = so.close()
    # pfd.close() returns the process return code, which is 0 for a
    # successful sed run (it is never None once the child was waited for).
    assert tmp == 0, "tmp=%s" % tmp
if __name__ == '__main__':
    # Run the self-test twice in a row when executed as a script.
    for _ in range(2):
        test()