Source code for dyconnmap.ts.surrogates

# -*- coding: utf-8 -*-
""" Surrogate Analysis



"""
# Author: Avraam Marimpis <avraam.marimpis@gmail.com>

from typing import Optional, Tuple, Callable
import numpy as np
import numbers


[docs]def surrogate_analysis( ts1: "np.ndarray[np.float32]", ts2: "np.ndarray[np.float32]", num_surr: Optional[int] = 1000, estimator_func: Optional[ Callable[["np.ndarray[np.float32]", "np.ndarray[np.float32]"], float] ] = None, ts1_no_surr: bool = False, rng: Optional[np.random.RandomState] = None, ) -> Tuple[float, "np.ndarray[np.int32]", "np.ndarray[np.float32]", float]: """ Surrogate Analysis Parameters ---------- ts1 : ts2 : num_surr : int estimator_func : function ts1_no_surr : boolean rng : object or None An object of type numpy.random.RandomState Returns ------- p_val : float corr_surr : surrogates : r_value : float """ if rng is None: rng = np.random.RandomState(0) if estimator_func is None: def estimator(x, y): return np.abs(np.corrcoef(x, y))[0, 1] estimator_func = estimator r_value = estimator_func(ts1, ts2) if isinstance(r_value, numbers.Real): r_value = [r_value] num_samples = len(ts1) num_r_vals = len(r_value) surrogates = np.zeros([2, num_surr, num_samples]) if ts1_no_surr is True: surrogates[0, ...] = np.tile(ts1, [num_surr, 1]) else: surrogates[0, ...] = aaft(ts1, num_surr, rng) surrogates[1, ...] = aaft(ts2, num_surr, rng) surr_vals = np.zeros((num_surr, len(r_value))) for i in range(num_surr): surr_vals[i, :] = estimator_func(surrogates[0, i, ...], surrogates[1, i, ...]) surr_vals = np.array(surr_vals) p_vals = np.zeros((num_r_vals)) for i in range(num_r_vals): r = np.where(surr_vals[:, i] > r_value[i])[0] p_val = 0.0 if len(r) == 0: p_val = 1.0 / float(num_surr) else: p_val = len(r) / float(num_surr) p_vals[i] = p_val p_vals = p_vals.squeeze() surr_vals = surr_vals.squeeze() return p_vals, surr_vals, surrogates, r_value
[docs]def aaft( ts: "np.ndarray[np.float32]", num_surr: Optional[int] = 1, rng: Optional[np.random.RandomState] = None, ) -> "np.ndarray[np.float32]": """ Amplitude Adjusted Fourier Transform Parameters ---------- ts : num_surr : rng : object or None An object of type numpy.random.RandomState Returns ------- """ if rng is None: rng = np.random.RandomState() n_samples = len(ts) s = np.zeros((num_surr, n_samples)) for i in range(num_surr): y = ts normal = np.sort(rng.randn(1, n_samples)).ravel() y, T = np.sort(ts), np.argsort(ts) T, r = np.sort(T), np.argsort(T) normal = phase_rand(normal[r], 1, rng).ravel() normal, T = np.sort(normal), np.argsort(normal) T, r = np.sort(T), np.argsort(T) s[i, :] = y[r] return s
[docs]def fdr( p_values: "np.ndarray[np.float32]", q: Optional[float] = 0.01, method: Optional[str] = "pdep", ) -> Tuple[bool, float]: """ False Discovery Rate Parameters ---------- p_values : q : method : Returns ------- """ crit_p = 0.0 h = False sorted_p_values = np.sort(p_values) m = len(sorted_p_values) thresh = np.arange(1, m + 1) * (q / m) rej = sorted_p_values <= thresh max_id = np.where(rej == True)[0] if max_id.size == 0: crit_p = 0.0 h = p_values * 0 else: max_id = np.max(max_id) crit_p = sorted_p_values[max_id] crit_p = crit_p.squeeze() h = p_values <= crit_p h = h.squeeze() h = h.astype(np.bool) return h, crit_p
[docs]def phase_rand( data, num_surr: Optional[int] = 1, rng: Optional[np.random.RandomState] = None ) -> "np.ndarray[np.float32]": """ Phase-randomized suggorates Parameters ---------- data : num_surr : rng : object or None An object of type numpy.random.RandomState Returns ------- """ if rng is None: rng = np.random.RandomState() n_samples = np.shape(data)[0] surrogates = np.zeros((num_surr, n_samples)) half = np.int32(np.floor(n_samples / 2.0)) surrogates = np.zeros((num_surr, n_samples)) y = np.fft.fft(data) m = np.abs(y) p = np.angle(y) for i in range(num_surr): if n_samples % 2 == 0: p1 = rng.randn(half - 1, 1) * 2.0 * np.pi a = p1.T.ravel() b = p[half] c = np.flipud(p1).T.ravel() p[list(range(1, n_samples))] = np.hstack((a, b, -c)) a = m[list(range(0, half + 1))] b = np.flipud(m[list(range(1, half))]) m = np.hstack((a, b)) else: p1 = rng.randn(half, 1) * 2.0 * np.pi a = p1 b = np.flipud(p1).ravel() p[list(range(1, n_samples))] = a - b surrogates[i, :] = np.real(np.fft.ifft(np.exp(1j * p) * m)) return surrogates