diff --git a/stats.py b/stats.py
index c27d5be..db85af8 100644
--- a/stats.py
+++ b/stats.py
@@ -1,490 +1,490 @@
def tscov(x, y=None, method='cor', dm='constant'):
    """
    Compute auto- or cross-correlation or covariance function estimates
    of a time series, following Shumway & Stoffer 2011.

    x, y:   1D arrays. If y is not given, compute the
            autocorrelation/autocovariance.
    method: normalisation method:
            "cor" - normalises the sequence so that the correlation at
                    zero lag is 1. This is the default.
            "cov" - covariance (no normalisation).
    dm:     detrending method to be used: if 'linear' the linear trend
            is removed, if 'constant' only the mean is removed (default).

    RETURNS:
    C: the requested function.

    NOTES:
    The zero lag is at position x.size // 2. A positive (negative) lag
    means that x lags (leads) y; in other words, the lag is that of x
    with respect to y. If the autocovariance or autocorrelation is
    requested, only the positive lags are returned, since C(-h) = C(h).
    Note that, differently from the reference, we do not consider lags
    longer than x.size // 2.
    """
    import numpy as np

    a = np.float64(np.atleast_1d(x)).copy()
    # Check size: only 1D vectors are accepted
    if np.size(a.shape) == 1:
        N = a.size
    else:
        raise ValueError('tscov accepts only 1D vectors.')
    # We do not want length-1 arrays
    assert N > 1
    if y is None:
        b = a
    else:
        b = np.float64(np.atleast_1d(y)).copy()
        # b must be a vector
        assert np.size(b.shape) == 1
    assert method in ('cov', 'cor')
    if a.shape != b.shape:
        raise ValueError('The input arrays must have the same length.')

    # Detrend
    from scipy.signal import detrend
    a = detrend(a, type=dm)
    b = detrend(b, type=dm)

    g = np.correlate(a, b, mode="same") / N
    if y is None:
        # autocovariance/autocorrelation: keep the positive lags only
        g = g[N // 2:]
    if method == "cor":
        # normalise so that the zero-lag correlation is 1
        g /= a.std() * b.std()
    return g


def tsave(x):
    """
    Compute the mean of a time series, with its standard error (based on
    the autocovariance). Equations 1.32 and 1.33 of Shumway & Stoffer
    2011.

    x: 1D array.
    """
    import numpy as np

    a = np.float64(np.atleast_1d(x))
    # Check size: only 1D vectors are accepted
    if np.size(a.shape) == 1:
        N = a.size
    else:
        raise ValueError('tsave accepts only 1D vectors.')

    # Standard error: we only have the positive half of the
    # autocovariance, so weight it by (1 - h/N) and count the symmetric
    # negative lags through the factor 2 below.
    if (N % 2) == 0:
        w = np.ones(N // 2)
        w -= np.arange(N // 2) / N
    else:
        w = np.ones(N // 2 + 1)
        w -= np.arange(N // 2 + 1) / N
    stderr = w * tscov(a, method="cov")
    stderr = np.sqrt((stderr[0] + 2 * np.sum(stderr[1:])) / N)
    return a.mean(), stderr
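
# ---------------------------------------------------------------------------
# Illustrative usage sketch; the helper below is hypothetical and only shows
# how tscov/tsave are meant to be called, here on a synthetic AR(1) series.
def _demo_tscov_tsave():
    import numpy as np
    rng = np.random.default_rng(0)
    x = np.empty(512)
    x[0] = rng.standard_normal()
    for t in range(1, x.size):           # AR(1) with lag-1 correlation 0.6
        x[t] = 0.6 * x[t - 1] + rng.standard_normal()
    rho = tscov(x, method="cor")         # autocorrelation, positive lags only
    assert abs(rho[0] - 1.0) < 1e-10     # unit correlation at zero lag
    mean, stderr = tsave(x)              # mean with covariance-based stderr
    return rho, mean, stderr
# ---------------------------------------------------------------------------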
""" import numpy as np import pyfftw from scipy.signal import detrend from pickle import dump, load from os.path import isfile, expanduser pyfftw.interfaces.cache.enable() # load wisdom if it exists cachefile = "{:s}/.cache/pyFFTW_wisdom".format(expanduser("~")) if isfile(cachefile): with open(cachefile, 'rb') as fwis: try: pyfftw.import_wisdom(load(fwis)) except EOFError: print("Invalid wisdom file.") a = np.float64(np.atleast_2d(a)) # Check size if np.size(a.shape)==1: M = a.size N = 1 elif np.size(a.shape)==2: N,M = a.shape else: raise ValueError('xcorr accepts only vectors or 2D matrices.') # We do not want length-1 arrays assert M>1 if b is None: b = a b = np.float64(np.atleast_2d(b)) # Be must be a vector assert np.size(b.shape)<=2 assert (method in ('cov','cor')) if (a.shape!=b.shape): Na,Ma = a.shape; Nb,Mb = b.shape if Na!=Nb: raise ValueError('The input arrays must share the same' 'first dimension.') M = max(Ma,Mb) if Ma 1000: from multiprocessing import cpu_count nthr = cpu_count() - 1 else: nthr = 1 np.random.seed(1982) fftobj_a = pyfftw.builders.fft(afft) ffta = fftobj_a() # random phase fr = pyfftw.empty_aligned((err,N,M), dtype='complex128') fr[:, :, 0] = 0.0 fr[:,:,1:] = np.abs(ffta[:, 1:])*\ np.exp(1j*np.random.uniform(high=2*np.pi, size=(err,N,M-1))) if M%2==0: fr[:,:,M//2] = 2**0.5*np.abs(ffta[:,M//2])*\ np.cos(np.random.uniform(high=2*np.pi, size=(err,N))) else: fr[:,:,(M-1)//2:(M+1)//2+1] = 2**0.5 * \ np.abs(ffta[:,(M-1)//2:(M+1)//2+1]) * \ np.cos(np.random.uniform(high=2*np.pi, size=(err,N,2))) # inverse transform fftobj_r = pyfftw.builders.ifft(fr, threads=nthr) # Compute correlation of new random phase arrays r = np.real(fftobj_r()) rfft = pyfftw.empty_aligned(r.shape, dtype='float64') rfft[:] = r fftobj_r = pyfftw.builders.fft(rfft, n=Mfft, threads=nthr) cr = np.conj(fftobj_r()) * fftb cfft = pyfftw.empty_aligned(cr.shape, dtype='complex128') cfft[:] = cr fftobj_c = pyfftw.builders.ifft(cfft, threads=nthr) cr = np.fft.fftshift(np.real(fftobj_c()), axes=2) # compute statistics cr /= r.std(axis=2)[:,:,np.newaxis] cr /= (b.std(axis=1)*M)[:,np.newaxis] # fftw wisdom with open(cachefile, 'wb') as fwis: dump(pyfftw.export_wisdom(), fwis) return c, \ np.percentile(cr, 0.5*alpha*100, axis=0), \ np.percentile(cr, (1-0.5*alpha)*100, axis=0), \ np.arange(Mfft)-Mfft/2 else: raise ValueError('Error method can be "gauss", "none" ' 'or an integer for bootstrapping.') else: return c, np.arange(Mfft)-Mfft/2 def eof_compute(A, V=None, numpc=False, svd=True, detrend=True, norm=False): """Compute Empirical Orthogonal Functions The method is described in Legler, BAMS, vol 64, 1983 computing eigenvalues and eigenvectors of covariance matrix A: Array of (n,m), with n the "temporal" axis and m the "spatial" axis, along which EOF's are identified. V: If passed, it must be an array with the same shape as A. A and V will then be interpreted as two components of a vector, for computing EOFs of velocity vectors. numpc: Controls the number of EOF/PC's to extract. If greater than 1, it is interpreted simply as the number of EOF's which are returned. If between 0 and 1, it is interpreted as the fraction of the total variance which must be (at least) explained by the EOF's which are returned by the function. By default, it is False, meaning that all EOF's are returned. svd: If True (default), use Singular Value Decomposition method, otherwise solve the eigenproblem directly (slightly slower). detrend: Detrend the time series before compuing the EOF's (default: True). 

def eof_compute(A, V=None, numpc=False, svd=True, detrend=True, norm=False):
    """Compute Empirical Orthogonal Functions.

    The method is described in Legler, BAMS, vol 64, 1983, computing the
    eigenvalues and eigenvectors of the covariance matrix.

    A:       Array of shape (n, m), with n the "temporal" axis and m the
             "spatial" axis, along which the EOF's are identified.
    V:       If passed, it must be an array with the same shape as A.
             A and V will then be interpreted as the two components of a
             vector, for computing the EOF's of velocity vectors.
    numpc:   Controls the number of EOF/PC's to extract. If greater than
             1, it is interpreted simply as the number of EOF's which
             are returned. If between 0 and 1, it is interpreted as the
             fraction of the total variance which must be (at least)
             explained by the EOF's which are returned by the function.
             By default it is False, meaning that all EOF's are
             returned.
    svd:     If True (default), use the Singular Value Decomposition
             method, otherwise solve the eigenproblem directly (slightly
             slower).
    detrend: Detrend the time series before computing the EOF's
             (default: True).
    norm:    Normalise the time series before computing the EOF's
             (default: False).

    Returns:
    s:    Relative amount of the total variance explained by each mode.
    V:    Array containing the modes, with shape (m, num_modes).
    coef: Array of the (time) coefficients enabling the reconstruction
          of the input time series, with shape (n, num_modes).
    """
    from numpy import argsort, asarray, conj, cumsum, diag, dot, linalg, \
        real, where
    # Import under a different name, so that the `detrend` keyword
    # argument is not shadowed by the scipy function.
    from scipy.signal import detrend as _detrend

    A = asarray(A)
    if (A.ndim < 1) or (A.ndim > 2):
        raise ValueError("A can only be 1D or 2D")
    if V is not None:
        V = asarray(V)
        if V.shape != A.shape:
            raise ValueError("Shape mismatch between A and V.")
        A = A + V * 1j
    if (numpc > 1) and ((numpc % 1) != 0):
        raise ValueError("numpc must be integer > 0 "
                         "or float 0 < numpc < 1")
    A = A - A.mean(axis=0)
    if detrend:
        A = _detrend(A, axis=0)
    if norm:
        A /= A.std(axis=0)
    if svd:
        # Use the SVD method
        coef, s, V = linalg.svd(A, full_matrices=False)
        coef = dot(coef, diag(s))
        # normalise to be consistent with the definition of covariance
        s *= s / (A.shape[0] - 1)
        s /= s.sum()
        V = V.T
    else:
        s, V = linalg.eig(dot(conj(A.T), A) / (A.shape[0] - 1))
        s = s[:A.shape[0]]
        V = V[:, :A.shape[0]]
        ids = argsort(s)[::-1]
        s = real(s[ids])
        s /= s.sum()
        V = V[:, ids]
        coef = dot(A, conj(V))
    if numpc >= 1:
        return s[:numpc], V[:, :numpc], coef[:, :numpc]
    elif 0 < numpc < 1:
        totvar = cumsum(s)
        totvar /= totvar[-1]
        ind = where(totvar >= numpc)[0][0]
        ind = min(ind, s.size - 1)
        return s[:ind + 1], V[:, :ind + 1], coef[:, :ind + 1]
    else:
        return s, V, coef


def AR1_spectrum(f, lag1_corr, fN=None):
    """Return the spectrum of the AR1 process with lag-1 correlation
    lag1_corr, at frequencies f. The optional argument fN is the Nyquist
    frequency, which is taken to be the maximum frequency in f if not
    given."""
    from numpy import cos, pi

    lc12 = lag1_corr * lag1_corr
    if fN is None:
        fN = f.max()
    psd = (1 - lc12) / (1 - 2 * lag1_corr * cos(f / fN * pi) + lc12)
    return psd


def AR1_siglev(f, dof, lag1_corr, siglev=0.99, fN=None):
    """Return the significance level for the spectrum of the equivalent
    AR1 process."""
    from scipy.stats import chi2

    chisquare = chi2.ppf(siglev, dof) / dof
    return AR1_spectrum(f, lag1_corr, fN) * chisquare


def variogram(array, mask=None, degree=2):
    """
    Compute the variogram of a 2D array over a square grid. The array
    can have masked values or nans.

    array:  the 2D array to analyse.
    mask:   the mask, with the same shape as array, identifying the bad
            data. If None, it will be estimated from the nans
            (if present).
    degree: degree of the structure function, by default 2.
    """
    from numpy import atleast_2d, isfinite, meshgrid, zeros, uint8
    from numpy import asarray, vstack, arange
    from scipy.special import binom
    from .variogram import cvar

    array = atleast_2d(array)
    (ia, ja) = array.shape
    if mask is None:
        try:
            mask = array.mask
        except AttributeError:
            mask = ~isfinite(array)
    else:
        assert mask.shape == (ia, ja)
    # one entry per pair of valid points
    npt = int(binom(array[~mask].size, 2))
    dist = zeros(npt)
    varg = zeros(npt)
    iind, jind = meshgrid(arange(ja, dtype="int32"),
                          arange(ia, dtype="int32"))
    indices = asarray(vstack((iind[~mask], jind[~mask])))
    marray = array[~mask]
    mask = uint8(mask)
    cvar(ia, ja, marray.size, marray, degree, indices, dist, varg)
    return dist, varg
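
# ---------------------------------------------------------------------------
# Illustrative usage sketch (hypothetical helper): EOF decomposition of a
# (time, space) field and reconstruction from the leading modes, following
# the shapes documented in eof_compute.
def _demo_eof():
    import numpy as np
    rng = np.random.default_rng(1)
    field = rng.standard_normal((200, 30))      # n=200 times, m=30 points
    s, V, coef = eof_compute(field, numpc=0.9)  # keep 90% of the variance
    recon = coef @ V.T                          # approximates the centred,
    return s, recon                             # detrended field
# ---------------------------------------------------------------------------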
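
# ---------------------------------------------------------------------------
# Illustrative usage sketch (hypothetical helper): red-noise significance
# test of a raw periodogram (dof=2) against the equivalent AR1 background.
# AR1_spectrum as defined above has roughly unit mean power over (0, fN],
# so the periodogram is normalised to unit mean power before the comparison.
def _demo_ar1_siglev():
    import numpy as np
    rng = np.random.default_rng(3)
    x = np.empty(1024)
    x[0] = rng.standard_normal()
    for t in range(1, x.size):           # AR(1) with lag-1 correlation 0.7
        x[t] = 0.7 * x[t - 1] + rng.standard_normal()
    f = np.fft.rfftfreq(x.size)[1:]      # positive frequencies up to Nyquist
    psd = np.abs(np.fft.rfft(x - x.mean())[1:])**2
    psd /= psd.mean()                    # unit mean power
    sig = AR1_siglev(f, dof=2, lag1_corr=0.7, siglev=0.99)
    return f[psd > sig]                  # frequencies above the 99% level
# ---------------------------------------------------------------------------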

def binned_variogram(array, bin_def, mask=None, degree=2, absolute=False):
    """
    Compute the binned variogram of a 2D array over a square grid. The
    array can have masked values or nans.

    array:    the 2D array to analyse.
    bin_def:  an array-like of size 3, defining the minimum value, the
              maximum value and the bin size. Bin intervals are defined
              as closed to the left and open to the right. Points beyond
              the range defined in bin_def are not taken into account.
              If the range is not a multiple of the bin size, an error
              is raised.
    mask:     the mask, with the same shape as array, identifying the
              bad data. If None, it will be estimated from the nans
              (if present).
    degree:   degree of the structure function, by default 2.
    absolute: use the absolute value before raising to "degree" in the
              structure function (default: False). If absolute is True,
              the "generalised structure function" is computed:
              |\Delta x|^degree.
    """
    from numpy import atleast_2d, isfinite, meshgrid, zeros, uint8
    from numpy import asarray, vstack, arange, linspace
    from multiprocessing import cpu_count
    from .variogram import bin_cvar

    array = atleast_2d(array)
    (ia, ja) = array.shape
    if mask is None:
        try:
            mask = array.mask
        except AttributeError:
            mask = ~isfinite(array)
    else:
        assert mask.shape == (ia, ja)
    bin_def = asarray(bin_def).ravel()
    if (bin_def.size != 3) or (bin_def[1] <= bin_def[0]):
        raise ValueError("Wrong bin definition, must be: (min, max, dx).")
    if (((bin_def[1] - bin_def[0]) / bin_def[2]) % 1) > 1e-14:
        raise ValueError("Wrong bin definition: the range must be "
                         "a multiple of the bin size.")
    nbins = int((bin_def[1] - bin_def[0]) / bin_def[2])
-    ncpu = cpu_count()
+    ncpu = cpu_count() - 1
    # one accumulator per thread, reduced after the parallel loop
    dist = zeros((nbins, ncpu))
    varg = zeros((nbins, ncpu))
    count = zeros((nbins, ncpu), dtype="int64")
    iind, jind = meshgrid(arange(ja, dtype="int32"),
                          arange(ia, dtype="int32"))
    indices = asarray(vstack((iind[~mask], jind[~mask])))
    marray = array[~mask]
    mask = uint8(mask)
    bin_cvar(ia, ja, marray.size, marray, degree, indices,
             bin_def, dist, varg, count,
-             absolute)
+             absolute, ncpu)
    count = count.sum(axis=1)
    dist = dist.sum(axis=1) / count
    varg = varg.sum(axis=1) / count
    return dist, varg, count
\ No newline at end of file
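
# ---------------------------------------------------------------------------
# Illustrative usage sketch (hypothetical helper; requires the compiled
# variogram extension below): second-order structure function of a 2D field
# with bad data, binned in unit-distance classes.
def _demo_binned_variogram():
    import numpy as np
    rng = np.random.default_rng(2)
    field = rng.standard_normal((64, 64))
    field[10:20, 30:40] = np.nan         # bad data, detected via isfinite
    dist, varg, count = binned_variogram(field, bin_def=(0.0, 32.0, 1.0))
    good = count > 0                     # empty bins yield nan after division
    return dist[good], varg[good]
# ---------------------------------------------------------------------------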

diff --git a/variogram.pyx b/variogram.pyx
index e43c15d..d131378 100644
--- a/variogram.pyx
+++ b/variogram.pyx
@@ -1,97 +1,99 @@
# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp
from cython cimport wraparound, boundscheck, cdivision
from libc.math cimport pow, sqrt, fabs
from libc.stdio cimport printf
from cython.parallel cimport prange, threadid
from openmp cimport omp_get_num_threads
import numpy as np
cimport numpy as np


@wraparound(False)
@boundscheck(False)
@cdivision(True)
def cvar(int ia, int ja, int npt, double[:] array, double degree,
         int[:, :] indices, double[:] dist, double[:] varg):
    cdef long n, n1, n2
    cdef int ii, jj, i, j
    cdef double idist, jdist

    # one output entry per pair of valid points
    n = 0
    for n1 in range(npt - 1):
        for n2 in range(n1 + 1, npt):
            i = indices[0, n1]
            j = indices[1, n1]
            ii = indices[0, n2]
            jj = indices[1, n2]
            idist = ii - i
            jdist = jj - j
            dist[n] = sqrt(idist * idist + jdist * jdist)
            varg[n] = 0.5 * pow(array[n1] - array[n2], degree)
            n += 1


@wraparound(False)
@boundscheck(False)
@cdivision(True)
def bin_cvar(int ia, int ja, int npt, double[:] array, double degree,
             int[:, :] indices, double[:] bin_def,
             double[:, :] meand, double[:, :] varg, long[:, :] count,
-             bint absolute):
+             bint absolute, int ncpu):
    cdef long n1, n2

-    for n1 in prange(npt - 1, nogil=True, schedule="static"):
+    for n1 in prange(npt - 1, nogil=True,
+                     schedule="guided", chunksize=50,
+                     num_threads=ncpu):
        for n2 in range(n1 + 1, npt):
            compute_sf(indices[0, n1], indices[1, n1],
                       indices[0, n2], indices[1, n2],
                       array[n1], array[n2], degree, absolute,
                       bin_def, count, meand, varg)


@wraparound(False)
@boundscheck(False)
@cdivision(True)
cdef void compute_sf(int i, int j, int ii, int jj, double a1, double a2,
                     double degree, bint absolute, double[:] bin_def,
                     long[:, :] count, double[:, :] meand,
                     double[:, :] varg) nogil:
    cdef int ni, nt
    cdef double idist, jdist, dist

    nt = threadid()
    idist = ii - i
    jdist = jj - j
    dist = sqrt(idist * idist + jdist * jdist)
    if (dist < bin_def[0]) or (dist >= bin_def[1]):
        # outside the requested range: skip the pair
        pass
    else:
        # left-closed, right-open bins, accumulated per thread
        ni = int((dist - bin_def[0]) / bin_def[2])
        count[ni, nt] += 1
        meand[ni, nt] += dist
        if absolute:
            varg[ni, nt] += 0.5 * pow(fabs(a1 - a2), degree)
        else:
            varg[ni, nt] += 0.5 * pow(a1 - a2, degree)
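
# ---------------------------------------------------------------------------
# Build sketch (assumed setup, not part of this file): the "# distutils:"
# directives at the top already request the OpenMP flags, so a minimal
# setup.py along these lines is enough to compile the extension:
#
#     from setuptools import setup
#     from Cython.Build import cythonize
#     import numpy
#
#     setup(ext_modules=cythonize("variogram.pyx"),
#           include_dirs=[numpy.get_include()])
#
# Design note: the per-thread second axis of count/meand/varg lets each
# OpenMP thread accumulate its pairs without locks; binned_variogram then
# reduces the accumulators with a plain sum over that axis.
# ---------------------------------------------------------------------------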