From ea718fa94a9409b0255015ae2627fcf605ee5ea1 Mon Sep 17 00:00:00 2001 From: colganwi Date: Mon, 20 Jan 2025 16:55:58 -0500 Subject: [PATCH] added autocorr --- docs/api.md | 1 + src/pycea/tl/__init__.py | 1 + src/pycea/tl/autocorr.py | 127 +++++++++++++++++++++++++++++++++++++++ tests/test_autocorr.py | 46 ++++++++++++++ 4 files changed, 175 insertions(+) create mode 100755 src/pycea/tl/autocorr.py create mode 100755 tests/test_autocorr.py diff --git a/docs/api.md b/docs/api.md index 82c181f..6629d71 100644 --- a/docs/api.md +++ b/docs/api.md @@ -22,6 +22,7 @@ :toctree: generated tl.ancestral_states + tl.autocorr tl.clades tl.compare_distance tl.distance diff --git a/src/pycea/tl/__init__.py b/src/pycea/tl/__init__.py index 7c338f2..98499f5 100644 --- a/src/pycea/tl/__init__.py +++ b/src/pycea/tl/__init__.py @@ -5,3 +5,4 @@ from .sort import sort from .tree_distance import tree_distance from .tree_neighbors import tree_neighbors +from .autocorr import autocorr diff --git a/src/pycea/tl/autocorr.py b/src/pycea/tl/autocorr.py new file mode 100755 index 0000000..a59b542 --- /dev/null +++ b/src/pycea/tl/autocorr.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from collections.abc import Sequence + +import numpy as np +import pandas as pd +import scanpy as sc +import scipy as sp +import treedata as td + +from pycea.utils import get_keyed_obs_data + + +def _g_moments(w: sp.sparse.spmatrix | np.ndarray) -> tuple[float, float, float]: + """ + Compute moments of adjacency matrix for analytic p-value calculation. + + See `pysal `_ implementation. + """ + # s0 + s0 = w.sum() + # s1 + t = w.transpose() + w + t2 = t.multiply(t) if isinstance(t,sp.sparse.spmatrix) else t * t + s1 = t2.sum() / 2.0 + # s2 + s2 = (np.array(w.sum(1) + w.sum(0).transpose()) ** 2).sum() + return s0, s1, s2 + + +def _analytic_pval(score: float, g: sp.sparse.spmatrix | np.ndarray, method: str) -> tuple[float, float]: + """ + Analytic p-value computation. + + See `Moran's I `_ and + `Geary's C `_ implementation. + """ + s0, s1, s2 = _g_moments(g) + n = g.shape[0] + s02 = s0 * s0 + n2 = n * n + v_num = n2 * s1 - n * s2 + 3 * s02 + v_den = (n - 1) * (n + 1) * s02 + + Vscore_norm = v_num / v_den - (1.0 / (n - 1)) ** 2 + seScore_norm = Vscore_norm ** (1 / 2.0) + if method == "moran": + expected = -1.0 / (g.shape[0] - 1) + elif method == "geary": + expected = 1.0 + + z_norm = (score - expected) / seScore_norm + p_norm = np.empty(score.shape) + p_norm[z_norm > 0] = 1 - sp.stats.norm.cdf(z_norm[z_norm > 0]) + p_norm[z_norm <= 0] = sp.stats.norm.cdf(z_norm[z_norm <= 0]) + + return p_norm, Vscore_norm + + +def autocorr( + tdata: td.TreeData, + keys: str | Sequence[str] = None, + connect_key: str = None, + method: str = "moran", + layer: str = None, + copy: bool = False, +) -> None | pd.DataFrame: + """Calculate autocorrelation statistic. + + Parameters + ---------- + tdata + TreeData object. + keys + One or more `obs_keys`, `var_names`, `obsm_keys`, or `obsp_keys` to calculate autocorrelation for. Defaults to all 'var_names'. + connect_key + `tdata.obsp` connectivity key specifying set of neighbors for each observation. + method + Method to calculate autocorrelation. Options are: + + * 'moran' : `Moran's I autocorrelation `. + * 'geary' : `Geary's C autocorrelation `. + layer + Name of the TreeData object layer to use. If `None`, `tdata.X` is used. + copy + If True, returns a :class:`DataFrame ` with autocorrelation. + + Returns + ------- + Returns `None` if `copy=False`, else return :class:`DataFrame ` columns. + + - `'autocorr'` - Moran's I or Geary's C statistic. + - `'pval_norm'` - p-value under normality assumption. + - `'var_norm'` - variance of `'score'` under normality assumption. + + Sets the following fields for each key: + + * `tdata.uns['moranI']` : Above DataFrame for if method is `'moran'`. + * `tdata.uns['gearyC']` : Above DataFrame for if method is `'geary'`. + """ + # Setup + if keys is None: + keys = tdata.var_names + if layer is None: + data = tdata.X + else: + data = tdata.layers[layer] + else: + if isinstance(keys, str): + keys = [keys] + data, _ = get_keyed_obs_data(tdata, keys, layer=layer) + method_names = {"moran": "moranI", "geary": "gearyC"} + # Calculate autocorrelation + if method == "moran": + corr = sc.metrics.morans_i(tdata.obsp[connect_key],data.T) + tdata.uns["moranI"] = corr + elif method == "geary": + corr = sc.metrics.gearys_c(tdata.obsp[connect_key],data.T) + tdata.uns["gearyC"] = corr + else: + raise ValueError(f"Method {method} not recognized. Must be 'moran' or 'geary'.") + # Calculate p-value + p_norm, var_norm = _analytic_pval(corr, tdata.obsp[connect_key], method) + corr = pd.DataFrame({"autocorr": corr, "pval_norm": p_norm, "var_norm": var_norm}, index=keys) + tdata.uns[method_names[method]] = corr + if copy: + return corr diff --git a/tests/test_autocorr.py b/tests/test_autocorr.py new file mode 100755 index 0000000..9d4d0fe --- /dev/null +++ b/tests/test_autocorr.py @@ -0,0 +1,46 @@ +import numpy as np +import pandas as pd +import pytest +import scipy as sp +import treedata as td + +from pycea.tl.autocorr import autocorr + + +@pytest.fixture +def tdata(): + tdata = td.TreeData( + obs=pd.DataFrame({"value": [1, 1, 0]}, index=["A", "B", "C"]), + var=pd.DataFrame(index=["1", "2", "3"]), + X = np.array([[1, 0, 1], [1, 1, 2], [0, 1, 5]]), + obsp={"connectivities": sp.sparse.csr_matrix(([1, 1, 1, 1, 1], + ([0, 0, 1, 1, 2], [0, 1, 0, 1, 2])), shape=(3, 3))}, + ) + yield tdata + + +def test_moran(tdata): + autocorr(tdata, keys="value", connect_key="connectivities", method="moran") + assert "moranI" in tdata.uns.keys() + assert tdata.uns["moranI"]["autocorr"].values[0] == pytest.approx(0.8) + result = autocorr(tdata, keys=["1","2"], connect_key="connectivities", method="moran", copy=True) + assert tdata.uns["moranI"].shape == (2,3) + assert list(result.index.values) == ["1","2"] + assert result.autocorr.values == pytest.approx([0.8,0.2]) + assert result.pval_norm.values == pytest.approx([1.84e-12,9.14e-5], rel=1e-2) + + +def test_geary(tdata): + autocorr(tdata, keys="value", connect_key="connectivities", method="geary") + assert "gearyC" in tdata.uns.keys() + assert tdata.uns["gearyC"]["autocorr"].values[0] == pytest.approx(0) + result = autocorr(tdata, connect_key="connectivities", method="geary", copy=True) + print(result) + assert tdata.uns["gearyC"].shape == (3,3) + assert list(result.index.values) == ["1","2","3"] + assert result.autocorr.values == pytest.approx([0.0,0.6,0.04615], rel=1e-2) + assert result.pval_norm.values == pytest.approx([4.51e-8,1.62e-2,1.71e-7], rel=1e-2) + + +if __name__ == "__main__": + pytest.main(["-v", __file__])