Package gmisclib :: Module threaded_io
[frames | no frames]

Source Code for Module gmisclib.threaded_io

  1  """This is designed to let you do asynchronous I/O conveniently. 
  2  """ 
  3   
  4  from __future__ import with_statement 
  5   
  6  import sys 
  7  import threading 
  8  import die 
  9  import system_load as SL 
 10  from gyropy import g_mailbox 
 11   
 12  Lock = threading.Lock 
class CaughtException(object):
    """Holder for an exception that was caught, so it can be raised later.

    It stores the C{(type, value, traceback)} triple it is constructed
    with (elsewhere in this module that triple comes from
    C{sys.exc_info()}) and can raise that exception again on request.
    """

    def __init__(self, x):
        """
        @type x: tuple(type, value, traceback)
        @param x: exception information, laid out like the result of
            C{sys.exc_info()}.
        """
        self.ex_type, self.ex_value, self.ex_traceback = x

    def __repr__(self):
        return "CaughtException: %s" % str(self.ex_value)

    def reraise(self):
        """Raise the stored exception again with its stored traceback.

        @raise Exception: always; the exception that was stored.
        """
        if sys.version_info[0] >= 3:
            raise self.ex_value.with_traceback(self.ex_traceback)
        # The three-argument raise statement is a syntax error on Python 3,
        # so it is kept in a string that only Python 2 ever executes.
        exec("raise self.ex_type, self.ex_value, self.ex_traceback")

    # Alias: calling get() on a CaughtException raises the stored exception.
    get = reraise
25
class threading_with_result(threading.Thread):
    """A thread that records what its target returned or raised.

    C{run()} stores the target's return value in C{self.rv}; if the target
    raises, a L{CaughtException} is stored instead.  C{self.rvset} becomes
    C{True} once C{run()} has finished, and C{get()} then yields the result.
    """

    def __init__(self, tag=None, group=None, target=None, name=None,
                 args=(), kwargs=None):
        """
        @param tag: arbitrary label, stored as C{self.tag}.
        @param group: passed through to L{threading.Thread}.
        @param target: the callable that C{run()} will invoke.
        @param name: passed through to L{threading.Thread}.
        @param args: positional arguments for C{target}.
        @param kwargs: keyword arguments for C{target}; C{None} means none.
        """
        # A fresh dict per instance instead of one shared mutable default.
        if kwargs is None:
            kwargs = {}
        threading.Thread.__init__(self, group=group, target=target,
                                  name=name, args=args, kwargs=kwargs)
        # Keep our own references: the attributes threading.Thread uses to
        # store these are private and are named differently in Python 2
        # and Python 3.
        self._rv_target = target
        self._rv_args = args
        self._rv_kwargs = kwargs
        self.rv = None
        self.rvset = False
        self.tag = tag

    def run(self):
        """Call the target and record its result or its exception."""
        try:
            self.rv = self._rv_target(*self._rv_args, **self._rv_kwargs)
        except:
            # Deliberately catches everything: the exception is stored so
            # that get() can raise it again for the caller.
            self.rv = CaughtException(sys.exc_info())
        self.rvset = True

    def get(self):
        """Return the target's result, or re-raise what the target raised.

        This does not wait for the thread; C{run()} must already have
        finished.

        @return: the value returned by the target.
        @raise AssertionError: if the result has not been set yet.
        """
        # Raised explicitly so that the check survives "python -O".
        if not self.rvset:
            raise AssertionError("get() called before the result was set")
        check_n_raise(self.rv)
        return self.rv
