Package gmisclib :: Module mcmc_pypar
[frames] | [no frames]

Source Code for Module gmisclib.mcmc_pypar

  1  """This is a helper module to make use of mcmc.py and mcmc_big.py. 
  2  It allows you to conveniently run a Monte-Carlo simulation of any 
  3  kind until it converges (L{stepper.run_to_bottom}) or until it 
  4  has explored a large chunk of parameter space (L{stepper.run_to_ergodic}). 
  5   
  6  It also helps you with logging the process. 
  7  """ 
  8   
  9  import sys 
 10  import pypar as mpi             # This uses pypar 
 11  import operator 
 12  import random 
 13  from gmisclib import die 
 14  from gmisclib import Num 
 15  from gmisclib import mcmc 
 16  from gmisclib import mcmc_helper as MCH 
 17  # MCH.Debug = True 
 18  from gmisclib.mcmc_helper import TooManyLoops, warnevery, test, logger_template 
 19   
 20   
21 -class _counter(object):
22 - def __init__(self):
23 self.count = 0
24
25 - def update(self, accepted):
26 self.count += accepted
27
28 - def get(self):
29 return self.count
30 31
def Reduce(x, root):
	"""Sum the integer C{x} over all MPI processes and broadcast the
	result, so every rank returns the same total.

	NOTE(review): the accumulator starts at -1, so the value returned is
	(sum over all ranks) - 1.  Presumably deliberate (it shifts the vote
	thresholds used by the callers), but confirm against
	C{_not_yet_ergodic} / C{_not_at_bottom}.
	NOTE(review): every rank sends to C{root}, but only rank 0 gathers;
	the final broadcast names C{root} as sender.  Unless C{root == 0}
	(as all visible callers pass) the gathered sum is discarded.

	@param x: this rank's contribution to the sum.
	@param root: rank used for the send target and the final broadcast.
	@return: the (shifted) global sum, identical on all ranks.
	"""
	REDUCE = 2287		# message tag for the gather phase
	REDUCE2 = 228		# NOTE(review): unused here -- kept from original
	q = mpi.broadcast(444, 0)	# handshake: acts as a barrier/sanity check
	assert q == 444
	mpi.send(x, root, tag=REDUCE)
	s = -1
	if mpi.rank() == 0:
		for i in range(mpi.size()):
			s += mpi.receive(i, tag=REDUCE)
	return mpi.broadcast(s, root)
