# Source code for spectrum.correlation

"""
.. topic:: Correlation module

Provides two correlation functions. :func:CORRELATION is slower than
:func:xcorr. However, the output is as expected by some other functions.
Ultimately, it should be replaced by :func:xcorr.

For real data, the behaviour of the 2 functions is identical. However, for
complex data, xcorr returns a 2-sides correlation.

.. autosummary::

~spectrum.correlation.CORRELATION
~spectrum.correlation.xcorr

.. codeauthor: Thomas Cokelaer, 2011

"""
import numpy as np

__all__ = ['CORRELATION', 'xcorr']

def pylab_rms_flat(a):
"""
Return the root mean square of all the elements of *a*, flattened out.
(Copied 1:1 from matplotlib.mlab.)
"""
return np.sqrt(np.mean(np.absolute(a) ** 2))

[docs]def CORRELATION(x, y=None, maxlags=None, norm='unbiased'): r"""Correlation function This function should give the same results as :func:xcorr but it returns the positive lags only. Moreover the algorithm does not use FFT as compared to other algorithms. :param array x: first data array of length N :param array y: second data array of length N. If not specified, computes the autocorrelation. :param int maxlags: compute cross correlation between [0:maxlags] when maxlags is not specified, the range of lags is [0:maxlags]. :param str norm: normalisation in ['biased', 'unbiased', None, 'coeff'] * *biased* correlation=raw/N, * *unbiased* correlation=raw/(N-|lag|) * *coeff* correlation=raw/(rms(x).rms(y))/N * None correlation=raw :return: * a numpy.array correlation sequence, r[1,N] * a float for the zero-lag correlation, r[0] The *unbiased* correlation has the form: .. math:: \hat{r}_{xx} = \frac{1}{N-m}T \sum_{n=0}^{N-m-1} x[n+m]x^*[n] T The *biased* correlation differs by the front factor only: .. math:: \check{r}_{xx} = \frac{1}{N}T \sum_{n=0}^{N-m-1} x[n+m]x^*[n] T with :math:0\leq m\leq N-1. .. doctest:: >>> from spectrum import CORRELATION >>> x = [1,2,3,4,5] >>> res = CORRELATION(x,x, maxlags=0, norm='biased') >>> res[0] 11.0 .. note:: this function should be replaced by :func:xcorr. .. seealso:: :func:xcorr """ assert norm in ['unbiased','biased', 'coeff', None] #transform lag into list if it is an integer x = np.array(x) if y is None: y = x else: y = np.array(y) # N is the max of x and y N = max(len(x), len(y)) if len(x) < N: x = y.copy() x.resize(N, refcheck=False) if len(y) < N: y = y.copy() y.resize(N, refcheck=False) #default lag is N-1 if maxlags is None: maxlags = N - 1 assert maxlags < N, 'lag must be less than len(x)' realdata = np.isrealobj(x) and np.isrealobj(y) #create an autocorrelation array with same length as lag if realdata == True: r = np.zeros(maxlags, dtype=float) else: r = np.zeros(maxlags, dtype=complex) if norm == 'coeff': rmsx = pylab_rms_flat(x) rmsy = pylab_rms_flat(y) for k in range(0, maxlags+1): nk = N - k - 1 if realdata == True: sum = 0 for j in range(0, nk+1): sum = sum + x[j+k] * y[j] else: sum = 0. + 0j for j in range(0, nk+1): sum = sum + x[j+k] * y[j].conjugate() if k == 0: if norm in ['biased', 'unbiased']: r0 = sum/float(N) elif norm is None: r0 = sum else: r0 = 1. else: if norm == 'unbiased': r[k-1] = sum / float(N-k) elif norm == 'biased': r[k-1] = sum / float(N) elif norm is None: r[k-1] = sum elif norm == 'coeff': r[k-1] = sum/(rmsx*rmsy)/float(N) r = np.insert(r, 0, r0) return r
[docs]def xcorr(x, y=None, maxlags=None, norm='biased'): """Cross-correlation using numpy.correlate Estimates the cross-correlation (and autocorrelation) sequence of a random process of length N. By default, there is no normalisation and the output sequence of the cross-correlation has a length 2*N+1. :param array x: first data array of length N :param array y: second data array of length N. If not specified, computes the autocorrelation. :param int maxlags: compute cross correlation between [-maxlags:maxlags] when maxlags is not specified, the range of lags is [-N+1:N-1]. :param str option: normalisation in ['biased', 'unbiased', None, 'coeff'] The true cross-correlation sequence is .. math:: r_{xy}[m] = E(x[n+m].y^*[n]) = E(x[n].y^*[n-m]) However, in practice, only a finite segment of one realization of the infinite-length random process is available. The correlation is estimated using numpy.correlate(x,y,'full'). Normalisation is handled by this function using the following cases: * 'biased': Biased estimate of the cross-correlation function * 'unbiased': Unbiased estimate of the cross-correlation function * 'coeff': Normalizes the sequence so the autocorrelations at zero lag is 1.0. :return: * a numpy.array containing the cross-correlation sequence (length 2*N-1) * lags vector .. note:: If x and y are not the same length, the shorter vector is zero-padded to the length of the longer vector. .. rubric:: Examples .. doctest:: >>> from spectrum import xcorr >>> x = [1,2,3,4,5] >>> c, l = xcorr(x,x, maxlags=0, norm='biased') >>> c array([ 11.]) .. seealso:: :func:CORRELATION. """ N = len(x) if y is None: y = x assert len(x) == len(y), 'x and y must have the same length. Add zeros if needed' if maxlags is None: maxlags = N-1 lags = np.arange(0, 2*N-1) else: assert maxlags <= N, 'maxlags must be less than data length' lags = np.arange(N-maxlags-1, N+maxlags) res = np.correlate(x, y, mode='full') if norm == 'biased': Nf = float(N) res = res[lags] / float(N) # do not use /= !! elif norm == 'unbiased': res = res[lags] / (float(N)-abs(np.arange(-N+1, N)))[lags] elif norm == 'coeff': Nf = float(N) rms = pylab_rms_flat(x) * pylab_rms_flat(y) res = res[lags] / rms / Nf else: res = res[lags] lags = np.arange(-maxlags, maxlags+1) return res, lags