Module irregularity
[frames] | no frames]

Source Code for Module irregularity

  1  #!/usr/bin/env python 
  2   
  3  """This script produces a measure of the irregularity of 
  4  a waveform.  It is zero if the waveform is precisely periodic. 
  5  It is approximately 1 for white noise. 
  6   
  7  This is used as the aperiodicity measure in 
  8  Kochanski, Coleman, Grabe and Rosner JASA 2005. 
  9  """ 
 10   
 11  import sys 
 12  import math as M 
 13  import numpy 
 14  from gmisclib import die 
 15  from gpk_voicing import power 
 16  import gpkimgclass 
 17  from gpk_voicing import voice_misc 
 18   
 19  DELTA_DELAY = 1.0/(2400.0 * 2 * M.pi) 
 20  # DELTA_DELAY is chosen so that we can align the oscillations 
 21  # at the first formant (f1 is normally under 1200 Hz) 
 22  # to an accuracy of 1 radian or less. 
 23   
 24  Fmin = 50.0 
 25  Fmax = 500.0 
 26  Fhp = 500.0 
 27  WINDOW = 0.020 
 28   
 29  BLOCK_EDGE = 0.1        # Seconds 
 30   
 31  DBG = None 
 32   
 33  # def one_predict(d, delay, dt, Dt): 
 34          # zl = Num.zeros((delay/2,), Num.Float) 
 35          # zr = Num.zeros((delay-delay/2,), Num.Float) 
 36          # dtmp = Num.concatenate( (zl, d, zr) ) 
 37          # tmp = dtmp[delay:] - dtmp[:-delay] 
 38          # assert tmp.shape == d.shape 
 39          # return power.smooth(tmp*tmp, dt, Dt, extra=WINDOW) 
 40   
 41   
