Package gmisclib :: Module xwaves_mark
[frames] | no frames]

Source Code for Module gmisclib.xwaves_mark

  1  """Run as an independent program, this prints out the information 
  2  in a .out file, generated for/by ESPS Xmark. 
  3  If you give it a -a flag, it will print just a single attribute from the 
  4  header of the Xmark file. 
  5   
  6  For example: 
  7   
  8  xwaves_mark.py -a utterance FILE 
  9   
 10  returns the top line, which is normally the transcribed utterance. 
 11  """ 
 12   
 13  import os 
 14  import sys 
 15  from gmisclib import die 
 16  import types 
 17  from gmisclib.xwaves_errs import * 
 18   
 19  DecrTol = 0.004999      # The maximum amount that time marks can decrease 
 20                          # before it is considered an error. 
 21   
 22   
23 -def _monotonize_tail(d):
24 i = len(d)-1 25 t = d[-1][0] 26 sum = 0.0 27 n = 0 28 while i>=0 and isinstance(d[i], types.TupleType): 29 if d[i][0] < t: 30 break 31 sum += d[i][0] 32 n += 1 33 i -= 1 34 avg = sum/float(n) 35 for j in range(i+1, len(d)): 36 tt, label, ty = d[j] 37 d[j] = (avg, label, ty)


# Values of the third field of each (time, label, type) mark.
WORD = 0    # The mark carries a word.
PHONE = 1   # The mark carries a phoneme (segmentation) label.

def write(fd, hdr, data, SortData=1):
    """Write a header and a list of marks to fd as xmark text.

    fd is a writable text file object.
    hdr is a dictionary.  The value under '_COMMENT' (default '') is split
    on newlines and each piece is written as a '# ' line.  Every other key
    except 'utterance' is written as a 'key value' line.  The header is
    closed by a '** utterance' line, or a bare '**' line if hdr has no
    'utterance' key.
    data is a list of (time, label, type) tuples.  Marks whose type equals
    WORD are written as the bare label; all others are written as a
    tab-indented 'label<TAB>time' line.
    If SortData is true, a sorted copy of data is written; the caller's
    list is not modified.

    Raises DataOutOfOrderError, carrying the offending mark, if a mark's
    time is less than the time of the mark written just before it.
    """
    for c in hdr.get('_COMMENT', '').split('\n'):
        fd.write('# %s\n' % c)
    for (k, v) in hdr.items():
        if k != '_COMMENT' and k != 'utterance':
            fd.write('%s %s\n' % (k, v))
    if 'utterance' in hdr:
        fd.write('** %s\n' % hdr['utterance'])
    else:
        fd.write('**\n')

    if SortData:
        data = sorted(data)

    last = None
    for (t, lbl, ty) in data:
        # There is nothing to compare the first mark against.
        if last is not None and t < last:
            raise DataOutOfOrderError((t, lbl, ty))
        if ty == WORD:
            fd.write('%s\n' % lbl)
        else:
            fd.write('\t%s\t%f\n' % (lbl, t))
        last = t
