Source code for nuee.permutation.permanova

"""
PERMANOVA (Permutational Multivariate Analysis of Variance).

This module provides a basic implementation of distance-based PERMANOVA
that mirrors the behaviour of ``vegan::adonis2`` for sequential (type I)
tests.  The routine accepts a distance matrix together with one or more
predictor terms supplied as columns in a ``DataFrame`` or array.  The
current implementation focuses on the common use case where terms are
entered in the order they should be evaluated; interactions and strata
are not yet supported.
"""

from __future__ import annotations

import numpy as np
import pandas as pd
from numpy.linalg import qr
from scipy.spatial.distance import squareform
from typing import Dict, List, Optional, Sequence, Tuple, Union


ArrayLike = Union[np.ndarray, pd.DataFrame, pd.Series]


def _as_numpy_distance(matrix: ArrayLike,
                       method: str = "bray") -> np.ndarray:
    if isinstance(matrix, pd.DataFrame):
        matrix = matrix.values
    matrix = np.asarray(matrix, dtype=float)
    if matrix.ndim == 1:
        matrix = squareform(matrix)
    if matrix.ndim != 2:
        raise ValueError("Distance input must be 1-D condensed, 2-D square, or a data matrix.")
    if matrix.shape[0] != matrix.shape[1]:
        from ..dissimilarity.distances import vegdist
        return vegdist(matrix, method=method)
    if matrix.shape[0] < 2:
        raise ValueError("Distance matrix must contain at least two observations.")
    if not np.allclose(matrix, matrix.T, atol=1e-12):
        raise ValueError("Distance matrix must be symmetric.")
    return matrix


def _prepare_terms(design: ArrayLike) -> List[Tuple[str, np.ndarray]]:
    if isinstance(design, pd.Series):
        design = design.to_frame()
    elif not isinstance(design, pd.DataFrame):
        design = pd.DataFrame(design)
    terms: List[Tuple[str, np.ndarray]] = []
    for column in design.columns:
        values = design[column]
        if values.dtype.kind in ("U", "S", "O") or str(values.dtype).startswith("category"):
            dummies = pd.get_dummies(values, drop_first=True).astype(float)
            if dummies.shape[1] == 0:
                continue
            terms.append((str(column), dummies.values))
        else:
            col = np.asarray(values, dtype=float)
            if np.all(np.isfinite(col)):
                terms.append((str(column), col[:, None]))
    return terms


def _orthonormal_basis(X: np.ndarray, tol: float = 1e-10) -> Tuple[np.ndarray, int]:
    if X.size == 0:
        return np.empty((X.shape[0], 0)), 0
    Q, R = qr(X, mode="reduced")
    if Q.size == 0:
        return Q, 0
    diag = np.abs(np.diag(R))
    keep = diag > tol
    Q = Q[:, keep]
    return Q, Q.shape[1]


def _sequential_components(G: np.ndarray,
                           terms: List[Tuple[str, np.ndarray]],
                           tol: float = 1e-10) -> Tuple[List[Dict[str, Union[str, float, int, np.ndarray]]],
                                                         float, int, float]:
    n = G.shape[0]
    total_ss = float(np.trace(G))
    q_prev = np.empty((n, 0))
    term_results: List[Dict[str, Union[str, float, int, np.ndarray]]] = []
    accumulated_ss = 0.0
    accumulated_df = 0

    for name, block in terms:
        block = np.asarray(block, dtype=float)
        if block.ndim == 1:
            block = block[:, None]
        block = block - block.mean(axis=0, keepdims=True)
        if q_prev.size:
            block = block - q_prev @ (q_prev.T @ block)
        q_j, rank_j = _orthonormal_basis(block, tol=tol)
        if rank_j > 0:
            ss_j = float(np.trace(q_j.T @ G @ q_j))
            q_prev = np.hstack([q_prev, q_j]) if q_prev.size else q_j
        else:
            ss_j = 0.0
        term_results.append({"name": name, "ss": ss_j, "df": rank_j, "basis": q_j})
        accumulated_ss += ss_j
        accumulated_df += rank_j

    residual_df = max(n - 1 - accumulated_df, 0)
    residual_ss = max(total_ss - accumulated_ss, 0.0)
    return term_results, residual_ss, residual_df, total_ss