45
class Thread_pool(object):
    """A fixed-size pool of worker threads fed from one shared input queue.

    Jobs are C{(wbox, target, args, kwargs)} tuples placed on
    C{self.iqueue}.  Each worker repeatedly takes a job and calls
    C{target(*args, **kwargs)}.  If C{wbox} is not C{None}, the result (or a
    L{CaughtException} if the call raised) is put into it.
    """

    EOF = g_mailbox.EOF
    CaughtException = CaughtException

    def __init__(self, n=None):
        """
        @type n: int or None
        @param n: number of worker threads; C{None} means C{SL.ncpu()}.
        @raise AssertionError: if C{n} is not positive.
        """
        if n is None:
            n = SL.ncpu()
        # Raised explicitly so that the check survives "python -O"; a pool
        # without workers would never run any job.
        if not n > 0:
            raise AssertionError("Thread_pool needs at least one thread")
        self.iqueue = g_mailbox.listbox()
        self.t = []
        for _ in range(n):
            # The workers get only the queue, never the pool object itself.
            tt = threading.Thread(target=self._target, args=(self.iqueue,))
            tt.start()
            self.t.append(tt)

    def __len__(self):
        """@return: the number of worker threads not yet joined."""
        return len(self.t)

    def startRV(self, target=None, args=None, kwargs=None):
        """Queue a job whose result is wanted.

        @param target: the callable to run.
        @param args: positional arguments for C{target} (C{None} means none).
        @param kwargs: keyword arguments for C{target} (C{None} means none).
        @return: a C{g_mailbox.waitbox} into which the worker puts the
            result, or a L{CaughtException} if C{target} raised.
        @raise AssertionError: if C{target} is C{None}.
        """
        if target is None:
            raise AssertionError("target must not be None")
        wbox = g_mailbox.waitbox()
        self.iqueue.put((wbox, target, args, kwargs))
        return wbox

    def startNR(self, target=None, args=None, kwargs=None):
        """Queue a job whose return value is discarded.

        An exception raised by C{target} is reported through C{die.catch}.

        @raise AssertionError: if C{target} is C{None}.
        """
        if target is None:
            raise AssertionError("target must not be None")
        self.iqueue.put((None, target, args, kwargs))

    def startQ(self, queue, tag, target=None, args=None, kwargs=None):
        """Queue a job whose result is delivered to C{queue}.

        On success the worker calls C{queue.put((tag, result))}.  If
        C{target} raises, a bare L{CaughtException} is put instead.

        @param queue: any object with a C{put} method.
        @param tag: label paired with the result.
        @raise AssertionError: if C{target} is C{None}.
        """
        if target is None:
            raise AssertionError("target must not be None")

        # The wrapper must find target through the closure: the worker calls
        # it with the job's args only, so a leading "target" parameter here
        # would swallow the first of those arguments.
        def wrapped_target(*a, **kw):
            return (tag, target(*a, **kw))

        self.iqueue.put((queue, wrapped_target, args, kwargs))

    @classmethod
    def _target(cls, iqueue):
        """Worker loop: run jobs from C{iqueue} until it raises C{EOF}."""
        while True:
            try:
                wbox, target, args, kwargs = iqueue.get()
            except cls.EOF:
                break
            if kwargs is None:
                kwargs = {}
            if args is None:
                args = ()

            try:
                if wbox is not None:
                    wbox.put(target(*args, **kwargs))
                else:
                    target(*args, **kwargs)
            except:
                # Deliberately catches everything, so that one failing job
                # does not end the worker thread.
                if wbox is not None:
                    wbox.put(cls.CaughtException(sys.exc_info()))
                else:
                    die.catch("Uncaught exception in Thread_pool thread")

    def join(self):
        """Close the input queue and wait for every worker thread to end.

        Calling it again after the threads have been joined does nothing.
        """
        # self.t is missing if __init__ failed early; __del__ still calls us.
        threads = getattr(self, 't', None)
        if threads:
            # Presumably closing is what makes iqueue.get() raise EOF in the
            # workers -- confirm against g_mailbox.
            self.iqueue.close()
        while threads:
            tmp = threads.pop(0)
            tmp.join()

    def __del__(self):
        self.join()
112
def to_be_joined(tlist):
    """Pick a ripe thread for joining and remove it from the list.

    @type tlist: list(L{threading.Thread})
    @param tlist: a list of threads; the chosen thread is popped from it.
    @return: a thread that is either finished and ready to be joined,
        or (if none are ready yet) the oldest thread, i.e. the first one
        in the list.
    @rtype: L{threading.Thread}
    @raise IndexError: if C{tlist} is empty.
    """
    for (i, t) in enumerate(tlist):
        # Thread.isAlive() was removed in Python 3.9; is_alive() is the
        # spelling available from Python 2.6 on.  Fall back to the old name
        # for interpreters that lack the new one.
        alive = getattr(t, 'is_alive', None)
        if alive is None:
            alive = t.isAlive
        if not alive():
            return tlist.pop(i)
    return tlist.pop(0)
128
class Thread_poolW(Thread_pool):
    """A pool of n threads for fire-and-forget work.

    Call C{x.start(function, args, kwargs)} to set something running
    (a write to a file, say) and carry on computing while the I/O
    completes.  Call C{x.join()} when you need to wait for completion.

    This class is not designed to return values from the function.
    """

    def __init__(self, n=None):
        super(Thread_poolW, self).__init__(n)

    def start(self, fn, args=None, kwargs=None):
        """Queue C{fn(*args, **kwargs)}; its return value is discarded."""
        self.startNR(target=fn, args=args, kwargs=kwargs)


# Obsolete
Thread_poolNR = Thread_poolW
def check_n_raise(x):
    """Pass C{x} through, unless it is a captured exception.

    @param x: any value.
    @return: C{x} itself.  For a L{CaughtException}, C{x.reraise()} is
        called first.
    """
    if not isinstance(x, CaughtException):
        return x
    x.reraise()
    return x
