Source code for spectrum.mtm

"""Multitapering method.

This module provides a multi-tapering method for spectral estimation.
The computation of the tapering windows being computationally expensive, a C
module is used to compute the tapering window (see :func:`spectrum.mtm.dpss`).

The estimation itself can be performed with the :func:`spectrum.mtm.pmtm` function


"""
import sys
import platform
import os

import numpy as np
import ctypes
from ctypes import *
import ctypes as C
from ctypes import POINTER
import os
from os.path import join as pj
from spectrum.tools import nextpow2
from numpy.ctypeslib import load_library
from spectrum.psd import Spectrum

"""

from ctypes import *
my_sum=CDLL('sum.so')
a=numpy.array(range(10),dtype=float)
my_sum.sum(a.ctypes.data_as(c_void_p),int(10))


which you want to call from python. First make a shared object library by doing (at the command line)

gcc -c sum.c
gcc -shared -o sum.so sum.o

Note that on OSX -shared should be replaced by -dynamiclib and sum.so should be called sum.dylib Then you can do

"""

# Import shared mtspec library
if hasattr(sys, 'frozen'):
    p = os.path.abspath(os.path.dirname(sys.executable))
else:
    p = os.path.abspath(os.path.dirname(__file__))

lib_name = 'mydpss'
try:
    mtspeclib = load_library(lib_name, p)
except:
    print("Library %s not found" % lib_name)


