diff --git a/tests/test_pathway_analysis.py b/tests/test_pathway_analysis.py index f5d4fe0..6e8b044 100644 --- a/tests/test_pathway_analysis.py +++ b/tests/test_pathway_analysis.py @@ -26,8 +26,8 @@ def test_ndp(): @pytest.mark.parametrize("source,target", [('import', 'heu_pu_collector'), - ('mine', 'export'), - ('import', 'export')]) + ('mine', 'export'), + ('import', 'export')]) def test_ndp_raise_exception(source, target): G = nx.DiGraph() edges = [('mine', 'civ_enrich'), ('mine', 'mil_enrich'), @@ -36,7 +36,7 @@ def test_ndp_raise_exception(source, target): ('mil_enrich', 'heu_pu_collector')] G.add_edges_from(edges) - with pytest.raises(ValueError) as excinfo: + with pytest.raises(ValueError) as excinfo: obj = pa.find_node_disjoint_paths(G, source, target) assert 'Source and/or target not in graph G!' in str(excinfo.value) @@ -101,7 +101,7 @@ def test_has_multiedges_not_multigraph(): exp = None G = nx.DiGraph() G.add_edge('a', 'b') - + obs = pa.has_multiedges(G) assert obs == exp @@ -119,6 +119,17 @@ def test_transform_to_digraph(): assert obs == exp +def test_transform_to_digraph_has_multiedges(): + G = nx.MultiDiGraph() + G.add_edge('a', 'b') + G.add_edge('a', 'b') + exp = False + + (obs_H, obs_safe) = pa.transform_to_digraph(G) + + assert obs_safe == exp + + @pytest.mark.parametrize("G", [(nx.Graph()), (nx.MultiGraph())]) def test_transform_to_digraph_unsafe(G): exp = False @@ -130,7 +141,7 @@ def test_transform_to_digraph_unsafe(G): assert obs_safe == exp -@pytest.mark.parametrize("G,exp", [(nx.Graph(), None), +@pytest.mark.parametrize("G,exp", [(nx.Graph(), None), (nx.MultiGraph(), None)]) def test_transform_to_digraph_formats(G, exp): G.add_edge('a', 'b') @@ -139,7 +150,7 @@ def test_transform_to_digraph_formats(G, exp): assert obs_H == exp -@pytest.mark.parametrize("G,exp", [(nx.Graph(), False), +@pytest.mark.parametrize("G,exp", [(nx.Graph(), False), (nx.DiGraph(), True), (nx.MultiGraph(), False), (nx.MultiDiGraph(), True)]) @@ -176,7 +187,7 @@ def 
test_maximum_flow_several_paths(): def test_maximum_flow_multigraph(): - '''NetworkX retains only the most recently added edge if multiple edges + '''NetworkX retains only the most recently added edge if multiple edges exit and a MultiGraph is converted into a DiGraph''' exp = 1 G = nx.DiGraph() @@ -190,7 +201,7 @@ def test_maximum_flow_multigraph(): def test_maximum_flow_multigraph_order_flipped(): - '''NetworkX retains only the most recently added edge if multiple edges + '''NetworkX retains only the most recently added edge if multiple edges exit and a MultiGraph is converted into a DiGraph''' exp = 2 G = nx.DiGraph() @@ -221,8 +232,8 @@ def test_maximum_flow_directed_only(): def test_maximum_flow_not_directed_or_multi(): exp = None G = nx.Graph() - G.add_edge('a', 'b', capacity = 2) - G.add_edge('b', 'c', capacity = 5) + G.add_edge('a', 'b', capacity=2) + G.add_edge('b', 'c', capacity=5) with pytest.raises(TypeError) as excinfo: (obs_path, obs) = pa.find_maximum_flow(G, 'a', 'c') @@ -232,14 +243,14 @@ def test_maximum_flow_not_directed_or_multi(): def test_find_pathway_flow(): G = nx.DiGraph() edges = [(0, 1, {'capacity': 3}), - (1, 2, {'capacity': 5}), - (2, 3, {'capacity': 3}), - (3, 4, {'capacity': 4}), - (0, 5, {'capacity': 2}), - (5, 6, {'capacity': 3}), - (6, 3, {'capacity': 2}), - (3, 7, {'capacity': 1}), - (7, 4, {'capacity': 2})] + (1, 2, {'capacity': 5}), + (2, 3, {'capacity': 3}), + (3, 4, {'capacity': 4}), + (0, 5, {'capacity': 2}), + (5, 6, {'capacity': 3}), + (6, 3, {'capacity': 2}), + (3, 7, {'capacity': 1}), + (7, 4, {'capacity': 2})] G.add_edges_from(edges) path = (0, 5, 6, 3, 7) @@ -248,15 +259,16 @@ def test_find_pathway_flow(): assert obs == exp + def test_find_pathway_flow_no_capacity(): G = nx.DiGraph() - edges = [(0,1), (1,2), (2,3)] + edges = [(0, 1), (1, 2), (2, 3)] G.add_edges_from(edges) path = (0, 1, 2, 3) - with pytest.raises(nx.exception.NetworkXUnbounded) as excinfo: - obj = pa.find_pathway_flow(G, path) - assert 'Infinite 
capacity path, flow unbounded above.' in str(excinfo.value) + with pytest.raises(nx.exception.NetworkXUnbounded) as excinfo: + obj = pa.find_pathway_flow(G, path) + assert 'Infinite capacity path' in str(excinfo.value) def test_find_pathway_flow_single_infinite(): @@ -281,14 +293,14 @@ def test_find_pathway_flow_all_infinite(): G.add_edges_from(edges) path = (0, 1, 2, 3) - with pytest.raises(nx.exception.NetworkXUnbounded) as excinfo: - obj = pa.find_pathway_flow(G, path) - assert 'Infinite capacity path, flow unbounded above.' in str(excinfo.value) + with pytest.raises(nx.exception.NetworkXUnbounded) as excinfo: + obj = pa.find_pathway_flow(G, path) + assert 'Infinite capacity path' in str(excinfo.value) def test_find_pathway_flow_multiedges(): G = nx.MultiDiGraph() - edges = [(0,1), (1,2), (1,2), (2,3)] + edges = [(0, 1), (1, 2), (1, 2), (2, 3)] G.add_edges_from(edges) pathway = (0, 1, 2, 3) @@ -299,7 +311,7 @@ def test_find_pathway_flow_multiedges(): def test_find_pathway_flow_other_type(): G = nx.Graph() - edges = [(0,1), (1,2), (2,3)] + edges = [(0, 1), (1, 2), (2, 3)] G.add_edges_from(edges) pathway = (0, 1, 2, 3) @@ -310,7 +322,15 @@ def test_find_pathway_flow_other_type(): @pytest.mark.parametrize("steps,exp", [(('FacilityA', 'FacilityB'), 1), (('FacilityA', 'Sink'), -1)]) -def test_check_if_sublist(steps,exp): +def test_check_if_sublist(steps, exp): + path = ('Source', 'FacilityA', 'FacilityB', 'Sink') + pos = pa.check_if_sublist(path, steps) + assert pos == exp + + +@pytest.mark.parametrize("path,steps,exp", [(('Source', 'Sink'), (), -1), + ((), ('Sink'), -1)]) +def test_check_if_sublist_len_zero(path, steps, exp): path = ('Source', 'FacilityA', 'FacilityB', 'Sink') pos = pa.check_if_sublist(path, steps) assert pos == exp @@ -331,7 +351,7 @@ def test_roll_cycle(cycle, exp): def test_insert_cycle_single(): path = (0, 1, 3, 4, 7, 8) rolled_cycles = {(7, 3, 4)} - + exp = (0, 1, 3, 4, (7, 3, 4), 7, 8) obs = pa.insert_cycles(path, rolled_cycles) assert obs == 
exp @@ -340,7 +360,7 @@ def test_insert_cycle_single(): def test_insert_cycle_loop(): path = (0, 1, 3, 4, 7, 8) rolled_cycles = {(7,)} - + exp = (0, 1, 3, 4, (7,), 7, 8) obs = pa.insert_cycles(path, rolled_cycles) assert obs == exp @@ -356,10 +376,10 @@ def test_insert_cycle_multiple(): def test_insert_cycle_multiple_same_index(): - path = ('Source' , 'A', 'B', 'C', 'Sink') + path = ('Source', 'A', 'B', 'C', 'Sink') rolled_cycles = (('C', 'D', 'B'), ('C', 'D', 'E', 'A', 'B')) - - exp = ('Source' , 'A', 'B', ('C', 'D', 'B'), ('C', 'D', 'E', 'A', 'B'), + + exp = ('Source', 'A', 'B', ('C', 'D', 'B'), ('C', 'D', 'E', 'A', 'B'), 'C', 'Sink') obs = pa.insert_cycles(path, rolled_cycles) assert obs == exp @@ -393,7 +413,7 @@ def test_get_pathways_with_cycles_multiple(): (0, 2, (6, 5), 6, (7, 3, 4), 7, 8)} obs = pa.get_pathways_with_cycles(pathways, sc) - #assert all(x in obs for x in exp) + # assert all(x in obs for x in exp) assert obs == exp @@ -422,10 +442,10 @@ def data(): @pytest.mark.parametrize("source,exp", [("SourceA", {("SourceA", "Facility", "SinkA"), - ("SourceA", "Facility", "SinkB")}), + ("SourceA", "Facility", "SinkB")}), ("SourceC", set()), ("Facility", set())]) -def test_find_paths_with_source(source,exp): +def test_find_paths_with_source(source, exp): pathways = testdata[2][4] obs_subset = pa.find_paths_with_source(pathways, source) @@ -441,10 +461,10 @@ def test_find_paths_with_source_none(): @pytest.mark.parametrize("sink,exp", [("SinkB", {("SourceA", "Facility", "SinkB"), - ("SourceB", "Facility", "SinkB")}), + ("SourceB", "Facility", "SinkB")}), ("SinkC", set()), ("SourceA", set())]) -def test_find_paths_with_sink(sink,exp): +def test_find_paths_with_sink(sink, exp): pathways = testdata[2][4] obs_subset = pa.find_paths_with_sink(pathways, sink) @@ -458,9 +478,9 @@ def test_find_paths_with_sink_none(): assert obs == set() -@pytest.mark.parametrize("contain,exp", [("SourceB", +@pytest.mark.parametrize("contain,exp", [("SourceB", {("SourceB", "Facility", 
"SinkA"), - ("SourceB", "Facility", "SinkB")}), + ("SourceB", "Facility", "SinkB")}), (["SourceB", "SinkB"], {("SourceB", "Facility", "SinkB")}), (["SourceB", "SourceA"], set()), @@ -472,17 +492,18 @@ def test_find_paths_containing_all(contain, exp): assert obs == exp -@pytest.mark.parametrize("contain,exp", [(5, {(0,1,5,6,4)}), - ([5], {(0,1,5,6,4)}), - ([1, 5], {(0,1,5,6,4)}), - ([0, 3], {(0,1,2,3,4), (0,2,3,4)}), +@pytest.mark.parametrize("contain,exp", [(5, {(0, 1, 5, 6, 4)}), + ([5], {(0, 1, 5, 6, 4)}), + ([1, 5], {(0, 1, 5, 6, 4)}), + ([0, 3], {(0, 1, 2, 3, 4), + (0, 2, 3, 4)}), ([5, 7], set()), ([8], set()), - ([7,8], set()), + ([7, 8], set()), ([], set())]) def test_find_paths_containing_all_int(contain, exp): - pathways = {(0,1,2,3,4), (0,2,3,4), (0,1,5,6,4), (0,1,7,4)} - + pathways = {(0, 1, 2, 3, 4), (0, 2, 3, 4), (0, 1, 5, 6, 4), (0, 1, 7, 4)} + obs = pa.find_paths_containing_all(pathways, contain) assert obs == exp @@ -496,11 +517,11 @@ def test_find_paths_containing_all_none(): @pytest.mark.parametrize("contain,exp", [("SourceA", {("SourceA", "Facility", "SinkA"), - ("SourceA", "Facility", "SinkB")}), + ("SourceA", "Facility", "SinkB")}), (["SourceA", "SinkA"], {("SourceA", "Facility", "SinkA"), - ("SourceA", "Facility", "SinkB"), - ("SourceB", "Facility", "SinkA")}), + ("SourceA", "Facility", "SinkB"), + ("SourceB", "Facility", "SinkA")}), (["SourceC"], set()), ([], set())]) def test_find_paths_containing_one_of(contain, exp): @@ -510,15 +531,16 @@ def test_find_paths_containing_one_of(contain, exp): assert obs == exp -@pytest.mark.parametrize("contain,exp", [(5, {(0,1,5,6,4)}), - ([5], {(0,1,5,6,4)}), - ([5, 7], {(0,1,5,6,4),(0,1,7,4)}), +@pytest.mark.parametrize("contain,exp", [(5, {(0, 1, 5, 6, 4)}), + ([5], {(0, 1, 5, 6, 4)}), + ([5, 7], {(0, 1, 5, 6, 4), + (0, 1, 7, 4)}), (8, set()), ([8], set()), - ([7,8], {(0,1,7,4)}), + ([7, 8], {(0, 1, 7, 4)}), ([], set())]) def test_find_paths_containing_one_of_int(contain, exp): - pathways = {(0,1,2,3,4), 
(0,2,3,4),(0,1,5,6,4), (0,1,7,4)} + pathways = {(0, 1, 2, 3, 4), (0, 2, 3, 4), (0, 1, 5, 6, 4), (0, 1, 7, 4)} obs = pa.find_paths_containing_one_of(pathways, contain) assert obs == exp @@ -552,3 +574,41 @@ def test_get_longest_paths(name, short, long, edges, paths, sc): exp = {path for path in paths if len(path) == long} obs = pa.get_longest_path(paths) assert obs == exp + + +def test_sort_shortest(): + pathways = {(2, 3), (3, 4, 7, 4, 3, 32, 3), (10000, 10000, 0)} + exp = [(2, 3), (10000, 10000, 0), (3, 4, 7, 4, 3, 32, 3)] + + obs = pa.sort_by_shortest(pathways) + assert obs == exp + + +def test_sort_longest(): + pathways = {(2, 3), (3, 4, 7, 4, 3, 32, 3), (10000, 10000, 0)} + exp = [(3, 4, 7, 4, 3, 32, 3), (10000, 10000, 0), (2, 3)] + + obs = pa.sort_by_longest(pathways) + assert obs == exp + + +@pytest.mark.parametrize("pathways", [{("a", "b")}, + {("a", "b"), (1, "b")}, + {(1, 4)}, + {(4, 5), (6, 5.9)}, + {(6.3, "a")}]) +def test_check_for_invalid_pathways(pathways): + obs = pa.check_for_invalid_pathways(pathways) + assert obs is None + + +@pytest.mark.parametrize("pathways", [{("a")}, + {("a", "b"), "c"}, + {(1)}, + {(4, 5), 6}, + {(6.3, "a"), 4.6}, + {(5.3)}]) +def test_check_for_invalid_pathways_type_error(pathways): + with pytest.raises(TypeError) as excinfo: + obj = pa.sort_by_longest(pathways) + assert 'pathways contains' in str(excinfo.value) diff --git a/trailmap/pathway_analysis.py b/trailmap/pathway_analysis.py index 972aa9f..e03a257 100644 --- a/trailmap/pathway_analysis.py +++ b/trailmap/pathway_analysis.py @@ -5,7 +5,7 @@ from collections import Counter -def print_graph_parameters(G, pathways): # pragma: no cover +def print_graph_parameters(G, pathways): # pragma: no cover '''Prints a set of parameters characterizing the graph ''' print('\nGRAPH PARAMETERS') @@ -85,7 +85,7 @@ def transform_to_digraph(G): def find_maximum_flow(H, s, t): '''Finds maximum flow between a source and target node in DiGraph G. Requires edge attribute 'capacity'. 
MultiDiGraphs not supported. - ''' + ''' if type(H) == nx.classes.digraph.DiGraph: max_flow_path = nx.maximum_flow(H, s, t) max_flow = nx.maximum_flow_value(H, s, t) @@ -97,7 +97,7 @@ def find_maximum_flow(H, s, t): def find_pathway_flow(H, pathway): - '''returns the maximum permissible flow for a given pathway in DiGraph G. + '''returns the maximum permissible flow for a given pathway in DiGraph G. Any edge without 'capacity' attribute will be given infinite capacity. MultiDiGraphs not supported. ''' @@ -109,7 +109,7 @@ def find_pathway_flow(H, pathway): sinks = get_sinks(H_sg) path_flow = nx.maximum_flow_value(H_sg, sources[0], sinks[0]) - + return path_flow elif type(H) == nx.classes.multidigraph.MultiDiGraph: @@ -120,7 +120,7 @@ def find_pathway_flow(H, pathway): raise TypeError('Graph must be DiGraph type.') -def find_simple_cycles(G): # pragma: no cover +def find_simple_cycles(G): # pragma: no cover '''finds cycles in a graph and returns them in a list of lists. ''' sc = list(nx.simple_cycles(G)) @@ -141,7 +141,7 @@ def check_if_sublist(path, list_of_steps): if path[i] == list_of_steps[0]: n = 1 while (n < len(list_of_steps) and path[i+n] == list_of_steps[n]): - n+=1 + n += 1 if n == len(list_of_steps): pos = i break @@ -167,7 +167,7 @@ def roll_cycle(path, cycle): def insert_cycles(pathway, rolled_cycles): '''Inserts already-rolled cycles as a tuple into a path ''' - path=list(pathway) + path = list(pathway) for rc in rolled_cycles: path.insert(path.index(rc[0]), rc) path_with_cycles = tuple(path) @@ -188,7 +188,7 @@ def get_pathways_with_cycles(pathways, sc): rolled_cycle = roll_cycle(path, cycle) if rolled_cycle: rolled_cycles.add(rolled_cycle) - + # record all the pathways that have cycles, insert single cycle if rolled_cycles: pathways_with_cycles.add(insert_cycles(path, rolled_cycles)) @@ -202,7 +202,7 @@ def find_paths_with_source(pathways, source): if len(pathways) is 0: return set() - subset_pathways = set([ path for path in pathways if path[0] == 
source]) + subset_pathways = set([path for path in pathways if path[0] == source]) return subset_pathways @@ -211,8 +211,8 @@ def find_paths_with_sink(pathways, sink): ''' if len(pathways) is 0: return set() - - subset_pathways = set([ path for path in pathways if path[-1] == sink]) + + subset_pathways = set([path for path in pathways if path[-1] == sink]) return subset_pathways @@ -225,7 +225,7 @@ def find_paths_containing_all(pathways, facilities): # convert to list if user passed a string or int if type(facilities) == int or type(facilities) == str: facilities = [facilities] - + # if user passed an empty list, return no pathways if not facilities: return set() @@ -254,9 +254,12 @@ def find_paths_containing_one_of(pathways, facilities): def get_shortest_path(pathways): - '''Finds the set of pathways with the shortest number of steps from source to - target. Returns a tuple with path and length. + '''Finds the set of pathways with the shortest number of steps from source + to target. Returns a tuple with path and length. ''' + # check that there are no single-item pathways + check_for_invalid_pathways(pathways) + if len(pathways) is not 0: short_len = min([len(path) for path in pathways]) shortest = set([path for path in pathways if len(path) == short_len]) @@ -270,6 +273,9 @@ def get_longest_path(pathways): '''Finds the pathway with the longest number of steps from source to target. Returns a tuple with path and length. 
''' + # check that there are no single-item pathways + check_for_invalid_pathways(pathways) + if len(pathways) is not 0: long_len = max([len(path) for path in pathways]) longest = set([path for path in pathways if len(path) == long_len]) @@ -291,3 +297,30 @@ def get_sinks(G): ''' sinks = list(node for node, out_deg in G.out_degree() if out_deg == 0) return sinks + + +def sort_by_shortest(pathways): + '''Returns the pathways sorted from shortest to longest + ''' + # check that there are no single-item pathways + check_for_invalid_pathways(pathways) + return sorted(list(pathways), key=len) + + +def sort_by_longest(pathways): + '''Returns the pathways sorted from longest to shortest + ''' + # check that there are no single-item pathways + check_for_invalid_pathways(pathways) + return sorted(list(pathways), key=len, reverse=True) + + +def check_for_invalid_pathways(pathways): + '''checks if pathways contains any errors, such as having single-value + pathways, such as ('facility') + ''' + if any([isinstance(i, (int, float, str)) for i in pathways]): + raise TypeError('pathways contains pathway(s) with only one facility' + ". All pathways should include at least two items") + + return