Package gmisclib :: Module avio
[frames] | no frames]

Source Code for Module gmisclib.avio

  1  """Module to parse and create lines in "a=v;" format. 
  2  Normally, the entry points are parse() and concoct(). 
  3  """ 
  4   
  5  import die 
  6  import g_encode 
  7  import gpk_writer 
  8   
  9  __version__ = "$Revision: 1.21 $" 
 10   
 11  _default_encoder = g_encode.encoder() 
 12   
13 -class BadFormatError(Exception):
14 - def __init__(self, *x):
15 Exception.__init__(self, *x)
16 17 18 fwd = _default_encoder.fwd 19 back = _default_encoder.back 20 21
22 -def parse(s, sep=';'):
23 """Parses a line in a=v; a=v form into a dictionary. 24 The line may be terminated by a '#' followed by an arbitrary 25 comments. Both attributes and values are assumed to be 26 encoded with the default L{g_encode} encoder. (However, the 27 comment is not assumed to be encoded.) 28 29 @param s: the string to be parsed. 30 @param sep: (optional) lets you choose the character 31 that separates attribute-value pairs. 32 @return: dictionary containing a=v pairs. One special 33 entry for comments: "_COMMENT". 34 @raise BadFormatError: when C{s} is not formatted properly. 35 """ 36 rv = {}; 37 if '#' in s: 38 ss, c = s.split('#', 1) 39 cs = c.strip() 40 if cs: 41 rv['_COMMENT'] = cs 42 if not ss: 43 return rv 44 else: 45 ss = s 46 avlist = ss.split(sep) 47 na = 0 48 for av in avlist: 49 try: 50 a, v = av.split('=', 1) 51 rv[a.strip()] = back(v.strip()) 52 na += 1 53 except ValueError: # need more than 1 value to unpack 54 # It's OK if it is entirely whitespace... 55 if av.strip(): 56 raise BadFormatError, 'no equals sign in "%s"' % str(av) 57 except g_encode.BadFormatError, x: 58 raise BadFormatError, "Bad character encoding: %s" % x 59 if na != len(rv): 60 raise BadFormatError, "Duplicate attribute (%d unique of %d): %s" % (len(rv), na, ','.join([av.split('=',1)[0] for av in avlist])) 61 return rv
62 63
64 -def concoct(s):
65 """Converts a dictionary to a line in the form a=v;a=v;#comment. 66 It returns a string. The entries in the dictionary will be 67 converted to a string representation by fwd().""" 68 c = None 69 l = [] 70 kvl = s.items() 71 kvl.sort() 72 for (k,v) in kvl: 73 if k == '_COMMENT': 74 c = str(v) 75 else : 76 l.append('%s=%s;' % (k, fwd(v))); 77 if c is not None: 78 l.extend( [ '#', c ] ) 79 return ' '.join(l)
80 81
82 -def read(fd, sep=';', line=None):
83 """Read a file in a=v; format. 84 If line is set, put the line number into a slot with the specified name. 85 @return: (data, comments) where data=[{a:v, ...}, ...] and comments=[str, ...] 86 @rtype: tuple(list(dict), list(str)) 87 @raise BadFormatError: when C{s} is not formatted properly. 88 """ 89 comments = [] 90 data = [] 91 line_num = 0 92 for l in fd: 93 line_num += 1 94 if l.startswith('#'): 95 c = l[1:].strip() 96 if c != '': 97 comments.append(c) 98 continue 99 if not l.endswith('\n'): 100 die.warn('avio.read(): ignoring line without newline.') 101 break 102 l = l.strip() 103 if l == '': 104 continue 105 tmp = parse(l, sep) 106 if line is not None: 107 tmp[line] = line_num 108 data.append( tmp ) 109 return (data, comments)
110 111
112 -def test1():
113 d = {} 114 d['test'] = 'george' 115 d['fred'] = 'rrr;;#=' 116 d['mt'] = '' 117 118 o = parse(concoct(d) + " # comment") 119 120 assert o['test'] == 'george' 121 # print "o['fred']=", o['fred'] 122 assert o['fred'] == 'rrr;;#=' 123 assert o['mt'] == ''
124 125
126 -class writer(gpk_writer.writer):
127 __doc__ = """Write a file in a=v; format. 128 """ 129
130 - def comment(self, comment):
131 """Add a comment to the data file.""" 132 self.fd.write("# %s\n" % comment)
133
134 - def header(self, k, v):
135 """NOTE: this is not a fully general function. 136 Keys may not have spaces or equals signs in them. 137 Note also that reading of headers from a=v; files 138 is not supported. 139 """ 140 self.fd.write("# %s = %s\n" % (k, fwd(v)))
141
142 - def __init__(self, fd):
143 gpk_writer.writer.__init__(self, fd)
144
145 - def datum(self, data_item):
146 self.fd.writelines( [ concoct(data_item), '\n'] )
147 148 149
150 -def read_hdc(fd, sep=';', line=None):
151 """This emulates fiatio.read(). 152 @return: (header, data, comments) where data=[{a:v, ...}, ...] and comments=[str, ...] 153 @rtype: tuple(dict, list(dict), list(str)) 154 """ 155 d, c = read(fd, sep=sep, line=line) 156 if d: 157 h = d[0].copy() 158 else: 159 h = {} 160 for datum in d: 161 for (k, v) in h.items(): 162 if k in datum and datum[k]!=v: 163 del h[k] 164 hk = h.keys() 165 for datum in d: 166 for k in hk: 167 del d[k] 168 return (h, d, c)
169 170 171
172 -def test2():
173 import StringIO 174 fd = StringIO.StringIO() 175 w = writer(fd) 176 w.comment('foo') 177 w.datum( {'a':'z', 'q':'u'} ) 178 w.close() 179 fd.seek(0) 180 d, c = read(fd) 181 assert c == ['foo'] 182 assert len(d) == 1 183 assert d[0]['a'] == 'z' 184 assert d[0]['q'] == 'u'
185 186 187 if __name__ == '__main__' : 188 assert back(fwd('hello \n!@;==')) == 'hello \n!@;==' 189 test1() 190 test2() 191 print "OK: passed tests" 192 # print concoct({'x': "a=b;"}) 193