import logging
import numpy as np
from .psd import ParametricSpectrum
__all__ = ["arcovar", "arcovar_marple", "pcovar", 'arcovar_marple']
[docs]def arcovar_marple(x, order):
r"""Estimate AR model parameters using covariance method
This implementation is based on [Marple]_. This code is far more
complicated and slower than :func:`arcovar` function, which is now the official version.
See :func:`arcovar` for a detailed description of Covariance method.
This function should be used in place of arcovar only if order<=4, for
which :func:`arcovar` does not work.
Fast algorithm for the solution of the covariance least squares normal
equations from Marple.
:param array X: Array of complex data samples
:param int oder: Order of linear prediction model
:return:
* AF - Array of complex forward linear prediction coefficients
* PF - Real forward linear prediction variance at order IP
* AB - Array of complex backward linear prediction coefficients
* PB - Real backward linear prediction variance at order IP
* PV - store linear prediction coefficients
.. note:: this code and the original code in Marple diverge for ip>10.
it seems that this is related to single precision used with
complex type in fortran whereas numpy uses double precision for
complex type.
:validation: the AR parameters are the same as those returned by
a completely different function :func:`arcovar`.
:References: [Marple]_
"""
assert len(x) >= order, "X must be dimensioned >=N"
# ----------------------------------------------------- Initialization
x = np.array(x)
N = len(x)
# Equations 8.C.42
r0 = sum(abs(x)**2.)
r1 = abs(x[0])**2
rN = abs(x[N-1])**2
pf = r0 - r1
pb = r0 - rN
delta = 1. - r1 / r0
gamma = 1. - rN / r0
c = np.zeros(N, dtype=complex)
d = np.zeros(N, dtype=complex)
r = np.zeros(N, dtype=complex)
af = np.zeros(N, dtype=complex)
ab = np.zeros(N, dtype=complex)
c[0] = x[N-1].conjugate() / r0
d[0] = x[0].conjugate() / r0
# special case
if order == 0:
pf = r0 / float(N)
pb = pf
return af, pf, ab, pb, 0
# ---------------------------------------------------------- MAIN LOOP
#ip +1 because we want to enter in the loop to run the first part of the code.
pbv = []
for m in range(0, order+1):
logging.debug('----------------------------m={}'.format(m))
logging.debug(c[0:2])
logging.debug(d[0:2])
r1 = 1./pf
r2 = 1./pb
r3 = 1./delta
r4 = 1./gamma
#logging.debug('starting r1r2r3r4=', r1, r2, r3, r4, pf, pb, delta, gamma)
#Order update: AF and AB vectors ; time update: C and D vectors
temp = 0.+0.j
for k in range(m+1, N):
temp = temp + x[k]*x[k-m-1].conjugate()
r[m] = temp.conjugate()
theta = x[0] * c[m]
#print(('theta', theta))
# print(('c=', c[0:2]))
# print(('d=', d[0:2]))
if m == 0:
pass
else:
for k in range(0, m):
theta = theta + x[m-k] * c[k] # Eq. (8.C.39)
r[k] = r[k] - x[N-m-1] * x[N-m+k].conjugate() # Eq. (8.C.32)
temp = temp + af[m-k-1] * r[k].conjugate()
#print 'loop1 k=', k
#print ' theta=',theta, 'r[k]=',r[k], 'temp=', temp
#print ' c=',c[k], 'af=',af[m-k-1]
"""if m > 0:
if debug:
print((m, N-m))
print(('Xk=0',x[m-0],x[N-m-1], x[N-m+0]))
if m > 1:
if debug:
print('Xk=1',x[m-1],x[N-m-1], x[N-m+1])
"""
c1 = -temp * r2
c2 = -r1 * temp.conjugate()
c3 = theta * r3
c4 = r4 *theta.conjugate()
#if debug:
# print('c1 c2 c3 c4 before af=',c1 ,c2 ,c3 ,c4)
af[m] = c1 # ! Eq. (8.C.19)
ab[m] = c2 # ! Eq. (8.C.22)
save = c[m]
c[m] = save + c3*d[m]
d[m] = d[m] + c4*save
#if debug:
# print('res',m,'af[m]=',af[m], ab[m], save, 'temp=',temp)
if m == 0:
pass
else:
#if debug:print('af before', af[0:2])
for k in range(0, m):
save = af[k]
af[k] = save + c1 * ab[m-k-1] # Eq. (8.C.18)
ab[m-k-1] = ab[m-k-1] + c2 * save # Eq. (8.C.21)
save = c[k]
c[k] = save + c3*d[k] # Eq. (8.C.37)
d[k] = d[k] + c4*save # Eq. (8.C.38)
#if debug:
# print('loop2 k=', k)
# print(' af[k]=', af[k])
# print(' ab[m-k-1]=', ab[m-k-1])
# print(' c[k]=', c[k])
# print(' d[k]=', d[k])
#if debug:
# print('af after=', af[0:2])
# print('ab=', ab[0:2])
r5 = temp.real**2 + temp.imag**2
pf = pf - r5*r2 # Eq. (8.C.20)
pb = pb - r5*r1 # Eq. (8.C.23)
r5 = theta.real**2 + theta.imag**2
delta = delta - r5*r4 # Eq. (8.C.39)
gamma = gamma - r5*r3 # Eq. (8.C.40)
#if debug:
# print('r5r2r1deltagamma', r5, r2, r1 , delta, gamma)
# print('pf before norm', pf, pb, N-m-1)
if m != order-1:
pass
else:
pf = pf / float(N-m-1)
pb = pb / float(N-m-1)
#if debug:
# print('ENDING', N-m-1)
break
#if debug:
# print('pf and pb', pf, pb)
if pf > 0 and pb > 0:
pass
else:
ValueError("Negative PF or PB value")
if (delta > 0. and delta <=1 and gamma > 0. and gamma <=1):
pass
else:
ValueError("Invalid delta or gamma value")
#C Time update: AF and AB vectors; order update: C and D vectors
r1 = 1./pf
r2 = 1./pb
r3 = 1./delta
r4 = 1./gamma
#if debug:
# print('--------time update', r1, r2, r3, r4, m+1, N-m-1, x[m+1], x[N-m-2])
ef = x[m+1]
eb = x[(N-1)-m-1]
for k in range(0,m+1):
#print 'k=', k, 'ef=', ef, ' eb=',eb,' af=',af[k], ab[k]
#print x[m-k],x[N-m+k-1]
ef = ef + af[k] * x[m-k] # Eq. (8.C.1)
eb = eb + ab[k] * x[N-m+k-1] # Eq. (8.C.2)
#ef = sum(af)
#if debug:
# print('efweb', ef , eb)
c1 = ef*r3
c2 = eb*r4
c3 = eb.conjugate() * r2
c4 = ef.conjugate() * r1
#if debug:
# print('c1c2c3c4', c1, c2, c3, c4)
# print('af before', af[0:2])
for k in range(m, -1, -1):
save = af[k]
af[k] = save + c1 * d[k] # Eq. (8.C.33)
d[k+1] = d[k] + c4 * save # Eq. (8.C.25)
save = ab[k]
ab[k] = save + c2 * c[m-k] # Eq. (8.C.35)
c[m-k] = c[m-k] + c3 * save # Eq. (8.C.24)
#if debug:
# print('af after', af[0:2])
# print('d', d[0:2])
# print('ab', ab[0:2])
# print('c', c[0:2])
#if debug:print('Pb before', pf, pb)
c[m+1] = c3
d[0] = c4
#r5 = abs(ef)**2
r5 = ef.real**2 + ef.imag**2
pf = pf - r5 * r3 # Eq. (8.C.34)
delta = delta-r5 * r1 # Eq. (8.C.30)
#r5 = abs(eb)**2
r5 = eb.real**2 + eb.imag**2
pb = pb - r5 * r4 # Eq. (8.C.36)
#if debug:
# print('Pb---------------------', m, pb, r5, r4)
gamma = gamma-r5*r2 # Eq. (8.C.31)
pbv.append(pb)
if (pf > 0. and pb > 0.):
pass
else:
ValueError("Negative PF or PB value")
#if debug:
# print(delta, gamma)
if (delta > 0. and delta <= 1.) and (gamma > 0. and gamma <= 1.):
pass
else:
ValueError("Invalid delta or gamma value")
#af=array of forward coeff
#ab=array of barward coeff
#pb=backward variance
#pf=forward variance
return af, pf, ab, pb, pbv
[docs]def arcovar(x, order):
r"""Simple and fast implementation of the covariance AR estimate
This code is 10 times faster than :func:`arcovar_marple` and more importantly
only 10 lines of code, compared to a 200 loc for :func:`arcovar_marple`
:param array X: Array of complex data samples
:param int oder: Order of linear prediction model
:return:
* a - Array of complex forward linear prediction coefficients
* e - error
The covariance method fits a Pth order autoregressive (AR) model to the
input signal, which is assumed to be the output of
an AR system driven by white noise. This method minimizes the forward
prediction error in the least-squares sense. The output vector
contains the normalized estimate of the AR system parameters
The white noise input variance estimate is also returned.
If is the power spectral density of y(n), then:
.. math:: \frac{e}{\left| A(e^{jw}) \right|^2} = \frac{e}{\left| 1+\sum_{k-1}^P a(k)e^{-jwk}\right|^2}
Because the method characterizes the input data using an all-pole model,
the correct choice of the model order p is important.
.. plot::
:width: 80%
:include-source:
from spectrum import arcovar, marple_data, arma2psd
from pylab import plot, log10, linspace, axis
ar_values, error = arcovar(marple_data, 15)
psd = arma2psd(ar_values, sides='centerdc')
plot(linspace(-0.5, 0.5, len(psd)), 10*log10(psd/max(psd)))
axis([-0.5, 0.5, -60, 0])
.. seealso:: :class:`pcovar`
:validation: the AR parameters are the same as those returned by
a completely different function :func:`arcovar_marple`.
:References: [Mathworks]_
"""
from spectrum import corrmtx
import scipy.linalg
X = corrmtx(x, order, 'covariance')
Xc = np.array(X[:, 1:])
X1 = np.array(X[:, 0])
# Coefficients estimated via the covariance method
# Here we use lstsq rathre than solve function because Xc is not square
# matrix
a, _residues, _rank, _singular_values = scipy.linalg.lstsq(-Xc, X1)
# Estimate the input white noise variance
Cz = np.dot(X1.conj().transpose(), Xc)
e = np.dot(X1.conj().transpose(), X1) + np.dot(Cz, a)
assert e.imag < 1e-4, 'wierd behaviour'
e = float(e.real) # ignore imag part that should be small
return a, e
[docs]class pcovar(ParametricSpectrum):
"""Class to create PSD based on covariance algorithm
See :func:`arcovar` for description.
.. plot::
:width: 80%
:include-source:
from spectrum import pcovar, marple_data
p = pcovar(marple_data, 15, NFFT=4096)
p.plot(sides='centerdc')
.. seealso:: :class:`arcovar`
"""
def __init__(self, data, order, NFFT=None, sampling=1.,
scale_by_freq=False):
"""**Constructor**
For a detailled description of the parameters, see :func:`arcovar`.
:param array data: input data (list or numpy.array)
:param int order:
:param int NFFT: total length of the final data sets (padded
with zero if needed; default is 4096)
:param float sampling: sampling frequency of the input :attr:`data`.
"""
super(pcovar, self).__init__(data, ar_order=order,
NFFT=NFFT, sampling=sampling,
scale_by_freq=scale_by_freq)
def __call__(self):
from spectrum import arma2psd
ar, _e = arcovar(self.data, self.ar_order)
self.ar = ar
psd = arma2psd(A=ar, T=self.sampling, NFFT=self.NFFT)
if self.datatype == 'real':
if self.NFFT % 2 == 0:
newpsd = psd[0:int(self.NFFT/2+1)] * 2
else:
newpsd = psd[0:int((self.NFFT+1)/2)] * 2
self.psd = newpsd
else:
self.psd = psd
if self.scale_by_freq is True:
self.scale()
return self
def _str_title(self):
return "Covariance PSD estimate\n"
def __str__(self):
return super(pcovar, self).__str__()