[docs] def permanova(distance_matrix: ArrayLike, factors: ArrayLike, permutations: int = 999, random_state: Optional[Union[int, np.random.Generator]] = None, distance_method: str = "bray", **kwargs) -> pd.DataFrame: """ Distance-based PERMANOVA (sequential sums of squares). Parameters ---------- distance_matrix: Square or condensed distance matrix. factors: DataFrame, Series, or array of predictor variables. Each column is treated as a separate term evaluated sequentially. permutations: Number of permutations for significance testing. Set to ``0`` or ``None`` to skip permutation p-values. random_state: Seed or ``Generator`` for reproducible permutations. Returns ------- dict Dictionary containing a result table (``table``), the total sum of squares, and the permutation F-statistics. """ D = _as_numpy_distance(distance_matrix, method=distance_method) terms = _prepare_terms(factors) if not terms: raise ValueError("No valid predictor terms supplied to PERMANOVA.") n = D.shape[0] H = np.eye(n) - np.ones((n, n)) / n G = -0.5 * H @ (D ** 2) @ H components, residual_ss, residual_df, total_ss = _sequential_components(G, terms) residual_f = residual_ss / residual_df if residual_df > 0 else np.nan observed_F = [] rows = [] sum_ss = 0.0 sum_df = 0 for comp in components: df = comp["df"] ss = comp["ss"] r2 = ss / total_ss if total_ss > 0 else np.nan if df > 0 and residual_df > 0 and residual_f > 0: f_stat = (ss / df) / residual_f else: f_stat = np.nan observed_F.append(f_stat) rows.append({ "Df": df, "SumOfSqs": ss, "R2": r2, "F": f_stat, "Pr(>F)": np.nan, # to be filled after permutations }) sum_ss += ss sum_df += df rows.append({ "Df": residual_df, "SumOfSqs": residual_ss, "R2": residual_ss / total_ss if total_ss > 0 else np.nan, "F": np.nan, "Pr(>F)": np.nan, }) rows.append({ "Df": n - 1, "SumOfSqs": total_ss, "R2": 1.0, "F": np.nan, "Pr(>F)": np.nan, }) if permutations and permutations > 0 and residual_df > 0: rng = np.random.default_rng(random_state) term_bases = [comp["basis"] for comp in components] perm_counts = np.zeros((len(components),), dtype=int) for _ in range(permutations): perm = rng.permutation(n) G_perm = G[perm][:, perm] total_perm = float(np.trace(G_perm)) ss_terms = [] for basis in term_bases: if basis is None or basis.size == 0: ss_terms.append(0.0) else: ss_terms.append(float(np.trace(basis.T @ G_perm @ basis))) ss_terms = np.array(ss_terms) res_ss_perm = max(total_perm - ss_terms.sum(), 0.0) res_f_perm = res_ss_perm / residual_df if residual_df > 0 else np.nan for idx, (ss_j, df_j) in enumerate(zip(ss_terms, [comp["df"] for comp in components])): obs_F = observed_F[idx] if df_j > 0 and np.isfinite(res_f_perm) and res_f_perm > 0 and np.isfinite(obs_F): f_perm = (ss_j / df_j) / res_f_perm if np.isfinite(f_perm) and f_perm >= obs_F - 1e-12: perm_counts[idx] += 1 for idx, row in enumerate(rows[:len(components)]): if observed_F[idx] is not None and np.isfinite(observed_F[idx]) and permutations > 0: row["Pr(>F)"] = (perm_counts[idx] + 1) / (permutations + 1) table = pd.DataFrame(rows, index=[comp["name"] for comp in components] + ["Residual", "Total"], columns=["Df", "SumOfSqs", "R2", "F", "Pr(>F)"]) table.attrs.update({ "total_ss": total_ss, "residual_ss": residual_ss, "permutations": permutations, "distance_method": distance_method, }) return table
[docs] def adonis2(distance_matrix: ArrayLike, factors: ArrayLike, permutations: int = 999, random_state: Optional[Union[int, np.random.Generator]] = None, distance_method: str = "bray", **kwargs) -> pd.DataFrame: """ Convenience wrapper for distance-based PERMANOVA. Parameters ---------- distance_matrix: Square or condensed distance matrix. factors: Predictor variables supplied as a ``DataFrame``/``Series``/array. permutations: Number of permutations for the significance test. random_state: Seed or ``Generator`` controlling permutation reproducibility. Returns ------- dict Same payload as :func:`permanova`. """ return permanova(distance_matrix, factors, permutations=permutations, random_state=random_state, distance_method=distance_method, **kwargs)