diff --git a/doc/references.bib b/doc/references.bib index dbb5fbfe..e25162fa 100644 --- a/doc/references.bib +++ b/doc/references.bib @@ -16,6 +16,14 @@ @article{Colombo2012 url = {https://doi.org/10.1214/11-AOS940} } +@article{Entner2010causal, + title = {On causal discovery from time series data using FCI}, + author = {Entner, Doris and Hoyer, Patrik O}, + journal = {Probabilistic graphical models}, + pages = {121--128}, + year = {2010} +} + @article{Lopez2016revisiting, title = {Revisiting classifier two-sample tests}, author = {Lopez-Paz, David and Oquab, Maxime}, diff --git a/doc/tutorials/index.rst b/doc/tutorials/index.rst index 46070e2f..14ddbbb3 100644 --- a/doc/tutorials/index.rst +++ b/doc/tutorials/index.rst @@ -16,3 +16,12 @@ no latent confounders. markovian/example-pc-algo +[Experimental] Time-series causal discovery +=========================================== +We include some tutorials on novel time-series causal discovery algorithms. + +.. toctree:: + :maxdepth: 1 + :titlesonly: + + timeseries/lpcmci-with-summary \ No newline at end of file diff --git a/doc/tutorials/timeseries/lpcmci-with-summary.ipynb b/doc/tutorials/timeseries/lpcmci-with-summary.ipynb new file mode 100644 index 00000000..e69de29b diff --git a/dodiscover/__init__.py b/dodiscover/__init__.py index 446ee292..d15ddbe2 100644 --- a/dodiscover/__init__.py +++ b/dodiscover/__init__.py @@ -8,4 +8,4 @@ from ._protocol import EquivalenceClass, Graph from ._version import __version__ # noqa: F401 from .constraint import FCI, PC -from .context_builder import ContextBuilder, make_context +from .context_builder import ContextBuilder, make_context, make_ts_context diff --git a/dodiscover/_protocol.py b/dodiscover/_protocol.py index 84e32def..958b2c97 100644 --- a/dodiscover/_protocol.py +++ b/dodiscover/_protocol.py @@ -1,7 +1,9 @@ -from typing import Dict, FrozenSet, Iterable, Protocol +from typing import Dict, FrozenSet, Iterable, List, Optional, Protocol import networkx as nx +from .typing import Column + class Graph(Protocol): """Protocol for graphs to work with dodiscover algorithms.""" @@ -27,7 +29,7 @@ def remove_node(self, u) -> None: """Remove a node from the graph.""" pass - def remove_edges_from(self, edges) -> None: + def remove_edges_from(self, edges: List) -> None: """Remove a set of edges from the graph.""" pass @@ -60,6 +62,33 @@ def copy(self): pass +class TimeSeriesGraph(Graph, Protocol): + """A protocol for time-series graphs.""" + + @property + def max_lag(self) -> int: + pass + + def lagged_neighbors(self, u: Column) -> List[Column]: + """Return neighbors of u that are in a previous time point.""" + pass + + def contemporaneous_neighbors(self, u: Column) -> List[Column]: + """Return neighbors of u that are in the same time point.""" + pass + + # TODO: refactor to + # 1. remove_forward_homologous_edges(self, u, v) + # 2. 
remove_backward_homologous_edges(self, u, v) + def set_auto_removal(self, auto: Optional[str]) -> None: + """Specify how to auto-remove homologous edges.""" + pass + + def nodes_at(self, t): + """Nodes at specific time point.""" + pass + + class EquivalenceClass(Graph, Protocol): """Protocol for equivalence class of graphs.""" @@ -88,6 +117,10 @@ def bidirected_edge_name(self) -> str: """Name of the bidirected edges.""" pass + def get_graphs(self, graph_name: Optional[str] = None): + """Get edge sub-graph.""" + pass + def orient_uncertain_edge(self, u, v) -> None: """Orients an uncertain edge in the equivalence class to directed ``'u'*->'v'``.""" pass diff --git a/dodiscover/ci/base.py b/dodiscover/ci/base.py index 9c1bceb9..b4102ad6 100644 --- a/dodiscover/ci/base.py +++ b/dodiscover/ci/base.py @@ -6,8 +6,7 @@ import sklearn.utils from numpy.typing import ArrayLike -from dodiscover.typing import Column - +from ..typing import Column from .monte_carlo import generate_knn_in_subspace, restricted_nbr_permutation diff --git a/dodiscover/ci/oracle.py b/dodiscover/ci/oracle.py index f21208b5..132b6e76 100644 --- a/dodiscover/ci/oracle.py +++ b/dodiscover/ci/oracle.py @@ -1,4 +1,4 @@ -from typing import Optional, Set +from typing import Optional, Set, Union import networkx as nx import numpy as np @@ -6,7 +6,7 @@ from dodiscover.typing import Column -from .._protocol import Graph +from .._protocol import Graph, TimeSeriesGraph from .base import BaseConditionalIndependenceTest @@ -23,7 +23,7 @@ class Oracle(BaseConditionalIndependenceTest): _allow_multivariate_input: bool = True - def __init__(self, graph: Graph) -> None: + def __init__(self, graph: Union[Graph, TimeSeriesGraph]) -> None: self.graph = graph def test( diff --git a/dodiscover/ci/simulate.py b/dodiscover/ci/simulate.py index f6d0eaf1..dc60a00e 100644 --- a/dodiscover/ci/simulate.py +++ b/dodiscover/ci/simulate.py @@ -1,4 +1,4 @@ -from typing import Callable, Tuple +from typing import Callable, Tuple, Optional import numpy as np from numpy.typing import NDArray @@ -106,3 +106,9 @@ def nonlinear_additive_gaussian( Y = nonlinear_func(freq * (2 * Axy * X + Z * Azy + std * Y_noise + cause_var)) return X, Y, Z + + +def vector_auto_regressive_from_summary( + summary_G, max_lag: int=1, n_times: int=1000, random_state: Optional[int] = None +): + pass diff --git a/dodiscover/constraint/__init__.py b/dodiscover/constraint/__init__.py index 6bf409ef..b4bf9018 100644 --- a/dodiscover/constraint/__init__.py +++ b/dodiscover/constraint/__init__.py @@ -1,3 +1,5 @@ +from .config import SkeletonMethods from .fcialg import FCI from .pcalg import PC -from .skeleton import LearnSemiMarkovianSkeleton, LearnSkeleton, SkeletonMethods +from .skeleton import LearnSemiMarkovianSkeleton, LearnSkeleton +from .timeseries import LearnTimeSeriesSkeleton, TimeSeriesFCI, TimeSeriesPC diff --git a/dodiscover/constraint/config.py b/dodiscover/constraint/config.py index b78c335d..5ac8f566 100644 --- a/dodiscover/constraint/config.py +++ b/dodiscover/constraint/config.py @@ -31,3 +31,5 @@ class SkeletonMethods(Enum, metaclass=MetaEnum): NBRS_PATH = "neighbors_path" PDS = "pds" PDS_PATH = "pds_path" + PDS_T = "pds_t" + PDS_T_PATH = "pds_t_path" diff --git a/dodiscover/constraint/fcialg.py b/dodiscover/constraint/fcialg.py index cdff78fb..677ad365 100644 --- a/dodiscover/constraint/fcialg.py +++ b/dodiscover/constraint/fcialg.py @@ -838,7 +838,9 @@ def learn_skeleton( d.pop("test_stat") if "pvalue" in d: d.pop("pvalue") - context = 
make_context(context).graph(new_init_graph).state_variable("PAG", pag).build() + context = ( + make_context(context).init_graph(new_init_graph).state_variable("PAG", pag).build() + ) # # now compute all possibly d-separating sets and learn a better skeleton skel_alg = LearnSemiMarkovianSkeleton( diff --git a/dodiscover/constraint/pcalg.py b/dodiscover/constraint/pcalg.py index ec9e6020..46debc49 100644 --- a/dodiscover/constraint/pcalg.py +++ b/dodiscover/constraint/pcalg.py @@ -1,16 +1,16 @@ import logging from itertools import combinations -from typing import Optional +from typing import Iterator, Optional import networkx as nx from dodiscover.ci.base import BaseConditionalIndependenceTest -from dodiscover.constraint.config import SkeletonMethods from dodiscover.constraint.utils import is_in_sep_set from dodiscover.typing import Column, SeparatingSet from .._protocol import EquivalenceClass from ._classes import BaseConstraintDiscovery +from .config import SkeletonMethods logger = logging.getLogger() @@ -121,6 +121,24 @@ def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: graph = CPDAG(incoming_undirected_edges=graph) return graph + def _orientable_triples(self, graph: EquivalenceClass) -> Iterator: + """Iterate through orientable triple edges. + + Used in orientation of colliders. + """ + for u in graph.nodes: + for v_i, v_j in combinations(graph.neighbors(u), 2): + yield (v_i, u, v_j) + + def _orientable_edges(self, graph: EquivalenceClass) -> Iterator: + """Iterate through orientable edges. + + Used in orientation of edges. + """ + for i in graph.nodes: + for j in graph.neighbors(i): + yield (i, j) + def orient_edges(self, graph: EquivalenceClass) -> None: """Orient edges in a skeleton graph to estimate the causal DAG, or CPDAG. @@ -140,30 +158,27 @@ def orient_edges(self, graph: EquivalenceClass) -> None: finished = False while idx < self.max_iter and not finished: # type: ignore change_flag = False - for i in graph.nodes: - for j in graph.neighbors(i): - if i == j: - continue - # Rule 1: Orient i-j into i->j whenever there is an arrow k->i - # such that k and j are nonadjacent. - r1_add = self._apply_meek_rule1(graph, i, j) - - # Rule 2: Orient i-j into i->j whenever there is a chain - # i->k->j. - r2_add = self._apply_meek_rule2(graph, i, j) - - # Rule 3: Orient i-j into i->j whenever there are two chains - # i-k->j and i-l->j such that k and l are nonadjacent. - r3_add = self._apply_meek_rule3(graph, i, j) - - # Rule 4: Orient i-j into i->j whenever there are two chains - # i-k->l and k->l->j such that k and j are nonadjacent. - # - # However, this rule is not necessary when the PC-algorithm - # is used to estimate a DAG. - - if any([r1_add, r2_add, r3_add]) and not change_flag: - change_flag = True + for (i, j) in self._orientable_edges(graph): + # Rule 1: Orient i-j into i->j whenever there is an arrow k->i + # such that k and j are nonadjacent. + r1_add = self._apply_meek_rule1(graph, i, j) + + # Rule 2: Orient i-j into i->j whenever there is a chain + # i->k->j. + r2_add = self._apply_meek_rule2(graph, i, j) + + # Rule 3: Orient i-j into i->j whenever there are two chains + # i-k->j and i-l->j such that k and l are nonadjacent. + r3_add = self._apply_meek_rule3(graph, i, j) + + # Rule 4: Orient i-j into i->j whenever there are two chains + # i-k->l and k->l->j such that k and j are nonadjacent. + # + # However, this rule is not necessary when the PC-algorithm + # is used to estimate a DAG. 
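+                # (Per Meek (1995), Rule 4 only becomes applicable when
+                # background knowledge supplies orientations that R1-R3 cannot
+                # produce on their own, which a plain PC run never does.)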
+
+                if any([r1_add, r2_add, r3_add]) and not change_flag:
+                    change_flag = True
             if not change_flag:
                 finished = True
                 logger.info(f"Finished applying R1-3, with {idx} iterations")
@@ -177,6 +192,11 @@ def orient_unshielded_triples(
     ) -> None:
         """Orient colliders given a graph and separation set.

+        Unshielded triples are (i, j, k), such that 'i' and 'k' are both
+        adjacent to 'j', but there is no edge (i, k). The triple is oriented
+        as a collider i -> j <- k if 'i' and 'k' are conditionally independent
+        given some separating set that does not contain 'j'.
+
         Parameters
         ----------
         graph : EquivalenceClass
@@ -185,16 +205,15 @@ def orient_unshielded_triples(
             The separating set between any two nodes.
         """
         # for every node in the PAG, evaluate neighbors that have any edge
-        for u in graph.nodes:
-            for v_i, v_j in combinations(graph.neighbors(u), 2):
-                # Check that there is no edge of any type between
-                # v_i and v_j, else this is a "shielded" collider.
-                # Then check to see if 'u' is in "any" separating
-                # set. If it is not, then there is a collider.
-                if v_j not in graph.neighbors(v_i) and not is_in_sep_set(
-                    u, sep_set, v_i, v_j, mode="any"
-                ):
-                    self._orient_collider(graph, v_i, u, v_j)
+        for v_i, u, v_j in self._orientable_triples(graph):
+            # Check that there is no edge of any type between
+            # v_i and v_j, else this is a "shielded" collider.
+            # Then check to see if 'u' is in "any" separating
+            # set. If it is not, then there is a collider.
+            if v_j not in graph.neighbors(v_i) and not is_in_sep_set(
+                u, sep_set, v_i, v_j, mode="any"
+            ):
+                self._orient_collider(graph, v_i, u, v_j)

     def _orient_collider(
         self, graph: EquivalenceClass, v_i: Column, u: Column, v_j: Column
diff --git a/dodiscover/constraint/skeleton.py b/dodiscover/constraint/skeleton.py
index b0512345..f6b15d8a 100644
--- a/dodiscover/constraint/skeleton.py
+++ b/dodiscover/constraint/skeleton.py
@@ -1,7 +1,7 @@
 import logging
 from collections import defaultdict
-from itertools import chain, combinations
-from typing import Iterable, Optional, Set, SupportsFloat, Tuple, Union
+from itertools import chain
+from typing import Optional, Set, Tuple

 import networkx as nx
 import numpy as np
@@ -13,86 +13,11 @@
 from ..context import Context
 from ..context_builder import make_context
+from .utils import _find_neighbors_along_path, _iter_conditioning_set

 logger = logging.getLogger()


-def _iter_conditioning_set(
-    possible_variables: Iterable,
-    x_var: Union[SupportsFloat, str],
-    y_var: Union[SupportsFloat, str],
-    size_cond_set: int,
-) -> Iterable[Set]:
-    """Iterate function to generate the conditioning set.
-
-    Parameters
-    ----------
-    possible_variables : iterable
-        A set/list/dict of possible variables to consider for the conditioning set.
-        This can be for example, the current adjacencies.
-    x_var : node
-        The node for the 'x' variable.
-    y_var : node
-        The node for the 'y' variable.
-    size_cond_set : int
-        The size of the conditioning set to consider. If there are
-        less adjacent variables than this number, then all variables will be in the
-        conditioning set.
-
-    Yields
-    ------
-    Z : set
-        The set of variables for the conditioning set.
-    """
-    exclusion_set = {x_var, y_var}
-
-    all_adj_excl_current = [p for p in possible_variables if p not in exclusion_set]
-
-    # loop through all possible combinations of the conditioning set size
-    for cond in combinations(all_adj_excl_current, size_cond_set):
-        cond_set = set(cond)
-        yield cond_set
-
-
-def _find_neighbors_along_path(G: nx.Graph, start, end) -> Set:
-    """Find neighbors that are along a path from start to end. 
-
-    Parameters
-    ----------
-    G : nx.Graph
-        The graph.
-    start : Node
-        The starting node.
-    end : Node
-        The ending node.
-
-    Returns
-    -------
-    nbrs : Set
-        The set of neighbors that are also along a path towards
-        the 'end' node.
-    """
-
-    def _assign_weight(u, v, edge_attr):
-        if u == node or v == node:
-            return np.inf
-        else:
-            return 1
-
-    nbrs = set()
-    for node in G.neighbors(start):
-        if not G.has_edge(start, node):
-            raise RuntimeError(f"{start} and {node} are not connected, but they are assumed to be.")
-
-        # find a path from start node to end
-        path = nx.shortest_path(G, source=node, target=end, weight=_assign_weight)
-        if len(path) > 0:
-            if start in path:
-                raise RuntimeError("There is an error with the input. This is not possible.")
-            nbrs.add(node)
-    return nbrs
-
-
 class LearnSkeleton:
     """Learn a skeleton graph from a Markovian causal model.

@@ -268,6 +193,9 @@ def _initialize_params(self) -> None:
         else:
             self.max_combinations_ = self.max_combinations

+        # track progress of the algorithm for which edges to remove to ensure stability
+        self.remove_edges = set()
+
     def evaluate_edge(
         self, data: pd.DataFrame, X: Column, Y: Column, Z: Optional[Set[Column]] = None
     ) -> Tuple[float, float]:
@@ -293,10 +221,34 @@ def evaluate_edge(
         """
         if Z is None:
             Z = set()
-        test_stat, pvalue = self.ci_estimator.test(data, {X}, {Y}, Z, **self.ci_estimator_kwargs)
+        try:
+            test_stat, pvalue = self.ci_estimator.test(
+                data, {X}, {Y}, Z, **self.ci_estimator_kwargs
+            )
+        except Exception:
+            logger.error(f"CI test failed for X={X}, Y={Y} given Z={Z}")
+            raise
         self.n_ci_tests += 1
         return test_stat, pvalue

+    def _preprocess_data(self, data: pd.DataFrame, context: Context):
+        if set(context.observed_variables) != set(data.columns.values):
+            raise RuntimeError(
+                "The observed variable names in data and context do not match: \n"
+                f"- {context.observed_variables} \n"
+                f"- {data.columns}"
+            )
+
+        edge_attrs = set(
+            chain.from_iterable(d.keys() for *_, d in context.init_graph.edges(data=True))
+        )
+        if "test_stat" in edge_attrs or "pvalue" in edge_attrs:
+            raise RuntimeError(
+                "Running skeleton discovery with adjacency graph "
+                "with 'test_stat' or 'pvalue' is not supported yet."
+            )
+        return data, context
+
     def fit(self, data: pd.DataFrame, context: Context) -> None:
         """Run structure learning to learn the skeleton of the causal graph.

@@ -307,28 +259,19 @@ def fit(self, data: pd.DataFrame, context: Context) -> None:
             A context object.
         """
+        data, context = self._preprocess_data(data, context)
         self.context = make_context(context).build()

+        # initialize learning parameters
+        self._initialize_params()
+
         # get the initialized graph
         adj_graph = self.context.init_graph
         X = data

-        # track progress of the algorithm for which edges to remove to ensure stability
-        self.remove_edges = set()
-
-        # initialize learning parameters
-        self._initialize_params()
-
         # the size of the conditioning set will start off at the minimum
         size_cond_set = self.min_cond_set_size_

-        edge_attrs = set(chain.from_iterable(d.keys() for *_, d in adj_graph.edges(data=True)))
-        if "test_stat" in edge_attrs or "pvalue" in edge_attrs:
-            raise RuntimeError(
-                "Running skeleton discovery with adjacency graph "
-                "with 'test_stat' or 'pvalue' is not supported yet." 
-            )
-
         # store the absolute value of test-statistic values and pvalue for
         # every single candidate parent-child edge (X -> Y)
         nx.set_edge_attributes(adj_graph, np.inf, "test_stat")
         nx.set_edge_attributes(adj_graph, -1e-5, "pvalue")
@@ -372,12 +315,6 @@ def fit(self, data: pd.DataFrame, context: Context) -> None:
                     skeleton_method=self.skeleton_method,
                 )

-                logger.debug(
-                    f"Adj({x_var}) without {y_var} with size={len(possible_adjacencies) - 1} "
-                    f"with p={size_cond_set}. The possible variables to condition on are: "
-                    f"{possible_variables}."
-                )
-
                 # check that number of adjacencies is greater then the
                 # cardinality of the conditioning set
                 if len(possible_variables) < size_cond_set:
diff --git a/dodiscover/constraint/timeseries/__init__.py b/dodiscover/constraint/timeseries/__init__.py
new file mode 100644
index 00000000..5345cd6b
--- /dev/null
+++ b/dodiscover/constraint/timeseries/__init__.py
@@ -0,0 +1,3 @@
+from .skeleton import LearnTimeSeriesSemiMarkovianSkeleton, LearnTimeSeriesSkeleton
+from .tsfcialg import TimeSeriesFCI
+from .tspcalg import TimeSeriesPC
diff --git a/dodiscover/constraint/timeseries/skeleton.py b/dodiscover/constraint/timeseries/skeleton.py
new file mode 100644
index 00000000..4d6563ae
--- /dev/null
+++ b/dodiscover/constraint/timeseries/skeleton.py
@@ -0,0 +1,456 @@
+import logging
+from itertools import chain
+from typing import Iterator, Optional, Set, Tuple
+
+import networkx as nx
+import numpy as np
+import pandas as pd
+
+from dodiscover._protocol import TimeSeriesGraph
+from dodiscover.ci import BaseConditionalIndependenceTest
+from dodiscover.context import Context, TimeSeriesContext
+from dodiscover.typing import Column, SeparatingSet
+
+from ...context_builder import make_ts_context
+from ..config import SkeletonMethods
+from ..skeleton import LearnSkeleton
+from ..utils import _iter_conditioning_set
+from .utils import convert_ts_df_to_multiindex
+
+logger = logging.getLogger()
+
+
+def nodes_in_time_order(G: TimeSeriesGraph) -> Iterator:
+    """Return nodes from G in time order starting from max-lag to t=0."""
+    for t in range(G.max_lag, -1, -1):
+        for node in G.nodes_at(t):
+            yield node
+
+
+class LearnTimeSeriesSkeleton(LearnSkeleton):
+    """Learn a skeleton time-series graph from a Markovian causal model.
+
+    Learning time-series causal graph skeletons is a more complex task compared to its
+    iid counterpart. There are a few key differences to be aware of:
+
+    1. Without extending the maximum lag, there is always latent confounding. Consider,
+    as an example, a two-variable, lag-1 ts-causal-graph,
+
+    t-1 t
+    X o -> o
+    Y o -> o
+
+    with Y(t-1) -> X(t). Assuming stationarity, the edge Y(t-2) -> X(t-1) must then
+    also exist, but Y(t-2) lies outside the modeled time window, so nodes at the
+    maximum lag have unobserved parents that can act as latent confounders.
+
+    Parameters
+    ----------
+    ci_estimator : BaseConditionalIndependenceTest
+        The conditional independence test function.
+    sep_set : dictionary of dictionary of list of set
+        Mapping node to other nodes to separating sets of variables.
+        If ``None``, then an empty dictionary of dictionary of list of sets
+        will be initialized.
+    alpha : float, optional
+        The significance level for the conditional independence test, by default 0.05.
+    min_cond_set_size : int
+        The minimum size of the conditioning set, by default 0. The number of variables
+        used in the conditioning set.
+    max_cond_set_size : int, optional
+        Maximum size of the conditioning set, by default None. Used to limit
+        the computation spent on the algorithm.
+    max_combinations : int, optional
+        The maximum number of conditional independence tests to run from the set
+        of possible conditioning sets. 
By default None, which means the algorithm will
+        check all possible conditioning sets. If ``max_combinations=n`` is set, then
+        for every conditioning set size, 'p', there will be at most 'n' CI tests run
+        before the conditioning set size 'p' is incremented. For controlling the size
+        of 'p', see ``min_cond_set_size`` and ``max_cond_set_size``. This can be used
+        in conjunction with ``keep_sorted`` parameter to only test the "strongest"
+        dependences.
+    skeleton_method : SkeletonMethods
+        The method to use for testing conditional independence. Must be one of
+        ('complete', 'neighbors', 'neighbors_path'). See Notes for more details.
+    keep_sorted : bool
+        Whether or not to keep the considered conditioning set variables in sorted
+        dependency order. If True, will sort the existing dependencies of each variable
+        by its dependencies from strongest to weakest (i.e. largest CI test statistic value
+        to lowest). This can be used in conjunction with ``max_combinations`` parameter
+        to only test the "strongest" dependences. By default False.
+    max_path_length : int, optional
+        The maximum path length to consider when computing candidate conditioning
+        sets, by default None (unbounded).
+    separate_lag_phase : bool, optional
+        Whether or not to separate the lagged and contemporaneous skeleton learning
+        phases. If False (default), all CI dependences are tested in the same loop.
+    contemporaneous_edges : bool, optional
+        Whether or not there are contemporaneous edges (i.e. edges that occur at the
+        same time point between two nodes). By default True.
+    """
+
+    def __init__(
+        self,
+        ci_estimator: BaseConditionalIndependenceTest,
+        sep_set: Optional[SeparatingSet] = None,
+        alpha: float = 0.05,
+        min_cond_set_size: int = 0,
+        max_cond_set_size: Optional[int] = None,
+        max_combinations: Optional[int] = None,
+        skeleton_method: SkeletonMethods = SkeletonMethods.NBRS,
+        keep_sorted: bool = False,
+        max_path_length: Optional[int] = None,
+        separate_lag_phase: bool = False,
+        contemporaneous_edges: bool = True,
+        **ci_estimator_kwargs,
+    ) -> None:
+        super().__init__(
+            ci_estimator,
+            sep_set,
+            alpha,
+            min_cond_set_size,
+            max_cond_set_size,
+            max_combinations,
+            skeleton_method,
+            keep_sorted,
+            **ci_estimator_kwargs,
+        )
+        self.max_path_length = max_path_length
+        self.separate_lag_phase = separate_lag_phase
+        self.contemporaneous_edges = contemporaneous_edges
+
+    def evaluate_edge(
+        self, data: pd.DataFrame, X: Column, Y: Column, Z: Optional[Set[Column]] = None
+    ) -> Tuple[float, float]:
+        """Evaluate an edge, where the data frame has multi-index columns of
+        ``(<variable>, <lag>)``."""
+        return super().evaluate_edge(data, X, Y, Z)
+
+    def _learn_skeleton(
+        self, data: pd.DataFrame, adj_graph: TimeSeriesGraph, nbr_search: str = "all"
+    ):
+        """Private method to learn a skeleton."""
+        # the size of the conditioning set will start off at the minimum
+        size_cond_set = self.min_cond_set_size_
+
+        # If there is latent confounding, we need to test all nodes starting from
+        # max-lag. 
Because adjacencies are only repeated backwards in time
+        testable_nodes = list(nodes_in_time_order(adj_graph))
+
+        # when an edge is removed during time-series causal discovery, its
+        # homologous copies forwards in time are removed along with it
+        adj_graph.set_auto_removal("forwards")
+
+        logger.debug(f"Testing nodes in the following order for learning skeleton: {testable_nodes}")
+
+        # Outer loop: iterate over 'size_cond_set' until stopping criterion is met
+        # - 'size_cond_set' > 'max_cond_set_size' or
+        # - All (X, Y) pairs have candidate conditioning sets of size < 'size_cond_set'
+        while 1:
+            cont = False
+            # initialize set of edges to remove at the end of every loop
+            self.remove_edges = set()
+
+            # loop through every node
+            # Note: in time-series graphs, all nodes are defined as a 2-tuple
+            # of (<variable>, <lag>)
+            for y_var in testable_nodes:
+                # TODO: need more efficient way of querying all possible edges
+                if nbr_search == "all":
+                    lagged_nbrs = adj_graph.lagged_neighbors(y_var)
+                    contemporaneous_nbrs = adj_graph.contemporaneous_neighbors(y_var)
+                    possible_adjacencies = list(set(lagged_nbrs).union(set(contemporaneous_nbrs)))
+                elif nbr_search == "lagged":
+                    possible_adjacencies = adj_graph.lagged_neighbors(y_var)
+                elif nbr_search == "contemporaneous":
+                    possible_adjacencies = adj_graph.contemporaneous_neighbors(y_var)
+                else:
+                    raise ValueError(f"Unknown 'nbr_search' option: {nbr_search}.")
+
+                logger.info(f"Considering node {y_var}...\n\n")
+                logger.debug(f"Testing {y_var} against possible adjacencies {possible_adjacencies}")
+                logger.debug(f"size conditioning set p = {size_cond_set}")
+                for x_var in possible_adjacencies:
+                    # a node cannot be a parent to itself in DAGs
+                    if y_var == x_var:
+                        continue
+
+                    # ignore fixed edges
+                    if (x_var, y_var) in self.context.included_edges.edges:
+                        continue
+
+                    # compute the possible variables used in the conditioning set
+                    possible_variables = self._compute_candidate_conditioning_sets(
+                        adj_graph,
+                        y_var,
+                        x_var,
+                        skeleton_method=self.skeleton_method,
+                    )
+
+                    logger.debug(
+                        f"Adj({x_var}) without {y_var} with size={len(possible_adjacencies) - 1} "
+                        f"with p={size_cond_set}. The possible variables to condition on are: "
+                        f"{possible_variables}." 
+                    )
+
+                    # check that number of adjacencies is greater than the
+                    # cardinality of the conditioning set
+                    if len(possible_variables) < size_cond_set:
+                        logger.debug(
+                            f"\n\nBreaking for {x_var}, {y_var}, {len(possible_adjacencies)}, "
+                            f"{size_cond_set}, {possible_variables}"
+                        )
+                        continue
+                    else:
+                        cont = True
+
+                    # generate iterator through the conditioning sets
+                    conditioning_sets = _iter_conditioning_set(
+                        possible_variables=possible_variables,
+                        x_var=x_var,
+                        y_var=y_var,
+                        size_cond_set=size_cond_set,
+                    )
+
+                    # now iterate through the possible parents
+                    for comb_idx, cond_set in enumerate(conditioning_sets):
+                        # check the number of combinations of possible parents we have tried
+                        # to use as a separating set
+                        if (
+                            self.max_combinations_ is not None
+                            and comb_idx >= self.max_combinations_
+                        ):
+                            break
+
+                        # compute conditional independence test
+                        test_stat, pvalue = self.evaluate_edge(data, x_var, y_var, set(cond_set))
+
+                        # if any "independence" is found through inability to reject
+                        # the null hypothesis, then we will break the loop comparing X and Y
+                        # and say X and Y are conditionally independent given 'cond_set'
+                        if pvalue > self.alpha:
+                            logger.debug(f"Removing {x_var} - {y_var} with {cond_set}.")
+                            break
+
+                    # post-process the CI test results
+                    removed_edge = self._postprocess_ci_test(
+                        adj_graph, x_var, y_var, cond_set, test_stat, pvalue
+                    )
+
+                    # summarize the comparison of XY
+                    self._summarize_xy_comparison(x_var, y_var, removed_edge, pvalue)
+
+            # finally remove edges after performing
+            # conditional independence tests
+            logger.info("\n---------------------------------------------")
+            logger.info(f"For p = {size_cond_set}, removing all edges: {self.remove_edges}")
+
+            # deduplicate symmetric pairs, so that each edge is only removed once,
+            # i.e. keep only one of (u, v) and (v, u)
+            deduped_edges = set()
+            for u, v in self.remove_edges:
+                if (v, u) not in deduped_edges:
+                    deduped_edges.add((u, v))
+            self.remove_edges = deduped_edges
+
+            # Remove non-significant links
+            # Note: Removing edges at the end ensures "stability" of the algorithm
+            # with respect to the random order in which pairs of edges are
+            # considered in the inner loop
+            logger.debug(f"Removing edges {self.remove_edges}")
+            adj_graph.remove_edges_from(list(self.remove_edges))
+
+            # increment the conditioning set size
+            size_cond_set += 1
+
+            # only allow conditioning set sizes up to maximum set number
+            if size_cond_set > self.max_cond_set_size_ or cont is False:
+                break
+
+        return adj_graph
+
+    def fit(self, data: pd.DataFrame, context: Context) -> None:
+        """Run structure learning to learn the skeleton of the causal graph.
+
+        Parameters
+        ----------
+        data : pd.DataFrame
+            The data to learn the causal graph from.
+        context : Context
+            A context object.
+        """
+        if self.separate_lag_phase and not self.contemporaneous_edges:
+            raise ValueError(
+                "No contemporaneous edges are assumed, but you also "
+                "specified to separate the lagged and contemporaneous phases."
+ ) + + data, context = self._preprocess_data(data, context) + self.context = ( + make_ts_context(context) + .observed_variables(data.columns.get_level_values("variable").tolist()) + .build() + ) + + # initialize learning parameters + self._initialize_params() + + # get the initialized graph + adj_graph: TimeSeriesGraph = self.context.init_graph + + # store the absolute value of test-statistic values and pvalue for + # every single candidate parent-child edge (X -> Y) + nx.set_edge_attributes(adj_graph, np.inf, "test_stat") + nx.set_edge_attributes(adj_graph, -1e-5, "pvalue") + + logger.info( + f"\n\nRunning skeleton phase with: \n" + f"max_combinations: {self.max_combinations_},\n" + f"min_cond_set_size: {self.min_cond_set_size_},\n" + f"max_cond_set_size: {self.max_cond_set_size_},\n" + ) + + # learn the skeleton graph + if self.separate_lag_phase: + # first do the lagged search + adj_graph = self._learn_skeleton(data, adj_graph, nbr_search="lagged") + + # then do contemporaneous + adj_graph = self._learn_skeleton(data, adj_graph, nbr_search="contemporaneous") + else: + adj_graph = self._learn_skeleton(data, adj_graph, nbr_search="all") + + # possibly remove all contemporaneous edges if there is + # no assumption of contemporaneous causal structure + # TODO: can make sure we don't inner-test the CI relations between contemporaneous edges + if not self.contemporaneous_edges: + auto_removal = adj_graph._auto_removal # type: ignore + adj_graph.set_auto_removal(None) + for u, v in adj_graph.edges: # type: ignore + if u[1] == v[1]: + adj_graph.remove_edge(u, v) # type: ignore + adj_graph.set_auto_removal(auto_removal) + + self.adj_graph_ = adj_graph + + def _preprocess_data(self, data: pd.DataFrame, context: TimeSeriesContext): + """Preprocess data and context. + + In time-series causal discovery, dataframe of the shape (n_times, n_signals) + are re-formatted to a dataframe with (n_samples, n_signals x lag_points) + with a multi-index column with variable names at level 1 and lag time + points at level 2. For example, a multi-index column of ``[('x', 0), ('x', -1)]`` + would indicate the first column is the variable x at lag 0 and the second + column is variable x at lag 1. + """ + # first reformat data + max_lag = context.max_lag # type: ignore + data = convert_ts_df_to_multiindex(data, max_lag) + + # run preprocessing + if set(context.observed_variables) != set(data.columns.get_level_values("variable")): + raise RuntimeError( + "The observed variable names in data and context do not match: \n" + f"- {context.observed_variables} \n" + f"- {data.columns}" + ) + edge_attrs = set( + chain.from_iterable(d.keys() for *_, d in context.init_graph.edges(data=True)) + ) + if "test_stat" in edge_attrs or "pvalue" in edge_attrs: + raise RuntimeError( + "Running skeleton discovery with adjacency graph " + "with 'test_stat' or 'pvalue' is not supported yet." 
+            )
+
+        return data, context
+
+
+class LearnTimeSeriesSemiMarkovianSkeleton(LearnTimeSeriesSkeleton):
+    """Learn a time-series skeleton from a semi-Markovian causal model.
+
+    Latent confounders are allowed, so candidate conditioning sets are drawn
+    from possibly-d-separating (PDS) sets computed on an intermediate PAG,
+    which is stored as the "PAG" state variable in the context.
+    """
+
+    def __init__(
+        self,
+        ci_estimator: BaseConditionalIndependenceTest,
+        sep_set: Optional[SeparatingSet] = None,
+        alpha: float = 0.05,
+        min_cond_set_size: int = 0,
+        max_cond_set_size: Optional[int] = None,
+        max_combinations: Optional[int] = None,
+        skeleton_method: SkeletonMethods = SkeletonMethods.PDS_T,
+        keep_sorted: bool = False,
+        max_path_length: Optional[int] = None,
+        separate_lag_phase: bool = False,
+        contemporaneous_edges: bool = True,
+        **ci_estimator_kwargs,
+    ) -> None:
+        super().__init__(
+            ci_estimator,
+            sep_set,
+            alpha,
+            min_cond_set_size,
+            max_cond_set_size,
+            max_combinations,
+            skeleton_method,
+            keep_sorted,
+            **ci_estimator_kwargs,
+        )
+        if max_path_length is None:
+            max_path_length = np.inf
+        self.max_path_length = max_path_length
+        self.separate_lag_phase = separate_lag_phase
+        self.contemporaneous_edges = contemporaneous_edges
+
+    def _compute_candidate_conditioning_sets(
+        self, adj_graph: nx.Graph, x_var: Column, y_var: Column, skeleton_method: SkeletonMethods
+    ) -> Set[Column]:
+        import pywhy_graphs as pgraph
+
+        # get PAG from the context object
+        pag = self.context.state_variable("PAG")
+
+        # determine how we want to construct the candidates for separating nodes;
+        # conditional independence testing is performed on all combinations drawn from these
+        if skeleton_method == SkeletonMethods.PDS:
+            possible_variables = pgraph.pds(
+                pag, x_var, y_var, max_path_length=self.max_path_length  # type: ignore
+            )
+        elif skeleton_method == SkeletonMethods.PDS_PATH:
+            possible_variables = pgraph.pds_path(
+                pag, x_var, y_var, max_path_length=self.max_path_length  # type: ignore
+            )
+        elif skeleton_method == SkeletonMethods.PDS_T:
+            possible_variables = pgraph.pds_t(
+                pag, x_var, y_var, max_path_length=self.max_path_length  # type: ignore
+            )
+        elif skeleton_method == SkeletonMethods.PDS_T_PATH:
+            possible_variables = pgraph.pds_t_path(
+                pag, x_var, y_var, max_path_length=self.max_path_length  # type: ignore
+            )
+        else:
+            raise ValueError(f"Unknown skeleton method {skeleton_method}.")
+
+        if self.keep_sorted:
+            # Note it is assumed in public API that 'test_stat' is set
+            # inside the adj_graph
+            possible_variables = sorted(
+                possible_variables,
+                key=lambda n: adj_graph.edges[x_var, n]["test_stat"],
+                reverse=True,
+            )  # type: ignore
+
+        if x_var in possible_variables:
+            possible_variables.remove(x_var)
+        if y_var in possible_variables:
+            possible_variables.remove(y_var)
+
+        return possible_variables
+
+    def fit(self, data: pd.DataFrame, context: TimeSeriesContext) -> None:
+        return super().fit(data, context)
diff --git a/dodiscover/constraint/timeseries/tsfcialg.py b/dodiscover/constraint/timeseries/tsfcialg.py
new file mode 100644
index 00000000..f5e9b944
--- /dev/null
+++ b/dodiscover/constraint/timeseries/tsfcialg.py
@@ -0,0 +1,167 @@
+from typing import Optional, Tuple
+
+import networkx as nx
+import pandas as pd
+
+from dodiscover.ci.base import BaseConditionalIndependenceTest
+from dodiscover.constraint.timeseries import (
+    LearnTimeSeriesSemiMarkovianSkeleton,
+    LearnTimeSeriesSkeleton,
+)
+from dodiscover.context import Context
+from dodiscover.context_builder import make_ts_context +from dodiscover.typing import SeparatingSet + +from ..._protocol import EquivalenceClass +from ..config import SkeletonMethods +from ..fcialg import FCI + + +class TimeSeriesFCI(FCI): + def __init__( + self, + ci_estimator: BaseConditionalIndependenceTest, + alpha: float = 0.05, + min_cond_set_size: Optional[int] = None, + max_cond_set_size: Optional[int] = None, + max_combinations: Optional[int] = None, + skeleton_method: SkeletonMethods = SkeletonMethods.NBRS, + apply_orientations: bool = True, + max_iter: int = 1000, + max_path_length: Optional[int] = None, + selection_bias: bool = False, + pds_skeleton_method: SkeletonMethods = SkeletonMethods.PDS, + separate_lag_phase: bool = False, + contemporaneous_edges: bool = True, + **ci_estimator_kwargs, + ): + super().__init__( + ci_estimator, + alpha, + min_cond_set_size, + max_cond_set_size, + max_combinations, + skeleton_method, + apply_orientations, + max_iter, + max_path_length, + selection_bias, + pds_skeleton_method, + **ci_estimator_kwargs, + ) + self.separate_lag_phase = separate_lag_phase + self.contemporaneous_edges = contemporaneous_edges + + def learn_skeleton( + self, data: pd.DataFrame, context: Context, sep_set: Optional[SeparatingSet] = None + ) -> Tuple[nx.Graph, SeparatingSet]: + import pywhy_graphs + + from dodiscover import make_ts_context + + # initially learn the skeleton + skel_alg = LearnTimeSeriesSkeleton( + self.ci_estimator, + sep_set=sep_set, + alpha=self.alpha, + min_cond_set_size=self.min_cond_set_size, + max_cond_set_size=self.max_cond_set_size, + max_combinations=self.max_combinations, + skeleton_method=self.skeleton_method, + keep_sorted=False, + separate_lag_phase=self.separate_lag_phase, + contemporaneous_edges=self.contemporaneous_edges, + **self.ci_estimator_kwargs, + ) + skel_alg.fit(data, context) + + skel_graph = skel_alg.adj_graph_ + sep_set = skel_alg.sep_set_ + self.n_ci_tests += skel_alg.n_ci_tests + + # convert the undirected skeleton graph to a PAG, where + # all left-over edges have a "circle" endpoint + pag = pywhy_graphs.StationaryTimeSeriesPAG( + incoming_circle_edges=skel_graph, name="PAG derived with tsFCI" + ) + + # orient colliders + self.orient_unshielded_triples(pag, sep_set) + + # convert the adjacency graph + new_init_graph = pag.to_ts_undirected() + + # Update the Context: + # add the corresponding intermediate PAG now to the context + # new initialization graph + for (_, _, d) in new_init_graph.edges(data=True): + if "test_stat" in d: + d.pop("test_stat") + if "pvalue" in d: + d.pop("pvalue") + context = ( + make_ts_context(context).init_graph(new_init_graph).state_variable("PAG", pag).build() + ) + + # # now compute all possibly d-separating sets and learn a better skeleton + skel_alg = LearnTimeSeriesSemiMarkovianSkeleton( + self.ci_estimator, + sep_set=sep_set, + alpha=self.alpha, + min_cond_set_size=self.min_cond_set_size, + max_cond_set_size=self.max_cond_set_size, + max_combinations=self.max_combinations, + skeleton_method=self.pds_skeleton_method, + keep_sorted=False, + max_path_length=self.max_path_length, + separate_lag_phase=self.separate_lag_phase, + contemporaneous_edges=self.contemporaneous_edges, + **self.ci_estimator_kwargs, + ) + skel_alg.fit(data, context) + + skel_graph = skel_alg.adj_graph_ + sep_set = skel_alg.sep_set_ + self.n_ci_tests += skel_alg.n_ci_tests + return skel_graph, sep_set + + def fit(self, data: pd.DataFrame, context: Context) -> None: + self.context_ = make_ts_context(context).build() + 
graph = self.context_.init_graph + self.init_graph_ = graph + self.fixed_edges_ = self.context_.included_edges + + # create a reference to the underlying data to be used + self.X_ = data + + # initialize graph object to apply learning + self.separating_sets_ = self._initialize_sep_sets(self.init_graph_) + + # learn skeleton graph and the separating sets per variable + graph, self.separating_sets_ = self.learn_skeleton( + self.X_, self.context_, self.separating_sets_ + ) + + # convert networkx.Graph to relevant causal graph object + graph = self.convert_skeleton_graph(graph) + + # orient edges on the causal graph object + if self.apply_orientations: + # first orient lagged edges + self.orient_lagged_edges(graph) + + if self.contemporaneous_edges: + # next orient contemporaneous edges if necessary + self.orient_contemporaneous_edges(graph) + + # store resulting data structures + self.graph_ = graph + + def orient_lagged_edges(self, graph: EquivalenceClass): + pass + + def orient_contemporaneous_edges(self, graph: EquivalenceClass): + pass + + def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: + return super().convert_skeleton_graph(graph) diff --git a/dodiscover/constraint/timeseries/tspcalg.py b/dodiscover/constraint/timeseries/tspcalg.py new file mode 100644 index 00000000..987e865c --- /dev/null +++ b/dodiscover/constraint/timeseries/tspcalg.py @@ -0,0 +1,176 @@ +import logging +from itertools import combinations, permutations +from typing import Iterator, Optional, Tuple + +import networkx as nx +import pandas as pd + +from dodiscover.ci.base import BaseConditionalIndependenceTest +from dodiscover.context import Context +from dodiscover.context_builder import make_ts_context +from dodiscover.typing import SeparatingSet + +from ..._protocol import EquivalenceClass +from ..config import SkeletonMethods +from ..pcalg import PC +from .skeleton import LearnTimeSeriesSkeleton + +logger = logging.getLogger() + + +class TimeSeriesPC(PC): + def __init__( + self, + ci_estimator: BaseConditionalIndependenceTest, + alpha: float = 0.05, + min_cond_set_size: Optional[int] = None, + max_cond_set_size: Optional[int] = None, + max_combinations: Optional[int] = None, + skeleton_method: SkeletonMethods = SkeletonMethods.NBRS, + apply_orientations: bool = True, + max_iter: int = 1000, + separate_lag_phase: bool = False, + contemporaneous_edges: bool = True, + **ci_estimator_kwargs, + ): + """[Experimental] Time-series PC algorithm. + + A PC algorithm specialized for time-series, which differs in two ways: + 1. learning the skeleton: during the removal of a non-contemporaneous edge, + remove all corresponding homologous edges. + 2. orienting edges: during the orientation of a non-contemporaneous edge, + remove all corresponding homologous edges. + + Homologous edges are edges that have repeating structure over time. 
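+        For example, under stationarity with ``max_lag=2``, the lagged edge
+        ``('y', -1) -> ('x', 0)`` has the homologous copy
+        ``('y', -2) -> ('x', -1)``, so removing (or orienting) one also
+        removes (or orients) the other.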
+
+        Parameters
+        ----------
+        ci_estimator : BaseConditionalIndependenceTest
+            The conditional independence test function.
+        alpha : float, optional
+            The significance level for the conditional independence test, by default 0.05.
+        min_cond_set_size : Optional[int], optional
+            The minimum size of the conditioning set, by default None.
+        max_cond_set_size : Optional[int], optional
+            The maximum size of the conditioning set, by default None.
+        max_combinations : Optional[int], optional
+            The maximum number of conditioning sets tested per conditioning set size,
+            by default None.
+        skeleton_method : SkeletonMethods, optional
+            The method to use for learning the skeleton, by default SkeletonMethods.NBRS.
+        apply_orientations : bool, optional
+            Whether to orient edges after learning the skeleton, by default True.
+        max_iter : int, optional
+            The maximum number of iterations to apply the orientation rules for,
+            by default 1000.
+        separate_lag_phase : bool, optional
+            Whether to learn lagged and contemporaneous edges in separate phases,
+            by default False.
+        contemporaneous_edges : bool
+            Whether or not to assume contemporaneous edges.
+
+        References
+        ----------
+        .. footbibliography::
+        """
+        super().__init__(
+            ci_estimator,
+            alpha,
+            min_cond_set_size,
+            max_cond_set_size,
+            max_combinations,
+            skeleton_method,
+            apply_orientations,
+            max_iter,
+            **ci_estimator_kwargs,
+        )
+        self.separate_lag_phase = separate_lag_phase
+        self.contemporaneous_edges = contemporaneous_edges
+
+    def learn_skeleton(
+        self,
+        data: pd.DataFrame,
+        context: Context,
+        sep_set: Optional[SeparatingSet] = None,
+    ) -> Tuple[nx.Graph, SeparatingSet]:
+        skel_alg = LearnTimeSeriesSkeleton(
+            self.ci_estimator,
+            sep_set=sep_set,
+            alpha=self.alpha,
+            min_cond_set_size=self.min_cond_set_size,
+            max_cond_set_size=self.max_cond_set_size,
+            max_combinations=self.max_combinations,
+            skeleton_method=self.skeleton_method,
+            keep_sorted=False,
+            separate_lag_phase=False,
+            contemporaneous_edges=self.contemporaneous_edges,
+            **self.ci_estimator_kwargs,
+        )
+        skel_alg.fit(data, context)
+
+        skel_graph = skel_alg.adj_graph_
+        sep_set = skel_alg.sep_set_
+        self.n_ci_tests += skel_alg.n_ci_tests
+        return skel_graph, sep_set
+
+    def fit(self, data: pd.DataFrame, context: Context) -> None:
+        self.context_ = make_ts_context(context).build()
+        graph = self.context_.init_graph
+        self.init_graph_ = graph
+        self.fixed_edges_ = self.context_.included_edges
+
+        # create a reference to the underlying data to be used
+        self.X_ = data
+
+        # initialize graph object to apply learning
+        self.separating_sets_ = self._initialize_sep_sets(self.init_graph_)
+
+        # learn skeleton graph and the separating sets per variable
+        graph, self.separating_sets_ = self.learn_skeleton(
+            self.X_, self.context_, self.separating_sets_
+        )
+
+        # convert networkx.Graph to relevant causal graph object
+        graph = self.convert_skeleton_graph(graph)
+
+        # orient edges on the causal graph object
+        if self.apply_orientations:
+            # first orient lagged edges
+            self.orient_lagged_edges(graph)
+
+            if self.contemporaneous_edges:
+                # next orient contemporaneous edges if necessary
+                self.orient_contemporaneous_edges(graph)
+
+        # store resulting data structures
+        self.graph_ = graph
+
+    def orient_lagged_edges(self, graph: EquivalenceClass):
+        undirected_subgraph = graph.get_graphs(graph.undirected_edge_name)
+        # get non-lag nodes
+        for node in undirected_subgraph.nodes_at(t=0):
+            # get all lagged nbrs
+            for nbr in undirected_subgraph.lagged_neighbors(node):
+                # now orient this edge as nbr -> node
+                graph.orient_uncertain_edge(nbr, node)
+
+    def orient_contemporaneous_edges(self, graph):
+        # for all pairs of non-adjacent variables with a common neighbor
+        # check if we can orient the edge as a collider
+        self.orient_unshielded_triples(graph, self.separating_sets_)
+        self.orient_edges(graph)
+
+    def _orientable_edges(self, graph: EquivalenceClass) -> Iterator:
+        for (i, j) in permutations(graph.nodes_at(t=0), 2):  # type: ignore
+            if 
i == j: + continue + yield (i, j) + + def _orientable_triples(self, graph: EquivalenceClass) -> Iterator: + # for every node in the PAG, evaluate neighbors that have any edge + for u in graph.nodes_at(t=0): # type: ignore + for v_i, v_j in combinations(graph.neighbors(u), 2): + yield (v_i, u, v_j) + + def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: + from pywhy_graphs import StationaryTimeSeriesCPDAG + + # convert Graph object to a CPDAG object with + # all undirected edges + graph = StationaryTimeSeriesCPDAG(incoming_undirected_edges=graph) + return graph diff --git a/dodiscover/constraint/timeseries/utils.py b/dodiscover/constraint/timeseries/utils.py new file mode 100644 index 00000000..e304b3e4 --- /dev/null +++ b/dodiscover/constraint/timeseries/utils.py @@ -0,0 +1,26 @@ +import numpy as np + + +def convert_ts_df_to_multiindex(df, max_lag): + n_samples = df.shape[0] + + # add lag column denoting which row corresponds to which lag + # for a set time window of 'max_lag' + q, r = divmod(n_samples, max_lag + 1) + lag_list = [lag for lag in range(max_lag, -1, -1)] + lag_col = q * lag_list + lag_list[:r] + df["lag"] = lag_col + + # add naming for the time-series variables + df.rename_axis("variable", axis=1, inplace=True) + + # compute which window each row in the dataframe is on + df["windowed_sample"] = np.arange(len(df)) // (max_lag + 1) + + # convert lag to '-' + df = df.assign(lag=-df["lag"]) + + # create a multi-index with the first level as variable and + # the second level as lag + df = df.pivot(index="windowed_sample", columns="lag") + return df diff --git a/dodiscover/constraint/utils.py b/dodiscover/constraint/utils.py index 00e6e2a2..b76a4724 100644 --- a/dodiscover/constraint/utils.py +++ b/dodiscover/constraint/utils.py @@ -1,3 +1,8 @@ +from itertools import combinations +from typing import Iterable, Set, SupportsFloat, Union + +import networkx as nx +import numpy as np import pandas as pd from dodiscover import Graph @@ -49,3 +54,79 @@ def is_in_sep_set( check_var in _sep_set for _sep_set in sep_set[x_var][y_var] ) return func(check_var in _sep_set for _sep_set in sep_set[x_var][y_var]) + + +def _iter_conditioning_set( + possible_variables: Iterable, + x_var: Union[SupportsFloat, str], + y_var: Union[SupportsFloat, str], + size_cond_set: int, +) -> Iterable[Set]: + """Iterate function to generate the conditioning set. + + Parameters + ---------- + possible_variables : iterable + A set/list/dict of possible variables to consider for the conditioning set. + This can be for example, the current adjacencies. + x_var : node + The node for the 'x' variable. + y_var : node + The node for the 'y' variable. + size_cond_set : int + The size of the conditioning set to consider. If there are + less adjacent variables than this number, then all variables will be in the + conditioning set. + + Yields + ------ + Z : set + The set of variables for the conditioning set. + """ + exclusion_set = {x_var, y_var} + + all_adj_excl_current = [p for p in possible_variables if p not in exclusion_set] + + # loop through all possible combinations of the conditioning set size + for cond in combinations(all_adj_excl_current, size_cond_set): + cond_set = set(cond) + yield cond_set + + +def _find_neighbors_along_path(G: nx.Graph, start, end) -> Set: + """Find neighbors that are along a path from start to end. + + Parameters + ---------- + G : nx.Graph + The graph. + start : Node + The starting node. + end : Node + The ending node. 
+ + Returns + ------- + nbrs : Set + The set of neighbors that are also along a path towards + the 'end' node. + """ + + def _assign_weight(u, v, edge_attr): + if u == node or v == node: + return np.inf + else: + return 1 + + nbrs = set() + for node in G.neighbors(start): + if not G.has_edge(start, node): + raise RuntimeError(f"{start} and {node} are not connected, but they are assumed to be.") + + # find a path from start node to end + path = nx.shortest_path(G, source=node, target=end, weight=_assign_weight) + if len(path) > 0: + if start in path: + raise RuntimeError("There is an error with the input. This is not possible.") + nbrs.add(node) + return nbrs diff --git a/dodiscover/context.py b/dodiscover/context.py index 8830af85..c0c93b65 100644 --- a/dodiscover/context.py +++ b/dodiscover/context.py @@ -1,9 +1,11 @@ -from typing import Any, Dict, Set +import inspect +from typing import Any, Dict, Set, TypeVar import networkx as nx -from ._protocol import Graph +from ._protocol import TimeSeriesGraph from .typing import Column, NetworkxGraph +from .utils import dict_compare class Context: @@ -14,11 +16,11 @@ class Context: Parameters ---------- - variables : Set + observed_variables : Set Set of observed variables. If neither ``latents``, nor ``variables`` is set, then it is presumed that ``variables`` consists of the columns of ``data`` and ``latents`` is the empty set. - latents : Set + latent_variables : Set Set of latent "unobserved" variables. If neither ``latents``, nor ``variables`` is set, then it is presumed that ``variables`` consists of the columns of ``data`` and ``latents`` is the empty set. @@ -28,6 +30,10 @@ class Context: Included edges without direction. excluded_edges : nx.Graph Excluded edges without direction. + state_variables : dict + A dictionary of state variables that are preserved during the + causal discovery algorithm. For example, the FCI algorithm must store + the intermediate PAG before running a second phase of skeleton discovery. 
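+        Stored state can be retrieved later via the ``state_variable`` method,
+        e.g. ``context.state_variable("PAG")``.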
Raises ------ @@ -46,28 +52,115 @@ class Context: _variables: Set[Column] _latents: Set[Column] - _init_graph: Graph + _init_graph: nx.Graph _included_edges: nx.Graph _excluded_edges: nx.Graph _state_variables: Dict[str, Any] def __init__( self, - variables: Set[Column], - latents: Set[Column], - init_graph: Graph, + observed_variables: Set[Column], + latent_variables: Set[Column], + init_graph: nx.Graph, included_edges: NetworkxGraph, excluded_edges: NetworkxGraph, state_variables: Dict[str, Any], ) -> None: # set to class self._state_variables = state_variables - self._variables = variables - self._latents = latents + self._variables = observed_variables + self._latents = latent_variables self._init_graph = init_graph self._included_edges = included_edges self._excluded_edges = excluded_edges + @property + def _internal_graphs(self): + """Private property to store a list of the names of graph objects.""" + graphs = ["init_graph", "included_edges", "excluded_edges"] + return graphs + + @classmethod + def _get_param_names(cls): + """Get parameter names for the context.""" + # fetch the constructor or the original constructor before + # deprecation wrapping if any + init = getattr(cls.__init__, "deprecated_original", cls.__init__) + if init is object.__init__: + # No explicit constructor to introspect + return [] + + # introspect the constructor arguments to find the context parameters + # to represent + init_signature = inspect.signature(init) + # Consider the constructor parameters excluding 'self' + parameters = [ + p + for p in init_signature.parameters.values() + if p.name != "self" and p.kind != p.VAR_KEYWORD + ] + for p in parameters: + if p.kind == p.VAR_POSITIONAL: + raise RuntimeError( + "dodiscover context objects should always " + "specify their parameters in the signature" + " of their __init__ (no varargs)." + " %s with constructor %s doesn't " + " follow this convention." % (cls, init_signature) + ) + # Extract and sort argument names excluding 'self' + return sorted([p.name for p in parameters]) + + def get_params(self, deep=True): + """ + Get parameters for this context. + + Parameters + ---------- + deep : bool, default=True + If True, will return the parameters for this context and + contained subobjects that are context. + + Returns + ------- + params : dict + Parameter names mapped to their values. 
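+
+        Notes
+        -----
+        Parameters whose values themselves expose ``get_params`` are also
+        flattened into ``__``-delimited keys, following the scikit-learn
+        convention.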
+        """
+        out = dict()
+        for key in self._get_param_names():
+            value = getattr(self, key)
+            if deep and hasattr(value, "get_params") and not isinstance(value, type):
+                deep_items = value.get_params().items()
+                out.update((key + "__" + k, val) for k, val in deep_items)
+            out[key] = value
+        return out
+
+    def __eq__(self, context: object) -> bool:
+        if not isinstance(context, Context):
+            raise RuntimeError(f"Context is not comparable to non-context types {context}.")
+
+        context_params = context.get_params()
+        self_params = self.get_params()
+
+        # graph objects that we must check explicitly
+        graph_comps = self._internal_graphs
+        context_graphs = []
+        self_graphs = []
+        for name in graph_comps:
+            context_graphs.append(context_params.pop(name))
+            self_graphs.append(self_params.pop(name))
+
+        # check all graphs are isomorphic
+        for ctx_graph, self_graph in zip(context_graphs, self_graphs):
+            if not nx.is_isomorphic(ctx_graph, self_graph):
+                return False
+
+        # finally check the rest
+        added, removed, modified, _ = dict_compare(context_params, self_params)
+        if len(added) > 0 or len(removed) > 0 or len(modified) > 0:
+            return False
+        return True
+
     @property
     def included_edges(self) -> nx.Graph:
         return self._included_edges
@@ -77,7 +170,7 @@ def excluded_edges(self) -> nx.Graph:
         return self._excluded_edges

     @property
-    def init_graph(self) -> Graph:
+    def init_graph(self) -> nx.Graph:
         return self._init_graph

     @property
@@ -124,3 +217,93 @@ def state_variable(self, name: str) -> Any:
             raise RuntimeError(f"{name} is not a state variable: {self._state_variables}")

         return self._state_variables[name]
+
+
+class TimeSeriesContext(Context):
+    """Context for time-series causal discovery.
+
+    Parameters
+    ----------
+    observed_variables : Set
+        Set of observed variables. If neither ``latent_variables`` nor
+        ``observed_variables`` is set, then it is presumed that ``observed_variables``
+        consists of the columns of ``data`` and ``latent_variables`` is the empty set.
+        In a time-series context, variables do not have a time-index. See Notes for details.
+    latent_variables : Set
+        Set of latent "unobserved" variables. If neither ``latent_variables`` nor
+        ``observed_variables`` is set, then it is presumed that ``observed_variables``
+        consists of the columns of ``data`` and ``latent_variables`` is the empty set.
+    init_graph : Graph
+        The graph to start with.
+    included_edges : nx.Graph
+        Included edges without direction.
+    excluded_edges : nx.Graph
+        Excluded edges without direction.
+    state_variables : Dict[str, Any]
+        A dictionary of state variables that are preserved during the causal
+        discovery algorithm.
+    included_lag_edges : TimeSeriesGraph
+        Lagged edges that must be included in the resultant graph.
+    excluded_lag_edges : TimeSeriesGraph
+        Lagged edges that must be excluded in the resultant graph.
+    max_lag : int
+        The maximum time lag to consider.
+    contemporaneous_edges : bool
+        Whether or not to assume contemporaneous edges. 
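+
+    Notes
+    -----
+    Nodes in a time-series graph are 2-tuples ``(<variable>, <lag>)`` with
+    ``lag <= 0``; for example, ``('x', -1)`` denotes variable ``x`` one time
+    step in the past. ``observed_variables`` holds only the variable names,
+    and all lags up to ``max_lag`` are implied.
+
+    Examples
+    --------
+    A minimal sketch, assuming two observed variables and a maximum lag of 1::
+
+        context = (
+            make_ts_context()
+            .observed_variables({"x", "y"})
+            .max_lag(1)
+            .build()
+        )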
+ """ + + def __init__( + self, + observed_variables: Set[Column], + latent_variables: Set[Column], + init_graph: nx.Graph, + included_edges: NetworkxGraph, + excluded_edges: NetworkxGraph, + state_variables: Dict[str, Any], + included_lag_edges: TimeSeriesGraph, + excluded_lag_edges: TimeSeriesGraph, + max_lag: int, + contemporaneous_edges: bool, + ) -> None: + super().__init__( + observed_variables, + latent_variables, + init_graph, + included_edges, + excluded_edges, + state_variables, + ) + self._max_lag = max_lag + self._included_lag_edges = included_lag_edges + self._excluded_lag_edges = excluded_lag_edges + self._contemporaneous_edges = contemporaneous_edges + + @property + def _internal_graphs(self): + """Private property to store a list of the names of graph objects.""" + graphs = [ + "init_graph", + "included_edges", + "excluded_edges", + "included_lag_edges", + "excluded_lag_edges", + ] + return graphs + + @property + def contemporaneous_edges(self) -> bool: + return self._contemporaneous_edges + + @property + def max_lag(self) -> int: + return self._max_lag + + @property + def included_lag_edges(self) -> TimeSeriesGraph: + return self._included_lag_edges + + @property + def excluded_lag_edges(self) -> TimeSeriesGraph: + return self._excluded_lag_edges + + +ContextType = TypeVar("ContextType", bound=Context) diff --git a/dodiscover/context_builder.py b/dodiscover/context_builder.py index dd331e0b..168407b7 100644 --- a/dodiscover/context_builder.py +++ b/dodiscover/context_builder.py @@ -1,11 +1,11 @@ -from copy import copy, deepcopy +from copy import deepcopy from typing import Any, Dict, Optional, Set, Tuple, cast import networkx as nx import pandas as pd -from ._protocol import Graph -from .context import Context +from ._protocol import TimeSeriesGraph +from .context import Context, TimeSeriesContext from .typing import Column, NetworkxGraph @@ -17,15 +17,19 @@ class ContextBuilder: `dodiscover.make_context` to build a Context data structure. """ - _graph: Optional[Graph] = None + _graph: Optional[nx.Graph] = None _included_edges: Optional[NetworkxGraph] = None _excluded_edges: Optional[NetworkxGraph] = None _observed_variables: Optional[Set[Column]] = None _latent_variables: Optional[Set[Column]] = None _state_variables: Dict[str, Any] = dict() - def graph(self, graph: Graph) -> "ContextBuilder": - """Set the partial graph to start with. + def init_graph(self, graph: nx.Graph) -> "ContextBuilder": + """Set the initial partial undirected graph to start with. + + For example, this could be the complete graph to start with, if there is + no prior knowledge. Or this could be a graph that is a continuation of a + previous causal discovery algorithm. Parameters ---------- @@ -40,18 +44,12 @@ def graph(self, graph: Graph) -> "ContextBuilder": self._graph = graph return self - def edges( - self, - include: Optional[NetworkxGraph] = None, - exclude: Optional[NetworkxGraph] = None, - ) -> "ContextBuilder": - """Set edge constraints to apply in discovery. + def excluded_edges(self, edges: NetworkxGraph) -> "ContextBuilder": + """Set excluded non-directional edge constraints to apply in discovery. 
         Parameters
         ----------
-        included : Optional[NetworkxGraph]
-            Edges that should be included in the resultant graph
-        excluded : Optional[NetworkxGraph]
+        edges : NetworkxGraph
             Edges that must be excluded in the resultant graph
 
         Returns
@@ -59,8 +57,33 @@ def edges(
         self : ContextBuilder
             The builder instance
         """
-        self._included_edges = include
-        self._excluded_edges = exclude
+        self._excluded_edges = edges
+        return self
+
+    def included_edges(self, edges: NetworkxGraph) -> "ContextBuilder":
+        """Set included non-directional edge constraints to apply in discovery.
+
+        Parameters
+        ----------
+        edges : NetworkxGraph
+            Edges that must be included in the resultant graph
+
+        Returns
+        -------
+        self : ContextBuilder
+            The builder instance
+        """
+        self._included_edges = edges
+        return self
+
+    def latent_variables(self, latents: Set[Column]):
+        """Set the latent variables."""
+        self._latent_variables = latents
+        return self
+
+    def observed_variables(self, observed: Set[Column]):
+        """Set the observed variables."""
+        self._observed_variables = observed
         return self
 
     def variables(
@@ -145,13 +168,15 @@ def build(self) -> Context:
         if self._observed_variables is None:
             raise ValueError("Could not infer variables from data or given arguments.")
 
+        # initialize an empty graph object as the default for included/excluded edges
         empty_graph = lambda: nx.empty_graph(self._observed_variables, create_using=nx.Graph)
+
         return Context(
             init_graph=self._interpolate_graph(),
             included_edges=self._included_edges or empty_graph(),
             excluded_edges=self._excluded_edges or empty_graph(),
-            variables=self._observed_variables,
-            latents=self._latent_variables or set(),
+            observed_variables=self._observed_variables,
+            latent_variables=self._latent_variables or set(),
             state_variables=self._state_variables,
         )
 
@@ -202,6 +227,112 @@ def _interpolate_graph(self) -> nx.Graph:
         return self._graph
 
 
+class TimeSeriesContextBuilder(ContextBuilder):
+    """A builder class for creating TimeSeriesContext objects ergonomically.
+
+    The context builder provides a way to capture assumptions, domain knowledge,
+    and data relevant for time-series. This should NOT be instantiated directly.
+    One should instead use `dodiscover.make_ts_context` to build a TimeSeriesContext
+    data structure.
+    """
+
+    _contemporaneous_edges: Optional[bool] = None
+    _max_lag: Optional[int] = None
+    _included_lag_edges: Optional[TimeSeriesGraph] = None
+    _excluded_lag_edges: Optional[TimeSeriesGraph] = None
+
+    def _interpolate_graph(self) -> nx.Graph:
+        from pywhy_graphs.classes import StationaryTimeSeriesGraph
+        from pywhy_graphs.classes.timeseries import complete_ts_graph
+
+        if self._observed_variables is None:
+            raise ValueError("Must set variables() before building Context.")
+        if self._max_lag is None:
+            raise ValueError("Must set max_lag before building Context.")
+
+        # initialize the starting graph
+        if self._graph is None:
+            # assume contemporaneous edges by default, unless explicitly disabled
+            include_contemporaneous = (
+                True if self._contemporaneous_edges is None else self._contemporaneous_edges
+            )
+            # create a complete graph over all nodes and time points
+            complete_graph = complete_ts_graph(
+                variables=self._observed_variables,
+                max_lag=self._max_lag,
+                include_contemporaneous=include_contemporaneous,
+                create_using=StationaryTimeSeriesGraph,
+            )
+            return complete_graph
+        else:
+            if set(self._graph.variables) != set(self._observed_variables):
+                raise ValueError(
+                    f"The variables within the initial graph, {self._graph.variables}, "
+                    f"do not match the variables in the passed in data, "
+                    f"{self._observed_variables}."
+                )
+
+            for var_name in self._observed_variables:
+                for lag in range(self._max_lag + 1):
+                    if (var_name, -lag) not in self._graph.nodes:
+                        raise RuntimeError(
+                            f"Graph does not contain all possible nodes, "
+                            f"such as {(var_name, -lag)}."
+                        )
+
+            return self._graph
+
+    def init_graph(self, graph: TimeSeriesGraph) -> "TimeSeriesContextBuilder":
+        """Set the initial time-series graph to begin causal discovery with."""
+        return super().init_graph(graph)
+
+    def max_lag(self, lag: int) -> "TimeSeriesContextBuilder":
+        """Set the maximum time lag."""
+        if lag <= 0:
+            raise ValueError(f"Lag in time-series graphs should be > 0, not {lag}.")
+        self._max_lag = lag
+        return self
+
+    def contemporaneous_edges(self, present: bool) -> "TimeSeriesContextBuilder":
+        """Whether or not to assume contemporaneous edges."""
+        self._contemporaneous_edges = present
+        return self
+
+    def included_lag_edges(self, edges: TimeSeriesGraph) -> "TimeSeriesContextBuilder":
+        """Set a priori included lagged edges."""
+        self._included_lag_edges = edges
+        return self
+
+    def excluded_lag_edges(self, edges: TimeSeriesGraph) -> "TimeSeriesContextBuilder":
+        """Set a priori excluded lagged edges."""
+        self._excluded_lag_edges = edges
+        return self
+
+    def build(self) -> TimeSeriesContext:
+        """Build a time-series context object."""
+        from pywhy_graphs.classes.timeseries import empty_ts_graph
+
+        if self._observed_variables is None:
+            raise ValueError("Could not infer variables from data or given arguments.")
+
+        if self._max_lag is None:
+            raise ValueError("Max lag must be set before building time-series context")
+
+        # initialize empty graph objects as the defaults for included/excluded edges
+        empty_graph = lambda: nx.empty_graph(self._observed_variables, create_using=nx.Graph)
+        empty_time_graph = lambda: empty_ts_graph(self._observed_variables, max_lag=self._max_lag)
+
+        # assume contemporaneous edges by default, unless explicitly disabled
+        return TimeSeriesContext(
+            init_graph=self._interpolate_graph(),
+            included_edges=self._included_edges or empty_graph(),
+            excluded_edges=self._excluded_edges or empty_graph(),
+            observed_variables=self._observed_variables,
+            latent_variables=self._latent_variables or set(),
+            state_variables=self._state_variables,
+            included_lag_edges=self._included_lag_edges or empty_time_graph(),
+            excluded_lag_edges=self._excluded_lag_edges or empty_time_graph(),
+            max_lag=self._max_lag,
+            contemporaneous_edges=(
+                True if self._contemporaneous_edges is None else self._contemporaneous_edges
+            ),
+        )
+
+
 def make_context(context: Optional[Context] = None) -> ContextBuilder:
     """Create a new ContextBuilder instance.
@@ -219,8 +350,23 @@ def make_context(context: Optional[Context] = None) -> ContextBuilder:
     """
     result = ContextBuilder()
     if context is not None:
-        result.graph(deepcopy(context.init_graph))
-        result.edges(deepcopy(context.included_edges), deepcopy(context.excluded_edges))
-        result.variables(copy(context.observed_variables), copy(context.latent_variables))
-        result.state_variables(deepcopy(context.state_variables))
+        params = context.get_params()
+        for param, value in params.items():
+            if not hasattr(result, param):
+                raise RuntimeError(f"{param} is not a member of Context and ContextBuilder.")
+            # call the corresponding builder method for this parameter
+            getattr(result, param)(deepcopy(value))
+    return result
+
+
+def make_ts_context(context: Optional[Context] = None) -> TimeSeriesContextBuilder:
+    """Create a time-series context builder."""
+    result = TimeSeriesContextBuilder()
+    if context is not None:
+        params = context.get_params()
+        for param, value in params.items():
+            if not hasattr(result, param):
+                raise RuntimeError(f"{param} is not a member of Context and ContextBuilder.")
+            # call the corresponding builder method for this parameter
+            getattr(result, param)(deepcopy(value))
     return result
diff --git a/dodiscover/typing.py b/dodiscover/typing.py
index ddbbd3d6..426fd765 100644
--- a/dodiscover/typing.py
+++ b/dodiscover/typing.py
@@ -2,10 +2,14 @@
 
 import networkx as nx
 
+# from .context import Context as BaseContext
+# from .context_builder import ContextBuilder as BaseContextBuilder
+
 # Pandas DataFrame columns that are also compatible with Graph nodes
 Column = Union[int, float, str]
 
 # The separating set used in constraint-based causal discovery
 SeparatingSet = Dict[Column, Dict[Column, List[Set[Column]]]]
 
+# The relevant networkx graphs accepted in this module
 NetworkxGraph = Union[nx.Graph, nx.DiGraph]
diff --git a/dodiscover/utils.py b/dodiscover/utils.py
new file mode 100644
index 00000000..210bd5c4
--- /dev/null
+++ b/dodiscover/utils.py
@@ -0,0 +1,10 @@
+def dict_compare(d1, d2):
+    """Compare two dictionaries and report their differences."""
+    d1_keys = set(d1.keys())
+    d2_keys = set(d2.keys())
+    shared_keys = d1_keys.intersection(d2_keys)
+    added = d1_keys - d2_keys
+    removed = d2_keys - d1_keys
+    modified = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
+    same = set(o for o in shared_keys if d1[o] == d2[o])
+    return added, removed, modified, same
diff --git a/tests/unit_tests/constraint/test_skeleton.py b/tests/unit_tests/constraint/test_skeleton.py
index 0b622e85..12f0f69a 100644
--- a/tests/unit_tests/constraint/test_skeleton.py
+++ b/tests/unit_tests/constraint/test_skeleton.py
@@ -188,7 +188,7 @@ def test_learn_pds_skeleton():
     # learn the skeleton of the graph now with the first stage skeleton
     context = (
         make_context(context)
-        .graph(first_stage_pag.to_undirected())
+        .init_graph(first_stage_pag.to_undirected())
         .state_variable("PAG", first_stage_pag)
         .build()
     )
diff --git a/tests/unit_tests/constraint/timeseries/test_fcialg.py b/tests/unit_tests/constraint/timeseries/test_fcialg.py
new file mode 100644
index 00000000..2660c150
--- /dev/null
+++ b/tests/unit_tests/constraint/timeseries/test_fcialg.py
@@ -0,0 +1,112 @@
+import networkx as nx
+import numpy as np
+import pandas as pd
+from pywhy_graphs import StationaryTimeSeriesDiGraph
+
+from dodiscover.ci import Oracle
+from dodiscover.constraint.timeseries import TimeSeriesFCI
+from dodiscover.constraint.timeseries.utils import convert_ts_df_to_multiindex
+from dodiscover.context_builder import make_ts_context
+
+seed = 12345
+rng = np.random.default_rng(seed)
+
+
+def test_evaluate_edge():
+    var_names = ["x1", "x2", "x3"]
+    ts_edges = [
+        (("x1", -1), ("x1", 0)),
+        (("x1", -1), ("x2", 0)),
+        (("x3", -1), ("x2", 0)),
+        (("x3", -1), ("x3", 0)),
+        (("x1", 0), ("x3", 0)),
+    ]
+    max_lag = 1
+    G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
+    G.add_edges_from(ts_edges)
+
+    # create a dataset, starting from the observed time-series
+    data_arr = rng.random((10, len(var_names))).round(2)
+    data = pd.DataFrame(
+        data=data_arr,
+        columns=var_names,
+    )
+
+    # create an oracle
+    oracle = Oracle(G)
+    alg = TimeSeriesFCI(ci_estimator=oracle)
+    data = convert_ts_df_to_multiindex(data, max_lag)
+
+    _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x1", -1))
+    assert pvalue == 0.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0))
+    assert pvalue == 0.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0), {("x1", -1)})
+    assert pvalue == 1.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0))
+    assert pvalue == 0.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)})
+    assert pvalue == 1.0
+
+
+def test_timeseries_fci_oracle():
+    r"""Test tsFCI's algorithm assuming no contemporaneous edges.
+
+    Tests the ts skeleton method with an oracle from
+    Figure 1 and 2 of the tsFCI paper, with the difference that the graph
+    is fully observed here.
+
+    Figure 1, where "\>" and "/>" are edges pointing
+    diagonally down or up.
+
+    x1(t-2) -> x1(t-1) -> x1(t)
+        \>        \>
+    x2(t-2)    x2(t-1)    x2(t)
+        />        />
+    x3(t-2) -> x3(t-1) -> x3(t)
+    """
+    var_names = ["x1", "x2", "x3"]
+    ts_edges = [
+        (("x1", -1), ("x1", 0)),
+        (("x1", -1), ("x2", 0)),
+        (("x3", -1), ("x2", 0)),
+        (("x3", -1), ("x3", 0)),
+    ]
+    max_lag = 1
+    G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
+    G.add_edges_from(ts_edges)
+
+    # create a dataset, starting from the observed time-series
+    data_arr = rng.random((10, len(var_names))).round(2)
+    data = pd.DataFrame(
+        data=data_arr,
+        columns=var_names,
+    )
+    data.drop("x3", axis=1, inplace=True)
+
+    # create an oracle
+    oracle = Oracle(G)
+    alg = TimeSeriesFCI(ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False)
+
+    context = make_ts_context().max_lag(max_lag).variables(data=data).build()
+
+    # learn the skeleton graph
+    skel_graph, sep_set = alg.learn_skeleton(data, context)
+
+    # all edges in skeleton are inside G
+    assert all(edge in skel_graph.edges for edge in G.edges)
+    assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected())
+    assert sep_set[("x1", 0)][("x1", -1)] == []
+    assert {("x1", -1)} in sep_set[("x2", 0)][("x1", -2)]
+
+    # now test the full fit algorithm
+    # alg.fit(data, context)
+    # learned_graph = alg.graph_
+
+    # # all edges in skeleton are inside G
+    # assert nx.is_isomorphic(learned_graph.to_directed(), G.to_directed())
diff --git a/tests/unit_tests/constraint/timeseries/test_pcalg.py b/tests/unit_tests/constraint/timeseries/test_pcalg.py
new file mode 100644
index 00000000..f4400825
--- /dev/null
+++ b/tests/unit_tests/constraint/timeseries/test_pcalg.py
@@ -0,0 +1,186 @@
+import networkx as nx
+import numpy as np
+import pandas as pd
+from pywhy_graphs import StationaryTimeSeriesDiGraph
+
+from dodiscover.ci import Oracle
+from dodiscover.constraint.timeseries import TimeSeriesPC
+from dodiscover.constraint.timeseries.utils import convert_ts_df_to_multiindex
+from dodiscover.context_builder import make_ts_context
+
+seed = 12345
+rng = np.random.default_rng(seed)
+
+
+def test_evaluate_edge():
+    var_names = ["x1", "x2", "x3"]
+    ts_edges = [
+        (("x1", -1), ("x1", 0)),
+        (("x1", -1), ("x2", 0)),
+        (("x3", -1), ("x2", 0)),
+        (("x3", -1), ("x3", 0)),
+        (("x1", 0), ("x3", 0)),
+    ]
+    max_lag = 1
+    G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
+    G.add_edges_from(ts_edges)
+
+    # create a dataset, starting from the observed time-series
+    data_arr = rng.random((10, len(var_names))).round(2)
+    data = pd.DataFrame(
+        data=data_arr,
+        columns=var_names,
+    )
+
+    # create an oracle
+    oracle = Oracle(G)
+    alg = TimeSeriesPC(ci_estimator=oracle)
+    data = convert_ts_df_to_multiindex(data, max_lag)
+
+    _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x1", -1))
+    assert pvalue == 0.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0))
+    assert pvalue == 0.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0), {("x1", -1)})
+    assert pvalue == 1.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0))
+    assert pvalue == 0.0
+
+    _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)})
+    assert pvalue == 1.0
+
+
+class TestTsPCSimple:
+    """Test tsPC algorithm against the modified graph in the tsFCI paper."""
+
+    def setup(self):
+        pass
+
+    def test_timeseries_pc_oracle(self):
+        r"""Test tsPC's algorithm assuming no contemporaneous edges.
+
+        Tests the ts skeleton method with an oracle from
+        Figure 1 and 2 of the tsFCI paper, with the difference that the graph
+        is fully observed here.
+
+        Figure 1, where "\>" and "/>" are edges pointing
+        diagonally down or up.
+
+        x1(t-2) -> x1(t-1) -> x1(t)
+            \>        \>
+        x2(t-2)    x2(t-1)    x2(t)
+            />        />
+        x3(t-2) -> x3(t-1) -> x3(t)
+        """
+        var_names = ["x1", "x2", "x3"]
+        ts_edges = [
+            (("x1", -1), ("x1", 0)),
+            (("x1", -1), ("x2", 0)),
+            (("x3", -1), ("x2", 0)),
+            (("x3", -1), ("x3", 0)),
+        ]
+        max_lag = 1
+        G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
+        G.add_edges_from(ts_edges)
+
+        # create a dataset, starting from the observed time-series
+        data_arr = rng.random((10, len(var_names))).round(2)
+        data = pd.DataFrame(
+            data=data_arr,
+            columns=var_names,
+        )
+
+        # create an oracle
+        oracle = Oracle(G)
+        alg = TimeSeriesPC(
+            ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False
+        )
+
+        context = make_ts_context().max_lag(max_lag).variables(data=data).build()
+
+        # learn the skeleton graph
+        skel_graph, sep_set = alg.learn_skeleton(data, context)
+
+        # all edges in skeleton are inside G
+        assert all(edge in skel_graph.edges for edge in G.edges)
+        assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected())
+        assert sep_set[("x1", -1)][("x3", -1)] == []
+        assert {("x3", -1)} in sep_set[("x2", 0)][("x3", 0)]
+
+        # now test the full fit algorithm
+        alg.fit(data, context)
+        learned_graph = alg.graph_
+
+        # all edges in skeleton are inside G
+        assert nx.is_isomorphic(learned_graph.to_directed(), G.to_directed())
+
+    def test_timeseries_pc_contemporaneous(self):
+        """Test tsPC algorithm with contemporaneous edges.
+
+        Uses Figure 2 from the PCMCI+ paper.
+ """ + # t-1 t + # x o -> o + # ^ \> ^ + # y o -> o + # ^ ^ + # z o o + var_names = ["x", "y", "z"] + ts_edges = [ + (("x", -1), ("x", 0)), + (("x", -1), ("y", 0)), + (("y", -1), ("y", 0)), + (("y", 0), ("x", 0)), + (("z", 0), ("y", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + # First: test that the algorithm return the correct answer + # learn the skeleton graph without contemporaneous edges + alg = TimeSeriesPC( + ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=True + ) + skel_graph, _ = alg.learn_skeleton(data, context) + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + # now test the full fit algorithm + alg.fit(data, context) + learned_graph = alg.graph_ + + # all edges in skeleton are inside G + assert nx.is_isomorphic(learned_graph.to_directed(), G.to_directed()) + assert nx.is_isomorphic(learned_graph.sub_directed_graph(), G) + + # learn the skeleton graph without contemporaneous edges + alg = TimeSeriesPC( + ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False + ) + skel_graph, _ = alg.learn_skeleton(data, context) + # all edges in skeleton are inside G + assert not all(edge in skel_graph.edges for edge in G.edges) + assert not nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + + +class TestTsPCSimulated: + """Test tsPC algorithm against more complex simulated graphs.""" + + def setup(self): + pass diff --git a/tests/unit_tests/constraint/timeseries/test_skeleton.py b/tests/unit_tests/constraint/timeseries/test_skeleton.py new file mode 100644 index 00000000..8c39ca2f --- /dev/null +++ b/tests/unit_tests/constraint/timeseries/test_skeleton.py @@ -0,0 +1,246 @@ +import networkx as nx +import numpy as np +import pandas as pd +import pytest +from pywhy_graphs import StationaryTimeSeriesDiGraph, StationaryTimeSeriesGraph + +from dodiscover.ci import Oracle +from dodiscover.constraint.timeseries import LearnTimeSeriesSkeleton +from dodiscover.constraint.timeseries.utils import convert_ts_df_to_multiindex +from dodiscover.context_builder import make_ts_context + +seed = 12345 +rng = np.random.default_rng(seed) + + +def test_skeleton_evaluate_edge(): + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + (("x1", 0), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = LearnTimeSeriesSkeleton(ci_estimator=oracle) + data = convert_ts_df_to_multiindex(data, max_lag) + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x1", -1)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0), {("x1", -1)}) + assert pvalue == 1.0 + + _, pvalue = 
alg.evaluate_edge(data, ("x3", -1), ("x1", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)}) + assert pvalue == 1.0 + + _, pvalue = alg.evaluate_edge(data, ("x2", -1), ("x3", 0), {}) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x2", -1), ("x3", 0), {("x3", -1)}) + assert pvalue == 0.0 + + +def test_markovian_skeleton_oracle(): + r"""Test tsPC's skeleton algorithm assuming no latent confounders nor contemporaneous edges. + + Tests the ts skeleton method with an oracle from + Figure 1 and 2 of the tsFCI paper with the difference that the graph + is fully observed here. + + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = LearnTimeSeriesSkeleton( + ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False + ) + + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + alg.fit(data, context) + skel_graph = alg.adj_graph_ + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + + +def test_markovian_skeleton_with_contemporaneous_edges(): + r"""Test tsFCI's "PC" algorithm skeleton assuming contemporaneous edges. + + Tests the ts skeleton method with an oracle from a modified version of + Figure 1 and 2 of the tsFCI paper. + + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + + with contemporaneous edges + x1(t) -> x2(t); + x3(t) -> x2(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + (("x1", 0), ("x3", 0)), + (("x3", 0), ("x2", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G.copy(double_max_lag=True)) + alg = LearnTimeSeriesSkeleton(ci_estimator=oracle) + context = make_ts_context().max_lag(max_lag + 1).variables(data=data).build() + + # learn the graph + alg.fit(data, context) + skel_graph = alg.adj_graph_ + skel_graph.set_max_lag(max_lag) + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + print(skel_graph) + print(G) + + for edge in skel_graph.to_undirected().edges: + if not G.to_undirected().has_edge(*edge): + print(edge) + + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + + +@pytest.mark.skip(reason="make work...") +def test_semi_markovian_skeleton_oracle(): + r"""Test tsFCI's "FCI" algorithm skeleton assuming latent confounders. 
+
+    Tests the ts skeleton method with an oracle from
+    Figure 1 and 2 of the tsFCI paper.
+
+    Figure 1, where "\>" and "/>" are edges pointing
+    diagonally down or up.
+
+    x1(t-2) -> x1(t-1) -> x1(t)
+        \>        \>
+    x2(t-2)    x2(t-1)    x2(t)
+        />        />
+    x3(t-2) -> x3(t-1) -> x3(t)
+
+    However, in the dataset, x3 is not observed. The expected
+    skeleton is the undirected graph version of Figure 2b.
+
+    x1(t-2) -> x1(t-1) -> x1(t)
+      ^
+      |   \        \
+      v    >        >
+    x2(t-2) <-> x2(t-1) <-> x2(t)
+
+    and x2(t-2) <-> x2(t)
+    """
+    var_names = ["x1", "x2", "x3"]
+    ts_edges = [
+        (("x1", -1), ("x1", 0)),
+        (("x1", -1), ("x2", 0)),
+        (("x3", -1), ("x2", 0)),
+        (("x3", -1), ("x3", 0)),
+    ]
+    max_lag = 2
+    G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
+    G.add_edges_from(ts_edges)
+
+    # create expected graph
+    expected_G = StationaryTimeSeriesGraph(max_lag=max_lag)
+    ts_edges = [
+        (("x1", -1), ("x1", 0)),
+        (("x1", -1), ("x2", 0)),
+        (("x2", -1), ("x2", 0)),
+        (("x2", -2), ("x2", 0)),
+        (("x1", -2), ("x2", -2)),  # contemporaneous edge at the earliest time point
+    ]
+    expected_G.add_edges_from(ts_edges)
+
+    # create a dataset, starting from the observed time-series
+    data_arr = rng.random((10, len(var_names))).round(2)
+    data = pd.DataFrame(
+        data=data_arr,
+        columns=var_names,
+    )
+    data.drop("x3", axis=1, inplace=True)
+
+    # create an oracle using the original graph
+    oracle = Oracle(G)
+    alg = LearnTimeSeriesSkeleton(
+        ci_estimator=oracle,
+    )
+
+    context = make_ts_context().max_lag(max_lag).variables(data=data).build()
+    alg.fit(data, context)
+    skel_graph = alg.adj_graph_
+
+    # all edges in skeleton are inside G
+    assert all(edge in skel_graph.edges for edge in G.edges)
+    assert nx.is_isomorphic(skel_graph.to_undirected(), expected_G.to_undirected())
diff --git a/tests/unit_tests/test_context_builder.py b/tests/unit_tests/test_context_builder.py
index 42e3feeb..4cc33a7b 100644
--- a/tests/unit_tests/test_context_builder.py
+++ b/tests/unit_tests/test_context_builder.py
@@ -4,6 +4,7 @@
 import pytest
 
 from dodiscover import make_context
+from dodiscover.context_builder import make_ts_context
 
 seed = 12345
 
@@ -15,8 +16,8 @@ def make_df() -> pd.DataFrame:
     return pd.DataFrame(np.hstack((X, Y)), columns=["x", "y"])
 
 
-def test_constructor():
-    ctx = make_context()
+@pytest.mark.parametrize("ctx", [make_context(), make_ts_context()])
+def test_constructor(ctx):
     assert ctx is not None
 
 
@@ -24,7 +25,7 @@ def test_build_with_initial_graph():
     graph = nx.DiGraph()
     graph.add_edges_from([("x", "y")])
     data = make_df()
-    ctx = make_context().graph(graph).variables(data=data).build()
+    ctx = make_context().init_graph(graph).variables(data=data).build()
     assert ctx.init_graph is graph
 
     # if the initial graph does not match the variables passed in, then raise an error
@@ -37,6 +38,19 @@ def test_build_with_observed_and_latents():
     assert ctx.observed_variables == set("x")
     assert ctx.latent_variables == set("y")
 
+
+def test_with_context():
+    """Test make and builder with a previous context."""
+    graph = nx.DiGraph()
+    graph.add_edges_from([("x", "y")])
+    data = make_df()
+    ctx = make_context().init_graph(graph).variables(data=data).build()
+
+    new_ctx = make_context(ctx).build()
+
+    # test equality
+    assert ctx == new_ctx
+
     df = make_df()
     ctx_builder = make_context()
     # if we only set observed, then the latents should be inferred from the
@@ -49,6 +63,23 @@ def test_build_with_observed_and_latents():
     assert ctx.observed_variables == {"y"}
 
 
+def test_ts_context():
+    graph = nx.DiGraph()
+    graph.add_edges_from([("x", "y")])
+    data = make_df()
+    max_lag = 2
+    builder = make_ts_context().init_graph(graph).variables(data=data)
+
+    with pytest.raises(
+        ValueError, match="Max lag must be set before building time-series context"
+    ):
+        builder.build()
+
+    ctx = builder.max_lag(max_lag).build()
+    new_ctx = make_ts_context(ctx).build()
+
+    # test equality
+    assert ctx == new_ctx
+
+
 def test_build_context_errors():
     ctx_builder = make_context()
     df = make_df()
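
For reviewers, a short end-to-end sketch of how the pieces introduced in this diff fit together; it mirrors the oracle-based tests above rather than defining new behavior. All names (`make_ts_context`, `TimeSeriesPC`, `Oracle`, `StationaryTimeSeriesDiGraph`, `learn_skeleton`) come from this patch and pywhy-graphs; the specific graph is a hypothetical example.

import numpy as np
import pandas as pd
from pywhy_graphs import StationaryTimeSeriesDiGraph

from dodiscover.ci import Oracle
from dodiscover.constraint.timeseries import TimeSeriesPC
from dodiscover.context_builder import make_ts_context

# a small stationary time-series graph: x1(t-1) -> x1(t) and x1(t-1) -> x2(t)
G = StationaryTimeSeriesDiGraph(max_lag=1)
G.add_edges_from([(("x1", -1), ("x1", 0)), (("x1", -1), ("x2", 0))])

# the data values are irrelevant here, since the oracle answers CI queries
# directly from the ground-truth graph
rng = np.random.default_rng(12345)
data = pd.DataFrame(rng.random((10, 2)), columns=["x1", "x2"])

# capture the time-series assumptions in a context, then learn the skeleton
context = make_ts_context().max_lag(1).variables(data=data).build()
alg = TimeSeriesPC(ci_estimator=Oracle(G), separate_lag_phase=False, contemporaneous_edges=False)
skel_graph, sep_set = alg.learn_skeleton(data, context)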