1 from __future__ import with_statement
2
3 """A multi-producer, multi-consumer mailbox that can contain one item."""
4
5 import thread as T
6 import sys
7
8
# Module-level debug switch.  It is not read anywhere in the visible part of
# this file -- NOTE(review): confirm whether later code still uses it.
DEBUG = False


# Module-level shortcuts to two functions of the low-level thread module
# (imported above under the alias T).
start_new_thread = T.start_new_thread
get_ident = T.get_ident
14
15
16
def __init__(self, fname=None, llen=79):
    """Open the trace file and prepare the fixed-width record bookkeeping.

    fname -- path of the trace file.  It is passed to open() unchanged and
             opened in 'w' mode, so callers are expected to supply a real
             path; the None default is not substituted with anything here.
    llen  -- width, in characters, of every record written to the file,
             counting the trailing newline.
    """
    # Open the file before anything else: if that fails, no other state
    # has been created yet.
    self.fd = open(fname, 'w')
    # Guards the bookkeeping below; write() takes it via a with statement.
    self.lock = T.allocate_lock()
    self.llen = llen
    # A blank record (llen - 1 spaces and a newline); _write() slices its
    # tail off to pad short records up to the full width.
    self.spaces = (llen - 1) * ' ' + '\n'
    # record name -> offset of that record's slot in the file
    self.offset = dict()
    # thread identifier -> display name used for that thread
    self.tname = dict()
25
def write(self, name, *s):
    """Record one event twice: once under the object, once under the thread.

    name -- name of the object the event belongs to.
    s    -- message pieces; _write() converts each one with str() and
            concatenates them.

    The whole operation runs while holding self.lock, so the two records
    of one event are written without another write() call interleaving.
    """
    with self.lock:
        ident = T.get_ident()
        # Use the registered display name, or the raw identifier when this
        # thread has none.
        label = self.tname.get(ident, ident)
        by_object = ('\tth:', label, '\t') + s
        by_thread = ('\to:', name, '\t') + s
        # 'LC:' record lives in the slot keyed by the object name ...
        self._write('LC:', name, *by_object)
        # ... and the 'TH:' record in the slot keyed by the thread label.
        self._write('TH:', label, *by_thread)
32
def _write(self, ty, name, *s):
    """Write one fixed-width record into the slot belonging to `name`.

    ty   -- record type tag placed at the start of the record
            (write() passes 'LC:' or 'TH:').
    name -- key of the slot; the first record for a new name claims the
            next free slot, later records overwrite the same slot.
    s    -- message pieces, each converted with str() and concatenated.

    The record is truncated or space-padded to exactly self.llen characters
    (newline included), written to the trace file at the slot's offset, and
    also echoed to sys.stderr.  Both streams are flushed.
    """
    width = self.llen
    if name not in self.offset:
        # New name: slots are handed out in order of first use.
        self.offset[name] = len(self.offset) * width
    position = self.offset[name]

    message = ''.join(map(str, s))
    record = '%s %s: %s\n' % (ty, name, message)
    size = len(record)
    if size > width:
        # Too long: cut the text and put the newline back.
        record = record[:width - 1] + '\n'
    elif size < width:
        # Too short: drop the newline, then append the tail of the blank
        # record, which supplies the padding and a fresh newline.
        record = record[:-1] + self.spaces[size - 1:]
    assert len(record) == width

    # Absolute positioning (whence 0) so the slot is overwritten in place.
    self.fd.seek(position, 0)
    for sink in (self.fd, sys.stderr):
        sink.write(record)
        sink.flush()
51
53 with self.lock:
54 if me is None:
55 me = T.get_ident()
56 self.tname[me] = name
57
59 with self.lock:
60 me = T.get_ident()
61 tmp = self.tname.get(me, me)
62 return tmp
63
# Module-wide trace recorder; the classes below pick it up through their
# ``ls`` class attribute.  __debug__ is false when Python runs with -O, and
# then no recorder is created and Lockstate is None.
# NOTE(review): State(...) presumably opens 'g_threading.state' for writing
# as a side effect of importing this module -- confirm against State.__init__.
if __debug__:
    Lockstate = State('g_threading.state')
    Lockstate.setname('Main Thread')
else:
    Lockstate = None
69
70
72 ls = Lockstate
73
def __init__(self, verbose=0, name=None):
    """Create the wrapper around a freshly allocated low-level lock.

    verbose -- verbosity level stored on the instance for the tracing code.
    name    -- prefix of the trace name; id(self) is appended to it, so the
               default None yields a name starting with 'None'.
    """
    self.verbose = verbose
    # The prefix alone may be shared by several instances; the appended
    # id(self) tells them apart in the trace.
    self.name = '%s%d' % (name, id(self))
    self.lock = T.allocate_lock()
