Source code for spectrum.arma

"""ARMA and MA estimates, ARMA and MA PSD estimates.

.. topic:: ARMA model and Power Spectral Densities.

    .. autosummary::
       :nosignatures:

       arma_estimate
       ma
       arma2psd
       parma
       pma

    .. codeauthor:: Thomas Cokelaer 2011

    :References: See [Marple]_
"""
import numpy as np
from numpy.fft import fft

from spectrum.correlation import CORRELATION
from spectrum.covar import arcovar, arcovar_marple
import spectrum.yulewalker as yulewalker
from spectrum.psd import ParametricSpectrum


__all__ = ["arma2psd", "arma_estimate", "ma", "pma", "parma"]


[docs]def arma2psd(A=None, B=None, rho=1., T=1., NFFT=4096, sides='default', norm=False): r"""Computes power spectral density given ARMA values. This function computes the power spectral density values given the ARMA parameters of an ARMA model. It assumes that the driving sequence is a white noise process of zero mean and variance :math:`\rho_w`. The sampling frequency and noise variance are used to scale the PSD output, which length is set by the user with the `NFFT` parameter. :param array A: Array of AR parameters (complex or real) :param array B: Array of MA parameters (complex or real) :param float rho: White noise variance to scale the returned PSD :param float T: Sampling frequency in Hertz to scale the PSD. :param int NFFT: Final size of the PSD :param str sides: Default PSD is two-sided, but sides can be set to centerdc. .. warning:: By convention, the AR or MA arrays does not contain the A0=1 value. If :attr:`B` is None, the model is a pure AR model. If :attr:`A` is None, the model is a pure MA model. :return: two-sided PSD .. rubric:: Details: AR case: the power spectral density is: .. math:: P_{ARMA}(f) = T \rho_w \left|\frac{B(f)}{A(f)}\right|^2 where: .. math:: A(f) = 1 + \sum_{k=1}^q b(k) e^{-j2\pi fkT} .. math:: B(f) = 1 + \sum_{k=1}^p a(k) e^{-j2\pi fkT} .. rubric:: **Example:** .. plot:: :width: 80% :include-source: import spectrum.arma from pylab import plot, log10, legend plot(10*log10(spectrum.arma.arma2psd([1,0.5],[0.5,0.5])), label='ARMA(2,2)') plot(10*log10(spectrum.arma.arma2psd([1,0.5],None)), label='AR(2)') plot(10*log10(spectrum.arma.arma2psd(None,[0.5,0.5])), label='MA(2)') legend() :References: [Marple]_ """ if NFFT is None: NFFT = 4096 if A is None and B is None: raise ValueError("Either AR or MA model must be provided") psd = np.zeros(NFFT, dtype=complex) if A is not None: ip = len(A) den = np.zeros(NFFT, dtype=complex) den[0] = 1.+0j for k in range(0, ip): den[k+1] = A[k] denf = fft(den, NFFT) if B is not None: iq = len(B) num = np.zeros(NFFT, dtype=complex) num[0] = 1.+0j for k in range(0, iq): num[k+1] = B[k] numf = fft(num, NFFT) # Changed in version 0.6.9 (divided by T instead of multiply) if A is not None and B is not None: psd = rho / T * abs(numf)**2. / abs(denf)**2. elif A is not None: psd = rho / T / abs(denf)**2. elif B is not None: psd = rho / T * abs(numf)**2. psd = np.real(psd) # The PSD is a twosided PSD. # to obtain the centerdc if sides != 'default': from . import tools assert sides in ['centerdc'] if sides == 'centerdc': psd = tools.twosided_2_centerdc(psd) if norm == True: psd /= max(psd) return psd
[docs]def arma_estimate(X, P, Q, lag): """Autoregressive and moving average estimators. This function provides an estimate of the autoregressive parameters, the moving average parameters, and the driving white noise variance of an ARMA(P,Q) for a complex or real data sequence. The parameters are estimated using three steps: * Estimate the AR parameters from the original data based on a least squares modified Yule-Walker technique, * Produce a residual time sequence by filtering the original data with a filter based on the AR parameters, * Estimate the MA parameters from the residual time sequence. :param array X: Array of data samples (length N) :param int P: Desired number of AR parameters :param int Q: Desired number of MA parameters :param int lag: Maximum lag to use for autocorrelation estimates :return: * A - Array of complex P AR parameter estimates * B - Array of complex Q MA parameter estimates * RHO - White noise variance estimate .. note:: * lag must be >= Q (MA order) **dependencies**: * :meth:`spectrum.correlation.CORRELATION` * :meth:`spectrum.covar.arcovar` * :meth:`spectrum.arma.ma` .. plot:: :width: 80% :include-source: from spectrum import arma_estimate, arma2psd, marple_data import pylab a,b, rho = arma_estimate(marple_data, 15, 15, 30) psd = arma2psd(A=a, B=b, rho=rho, sides='centerdc', norm=True) pylab.plot(10 * pylab.log10(psd)) pylab.ylim([-50,0]) :reference: [Marple]_ """ R = CORRELATION(X, maxlags=lag, norm='unbiased') R0 = R[0] #C Estimate the AR parameters (no error weighting is used). #C Number of equation errors is M-Q . MPQ = lag - Q + P N = len(X) Y = np.zeros(N-P, dtype=complex) for K in range(0, MPQ): KPQ = K + Q - P+1 if KPQ < 0: Y[K] = R[-KPQ].conjugate() if KPQ == 0: Y[K] = R0 if KPQ > 0: Y[K] = R[KPQ] # The resize is very important for the normalissation. Y.resize(lag, refcheck=False) if P <= 4: res = arcovar_marple(Y.copy(), P) #! Eq. (10.12) ar_params = res[0] else: res = arcovar(Y.copy(), P) #! Eq. (10.12) ar_params = res[0] # the .copy is used to prevent a reference somewhere. this is a bug # to be tracked down. Y.resize(N-P, refcheck=False) #C Filter the original time series for k in range(P, N): SUM = X[k] #SUM += sum([ar_params[j]*X[k-j-1] for j in range(0,P)]) for j in range(0, P): SUM = SUM + ar_params[j] * X[k-j-1] #! Eq. (10.17) Y[k-P] = SUM # Estimate the MA parameters (a "long" AR of order at least 2*IQ #C is suggested) #Y.resize(N-P) ma_params, rho = ma(Y, Q, 2*Q) #! Eq. (10.3) return ar_params, ma_params, rho
[docs]class parma(ParametricSpectrum): """Class to create PSD using ARMA estimator. See :func:`arma_estimate` for description. .. plot:: :width: 80% :include-source: from spectrum import parma, marple_data p = parma(marple_data, 4, 4, 30, NFFT=4096) p.plot(sides='centerdc') """ def __init__(self, data, P, Q, lag, NFFT=None, sampling=1., scale_by_freq=False): """**Constructor:** For a detailed description of the parameters, see :func:`arma_estimate`. :param array data: input data (list or numpy.array) :param int P: :param int Q: :param int lag: :param int NFFT: total length of the final data sets (padded with zero if needed; default is 4096) :param float sampling: sampling frequency of the input :attr:`data`. """ super(parma, self).__init__(data, ma_order=Q, ar_order=P, lag=lag, NFFT=NFFT, sampling=sampling, scale_by_freq=scale_by_freq) self.lag = lag def __call__(self): ar_params, ma_params, rho = arma_estimate(self.data, self.ar_order, self.ma_order, self.lag) self.ma = ma_params self.ar = ar_params self.rho = rho psd = arma2psd(A=self.ar, B=self.ma, rho=self.rho, T=self.sampling, NFFT=self.NFFT) #self.psd = psd if self.datatype == 'real': if self.NFFT % 2 == 0: newpsd = psd[0:int(self.NFFT/2+1)] * 2 else: newpsd = psd[0:int((self.NFFT+1)/2)] * 2 self.psd = newpsd else: self.psd = psd if self.scale_by_freq is True: self.scale() return self def _str_title(self): return "ARMA PSD estimate\n" def __str__(self): return super(parma, self).__str__()
[docs]class pma(ParametricSpectrum): """Class to create PSD using MA estimator. See :func:`ma` for description. .. plot:: :width: 80% :include-source: from spectrum import pma, marple_data p = pma(marple_data, 15, 30, NFFT=4096) p.plot(sides='centerdc') """ def __init__(self, data, Q, M, NFFT=None, sampling=1., scale_by_freq=False): """**Constructor:** For a detailed description of the parameters, see :func:`ma`. :param array data: input data (list or numpy.array) :param int Q: MA order :param int M: AR model used to estimate the MA parameters :param int NFFT: total length of the final data sets (padded with zero if needed; default is 4096) :param float sampling: sampling frequency of the input :attr:`data`. """ super(pma, self).__init__(data, ma_order=Q, ar_order=M, NFFT=NFFT, sampling=sampling, scale_by_freq=scale_by_freq) def __call__(self): ma_params, rho = ma(self.data, self.ma_order, self.ar_order) self.ma = ma_params self.rho = rho psd = arma2psd(A=None, B=self.ma, rho=self.rho, T=self.sampling, NFFT=self.NFFT) if self.datatype == 'real': if self.NFFT % 2 == 0: newpsd = psd[0:int(self.NFFT/2+1)] * 2 else: newpsd = psd[0:int((self.NFFT+1)/2)] * 2 self.psd = newpsd else: self.psd = psd if self.scale_by_freq is True: self.scale() self.modified = False def _str_title(self): return "MA (moving average) PSD estimate\n" def __str__(self): return super(pma, self).__str__()
[docs]def ma(X, Q, M): """Moving average estimator. This program provides an estimate of the moving average parameters and driving noise variance for a data sequence based on a long AR model and a least squares fit. :param array X: The input data array :param int Q: Desired MA model order (must be >0 and <M) :param int M: Order of "long" AR model (suggest at least 2*Q ) :return: * MA - Array of Q complex MA parameter estimates * RHO - Real scalar of white noise variance estimate .. plot:: :width: 80% :include-source: from spectrum import arma2psd, ma, marple_data import pylab # Estimate 15 Ma parameters b, rho = ma(marple_data, 15, 30) # Create the PSD from those MA parameters psd = arma2psd(B=b, rho=rho, sides='centerdc') # and finally plot the PSD pylab.plot(pylab.linspace(-0.5, 0.5, 4096), 10 * pylab.log10(psd/max(psd))) pylab.axis([-0.5, 0.5, -30, 0]) :reference: [Marple]_ """ if Q <= 0 or Q >= M: raise ValueError('Q(MA) must be in ]0,lag[') #C Fit a high-order AR to the data a, rho, _c = yulewalker.aryule(X, M, 'biased') #! Eq. (10.5) #add an element unity to the AR parameter array a = np.insert(a, 0, 1) #C Find MA parameters from autocorrelations by Yule-Walker method ma_params, _p, _c = yulewalker.aryule(a, Q, 'biased') #! Eq. (10.7) return ma_params, rho