1 """This is a helper module to make use of mcmc.py and mcmc_big.py.
2 It allows you to conveniently run a Monte-Carlo simulation of any
3 kind until it converges (L{stepper.run_to_bottom}) or until it
4 has explored a large chunk of parameter space (L{stepper.run_to_ergodic}).
5
6 It also helps you with logging the process.
7
8 When run in parallel, each processor does its thing more-or-less
9 independently. However, every few steps, they exchange notes on
10 their current progress. If one finds an improved vertex, it will be
11 passed on to other processors via MPI.
12
13 Tested with mpi4py version 1.1.0, http://mpi4py.scipy.org
14 """
15
16 import sys
17 from mpi4py import MPI
18 import random
19 import numpy
20 from gmisclib import die
21 from gmisclib import mcmc
22 from gmisclib import mcmc_helper as MCH
23 Debug = 0
24 from gmisclib.mcmc_helper import TooManyLoops, warnevery, logger_template, test
25 from gmisclib.mcmc_helper import step_acceptor, make_stepper_from_lov
26
27 mpi = MPI.COMM_WORLD
28
29
30
31
# NOTE(review): stray "- " diff marker before "def" -- remove once the
# enclosing class header (not visible in this paste) is restored.
- def __init__(self, x, maxloops=-1, logger=None):
# NOTE(review): the statements that originally followed this def line
# (presumably delegation to MCH.stepper.__init__ with x and logger) are
# missing from this paste; recover them from version control before
# editing this constructor.


# This MPI stepper only supports running until convergence/ergodicity,
# never a fixed loop budget: maxloops must be left at its default of -1.
assert maxloops == -1
self.maxloops = -1
43
44
# NOTE(review): the "def" line for this method was lost in this paste.
# From the body it takes an ``id`` argument (sent alongside the state);
# it looks like the communicate hook invoked by gmisclib.mcmc_helper's
# stepper every few steps -- confirm the original name/signature.
self.note('chook iter=%d' % self.iter, 4)
if size() > 1:
    self.note('chook active iter=%d' % self.iter, 3)
    # This rank's current state: parameter vector and its log-probability.
    c = self.x.current()
    v = c.vec()
    lp = c.logp()

    # Ring topology: rank r sends to (r+1) % size and receives from
    # (r-1) % size, so states migrate one hop around the ring per call.
    r = rank()
    s = size()
    self.note('sendrecv from %d to %d' % (r, (r+1)%s), 5)
    nv, nlp, nid = mpi.sendrecv(sendobj=(v, lp, id),
                                dest=(r+1)%s, sendtag=self.MPIID,
                                source=(r+s-1)%s,
                                recvtag=self.MPIID
                                )

    self.note('communicate succeeded from %s' % nid, 1)
    # Metropolis-style test on the neighbour's state: adopt it as our
    # current point iff the acceptor approves the log-prob difference.
    delta = nlp - lp
    if self.x.acceptable(delta):
        q = self.x._set_current(c.new(nv-v, logp=nlp))
        self.note('communicate accepted: %s' % q, 1)
    else:
        self.note('communicate not accepted %g vs %g' % (nlp, lp), 1)
        # Re-assert the existing state -- presumably to keep the MCMC
        # bookkeeping consistent after a rejected swap; TODO confirm.
        self.x._set_current(self.x.current())
    sys.stdout.flush()
71
72
# Message tag for the ring sendrecv exchange above (sendtag/recvtag).
# The value is arbitrary, but must be identical on every rank so the
# sends and receives pair up.
MPIID = 1241
74
75
76
77
78
79
80
81
82
# NOTE(review): the "def" line is missing from this paste (the notes
# abbreviate the method as "_nye").  It takes a per-rank quantity ``nc``
# and returns its average over all MPI processes via an allreduce.
self.note('_nye pre', 5)
ncsum = mpi.allreduce(float(nc), MPI.SUM)
self.note('_nye post', 5)
return ncsum/float(size())
88
89
# NOTE(review): the "def" line is missing from this paste (the notes
# abbreviate the method as "_nab" -- presumably "not at bottom").  The
# parameters xchanged, nchg, es, dotchanged, ndot come from the lost
# signature; confirm against version control.
# This rank votes "keep going" if: any coordinate changed fewer than
# nchg times, or the ergodicity estimate es is below 1, or too few
# dot-product changes occurred, or the step acceptor is still hot.
mytmp = (numpy.sometrue(numpy.less(xchanged,nchg))
         or es<1.0 or dotchanged<ndot
         or self.x.acceptable.T()>1.5
         )