[docs]class MultiTapering(Spectrum): """ See :func:`pmtm` for details """ def __init__(self, data, NW=None, k=None, NFFT=None, e=None, v=None, method="adapt", scale_by_freq=True, sampling=1): super(MultiTapering, self).__init__(data, sampling=sampling, NFFT=NFFT, scale_by_freq=scale_by_freq) self.NW = NW self.k = k self.e = e self.v = v self.method = method def __call__(self): Sk_complex, weights, eigenvalues = pmtm(self.data, self.NW, self.k, NFFT=self.NFFT, e=self.e, v=self.v, method=self.method, show=False) Sk = abs(Sk_complex)**2 if self.method == "adapt": Sk = Sk.transpose() Sk = np.mean(Sk * weights, axis=1) else: Sk = np.mean(Sk * weights, axis=0) self.Sk = Sk self.weights = weights self.eigenvalues = eigenvalues if self.datatype == "real": # see doc/concepts.rst for details if self.NFFT % 2 == 0: newpsd = self.Sk[0:int(self.NFFT/2 +1)] * 2 else: newpsd = self.Sk[0:int((self.NFFT+1)/2)] * 2 self.psd = newpsd else: self.psd = self.Sk return self def __str_title(self): return "Multitapering estimate\n" def __str__(self): return super(MultiTapering, self).__str__()
[docs]def pmtm(x, NW=None, k=None, NFFT=None, e=None, v=None, method='adapt', show=False): """Multitapering spectral estimation :param array x: the data :param float NW: The time half bandwidth parameter (typical values are 2.5,3,3.5,4). Must be provided otherwise the tapering windows and eigen values (outputs of dpss) must be provided :param int k: uses the first k Slepian sequences. If *k* is not provided, *k* is set to *NW*2*. :param NW: :param e: the window concentrations (eigenvalues) :param v: the matrix containing the tapering windows :param str method: set how the eigenvalues are used when weighting the results. Must be in ['unity', 'adapt', 'eigen']. see below for details. :param bool show: plot results :return: Sk (complex), weights, eigenvalues Usually in spectral estimation the mean to reduce bias is to use tapering window. In order to reduce variance we need to average different spectrum. The problem is that we have only one set of data. Thus we need to decompose a set into several segments. Such method are well-known: simple daniell's periodogram, Welch's method and so on. The drawback of such methods is a loss of resolution since the segments used to compute the spectrum are smaller than the data set. The interest of multitapering method is to keep a good resolution while reducing bias and variance. How does it work? First we compute different simple periodogram with the whole data set (to keep good resolution) but each periodgram is computed with a different tapering windows. Then, we average all these spectrum. To avoid redundancy and bias due to the tapers mtm use special tapers. Method can be eigen, unity or adapt. If *unity*, weights are set to 1. If *eigen* are proportional to the eigen-values. If *adapt*, equations from [2] (P&W pp 368-370) are used. The output is made of 2 matrices called *Sk* and *weights*. The third item stored the eigenvalues. The two matrices have dimensions equal to the number of windows used multiplied by the number of input points. The first matrix stored the spectral results while the second stores the weights. Would you wish to plot the spectrum, you will have to take the means of the different windows and weight down the results before mean(Sk * weigths). Please see the code for details. .. plot:: :width: 80% :include-source: from spectrum import data_cosine, dpss, pmtm data = data_cosine(N=2048, A=0.1, sampling=1024, freq=200) # If you already have the DPSS windows [tapers, eigen] = dpss(2048, 2.5, 4) res = pmtm(data, e=eigen, v=tapers, show=False) # You do not need to compute the DPSS before end res = pmtm(data, NW=2.5, show=False) res = pmtm(data, NW=2.5, k=4, show=True) .. versionchanged:: 0.6.2 APN modified method to return each Sk as complex values, the eigenvalues and the weights """ assert method in ['adapt','eigen','unity'] N = len(x) # if dpss not provided, compute them if e is None and v is None: if NW is not None: [tapers, eigenvalues] = dpss(N, NW, k=k) else: raise ValueError("NW must be provided (e.g. 2.5, 3, 3.5, 4") elif e is not None and v is not None: eigenvalues = e[:] tapers = v[:] else: raise ValueError("if e provided, v must be provided as well and viceversa.") nwin = len(eigenvalues) # length of the eigen values vector to be used later # set the NFFT if NFFT==None: NFFT = max(256, 2**nextpow2(N)) Sk_complex = np.fft.fft(np.multiply(tapers.transpose(), x), NFFT) Sk = abs(Sk_complex)**2 # si nfft smaller thqn N, cut otherwise add wero. # compute if method in ['eigen', 'unity']: if method == 'unity': weights = np.ones((nwin, 1)) elif method == 'eigen': # The S_k spectrum can be weighted by the eigenvalues, as in Park et al. weights = np.array([_x/float(i+1) for i,_x in enumerate(eigenvalues)]) weights = weights.reshape(nwin,1) elif method == 'adapt': # This version uses the equations from [2] (P&W pp 368-370). # Wrap the data modulo nfft if N > nfft sig2 = np.dot(x, x) / float(N) Sk = abs(np.fft.fft(np.multiply(tapers.transpose(), x), NFFT))**2 Sk = Sk.transpose() S = (Sk[:,0] + Sk[:,1]) / 2 # Initial spectrum estimate S = S.reshape(NFFT, 1) Stemp = np.zeros((NFFT,1)) S1 = np.zeros((NFFT,1)) # Set tolerance for acceptance of spectral estimate: tol = 0.0005 * sig2 / float(NFFT) i = 0 a = sig2 * (1 - eigenvalues) wk = np.ones((NFFT, 1)) * eigenvalues.transpose() # converges very quickly but for safety; set i<100 while sum(np.abs(S-S1))/NFFT > tol and i<100: i = i + 1 # calculate weights b1 = np.multiply(S, np.ones((1,nwin))) b2 = np.multiply(S,eigenvalues.transpose()) + np.ones((NFFT,1))*a.transpose() b = b1/b2 # calculate new spectral estimate wk=(b**2)*(np.ones((NFFT,1))*eigenvalues.transpose()) S1 = sum(wk.transpose()*Sk.transpose())/ sum(wk.transpose()) S1 = S1.reshape(NFFT, 1) S, S1 = S1, S # swap S and S1 weights=wk if show is True: print("""To plot the spectrum please use Multitapering class instead of pmtm. Same syntax but more correct plot. This plotting functionality is kept for book-keeping but lacks sampling option, and amplitude is not correct.""") from pylab import semilogy if method == "adapt": Sk = np.mean(Sk * weights, axis=1) else: Sk = np.mean(Sk * weights, axis=0) semilogy(Sk) return Sk_complex, weights, eigenvalues
[docs]def dpss(N, NW=None, k=None): r"""Discrete prolate spheroidal (Slepian) sequences Calculation of the Discrete Prolate Spheroidal Sequences also known as the slepian sequences, and the corresponding eigenvalues. :param int N: desired window length :param float NW: The time half bandwidth parameter (typical values are 2.5,3,3.5,4). :param int k: returns the first k Slepian sequences. If *k* is not provided, *k* is set to *NW*2*. :return: * tapers, a matrix of tapering windows. Matrix is a N by *k* (k is the number of windows) * eigen, a vector of eigenvalues of length *k* The discrete prolate spheroidal or Slepian sequences derive from the following time-frequency concentration problem. For all finite-energy sequences index limited to some set , which sequence maximizes the following ratio: .. math:: \lambda = \frac{\int_{-W}^{W}\left| X(f) \right|^2 df} {\int_{-F_s/2}^{F_s/2}\left| X(f) \right|^2 df} where :math:`F_s` is the sampling frequency and :math:`|W| < F_s/2`. This ratio determines which index-limited sequence has the largest proportion of its energy in the band :math:`[-W,W]` with :math:`0 < \lambda < 1`. The sequence maximizing the ratio is the first discrete prolate spheroidal or Slepian sequence. The second Slepian sequence maximizes the ratio and is orthogonal to the first Slepian sequence. The third Slepian sequence maximizes the ratio of integrals and is orthogonal to both the first and second Slepian sequences and so on. .. note:: Note about the implementation. Since the slepian generation is computationally expensive, we use a C implementation based on the C code written by Lees as published in: Lees, J. M. and J. Park (1995): Multiple-taper spectral analysis: A stand-alone C-subroutine: Computers & Geology: 21, 199-236. However, the original C code has been trimmed. Indeed, we only require the multitap function (that depends on jtridib, jtinvit functions only). .. plot:: :width: 80% :include-source: from spectrum import * from pylab import * N = 512 [w, eigens] = dpss(N, 2.5, 4) plot(w) title('Slepian Sequences N=%s, NW=2.5' % N) axis([0, N, -0.15, 0.15]) legend(['1st window','2nd window','3rd window','4th window']) Windows are normalised: .. math:: \sum_k h_k h_k = 1 :references: [Percival]_ Slepian, D. Prolate spheroidal wave functions, Fourier analysis, and uncertainty V: The discrete case. Bell System Technical Journal, Volume 57 (1978), 1371430 .. note:: the C code to create the slepian windows is extracted from original C code from Lees and Park (1995) and uses the conventions of Percival and Walden (1993). Functions that are not used here were removed. """ assert NW < N/2 , "NW ({}) must be stricly less than N/2 ({}/2)".format(NW, N) if k is None: k = min(round(2*NW),N) k = int(max(k,1)) from numpy import dot, zeros, arange, sqrt mtspeclib.multitap.restype = None lam = zeros(k, dtype=float) tapers = zeros(k*N, dtype=float) tapsum = zeros(k, dtype=float) res = mtspeclib.multitap( c_int(N), c_int(k), lam.ctypes.data_as(c_void_p), c_float(NW), tapers.ctypes.data_as(c_void_p), tapsum.ctypes.data_as(c_void_p), ) # normalisation by sqtr(N). It is required to have normalised windows tapers = tapers.reshape(k,N).transpose() / sqrt(N) for i in range(k): # By convention (Percival and Walden, 1993 pg 379) # * symmetric tapers (k=0,2,4,...) should have a positive average. # * antisymmetric tapers should begin with a positive lobe if i%2 == 0: if tapsum[i]<0: tapsum[i] *= -1 tapers[:,i] *= -1 else: if tapers[0,i] < 0: tapsum[i] *= -1 tapers[:,i] *= -1 # Now find the eigenvalues of the original # Use the autocovariance sequence technique from Percival and Walden, 1993 # pg 390 to get the eigenvalues more precisely (same as matlab output) # The values returned in lam are not exacly the same as in the following methods. acvs = _autocov(tapers.transpose(), debias=False) * N nidx = arange(N) W = float(NW)/N r = 4*W*np.sinc(2*W*nidx) r[0] = 2*W eigvals = dot(acvs, r) #return (tapers, lam) return [tapers, eigvals]
def _other_dpss_method(N, NW, Kmax): """Returns the Discrete Prolate Spheroidal Sequences of orders [0,Kmax-1] for a given frequency-spacing multiple NW and sequence length N. See dpss function that is the official version. This version is indepedant of the C code and relies on Scipy function. However, it is slower by a factor 3 Tridiagonal form of DPSS calculation from: """ # here we want to set up an optimization problem to find a sequence # whose energy is maximally concentrated within band [-W,W]. # Thus, the measure lambda(T,W) is the ratio between the energy within # that band, and the total energy. This leads to the eigen-system # (A - (l1)I)v = 0, where the eigenvector corresponding to the largest # eigenvalue is the sequence with maximally concentrated energy. The # collection of eigenvectors of this system are called Slepian sequences, # or discrete prolate spheroidal sequences (DPSS). Only the first K, # K = 2NW/dt orders of DPSS will exhibit good spectral concentration # [see http://en.wikipedia.org/wiki/Spectral_concentration_problem] # Here I set up an alternative symmetric tri-diagonal eigenvalue problem # such that # (B - (l2)I)v = 0, and v are our DPSS (but eigenvalues l2 != l1) # the main diagonal = ([N-1-2*t]/2)**2 cos(2PIW), t=[0,1,2,...,N-1] # and the first off-diangonal = t(N-t)/2, t=[1,2,...,N-1] # [see Percival and Walden, 1993] from scipy import linalg as la Kmax = int(Kmax) W = float(NW)/N ab = np.zeros((2,N), 'd') nidx = np.arange(N) ab[0,1:] = nidx[1:]*(N-nidx[1:])/2. ab[1] = ((N-1-2*nidx)/2.)**2 * np.cos(2*np.pi*W) # only calculate the highest Kmax-1 eigenvectors l,v = la.eig_banded(ab, select='i', select_range=(N-Kmax, N-1)) dpss = v.transpose()[::-1] # By convention (Percival and Walden, 1993 pg 379) # * symmetric tapers (k=0,2,4,...) should have a positive average. # * antisymmetric tapers should begin with a positive lobe fix_symmetric = (dpss[0::2].sum(axis=1) < 0) for i, f in enumerate(fix_symmetric): if f: dpss[2*i] *= -1 fix_skew = (dpss[1::2,1] < 0) for i, f in enumerate(fix_skew): if f: dpss[2*i+1] *= -1 # Now find the eigenvalues of the original # Use the autocovariance sequence technique from Percival and Walden, 1993 # pg 390 # XXX : why debias false? it's all messed up o.w., even with means # on the order of 1e-2 acvs = _autocov(dpss, debias=False) * N r = 4*W*np.sinc(2*W*nidx) r[0] = 2*W eigvals = np.dot(acvs, r) return dpss, eigvals def _autocov(s, **kwargs): """Returns the autocovariance of signal s at all lags. Adheres to the definition sxx[k] = E{S[n]S[n+k]} = cov{S[n],S[n+k]} where E{} is the expectation operator, and S is a zero mean process """ # only remove the mean once, if needed debias = kwargs.pop('debias', True) axis = kwargs.get('axis', -1) if debias: s = _remove_bias(s, axis) kwargs['debias'] = False return _crosscov(s, s, **kwargs) def _crosscov(x, y, axis=-1, all_lags=False, debias=True): """Returns the crosscovariance sequence between two ndarrays. This is performed by calling fftconvolve on x, y[::-1] Parameters x: ndarray y: ndarray axis: time axis all_lags: {True/False} whether to return all nonzero lags, or to clip the length of s_xy to be the length of x and y. If False, then the zero lag covariance is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2 debias: {True/False} Always removes an estimate of the mean along the axis, unless told not to. cross covariance is defined as sxy[k] := E{X[t]*Y[t+k]}, where X,Y are zero mean random processes """ if x.shape[axis] != y.shape[axis]: raise ValueError( 'crosscov() only works on same-length sequences for now' ) if debias: x = _remove_bias(x, axis) y = _remove_bias(y, axis) slicing = [slice(d) for d in x.shape] slicing[axis] = slice(None,None,-1) sxy = _fftconvolve(x, y[tuple(slicing)], axis=axis, mode='full') N = x.shape[axis] sxy /= N if all_lags: return sxy slicing[axis] = slice(N-1,2*N-1) return sxy[tuple(slicing)] def _crosscorr(x, y, **kwargs): """ Returns the crosscorrelation sequence between two ndarrays. This is performed by calling fftconvolve on x, y[::-1] Parameters x: ndarray y: ndarray axis: time axis all_lags: {True/False} whether to return all nonzero lags, or to clip the length of r_xy to be the length of x and y. If False, then the zero lag correlation is at index 0. Otherwise, it is found at (len(x) + len(y) - 1)/2 Notes cross correlation is defined as rxy[k] := E{X[t]*Y[t+k]}/(E{X*X}E{Y*Y})**.5, where X,Y are zero mean random processes. It is the noramlized cross covariance. """ sxy = _crosscov(x, y, **kwargs) # estimate sigma_x, sigma_y to normalize sx = np.std(x) sy = np.std(y) return sxy/(sx*sy) def _remove_bias(x, axis): "Subtracts an estimate of the mean from signal x at axis" padded_slice = [slice(d) for d in x.shape] padded_slice[axis] = np.newaxis mn = np.mean(x, axis=axis) return x - mn[tuple(padded_slice)] def _fftconvolve(in1, in2, mode="full", axis=None): """ Convolve two N-dimensional arrays using FFT. See convolve. This is a fix of scipy.signal.fftconvolve, adding an axis argument and importing locally the stuff only needed for this function """ #Locally import stuff only required for this: from scipy.fftpack import fftn, fft, ifftn, ifft from scipy.signal.signaltools import _centered from numpy import array, product s1 = array(in1.shape) s2 = array(in2.shape) complex_result = (np.issubdtype(in1.dtype, np.complexfloating) or np.issubdtype(in2.dtype, np.complexfloating)) if axis is None: size = s1+s2-1 fslice = tuple([slice(0, int(sz)) for sz in size]) else: equal_shapes = s1==s2 # allow equal_shapes[axis] to be False equal_shapes[axis] = True assert equal_shapes.all(), 'Shape mismatch on non-convolving axes' size = s1[axis]+s2[axis]-1 fslice = [slice(l) for l in s1] fslice[axis] = slice(0, int(size)) fslice = tuple(fslice) # Always use 2**n-sized FFT fsize = (2**np.ceil(np.log2(size))).astype(np.int64) if axis is None: IN1 = fftn(in1,fsize) IN1 *= fftn(in2,fsize) ret = ifftn(IN1)[fslice].copy() else: IN1 = fft(in1,fsize,axis=axis) IN1 *= fft(in2,fsize,axis=axis) ret = ifft(IN1,axis=axis)[fslice].copy() if not complex_result: del IN1 ret = ret.real if mode == "full": return ret elif mode == "same": if product(s1,axis=0) > product(s2,axis=0): osize = s1 else: osize = s2 return _centered(ret,osize) elif mode == "valid": return _centered(ret,abs(s2-s1)+1)