Package classifiers :: Module sim_cen_gauss_class
[frames] | no frames]

Source Code for Module classifiers.sim_cen_gauss_class

  1  #!/usr/bin/env python 
  2   
  3  """Simple centered Gaussian Classifier. 
  4  It's designed for a case where there is a lot of data and 
  5  where the data is a multivariate Gaussian. 
  6  """ 
  7   
  8   
  9  from gmisclib import die 
 10  from gmisclib import Num 
 11  from gmisclib import dictops 
 12  from gmisclib import fiatio 
 13  from gmisclib import gpk_writer 
 14  from g_classifiers import q_classifier_r as QCR 
 15   
 16  COVERAGE = 3 
 17  FTEST = 0.25 
 18  Fudge = 1e-6 
 19  PSYCO = False 
 20   
21 -def logdet(cov):
22 """@param cov: a 2-dimensional matrix 23 @type cov: numpy.ndarray 24 @return: the log of the determinant of a matrix. 25 @rtype: float 26 """ 27 q = Num.LA.eigvalsh(cov) 28 assert Num.alltrue(q > 0) 29 return Num.sum(Num.log(q))
30 31
32 -def typ_trace(covlist):
33 """This is used with boost_diag(). It returns a typical 34 value for the the trace of the covariance matrix, 35 averaged over all matrices. 36 @param covlist: a dictionary mapping names to covariance matrices. 37 @type covlist: dict(str:numpy.ndarray) 38 @return: trace of covariance 39 @rtype: float 40 """ 41 tr = 0.0 42 dim = None 43 for (k, cov) in covlist.items(): 44 tr += Num.trace(cov)/cov.shape[0] 45 if dim is None: 46 dim = cov.shape[0] 47 else: 48 assert cov.shape[0] == dim 49 tr /= len(covlist) 50 return tr
51 52
53 -def boost_diag(covlist, diagboost):
54 """This raises all the eigenvalues a little bit, 55 so that processing can proceed even if the covariance 56 is singular. To first order, the fudge gets subtracted 57 later on in the processing. 58 @param covlist: a dictionary mapping names to covariance matrices. 59 @type covlist: dict(str:numpy.ndarray) 60 @param diagboost: how much to boost 61 @type diagboost: float 62 """ 63 for c in covlist.values(): 64 trm = Num.identity(c.shape[0])*diagboost 65 Num.add(c, trm, c)
66 67 68 69
70 -def go_auto(fd, Mu=False, coverage=COVERAGE, ftest=FTEST, cb=None, cfd_file = None):
71 """ 72 @param cb: a function that builds the classifier. E.g. n_zero_cov_builder. 73 @type cb: list(QCR.classifier) 74 @param Mu: should the class means be allowed to be nonzero? 75 Not here; Mu must be false. 76 @type Mu: bool 77 @param cfd_file: Pathname where the definition of the classifier should 78 be written. None => don't write. 79 @type cfd_file: str 80 """ 81 assert not Mu 82 d = QCR.read_data(fd) 83 if cfd_file == None: 84 classout = gpk_writer.null_writer() 85 else: 86 classout = fiatio.writer(open(cfd_file, 'w')) 87 summary, out, wrong = QCR.compute_self_class(d, coverage=coverage, ftest=ftest, 88 n_per_dim = 0, 89 modelchoice = None, 90 builder=cb, 91 classout=classout) 92 QCR.default_writer(summary, out, classout, wrong)
93 94 95
96 -def _do_test(d, coverage=COVERAGE, ftest=FTEST, cb=None):
97 classout = gpk_writer.null_writer() 98 summary, out, wrong = QCR.compute_self_class(d, coverage=coverage, ftest=ftest, 99 n_per_dim = 0, 100 modelchoice = None, 101 builder=cb, 102 classout=classout) 103 print 'Summary=', summary 104 return summary
105 106 107
108 -def n_zero_cov_builder(d, N, modelchoice=None, trainingset_name=None, 109 diagboost=Fudge):
110 # ignore N 111 assert modelchoice is None 112 die.info('Mu_cov') 113 assert len(d) > 0, "No data!" 114 dim = d[0].value.shape[0] 115 covlist = dictops.dict_of_X( lambda v: Num.array(v, Num.Float, copy=True), 116 lambda acc, delta: Num.add(acc, delta, acc) 117 ) 118 nlist = dictops.dict_of_accums() 119 for x in d: 120 nlist.add(x.classid, 1) 121 covlist.add(x.classid, Num.outerproduct(x.value, x.value)) 122 123 for k in covlist.keys(): 124 # print 'k=', k 125 Num.divide(covlist[k], nlist[k], covlist[k]) 126 # print 'Eigenval:a:', Num.LA.eigvalsh(covlist[k]) 127 128 boost_diag(covlist, diagboost*typ_trace(covlist)) 129 130 # We can't subtract the two inverse covariances here, 131 # otherwise we mess up the calculation of logdet(). 132 133 models = {} 134 for k in covlist.keys(): 135 models[k] = QCR.qmodel(Num.zeros((dim,), Num.Float), 136 Num.LA.inv(covlist[k]), 137 -logdet(covlist[k]) 138 ) 139 140 141 die.info('Mu_cov done') 142 return [ QCR.classifier('quadratic_class_model', models, trainingset_name=trainingset_name, 143 uid=trainingset_name 144 ) 145 ]
146 147
148 -def _droplist1(dim):
149 return [ None ] + [ (i,) for i in range(dim) ]
150
151 -def _droplist2s(dim):
152 return [ None ] + [ (i,(i+1)%dim) for i in range(dim) ]
153
154 -def _droplist2x(dim):
155 return [ None ] + [ (i,j) for i in range(dim) for j in range(dim)]
156 157 158
159 -def n_zero_cov_builder_drop(d, N, modelchoice=None, trainingset_name=None, 160 diagboost=Fudge):
161 """Needs to return a list, not an iterator.""" 162 # ignore N 163 assert modelchoice is None 164 die.info('Mu_cov') 165 assert len(d) > 0, "No data!" 166 dim = d[0].value.shape[0] 167 168 def _drop(v, droplist): 169 tmp = Num.array(v, copy=True) 170 if droplist is not None: 171 for i in droplist: 172 tmp[i] = 0.0 173 return tmp
174 175 typ_tr = None 176 o = [] 177 global Droplist 178 for drop in Droplist(dim): 179 covlist = dictops.dict_of_X( lambda v: Num.array(v, Num.Float, copy=True), 180 lambda acc, delta: Num.add(acc, delta, acc) 181 ) 182 nlist = dictops.dict_of_accums() 183 for x in d: 184 nlist.add(x.classid, 1) 185 xvtmp = _drop(x.value, drop) 186 covlist.add(x.classid, Num.outerproduct(xvtmp, xvtmp)) 187 188 for k in covlist.keys(): 189 # print 'k=', k 190 Num.divide(covlist[k], nlist[k], covlist[k]) 191 # print 'Eigenval:a:', Num.LA.eigvalsh(covlist[k]) 192 193 if typ_tr is None: 194 # We need to boost the diagonal by the same amount 195 # for each different dropped component, otherwise 196 # the discriminants of the inverse covariance matrix 197 # will be affected in bad ways. 198 typ_tr = typ_trace(covlist) 199 boost_diag(covlist, diagboost*typ_tr) 200 201 # We can't subtract the two inverse covariances here, 202 # otherwise we mess up the calculation of logdet(). 203 204 models = {} 205 for k in covlist.keys(): 206 models[k] = QCR.qmodel(Num.zeros((dim,), Num.Float), 207 Num.LA.inv(covlist[k]), 208 -logdet(covlist[k]) 209 ) 210 211 212 tmp = QCR.classifier('quadratic_class_model', models, 213 trainingset_name=trainingset_name, 214 uid=hash((trainingset_name, drop)) 215 ) 216 tmp.add_info('drop_component', drop) 217 o.append( tmp ) 218 return o 219 220 221
222 -def n_mu_cov_builder(d, N, modelchoice=None, trainingset_name=None, 223 diagboost=Fudge):
224 # ignore N 225 assert modelchoice is None 226 die.info('Mu_cov') 227 dim = d[0].value.shape[0] 228 mulist = dictops.dict_of_X( lambda v: Num.array(v, Num.Float), 229 lambda acc, delta: Num.add(acc, delta, acc) 230 ) 231 covlist = dictops.dict_of_X( lambda v: Num.array(v, Num.Float), 232 lambda acc, delta: Num.add(acc, delta, acc) 233 ) 234 nlist = dictops.dict_of_accums() 235 for x in d: 236 nlist.add(x.classid, 1) 237 for x in d: 238 mulist.add(x.classid, x.value) 239 for k in mulist.keys(): 240 Num.divide(mulist[k], nlist[k], mulist[k]) 241 mdof = dim 242 die.info('Mu done') 243 for x in d: 244 tmp = x - mulist[x.classid] 245 covlist.add(x.classid, Num.outerproduct(tmp, tmp)) 246 for k in mulist.keys(): 247 # print 'k=', k 248 assert nlist[k] > mdof 249 Num.divide(covlist[k], nlist[k]-mdof, covlist[k]) 250 # print 'Eigenval:a:', Num.LA.eigvalsh(covlist[k]) 251 252 for k in mulist.keys(): 253 # print 'k=', k 254 Num.divide(covlist[k], nlist[k], covlist[k]) 255 # print 'Eigenval:a:', Num.LA.eigvalsh(covlist[k]) 256 257 boost_diag(covlist, diagboost) 258 259 260 models = {} 261 for k in mulist.keys(): 262 models[k] = QCR.qmodel(mulist[k], 263 Num.LA.inv(covlist[k]), 264 logdet(covlist[k]) 265 ) 266 267 268 die.info('Mu_cov done') 269 return [QCR.classifier('quadratic_class_model', models, trainingset_name=trainingset_name, 270 uid=trainingset_name 271 ) 272 ]
273 274 275
276 -def _test(dim=3, f=10, low=0.90, high=1.00, N=1000, u=False):
277 import math 278 if u: 279 tmp1 = Num.RA.standard_normal((dim,dim)) 280 umat1, s, vt = Num.LA.svd( Num.dot(tmp1, Num.transpose(tmp1)) ) 281 tmp2 = Num.RA.standard_normal((dim,dim)) 282 umat2, s, vt = Num.LA.svd( Num.dot(tmp2, Num.transpose(tmp2)) ) 283 d = [] 284 for i in range(N): 285 vector = Num.RA.standard_normal((dim,)) 286 if u: 287 vector = Num.dot(vector, umat1) 288 d.append( QCR.datum_tr(vector, 'a') ) 289 for i in range(N): 290 vector = Num.RA.standard_normal((dim,)) 291 Num.multiply(vector, 292 Num.exp((math.log(f)/dim)*(1+Num.arange(dim))), 293 vector) 294 if u: 295 vector = Num.dot(vector, umat2) 296 d.append( QCR.datum_tr(vector, 'b') ) 297 summary = _do_test(d, cb=Cb) 298 assert low <= summary['Pcorrect'] <= high
299 300
301 -def test1():
302 _test(dim=3, f=10, low=0.90, high=1.00) 303 _test(dim=3, f=0.1, low=0.90, high=1.00) 304 _test(dim=6, f=30, low=0.97, high=1.00) 305 _test(dim=3, f=1.0, low=0.30, high=0.70) 306 _test(dim=6, f=30, low=0.97, high=1.00, u=True) 307 _test(dim=9, f=10, low=0.97, high=1.00, u=True) 308 _test(dim=9, f=0.1, low=0.97, high=1.00, u=True) 309 _test(dim=2, f=1.0, low=0.30, high=0.70, u=True) 310 _test(dim=12, f=1.0, low=0.30, high=0.70, u=True) 311 _test(dim=1, f=2.0, low=0.60, high=0.85) 312 _test(dim=1, f=1.5, low=0.50, high=0.8) 313 _test(dim=1, f=2.5, low=0.60, high=0.85) 314 _test(dim=1, f=3.5, low=0.65, high=0.90) 315 _test(dim=1, f=5.0, low=0.75, high=0.96)
316 317 318 if __name__ == '__main__': 319 import sys 320 321 if PSYCO: 322 try: 323 import psyco 324 psyco.full() 325 except ImportError: 326 pass 327 328 arglist = sys.argv[1:] 329 Cb = n_zero_cov_builder 330 cfd_file = 'classified.fiat' 331 Droplist = _droplist1 332 while arglist and arglist[0].startswith('-'): 333 arg = arglist.pop(0) 334 if arg == '--': 335 break 336 elif arg == '-coverage': 337 COVERAGE = float(arglist.pop(0)) 338 elif arg == '-drop' or arg == '-drop1': 339 Droplist = _droplist1 340 Cb = n_zero_cov_builder_drop 341 elif arg == '-drop2s': 342 Droplist = _droplist2s 343 Cb = n_zero_cov_builder_drop 344 elif arg == '-drop2x': 345 Droplist = _droplist2x 346 Cb = n_zero_cov_builder_drop 347 elif arg == '-nodetails': 348 cfd_file = None 349 elif arg == '-test': 350 test1() 351 sys.exit(0) 352 elif arg == '-ftest': 353 FTEST = float(arglist.pop(0)) 354 assert 0.0 < FTEST < 1.0 355 else: 356 die.die('Unrecognized argument: %s' % arg) 357 go_auto(sys.stdin, coverage=COVERAGE, ftest=FTEST, cb=Cb, 358 cfd_file = cfd_file) 359