Source code for spectrum.modcovar

"""Module related to the modcovar methods


import numpy as np
from .psd import ParametricSpectrum

__all__ = ['modcovar', 'modcovar_marple', 'pmodcovar']

[docs]def modcovar_marple (X,IP): """Fast algorithm for the solution of the modified covariance least squares normal equations. This implementation is based on [Marple]_. This code is far more complicated and slower than :func:`modcovar` function, which is now the official version. See :func:`modcovar` for a detailed description of Modified Covariance method. :param X: - Array of complex data samples X(1) through X(N) :param int IP: - Order of linear prediction model (integer) :return: * P - Real linear prediction variance at order IP * A - Array of complex linear prediction coefficients * ISTAT - Integer status indicator at time of exit 0. for normal exit (no numerical ill-conditioning) 1. if P is not a positive value 2. if DELTA' and GAMMA' do not lie in the range 0 to 1 3. if P' is not a positive value 4. if DELTA and GAMMA do not lie in the range 0 to 1 :validation: the AR parameters are the same as those returned by a completely different function :func:`modcovar`. .. note:: validation. results similar to test example in Marple but starts to differ for ip~8. with ratio of 0.975 for ip=15 probably due to precision. :References: [Marple]_ """ Pv = [] N = len(X) A = np.zeros(N, dtype=complex) D = np.zeros(N, dtype=complex) C = np.zeros(N, dtype=complex) R = np.zeros(N, dtype=complex) # Initialization R1=0. for K in range(1, N-1): R1=R1 + 2.*(X[K].real**2 + X[K].imag**2) R2 = X[0].real**2 + X[0].imag**2 R3 = X[N-1].real**2 + X[N-1].imag**2 R4 = 1. / (R1 + 2. * (R2 + R3)) P = R1 + R2 + R3 DELTA = 1. - R2 * R4 GAMMA = 1. - R3 * R4 LAMBDA = (X[0] * X[N-1]).conjugate()*R4 C[0] = X[N-1] * R4 D[0] = X[0].conjugate() * R4 M = 0 if (IP ==0): P = (.5*R1+R2+R3)/float(N) return [], P, [] #C Main loop for M in range(0, IP): #print '---------------------', M SAVE1 = 0+0j for K in range(M+1, N): SAVE1 = SAVE1 + X[K]*X[K-M-1].conjugate() SAVE1 *= 2. R[M] = SAVE1.conjugate() THETA = X[N-1]*D[0] PSI=X[N-1]*C[0] XI = X[0].conjugate() * D[0] if M==0: pass else: for K in range(0, M): THETA=THETA+X[N-K-2]*D[K+1] # Eq. (8.D.45) PSI = PSI + X[N-K-2]*C[K+1] # Eq. (8.D.45) XI = XI + X[K+1].conjugate() * D[K+1] # Eq. (8.D.45) R[K] = R[K]-X[N-M-1] * X[N+1-M+K-1].conjugate() - X[M].conjugate() * X[M-K-1] # Eq. (8.D.37) SAVE1=SAVE1+R[K].conjugate()*A[M-K-1] # Eq. (8.D.24) #print 'withini loop', K, THETA, PSI, XI, R[K], SAVE1 #print 'x -------', C[K], C[K+1] #print 'x -------', D[K], D[K+1] #print 'x -------', N-K-2, K+1,N-M-1, M, M-K-1 #print 'x -------', X[N-K-2], X[K+1],X[N-M-1], X[M], X[M-K-1] # Order update of A vector C1 = -SAVE1/P A[M]=C1 # Eq. (8.D.23) P=P*(1.-C1.real**2-C1.imag**2) # Eq. (8.D.25) #print 'C1=' , C1, 'A[M]', A[M], 'P=',P if M==0: pass else: #for k in range(0, (M-1)/2+1): #print 'BEFORE', A[0:4] for K in range(0, (M+1)//2): MK = M-K-1 SAVE1=A[K] A[K]=SAVE1+C1*A[MK].conjugate() # Eq. (8.D.22) #print 'A uopdate---->',M, K, MK, A[K], A[MK], SAVE1, C1 if (K != MK): #print 'ENTERI IF', K, MK, A[MK], C1, SAVE1 A[MK]=A[MK]+C1*(SAVE1.conjugate()) # Eq. (8.D.22) #print 'AFTER', A[0:4] #print M+1, IP if M+1 == IP: #Pv.append(P) P=.5*P/float(N-M-1) Pv.append(P) return A, P, Pv else: Pv.append(.5*P/float(N-M-1)) #Pv.append(P) # Time update of C,D vectors and GAMMA,DELTA,LAMBDA scalars R1=1./(DELTA*GAMMA-(LAMBDA.real)**2-(LAMBDA.imag)**2) C1=(THETA*(LAMBDA.conjugate())+PSI*DELTA)*R1 C2=(PSI*LAMBDA+THETA*GAMMA)*R1 C3=(XI*(LAMBDA.conjugate())+THETA*DELTA)*R1 C4=(THETA*LAMBDA+XI*GAMMA)*R1 #print 'C1C2C3C4', C1, C2, C3, C4 #print M, M/2+1, (M-1)/2+1 for K in range(0, (M)//2+1): MK=M-K SAVE1=C[K].conjugate() SAVE2=D[K].conjugate() SAVE3=C[MK].conjugate() SAVE4=D[MK].conjugate() C[K]=C[K]+C1*SAVE3+C2*SAVE4 # Eq. (8.D.43) D[K]=D[K]+C3*SAVE3+C4*SAVE4 # Eq. (8.D.44) #print '-------->', K,MK, SAVE1, SAVE2, SAVE3, SAVE4 if K != MK: C[MK]=C[MK]+C1*SAVE1+C2*SAVE2 # Eq. (8.D.43) D[MK]=D[MK]+C3*SAVE1+C4*SAVE2 # Eq. (8.D.44) #print 'witninloop loop ',K,MK,C[MK], D[MK] #print 'after loop' #print 'C=',C[0:2] #print 'D=',D[0:2] R2=PSI.real**2+PSI.imag**2 R3=THETA.real**2+THETA.imag**2 R4=XI.real**2+XI.imag**2 R5=GAMMA-(R2*DELTA+R3*GAMMA+2.*np.real(PSI*LAMBDA*THETA.conjugate()))*R1 R2=DELTA-(R3*DELTA+R4*GAMMA+2.*np.real(THETA*LAMBDA*XI.conjugate()))*R1 GAMMA=R5 # Eq. (8.D.46) DELTA=R2 # Eq. (8.D.47) LAMBDA=LAMBDA+C3*PSI.conjugate()+C4*THETA.conjugate() # Eq. (8.D.48) #print 'before time updaqt',R2, R3, R4, R5, LAMBDA if P <= 0.: raise ValueError('Found an invalid negative value for P.') if (DELTA > 0. and DELTA <= 1. and GAMMA > 0. and GAMMA <=1.): pass else: raise ValueError('Found an invalid DELTA or GAMMA value.') # Time update of A vector; order updates of C,D vectors aand GAMMA, # DELTA,LAMBDA scalars #print 'time update--------------' R1=1./P R2=1./(DELTA*GAMMA-LAMBDA.real**2-LAMBDA.imag**2) # Eq. (8.D.41) EF=X[M+1] EB=X[N-M-2] #print 'efeb', EF, EB #print 'A=',A[0:2] for K in range(0, M+1): #print 'efeb lloop', K,A[K], X[M-K], X[N-M+K-1] EF=EF+A[K]*X[M-K] # Eq. (8.D.1) EB=EB+A[K].conjugate()*X[N-M+K-1] # Eq. (8.D.2) C1=EB*R1 # Eq. (8.D.28) C2=EF.conjugate()*R1 # Eq. (8.D.29) C3=(EB.conjugate()*DELTA+EF*LAMBDA)*R2 C4=(EF*GAMMA+(EB*LAMBDA).conjugate())*R2 #print 'c1c2c3c4', C1 ,C2, C3, C4 #print 'efeb',EF, EB for K in range(M, -1, -1): SAVE1=A[K] A[K]=SAVE1+C3*C[K]+C4*D[K] # Eq. (8.D.38) C[K+1]=C[K]+C1*SAVE1 # Eq. (8.D.26) D[K+1]=D[K]+C2*SAVE1 # Eq. (8.D.27) #print 'ACD',K, A[K], C[K+1], D[K+1] C[0]=C1 D[0]=C2 R3=EB.real**2+EB.imag**2 R4=EF.real**2+EF.imag**2 P=P-(R3*DELTA+R4*GAMMA+2.*np.real(EF*EB*LAMBDA))*R2 # Eq. (8.D.42) DELTA=DELTA-R4*R1 # Eq. (8.D.32) GAMMA=GAMMA-R3*R1 # Eq. (8.D.33) LAMBDA=LAMBDA+(EF*EB).conjugate()*R1 # Eq. (8.D.35) #print 'final' #print R3, R4, P, DELTA, GAMMA, LAMBDA #print 'Afinal=',A[0:2] #print 'P=',P if (P > 0.): pass else: raise ValueError("Found a invalid negative P value ") if (DELTA > 0. and DELTA <= 1. and GAMMA > 0. and GAMMA <= 1.): pass else: raise ValueError("Found an invalid negative GAMMA or DELTA value ")
[docs]def modcovar(x, order): """Simple and fast implementation of the covariance AR estimate This code is 10 times faster than :func:`modcovar_marple` and more importantly only 10 lines of code, compared to a 200 loc for :func:`modcovar_marple` :param X: Array of complex data samples :param int order: Order of linear prediction model :return: * P - Real linear prediction variance at order IP * A - Array of complex linear prediction coefficients .. plot:: :include-source: :width: 80% from spectrum import modcovar, marple_data, arma2psd, cshift from pylab import log10, linspace, axis, plot a, p = modcovar(marple_data, 15) PSD = arma2psd(a) PSD = cshift(PSD, len(PSD)/2) # switch positive and negative freq plot(linspace(-0.5, 0.5, 4096), 10*log10(PSD/max(PSD))) axis([-0.5,0.5,-60,0]) .. seealso:: :class:`~spectrum.modcovar.pmodcovar` :validation: the AR parameters are the same as those returned by a completely different function :func:`modcovar_marple`. :References: Mathworks """ from spectrum import corrmtx import scipy.linalg X = corrmtx(x, order, 'modified') Xc = np.array(X[:,1:]) X1 = np.array(X[:,0]) # Coefficients estimated via the covariance method # Here we use lstsq rathre than solve function because Xc is not square matrix a, residues, rank, singular_values = scipy.linalg.lstsq(-Xc, X1) # Estimate the input white noise variance Cz =, Xc) e =, X1) +, a) assert e.imag < 1e-4, 'wierd behaviour' e = float(e.real) # ignore imag part that should be small return a, e
[docs]class pmodcovar(ParametricSpectrum): """Class to create PSD based on modified covariance algorithm See :func:`~spectrum.modcovar.modcovar` for description. .. rubric:: Examples .. plot:: :width: 80% :include-source: from spectrum import pmodcovar, marple_data p = pmodcovar(marple_data, 15, NFFT=4096) p.plot(sides='centerdc') .. seealso:: :class:`modcovar` """ def __init__(self, data, order, NFFT=None, sampling=1., scale_by_freq=False): """**Constructor** For a detailled description of the parameters, see :func:`modcovar`. :param array data: input data (list or numpy.array) :param int order: :param int NFFT: total length of the final data sets (padded with zero if needed; default is 4096) :param float sampling: sampling frequency of the input :attr:`data`. """ super(pmodcovar, self).__init__(data, ar_order=order, NFFT=NFFT, sampling=sampling, scale_by_freq=scale_by_freq) def __call__(self): from spectrum import arma2psd ar, e = modcovar(, self.ar_order) = ar psd = arma2psd(A=ar, T=self.sampling, NFFT=self.NFFT) if self.datatype == 'real': if self.NFFT % 2 == 0: newpsd = psd[0:int(self.NFFT/2+1)] * 2 else: newpsd = psd[0:int((self.NFFT+1)/2)] * 2 self.psd = newpsd else: self.psd = psd if self.scale_by_freq is True: self.scale() return self def _str_title(self): return "Modified covariance PSD estimate\n" def __str__(self): return super(pmodcovar, self).__str__()