"""BURG method of AR model estimate
.. topic:: This module provides BURG method and BURG PSD estimate
.. autosummary::
arburg
pburg
.. codeauthor:: Thomas Cokelaer 2011
"""
#TODO: convert arburg into arburg2 to get a nicer and faster algorithm.
import logging
import numpy as np
from spectrum.psd import ParametricSpectrum
__all__ = ["arburg", 'pburg']
def _arburg2(X, order):
"""This version is 10 times faster than arburg, but the output rho is not correct.
returns [1 a0,a1, an-1]
"""
x = np.array(X)
N = len(x)
if order <= 0.:
raise ValueError("order must be > 0")
# Initialisation
# ------ rho, den
rho = sum(abs(x)**2.) / N # Eq 8.21 [Marple]_
den = rho * 2. * N
# ------ backward and forward errors
ef = np.zeros(N, dtype=complex)
eb = np.zeros(N, dtype=complex)
for j in range(0, N): #eq 8.11
ef[j] = x[j]
eb[j] = x[j]
# AR order to be stored
a = np.zeros(1, dtype=complex)
a[0] = 1
# ---- rflection coeff to be stored
ref = np.zeros(order, dtype=complex)
temp = 1.
E = np.zeros(order+1)
E[0] = rho
for m in range(0, order):
#print m
# Calculate the next order reflection (parcor) coefficient
efp = ef[1:]
ebp = eb[0:-1]
#print efp, ebp
num = -2.* np.dot(ebp.conj().transpose(), efp)
den = np.dot(efp.conj().transpose(), efp)
den += np.dot(ebp, ebp.conj().transpose())
ref[m] = num / den
# Update the forward and backward prediction errors
ef = efp + ref[m] * ebp
eb = ebp + ref[m].conj().transpose() * efp
# Update the AR coeff.
a.resize(len(a)+1, refcheck=False)
a = a + ref[m] * np.flipud(a).conjugate()
# Update the prediction error
E[m+1] = (1 - ref[m].conj().transpose()*ref[m]) * E[m]
#print 'REF', ref, num, den
return a, E[-1], ref
[docs]class pburg(ParametricSpectrum):
"""Class to create PSD based on Burg algorithm
See :func:`arburg` for description.
.. plot::
:width: 80%
:include-source:
from spectrum import *
p = pburg(marple_data, 15, NFFT=4096)
p.plot(sides='centerdc')
Another example based on a real data set is shown here below. Note here
that we set the scale_by_freq value to False and True. False should give
results equivalent to octave or matlab convention while setting to True
implies that the data is multiplied by :math:`2\pi df` where
:math:`df = sampling / N`.
.. plot::
:width: 80%
:include-source:
from spectrum import data_two_freqs, pburg
p = pburg(data_two_freqs(), 7, NFFT=4096)
p.plot()
p = pburg(data_two_freqs(), 7, NFFT=4096, scale_by_freq=True)
p.plot()
from pylab import legend
legend(["un-scaled", "scaled by 2 pi df"])
"""
def __init__(self, data, order, criteria=None, NFFT=None, sampling=1.,
scale_by_freq=False):
"""**Constructor**
For a detailled description of the parameters, see :func:`burg`.
:param array data: input data (list or np.array)
:param int order:
:param str criteria:
:param int NFFT: total length of the final data sets (padded with zero if
needed; default is 4096)
:param float sampling: sampling frequency of the input :attr:`data`.
"""
super(pburg, self).__init__(data, ar_order=order,
sampling=sampling, NFFT=NFFT,
scale_by_freq=scale_by_freq)
self.criteria = criteria
def __call__(self):
from .arma import arma2psd
ar, rho, ref = arburg(self.data, self.ar_order, self.criteria)
self.ar = ar
self.rho = rho
self.reflection = ref
psd = arma2psd(A=self.ar, B=self.ma, rho=self.rho,
T=self.sampling, NFFT=self.NFFT)
if self.datatype == 'real':
# see doc/concepts.rst for details
if self.NFFT % 2 == 0:
newpsd = psd[0:int(self.NFFT/2 +1)] * 2
else:
newpsd = psd[0:int((self.NFFT+1)/2)] * 2
self.psd = newpsd
else:
self.psd = psd
self.scale()
return self
def _str_title(self):
return "Periodogram PSD estimate\n"
def __str__(self):
return super(pburg, self).__str__()
[docs]def arburg(X, order, criteria=None):
r"""Estimate the complex autoregressive parameters by the Burg algorithm.
.. math:: x(n) = \sqrt{(v}) e(n) + \sum_{k=1}^{P+1} a(k) x(n-k)
:param x: Array of complex data samples (length N)
:param order: Order of autoregressive process (0<order<N)
:param criteria: select a criteria to automatically select the order
:return:
* A Array of complex autoregressive parameters A(1) to A(order). First
value (unity) is not included !!
* P Real variable representing driving noise variance (mean square
of residual noise) from the whitening operation of the Burg
filter.
* reflection coefficients defining the filter of the model.
.. plot::
:width: 80%
:include-source:
from pylab import plot, log10, linspace, axis
from spectrum import *
AR, P, k = arburg(marple_data, 15)
PSD = arma2psd(AR, sides='centerdc')
plot(linspace(-0.5, 0.5, len(PSD)), 10*log10(PSD/max(PSD)))
axis([-0.5,0.5,-60,0])
.. note::
1. no detrend. Should remove the mean trend to get PSD. Be careful if
presence of large mean.
2. If you don't know what the order value should be, choose the
criterion='AKICc', which has the least bias and best
resolution of model-selection criteria.
.. note:: real and complex results double-checked versus octave using
complex 64 samples stored in marple_data. It does not agree with Marple
fortran routine but this is due to the simplex precision of complex
data in fortran.
:reference: [Marple]_ [octave]_
"""
if order <= 0.:
raise ValueError("order must be > 0")
if order > len(X):
raise ValueError("order must be less than length input - 2")
x = np.array(X)
N = len(x)
# Initialisation
# ------ rho, den
rho = sum(abs(x)**2.) / float(N) # Eq 8.21 [Marple]_
den = rho * 2. * N
# ---- criteria
if criteria:
from spectrum import Criteria
crit = Criteria(name=criteria, N=N)
crit.data = rho
logging.debug('Step {}. old criteria={} new one={}. rho={}'.format(
0, crit.old_data, crit.data, rho))
#p =0
a = np.zeros(0, dtype=complex)
ref = np.zeros(0, dtype=complex)
ef = x.astype(complex)
eb = x.astype(complex)
temp = 1.
# Main recursion
for k in range(0, order):
# calculate the next order reflection coefficient Eq 8.14 Marple
num = sum([ef[j]*eb[j-1].conjugate() for j in range(k+1, N)])
den = temp * den - abs(ef[k])**2 - abs(eb[N-1])**2
kp = -2. * num / den #eq 8.14
temp = 1. - abs(kp)**2.
new_rho = temp * rho
if criteria:
logging.debug('Step {}. old criteria={} new one={}. rho={}'.format(
k+1, crit.old_data, crit.data, new_rho))
#k+1 because order goes from 1 to P whereas k starts at 0.
status = crit(rho=temp*rho, k=k+1)
if status is False:
logging.debug('Stop criteria reached %s %s ' % (crit.data, crit.old_data))
break
# this should be after the criteria
rho = new_rho
if rho <= 0:
raise ValueError("Found a negative value (expected positive strictly) %s. Decrease the order" % rho)
a.resize(a.size+1,refcheck=False)
a[k] = kp
if k == 0:
for j in range(N-1, k, -1):
save2 = ef[j]
ef[j] = save2 + kp * eb[j-1] # Eq. (8.7)
eb[j] = eb[j-1] + kp.conjugate() * save2
else:
# update the AR coeff
khalf = (k+1)//2 # FIXME here khalf must be an integer
for j in range(0, khalf):
ap = a[j] # previous value
a[j] = ap + kp * a[k-j-1].conjugate() # Eq. (8.2)
if j != k-j-1:
a[k-j-1] = a[k-j-1] + kp * ap.conjugate() # Eq. (8.2)
# update the prediction error
for j in range(N-1, k, -1):
save2 = ef[j]
ef[j] = save2 + kp * eb[j-1] # Eq. (8.7)
eb[j] = eb[j-1] + kp.conjugate() * save2
# save the reflection coefficient
ref.resize(ref.size+1, refcheck=False)
ref[k] = kp
return a, rho, ref