
Source Code for Module gmisclib.mcmc_helper

# -*- coding: utf-8 -*-
"""This is a helper module to make use of mcmc.py and mcmc_big.py.
It allows you to conveniently run a Monte-Carlo simulation of any
kind until it converges (L{stepper.run_to_bottom}) or until it
has explored a large chunk of parameter space (L{stepper.run_to_ergodic}).

It also helps you with logging the process.
"""

from __future__ import with_statement
import sys
import math
import random
import thread as Thr
import numpy

import g_implements
import mcmc
import die

Debug = 0

_NumPyType = type(numpy.zeros((1,)))


class TooManyLoops(Exception):
	def __init__(self, *s):
		Exception.__init__(self, *s)


class warnevery(object):
	def __init__(self, interval):
		self.c = 0
		self.interval = interval
		self.lwc = 0

	def inc(self, incr=1):
		self.c += incr
		if self.c > self.lwc + self.interval:
			self.lwc = self.c
			return 1
		return 0

	def count(self):
		return self.c

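# Example (hypothetical, not part of the original API): the intended
# warnevery pattern is to rate-limit a warning inside a long loop,
# emitting at most one warning per `interval` increments.
def _example_warnevery(work):
	w = warnevery(1000)
	for item in work:
		if w.inc():
			die.warn('still working: %d items done' % w.count())
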

class _counter(object):
	def __init__(self):
		self.lock = Thr.allocate_lock()
		self.count = 0

	def update(self, accepted):
		with self.lock:
			self.count += accepted

	def get(self):
		with self.lock:
			return self.count


class logger_template(object):
	def add(self, stepperInstance, iter):
		pass

	def close(self):
		pass

	def reset(self):
		pass

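# Example (hypothetical, for illustration): a minimal concrete logger
# satisfying the logger_template interface checked by g_implements.
# It prints the current log(P) at each call; close() is inherited.
class _example_print_logger(logger_template):
	def add(self, stepperInstance, iter):
		# logp_nocompute() reports the most recently computed log(P).
		print '#', iter, stepperInstance.current().logp_nocompute()

	def reset(self):
		print '# reset'
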

def _sign(x):
	if x > 0.0:
		return 1
	elif x < 0.0:
		return -1
	return 0


class stepper(object):
	def __init__(self, x, maxloops=-1, logger=None, share=None):
		self.logger = None	# This sets things up for close().
		self.iter = 0
		self.last_resetid = -1
		self.maxloops = maxloops
		g_implements.check(x, mcmc.stepper)
		self.x = x
		if logger is None:
			self.logger = logger_template()
		else:
			g_implements.check(logger, logger_template)
			self.logger = logger

	def reset_loops(self, maxloops=-1):
		self.maxloops = maxloops

	def run_to_change(self, ncw=1, acceptable_step=None, update_T=False):
		"""Run the X{Markov Chain Monte-Carlo} until it finds
		an acceptable step.

		@param ncw: How many steps should be accepted before it stops?
		@type ncw: int
		@param acceptable_step: A function that decides which steps are acceptable and which are not.
			This is passed to the MCMC stepper, L{mcmc.BootStepper}.
		@type acceptable_step: function(float) -> bool, often L{mcmc.T_acceptor} or L{step_acceptor}
		@param update_T: Obsolete.  Must be False.
		@type update_T: boolean
		@raise TooManyLoops: if it takes too long before enough steps are accepted.
		"""
		assert not update_T, "Obsolete!"
		if acceptable_step is not None:
			self.x.acceptable = acceptable_step
		old_acceptable = self.x.acceptable
		n = 0
		self.synchronize_start('rtc')
		self.communicate_hook("RTC:%d" % ncw)
		try:
			while n < ncw:
				n += self.x.step()
				if self.maxloops > 0:
					self.maxloops -= 1
				if self.maxloops == 0:
					raise TooManyLoops, "run_to_change"
				self.iter += 1
		finally:
			self.x.acceptable = old_acceptable
			self.synchronize_end('rtc')
		return self.x.current()

	def communicate_hook(self, id):
		return

	def close(self):
		"""After calling close(), it is no longer legal
		to call run_to_change(), run_to_ergodic(), or
		run_to_bottom().  Logging is shut down, but the
		contents of the stepper are still available for
		inspection.
		"""
		if self.logger is not None:
			self.logger.close()
			self.logger = None

	def _nc_get_hook(self, nc):
		return nc

	def run_to_ergodic(self, ncw=1, T=1.0):
		"""Run the stepper until it has explored all of parameter space
		C{ncw} times (as best as we can estimate).  Note that this
		is a pretty careful routine.  If the stepper finds a better
		region of parameter space, so that log(P) improves part way
		through the process, the routine will reset itself and begin
		again.

		NOTE: this sets C{T} and C{sortstrategy} for the L{stepper<mcmc.BootStepper>}.

		@param ncw: how many times (or what fraction) of an ergodic
			exploration to make.
		@type ncw: int
		@param T: temperature at which to run the Monte-Carlo process.
			This is normally 1.0.  If it is C{None}, then the
			current temperature is not modified.
		@type T: C{float} or C{None}
		@return: a L{position<mcmc.position_base>} at the end of the process.
		"""
		ISSMAX = 20
		self.synchronize_start('rte')
		if T is not None:
			acceptable_step = mcmc.T_acceptor(T)
		else:
			acceptable_step = None
		ss = self.x.set_strategy(self.x.SSAMPLE)
		#: nc is (roughly speaking) how many times we have wandered back and forth
		#: along the long axis of the probability distribution.
		nc = 0.0
		try:
			#: iss controls the interval between (a) calls to self.x.ergodic and
			#: (b) inter-process communication.  Having iss>1 means we don't
			#: waste too much time on these expensive operations.
			iss = 1
			while True:
				ncg = self._nc_get_hook(nc)
				if ncg >= ncw:
					break
				elif ncg < 0.5*ncw and iss < ISSMAX:
					iss += 1
				# print 'ncg=', ncg, '/', ncw, 'iss=', iss
				self.run_to_change(ncw=iss, acceptable_step=acceptable_step)
				if self.x.reset_id() != self.last_resetid:
					self.last_resetid = self.x.reset_id()
					nc = 0.0
					self.logger.reset()
				else:
					e = self.x.ergodic()
					nc += e * iss
					if e > 0.0:	# I.e. it's been a while since a reset.
						self.logger.add(self.x, self.iter)
		finally:
			self.x.set_strategy(ss)	# Return to the previous sortstrategy.
			# There will be trouble if you get an exception caused by maxloops:
			# there is as yet no way to synchronize all MPI threads.
			self.synchronize_end('rte')
		return self.x.current()

	def _not_at_bottom(self, xchanged, nchg, es, dotchanged, ndot):
		return (numpy.sometrue(numpy.less(xchanged, nchg))
			or es < 0.5 or dotchanged < ndot
			or self.x.acceptable.T() > 1.5
			)

	def run_to_bottom(self, ns=3, acceptable_step=None):
		"""Run the X{Markov Chain Monte-Carlo} until it converges
		near an optimum.

		The general idea of the termination condition is that
		it keeps track of the angle between successive sequences of steps,
		where each sequence contains C{ns} steps.
		If the angle is less than 90 degrees, that is evidence that the
		solution is still drifting in some consistent direction and is
		therefore not yet at the optimum.  Angles of greater than 90 degrees
		suggest that the optimization is wandering aimlessly near
		an optimum.  It will run until a sufficient number
		of large angles have been seen since the last MCMC reset.
		A similar check is made on individual components of the
		parameter vector.
		(A standalone sketch of this test, C{_example_reversal_test},
		follows this class.)

		@param ns: This controls how carefully it checks that
			it has really found an optimum.  Large values
			will take longer but will assure more accurate
			convergence.
		@type ns: L{int}
		@param acceptable_step: A callable object (i.e. class or function) that returns C{True} or C{False},
			depending on whether a proposed step is acceptable.
			See L{mcmc.T_acceptor} for an example.
		@raise TooManyLoops: if C{maxloops} is exhausted before convergence.
		"""
		assert ns > 0
		m = self.x.np
		if acceptable_step is None:	# Backwards compatibility.
			acceptable_step = step_acceptor(n=m, T0=10.0, T=1.0)
		assert acceptable_step is not None
		nchg = 1 + m//2 + ns
		ndot = 1 + m//2 + ns
		for i in range(2 + m//2):
			self.run_to_change(5, acceptable_step=acceptable_step)	# Get it warmed up.
			self.logger.add(self.x, self.iter)
		before = self.run_to_change(ns, acceptable_step=acceptable_step)
		self.logger.add(self.x, self.iter)
		mid = self.run_to_change(ns, acceptable_step=acceptable_step)
		self.logger.add(self.x, self.iter)
		es = 0.0
		lock = Thr.allocate_lock()
		xchanged = numpy.zeros(m, numpy.int)
		dotchanged = 0
		last_resetid = self.x.reset_id()
		self.synchronize_start('rtb')
		while self._not_at_bottom(xchanged, nchg, es, dotchanged, ndot):
			try:
				after = self.run_to_change(ns, acceptable_step=acceptable_step)
			except TooManyLoops, x:
				self.synchronize_abort('rtb')
				raise TooManyLoops, ('run_to_bottom: %s' % x, self.iter, es, dotchanged, m,
							numpy.sum(numpy.less(xchanged, nchg)),
							self.x.current().prms()
							)
			self.logger.add(self.x, self.iter)
			# We keep track of direction reversals to see when
			# we reach the bottom.
			numpy.subtract(xchanged,
					numpy.sign((after.vec()-mid.vec())
							* (mid.vec()-before.vec())),
					xchanged)

			dotchanged -= _sign(numpy.dot(after.vec()-mid.vec(), mid.vec()-before.vec()))

			es += self.x.ergodic() * ns**0.5

			lock.acquire()
			if self.x.reset_id() != last_resetid:
				# A new maximum: we are probably not stable.
				# Likely, we are still in the improvement phase.
				last_resetid = self.x.reset_id()
				lock.release()
				self.logger.reset()
				es = 0.0
				xchanged[:] = 0
				dotchanged = 0
				self.note('run_to_bottom: Reset: T=%g' % acceptable_step.T(), 1)
			else:
				lock.release()
				self.note('run_to_bottom: T=%g' % acceptable_step.T(), 3)

			before = mid
			mid = after
		self.synchronize_end('rtb')
		return self.iter

	def synchronize_start(self, id):
		return

	def synchronize_end(self, id):
		return

	def synchronize_abort(self, id):
		return

	def note(self, s, lvl):
		if Debug >= lvl:
			print s
			sys.stdout.flush()

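# Sketch (hypothetical, for illustration only): the direction-reversal test
# that run_to_bottom applies between successive groups of ns steps.
# Successive displacements d1 = mid-before and d2 = after-mid point the same
# way when dot(d1, d2) > 0 (still drifting downhill); dot(d1, d2) < 0
# suggests aimless wandering near the optimum.
def _example_reversal_test(before, mid, after):
	d1 = mid - before
	d2 = after - mid
	return numpy.dot(d1, d2) < 0.0
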

class step_acceptor(mcmc.rough_acceptor_base):
	"""This class defines the annealing schedule when L{mcmc} is used
	as an optimizer.  It corresponds to C{n} extra degrees of freedom
	that exchange probability (i.e. `energy') with each other and with
	the system to be optimized.
	Over time, the excess log(probability) gradually oozes away.
	"""

	def __init__(self, n, T0, T=1.0, maxT=None, probestep=None, tau_probe=None, jitterdroop=None):
		"""Create a step_acceptor and define its properties.
		@param n: How many degrees of freedom should the step
			acceptor store?
			Note that the energy of the step acceptor is gradually equilibrated to
			the desired temperature.  Each step, there is a 1/n chance of equilibrating
			one DOF.  So, when n=1, this is exactly a T_acceptor.  When n>>1,
			the acceptor is very nearly leak-free: it exchanges energy with the system
			to be optimized, but hardly at all with the heat bath.
			So, for n=1, the temperature is fixed at T.  When n>>1, the temperature can
			rise as logP goes up, and will fall when logP goes down.
		@type n: L{int}
		@param T: What's the temperature of the heat bath?
			This is the final temperature that the annealing
			schedule will eventually approach.  Default: C{1.0}.
		@type T: L{float}
		@param T0: The starting temperature.
		@type T0: L{float}
		@param maxT: In optimization mode, it's normal for the system being
			optimized to increase its log(P).  That dumps log(P) or
			`energy' into the L{step_acceptor}'s degrees of freedom
			and raises its temperature.  This parameter lets you define
			an approximate maximum temperature.  Setting it
			may speed up convergence, though there is a corresponding risk
			of getting trapped in a local minimum.
		@type maxT: L{float} or L{None}
		"""
		mcmc.rough_acceptor_base.__init__(self, probestep, tau_probe, jitterdroop)

		T = float(T)
		assert T > 0 and T0 > 0
		assert n > 0
		self._T = T
		self.E = [random.expovariate(1.0)*T0 for i in range(n+1)]
		if maxT is not None:
			assert maxT > max(T, T0)
			self.maxT = maxT
		else:
			self.maxT = T0

	def T(self):
		"""@return: the current effective temperature.
		@rtype: float
		"""
		Tsum = 0.0
		for e in self.E:
			Tsum += e
		return math.hypot(Tsum/len(self.E), self.jitter())


	def __call__(self, delta):
		"""@param delta: How much did the candidate step change C{log(P)}?
		@type delta: L{float}
		@return: Whether to accept the candidate step or not.
		@rtype: L{bool}
		"""
		if Debug >= 3:
			print 'delta=', delta, "E=", self.E
		n = len(self.E)
		i = random.randrange(n)
		if not (delta > -self.E[i]):
			# Rejection: redistribute the stored energy between two DOF,
			# or occasionally re-draw one DOF from the heat bath.
			j = random.randrange(n)
			if j == i:
				self.E[i] = math.hypot(self._T, self.jitter())*random.expovariate(1.0)
			else:
				Esum = self.E[i] + self.E[j]
				f = random.random()
				self.E[i] = f * Esum
				self.E[j] = Esum - self.E[i]
			if Debug >= 3:
				print 'Not OK E->', self.E
			return False
		if i == random.randrange(n):
			# With probability 1/n, equilibrate this DOF with the heat bath.
			self.E[i] = math.hypot(self._T, self.jitter())*random.expovariate(1.0)
		else:
			# Otherwise, absorb the step's change in log(P), capped at 2*maxT.
			self.E[i] = min(delta + self.E[i], 2*self.maxT)
		if Debug >= 3:
			print 'OK: updating E[%d] to %g' % (i, self.E[i])
		return True

	def __repr__(self):
		return '<step_acceptor(T=%g) %s>' % (self._T, str(self.E))

	def is_root(self):
		"""This is a stub for compatibility with MPI code."""
		return self.rank() == 0

	def size(self):
		"""This is a stub for compatibility with MPI code."""
		return 1

	def rank(self):
		"""This is a stub for compatibility with MPI code."""
		return 0

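# Example (hypothetical): step_acceptor used as an annealing schedule.
# With fresh energies drawn near T0, the effective temperature starts near
# T0 and relaxes toward T as the stored energy leaks away.  (Assumes the
# rough_acceptor_base defaults behave as in the tests below.)
def _example_step_acceptor():
	a = step_acceptor(n=4, T0=10.0, T=1.0)
	print 'initial T:', a.T()	# roughly T0
	ok = a(-0.5)	# is a step that lowers log(P) by 0.5 acceptable?
	print 'accepted:', ok, repr(a)
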

def make_stepper_from_lov(problem_def, vector_generator,
				mcmc_mod=mcmc, posn_class=mcmc.position_repeatable, n=None):
	"""
	@param vector_generator: Initial starting points for the optimization.
	@type vector_generator: A sequence of numpy.ndarray, normally a list of them.
	@type n: L{None} or int
	@param n: The minimum number of samples to use.  None means number of parameters+2.
	@return: A C{BootStepper}, ready to run, from the specified module.
	@rtype: instance of C{mcmc_mod}, typically L{BootStepper}.
	"""
	lop = []
	lov = []
	for x in vector_generator:
		assert type(x) == _NumPyType, "Whoops: type=%s vs %s x=%s" % (str(type(x)), str(_NumPyType), str(x))
		try:
			lop.append(posn_class(x, problem_def))
			lov.append(x)
		except mcmc.NotGoodPosition, xcpt:
			die.warn("Bad initial position for guess: %s" % xcpt)
		nwp = 2 + x.shape[0]
		if n is None or n < nwp:
			n = nwp
		if len(lop) >= n:
			break
	# Initialize the Markov-Chain Monte-Carlo:
	if len(lov) < 2:
		die.warn("Not enough data")
		raise ValueError, "Not enough vectors that are good positions."
	v = mcmc.diag_variance(lov)
	st = mcmc_mod.BootStepper(lop, v)
	return st

def make_stepper_from_lop(position_generator, mcmc_mod=mcmc, n=None):
	"""
	@param position_generator: Initial starting points for the optimization.
	@type position_generator: A sequence of L{mcmc.position_base}.
	@type n: L{None} or int
	@param n: The minimum number of samples to use.  None means number of parameters+2.
	@return: A C{BootStepper}, ready to run, from the specified module.
	@rtype: instance of C{mcmc_mod}, typically L{BootStepper}.
	"""
	lop = []
	lov = []
	for x in position_generator:
		assert isinstance(x, mcmc.position_base)
		lop.append(x)
		lov.append(x.vec())
		nwp = 2 + x.vec().shape[0]
		if n is None or n < nwp:
			n = nwp
		if len(lop) >= n:
			break
	# Initialize the Markov-Chain Monte-Carlo:
	if len(lov) < 2:
		die.warn("Not enough data")
		raise ValueError, "Not enough vectors that are good positions."
	v = mcmc.diag_variance(lov)
	st = mcmc_mod.BootStepper(lop, v)
	return st
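
# Example (hypothetical): building a ready-to-run BootStepper from a few
# starting vectors via make_stepper_from_lov, then wrapping it in a stepper.
# make_stepper_from_lop is analogous, but takes mcmc.position_base instances
# instead of bare vectors.  `problem_def` is whatever posn_class expects.
def _example_make_stepper(problem_def):
	guesses = [numpy.array([0.0, 2.0]), numpy.array([0.1, 1.9]),
			numpy.array([-0.1, 2.1])]
	boot = make_stepper_from_lov(problem_def, guesses)
	return stepper(boot)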

def test1():
	def test_logp(x, c):
		# print '#', x[0], x[1]
		return -(x[0]-x[1]**2)**2 - 0.001*x[1]**2

	global Debug
	mcmc.Debug = 3
	Debug = 3
	x = mcmc.bootstepper(test_logp, numpy.array([0.0, 2.0]),
				numpy.array([[1.0, 0], [0, 2.0]]))
	thr = stepper(x)
	nsteps = thr.run_to_bottom(acceptable_step=step_acceptor(x.np, T0=5.0, T=1.0))
	print '#nsteps', nsteps, "logp=", x.current().logp_nocompute()
	assert 0 >= x.current().logp_nocompute() > -10
	assert nsteps < 4000
	for i in range(2):
		print 'RTC'
		thr.run_to_change(2)
		print 'RTE'
		thr.run_to_ergodic(1.0)
	print 'DONE'
	thr.close()


def test_probe():
	def test_logp(x, c):
		# print '#', x[0], x[1]
		return -(x[0]-x[1]**2)**2 - 0.001*x[1]**2

	def p(stepper):
		return numpy.array([0.01, 0.1])

	# mcmc.Debug = 3
	# Debug = 3
	x = mcmc.bootstepper(test_logp, numpy.array([0.0, 2.0]),
				numpy.array([[1.0, 0], [0, 2.0]]))
	thr = stepper(x)
	nsteps = thr.run_to_bottom(acceptable_step=step_acceptor(x.np, T0=5.0, T=1.0, probestep=p))
	print '#nsteps', nsteps, "logp=", x.current().logp_nocompute()
	assert 0 >= x.current().logp_nocompute() > -10
	assert nsteps < 4000
	for i in range(2):
		print 'RTC'
		thr.run_to_change(2)
		print 'RTE'
		thr.run_to_ergodic(1.0)
	print 'DONE'
	thr.close()


def test():
	test1()
	test_probe()


if __name__ == '__main__':
	test()