Package gmisclib :: Module mcmc_m4p
[frames] | no frames]

Source Code for Module gmisclib.mcmc_m4p

  1  """This is a helper module to make use of mcmc.py and mcmc_big.py. 
  2  It allows you to conveniently run a Monte-Carlo simulation of any 
  3  kind until it converges (L{stepper.run_to_bottom}) or until it 
  4  has explored a large chunk of parameter space (L{stepper.run_to_ergodic}). 
  5   
  6  It also helps you with logging the process. 
  7   
  8  When run in parallel, each processor does its thing more-or-less 
  9  independently.   However, every few steps, they exchange notes on 
 10  their current progress.   If one finds an improved vertex, it will be 
 11  passed on to other processors via MPI. 
 12   
 13  Tested with mpi4py version 1.1.0, http://mpi4py.scipy.org 
 14  """ 
 15   
 16  import sys 
 17  from mpi4py import MPI  # This uses mpi4py 
 18  import random 
 19  import numpy 
 20  from gmisclib import die 
 21  from gmisclib import mcmc 
 22  from gmisclib import mcmc_helper as MCH 
 23  Debug = 0 
 24  from gmisclib.mcmc_helper import TooManyLoops, warnevery, logger_template, test 
 25  from gmisclib.mcmc_helper import step_acceptor, make_stepper_from_lov 
 26   
 27  mpi = MPI.COMM_WORLD 
 28   
 29   
 30   
 31   
32 -class stepper(MCH.stepper):
33 - def __init__(self, x, maxloops=-1, logger=None):
34 die.info('# mpi stepper rank=%d size=%d' % (rank(), size())) 35 assert maxloops == -1 36 self.maxloops = -1 37 MCH.stepper.__init__(self, x, maxloops, logger)
38 39
40 - def reset_loops(self, maxloops=-1):
41 assert maxloops == -1 42 self.maxloops = -1
43 44
45 - def communicate_hook(self, id):
46 self.note('chook iter=%d' % self.iter, 4) 47 if size() > 1: 48 self.note('chook active iter=%d' % self.iter, 3) 49 c = self.x.current() 50 v = c.vec() 51 lp = c.logp() 52 53 r = rank() 54 s = size() 55 self.note('sendrecv from %d to %d' % (r, (r+1)%s), 5) 56 nv, nlp, nid = mpi.sendrecv(sendobj=(v, lp, id), 57 dest=(r+1)%s, sendtag=self.MPIID, 58 source=(r+s-1)%s, 59 recvtag=self.MPIID 60 ) 61 62 self.note('communicate succeeded from %s' % nid, 1) 63 delta = nlp - lp 64 if self.x.acceptable(delta): 65 q = self.x._set_current(c.new(nv-v, logp=nlp)) 66 self.note('communicate accepted: %s' % q, 1) 67 else: 68 self.note('communicate not accepted %g vs %g' % (nlp, lp), 1) 69 self.x._set_current(self.x.current()) 70 sys.stdout.flush()
71 72 73 MPIID = 1241 74 75 # def _not_enough_changes(self, n, ncw): 76 # mytmp = n < ncw 77 # self.note('_nyc pre') 78 # ntrue = mpi.allreduce(int(mytmp), MPI.SUM) 79 # self.note('_nyc post') 80 # return ntrue*3 >= size() 81 82
83 - def _nc_get_hook(self, nc):
84 self.note('_nye pre', 5) 85 ncsum = mpi.allreduce(float(nc), MPI.SUM) 86 self.note('_nye post', 5) 87 return ncsum/float(size())
88 89
90 - def _not_at_bottom(self, xchanged, nchg, es, dotchanged, ndot):
91 mytmp = (numpy.sometrue(numpy.less(xchanged,nchg)) 92 or es<1.0 or dotchanged<ndot 93 or self.x.acceptable.T()>1.5 94 ) 95 self.note('_nab pre', 5) 96 ntrue = mpi.allreduce(int(mytmp), MPI.SUM) 97 self.note('_nab post', 5) 98 return ntrue*4 >= size()
99 100
101 - def join(self, id):
102 self.note('pre join %s' % id, 5) 103 rootid = mpi.bcast(id) 104 assert rootid == id 105 self.note('post join %s' % id, 5)
106 107
108 - def note(self, s, lvl):
109 if Debug >= lvl: 110 sys.stderr.writelines('# %s, rank=%d\n' % (s, rank())) 111 sys.stderr.flush()
112 113
114 -def precompute_logp(lop):
115 """Does a parallel evaluation of logp for all items in lop. 116 """ 117 nper = len(lop)//size() 118 r = rank() 119 mychunk = lop[r*nper:(r+1)*nper] 120 for p in mychunk: 121 q = p.logp() 122 print 'logp=', q, 'for rank', r 123 for r in range(size()): 124 nc = mpi.bcast(mychunk, r) 125 lop[r*nper:(r+1)*nper] = nc 126 mpi.barrier()
127 128
129 -def is_root():
130 return mpi.Get_rank() == 0
131
132 -def size():
133 return mpi.Get_size()
134
135 -def rank():
136 return mpi.Get_rank()
137 138
139 -def test_():
140 def test_logp(x, c): 141 # print '#', x[0], x[1] 142 return -(x[0]-x[1]**2)**2 + 0.001*x[1]**2
143 x = mcmc.bootstepper(test_logp, numpy.array([0.0,2.0]), 144 numpy.array([[1.0,0],[0,2.0]])) 145 print 'TEST: rank=', rank() 146 thr = stepper(x) 147 # nsteps = thr.run_to_bottom(x) 148 # print '#nsteps', nsteps 149 # assert nsteps < 100 150 for i in range(2): 151 print 'RTC' 152 thr.run_to_change(2) 153 print 'RTE' 154 thr.run_to_ergodic(1.0) 155 print 'DONE' 156 thr.close() 157 158 159 160 if __name__ == '__main__': 161 test_() 162