Page MenuHomec4science

stats.py
No OneTemporary

File Metadata

Created
Thu, Jun 6, 14:18

stats.py

def tscov(x, y=None, method='cor', dm='constant'):
    """Compute auto- or cross-correlation or covariance function estimates
    of a time series, following Shumway & Stoffer 2011.

    x, y: 1D arrays. If y is not given, compute the autocorrelation or
          autocovariance of x.
    method: normalization method:
        "cor" - normalises the sequence so that the correlation at zero
                lag is 1. This is the default.
        "cov" - covariance (no normalisation).
    dm: detrending method to be used; if 'linear' the linear trend is
        removed, if 'constant' only the mean is removed (default).

    RETURNS:
        C: requested function.

    NOTES:
        The zero lag is at position x.size // 2.
        A positive (negative) lag means that x lags y (y lags x). In
        other words the lag is x with respect to y.
        If autocovariance or autocorrelation is requested, only positive
        lags are returned, since C(-h) = C(h).
        Note that differently from the reference, we do not consider
        lags longer than x.size // 2.
    """
    import numpy as np
    a = np.float64(np.atleast_1d(x)).copy()
    if a.ndim != 1:
        raise ValueError('tscov accepts only 1D vectors.')
    N = a.size
    # We do not want length-1 arrays (raise instead of assert, which is
    # stripped under python -O).
    if N <= 1:
        raise ValueError('tscov needs at least two samples.')
    if method not in ('cov', 'cor'):
        raise ValueError("method must be 'cov' or 'cor'.")
    if y is None:
        b = a
    else:
        b = np.float64(np.atleast_1d(y)).copy()
        if b.ndim != 1:
            raise ValueError('tscov accepts only 1D vectors.')
        if a.shape != b.shape:
            raise ValueError('The input arrays must have the same length.')
    # Detrend
    from scipy.signal import detrend
    a = detrend(a, type=dm)
    b = detrend(b, type=dm)
    # Biased cross-covariance estimate (normalised by N, not N - h).
    g = np.correlate(a, b, mode="same") / N
    if y is None:
        # Autocovariance is symmetric: keep the non-negative lags only.
        g = g[N // 2:]
    if method == "cor":
        # BUG FIX: the old "cov" branch multiplied g by the standard
        # deviations, which is wrong — np.correlate/N of the detrended
        # series already IS the covariance. Only the correlation needs
        # normalising (by the product of the standard deviations).
        g /= a.std() * b.std()
    return g
def tsave(x):
    """Compute the mean of a time series with its standard error.

    The standard error is based on the autocovariance, so it accounts
    for serial correlation (equations 1.32 and 1.33 of Shumway &
    Stoffer 2011).

    x: 1D array.

    RETURNS:
        (mean, stderr)
    """
    import numpy as np
    a = np.float64(np.atleast_1d(x))
    # Check size (the old message wrongly referenced 'xycov').
    if a.ndim != 1:
        raise ValueError('tsave accepts only 1D vectors.')
    N = a.size
    # tscov returns only the non-negative lags, of which there are
    # (N + 1) // 2; this expression unifies the even/odd N cases.
    nlags = (N + 1) // 2
    # Triangular weights (1 - h/N) of eq. 1.33.
    w = 1.0 - np.arange(nlags) / N
    acov = w * tscov(a, method="cov")
    stderr = np.sqrt((acov[0] + 2.0 * np.sum(acov[1:])) / N)
    return a.mean(), stderr
def xcorr(a, b=None, method='cor', alpha=0.05, err='gauss', dm='linear'):
    """xcorr Cross-correlation function estimates.

    a, b: 1D or 2D arrays; the data must be structured with each series
          in a row, and data in each series in columns.
          If b is not given, compute the autocorrelation of a.
    method: normalization method:
        cor - normalises the sequence so that the correlations at zero
              lag are identically 1.0.
        cov - covariance (no normalisation).
    alpha: significance level for confidence intervals.
    err: method for error computation, either 'gauss' (default) or an
         integer number, interpreted as the number of samples for random
         phase bootstrapping, or 'none' for no error estimate.
    dm: detrending method: if 'linear' (default) the linear trend is
        removed, if 'constant' only the mean is removed.

    RETURNS:
        C: cross-correlation
        cl, cu: lower/upper confidence intervals at the given alpha,
                returned only if method='cor' and err='gauss'
        sl: significance level for the given alpha at each lag,
            computed via random-phase bootstrapping, returned only if
            method='cor' and err is an integer number
        lags: vector of lag indices, positive lags mean that b lags a.
    """
    import numpy as np
    import pyfftw
    from scipy.signal import detrend
    from pickle import dump, load
    from os.path import isfile, expanduser
    pyfftw.interfaces.cache.enable()
    # Load FFTW wisdom if it exists, to speed up plan creation.
    cachefile = "{:s}/.cache/pyFFTW_wisdom".format(expanduser("~"))
    if isfile(cachefile):
        with open(cachefile, 'rb') as fwis:
            try:
                pyfftw.import_wisdom(load(fwis))
            except EOFError:
                print("Invalid wisdom file.")
    a = np.float64(np.atleast_2d(a))
    # Check size
    if np.size(a.shape) == 1:
        M = a.size
        N = 1
    elif np.size(a.shape) == 2:
        N, M = a.shape
    else:
        raise ValueError('xcorr accepts only vectors or 2D matrices.')
    # We do not want length-1 series (raise instead of assert, which is
    # stripped under python -O).
    if M <= 1:
        raise ValueError('xcorr needs at least two samples per series.')
    if b is None:
        b = a
    b = np.float64(np.atleast_2d(b))
    if np.size(b.shape) > 2:
        raise ValueError('xcorr accepts only vectors or 2D matrices.')
    if method not in ('cov', 'cor'):
        raise ValueError("method must be 'cov' or 'cor'.")
    if a.shape != b.shape:
        # Zero-pad the shorter input to a common length M.
        Na, Ma = a.shape
        Nb, Mb = b.shape
        if Na != Nb:
            # BUG FIX: the old implicitly-concatenated message was
            # missing the space between 'same' and 'first'.
            raise ValueError('The input arrays must share the same '
                             'first dimension.')
        M = max(Ma, Mb)
        if Ma < M:
            tmp = np.zeros((N, M))
            tmp[:, :Ma] = a
            a = tmp
        elif Mb < M:
            tmp = np.zeros((N, M))
            tmp[:, :Mb] = b
            b = tmp
    # Compute cross correlation via FFT (correlation theorem).
    a = detrend(a, type=dm)  # Defaults to last axis
    b = detrend(b, type=dm)
    # BUG FIX: np.int was deprecated in NumPy 1.20 and removed in 1.24;
    # use the builtin int instead.
    Mfft = int(2 ** np.ceil(np.log2(M)))
    afft = pyfftw.empty_aligned(a.shape, dtype='float64')
    afft[:] = a
    bfft = pyfftw.empty_aligned(b.shape, dtype='float64')
    bfft[:] = b
    # we store the ffts because we may reuse them later
    fftobj_a = pyfftw.builders.fft(afft, n=Mfft)
    fftobj_b = pyfftw.builders.fft(bfft, n=Mfft)
    fftb = fftobj_b()
    # Compute correlation (fft defaults to last axis):
    # corr(a, b) = ifft(conj(fft(a)) * fft(b)).
    c = np.conj(fftobj_a()) * fftb
    cfft = pyfftw.empty_aligned(c.shape, dtype='complex128')
    cfft[:] = c
    fftobj_c = pyfftw.builders.ifft(cfft)
    c = np.real(fftobj_c())
    c = np.fft.fftshift(c, axes=1)
    # Mfft is a power of two, so // keeps the same values with an
    # integer (rather than float) lag vector.
    lags = np.arange(Mfft) - Mfft // 2
    if method == 'cor':
        # in this case, we normalise the correlation function
        c /= (a.std(axis=1) * b.std(axis=1) * M)[:, np.newaxis]
        if err == 'none':
            return c, lags
        elif err == 'gauss':
            from scipy.stats import norm
            # Fisher z-transform; pag. 253 Emery & Thomson.
            Zr = 0.5 * (np.log(1 + c) - np.log(1 - c))
            sz = (M - 3) ** -0.5
            rn = norm()
            # BUG FIX: the lower/upper bounds were swapped in the old
            # code (ppf(alpha/2) is negative and therefore yields the
            # LOWER bound), so (cl, cu) contradicted the docstring.
            Zl = Zr + rn.ppf(alpha / 2) * sz
            cl = (np.exp(2 * Zl) - 1) / (1 + np.exp(2 * Zl))
            Zu = Zr + rn.ppf(1 - alpha / 2) * sz
            cu = (np.exp(2 * Zu) - 1) / (1 + np.exp(2 * Zu))
            return c, cl, cu, lags
        elif np.dtype(type(err)).kind == 'i':
            # Random-phase bootstrap; Ebisuzaki, J. Climate 1997.
            if err > 1000:
                from multiprocessing import cpu_count
                nthr = cpu_count() - 1
            else:
                nthr = 1
            # Fixed seed for reproducible significance levels.
            np.random.seed(1982)
            fftobj_a = pyfftw.builders.fft(afft)
            ffta = fftobj_a()
            # random-phase surrogates: keep amplitudes, scramble phases
            fr = pyfftw.empty_aligned((err, N, M), dtype='complex128')
            fr[:, :, 0] = 0.0
            fr[:, :, 1:] = np.abs(ffta[:, 1:]) * \
                np.exp(1j * np.random.uniform(high=2 * np.pi,
                                              size=(err, N, M - 1)))
            if M % 2 == 0:
                # The Nyquist bin of a real series is real-valued.
                fr[:, :, M // 2] = 2 ** 0.5 * np.abs(ffta[:, M // 2]) * \
                    np.cos(np.random.uniform(high=2 * np.pi, size=(err, N)))
            else:
                fr[:, :, (M - 1) // 2:(M + 1) // 2 + 1] = 2 ** 0.5 * \
                    np.abs(ffta[:, (M - 1) // 2:(M + 1) // 2 + 1]) * \
                    np.cos(np.random.uniform(high=2 * np.pi, size=(err, N, 2)))
            # inverse transform
            fftobj_r = pyfftw.builders.ifft(fr, threads=nthr)
            # Compute correlation of the new random phase arrays
            r = np.real(fftobj_r())
            rfft = pyfftw.empty_aligned(r.shape, dtype='float64')
            rfft[:] = r
            fftobj_r = pyfftw.builders.fft(rfft, n=Mfft, threads=nthr)
            cr = np.conj(fftobj_r()) * fftb
            cfft = pyfftw.empty_aligned(cr.shape, dtype='complex128')
            cfft[:] = cr
            fftobj_c = pyfftw.builders.ifft(cfft, threads=nthr)
            cr = np.fft.fftshift(np.real(fftobj_c()), axes=2)
            # normalise the surrogate correlations like c above
            cr /= r.std(axis=2)[:, :, np.newaxis]
            cr /= (b.std(axis=1) * M)[:, np.newaxis]
            # store fftw wisdom for future runs
            with open(cachefile, 'wb') as fwis:
                dump(pyfftw.export_wisdom(), fwis)
            return c, \
                np.percentile(cr, 0.5 * alpha * 100, axis=0), \
                np.percentile(cr, (1 - 0.5 * alpha) * 100, axis=0), \
                lags
        else:
            raise ValueError('Error method can be "gauss", "none" '
                             'or an integer for bootstrapping.')
    else:
        return c, lags
def eof_compute(A, V=None, numpc=False, svd=True,
                detrend=True, norm=False):
    """Compute Empirical Orthogonal Functions.

    The method is described in Legler, BAMS, vol 64, 1983, computing
    eigenvalues and eigenvectors of the covariance matrix.

    A: Array of (n, m), with n the "temporal" axis and m the "spatial"
       axis, along which EOF's are identified.
    V: If passed, it must be an array with the same shape as A.
       A and V will then be interpreted as two components of a vector,
       for computing EOFs of velocity vectors.
    numpc: Controls the number of EOF/PC's to extract. If greater than
       1, it is interpreted simply as the number of EOF's which are
       returned. If between 0 and 1, it is interpreted as the fraction
       of the total variance which must be (at least) explained by the
       EOF's which are returned by the function. By default, it is
       False, meaning that all EOF's are returned.
    svd: If True (default), use the Singular Value Decomposition
       method, otherwise solve the eigenproblem directly (slightly
       slower).
    detrend: Detrend the time series before computing the EOF's
       (default: True).
    norm: Normalise the time series before computing the EOF's
       (default: False).

    Returns:
        s: Relative amount of total variance explained by each mode.
        V: Array containing the modes, with shape (m, num_modes).
        coef: Array of the (time) coefficients enabling the
              reconstruction of the input time series, with shape
              (n, num_modes).
    """
    from numpy import argsort, asarray, conj, cumsum, diag, dot, \
        linalg, real, where
    # BUG FIX: importing 'detrend' under its plain name rebinds the
    # local variable and clobbers the 'detrend' keyword argument, which
    # made detrending unconditional. Import under a private name.
    from scipy.signal import detrend as _detrend
    A = asarray(A)
    if (A.ndim < 1) or (A.ndim > 2):
        raise ValueError("A can only be 1D or 2D")
    if V is not None:
        V = asarray(V)
        if V.shape != A.shape:
            raise ValueError("Shape mismatch between A and V.")
        A = A + V * 1j
    else:
        # BUG FIX: asarray does not copy, so the in-place centering and
        # normalisation below used to mutate the caller's array.
        # Multiplying by 1.0 copies and promotes integers to float
        # while keeping complex input complex.
        A = A * 1.0
    if (numpc > 1) and ((numpc % 1) != 0):
        raise ValueError("numpc must be integer > 0 "
                         "or float 0 < numpc < 1")
    A -= A.mean(axis=0)
    if detrend:
        A = _detrend(A, axis=0)
    if norm:
        A /= A.std(axis=0)
    if svd:
        # SVD method: A = U S Vh, the EOFs are the rows of Vh and the
        # time coefficients (PCs) are U S.
        coef, s, Vh = linalg.svd(A, full_matrices=False)
        coef = dot(coef, diag(s))
        # normalise to be consistent with the definition of covariance
        s *= s / (A.shape[0] - 1)
        s /= s.sum()
        V = Vh.T
    else:
        s, V = linalg.eig(dot(conj(A.T), A) / (A.shape[0] - 1))
        # BUG FIX: sort by decreasing eigenvalue BEFORE truncating to
        # the at most n meaningful modes — eig returns eigenvalues in
        # arbitrary order, so truncating first could drop the leading
        # modes.
        ids = argsort(s)[::-1]
        s = real(s[ids])
        V = V[:, ids]
        s = s[:A.shape[0]]
        V = V[:, :A.shape[0]]
        s /= s.sum()
        coef = dot(A, conj(V))
    if numpc >= 1:
        # Accept float counts like 2.0 (validated above).
        numpc = int(numpc)
        return s[:numpc], V[:, :numpc], coef[:, :numpc]
    elif 0 < numpc < 1:
        totvar = cumsum(s)
        totvar /= totvar[-1]
        ind = where(totvar >= numpc)[0][0]
        ind = min(ind, s.size - 1)
        return s[:ind + 1], V[:, :ind + 1], coef[:, :ind + 1]
    else:
        return s, V, coef
def AR1_spectrum(f, lag1_corr, fN=None):
    """Theoretical spectrum of an AR1 process with lag-1 correlation
    ``lag1_corr``, evaluated at the frequencies ``f``.

    The optional argument fN is the Nyquist frequency, which is taken
    to be the maximum frequency in f if not given.
    """
    from numpy import cos, pi
    rho2 = lag1_corr * lag1_corr
    nyquist = f.max() if fN is None else fN
    denom = 1 - 2 * lag1_corr * cos(f / nyquist * pi) + rho2
    return (1 - rho2) / denom
def AR1_siglev(f, dof, lag1_corr, siglev=0.99, fN=None):
    """Significance level for a spectrum against the equivalent AR1
    (red-noise) process.

    The AR1 spectrum is scaled by the chi-square quantile at ``siglev``
    with ``dof`` degrees of freedom, normalised by ``dof``.
    """
    from scipy.stats import chi2
    scale = chi2.ppf(siglev, dof) / dof
    return scale * AR1_spectrum(f, lag1_corr, fN)
def variogram(array, mask=None, degree=2):
    """Compute the variogram of a 2D array over a square grid.

    The array can have masked values or nans.
    array: the array to pass
    mask: the mask, same shape as array, identifying the bad data.
          If None, it will be estimated from nans (if present).
    degree: degree of the structure function, by default 2.
    """
    from numpy import atleast_2d, isfinite, meshgrid, zeros, uint8
    from numpy import asarray, vstack, arange
    from scipy.special import binom
    from .variogram import cvar
    data = atleast_2d(array)
    nrow, ncol = data.shape
    if mask is None:
        # Masked arrays carry their own mask; otherwise flag non-finite
        # entries as bad data.
        try:
            mask = data.mask
        except AttributeError:
            mask = ~isfinite(data)
    else:
        assert mask.shape == (nrow, ncol)
    good = ~mask
    values = data[good]
    # One output entry per unordered pair of valid points.
    npairs = int(binom(values.size, 2))
    dist = zeros(npairs)
    varg = zeros(npairs)
    cols, rows = meshgrid(arange(ncol, dtype="int32"),
                          arange(nrow, dtype="int32"))
    indices = asarray(vstack((cols[good], rows[good])))
    cvar(nrow, ncol, values.size,
         values, degree, indices,
         dist, varg)
    return dist, varg
def binned_variogram(array, bin_def, mask=None, degree=2, absolute=False):
    """Compute the binned variogram of a 2D array over a square grid.

    The array can have masked values or nans.
    array: the array to pass
    bin_def: an array-like of size 3, defining minimum value, maximum
             value and bin size. Bin intervals are defined as closed to
             the left and open to the right. Points beyond the range
             defined in bin_def are not taken into account. If the
             range is not a multiple of the bin size, an error is
             raised.
    mask: the mask, same shape as array, identifying the bad data.
          If None, it will be estimated from nans (if present).
    degree: degree of the structure function, by default 2.
    absolute: use absolute value before raising to "degree" in the
              structure function (default: False). If absolute is True,
              the "generalised structure function" is computed:
              |\\Delta x|^degree

    RETURNS:
        dist, varg, count: mean pair distance, mean structure function
        and number of pairs in each bin.
    """
    from numpy import atleast_2d, isfinite, meshgrid, zeros
    from numpy import asarray, vstack, arange
    from multiprocessing import cpu_count
    from .variogram import bin_cvar
    array = atleast_2d(array)
    (ia, ja) = array.shape
    if mask is None:
        # Masked arrays carry their own mask; otherwise flag non-finite.
        try:
            mask = array.mask
        except AttributeError:
            mask = ~isfinite(array)
    else:
        assert mask.shape == (ia, ja)
    bin_def = asarray(bin_def).ravel()
    if (bin_def.size != 3) or (bin_def[1] <= bin_def[0]):
        raise ValueError("Wrong bin definition, must be: (min, max, dx).")
    # BUG FIX: the old check used the floating point modulo
    # ((max - min) % dx > 1e-14), which spuriously fails on exact
    # multiples (e.g. 1.0 % 0.1 ~= 0.0999...). Round to the nearest
    # number of bins and verify the reconstruction error instead.
    span = bin_def[1] - bin_def[0]
    nbins = int(round(float(span / bin_def[2])))
    if nbins < 1 or abs(nbins * bin_def[2] - span) > 1e-9 * abs(bin_def[2]):
        raise ValueError("Wrong bin definition: the range must be a multiple of the bin size.")
    ncpu = cpu_count()
    # One partial accumulator per CPU; reduced after the call.
    dist = zeros((nbins, ncpu))
    varg = zeros((nbins, ncpu))
    count = zeros((nbins, ncpu), dtype="int64")
    iind, jind = meshgrid(arange(ja, dtype="int32"), arange(ia, dtype="int32"))
    indices = asarray(vstack((iind[~mask], jind[~mask])))
    marray = array[~mask]
    bin_cvar(ia, ja, marray.size,
             marray, degree, indices,
             bin_def,
             dist, varg, count,
             absolute)
    # Reduce the per-CPU partial sums and convert to bin averages.
    count = count.sum(axis=1)
    dist = dist.sum(axis=1) / count
    varg = varg.sum(axis=1) / count
    return dist, varg, count

Event Timeline