157
class Thread_poolR(Thread_pool):
    """Here, you have a pool of n threads.  You can call
    C{x.start(tag, function, args, kwargs)} to start something running
    (perhaps a write to a file), and then go off and compute
    something else while the I/O completes.
    If you need to wait for completion, call C{x.join()}.

    This class is designed to return values from the function via
    C{get()} or C{getany()}.
    """

    def __init__(self, n=None):
        Thread_pool.__init__(self, n)
        self.lock = threading.Lock()    # Guards self.tags.
        self.oqueue = g_mailbox.mailbox()
        self.tags = {}  # Maps tag -> waitbox of a result not yet collected.

    def start(self, tag, fn, args=None, kwargs=None):
        """Queue C{fn(*args, **kwargs)} and remember its waitbox under C{tag}.

        @param tag: a hashable label used to collect the result later.
            Starting a second job under a tag that is still pending
            replaces the earlier job's entry.
        """
        with self.lock:
            self.tags[tag] = self.startRV(target=fn, args=args, kwargs=kwargs)

    def getany(self):
        """
        @return: the first available answer, as a (tag, value) pair.  The
            tag was set in the call to C{start}; the value is whatever the
            job's waitbox C{get()} yields.  For a computation that raised an
            exception, a L{CaughtException} was put into the waitbox; how
            C{get()} presents it depends on C{g_mailbox.waitbox}.
        @rtype: tuple(whatever, whatever)
        @raise KeyError: if no started job is waiting to be collected.
        """
        with self.lock:
            for tag in list(self.tags):
                if self.tags[tag]:
                    wbox = self.tags.pop(tag)
                    break
            else:
                # This is not quite right.  It picks a waitbox to wait on,
                # but it really should defer that decision until one has an
                # answer available.
                tag, wbox = self.tags.popitem()
        # Wait outside the lock, so that start(), get() and has_answer()
        # stay usable from other threads while this call blocks.
        return (tag, wbox.get())

    def get(self, tag):
        """Collect the result of the job started under C{tag}.

        @rtype: whatever the job's waitbox C{get()} yields
        @raise KeyError: if no pending job has that tag.
        """
        with self.lock:
            wbox = self.tags.pop(tag)
        # As in getany(): do not hold the lock while waiting for the answer.
        return wbox.get()

    def has_answer(self):
        """Is there an answer ready?"""
        with self.lock:
            return any(len(v) for v in self.tags.values())
# Disabled draft of a "loaded" method (apparently meant for Thread_poolR),
# kept for reference:
#
#         def loaded(self):
#                 """Have we loaded up all the processors?"""
#                 with self.lock:
#                         # This is not quite right.   It only returns True when there
#                         # is one more job than processor.
#                         return len(self.iqueue) > 0


def pairmap(fn, inlist, *args, **kwargs):
    """Apply C{fn} to every item of C{inlist} using a pool of threads.

    This is a generator.  Each item C{xi} is evaluated as
    C{fn(xi, *args, **kwargs)} in a L{Thread_poolR}.  Results are yielded as
    they are collected, which need not be the input order.

    @param fn: the function to apply.
    @param inlist: an iterable of items; duplicates and unhashable items
        are allowed.
    @param args: extra positional arguments passed to every call of C{fn}.
    @param kwargs: extra keyword arguments passed to every call of C{fn}.
        The special keyword C{_poolsize} sets the number of threads and is
        not passed to C{fn}.
    @return: a generator of C{(item, result)} pairs.
    """
    # pop(), not get(): otherwise _poolsize would also be handed to fn.
    tp = Thread_poolR(kwargs.pop('_poolsize', None))
    # Jobs are tagged with their position, not with the item itself, so
    # that equal items cannot overwrite one another's pending result.
    pending = {}
    try:
        for (j, xi) in enumerate(inlist):
            pending[j] = xi
            tp.start(j, fn, (xi,) + args, kwargs)
            # Keep at most len(tp) jobs outstanding.
            if len(pending) > len(tp):
                j_done, value = tp.getany()
                yield (pending.pop(j_done), value)
        while pending:
            j_done, value = tp.getany()
            yield (pending.pop(j_done), value)
    finally:
        # Shut the workers down even if the consumer stops early or a
        # result raises.
        tp.join()
239
def testmap():
    """Self-test: map the identity function over C{range(10)} with
    L{pairmap} and check that each input comes back exactly once, paired
    with itself.
    """
    q = list(pairmap(lambda x: x, range(10)))
    assert len(q) == 10
    seen = set()
    for (i, j) in q:
        assert i == j
        seen.add(i)
    # list(...) is needed on Python 3, where range() is not a list and a
    # list never compares equal to it.
    assert sorted(seen) == list(range(10))


if __name__ == '__main__':
    testmap()