import logging
import numpy as np
from numpy.fft import fft
from numpy.linalg import svd
from spectrum import default_NFFT
from .psd import ParametricSpectrum
from .tools import twosided_2_onesided, centerdc_2_twosided, twosided_2_centerdc
from scipy import linalg
__all__ = ["music", "ev", "pmusic", "eigen", "pev"]
[docs]class pmusic(ParametricSpectrum):
"""Class to create PSD using ARMA estimator.
See :func:`pmusic` and :func:`eigenfre` for description.
.. plot::
:width: 80%
:include-source:
from spectrum import pmusic, marple_data
p = pmusic(marple_data, 15, NFFT=4096)
p.plot()
Another example using a two sinusoidal components::
n = arange(200)
x = cos(0.257*pi*n) + sin(0.2*pi*n) + 0.01*randn(size(n));
p = pmusic(x, 6,4)
p.plot()
"""
def __init__(self, data, IP, NSIG=None, NFFT=None, sampling=1.,
threshold=None, criteria="aic", verbose=False, scale_by_freq=False):
"""**Constructor:**
For a detailed description of the parameters, see :func:`arma_estimate`.
:param array data: input data (list or numpy.array)
:param int P: maximum number of eigen values to compute. NSIG (if
specified) must therefore be less than P.
:param int NSIG: If specified, the signal sub space uses NSIG eigen values.
:param int NFFT: total length of the final data sets (padded with zero if needed; default is 4096)
:param float sampling: sampling frequency of the input :attr:`data`.
"""
super(pmusic, self).__init__(data, ar_order=IP,
scale_by_freq=scale_by_freq, NFFT=NFFT, sampling=sampling)
self.NSIG = NSIG
self.threshold = threshold
self.criteria = criteria
self.verbose = verbose
def __call__(self):
psd, eigenvalues = eigen(self.data, self.ar_order, NSIG=self.NSIG,
NFFT=self.NFFT, threshold=self.threshold, criteria=self.criteria,
verbose=self.verbose, method='music')
self.eigenvalues = eigenvalues
if self.datatype == 'real':
if self.NFFT % 2 == 0:
newpsd = psd[0:int(self.NFFT/2+1)] * 2
else:
newpsd = psd[0:int((self.NFFT+1)/2)] * 2
# we need to flip the data
self.psd = newpsd[::-1]
else:
self.psd = centerdc_2_twosided(psd)
self.scale()
return self
def _str_title(self):
return "Music PSD estimate\n"
def __str__(self):
return super(pmusic, self).__str__()
[docs]class pev(ParametricSpectrum):
"""Class to create PSD using ARMA estimator.
See :func:`eigenfre` for description.
.. plot::
:width: 80%
:include-source:
from spectrum import pev, marple_data
p = pev(marple_data, 15, NFFT=4096)
p.plot()
"""
def __init__(self, data, IP, NSIG=None, NFFT=None, sampling=1.,
scale_by_freq=False,
threshold=None, criteria="aic", verbose=False):
"""**Constructor:**
For a detailed description of the parameters, see :func:`arma_estimate`.
:param array data: input data (list or numpy.array)
:param int P: maximum number of eigen values to compute. NSIG (if
specified) must therefore be less than P.
:param int NSIG: If specified, the signal sub space uses NSIG eigen values.
:param int NFFT: total length of the final data sets (padded with
zero if needed; default is 4096)
:param float sampling: sampling frequency of the input :attr:`data`.
"""
super(pev, self).__init__(data, ar_order=IP,
scale_by_freq=scale_by_freq, NFFT=NFFT, sampling=sampling)
self.NSIG = NSIG
self.threshold = threshold
self.criteria = criteria
self.verbose = verbose
def __call__(self):
psd, eigenvalues = eigen(self.data, self.ar_order, NSIG=self.NSIG,
NFFT=self.NFFT, threshold=self.threshold, criteria=self.criteria,
verbose=self.verbose, method='ev')
self.eigenvalues = eigenvalues
if self.datatype == 'real':
if self.NFFT % 2 == 0:
newpsd = psd[0:int(self.NFFT/2+1)] * 2
else:
newpsd = psd[0:int((self.NFFT+1)/2)] * 2
# we need to flip the data
self.psd = newpsd[::-1]
else:
self.psd = centerdc_2_twosided(psd)
self.scale()
return self
def _str_title(self):
return "EV PSD estimate\n"
def __str__(self):
return super(pev, self).__str__()
def music(X, IP, NSIG=None, NFFT=default_NFFT, threshold=None, criteria='aic',
verbose=False):
"""Eigen value pseudo spectrum estimate. See :func:`eigenfre`"""
return eigen(X, IP, NSIG=NSIG, method='music', NFFT=NFFT,
threshold=threshold, criteria=criteria, verbose=verbose)
def ev(X, IP, NSIG=None, NFFT=default_NFFT, threshold=None, criteria='aic',
verbose=False):
"""Eigen value pseudo spectrum estimate. See :func:`eigenfre`"""
return eigen(X, IP, NSIG=NSIG, method='ev', NFFT=NFFT,
threshold=threshold, criteria=criteria, verbose=verbose)
[docs]def eigen(X, P, NSIG=None, method='music', threshold=None, NFFT=default_NFFT,
criteria='aic', verbose=False):
r"""Pseudo spectrum using eigenvector method (EV or Music)
This function computes either the Music or EigenValue (EV) noise
subspace frequency estimator.
First, an autocorrelation matrix of order `P` is computed from
the data. Second, this matrix is separated into vector subspaces,
one a signal subspace and the other a noise
subspace using a SVD method to obtain the eigen values and vectors.
From the eigen values :math:`\lambda_i`, and eigen vectors :math:`v_k`,
the **pseudo spectrum** (see note below) is computed as follows:
.. math:: P_{ev}(f) = \frac{1}{e^H(f)\left(\sum\limits_{k=M+1}^{p} \frac{1}{\lambda_k}v_kv_k^H\right)e(f)}
The separation of the noise and signal subspaces requires expertise
of the signal. However, AIC and MDL criteria may be used to automatically
perform this task.
You still need to provide the parameter `P` to indicate the maximum number
of eigen values to be computed. The criteria will just select a subset
to estimate the pseudo spectrum (see :func:`~spectrum.criteria.aic_eigen`
and :func:`~spectrum.criteria.mdl_eigen` for details.
.. note:: **pseudo spectrum**. func:`eigen` does not compute a PSD estimate.
Indeed, the method does not preserve the measured process power.
:param X: Array data samples
:param int P: maximum number of eigen values to compute. NSIG (if
specified) must therefore be less than P.
:param str method: 'music' or 'ev'.
:param int NSIG: If specified, the signal sub space uses NSIG eigen values.
:param float threshold: If specified, the signal sub space is made of the
eigen values larger than :math:`\rm{threshold} \times \lambda_{min}`,
where :math:`\lambda_{min}` is the minimum eigen values.
:param int NFFT: total length of the final data sets (padded with zero
if needed; default is 4096)
:return:
* PSD: Array of real frequency estimator values (two sided for
complex data and one sided for real data)
* S, the eigen values
.. plot::
:width: 80%
:include-source:
from spectrum import eigen, marple_data
from pylab import plot, log10, linspace, legend, axis
psd, ev = eigen(marple_data, 15, NSIG=11)
f = linspace(-0.5, 0.5, len(psd))
plot(f, 10 * log10(psd/max(psd)), label='User defined')
psd, ev = eigen(marple_data, 15, threshold=2)
plot(f, 10 * log10(psd/max(psd)), label='threshold method (100)')
psd, ev = eigen(marple_data, 15)
plot(f, 10 * log10(psd/max(psd)), label='AIC method (8)')
legend()
axis([-0.5, 0.5, -120, 0])
.. seealso::
:func:`pev`,
:func:`pmusic`,
:func:`~spectrum.criteria.aic_eigen`
:References: [Marple]_, Chap 13
.. todo:: for developers:
* what should be the second argument of the criteria N, N-P, P...?
* what should be the max value of NP
"""
if method not in ['music', 'ev']:
raise ValueError("method must be 'music' or 'ev'")
if NSIG != None and threshold != None:
raise ValueError("NSIG and threshold cannot be provided together")
if NSIG is not None:
if NSIG < 0:
raise ValueError('NSIG must be positive')
if NSIG >= P:
raise ValueError("NSIG must be stricly less than IP")
#
N = len(X)
NP = N - P
assert 2 * NP > P-1, 'decrease the second argument'
if NP > 100:
NP = 100
FB = np.zeros((2*NP, P), dtype=complex)
#FB = numpy.zeros((MAXU, IP), dtype=complex)
Z = np.zeros(NFFT, dtype=complex)
PSD = np.zeros(NFFT)
# These loops can surely be replaced by a function that create such matrix
for I in range(0, NP):
for K in range(0, P):
FB[I, K] = X[I-K+P-1]
FB[I+NP, K] = X[I+K+1].conjugate()
# This PR #71 does not pass the test
#FB[:NP, :] = linalg.toeplitz(X[P - 1 : NP+P-1], X[P - 1 :: -1])
#FB[NP:, :] = linalg.hankel(X[1 : -P + 1].conjugate(), X[-P:NP+P].conjugate())
# This commented line produces the correct FB, as the 2 for loops above
# It is more elegant but slower...corrmtx needs to be optimised (20/4/11)
#FB2 = spectrum.linalg.corrmtx(X, P-1, method='modified')
#Compute the eigen values / vectors
_U, S, V = svd (FB)
# U and V are not the same as in Marple. Real or Imaginary absolute values
# are correct but signs are not. This is wierd because the svd function
# gives the same result as cvsd in Marple. Is FB correct ? it seems so.
# The following operation has to be done. Otherwise, the resulting PSD is
# not corect
V = -V.transpose()
NSIG = _get_signal_space(S, 2*NP,
verbose=verbose, threshold=threshold,
NSIG=NSIG, criteria=criteria)
#C AI or Expert Knowledge to choose "signal" singular values, or input
#C NSIG at this point
for I in range(NSIG, P):
Z[0:P] = V[0:P, I]
Z[P:NFFT] = 0
Z = fft(Z, NFFT)
if method == 'music':
PSD = PSD + abs(Z)**2.
elif method == 'ev' :
PSD = PSD + abs(Z)**2. / S[I]
PSD = 1./PSD
# for some reasons, we need to rearrange the output. this is related to
# the way U and V are order in the routine svd
nby2 = int(NFFT/2)
#return PSD, S
newpsd = np.append(PSD[nby2:0:-1], PSD[nby2*2-1:nby2-1:-1])
return newpsd, S
def _get_signal_space(S, NP, verbose=False, threshold=None, NSIG=None,
criteria='aic'):
"""todo
"""
from .criteria import aic_eigen, mdl_eigen
# This section selects automatically the noise and signal subspaces.
# NSIG being the number of eigenvalues corresponding to signals.
if NSIG is None:
if threshold is None:
logging.debug('computing NSIG using AIC method')
# get the minimum index of the AIC vector
if criteria == 'aic':
aic = aic_eigen(S, NP*2)
elif criteria == 'mdl':
aic = mdl_eigen(S, NP*2)
# get the minimum index of the AIC vector, add 1 to get the NSIG
NSIG = np.argmin(aic) + 1
logging.debug('NSIG={} found as the number of pertinent sinusoids'.format(NSIG))
else:
logging.debug('computing NSIG using user threshold ')
# following an idea from Matlab, pmusic, we look at the minimum
# eigen value, and split the eigen values above and below
# K times min eigen value, where K is >1
m = threshold * min(S)
new_s = S[np.where(S>m)]
NSIG = len(new_s)
logging.debug('found {}'.format(NSIG))
if NSIG == 0:
NSIG = 1
return NSIG