72 73 74
def read(filename):
    """Read in .in files produced by ESPS xmark.

    filename is the path of the file to read, or '-' to read sys.stdin.

    Returns (header, data).  header is a dictionary holding the
    attribute/value pairs of the file header, the utterance (the text
    after the terminating '**'), the joined '#' comment lines under
    '_COMMENT', and some descriptive entries ('_NAME', '_FILETYPE',
    'NAXIS', ...).  data is a list of (time, word_or_phoneme, type)
    tuples, where type is 0 (WORD) for words and 1 (PHONE) for phonemes.

    Phoneme times are checked as they are read: a time may fall below the
    preceding one by at most DecrTol, in which case the affected tail of
    phoneme marks is averaged and a warning is issued.
    Word marks precede the corresponding phoneme marks.  Each word is
    given the time of the last phoneme read before the next word (or
    before the end of the data); an empty word is inserted at the front
    if the data starts with a phoneme.

    Raises NoSuchFileError if the file cannot be opened, RuntimeError if
    the file ends before the header does, BadFileFormatError for a
    malformed header or segmentation line, and DataOutOfOrderError if a
    time decreases by more than DecrTol.  A time field that float()
    cannot parse propagates as ValueError.
    """
    if filename == '-':
        fd = sys.stdin
    else:
        try:
            fd = open(filename, "r")
        except IOError as x:
            raise NoSuchFileError(x)

    try:
        # First, we read the header:
        hdr = {}
        comments = []
        n = 0
        while True:
            line = fd.readline()
            n += 1
            if line == '':
                raise RuntimeError('Premature EOF / bad file format: %s:%d' % (filename, n))
            if line.startswith('#'):
                comments.append(line[1:].strip())
                continue
            if line.startswith('**'):
                # Header is terminated by a line starting '**'.
                hdr['utterance'] = line[2:].strip()
                break
            try:
                # Header can contain attribute/value pairs.
                a, v = line.split(None, 1)
            except ValueError:
                die.warn("Line %d:"%n + line)
                raise BadFileFormatError('%s:%d' % (filename, n))
            hdr[a.strip()] = v.strip()

        # Now, we read in the data:
        d = []
        t_last = -1e30
        while True:
            line = fd.readline()
            n += 1
            if not line:    # EOF
                break
            ls = line.strip()
            if not ls:      # Ignore blank lines.
                continue
            if not line[0].isspace():
                # Words are not indented.  Put a string on the list.
                d.append(ls)
                continue

            # Segmentation information is indented.
            try:
                label, t = ls.split(None, 2)
            except ValueError:
                die.warn("Incomplete file: File: %s, Line %d:(%s)"%(filename, n, ls))
                # This is not a fatal problem at the end, especially not
                # if the last label is silence.
                if fd.readline() == '':     # OK.  Incomplete was last line.
                    break
                raise BadFileFormatError('%s:%d' % (filename, n))
            t = float(t)
            if t >= t_last:
                d.append((t, label, PHONE))
            elif t >= t_last - DecrTol:
                d.append((t, label, PHONE))
                _monotonize_tail(d)     # Force the times to be monotonic.
                die.warn("Time decreases slightly: %s:%d" % (filename, n))
            else:
                raise DataOutOfOrderError('time is decreasing: %s:%d' % (filename, n))
            # Compare the next mark against the time actually stored:
            # _monotonize_tail() may have raised it above t, and comparing
            # against the raw t would let a later mark slip in below the
            # stored value without any warning.
            # NOTE(review): _monotonize_tail() stops at word entries, so a
            # small decrease right after a word line is left as read.
            t_last = d[-1][0]
    finally:
        # Close the file on every path; sys.stdin belongs to the caller.
        if fd is not sys.stdin:
            fd.close()

    # Next, we need to add a null word to the beginning.  Recall that we
    # report ending times: without a null word, we only know when the first
    # word ended, not when it started.  It starts when the null word ends.
    if len(d) > 0 and isinstance(d[0], tuple):
        d.insert(0, '')

    # Now, we go through on a second pass, and add timing information to
    # the words.  We assume that the word comes first, then the phonemes
    # into which it is segmented.
    defer = None    # Position of the last word seen.
    last_t = None   # Time of the last phoneme seen.
    for i, entry in enumerate(d):
        if isinstance(entry, str):  # Word -- just the string.
            if defer is not None:
                # Go back, and fix up the previous word,
                # now that we have the ending time.
                d[defer] = (last_t, d[defer], WORD)
            defer = i
        else:                       # Tuple: phoneme (segmentation)
            last_t = entry[0]

    if defer is not None:
        d[defer] = (last_t, d[defer], WORD)

    hdr['_COMMENT'] = '\n'.join(comments)
    hdr['_NAME'] = filename
    hdr['_FILETYPE'] = 'xmark'
    hdr['NAXIS'] = 2
    hdr['NAXIS2'] = len(d)
    hdr['NAXIS1'] = 3
    hdr['TTYPE1'] = 'time'
    hdr['TUNIT1'] = 's'
    hdr['TTYPE2'] = 'label'
    hdr['TTYPE3'] = 'is_phoneme'

    return (hdr, d)
193 194
def mark_to_lab(data, ty):
    """Pick the marks of one type out of a mixed list.

    data is a sequence of (time, label, type) tuples, such as the data
    returned by xwaves_mark.read(); ty is the type to keep (PHONE or WORD).
    Returns a list of (time, label) pairs, in their original order.
    """
    selected = []
    for time, label, typ in data:
        if typ == ty:
            selected.append((time, label))
    return selected
