Source code for spectrum.covar

import logging

import numpy as np
from .psd import ParametricSpectrum

__all__ = ["arcovar", "arcovar_marple", "pcovar", 'arcovar_marple']

[docs]def arcovar_marple(x, order):
r"""Estimate AR model parameters using covariance method

This implementation is based on [Marple]_. This code is far more
complicated and slower than :func:arcovar function, which is now the official version.
See :func:arcovar for a detailed description of Covariance method.

This function should be used in place of arcovar only if order<=4, for
which :func:arcovar does not work.

Fast algorithm for the solution of the covariance least squares normal
equations from Marple.

:param array X:  Array of complex data samples
:param int oder: Order of linear prediction model

:return:
* AF   - Array of complex forward linear prediction coefficients
* PF   - Real forward linear prediction variance at order IP
* AB   - Array of complex backward linear prediction coefficients
* PB   - Real backward linear prediction variance at order IP
* PV   - store linear prediction coefficients

.. note:: this code and the original code in Marple diverge for ip>10.
it seems that this is related to single precision used with
complex type in fortran whereas numpy uses double precision for
complex type.

:validation: the AR parameters are the same as those returned by
a completely different function :func:arcovar.

:References: [Marple]_
"""
assert len(x) >= order, "X must be dimensioned >=N"

#   ----------------------------------------------------- Initialization
x = np.array(x)
N = len(x)

# Equations 8.C.42
r0 = sum(abs(x)**2.)
r1 = abs(x[0])**2
rN = abs(x[N-1])**2

pf = r0 - r1
pb = r0 - rN

delta = 1. - r1 / r0
gamma = 1. - rN / r0

c = np.zeros(N, dtype=complex)
d = np.zeros(N, dtype=complex)
r = np.zeros(N, dtype=complex)
af = np.zeros(N, dtype=complex)
ab = np.zeros(N, dtype=complex)

c[0] = x[N-1].conjugate() / r0
d[0] = x[0].conjugate() / r0

# special case
if order == 0:
pf = r0 / float(N)
pb = pf
return af, pf, ab, pb, 0

# ----------------------------------------------------------  MAIN LOOP

#ip +1 because we want to enter in the loop to run the first part of the code.
pbv = []
for m in range(0, order+1):
logging.debug('----------------------------m={}'.format(m))
logging.debug(c[0:2])
logging.debug(d[0:2])
r1 = 1./pf
r2 = 1./pb
r3 = 1./delta
r4 = 1./gamma

#logging.debug('starting r1r2r3r4=', r1, r2, r3, r4, pf, pb, delta, gamma)

#Order update: AF and AB vectors ; time update: C and D vectors
temp = 0.+0.j
for k in range(m+1, N):
temp = temp + x[k]*x[k-m-1].conjugate()

r[m] =  temp.conjugate()
theta = x[0] * c[m]
#print(('theta', theta))
#    print(('c=', c[0:2]))
#    print(('d=', d[0:2]))
if m == 0:
pass
else:
for k in range(0, m):
theta = theta + x[m-k] * c[k]                   # Eq. (8.C.39)
r[k] = r[k] - x[N-m-1] * x[N-m+k].conjugate() # Eq. (8.C.32)
temp = temp + af[m-k-1] * r[k].conjugate()
#print 'loop1 k=', k
#print '         theta=',theta, 'r[k]=',r[k], 'temp=', temp
#print '         c=',c[k], 'af=',af[m-k-1]
"""if m > 0:
if debug:
print((m, N-m))
print(('Xk=0',x[m-0],x[N-m-1], x[N-m+0]))
if m > 1:
if debug:
print('Xk=1',x[m-1],x[N-m-1], x[N-m+1])
"""
c1 = -temp * r2
c2 = -r1 * temp.conjugate()
c3 = theta * r3
c4 = r4 *theta.conjugate()
#if debug:
#    print('c1 c2 c3 c4 before af=',c1 ,c2 ,c3 ,c4)
af[m] = c1                    #             ! Eq. (8.C.19)
ab[m] = c2                    #             ! Eq. (8.C.22)
save = c[m]
c[m] = save + c3*d[m]
d[m] = d[m] + c4*save
#if debug:
#    print('res',m,'af[m]=',af[m], ab[m], save, 'temp=',temp)