self.note('_nab pre', 5)
# Tally the votes across all processes; continue while at least a
# quarter of the ranks still vote True.
ntrue = mpi.allreduce(int(mytmp), MPI.SUM)
self.note('_nab post', 5)
return ntrue*4 >= size()
99
100
def join(self, id):
    """Synchronization point for all MPI processes.

    The root broadcasts ``id`` and every rank asserts that the value it
    receives matches its own, i.e. that all processes have reached the
    same named point in the program.

    Note: the parameter name ``id`` shadows the builtin; it is kept for
    interface compatibility with existing callers.
    """
    # (Removed a stray "- " diff marker that had corrupted the def line.)
    self.note('pre join %s' % id, 5)
    rootid = mpi.bcast(id)
    assert rootid == id
    self.note('post join %s' % id, 5)
106
107
def note(self, s, lvl):
    """Log a debug message to stderr.

    The message is emitted only when the module-level ``Debug`` level is
    at least ``lvl`` (larger lvl == more detailed / chattier message).
    Each line is prefixed with '# ' and tagged with this process's rank.
    """
    # (Removed a stray "- " diff marker that had corrupted the def line.)
    if Debug >= lvl:
        # write() instead of writelines(): passing a single str to
        # writelines() iterates it character-by-character -- identical
        # output, needless work.
        sys.stderr.write('# %s, rank=%d\n' % (s, rank()))
        sys.stderr.flush()
112
113
115 """Does a parallel evaluation of logp for all items in lop.
116 """
117 nper = len(lop)//size()
118 r = rank()
119 mychunk = lop[r*nper:(r+1)*nper]
120 for p in mychunk:
121 q = p.logp()
122 print 'logp=', q, 'for rank', r
123 for r in range(size()):
124 nc = mpi.bcast(mychunk, r)
125 lop[r*nper:(r+1)*nper] = nc
126 mpi.barrier()
127
128
def am_root():
    """True on the MPI root process (rank 0).

    NOTE(review): the original one-line def header was lost in this
    paste; the name is reconstructed and is not confirmed by any visible
    call site -- verify against external callers before relying on it.
    """
    return mpi.Get_rank() == 0
131
def size():
    """Return the number of processes in MPI.COMM_WORLD.

    The def header was lost in this paste; the name ``size`` is
    confirmed by the size() calls throughout this module.
    """
    return mpi.Get_size()
134
def rank():
    """Return this process's rank within MPI.COMM_WORLD.

    The def header was lost in this paste; the name ``rank`` is
    confirmed by the rank() calls throughout this module.
    """
    return mpi.Get_rank()
137
138
140 def test_logp(x, c):
141
142 return -(x[0]-x[1]**2)**2 + 0.001*x[1]**2
# NOTE(review): these statements are the body of the self-test driver;
# its def line ("def test_():", per the __main__ guard at the bottom of
# the file) is missing from this paste.  Statement indentation below is
# reconstructed -- the original leading tabs were stripped; confirm
# which prints sat inside the loop against version control.
x = mcmc.bootstepper(test_logp, numpy.array([0.0,2.0]),
                     numpy.array([[1.0,0],[0,2.0]]))
print 'TEST: rank=', rank()
thr = stepper(x)


# Alternate convergence-hunting and ergodic exploration to exercise
# both code paths of the MPI stepper.
for i in range(2):
    print 'RTC'
    thr.run_to_change(2)
    print 'RTE'
    thr.run_to_ergodic(1.0)
print 'DONE'
thr.close()
157
158
159
# Run the MPI self-test when executed directly (e.g. under mpirun).
# Restored the body indentation that was lost in the mangled paste,
# which had left this guard syntactically invalid.
if __name__ == '__main__':
    test_()
162