Package lib :: Module power
[frames] | no frames]

Source Code for Module lib.power

  1  #!/usr/bin/env python 
  2   
  3  import math 
  4  import numpy 
  5  from gmisclib import hilbert_xform 
  6  from gpk_voicing import power1 as P1 
  7   
  8  # import pylab 
  9   
10 -def calc_var_of_window():
11 N = 100 12 phase = (math.pi/float(N))*(numpy.arange(N)+0.5) 13 w = 1 + numpy.cos(phase) 14 return numpy.sum(w*numpy.square(phase))/numpy.sum(w)
15 16 _ALPHA = 1.0/math.sqrt(calc_var_of_window()) 17 18
19 -def smooth(ph, dt_in, dt_out, extra=0.0, wt=None):
20 """Smooths a data set, simultaneously resampling to 21 a lower sampling rate. It uses successive boxcar averages 22 followed by decimations for the initial smooth, then a convolution 23 with a Gaussian. Even if C{dt_out>>dt_in}, it only uses 24 C{O[log(dt_out/dt_in)} operations. 25 @param dt_in: input sampling rate. 26 @param dt_out: output sampling rate. 27 @param extra: extra smoothing time constant to apply. 28 Extra is the standard deviation of a Gaussian kernel smooth that is 29 applied as the last step. 30 This last step is not implemented efficiently, so if if C{extra>>dt_out} 31 it can slow down the algorithm substantially. 32 @type extra: float (in the same units as dt_in and dt_out). 33 @type dt_in: float (in the same units as extra and dt_out). 34 @type dt_out: float (in the same units as dt_in and extra). 35 @param ph: Normally a 1-dimensional array containing data to be smoothed. 36 If the data is higher-dimensional, the time axis is assumed to run 37 along axis=0, and the return value will be an array of the same dimension. 38 @type ph: L{numpy.ndarray}. 39 @param ph: None (which indicates a uniform weighting) or 40 a L{numpy.ndarray} that is the same length (axis 0) as C{ph}. 41 @type wt: L{numpy.ndarray} 42 @return: C{(rv, t0)} where C{rv} is a L{numpy} array and C{t0} it a C{float} 43 offset of the first element, relative to the start of the input data. 44 """ 45 assert dt_in <= dt_out 46 dt_in = float(dt_in) 47 # dt_ratio = float(dt_out)/dt_in 48 w = math.hypot(dt_out, _ALPHA*extra) 49 # print "w=", w, "dt_ratio=", dt_ratio, "ALPHA=", _ALPHA, "extra=", extra, "dt_in=", dt_in 50 assert w >= dt_in 51 52 # First, we try to see if it makes sense to do it in three 53 # passes: 54 dt_m1 = min(dt_out, ((w/dt_in)**0.667)*dt_in) 55 ifac = int(round(dt_out/dt_m1)) 56 dt_m1 = dt_out/ifac 57 assert dt_in <= dt_m1 <= dt_out 58 dt_m2 = dt_m1/int(round(math.sqrt(dt_m1/dt_in))) 59 assert dt_in <= dt_m2 <= dt_m1 60 if w >= 3*dt_m1 and dt_m1 >= 3*dt_m2 and dt_m2 >= 3*dt_in: 61 # print '3 steps:', dt_in, dt_m2, dt_m1, dt_out 62 # pylab.plot(numpy.arange(ph.shape[0])*dt_in, ph) 63 # print "dt_m1/dt_m2=", dt_m2/dt_in 64 sm2, wt2 = P1.smooth_guts(ph, dt_in, dt_m2, dt_m2/dt_in, wt) 65 # pylab.plot(numpy.arange(sm2.shape[0])*dt_m2, sm2) 66 # print "dt_m1/dt_m2=", dt_m1/dt_m2 67 sm1, wt1 = P1.smooth_guts(sm2, dt_m2, dt_m1, dt_m1/dt_m2, wt2) 68 # pylab.plot(numpy.arange(sm1.shape[0])*dt_m1, sm1) 69 # print "dt_m1=", dt_m1, "dt_out=", dt_out, "w/dt_m1=", w/dt_m1 70 tmp = P1.smooth_guts(sm1, dt_m1, dt_out, w/dt_m1, wt1)[0] 71 # pylab.plot(numpy.arange(tmp.shape[0])*dt_out, tmp) 72 return (tmp, 0.0) 73 # return (P1.smooth_guts(sm1, dt_m1, dt_out, w/dt_m1, wt1)[0], 0.0) 74 # If not, we try to see if it makes sense to do it in two 75 # passes: 76 dt_mid = min(dt_out, math.sqrt(w*dt_in)) 77 ifac = int(round(dt_out/dt_mid)) 78 dt_mid = dt_out/ifac 79 assert dt_in <= dt_mid <= dt_out 80 if w >= 3*dt_mid and dt_mid >= 3*dt_in: 81 # print '2 steps' 82 sm1, wt1 = P1.smooth_guts(ph, dt_in, dt_mid, dt_mid/dt_in, wt) 83 return (P1.smooth_guts(sm1, dt_mid, dt_out, w/dt_mid, wt1)[0], 0.0) 84 # Otherwise, we do it in a single pass. 85 # print '1 step' 86 return (P1.smooth_guts(ph, dt_in, dt_out, w/dt_in, wt)[0], 0.0)
87 88
89 -def smooth_guts(ph, dt_in, dt_out, w, wt=None):
90 assert w >= 1.0 91 assert dt_in <= dt_out 92 dt_ratio = dt_out/dt_in 93 no = int(math.floor(ph.shape[0]/dt_ratio)) 94 o = numpy.zeros((no,)+ph.shape[1:], numpy.float) 95 if wt is not None: 96 owt = numpy.zeros((no,)+ph.shape[1:], numpy.float) 97 98 # print 'w=', w, 'dj=', dj, 'dt_ratio=', dt_ratio 99 tmp0 = numpy.zeros((int(2*w)+3,)) 100 cache = {} 101 SS = float(10.0) 102 ni = ph.shape[0] 103 for i in range(no): 104 jr = i*dt_ratio 105 jrr = round(jr*SS)/SS 106 j0 = max(0, int(math.ceil(jrr-w))) 107 je = min(ni, int(math.floor(jrr+w))+1) 108 wkey = (je-j0, jrr-j0) 109 try: 110 window, wsum = cache[wkey] 111 except KeyError: 112 window = _wfunc(je-j0, jrr-j0, w) 113 wsum = numpy.sum(window) 114 cache[wkey] = (window, wsum) 115 tmp = tmp0[:je-j0] 116 if wt is not None: 117 numpy.multiply(wt[j0:je], window, tmp) 118 # owt[i] = numpy.sum(wt[j0:je]*window, axis=0) 119 owt[i] = numpy.sum(tmp) 120 numpy.multiply(tmp, ph[j0:je], tmp) 121 # o[i] = numpy.sum(ph[j0:je]*wt[j0:je]*window, axis=0)/owt[i] 122 o[i] = numpy.sum(tmp)/owt[i] 123 else: 124 numpy.multiply(ph[j0:je], window, tmp) 125 # o[i] = numpy.sum(ph[j0:je]*window, axis=0)/numpy.sum(window, axis=0) 126 o[i] = numpy.sum(tmp)/wsum 127 owt = None 128 return o, owt
129 130
131 -def _wfunc(n, ctr, w):
132 phase = (math.pi/w) * (numpy.arange(n) - ctr) 133 # print 'phases for', n, ctr, w, '=', phase 134 assert phase[0]>=-3.142 and phase[-1]<=3.142, "Bad phases: %s or %s for %s %s %s" % (str(phase[0]), str(phase[-1]), str(n), str(ctr), str(w)) 135 return 1.0 + numpy.cos(phase)
136 137
138 -def old_smooth(ph, dt_in, dt_out, extra=0.0, wt=None):
139 """Smooths a data set, simultaneously resampling to 140 a lower sampling rate. It uses successive boxcar averages 141 followed by decimations for the initial smooth, then a convolution 142 with a Gaussian. Even if C{dt_out>>dt_in}, it only uses 143 C{O[log(dt_out/dt_in)} operations. 144 @param dt_in: input sampling rate. 145 @param dt_out: output sampling rate. 146 @param extra: extra smoothing time constant to apply. 147 Extra is the standard deviation of a Gaussian kernel smooth that is 148 applied as the last step. 149 This last step is not implemented efficiently, so if if C{extra>>dt_out} 150 it can slow down the algorithm substantially. 151 @type extra: float (in the same units as dt_in and dt_out). 152 @type dt_in: float (in the same units as extra and dt_out). 153 @type dt_out: float (in the same units as dt_in and extra). 154 @param ph: a 1???-dimensionan array containing data to be smoothed. 155 (Query: will this work for higher-dimensional data?) 156 @type ph: L{numpy.array}. 157 @return: C{(rv, t0)} where C{rv} is a L{numpy} array and C{t0} it a C{float} 158 offset of the first element, relative to the start of the input data. 159 """ 160 assert dt_in <= dt_out 161 dt_ratio = dt_out/dt_in 162 no = int(math.floor(ph.shape[0]/dt_ratio)) 163 # print 'no=', dt_in, len(ph), dt_out, dt_in*len(ph)/dt_out 164 dti = dt_in 165 # In vari, we accumulate the width of the equivalent 166 # convolution kernel that has been applied so far. 167 # Specifically, it is the second moment of the kernel. 168 if wt is not None: 169 ph = ph * wt 170 weight = wt 171 else: 172 weight = numpy.ones((ph.shape[0],), numpy.int32) 173 kwi = 0.0 174 t0 = 0.0 175 while dti < 0.33*dt_out: 176 # print "ph=", ph 177 nph = numpy.array(ph[::2], copy=True) 178 ph12 = ph[1::2] 179 numpy.add(nph[:ph12.shape[0]], ph12, nph[:ph12.shape[0]]) 180 del ph12 181 ph = nph 182 nwt = numpy.array(weight[::2], copy=True) 183 wt12 = weight[1::2] 184 numpy.add(nwt[:wt12.shape[0]], wt12, nwt[:wt12.shape[0]]) 185 del wt12 186 weight = nwt 187 188 t0 += 0.5*dti 189 kwi += (dti/2.0)**2 190 dti *= 2.0 191 # kwo is the desired width of the convolution kernel 192 # between th input and output data streams. 193 # It is just the second moment of a rectangular boxcar average. 194 kwo = extra**2 + dt_out**2/12.0 - dt_in**2/12.0 195 # kwx is the extra smoothing kernel that is yet to be applied. 196 kwx = kwo - kwi 197 assert kwx >= 0.0 198 199 # print 'ph=', ph 200 # Now, we set up a Gaussian of the appropriate width 201 # to do the final convolution. 202 if kwx > 0.0: 203 N = int(math.ceil(2.3*math.sqrt(kwx)/dti)) 204 i = numpy.arange(2*N+1) - N 205 k0 = numpy.exp( (-1/(2*kwx)) * (i*dti)**2 ) 206 assert k0.shape[0] < ph.shape[0] 207 kernel = k0/numpy.sum(k0) 208 ph = numpy.convolve(ph, kernel, 1) 209 if wt is not None: 210 weight = numpy.convolve(weight, kernel, 1) 211 numpy.divide(ph, weight, ph) 212 # print 'phs=', ph 213 214 # Then grab the necessary samples. 215 q = numpy.arange(no)*(dt_out/dti) 216 qi = numpy.around(q).astype(numpy.int) 217 assert qi[-1] < ph.shape[0] 218 rv = numpy.take(ph, qi) 219 assert rv.shape[0] == no 220 return (rv, t0)
221 222
223 -def local_power(d, dt_in, dt_out, extra=0.0):
224 """ THIS IS WRONG! IT PROBABLY SHOULD BE something like 225 sqrt(d**2 + hilbert_transform(d)**2). 226 The hilbert transform just supplies the imaginary part of 227 an analytic function. Current code leaves out the 228 read part! 229 """ 230 raise RuntimeError, "Bug!" 231 cutoff = math.hypot(dt_out, extra)/dt_in 232 ph = numpy.absolute(hilbert_xform.hilbert(d, cutoff)) 233 return smooth(ph, dt_in, dt_out, extra)
234 235
236 -def test1():
237 x = numpy.zeros((50,), numpy.float) 238 x[10:] = 1.0 239 k = [0.1, 0.2, 0.4, 0.2, 0.1] 240 xk = numpy.convolve(x, k, 1) 241 xs0, ts0 = smooth(x, 1.0, 1.0, 0.01) 242 xs1, ts1 = smooth(x, 1.0, 1.0, 1.0) 243 xs2, ts2 = smooth(x, 1.0, 1.0, 2.0) 244 for i in range(20): 245 print i, x[i], xk[i], xs0[i], xs1[i], xs2[i] 246 assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5), 247 numpy.less(xk, 0.5) 248 ) 249 ) 250 assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5), 251 numpy.less(xs0, 0.5) 252 ) 253 ) 254 assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5), 255 numpy.less(xs1, 0.5) 256 ) 257 ) 258 assert numpy.alltrue( numpy.equal(numpy.less(x, 0.5), 259 numpy.less(xs2, 0.5) 260 ) 261 )
262 263
264 -def test2():
265 x = numpy.zeros((500,), numpy.float) 266 x[100:] = 1.0 267 xs0, ts0 = smooth(x, 0.1, 1.0, 0.01) 268 xs1, ts1 = smooth(x, 0.1, 1.0, 1.0) 269 xs2, ts2 = smooth(x, 0.1, 1.0, 2.0) 270 for i in range(200): 271 if i%10 == 5: 272 print i, x[i], xs0[i//10], xs1[i//10], xs2[i//10] 273 else: 274 print i, x[i]
275
276 -def test3():
277 x = numpy.zeros((200,), numpy.float) 278 x[100] = 1.0 279 xs0, ts0 = smooth(x, 0.1, 0.1, 0) 280 xs1, ts1 = smooth(x, 0.1, 0.3, 0) 281 xs2, ts2 = smooth(x, 0.1, 0.9, 0) 282 xs3, ts3 = smooth(x, 0.1, 1.5, 0) 283 for i in range(197): 284 o = [ '%d'%i ] 285 o.append( '%g' % x[i]) 286 o.append( '%g' % xs0[i]) 287 if i%3 == 1: 288 o.append('%g' % xs1[i//3]) 289 else: 290 o.append('') 291 292 if i%9 == 5: 293 o.append('%g' % xs2[i//9]) 294 else: 295 o.append('') 296 297 if i%15 == 7: 298 o.append('%g' % xs3[i//15]) 299 else: 300 o.append('') 301 302 print '\t'.join(o)
303 304 305 if __name__ == '__main__': 306 test1() 307 test2() 308 test3() 309