Package gmisclib :: Module mcmc_socket
[frames] | [no frames]

Source Code for Module gmisclib.mcmc_socket

  1  # -*- coding: utf-8 -*- 
  2  """This is a helper module to make use of mcmc.py and mcmc_big.py. 
  3  It allows you to conveniently run a Monte-Carlo simulation of any 
  4  kind until it converges (L{stepper.run_to_bottom}) or until it 
  5  has explored a large chunk of parameter space (L{stepper.run_to_ergodic}). 
  6   
  7  It also helps you with logging the process. 
  8   
  9  When run in parallel, each processor does its thing more-or-less 
 10  independently.   However, every few steps, they exchange notes on 
 11  their current progress.   If one finds an improved vertex, it will be 
 12  passed on to other processors via mcmc_cooperate.py. 
 13  """ 
 14   
 15  import sys 
 16  import random 
 17  import numpy 
 18   
 19  import die 
 20  import mcmc 
 21  import mcmc_helper as MCH 
 22  Debug = 0 
 23  from mcmc_helper import TooManyLoops, warnevery, logger_template, test 
 24  from mcmc_helper import step_acceptor, make_stepper_from_lov 
 25   
 26  import mcmc_cooperate as MC 
 27   
 28   
 29   
class stepper(MCH.stepper):
    """Parallel Markov-chain Monte-Carlo stepper that cooperates over sockets.

    Each process steps its own chain via the L{mcmc_helper.stepper}
    machinery; the C{*_hook} and C{synchronize_*} callbacks exchange
    state with the other processes through L{mcmc_cooperate}, so an
    improved vertex found by one process is passed on to the rest.
    """

    def __init__(self, x, maxloops=-1, logger=None, share=None):
        """@param x: the stepper to drive (e.g. a L{mcmc.bootstepper}).
        @param maxloops: must be -1 (no loop limit); other values are
                not supported by this subclass.
        @param logger: passed unchanged to L{mcmc_helper.stepper}.
        @param share: tuple of arguments for L{mcmc_cooperate.connection}.
        """
        assert maxloops == -1
        MCH.stepper.__init__(self, x, maxloops, logger)
        self.comm = MC.connection(*share)
        # Barrier counters labelling successive synchronization points;
        # self.barrier advances by self.delta_b after each one.
        self.barrier = MC.Barrier(0)
        self.delta_b = MC.Barrier(1)
        die.info('# mpi stepper rank=%d size=%d' % (self.rank(), self.size()))

    def reset_loops(self, maxloops=-1):
        """Reset the loop counter; only maxloops=-1 (unlimited) is supported."""
        assert maxloops == -1
        MCH.stepper.reset_loops(self, maxloops)

    def communicate_hook(self, id):
        """Swap the current (log-probability, vector) pair with another
        process, and adopt the incoming vertex if it is acceptable.
        """
        self.note('chook iter=%d' % self.iter, 4)
        self.note('chook active iter=%d' % self.iter, 3)
        c = self.x.current()
        v = c.vec()
        lp = c.logp()
        nlp, nv = self.comm.swap_vec(lp, v)
        delta = nlp - lp
        if self.x.acceptable(delta):
            q = self.x._set_current(c.new(nv - v, logp=nlp))
            self.note('communicate accepted: %s' % q, 1)
        else:
            self.note('communicate not accepted %g vs %g' % (nlp, lp), 1)
            # Re-set the unchanged current point -- presumably to update
            # acceptance bookkeeping in self.x; confirm against mcmc.py.
            self.x._set_current(self.x.current())
        sys.stdout.flush()

    def _nc_get_hook(self, nc):
        """Agree on a common value of C{nc} across processes (median vote)."""
        rv = self.comm.get_consensus('nc', nc, self.barrier, "float_median")
        self.barrier += self.delta_b
        return rv

    def _not_at_bottom(self, xchanged, nchg, es, dotchanged, ndot):
        """Return True while any process still looks unconverged.

        The local convergence vote is pooled across processes as a
        median; a pooled value above 0.25 means "keep going".
        """
        # numpy.any() replaces the long-deprecated numpy.sometrue()
        # alias (removed in NumPy 2.0); they are equivalent.
        mytmp = (numpy.any(numpy.less(xchanged, nchg))
                 or es < 1.0 or dotchanged < ndot
                 or self.x.acceptable.T() > 1.5
                 )
        rv = self.comm.get_consensus('nc', mytmp, self.barrier, "float_median")
        self.barrier += self.delta_b
        return rv > 0.25

    def synchronize_start(self, id):
        """Rendezvous with the other processes at the start of a step."""
        self.comm.barrier(self.barrier.deepen(1))

    def synchronize_end(self, id):
        """Rendezvous at the end of a step, then advance the barrier label."""
        self.comm.barrier(self.barrier.deepen(2))
        self.barrier += self.delta_b

    def synchronize_abort(self, id):
        """Abandon the current synchronization point; advance the barrier label."""
        self.comm.barrier(self.barrier.deepen(100))
        self.barrier += self.delta_b

    def note(self, s, lvl):
        """Write a debugging note to stderr when Debug >= lvl."""
        if Debug >= lvl:
            # Bug fix: the original formatted self.comm.rank() with %d,
            # but comm.rank() is a sequence (it is indexed in size() and
            # rank() below), which %d cannot format.  Use the scalar rank.
            sys.stderr.writelines('# %s, rank=%d\n' % (s, self.rank()))
            sys.stderr.flush()

    def close(self):
        """Shut down the cooperation connection, then the underlying stepper."""
        self.comm.close()
        MCH.stepper.close(self)

    def size(self):
        """Number of cooperating processes."""
        return self.comm.rank()[0]

    def rank(self):
        """This process's index among the cooperating processes."""
        return self.comm.rank()[1]
def precompute_logp(lop):
    """Precompute log-probabilities for a collection of steppers.

    Presumably C{lop} is a list of problems, matching the hook of the
    same name in L{mcmc_helper} -- unused here, since this has never
    been implemented for the socket-based stepper.

    @raise RuntimeError: always.
    """
    # Call-form raise: the original used the Python-2-only
    # ``raise RuntimeError, "msg"`` statement syntax.
    raise RuntimeError("Not Implemented")
109 110
def test_opt(args):
    """Smoke-test the socket stepper on a simple curved 2-D surface.

    Alternates short bursts of stepping with ergodic exploration, then
    shuts the stepper down.

    @param args: connection-sharing arguments, passed to L{stepper}
            (e.g. L{mcmc_cooperate.test_args}).
    """
    def test_logp(x, c):
        # Curved valley: minimum along the parabola x[0] == x[1]**2,
        # with a weak quadratic term in x[1].
        return -(x[0] - x[1]**2)**2 + 0.001*x[1]**2

    x = mcmc.bootstepper(test_logp, numpy.array([0.0, 2.0]),
                         numpy.array([[1.0, 0], [0, 2.0]]))
    thr = stepper(x, share=args)
    for i in range(20):
        # Single-argument print(...) behaves identically under
        # Python 2 and 3 (the original used py2-only print statements).
        print('RTC')
        thr.run_to_change(2)
        print('RTE')
        thr.run_to_ergodic(1.0)
    print('DONE')
    thr.close()


if __name__ == '__main__':
    test_opt(MC.test_args)