78
80 if __debug__ and self.verbose > 1:
81 self.ls.write(self.name, 'Lock.before acquire')
82 self.lock.acquire()
83 if __debug__ and self.verbose > 1:
84 self.ls.write(self.name, 'Lock.after acquire')
85
87 if __debug__ and not self.lock.locked():
88 if self.verbose:
89 self.ls.write(self.name, 'ERR: release but not locked.')
90 raise AssertionError, "Release of unlocked lock: %s" % self.name
91 self.lock.release()
92 if __debug__ and self.verbose > 1:
93 self.ls.write(self.name, 'Lock.released')
94
97
99 if __debug__ and self.lock.locked():
100 if self.verbose:
101 self.ls.write(self.name, 'ERR: __del__ while locked.')
102 raise AssertionError, "__del__ of locked Lock: %s" % self.name
103
105 self.lock.acquire()
106 return self
107
def __exit__(self, ty, value, traceback):
    """Leave a with block: release the underlying lock.

    The exception details (ty, value, traceback) are ignored.  Returning
    False tells Python not to suppress an exception raised in the block.
    """
    suppress_exception = False
    self.lock.release()
    return suppress_exception
111
112
114 ls = Lockstate
115
def __init__(self, lock=None, verbose=0, name=None):
    """Set up the condition around `lock`, or around a new lock if None.

    lock    -- lock object to use; when None a fresh low-level lock is
               allocated for this instance.
    verbose -- verbosity level stored on the instance for the tracing code.
    name    -- prefix of the trace name; id(self) is appended to it, so the
               default None yields a name starting with 'None'.
    """
    self.name = '%s%d' % (name, id(self))
    self.verbose = verbose
    # Starts empty; the wait/notify code elsewhere in the class fills and
    # drains this list.
    self.waiters = []
    # Only an explicit None triggers allocation of a private lock.
    self.lock = lock if lock is not None else T.allocate_lock()
124
126 """Called from lock."""
127 if __debug__ and not self.lock.locked():
128 if self.verbose:
129 self.ls.write(self.name, 'ERR: wait but not locked.')
130 raise AssertionError, "Wait on unlocked Condition: %s" % self.name
131 tmplock = T.allocate_lock()
132 tmplock.acquire()
133 self.waiters.append( tmplock )
134 self.lock.release()
135 if __debug__ and self.verbose:
136 self.ls.write(self.name, 'Condition.before wait')
137 tmplock.acquire()
138 self.lock.acquire()
139 if __debug__ and self.verbose:
140 self.ls.write(self.name, 'Condition.after wait')
141
143 if __debug__ and self.verbose > 1:
144 self.ls.write(self.name, 'Condition.before acquire')
145 self.lock.acquire()
146 if __debug__ and self.verbose > 1:
147 self.ls.write(self.name, 'Condition.after acquire')
148
150 if __debug__ and not self.lock.locked():
151 if self.verbose:
152 self.ls.write(self.name, 'ERR: release but not locked.')
153 raise AssertionError, "Release of unlocked Condition: %s" % self.name
154 self.lock.release()
155 if __debug__ and self.verbose > 1:
156 self.ls.write(self.name, 'Condition.released')
157
159 if __debug__ and not self.lock.locked():
160 if self.verbose:
161 self.ls.write(self.name, 'ERR: notify but not locked.')
162 raise AssertionError, "Notify on unlocked Condition: %s" % self.name
163 if __debug__ and self.verbose:
164 self.ls.write(self.name, 'Condition.notify')
165 try:
166 self.waiters.pop(0).release()
167 except IndexError:
168 prinfo('Condition: notify when no one is waiting', self.name)
169
170
172 if __debug__ and not self.lock.locked():
173 if self.verbose:
174 self.ls.write(self.name, 'ERR: notifyAll but not locked.')
175 raise AssertionError, "NotifyAll on unlocked Condition: %s" % self.name
176 if __debug__ and self.verbose:
177 self.ls.write(self.name, 'Condition.notify %d' % len(self.waiters))
178 if not self.waiters:
179 prinfo('Condition: notifyAll when no one is waiting', self.name)
180 for tmplock in self.waiters:
181 tmplock.release()
182 self.waiters = []
183
184
186 if __debug__ and self.lock.locked():
187 if self.verbose:
188 self.ls.write(self.name, 'ERR: __del__ while locked, w:%d.' % len(self.waiters))
189 raise AssertionError, "__del__ of locked Condition: %s, with %d waiters" % (self.name, len(self.waiters))
190
191
192 if __debug__:
193 _prinfo_lock = T.allocate_lock()
205 else:
208