42 -def predict_err(d, delays, dt, Dt):
43 global WINDOW 44 assert min(delays) >= 0 45 mxdelay = max(delays)//2 + 1 46 n = d.shape[0] 47 ze = numpy.zeros((mxdelay,)) 48 dtmp = numpy.concatenate( (ze, d, ze) ) 49 del d # save memory. 50 o = None 51 for delay in delays: 52 dlh = delay//2 53 dlH = delay - dlh 54 ddtmp = dtmp[ mxdelay+dlh : mxdelay+dlh+n ] - dtmp[ mxdelay-dlH : mxdelay-dlH+n ] 55 assert ddtmp.shape[0] == n, "Shapes do not match: %s and %s" % (ddtmp.shape, n) 56 tmp, t0 = power.smooth(ddtmp**2, dt, Dt, extra=WINDOW) 57 if DBG: 58 DBG.plot(tmp) 59 if o is None: 60 o = tmp 61 else: 62 numpy.minimum(o, tmp, o) 63 if DBG: 64 DBG.show() 65 return o
66 67
68 -def local_power(d, dt, Dt):
69 global WINDOW 70 return power.smooth(d**2, dt, Dt, extra=WINDOW)
71 72
73 -def fractional_step_range(initial, final, step=1):
74 """Generates an array of window offsets. 75 No two offsets are equal, and they are integers 76 between initial and final.""" 77 78 assert step != 0 79 if step < 0: 80 return fractional_step_range(initial, final, -step) 81 if final < initial: 82 return [] 83 o = [] 84 tmp = float(initial) 85 assert tmp > 0.5, "Initial=%g too close to zero, or even negative!" % tmp 86 ilast = 0 87 while tmp <= final: 88 itmp = int(round(tmp)) 89 if itmp != ilast and itmp>=initial: 90 o.append(itmp) 91 ilast = itmp 92 tmp += step 93 return o
94 95
96 -def aperiodicity(d, dt, Dt):
97 if Fmin*dt > 0.5 or Fmin*dt<1.0/d.shape[0]: 98 die.warn("Silly time scale") 99 die.info("Hipass at %f" % (Fmin*dt)) 100 d = voice_misc.hipass_sym_butterworth(d, Fmin*dt) 101 d = voice_misc.hipass_first_order(d, Fhp*dt) 102 if DBG is not None: 103 DBG.plot(numpy.arange(d.shape[0])*dt, d, 'k-') 104 DBG.title('filtered signal') 105 p, tshift = local_power(d, dt, Dt) 106 if DBG is not None: 107 DBG.figure() 108 DBG.plot(tshift+numpy.arange(p.shape[0])*Dt, p) 109 DBG.title('Local power') 110 DBG.show() 111 assert tshift < Dt 112 imin = 1.0/(dt * Fmax) 113 imax = 1.0/(dt * Fmin) 114 istep = DELTA_DELAY/dt 115 delays = fractional_step_range(imin, imax, istep) 116 perr = predict_err(d, delays, dt, Dt) 117 # The factor of 2.0 below is because perr is a difference 118 # of two signals. If they were independent random noise, 119 # then the variances would add, so that perr=2*p for 120 # white noise. 121 # In other words, this measure goes to 1 for white noise, 122 # and to zero for a periodic signal. 123 assert perr.shape == p.shape 124 return (tshift, numpy.sqrt(perr/(2.0*p)))
125 126
127 -def block_aperiodicity(data, dt, Dt, blocksize=6.0):
128 if blocksize is None: 129 return aperiodicity(data, dt, Dt) 130 assert BLOCK_EDGE >= Dt 131 blocksize = Dt * int(round(blocksize/Dt)) 132 assert blocksize > BLOCK_EDGE 133 m = data.shape[0] 134 ntau = int(M.ceil(m*dt/blocksize)) 135 nout = int(M.ceil(m*dt/Dt)) 136 pad = int(round(Dt*round(BLOCK_EDGE/Dt)/dt)) 137 138 irr = None 139 tshift = None 140 S0 = 0.0 141 e = 0 142 for i in range(ntau): 143 s0 = int(round((m*i)/float(ntau))) 144 se = int(round((m*(i+1))/float(ntau))) 145 ps0 = max(0, s0-pad) 146 pse = min(m, se+pad) 147 ttshift, x = aperiodicity(data[ps0:pse], dt, Dt) 148 if tshift is None: 149 tshift = ttshift 150 if irr is None: 151 irr = numpy.zeros((nout+1,)) 152 tau0 = int(M.floor((s0-ps0)*(dt/DT))) 153 nS = min((se-s0)*(dt/DT), x.shape[0]-tau0) 154 inS = int(M.ceil(nS)) 155 iS0 = int(M.floor(S0)) 156 assert iS0+inS <= nout, "S0=%.1f nS=%.1f S0+nS=%.1f nout=%d" % (S0, nS, S0+nS, nout) 157 # print 'irr', irr.shape, iS0, iS0+inS, 'x=', x.shape, tau0, tau0+inS 158 irr[iS0:iS0+inS] = x[tau0:tau0+inS] 159 S0 += nS 160 e = iS0+inS 161 assert nout-2 <= e <= nout+1, "e=%d nout=%d" % (e, nout) 162 return (tshift, irr[:e])
163 164
165 -def test(pylab):
166 import random 167 DT = 0.01 168 dt = 1.0/random.normalvariate(16000.0, 1000.0) 169 L = random.expovariate(1.0/15.0) 170 n = int(round(L/dt)) 171 signal = numpy.random.normal(0.0, 0.1, size=(n,)) 172 step = int(round(2.0/dt)) 173 k = int(round(0.5/dt)) 174 omega = 2*M.pi*100.0 175 i = 0 176 while i < n: 177 kk = min(k, n-i) 178 signal[i:i+kk] = numpy.sin(numpy.arange(kk)*dt*omega) 179 if i+k < n: 180 kk = min(k, n-i-k) 181 signal[i+k:i+k+kk] = numpy.random.normal(0.0, 0.9, size=(kk,)) 182 i += step 183 184 if pylab: 185 pylab.plot(numpy.arange(signal.shape[0])*dt, signal) 186 tshift, o = block_aperiodicity(signal, dt, DT) 187 x = numpy.arange(o.shape[0])*DT + tshift 188 if pylab: 189 pylab.plot(x, o) 190 pylab.show()
191 192 193 194 if __name__ == '__main__': 195 DT = 0.01 # Seconds. Default output sampling interval. 196 arglist = sys.argv[1:] 197 Blocksize = 10.0 198 arglist0 = arglist 199 column = None 200 signalfile = None 201 outfile = None 202 extrahdr = {} 203 while arglist and arglist[0].startswith('-'): 204 arg = arglist.pop(0) 205 if arg == '--': 206 break 207 elif arg == '-params': # For optimization purposes 208 Fmin = atof(arglist.pop(0)) 209 Fmax = atof(arglist.pop(0)) 210 Fhp = atof(arglist.pop(0)) 211 WINDOW = atof(arglist.pop(0)) 212 elif arg == '-dt': 213 DT = float(arglist.pop(0)) 214 elif arg == '-f': 215 signalfile = arglist.pop(0) 216 elif arg == '-c': 217 tmp = arglist.pop(0) 218 try: 219 column = int( tmp ) 220 except ValueError: 221 column = tmp 222 elif arg == '-b': 223 Blocksize = float(arglist.pop(0)) 224 if Blocksize <= 0.0: 225 Blocksize = None 226 elif arg == '-d': 227 import pylab as DBG 228 elif arg == '-test': 229 test(None) 230 sys.exit(0) 231 elif arg == '-Test': 232 import pylab 233 test(pylab) 234 sys.exit(0) 235 elif arg == '-o': 236 outfile = arglist.pop(0) 237 elif arg == '-write': 238 extrahdr = dict( [q.strip().split('=', 1) 239 for q in arglist.pop(0).split(';') ] 240 ) 241 else: 242 die.info("Unrecognized flag: %s" % arg) 243 print __doc__ 244 die.exit(1) 245 if arglist and signalfile is None: 246 signalfile = arglist.pop(0) 247 if arglist and outfile is None: 248 outfile = arglist.pop(0) 249 elif outfile is None: 250 outfile = 'pdur.dat' 251 if column is None: 252 column = 0 253 if signalfile is None: 254 die.info("No signal file specified.") 255 print __doc__ 256 die.exit(1) 257 if arglist: 258 die.info('Extra arguments!') 259 print __doc__ 260 die.exit(1) 261 signal = gpkimgclass.read(signalfile) 262 try: 263 signal.column(column) 264 except KeyError: 265 die.info("File has %d columns" % signal.n[1]) 266 die.die("Bad column: %s" % str(column)) 267 268 tshift, o = block_aperiodicity(signal.column(column), signal.dt(), DT, 269 blocksize=Blocksize) 270 271 hdr = signal.hdr.copy() 272 hdr['program'] = sys.argv[0] 273 hdr['ARGV'] = arglist0 274 hdr['input_file'] = signalfile 275 hdr['column'] = column 276 hdr['CDELT2'] = DT 277 hdr['CRPIX2'] = 1 278 hdr['CRVAL2'] = signal.start() + tshift 279 hdr['CRPIX1'] = 1 280 hdr['CDELT1'] = 1 281 hdr['TTYPE1'] = 'aperiodicity' 282 hdr['BITPIX'] = -32 283 hdr['DataSamplingFreq'] = 1.0/signal.dt() 284 hdr.update( extrahdr ) 285 gpkimgclass.gpk_img(hdr, o).write(outfile) 286