203 204 205
def combine_2labs(whdr, wd, phdr, pd, utterance, TOL=0.001):
    """Combine two XLAB files into one XMARK file.

    whdr, wd are the header and data of the word labels; phdr, pd are the
    header and data of the phone labels.  Both data lists are converted
    with xwaves_lab.start_stop(); the results are used here as sequences
    of (start, stop, label) entries.
    utterance is stored under 'utterance' in the returned header.
    TOL is the slack, in the units of the label times, allowed when
    deciding which word a phone belongs to.

    It forces (within rounding errors) the words to enclose the phones.

    Returns (hdr, marks): hdr is phdr overlaid with whdr, and marks is a
    list of (time, label, type) tuples with type PHONE or WORD.
    """
    import xwaves_lab

    # Entries of the word header win over those of the phone header.
    hdr = phdr.copy()
    hdr.update(whdr)
    hdr['utterance'] = utterance

    words = xwaves_lab.start_stop(wd, dropfirst=1)
    phones = xwaves_lab.start_stop(pd, dropfirst=1)
    n_phones = len(phones)

    # Open with a '*' phone mark at the start time of the first phone.
    marks = [(phones[0][0], '*', PHONE)]

    # Phones that end before the first word begins.
    k = 0
    while k < n_phones and phones[k][1] < words[0][0] - TOL:
        marks.append((phones[k][1], phones[k][2], PHONE))
        k += 1

    for (w_start, w_stop, word) in words:
        word_placed = False
        while k < n_phones:
            ph = phones[k]
            # The phone belongs to this word only if it starts before the
            # word stops and ends no later than TOL past the word's stop.
            if not (ph[1] < w_stop + TOL and ph[0] < w_stop):
                break
            if not word_placed and ph[2] != '*':
                # The word mark goes in just ahead of its first phone
                # that is not labelled '*', at that phone's start time.
                marks.append((ph[0], word, WORD))
                word_placed = True
            marks.append((ph[1], ph[2], PHONE))
            k += 1
        if not word_placed:
            # No phone claimed the word: use the word's own start time.
            marks.append((w_start, word, WORD))

    # Phones left over after the last word.
    for rest in range(k, n_phones):
        marks.append((phones[rest][1], phones[rest][2], PHONE))

    return (hdr, marks)


# The module documentation is replaced by that of read().
__doc__ = read.__doc__

if __name__ == '__main__':
    # Command-line modes:
    #   -a KEY FILE                   print one header attribute of FILE
    #   -tolab TYPE FILE              write the marks of one type as a lab file
    #   -fromlab WORDFILE PHONEFILE   merge two lab files into one mark file
    #   FILE                          print the (header, data) pair read from FILE
    arglist = sys.argv[1:]

    # Number of command-line words each mode needs, the flag included.
    # Anything else is taken to be a lone file name.
    needed = {'-a': 3, '-tolab': 3, '-fromlab': 3}
    if not arglist or len(arglist) < needed.get(arglist[0], 1):
        sys.stderr.write(
            'Usage: xwaves_mark.py -a KEY FILE\n'
            '       xwaves_mark.py -tolab TYPE FILE\n'
            '       xwaves_mark.py -fromlab WORDFILE PHONEFILE\n'
            '       xwaves_mark.py FILE\n')
        sys.exit(1)

    if arglist[0] == '-a':
        arglist.pop(0)
        key = arglist.pop(0)
        hdr, data = read(arglist.pop(0))
        print(hdr[key])
    elif arglist[0] == '-tolab':
        import xwaves_lab
        arglist.pop(0)
        ty = int(arglist.pop(0))
        hdr, data = read(arglist.pop(0))
        xwaves_lab.write(sys.stdout, hdr, mark_to_lab(data, ty))
    elif arglist[0] == '-fromlab':
        import xwaves_lab
        arglist.pop(0)
        whdr, wd = xwaves_lab.read(arglist.pop(0))
        phdr, pd = xwaves_lab.read(arglist.pop(0))
        hdr, data = combine_2labs(whdr, wd, phdr, pd,
                                  whdr.get('utterance', phdr.get('utterance', '')),
                                  TOL=0.001)
        write(sys.stdout, hdr, data, SortData=1)
    else:
        print(read(arglist[0]))