if m == 0:
pass
else:
#if debug:print('af before', af[0:2])
for k in range(0, m):
save = af[k]
af[k] = save + c1 * ab[m-k-1] # Eq. (8.C.18)
ab[m-k-1] = ab[m-k-1] + c2 * save   # Eq. (8.C.21)

save = c[k]
c[k] = save + c3*d[k]       # Eq. (8.C.37)
d[k] = d[k] + c4*save       # Eq. (8.C.38)
#if debug:
#    print('loop2 k=', k)
#    print('      af[k]=', af[k])
#    print('      ab[m-k-1]=', ab[m-k-1])
#    print('      c[k]=', c[k])
#    print('      d[k]=', d[k])
#if debug:
#    print('af after=', af[0:2])
#    print('ab=', ab[0:2])

r5 = temp.real**2 + temp.imag**2
pf = pf - r5*r2         # Eq. (8.C.20)
pb = pb - r5*r1         # Eq. (8.C.23)
r5 = theta.real**2 + theta.imag**2
delta = delta - r5*r4               # Eq. (8.C.39)
gamma = gamma - r5*r3               # Eq. (8.C.40)
#if debug:
#    print('r5r2r1deltagamma', r5, r2, r1 , delta, gamma)
#    print('pf before norm', pf, pb, N-m-1)
if m != order-1:
pass
else:
pf = pf / float(N-m-1)
pb = pb / float(N-m-1)
#if debug:
#    print('ENDING', N-m-1)
break

#if debug:
#    print('pf and pb', pf, pb)
if pf > 0 and pb > 0:
pass
else:
ValueError("Negative PF or PB value")
if (delta > 0. and delta <=1 and gamma > 0. and gamma <=1):
pass
else:
ValueError("Invalid delta or gamma value")

#C   Time update:  AF and AB vectors; order update:  C and D vectors

r1 = 1./pf
r2 = 1./pb
r3 = 1./delta
r4 = 1./gamma
#if debug:
#    print('--------time update', r1, r2, r3, r4, m+1, N-m-1, x[m+1], x[N-m-2])
ef = x[m+1]
eb = x[(N-1)-m-1]

for k in range(0,m+1):
#print 'k=', k, 'ef=', ef, ' eb=',eb,' af=',af[k], ab[k]
#print x[m-k],x[N-m+k-1]
ef = ef + af[k] * x[m-k]             # Eq. (8.C.1)
eb = eb + ab[k] * x[N-m+k-1]                   # Eq. (8.C.2)

#ef = sum(af)

#if debug:
#    print('efweb', ef , eb)
c1 = ef*r3
c2 = eb*r4
c3 = eb.conjugate() * r2
c4 = ef.conjugate() * r1
#if debug:
#    print('c1c2c3c4', c1, c2, c3, c4)
#    print('af before', af[0:2])
for k in range(m, -1, -1):
save = af[k]
af[k] = save + c1 * d[k]                    #  Eq. (8.C.33)
d[k+1] = d[k] + c4 * save                    # Eq. (8.C.25)
save = ab[k]
ab[k] = save + c2 * c[m-k]                 # Eq. (8.C.35)
c[m-k] = c[m-k] + c3 * save              # Eq. (8.C.24)
#if debug:
#    print('af after', af[0:2])
#    print('d', d[0:2])
#    print('ab', ab[0:2])
#    print('c', c[0:2])
#if debug:print('Pb before', pf, pb)
c[m+1] = c3
d[0] = c4
#r5 = abs(ef)**2
r5 = ef.real**2 + ef.imag**2
pf = pf - r5 * r3                              # Eq. (8.C.34)

delta = delta-r5 * r1                        # Eq. (8.C.30)
#r5 = abs(eb)**2
r5 = eb.real**2 + eb.imag**2
pb = pb - r5 * r4                              # Eq. (8.C.36)
#if debug:
#    print('Pb---------------------', m, pb, r5, r4)
gamma = gamma-r5*r2                        # Eq. (8.C.31)
pbv.append(pb)

