
Source Code for Module gmisclib.cache

  1   
  2  import os 
  3  import sys 
  4  import stat 
  5  import types 
  6  import atexit 
  7  import time 
  8  # import atexit 
  9  import random 
 10  import hashlib 
 11  import cPickle 
 12  import threading 
 13  from gyropy import g_mailbox 
 14   
 15  import die 
 16  import gpkmisc 
 17   
 18  DEBUG = 0 
 19   
 20  UnpicklingError = cPickle.UnpicklingError 
 21   
 22  class CannotGetStat(OSError): 
 23      def __init__(self, *s): 
 24          OSError.__init__(self, *s) 
 25   
 26  def cachepath(f, tail='', root=''): 
 27      """OBSOLETE: 
 28      Return a pathname suitable for cacheing some result. 
 29      @return: C{(path_to_root,path_with_tail)}.  C{Path_to_root} is/will be a directory; 
 30          C{path_with_tail} is a path to a data file within that directory. 
 31          Normally, the actual cache is at the location 
 32          C{os.path.join(path_to_root,path_with_tail)} on the disk; 
 33          that is what you would pass as the C{fname} argument to L{load_cache} or 
 34          L{dump_cache}. 
 35      @rtype: C{tuple(str, str)} 
 36      @param f: An arbitrary key; could be a pathname or a tuple of information about a file. 
 37      @type f: often L{str}, but could be anything convertible to a L{str} via L{repr}. 
 38      @param tail: something to add at the end of the constructed path. 
 39      @type tail: str or None 
 40      @raise ValueError: if C{f} is empty. 
 41      """ 
 42      if len(f)==0: 
 43          raise ValueError, "Need to specify cache info" 
 44      hf = hashlib.md5(repr(f)).hexdigest() 
 45      d1 = hf[:2] 
 46      d2 = hf[2:4] 
 47      d3 = hf[4:6] 
 48      fc = hf[6:] 
 49      r = os.path.join(root, d1, d2, d3) 
 50      # p = os.path.join(r, fc) + tail 
 51      return (r, "%s%s" % (fc, tail)) 
 52   
 53   
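The function above (and the C{cache_info.cachepath} method further down) spreads cache entries across three levels of two-hex-digit subdirectories derived from an md5 hash of the key. A minimal sketch of the same layout, using only the standard library; the name split_cache_path and the example key are made up for illustration:

import hashlib
import os

def split_cache_path(key, tail='.pickle', root='/tmp/cache'):
    # Same scheme as cachepath(): md5 of repr(key), three 2-hex-digit directory
    # levels, then the remaining 26 hex digits (plus the tail) as the file name.
    hf = hashlib.md5(repr(key)).hexdigest()
    dname = os.path.join(root, hf[:2], hf[2:4], hf[4:6])
    return dname, os.path.join(dname, hf[6:] + tail)

dname, fpath = split_cache_path(('myfile.dat', 42))
print dname    # e.g. /tmp/cache/1f/3a/9c
print fpath    # e.g. /tmp/cache/1f/3a/9c/<26 hex digits>.pickle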
 54  def fileinfo(fname, *other): 
 55      """Collect enough information about a file to determine whether or 
 56      not the cache can be used. 
 57      """ 
 58      if fname is None: 
 59          return None 
 60      try: 
 61          s = os.stat(fname) 
 62      except (IOError, OSError), ex: 
 63          raise CannotGetStat("path=%s" % fname, *(ex.args)) 
 64      return (os.path.normpath(fname), 
 65          # This is OK, as long as access patterns are stable, but it's clearly 
 66          # possible to make the cacheing inefficient by accessing the same file 
 67          # in different ways.  Using os.path.abspath() might be better. 
 68          s[stat.ST_INO],  # This seems to be the same on different machines 
 69                           # that NFS mount the same file. 
 70          # s[stat.ST_DEV],  # Not a good idea: this differs from machine to machine 
 71                             # when accessing the same file. 
 72          s[stat.ST_SIZE], s[stat.ST_MTIME]) + other 
 75  def modFileInfo(fname, *other): 
 76      """Collect enough information about a file to determine whether or 
 77      not the cache can be used. 
 78      """ 
 79      if fname is None: 
 80          return None 
 81      try: 
 82          s = os.stat(fname) 
 83      except (IOError, OSError), ex: 
 84          raise CannotGetStat("path=%s" % fname, *(ex.args)) 
 85      # This is tuned to match .pyc files that may be identical, even if they 
 86      # were installed at different times.  It's a bit weak, but the alternative 
 87      # (i.e. including inode # or mtime) fails if you are trying to share computation 
 88      # across a cluster. 
 89      return (os.path.abspath(fname), s[stat.ST_SIZE]) + other 
 90   
 91   
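A rough illustration of the difference between the two helpers, assuming the gmisclib package (and its gyropy dependency) is importable and that the example path exists on your system; the printed tuples are only indicative:

from gmisclib import cache

# fileinfo() keys on (normalized path, inode, size, mtime), so rewriting or even
# touching the file invalidates any cache entry built on it.
print cache.fileinfo('/etc/hostname')      # e.g. ('/etc/hostname', 131843, 7, 1300000000)

# modFileInfo() keys only on (absolute path, size), so byte-identical copies of a
# module installed at different times on different cluster nodes still share entries.
print cache.modFileInfo('/etc/hostname')   # e.g. ('/etc/hostname', 7)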
 92  class BadFileFormat(Exception): 
 93      def __init__(self, *s): 
 94          Exception.__init__(self, *s) 
 95   
 96   
 97   
 98  def modinfo(m, seen): 
 99      if not isinstance(m, tuple): 
100          m = (m,) 
101      for mm in m: 
102          try: 
103              modinfo_guts(mm, seen) 
104          except (IOError, OSError), ex: 
105              raise CannotGetStat('path: %s' % getattr(mm, '__file__', '???'), *(ex.args)) 
106      return tuple(sorted(seen.values())) 
107   
108   
109  _modinfocache = {}  #: We cache module info because modules are normally not reloaded during the run of a program. 
110  def modinfo_guts(m, seen): 
111      assert isinstance(m, types.ModuleType) 
112      if m.__name__ in seen: 
113          return 
114      try: 
115          seen[m.__name__] = _modinfocache[m.__name__] 
116      except KeyError: 
117          try: 
118              tmp = modFileInfo(m.__file__) 
119          except AttributeError: 
120              try: 
121                  tmp = (m.__name__, len(m.__dict__)) 
122              except AttributeError: 
123                  tmp = m.__name__ 
124          seen[m.__name__] = tmp 
125          _modinfocache[m.__name__] = tmp 
126   
127      for o in m.__dict__.values(): 
128          if isinstance(o, types.ModuleType): 
129              modinfo_guts(o, seen) 
130   
131   
132  def namedModInfo(nm): 
133      if not isinstance(nm, tuple): 
134          nm = (nm,) 
135      s = {} 
136      return tuple( [ modinfo(sys.modules[q], s) for q in nm ] ) 
137   
138   
139  def _bg_dumper(mb): 
140      while True: 
141          try: 
142              ci, e = mb.get() 
143          except g_mailbox.EOF: 
144              break 
145          ci.dump(e) 
146   
147  def _bg_cleaner(mb): 
148      while True: 
149          try: 
150              ci, avoid = mb.get() 
151          except g_mailbox.EOF: 
152              break 
153          ci.makespace(avoid) 
154   
155   
156  def _shutdown(): 
157      cache_info._dump.putclose() 
158      cache_info._clean.putclose() 
159      cache_info._dumpthread.join() 
160      cache_info._cleanthread.join() 
161   
162  atexit.register(_shutdown) 
163   
164  # def _stop(): 
165  #     DumpBox.putclose() 
166  #     CleanBox.putclose() 
167   
168  # atexit.register(_stop) 
169   
170   
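_bg_dumper and _bg_cleaner are consumed by daemon threads that cache_info starts at class-definition time (lines 243-250 below); bg_dump only enqueues work, and _shutdown closes both mailboxes at interpreter exit so queued writes get flushed. A minimal sketch of that producer/consumer shape using only the standard library, where Queue.Queue stands in for g_mailbox.mailbox and a None sentinel stands in for g_mailbox.EOF:

import Queue
import threading

def _writer(q):
    # Consume queued (key, value) pairs until the EOF sentinel arrives.
    while True:
        item = q.get()
        if item is None:              # stand-in for g_mailbox.EOF
            break
        key, value = item
        print 'would write', key, '->', value

q = Queue.Queue()
t = threading.Thread(target=_writer, args=(q,), name='sketch_dumper')
t.daemon = True
t.start()

q.put(('some-key', 42))               # analogous to cache_info.bg_dump()
q.put(None)                           # analogous to _shutdown() closing the mailbox
t.join()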
171  def _dodel(path, avoid, when, n): 
172      try: 
173          dx = os.listdir(path) 
174      except OSError: 
175          return 0 
176      ldx = len(dx) 
177      n *= ldx 
178      rv = 0 
179      empty = True 
180      for d in random.sample(dx, ldx): 
181          p = os.path.join(path, d) 
182          if p == avoid: 
183              empty = False 
184              continue 
185          try: 
186              s = os.stat(p) 
187          except OSError: 
188              continue 
189          if stat.S_ISDIR(s.st_mode): 
190              # Whoops.  There's a directory here. 
191              # Give up at this level and work downwards instead. 
192              return rv + _dodel(p, avoid, when, n) 
193          if s.st_mtime > when or n < 1: 
194              empty = False 
195              continue 
196          try: 
197              os.remove(p) 
198              rv += 1 
199          except OSError: 
200              pass 
201      # The directory may be empty now, so we'll delete it too. 
202      if empty: 
203          try: 
204              os.rmdir(path) 
205          except OSError: 
206              pass 
207      return rv 
208   
209   
210  class cache_info(object): 
211      """This class manages a disk cache of arbitrary objects. 
212      It first constructs a unique name, based on information that you give, 
213      then you can C{dump} data to that path, or C{load} data from that path. 
214      An attempt to C{load} data will either succeed or raise an exception; 
215      an attempt to C{dump} will either succeed or silently fail. 
216   
217      Typical use:: 
218   
219          def cached_f(parameters): 
220              ci = cache_info(info=tuple(parameters)) 
221              if ci is not None: 
222                  try: 
223                      return ci.load() 
224                  except (BadFileFormat, IOError, OSError): 
225                      pass 
226              o = f(parameters) 
227              if ci is not None: 
228                  ci.dump(o) 
229              return o 
230   
231      @note: This class assumes that the results it is cacheing are generated by a 
232          function C{f(parameters)}.  You need to be careful to give all of the 
233          relevant parameters to C{cache_info}, otherwise you can get the wrong 
234          results back.  For instance, if you have five parameters and you forget 
235          to give C{parameters[2]} to C{cache_info}, it will happily store 
236          values obtained with all different values of C{parameters[2]} in the same 
237          slot, and when you later call C{load}, you'll get whatever you asked for, 
238          even if it is not what you wanted. 
239      """ 
240      Age = 864000 
241      NumObj = 10000 
242   
243      _dump = g_mailbox.mailbox() 
244      _dumpthread = threading.Thread(target=_bg_dumper, args=(_dump,), name='cache_dumper') 
245      _dumpthread.daemon = True 
246      _dumpthread.start() 
247      _clean = g_mailbox.mailbox() 
248      _cleanthread = threading.Thread(target=_bg_cleaner, args=(_clean,), name='cache_cleaner') 
249      _cleanthread.daemon = True 
250      _cleanthread.start() 
251   
252      def __init__(self, root, info=(), fname=None, modname=None, mod=None): 
253          """ 
254          @param info: This is where you specify the parameters from which the cached value 
255              can be computed.  It is essentially a look-up key for the value. 
256          @type info: tuple(anything) 
257          @param fname: You can specify that the cached value depends on the contents of 
258              a file (in addition to other parameters).  See L{fileinfo} for details. 
259          @type fname: str 
260          @type modname: str or tuple(str) 
261          @param modname: You can specify that the value depends on a module (or a list of modules). 
262              (You give the names of the modules here.)  In which case, it tries 
263              to detect changes to the specified modules.  See L{namedModInfo} for details. 
264              You use this argument to protect yourself against changes to the code 
265              used to compute the cached value.  Obviously, you don't want to L{load} 
266              a value from last week's buggy implementation. 
267          @param mod: You can specify that the value depends on a module (or a list of modules). 
268              (You give the module itself here.)  In which case, it tries 
269              to detect changes to the specified modules.  See L{modinfo} for details. 
270          @type mod: L{module} or L{tuple}(L{module}). 
271          @note: Certain compromises were made in the handling of C{modname} and C{mod}. 
272              Even if you use them, you are not 100% guaranteed to be protected from 
273              all changes to the code used to compute the cached values.  To be 
274              entirely safe, you should manually clear the cache whenever you 
275              change your code.  However, this will probably save your tail if 
276              you forget to clear the cache.  See L{modinfo} for details. 
277          @note: If there is an error when reading files 
278              (i.e. if C{fname}, C{modname}, or C{mod} is specified), 
279              then the object will be constructed with C{info=None}. 
280              This will lead to an L{IOError} if you then call L{load} on the object, 
281              which is what you'd get from a cache miss. 
282              Calling L{dump} or L{bg_dump} will silently do nothing. 
283          """ 
284          if info is None: 
285              raise ValueError, "info=None.  Is this the result of a previous constructor failure?" 
286          self.info = info 
287          try: 
288              if fname is not None: 
289                  self.info += fileinfo(fname) 
290              if modname is not None: 
291                  self.info += namedModInfo(modname) 
292              if mod is not None: 
293                  self.info += modinfo(mod, {}) 
294          except CannotGetStat, ex: 
295              die.warn("Error in getting cache info for %s" % str(ex)) 
296              self.info = None 
297          self.root = root 
298          self.tail = '.pickle' 
299          self._dname = None  #: Pathname info, computed only when needed 
300          self._fpath = None  #: Pathname info, computed only when needed 
301   
302      def __repr__(self): 
303          return "<cache_info %s %s %s>" % (self.root, self.info, self.tail) 
304   
305      def copy(self): 
306          if self.info is None: 
307              return None 
308          return cache_info(self.root, info=self.info) 
309   
310   
311      def addinfo(self, *s, **kv): 
312          """ 
313          @note: This does I{not} modify self!  It creates a new object. 
314          """ 
315          if self.info is None: 
316              return None 
317          return cache_info(root=self.root, info=self.info+s, 
318                  fname=kv.get('fname', None), 
319                  modname=kv.get('modname', None), 
320                  mod=kv.get('mod', None) 
321                  ) 
322   
323   
324      def makespace(self, avoid): 
325          n = 0 
326          p = 0 
327          when = time.time() - self.Age 
328          while n<2 and p<3: 
329              n += _dodel(self.root, avoid, when, 1.0/self.NumObj) 
330              p += 1 
331          if DEBUG and n > 0: 
332              die.info("Deleted %d files from the cache %s" % (n, self.root)) 
333   
334   
335      def dump(self, e): 
336          """Cache some data on the disk. 
337          @rtype: could be anything picklable. 
338          @return: whatever was passed as C{e}. 
339          @param e: the data to write. 
340          @type e: anything picklable. 
341          @note: This function might quietly fail.  Since this is a cache, failure to write 
342              is not considered a major problem.  In my experience, failure to write is 
343              often caused by intermittent network problems, and you don't want it to crash 
344              a long-running computation. 
345          """ 
346          if self.info is None: 
347              return 
348          li = len(self.info) 
349          assert li>0 
350          i1 = self.info[:(1+li)//2] 
351          i2 = self.info[(li-1)//2:] 
352          dname, fpath = self.cachepath() 
353          fd = None 
354          try: 
355              gpkmisc.makedirs(dname) 
356              fd = open(fpath, 'w') 
357              # We are assuming that the pickle protocol is sequential, 
358              # and that disk writes are sequential, so that if the two 
359              # ends of the dump are correct (i1 and i2) then the stuff 
360              # in between will be correct. 
361              cPickle.dump((i1, e, i2), fd, protocol=cPickle.HIGHEST_PROTOCOL) 
362              # os.rename(ftmp, fpath) 
363              # self.makespace(fpath) 
364          except (OSError, IOError, cPickle.PicklingError), ex: 
365              die.warn("cache:dump: %s on %s" % (ex, fpath)) 
366              try: 
367                  os.remove(fpath) 
368              except (IOError, OSError): 
369                  pass 
370          finally: 
371              if fd is not None: 
372                  try: 
373                      fd.close() 
374                  except IOError, ex: 
375                      die.warn("Dump to cache failed on fd.close(): %s" % ex) 
376                  self._clean.put((self, fpath)) 
377          return e 
378   
379   
380      def bg_dump(self, e): 
381          if self.info is None: 
382              return 
383          if len(self.info)==0: 
384              raise ValueError, "Need to specify cache info" 
385          # t = threading.Thread(target=self.dump, args=(e,), name='cache_dumper%s' % id(e)) 
386          # t.start() 
387          self._dump.put((self, e)) 
388   
389   
390      def load(self): 
391          """Pull in some data from the disk. 
392          @rtype: could be anything picklable. 
393          @return: whatever was cached on disk. 
394          @raise BadFileFormat: when the data isn't valid. 
395          @raise IOError: on a cache miss (no cached file exists), 
396              or on read problems, e.g. intermittent network trouble. 
397          """ 
398          if self.info is None: 
399              raise IOError, "cache_info.load: does not know what file to open, as info=None" 
400          dname, fpath = self.cachepath() 
401          fd = open(fpath, 'r') 
402          x = cPickle.load(fd) 
403          del fd 
404          li = len(self.info) 
405          assert li>0 
406          if(isinstance(x, tuple) and len(x)==3 
407                  and len(x[0])>0 and len(x[2])>0 and len(x[0])+len(x[2])==li+1 
408                  and x[0][-1]==x[2][0] 
409                  and x[0]==self.info[:(1+li)//2] and x[2]==self.info[(li-1)//2:] 
410                  ): 
411              if DEBUG: 
412                  die.info("Cache hit: %s" % fpath) 
413              return x[1] 
414          die.info("Cache fail: %s" % fpath) 
415          # Cache fail should catch cases of file truncation and data corruption, 
416          # even if the cPickle.load call doesn't. 
417          raise BadFileFormat, "Cache fail %s" % fpath 
418   
419   
420      def cachepath(self): 
421          """Return a pathname suitable for cacheing some result. 
422          @return: C{(dir_path, file_path)}.  C{Dir_path} is/will be a directory; 
423              C{file_path} is the full path of the data file inside that directory 
424              (the directory component is already included, so you do not need to 
425              join the two pieces yourself). 
426              C{file_path} is where L{dump} writes the cached data and where 
427              L{load} reads it back. 
428          @rtype: C{tuple(str, str)} 
429          @raise ValueError: if you haven't specified any C{info} yet. 
430          """ 
431          if self._dname is not None: 
432              return (self._dname, self._fpath) 
433          if len(self.info)==0: 
434              raise ValueError, "Need to specify cache info" 
435          hf = hashlib.md5(repr(self.info)).hexdigest() 
436          d1 = hf[:2] 
437          d2 = hf[2:4] 
438          d3 = hf[4:6] 
439          fc = hf[6:] 
440          self._dname = os.path.join(self.root, d1, d2, d3) 
441          self._fpath = os.path.join(self._dname, fc) + self.tail 
442          return (self._dname, self._fpath) 
443   
444   
445  Errors = (IOError, EOFError, UnpicklingError, BadFileFormat) 
446   
447   
448   
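Putting the pieces together, here is a hedged sketch of the intended call pattern. The cache root, the expensive() function, and its arguments are made-up placeholders, and it assumes the gmisclib package (plus its gyropy dependency) is importable; cache.Errors is the tuple defined just above.

import sys
from gmisclib import cache

def expensive(x, y):
    # Stand-in for a slow computation whose result is worth cacheing.
    return x ** y

def cached_expensive(x, y, root='/tmp/my_cache'):
    # mod=... guards against reusing results after this script's code changes.
    ci = cache.cache_info(root, info=('expensive', x, y), mod=sys.modules[__name__])
    try:
        return ci.load()              # cache hit
    except cache.Errors:              # miss, corruption, or I/O trouble
        pass
    o = expensive(x, y)
    ci.dump(o)                        # or ci.bg_dump(o) to write in the background
    return o

print cached_expensive(2, 20)         # computes and stores the value
print cached_expensive(2, 20)         # second call should come from the cache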
449  def walkcache(top): 
450      """This is to help humans read the cache. 
451      """ 
452      for (dirpath, dirnames, filenames) in os.walk(top): 
453          for fn in filenames: 
454              p = os.path.join(dirpath, fn) 
455              try: 
456                  x = cPickle.load(open(p, 'r')) 
457              except cPickle.UnpicklingError, ex: 
458                  print '#', p, str(ex) 
459                  continue 
460              if(isinstance(x, tuple) and len(x)==3 
461                      and len(x[0])>0 and len(x[2])>0 
462                      and x[0][-1]==x[2][0] 
463                      ): 
464                  print p, x[0][:-1] + x[2] 
465              else: 
466                  print '#', p, '???' 
467   
468   
469  def test_errs(): 
470      cache_info('/tmp', fname="/u/ajdn/wlkenql/q!!") 
471   
472  def test_normal(): 
473      import shutil 
474      import g_exec 
475      N = 500 
476      tmp = '/tmp/ci_cache_py_test' 
477      cache_info.NumObj = N 
478      cache_info.Age = 1.0 
479      for i in range(2*N): 
480          cache_info(tmp, info=(i,0), mod=die).bg_dump(i%97) 
481          # if i%100 == 99: 
482          #     die.info("filling: i=%d" % i) 
483      nmiss = 0 
484      nhit = 0 
485      for i in range(2*N-N//10,2*N): 
486          try: 
487              x = cache_info(tmp, info=(i,0), mod=die).load() 
488              assert x == (i%97) 
489              nhit += 1 
490          except IOError: 
491              nmiss += 1 
492      print 'BG dump done' 
493      for i in range(2*N): 
494          cache_info(tmp, info=(i,1), mod=die).dump(i%197) 
495          # if i%100 == 99: 
496          #     die.info("filling: i=%d" % i) 
497      for i in range(2*N-N//10,2*N): 
498          try: 
499              x = cache_info(tmp, info=(i,1), mod=die).load() 
500              assert x == (i%197) 
501              nhit += 1 
502          except IOError: 
503              nmiss += 1 
504      die.info("Cache: misses=%d hits=%d" % (nmiss, nhit)) 
505      assert nmiss < nhit//10 
506      n = len(list(g_exec.getiter_raw(None, ["find", tmp, "-type", "f", "-name", "*.pickle"]))) 
507      die.info("Number of files=%d" % n) 
508      assert 0.3*N < n < 3*N 
509      shutil.rmtree(tmp) 
510   
511  def test(): 
512      test_normal() 
513      test_errs() 
514   
515  if __name__ == '__main__': 
516      test() 
517      # import sys 
518      # walkcache(sys.argv[1]) 
519   