# Source Code for Module lib.power

```  1  #!/usr/bin/env python
2
3  import math
4  import numpy
5  from gmisclib import hilbert_xform
6  from gpk_voicing import power1 as P1
7
8  # import pylab
9
10 -def calc_var_of_window():
11          N = 100
12          phase = (math.pi/float(N))*(numpy.arange(N)+0.5)
13          w = 1 + numpy.cos(phase)
14          return numpy.sum(w*numpy.square(phase))/numpy.sum(w)
15
16  _ALPHA = 1.0/math.sqrt(calc_var_of_window())
17
18
19 -def smooth(ph, dt_in, dt_out, extra=0.0, wt=None):
20          """Smooths a data set, simultaneously resampling to
21          a lower sampling rate.   It uses successive boxcar averages
22          followed by decimations for the initial smooth, then a convolution
23          with a Gaussian.   Even if C{dt_out>>dt_in}, it only uses
24          C{O[log(dt_out/dt_in)} operations.
25          @param dt_in: input sampling rate.
26          @param dt_out: output sampling rate.
27          @param extra: extra smoothing time constant to apply.
28                  Extra is the standard deviation of a Gaussian kernel smooth that is
29                  applied as the last step.
30                  This last step is not implemented efficiently, so if if C{extra>>dt_out}
31                  it can slow down the algorithm substantially.
32          @type extra: float  (in the same units as dt_in and dt_out).
33          @type dt_in: float  (in the same units as extra and dt_out).
34          @type dt_out: float  (in the same units as dt_in and extra).
35          @param ph: Normally a 1-dimensional array containing data to be smoothed.
36                  If the data is higher-dimensional, the time axis is assumed to run
37                  along axis=0, and the return value will be an array of the same dimension.
38          @type ph: L{numpy.ndarray}.
39          @param ph: None (which indicates a uniform weighting) or
40                  a L{numpy.ndarray} that is the same length (axis 0) as C{ph}.
41          @type wt: L{numpy.ndarray}
42          @return: C{(rv, t0)} where C{rv} is a L{numpy} array and C{t0} it a C{float}
43                  offset of the first element, relative to the start of the input data.
44          """
45          assert dt_in <= dt_out
46          dt_in = float(dt_in)
47          # dt_ratio = float(dt_out)/dt_in
48          w = math.hypot(dt_out, _ALPHA*extra)
49          # print "w=", w, "dt_ratio=", dt_ratio, "ALPHA=", _ALPHA, "extra=", extra, "dt_in=", dt_in
50          assert w >= dt_in
51
52          # First, we try to see if it makes sense to do it in three
53          # passes:
54          dt_m1 = min(dt_out, ((w/dt_in)**0.667)*dt_in)
55          ifac = int(round(dt_out/dt_m1))
56          dt_m1 = dt_out/ifac
57          assert dt_in <= dt_m1 <= dt_out
58          dt_m2 = dt_m1/int(round(math.sqrt(dt_m1/dt_in)))
59          assert dt_in <= dt_m2 <= dt_m1
60          if w >= 3*dt_m1 and dt_m1 >= 3*dt_m2 and dt_m2 >= 3*dt_in:
61                  # print '3 steps:', dt_in, dt_m2, dt_m1, dt_out
62                  # pylab.plot(numpy.arange(ph.shape[0])*dt_in, ph)
63                  # print "dt_m1/dt_m2=", dt_m2/dt_in
64                  sm2, wt2 = P1.smooth_guts(ph, dt_in, dt_m2, dt_m2/dt_in, wt)
65                  # pylab.plot(numpy.arange(sm2.shape[0])*dt_m2, sm2)
66                  # print "dt_m1/dt_m2=", dt_m1/dt_m2
67                  sm1, wt1 = P1.smooth_guts(sm2, dt_m2, dt_m1, dt_m1/dt_m2, wt2)
68                  # pylab.plot(numpy.arange(sm1.shape[0])*dt_m1, sm1)
69                  # print "dt_m1=", dt_m1, "dt_out=", dt_out, "w/dt_m1=", w/dt_m1
70                  tmp = P1.smooth_guts(sm1, dt_m1, dt_out, w/dt_m1, wt1)[0]
71                  # pylab.plot(numpy.arange(tmp.shape[0])*dt_out, tmp)
72                  return (tmp, 0.0)
73                  # return (P1.smooth_guts(sm1, dt_m1, dt_out, w/dt_m1, wt1)[0], 0.0)
74          # If not, we try to see if it makes sense to do it in two
75          # passes:
76          dt_mid = min(dt_out, math.sqrt(w*dt_in))
77          ifac = int(round(dt_out/dt_mid))
78          dt_mid = dt_out/ifac
79          assert dt_in <= dt_mid <= dt_out
80          if w >= 3*dt_mid and dt_mid >= 3*dt_in:
81                  # print '2 steps'
82                  sm1, wt1 = P1.smooth_guts(ph, dt_in, dt_mid, dt_mid/dt_in, wt)
83                  return (P1.smooth_guts(sm1, dt_mid, dt_out, w/dt_mid, wt1)[0], 0.0)
84          # Otherwise, we do it in a single pass.
85          # print '1 step'
86          return (P1.smooth_guts(ph, dt_in, dt_out, w/dt_in, wt)[0], 0.0)
87
88
89 -def smooth_guts(ph, dt_in, dt_out, w, wt=None):
90          assert w >= 1.0
91          assert dt_in <= dt_out
92          dt_ratio = dt_out/dt_in
93          no = int(math.floor(ph.shape[0]/dt_ratio))
94          o = numpy.zeros((no,)+ph.shape[1:], numpy.float)
95          if wt is not None:
96                  owt = numpy.zeros((no,)+ph.shape[1:], numpy.float)
97
98          # print 'w=', w, 'dj=', dj, 'dt_ratio=', dt_ratio
99          tmp0 = numpy.zeros((int(2*w)+3,))
100          cache = {}
101          SS = float(10.0)
102          ni = ph.shape[0]
103          for i in range(no):
104                  jr = i*dt_ratio
105                  jrr = round(jr*SS)/SS
106                  j0 = max(0, int(math.ceil(jrr-w)))
107                  je = min(ni, int(math.floor(jrr+w))+1)
108                  wkey = (je-j0, jrr-j0)
109                  try:
110                          window, wsum = cache[wkey]
111                  except KeyError:
112                          window = _wfunc(je-j0, jrr-j0, w)
113                          wsum = numpy.sum(window)
114                          cache[wkey] = (window, wsum)
115                  tmp = tmp0[:je-j0]
116                  if wt is not None:
117                          numpy.multiply(wt[j0:je], window, tmp)
118                          # owt[i] = numpy.sum(wt[j0:je]*window, axis=0)
119                          owt[i] = numpy.sum(tmp)
120                          numpy.multiply(tmp, ph[j0:je], tmp)
121                          # o[i] = numpy.sum(ph[j0:je]*wt[j0:je]*window, axis=0)/owt[i]
122                          o[i] = numpy.sum(tmp)/owt[i]
123                  else:
124                          numpy.multiply(ph[j0:je], window, tmp)
125                          # o[i] = numpy.sum(ph[j0:je]*window, axis=0)/numpy.sum(window, axis=0)
126                          o[i] = numpy.sum(tmp)/wsum
127                          owt = None
128          return o, owt
129
130
131 -def _wfunc(n, ctr, w):
132          phase = (math.pi/w) * (numpy.arange(n) - ctr)
133          # print 'phases for', n, ctr, w, '=', phase
134          assert phase[0]>=-3.142 and phase[-1]<=3.142, "Bad phases: %s or %s for %s %s %s" % (str(phase[0]), str(phase[-1]), str(n), str(ctr), str(w))
135          return 1.0 + numpy.cos(phase)
136
137
138 -def old_smooth(ph, dt_in, dt_out, extra=0.0, wt=None):
139          """Smooths a data set, simultaneously resampling to
140          a lower sampling rate.   It uses successive boxcar averages
141          followed by decimations for the initial smooth, then a convolution
142          with a Gaussian.   Even if C{dt_out>>dt_in}, it only uses
143          C{O[log(dt_out/dt_in)} operations.
144          @param dt_in: input sampling rate.
145          @param dt_out: output sampling rate.
146          @param extra: extra smoothing time constant to apply.
147                  Extra is the standard deviation of a Gaussian kernel smooth that is
148                  applied as the last step.
149                  This last step is not implemented efficiently, so if if C{extra>>dt_out}
150                  it can slow down the algorithm substantially.
151          @type extra: float  (in the same units as dt_in and dt_out).
152          @type dt_in: float  (in the same units as extra and dt_out).
153          @type dt_out: float  (in the same units as dt_in and extra).
154          @param ph: a 1???-dimensionan array containing data to be smoothed.
155                  (Query: will this work for higher-dimensional data?)
156          @type ph: L{numpy.array}.
157          @return: C{(rv, t0)} where C{rv} is a L{numpy} array and C{t0} it a C{float}
158                  offset of the first element, relative to the start of the input data.
159          """
160          assert dt_in <= dt_out
161          dt_ratio = dt_out/dt_in
162          no = int(math.floor(ph.shape[0]/dt_ratio))
163          # print 'no=', dt_in, len(ph), dt_out, dt_in*len(ph)/dt_out
164          dti = dt_in
165          # In vari, we accumulate the width of the equivalent
166          # convolution kernel that has been applied so far.
167          # Specifically, it is the second moment of the kernel.
168          if wt is not None:
169                  ph = ph * wt
170                  weight = wt
171          else:
172                  weight = numpy.ones((ph.shape[0],), numpy.int32)
173          kwi = 0.0
174          t0 = 0.0
175          while dti < 0.33*dt_out:
176                  # print "ph=", ph
177                  nph = numpy.array(ph[::2], copy=True)
178                  ph12 = ph[1::2]
180                  del ph12
181                  ph = nph
182                  nwt = numpy.array(weight[::2], copy=True)
183                  wt12 = weight[1::2]
185                  del wt12
186                  weight = nwt
187
188                  t0 += 0.5*dti
189                  kwi += (dti/2.0)**2
190                  dti *= 2.0
191          # kwo is the desired width of the convolution kernel
192          # between th input and output data streams.
193          # It is just the second moment of a rectangular boxcar average.
194          kwo = extra**2 + dt_out**2/12.0 - dt_in**2/12.0
195          # kwx is the extra smoothing kernel that is yet to be applied.
196          kwx = kwo - kwi
197          assert kwx >= 0.0
198
199          # print 'ph=', ph
200          # Now, we set up a Gaussian of the appropriate width
201          # to do the final convolution.
202          if kwx > 0.0:
203                  N = int(math.ceil(2.3*math.sqrt(kwx)/dti))
204                  i = numpy.arange(2*N+1) - N
205                  k0 = numpy.exp( (-1/(2*kwx)) * (i*dti)**2 )
206                  assert k0.shape[0] < ph.shape[0]
207                  kernel = k0/numpy.sum(k0)
208                  ph = numpy.convolve(ph, kernel, 1)
209                  if wt is not None:
210                          weight = numpy.convolve(weight, kernel, 1)
211          numpy.divide(ph, weight, ph)
212          # print 'phs=', ph
213
214          # Then grab the necessary samples.
215          q = numpy.arange(no)*(dt_out/dti)
216          qi = numpy.around(q).astype(numpy.int)
217          assert qi[-1] < ph.shape[0]
218          rv = numpy.take(ph, qi)
219          assert rv.shape[0] == no
220          return (rv, t0)
221
222
223 -def local_power(d, dt_in, dt_out, extra=0.0):
224          """ THIS IS WRONG!   IT PROBABLY SHOULD BE something like
225          sqrt(d**2 + hilbert_transform(d)**2).
226          The hilbert transform just supplies the imaginary part of
227          an analytic function.   Current code leaves out the
229          """
230          raise RuntimeError, "Bug!"
231          cutoff = math.hypot(dt_out, extra)/dt_in
232          ph = numpy.absolute(hilbert_xform.hilbert(d, cutoff))
233          return smooth(ph, dt_in, dt_out, extra)
234
235
236 -def test1():
237          x = numpy.zeros((50,), numpy.float)
238          x[10:] = 1.0
239          k = [0.1, 0.2, 0.4, 0.2, 0.1]
240          xk = numpy.convolve(x, k, 1)
241          xs0, ts0 = smooth(x, 1.0, 1.0, 0.01)
242          xs1, ts1 = smooth(x, 1.0, 1.0, 1.0)
243          xs2, ts2 = smooth(x, 1.0, 1.0, 2.0)
244          for i in range(20):
245                  print i, x[i], xk[i], xs0[i], xs1[i], xs2[i]
246                  assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5),
247                                                  numpy.less(xk, 0.5)
248                                                  )
249                                          )
250                  assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5),
251                                                  numpy.less(xs0, 0.5)
252                                                  )
253                                          )
254                  assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5),
255                                                  numpy.less(xs1, 0.5)
256                                                  )
257                                          )
258                  assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5),
259                                                  numpy.less(xs2, 0.5)
260                                                  )
261                                          )
262
263
264 -def test2():
265          x = numpy.zeros((500,), numpy.float)
266          x[100:] = 1.0
267          xs0, ts0 = smooth(x, 0.1, 1.0, 0.01)
268          xs1, ts1 = smooth(x, 0.1, 1.0, 1.0)
269          xs2, ts2 = smooth(x, 0.1, 1.0, 2.0)
270          for i in range(200):
271                  if i%10 == 5:
272                          print i, x[i], xs0[i//10], xs1[i//10], xs2[i//10]
273                  else:
274                          print i, x[i]
275
276 -def test3():
277          x = numpy.zeros((200,), numpy.float)
278          x[100] = 1.0
279          xs0, ts0 = smooth(x, 0.1, 0.1, 0)
280          xs1, ts1 = smooth(x, 0.1, 0.3, 0)
281          xs2, ts2 = smooth(x, 0.1, 0.9, 0)
282          xs3, ts3 = smooth(x, 0.1, 1.5, 0)
283          for i in range(197):
284                  o = [ '%d'%i ]
285                  o.append( '%g' % x[i])
286                  o.append( '%g' % xs0[i])
287                  if i%3 == 1:
288                          o.append('%g' % xs1[i//3])
289                  else:
290                          o.append('')
291
292                  if i%9 == 5:
293                          o.append('%g' % xs2[i//9])
294                  else:
295                          o.append('')
296
297                  if i%15 == 7:
298                          o.append('%g' % xs3[i//15])
299                  else:
300                          o.append('')
301
302                  print '\t'.join(o)
303
304
305  if __name__ == '__main__':
306          test1()
307          test2()
308          test3()
309