if (pf > 0. and pb > 0.):
pass
else:
ValueError("Negative PF or PB value")
#if debug:
#    print(delta, gamma)
if (delta > 0. and delta <= 1.) and (gamma > 0. and gamma <= 1.):
pass
else:
ValueError("Invalid delta or gamma value")

#af=array of forward coeff
#ab=array of barward coeff
#pb=backward variance
#pf=forward variance
return af, pf, ab, pb, pbv

[docs]def arcovar(x, order):
r"""Simple and fast implementation of the covariance AR estimate

This code is 10 times faster than :func:arcovar_marple and more importantly
only 10 lines of code, compared to a 200 loc for :func:arcovar_marple

:param array X:  Array of complex data samples
:param int oder: Order of linear prediction model

:return:
* a - Array of complex forward linear prediction coefficients
* e - error

The covariance method fits a Pth order autoregressive (AR) model to the
input signal, which is assumed to be the output of
an AR system driven by white noise. This method minimizes the forward
prediction error in the least-squares sense. The output vector
contains the normalized estimate of the AR system parameters

The white noise input variance estimate is also returned.

If is the power spectral density of y(n), then:

.. math:: \frac{e}{\left| A(e^{jw}) \right|^2} = \frac{e}{\left| 1+\sum_{k-1}^P a(k)e^{-jwk}\right|^2}

Because the method characterizes the input data using an all-pole model,
the correct choice of the model order p is important.

.. plot::
:width: 80%
:include-source:

from spectrum import arcovar, marple_data, arma2psd
from pylab import plot, log10, linspace, axis

ar_values, error = arcovar(marple_data, 15)
psd = arma2psd(ar_values, sides='centerdc')
plot(linspace(-0.5, 0.5, len(psd)), 10*log10(psd/max(psd)))
axis([-0.5, 0.5, -60, 0])

.. seealso:: :class:pcovar

:validation: the AR parameters are the same as those returned by
a completely different function :func:arcovar_marple.

:References: [Mathworks]_
"""

from spectrum import corrmtx
import scipy.linalg

X = corrmtx(x, order, 'covariance')
Xc = np.array(X[:, 1:])
X1 = np.array(X[:, 0])

# Coefficients estimated via the covariance method
# Here we use lstsq rathre than solve function because Xc is not square
# matrix

a, _residues, _rank, _singular_values = scipy.linalg.lstsq(-Xc, X1)

# Estimate the input white noise variance
Cz = np.dot(X1.conj().transpose(), Xc)
e = np.dot(X1.conj().transpose(), X1) + np.dot(Cz, a)
assert e.imag < 1e-4, 'wierd behaviour'
e = float(e.real) # ignore imag part that should be small

return a, e

[docs]class pcovar(ParametricSpectrum):
"""Class to create PSD based on covariance algorithm

See :func:arcovar for description.

.. plot::
:width: 80%
:include-source:

from spectrum import pcovar, marple_data
p = pcovar(marple_data, 15, NFFT=4096)
p.plot(sides='centerdc')

.. seealso:: :class:arcovar
"""
def __init__(self, data, order, NFFT=None, sampling=1.,
scale_by_freq=False):
"""**Constructor**

For a detailled description of the parameters, see :func:arcovar.

:param array data: input data (list or numpy.array)
:param int order:
:param int NFFT: total length of the final data sets (padded
with zero if needed; default is 4096)
:param float sampling: sampling frequency of the input :attr:data.

"""
super(pcovar, self).__init__(data, ar_order=order,
NFFT=NFFT, sampling=sampling,
scale_by_freq=scale_by_freq)

def __call__(self):
from spectrum import arma2psd
ar, _e = arcovar(self.data, self.ar_order)
self.ar = ar
psd = arma2psd(A=ar, T=self.sampling, NFFT=self.NFFT)

if self.datatype == 'real':
if self.NFFT % 2 == 0:
newpsd = psd[0:int(self.NFFT/2+1)] * 2
else:
newpsd = psd[0:int((self.NFFT+1)/2)] * 2
self.psd = newpsd
else:
self.psd = psd
if self.scale_by_freq is True:
self.scale()
return self

def _str_title(self):
return "Covariance PSD estimate\n"

def __str__(self):
return super(pcovar, self).__str__()