Page MenuHomec4science

measure.py
No OneTemporary

File Metadata

Created
Tue, Mar 4, 06:40

measure.py

import math
import numpy
import statistics
import scipy
import scipy.optimize
def error_variance(*errs):
if len(errs) == 1:
return math.fabs(*errs)
out = 0
for err in errs:
out += err**2
return math.sqrt(out)
def error_abs(*errs):
out = 0
for err in errs:
out += math.fabs(err)
return out
class Measure:
def __init__(self, v, e = 0.0, sig_digit = 3, **args):
self.v = float(v)
self.e = math.fabs(float(e))
self.sig_digit = sig_digit
self.scient = False
# format
if "format" in args:
self.format = args["format"] # !!! no control
elif "scient" in args and \
isinstance(args["scient"], bool) and \
args["scient"]:
self.scient = True
self.format = "%%.%d%s" % (sig_digit, 'e')
else:
self.format = "%%.%d%s" % (sig_digit, 'g')
# std deviation, !! no control
self.errop = error_variance
if "stddev" in args:
self.errop = args["stddev"]
def __str__(self):
return ("%s +- %s" % (self.format, self.format)) % (self.v, self.e)
def __tuple__(self):
return (self.v, self.e)
def __iadd__(self, m):
if isinstance(m, Measure):
self.v += m.v
self.e = self.errop(self.e, m.e)
else:
self.v += m
return self
def __isub__(self, m):
if isinstance(m, Measure):
self.v -= m.v
self.e = self.errop(self.e, m.e)
else:
self.v -= m
return self
def __imul__(self, m):
if isinstance(m, Measure):
self.e = self.errop(self.v * m.e, self.e * m.v)
self.v *= m.v
else:
self.v *= m
self.e = self.errop(self.e * m)
return self
def __itruediv__(self, m):
if isinstance(m, Measure):
self.e = self.errop(self.e / m.v, self.v * m.e / m.v**2)
self.v /= m.v
else:
self.v /= m
self.e = self.errop(self.e / m)
return self
def __radd__(self, k):
return Measure(self.v + k, self.e, self.sig_digit, format = self.format)
def __rsub__(self, k):
return Measure(k - self.v, self.e, self.sig_digit, format = self.format)
def __rmul__(self, k):
return Measure(self.v * k, self.errop(self.e * k), self.sig_digit, format = self.format)
def __rtruediv__(self, k):
return Measure(k / self.v, self.errop(k * self.e / self.v**2), self.sig_digit, format = self.format)
def __add__(self, m):
if not isinstance(m, Measure):
return self.__radd__(m)
return Measure(self.v + m.v, self.errop(self.e, m.e), min(self.sig_digit, m.sig_digit), scient = self.scient)
def __sub__(self, m):
if not isinstance(m, Measure):
return Measure(self.v - m, self.e, self.sig_digit, scient = self.scient)
return Measure(self.v - m.v, self.errop(self.e, m.e), min(self.sig_digit, m.sig_digit), scient = self.scient)
def __mul__(self, m):
if not isinstance(m, Measure):
return self.__rmul__(m)
return Measure(self.v * m.v, self.errop(self.v * m.e, self.e * m.v), min(self.sig_digit, m.sig_digit), scient = self.scient)
def __truediv__(self, m):
if not isinstance(m, Measure):
return Measure(self.v / m, self.errop(self.e / m), self.sig_digit, scient = self.scient)
return Measure(self.v / m.v, self.errop(self.e / m.v, self.v * m.e / m.v**2), min(self.sig_digit, m.sig_digit), scient = self.scient)
def __pow__(self, r):
if not isinstance(r, int) and not isinstance(r, float):
raise TypeError("Measure: pow is only for int or float args")
err = r * self.v**(r-1) * self.e
return Measure(self.v**r, self.errop(err), self.sig_digit, scient = self.scient)
def __neg__(self):
return Measure(-self.v, self.e, self.sig_digit, scient = self.scient)
def __eq__(self, m):
if isinstance(m, Measure):
return m.v == self.v and m.e == self.e
return self.v == m
def __ne__(self, m):
if isinstance(m, Measure):
return m.v != self.v or m.e != self.e
return self.v != m
def __lt__(self, m):
if isinstance(m, Measure):
return self.v < m.v
return self.v < m
def __le__(self, m):
if isinstance(m, Measure):
return self.v <= m.v
return self.v <= m
def __gt__(self, m):
if isinstance(m, Measure):
return self.v > m.v
return self.v > m
def __ge__(self, m):
if isinstance(m, Measure):
return self.v >= m.v
return self.v >= m
def square(self):
return Measure(self.v**2, self.errop(2 * self.v * self.e), self.sig_digit, scient = self.scient)
def sqrt(self):
return Measure(math.sqrt(self.v), self.e / (2 * math.sqrt(self.v)), self.sig_digit, scient = self.scient)
def abs(self):
return Measure(math.fabs(self.v), self.e, self.sig_digit, scient = self.scient)
def exp(self):
ex = math.exp(self.v)
return Measure(ex, self.errop(ex * self.e), self.sig_digit, scient = self.scient)
def log(self):
return Measure(math.log(self.v), self.errop(self.e / self.v), self.sig_digit, scient = self.scient)
def sin(self):
return Measure(math.sin(self.v), self.errop(math.cos(self.v) * self.e), self.sig_digit, scient = self.scient)
def cos(self):
return Measure(math.cos(self.v), self.errop(math.sin(self.v) * self.e), self.sig_digit, scient = self.scient)
def __float__(self):
return self.v
def extr(self, coeff = 1.0):
return (self.v - self.e * coeff, self.v + self.e * coeff)
#
# List wrapper
#
class Array:
def __init__(self, arglist):
self.list = arglist
def __getitem__(self, index):
return self.list[index]
def append(self, obj):
self.list.append(obj)
#
# Utils
#
# print a line into stream
def print_line(stream, *measures, **kargs):
M = []
for m in measures:
M.extend((m.format % m.v, m.format % m.e))
print(*M, file=stream, sep=" ", end="\n")
# give list of proper max and min, see Measure.extr
def extremums(measures, coeff = 1.0):
down = []
up = []
for m in measures:
e = m.extr(coeff)
down.append(e[0])
up.append(e[1])
return (tuple(up), tuple(down))
def values(measures):
v = []
for m in measures:
v.append(m.v)
return tuple(v)
# generate a list of measures
def genMeasures(values, error, sig_digit = 3):
out = []
for value in values:
out.append(Measure(value, error, sig_digit))
return out
# generate a measure basing on min and max
def minMaxMeasure(m, M):
return Measure((float(M) + float(m))/2, (float(M) - float(m))/2)
# return the maximal error
def maxError(measures):
M = 0
for m in measures:
if (m.e > M):
M = m.e
return M
# TODO, don't remember
def computeRange(y, Y, x, X):
dy = Y - y
dx = (X - x).v
rm = (dy.v-dy.e)/dx
rp = (dy.v+dy.e)/dx
return ( (rm, Y.v - Y.e - rm * x.v), (rp, Y.v + Y.e - rp * x.v) )
#
# analysis instruments
#
def polyfit(X, Y, deg = 1, **args):
coeff = 1
if "coeff" in args:
coeff = args["coeff"]
del args["coeff"]
sig_digit = 8
if "sig_digit" in args and isinstance(args["sig_digit"], int):
sig_digit = args["sig_digit"]
del args["sig_digit"]
extX = extremums(X, coeff)
extY = extremums(Y, coeff)
res = [None] * 4
# x down, y down
res[0] = numpy.polyfit(extX[0], extY[0], deg)
# x down, y up
res[1] = numpy.polyfit(extX[0], extY[1], deg)
# x up, y down
res[2] = numpy.polyfit(extX[1], extY[0], deg)
# x up, y up
res[3] = numpy.polyfit(extX[1], extY[1], deg)
(centered, cov) = (None, None)
try:
(centered, cov) = numpy.polyfit(values(X), values(Y), deg, None, False, None, True)
except:
centered = numpy.polyfit(values(X), values(Y), deg)
# compute the nearest coefficients
coeff = []
for i in range(deg + 1):
sample = (res[0][i], res[1][i], res[2][i], res[3][i])
stddev = statistics.variance(sample)
if cov is not None:
stddev += cov[i, i]
coeff.append(Measure( \
centered[i], \
math.sqrt(stddev), \
sig_digit \
))
return coeff
# fit a set of data
# basing on a general expression
# expr = expr(x, ...)
# where ... stands for the set of coefficients to deduce
def fit(X, Y, expr, p0 = None, **args):
coeff = 1
if "coeff" in args:
coeff = args["coeff"]
del args["coeff"]
sig_digit = 8
if "sig_digit" in args and isinstance(args["sig_digit"], int):
sig_digit = args["sig_digit"]
del args["sig_digit"]
extX = extremums(X, coeff)
extY = extremums(Y, coeff)
res = [None] * 4
# x down, y down
res[0] = scipy.optimize.curve_fit(expr, extX[0], extY[0], p0, **args)[0]
# x down, y up
res[1] = scipy.optimize.curve_fit(expr, extX[0], extY[1], p0, **args)[0]
# x up, y down
res[2] = scipy.optimize.curve_fit(expr, extX[1], extY[0], p0, **args)[0]
# x up, y up
res[3] = scipy.optimize.curve_fit(expr, extX[1], extY[1], p0, **args)[0]
# centered solution
centered, cov = scipy.optimize.curve_fit(expr, values(X), values(Y), p0, **args)
# compute the nearest coefficients
coeff = []
for i in range(len(res[0])):
sample = (res[0][i], res[1][i], res[2][i], res[3][i], centered[i])
coeff.append(Measure( \
centered[i], \
math.sqrt(statistics.variance(sample) + cov[i,i]), \
sig_digit \
))
return coeff
"""
def fit(X, Y, expr, p0 = None):
_X = values(X)
_Y = values(Y)
(res, pcov) = scipy.optimize.curve_fit(expr, _X, _Y, p0)
var = maxError(X)**2 + + maxError(Y)**2
out = []
for i in range(len(res)):
out.append(Measure(res[i], \
#math.sqrt(pcov[i][i] + var), \
math.sqrt(var), \
8))
return out
"""
def rawfit(X, Y, expr, p0 = None):
res = scipy.optimize.curve_fit(expr, X, Y, p0)[0]
return genMeasures(res, 0, 8)
#
# file loading helpers
#
def splitline(line, sep):
out = (line[:-1]).split(sep)
flag = False
while(not flag):
i = 0
flag = False
while(i < len(out)):
if (out[i] == ''):
del out[i]
flag = True
else:
i += 1
return out
class Visit:
def __init__(self, visitmap):
self.visitmap = visitmap
if callable in visitmap:
self.callable = True
def __call__(self, entry, *args, **kargs):
if (type(entry) in self.visitmap) :
return self.visitmap[type(entry)](entry, *args, **kargs)
elif self.callable and callable(entry):
return self.visitmap[callable](entry, *args, **kargs)
class Line:
def __init__(self, line, sep):
# store original line
self.rawline = line
# split line
self.splitted = splitline(line, sep)
def __getitem__(self, index):
try:
return self.splitted[index]
except:
print("Error")
print("Involved line:", self.rawline)
print("Index out of bound:", index, ", Size of container:", len(self.splitted))
return None;
def is_comment(self):
return self.rawline == "" or (len(self.splitted) == 0) or self.rawline.startswith('#')
def parse(self, columns, col_visit, uncertainty, unc_visit, sigdigit, sigd_visit, transform, transf_visit):
buff = []
for index in range(len(columns)):
m = Measure(col_visit(columns, self, index), unc_visit(uncertainty, self, index), sigd_visit(sigdigit, index))
if transform is not None:
m = transf_visit(m)
buff.append(m)
return buff
class FirstLine(Line):
def __init__(self, line, sep):
Line.__init__(self, line, sep)
# verify whether the elements are all indexes or not
self.all_indexes = True
for elem in self.splitted:
try:
elem = float(elem) # try to convert to float
except:
pass # nothing happens
finally:
self.all_indexes = False
def indicise(self, keylist):
for i in range(len(keylist)):
key = keylist[i]
if isinstance(key, str):
if key in self.splitted:
keylist[i] = self.splitted.index(key)
else:
raise Exception("Invalid argument: no key was found to match the input line to match", key)
elif isinstance(key, int):
pass
else:
keylist[i] = int(key) # try to convert it to an integer index
return keylist
#
# Load a specified columns
# filename: input file name which you load the data
# *columns: name or index of the column containing the data to be loaded (!! indexes start from 0)
# **kargs:
# - uncertainty =
# - str: column which contains the uncertainty
# - float/int: constant uncertainty
# - tuple/list: apply the incertainty to the corresponding column in order of passed argument depending on the elements' type
# - transform = apply a functor once the data are loaded
# - functor: apply to all data
# - tuple/list: apply the functor to the corresponding column in order of passed argument (!! the array size must correspond, theelements must be functors)
# - sigdigit = significant digit needed to construct a Measure once the data are loaded
# - int: apply to all data
# - tuple/list: apply the significant digits to the corresponding column in order of passed argument (!! the array size must correspond, elements must be integers)
# - verbose = say everything is happening loading values (!! it must be convertible to a boolean)
# - sep = column separator character (!! it must be convertible to char)
#
def loadMeasures(filename, *columns, **kargs):
if len(columns) == 0:
raise Exception("Invalid argument: entering an empty column set")
# initialize column visitor
col_visit = Visit({ int : lambda data, line, index: line[data], \
list : lambda data, line, index: line[data[index]], \
tuple : lambda data, line, index: line[data[index]] })
# determine uncertainty
uncertainty = 0.0
if "uncertainty" in kargs:
uncertainty = kargs["uncertainty"]
unc_visit = Visit({int : lambda data, line, index: line[data], \
float : lambda data, line, index: data, \
list : lambda data, line, index: line[data[index]], \
tuple : lambda data, line, index: line[data[index]] })
# determine transform
transform = None # identity transformation
if "transform" in kargs:
transform = kargs["transform"]
transf_visit = Visit({callable : lambda T, m, index: T(m), \
list : lambda T, m, index: T[index](m), \
tuple : lambda T, m, index: T[index](m) })
# determine significant digits
sigdigit = 3
if "sigdigit" in kargs:
sigdigit = kargs["sigdigit"]
sigd_visit = Visit({ int : lambda value, index: value, \
list : lambda value, index: int(value[index]), \
tuple : lambda value, index: int(value[index]) })
# enable/disable verbose mode
verbose = False
if "verbose" in kargs:
verbose = bool(kargs["verbose"])
# determine column separator character
sep = ' '
if "sep" in kargs:
sep = str(kargs["sep"])
if verbose:
print("Opening file:", filename)
# open file
f = open(filename, 'r')
# skip first comment lines
firstline = FirstLine(f.readline(), sep)
while firstline.is_comment():
firstline = FirstLine(f.readline(), sep) # get next line
if verbose:
print("Processing first line:", firstline.rawline)
# indicise inputs
columns = firstline.indicise(list(columns))
if verbose:
print("Reading columns:", columns)
if len(columns) < 2:
columns = columns[0]
if isinstance(uncertainty, list) or isinstance(uncertainty, tuple):
if len(uncertainty) != len(columns):
raise Exception("Invalid argument uncertainty: columns and uncertainty lists must correspond in size")
else:
uncertainty = firstline.indicise(list(uncertainty))
# create output list
out = []
for i in range(len(columns)):
out.append([])
for rawline in f:
if verbose:
print("Loading line:", rawline)
line = Line(rawline, sep)
if line.is_comment():
continue # ignore
buff = line.parse(columns, col_visit, uncertainty, unc_visit, sigdigit, sigd_visit, transform, transf_visit)
for i in range(len(buff)):
out[i].append(buff[i])
f.close()
return tuple(out)
def _loadMeasures(filename, values, errors = 0.0, gain = 1.0, offset = 0.0, verbose = False, **args):
f = open(filename, 'r')
sig_digit = 3
if "sig_digit" in args and isinstance(args["sig_digit"], int):
sig_digit = int(args["sig_digit"])
del args["sig_digit"]
firstline = None
if isinstance(values, str):
firstline = splitline(f.readline())
values = firstline.index(values)
errStr = False
if isinstance(errors, str):
if firstline is None:
firstline = splitline(f.readline())
errors = firstline.index(errors)
errStr = True
gainStr = False
if isinstance(gain, str):
if firstline is None:
firstline = splitline(f.readline())
gain = firstline.index(gain)
gainStr = True
offStr = False
if isinstance(offset, str):
if firstline is None:
firstline = splitline(f.readline())
offset = firstline.index(offset)
offStr = True
out = []
if verbose:
print("File %s: loading columns (%d, %d)" % (filename, values, errors))
for line in f:
l = splitline(line)
if line.startswith('#'):
continue # ignore
try:
# evaluate error
err = 0
if (errStr):
err = float(l[errors])
else:
err = errors
# evaluate gain scaling
g = 1.0
if (gainStr):
g = float(l[gain])
else:
g = gain
# evaluate offset
off = 1.0
if (offStr):
off = float(l[offset])
else:
off = offset
if values < len(l):
out.append(Measure(l[values], err, sig_digit, **args) * g + off)
except ValueError:
print("Could not parse line", line, "Ignoring it")
f.close()
return out

Event Timeline