43 44 45
46 -class _accum_erg(object):
47 - def __init__(self):
48 self.erg = 0.0 49 self.last_resetid = -1
50
51 - def update(self, x):
52 """Increment an accumulator, but reset it to 53 zero if the underlying stepper has called reset(). 54 The increment is by the ergodicity estimate, so 55 C{self.erg} should approximate how many times the 56 MCMC process had wandered across the probability 57 distribution. 58 """ 59 tmp = x.reset_id() 60 if tmp != self.last_resetid: 61 self.erg = 0.0 62 self.last_resetid = tmp 63 else: 64 self.erg += x.ergodic()
65 66
67 - def get(self):
68 return self.erg
69 70
class stepper(MCH.stepper):
	"""MPI-parallel variant of L{mcmc_helper.stepper}: each process runs
	its own Monte-Carlo chain and the processes periodically exchange
	their current positions (see L{communicate_hook}) and vote on
	global termination conditions via L{Reduce}.
	"""
	def __init__(self, x, maxloops=-1, logger=None):
		die.info('# mpi stepper rank=%d size=%d' % (mpi.rank(), mpi.size()))
		# Only unbounded looping is supported in the MPI version:
		# all ranks must keep stepping until they jointly agree to stop.
		assert maxloops == -1
		self.maxloops = -1
		MCH.stepper.__init__(self, x, maxloops, logger)


	def reset_loops(self, maxloops=-1):
		# As in __init__: the loop count must remain unbounded.
		assert maxloops == -1
		self.maxloops = -1


	def communicate_hook(self):
		"""Exchange the current (position, logp) with a neighboring rank
		and accept the neighbor's position with a Metropolis-style test.
		Even ranks send first then receive; odd ranks receive first then
		send, so the pairwise exchange cannot deadlock.
		No-op (except flushing stdout) when running on a single process.
		"""
		self.note('chook iter=%d' % self.iter)
		if mpi.size() > 1:
			self.note('chook active iter=%d' % self.iter)
			c = self.x.current()
			v = c.vec()
			lp = c.logp()
			if mpi.rank()%2 == 0:
				mpi.send((v, lp), (mpi.rank()+1)%mpi.size(), tag=self.MPIID)
			nv, nlp = mpi.receive(-1, tag=self.MPIID)
			if mpi.rank()%2 == 1:
				mpi.send((v, lp), (mpi.rank()+1)%mpi.size(), tag=self.MPIID)

			# Metropolis acceptance at temperature self.x.T:
			# expovariate(1.0) supplies the -log(uniform) draw.
			if nlp > lp - self.x.T*random.expovariate(1.0):
				self.x._set_current(c.new(nv-v, logp=nlp))
				self.note('communicate succeeded')
			else:
				self.note('communicate not accepted')
		sys.stdout.flush()


	# Message tag used for the position-exchange traffic above.
	MPIID = 1241

	# def _not_enough_changes(self, n, ncw):
	#       mytmp = n < ncw
	#       self.note('_nyc pre')
	#       ntrue = mpi.allreduce(int(mytmp), mpi.SUM)
	#       self.note('_nyc post')
	#       return ntrue*3 >= mpi.size()


	def _not_yet_ergodic(self, nc, ncw):
		"""Vote across all ranks: keep running while at least half of
		the processes have accumulated fewer than C{ncw} changes.
		@param nc: a counter object (e.g. L{_accum_erg}) with C{get()}.
		"""
		mytmp = nc.get() < ncw
		self.note('_nye pre')
		ntrue = Reduce(int(mytmp), 0)
		self.note('_nye post')
		return ntrue*2 >= mpi.size()


	def _not_at_bottom(self, xchanged, nchg, es, dotchanged, ndot, update_T):
		"""Vote across all ranks: keep running while at least a quarter
		of the processes still look unconverged (some component changed
		too few times, ergodicity estimate below 1, or -- when
		C{update_T} -- the temperature is still above 1.5).
		"""
		mytmp = (Num.sometrue(Num.less(xchanged,nchg))
				or es<1.0 or dotchanged<ndot
				or (update_T and self.x.T>1.5)
			)
		self.note('_nab pre')
		ntrue = Reduce(int(mytmp), 0)
		self.note('_nab post')
		return ntrue*4 >= mpi.size()


	def join(self, id):
		"""Synchronization point: all ranks must arrive here with the
		same C{id}, which is checked against rank 0's value.
		"""
		self.note('pre join %s' % id)
		rootid = mpi.broadcast(id, 0)
		assert rootid == id
		self.note('post join %s' % id)

	def note(self, s):
		# Debug trace, enabled by setting mcmc_helper.Debug.
		if MCH.Debug:
			print s, "rank=", mpi.rank()
			sys.stdout.flush()
144 145
def precompute_logp(lop):
	"""Does a parallel evaluation of logp for all items in lop.
	Each rank evaluates one contiguous, equal-sized slice, then each
	slice is broadcast in turn so every rank ends up holding the full
	updated list.

	NOTE(review): when len(lop) is not a multiple of mpi.size(), the
	trailing len(lop) % mpi.size() items are never evaluated or
	broadcast -- confirm callers guarantee divisibility.
	"""
	nper = len(lop)//mpi.size()
	r = mpi.rank()
	mychunk = lop[r*nper:(r+1)*nper]
	for p in mychunk:
		# Called for its side effect: presumably caches logp on p -- confirm.
		q = p.logp()
		print 'logp=', q, 'for rank', r
	# NOTE: r is deliberately reused as the loop variable here; each
	# iteration broadcasts rank r's chunk (mychunk is ignored on
	# non-root ranks by mpi.broadcast).
	for r in range(mpi.size()):
		nc = mpi.broadcast(mychunk, r)
		lop[r*nper:(r+1)*nper] = nc
	mpi.broadcast(0, 0)	# final handshake/barrier
159 160
def is_root():
	"""True iff this process is the MPI root process (rank zero)."""
	rank = mpi.rank()
	return rank == 0
163
def size():
	"""Return the number of processes in the MPI job."""
	n = mpi.size()
	return n
166 167
def test_opt():
	"""Smoke test: run the MPI stepper on a simple 2-D banana-shaped
	log-probability surface for a few change/ergodic cycles.
	Must be launched under MPI (one chain per rank).
	"""
	def test_logp(x, c):
		# A curved (Rosenbrock-like) ridge; c (the context arg) is unused.
		# print '#', x[0], x[1]
		return -(x[0]-x[1]**2)**2 + 0.001*x[1]**2
	x = mcmc.bootstepper(test_logp, Num.array([0.0,2.0]),
				Num.array([[1.0,0],[0,2.0]]))
	print 'rank=', mpi.rank()
	thr = stepper(x)
	# nsteps = thr.run_to_bottom(x)
	# print '#nsteps', nsteps
	# assert nsteps < 100
	for i in range(2):
		print 'RTC'
		thr.run_to_change(2)
		print 'RTE'
		thr.run_to_ergodic(1.0)
	print 'DONE'
	thr.close()


if __name__ == '__main__':
	test_opt()