diff --git a/api/openapi/api.yaml b/api/openapi/api.yaml
index c48559e47..11584d386 100644
--- a/api/openapi/api.yaml
+++ b/api/openapi/api.yaml
@@ -849,3 +849,100 @@ paths:
       description: Check schematic log.
       tags:
         - Schema Operation
+  /visualize/tangled_tree/layers:
+    get:
+      summary: Get layers of tangled tree.
+      description: >-
+        Get tangled tree node layers to display for a given data model and figure type
+      operationId: api.routes.get_viz_tangled_tree_layers
+      parameters:
+        - in: query
+          name: schema_url
+          schema:
+            type: string
+          description: Data Model URL
+          example: >-
+            https://raw.githubusercontent.com/Sage-Bionetworks/schematic/develop/tests/data/example.model.jsonld
+          required: true
+        - in: query
+          name: figure_type
+          schema:
+            type: string
+            enum: ["component", "dependency"]
+          description: Figure type to generate.
+          example: 'component'
+          required: true
+      responses:
+        "200":
+          description: Returns tangled tree node layers as a JSON String.
+          content:
+            text/json:
+              schema:
+                type: string
+      tags:
+        - Visualization Operations
+  /visualize/tangled_tree/text:
+    get:
+      summary: Get text to display on tangled tree.
+      description: >-
+        Get tangled tree plain or highlighted text to display for a given data model, text formatting and figure type
+      operationId: api.routes.get_viz_tangled_tree_text
+      parameters:
+        - in: query
+          name: schema_url
+          schema:
+            type: string
+          description: Data Model URL
+          example: >-
+            https://raw.githubusercontent.com/Sage-Bionetworks/schematic/develop/tests/data/example.model.jsonld
+          required: true
+        - in: query
+          name: figure_type
+          schema:
+            type: string
+            enum: ["component", "dependency"]
+          description: Figure type to generate.
+          example: 'component'
+          required: true
+        - in: query
+          name: text_format
+          schema:
+            type: string
+            enum: ["plain", "highlighted"]
+          description: Text formatting type.
+          example: 'plain'
+          required: true
+      responses:
+        "200":
+          description: Returns a CSV as a JSON String.
+          content:
+            text/csv:
+              schema:
+                type: string
+      tags:
+        - Visualization Operations
+  /visualize/attributes:
+    get:
+      summary: Get an attributes table for a data model, as a CSV (JSON String)
+      description: >-
+        Get all the attributes associated with a data model formatted as a
+        dataframe (stored as a JSON String) for use in Observable visualization.
+      operationId: api.routes.get_viz_attributes_explorer
+      parameters:
+        - in: query
+          name: schema_url
+          schema:
+            type: string
+          description: Data Model URL
+          example: >-
+            https://raw.githubusercontent.com/Sage-Bionetworks/schematic/develop/tests/data/example.model.jsonld
+          required: true
+      responses:
+        "200":
+          description: Returns a CSV as a JSON String.
+          content:
+            text/csv:
+              schema:
+                type: string
+      tags:
+        - Visualization Operations
diff --git a/api/routes.py b/api/routes.py
index ed1ee24d7..5840eab14 100644
--- a/api/routes.py
+++ b/api/routes.py
@@ -12,11 +12,14 @@
 from schematic import CONFIG
+from schematic.visualization.attributes_explorer import AttributesExplorer
+from schematic.visualization.tangled_tree import TangledTree
 from schematic.manifest.generator import ManifestGenerator
 from schematic.models.metadata import MetadataModel
 from schematic.schemas.generator import SchemaGenerator
 from schematic.schemas.explorer import SchemaExplorer
 from schematic.store.synapse import SynapseStorage
+from flask_cors import CORS, cross_origin
 from schematic.schemas.explorer import SchemaExplorer
 import pandas as pd
 import json
@@ -349,7 +352,6 @@ def populate_manifest_route(schema_url, title=None, data_type=None):
 
     return populated_manifest_link
 
-
 def get_storage_projects(input_token, asset_view):
     # call config handler
     config_handler(asset_view=asset_view)
@@ -374,7 +376,6 @@ def get_storage_projects_datasets(input_token, asset_view, project_id):
 
     return sorted_dataset_lst
 
-
 def get_files_storage_dataset(input_token, asset_view, dataset_id, full_path, file_names=None):
     # call config handler
     config_handler(asset_view=asset_view)
@@ -389,6 +390,7 @@ def get_files_storage_dataset(input_token, asset_view, dataset_id, full_path, fi
     # call getFilesInStorageDataset function
     file_lst = store.getFilesInStorageDataset(datasetId=dataset_id, fileNames=file_names, fullpath=full_path)
     return file_lst
+
 def get_component_requirements(schema_url, source_component, as_graph):
     metadata_model = initalize_metadata_model(schema_url)
@@ -396,6 +398,40 @@ def get_component_requirements(schema_url, source_component, as_graph):
 
     return req_components
 
+def get_viz_attributes_explorer(schema_url):
+    # call config_handler()
+    config_handler()
+
+    temp_path_to_jsonld = get_temp_jsonld(schema_url)
+
+    attributes_csv = AttributesExplorer(temp_path_to_jsonld).parse_attributes(save_file=False)
+
+    return attributes_csv
+
+def get_viz_tangled_tree_text(schema_url, figure_type, text_format):
+
+    temp_path_to_jsonld = get_temp_jsonld(schema_url)
+
+    # Initialize TangledTree
+    tangled_tree = TangledTree(temp_path_to_jsonld, figure_type)
+
+    # Get text for tangled tree.
+    text_df = tangled_tree.get_text_for_tangled_tree(text_format, save_file=False)
+
+    return text_df
+
+def get_viz_tangled_tree_layers(schema_url, figure_type):
+
+    temp_path_to_jsonld = get_temp_jsonld(schema_url)
+
+    # Initialize TangledTree
+    tangled_tree = TangledTree(temp_path_to_jsonld, figure_type)
+
+    # Get tangled tree layers JSON.
+    layers = tangled_tree.get_tangled_tree_layers(save_file=False)
+
+    return layers[0]
+
 def download_manifest(input_token, dataset_id, asset_view, as_json, new_manifest_name=''):
     # call config handler
     config_handler(asset_view=asset_view)
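For context, a minimal sketch of how the three new endpoints might be exercised once the API is running; the base URL and the `/v1` prefix are assumptions that depend on how the OpenAPI spec is mounted in a given deployment:

    import requests

    schema_url = (
        "https://raw.githubusercontent.com/Sage-Bionetworks/schematic/develop/tests/data/example.model.jsonld"
    )
    base = "http://localhost:3001/v1"  # assumed host, port, and prefix

    # Tangled tree layers for a component figure; the response body is a JSON string.
    layers = requests.get(
        f"{base}/visualize/tangled_tree/layers",
        params={"schema_url": schema_url, "figure_type": "component"},
    ).text

    # Attributes table; the response body is a CSV serialized as a string.
    attributes_csv = requests.get(
        f"{base}/visualize/attributes", params={"schema_url": schema_url}
    ).text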
diff --git a/poetry.lock b/poetry.lock
index 0f97296f4..6b1104156 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -404,6 +404,18 @@ dev = ["coverage", "pallets-sphinx-themes", "pytest", "sphinx", "sphinx-issues",
 docs = ["pallets-sphinx-themes", "sphinx", "sphinx-issues", "sphinxcontrib-log-cabinet"]
 dotenv = ["python-dotenv"]
 
+[[package]]
+name = "flask-cors"
+version = "3.0.10"
+description = "A Flask extension adding a decorator for CORS support"
+category = "main"
+optional = false
+python-versions = "*"
+
+[package.dependencies]
+Flask = ">=0.9"
+Six = "*"
+
 [[package]]
 name = "google-api-core"
 version = "2.8.2"
@@ -1989,7 +2001,7 @@ testing = ["func-timeout", "jaraco-itertools", "pytest (>=6)", "pytest-black (>=
 [metadata]
 lock-version = "1.1"
 python-versions = ">=3.7.1,<3.11"
-content-hash = "1d885d895f7f261434a5a9d388b5cc720b5f4ec7b818e2a0f3a9cabad3c80095"
+content-hash = "77f2068fe6df8f020fe207e2f6ce49d8850e3b507d11288f7f7e0bdc69dc0c0d"
 
 [metadata.files]
 alabaster = [
@@ -2305,6 +2317,10 @@ flask = [
     {file = "Flask-1.1.4-py2.py3-none-any.whl", hash = "sha256:c34f04500f2cbbea882b1acb02002ad6fe6b7ffa64a6164577995657f50aed22"},
     {file = "Flask-1.1.4.tar.gz", hash = "sha256:0fbeb6180d383a9186d0d6ed954e0042ad9f18e0e8de088b2b419d526927d196"},
 ]
+flask-cors = [
+    {file = "Flask-Cors-3.0.10.tar.gz", hash = "sha256:b60839393f3b84a0f3746f6cdca56c1ad7426aa738b70d6c61375857823181de"},
+    {file = "Flask_Cors-3.0.10-py2.py3-none-any.whl", hash = "sha256:74efc975af1194fc7891ff5cd85b0f7478be4f7f59fe158102e91abb72bb4438"},
+]
 google-api-core = [
     {file = "google-api-core-2.8.2.tar.gz", hash = "sha256:06f7244c640322b508b125903bb5701bebabce8832f85aba9335ec00b3d02edc"},
     {file = "google_api_core-2.8.2-py3-none-any.whl", hash = "sha256:93c6a91ccac79079ac6bbf8b74ee75db970cc899278b97d53bc012f35908cf50"},
diff --git a/pyproject.toml b/pyproject.toml
index e5bb12ecc..4cf0a526d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -65,6 +65,7 @@ itsdangerous = "1.1.0"
 Jinja2 = "2.11.3"
 openpyxl = "^3.0.9"
 "backports.zoneinfo" = {markers = "python_version < \"3.9\"", version = "^0.2.1"}
+Flask-Cors = "^3.0.10"
 
 [tool.poetry.dev-dependencies]
diff --git a/run_api.py b/run_api.py
index 80127d872..cf278da9c 100755
--- a/run_api.py
+++ b/run_api.py
@@ -3,6 +3,7 @@
 # import our application
 # Run our application
 from api import create_app
+from flask_cors import CORS
 import os
 
 if __name__ == "__main__":
@@ -13,4 +14,5 @@
 
     # Launch app
     app = create_app()
-    app.run(host=host, port=port, debug=True)
+    CORS(app, resources={r"*": {"origins": "*"}})
+    app.run(port=3001, debug=True)
diff --git a/schematic/__main__.py b/schematic/__main__.py
index f1c04725e..c03219524 100644
--- a/schematic/__main__.py
+++ b/schematic/__main__.py
@@ -11,6 +11,9 @@
 from schematic.schemas.commands import (
     schema as schema_cli,
 )  # schema conversion commands
+from schematic.visualization.commands import (
+    viz as viz_cli,
+)  # viz generation commands
 from schematic import init as init_cli  # schematic initialization commands
 
 logger = logging.getLogger()
@@ -34,6 +37,8 @@ def main():
 main.add_command(manifest_cli)  # add manifest commands
 main.add_command(model_cli)  # add model commands
 main.add_command(schema_cli)  # add schema commands
+main.add_command(viz_cli)  # add viz commands
+
 
 if __name__ == "__main__":
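A side note on the run_api.py change: `CORS(app, resources={r"*": {"origins": "*"}})` opens every route to every origin, which is convenient for local Observable development but worth tightening before any shared deployment. A possible hardening sketch, where the origin is a placeholder:

    from api import create_app
    from flask_cors import CORS

    app = create_app()
    # Allow only a known front-end origin instead of "*".
    CORS(app, resources={r"*": {"origins": ["https://observablehq.com"]}})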
diff --git a/schematic/help.py b/schematic/help.py
index 4bbb24c17..de402b183 100644
--- a/schematic/help.py
+++ b/schematic/help.py
@@ -178,3 +178,18 @@
         ),
     }
 }
+
+viz_commands = {
+    "visualization": {
+        "config": (
+            "Specify the path to the `config.yml` using this option. This is a required argument."
+        ),
+        "tangled_tree": {
+            "figure_type": ("Specify the type of schema visualization to make. Either 'dependency' or 'component'."
+            ),
+            "text_format": ("Specify the type of text to gather for tangled tree visualization, either 'plain' or 'highlighted'."
+            ),
+        }
+    }
+
+}
diff --git a/schematic/schemas/explorer.py b/schematic/schemas/explorer.py
index 984bde184..5c3000166 100644
--- a/schematic/schemas/explorer.py
+++ b/schematic/schemas/explorer.py
@@ -617,6 +617,26 @@ def update_property(self, property_info):
         validate_schema(self.schema)
         logger.info(f"Updated the property {property_info['rdfs:label']} successfully.")
 
+    def get_nodes_descendants(self, graph, component):
+        """
+        Return a list of nodes reachable from the given node in graph
+        graph: networkx graph object
+        component: any given node
+        """
+        all_descendants = list(nx.descendants(graph, component))
+
+        return all_descendants
+
+    def get_nodes_ancestors(self, graph, component):
+        """
+        Return a list of nodes from which the given node is reachable in graph
+        graph: networkx graph object
+        component: any given node
+        """
+        all_ancestors = list(nx.ancestors(graph, component))
+
+        return all_ancestors
+
     def get_digraph_by_edge_type(self, edge_type):
 
         multi_digraph = self.schema_nx
diff --git a/schematic/schemas/generator.py b/schematic/schemas/generator.py
index 37ed9ad9e..e1f4b436c 100644
--- a/schematic/schemas/generator.py
+++ b/schematic/schemas/generator.py
@@ -1,3 +1,4 @@
+import gc
 import os
 import json
 import logging
diff --git a/schematic/store/synapse.py b/schematic/store/synapse.py
index d4501669c..e425f64f4 100644
--- a/schematic/store/synapse.py
+++ b/schematic/store/synapse.py
@@ -1437,7 +1437,7 @@ def make_synapse_table(self,
             if col in table_schema_by_cname:
                 col_type = table_schema_by_cname[col]['columnType']
-                max_size = table_schema_by_cname[col]['maximumSize']
+                max_size = table_schema_by_cname[col].get('maximumSize', 100)
                 max_list_len = 250
                 if max_size and max_list_len:
                     cols.append(Column(name=col, columnType=col_type,
@@ -1487,7 +1487,7 @@ def make_synapse_table(self,
         for col in table_to_load.columns:
             if col in table_schema_by_cname:
                 col_type = table_schema_by_cname[col]['columnType']
-                max_size = table_schema_by_cname[col].get('maximumSize', 100)
+                max_size = table_schema_by_cname[col].get('maximumSize', 100)
                 max_list_len = 250
                 if max_size and max_list_len:
                     cols.append(Column(name=col, columnType=col_type,
diff --git a/schematic/visualization/__init__.py b/schematic/visualization/__init__.py
new file mode 100644
index 000000000..b5dbf3f05
--- /dev/null
+++ b/schematic/visualization/__init__.py
@@ -0,0 +1,2 @@
+from schematic.visualization.attributes_explorer import AttributesExplorer
+from schematic.visualization.tangled_tree import TangledTree
\ No newline at end of file
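The two SchemaExplorer additions are thin wrappers around networkx; on a toy graph they behave as follows (the edges below are illustrative, not taken from a real schema):

    import networkx as nx

    g = nx.DiGraph([("Patient", "Biospecimen"), ("Biospecimen", "BulkRNA-seqAssay")])
    print(sorted(nx.descendants(g, "Patient")))
    # ['Biospecimen', 'BulkRNA-seqAssay']: nodes reachable from Patient
    print(sorted(nx.ancestors(g, "BulkRNA-seqAssay")))
    # ['Biospecimen', 'Patient']: nodes that can reach BulkRNA-seqAssay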
diff --git a/schematic/visualization/attributes_explorer.py b/schematic/visualization/attributes_explorer.py
new file mode 100644
index 000000000..2b7ad290f
--- /dev/null
+++ b/schematic/visualization/attributes_explorer.py
@@ -0,0 +1,146 @@
+import json
+import logging
+import numpy as np
+import os
+import pandas as pd
+from typing import Any, Dict, Optional, Text, List
+
+from schematic.schemas import SchemaGenerator
+from schematic.utils.io_utils import load_json
+from schematic import CONFIG
+
+logger = logging.getLogger(__name__)
+
+class AttributesExplorer():
+    def __init__(self,
+                 path_to_jsonld: str,
+                 ) -> None:
+
+        self.path_to_jsonld = path_to_jsonld
+        self.json_data_model = load_json(self.path_to_jsonld)
+        self.jsonld = load_json(self.path_to_jsonld)
+
+        # instantiate a schema generator to retrieve db schema graph from metadata model graph
+        self.sg = SchemaGenerator(self.path_to_jsonld)
+
+        self.output_path = self.create_output_path('merged_csv')
+
+    def create_output_path(self, terminal_folder):
+        ''' Create output path to store Observable visualization data if it does not already exist.
+
+        Args: self.path_to_jsonld
+
+        Returns: output_path (str): path to store outputs
+        '''
+        base_dir = os.path.dirname(self.path_to_jsonld)
+        self.schema_name = self.path_to_jsonld.split('/')[-1].split('.model.jsonld')[0]
+        output_path = os.path.join(base_dir, 'visualization', self.schema_name, terminal_folder)
+        if not os.path.exists(output_path):
+            os.makedirs(output_path)
+        return output_path
+
+    def convert_string_cols_to_json(self, df: pd.DataFrame, cols_to_modify: list):
+        """Converts values in a column from strings to JSON list
+        for upload to Synapse.
+        """
+        for col in df.columns:
+            if col in cols_to_modify:
+                # Note: checking `x == np.nan` (as originally written) is always False,
+                # so test for a non-empty list instead; missing cells pass through unchanged.
+                df[col] = df[col].apply(lambda x: json.dumps([y.strip() for y in x]) if isinstance(x, list) and x else x)
+        return df
+
+    def parse_attributes(self, save_file=True):
+        '''
+        Args: save_file (bool):
+                True: merged_df is saved locally to output_path.
+                False: merged_df is returned.
+
+        Returns:
+            merged_df (pd.DataFrame): dataframe containing data relating to attributes
+                for the provided data model for all components in the data model.
+                Dataframe is saved locally as a csv if save_file == True, or returned if
+                save_file == False.
+        Raises:
+            ValueError:
+                If an error is hit while attempting to get conditional requirements.
+                This error is likely to be found if there is a mismatch in naming.
+        '''
+        # get all components
+        component_dg = self.sg.se.get_digraph_by_edge_type('requiresComponent')
+        components = component_dg.nodes()
+        # For each data type to be loaded gather all attributes the user would
+        # have to provide.
+
+        df_store = []
+        for component in components:
+            data_dict = {}
+            # get the json schema
+            json_schema = self.sg.get_json_schema_requirements(
+                source_node=component, schema_name=self.path_to_jsonld)
+
+            # Gather all attributes, their valid values and requirements
+            for key, value in json_schema['properties'].items():
+                data_dict[key] = {}
+                for k, v in value.items():
+                    if k == 'enum':
+                        data_dict[key]['Valid Values'] = value['enum']
+                if key in json_schema['required']:
+                    data_dict[key]['Required'] = True
+                else:
+                    data_dict[key]['Required'] = False
+                data_dict[key]['Component'] = component
+            # Add additional details per key (from the JSON-ld)
+            for dic in self.jsonld['@graph']:
+                if 'sms:displayName' in dic.keys():
+                    key = dic['sms:displayName']
+                    if key in data_dict.keys():
+                        data_dict[key]['Attribute'] = dic['sms:displayName']
+                        data_dict[key]['Label'] = dic['rdfs:label']
+                        data_dict[key]['Description'] = dic['rdfs:comment']
+                        if 'validationRules' in dic.keys():
+                            data_dict[key]['Validation Rules'] = dic['validationRules']
+            # Find conditional dependencies
+            if 'allOf' in json_schema.keys():
+                for conditional_dependencies in json_schema['allOf']:
+                    key = list(conditional_dependencies['then']['properties'])[0]
+                    try:
+                        if key in data_dict.keys():
+                            if 'Cond_Req' not in data_dict[key].keys():
+                                data_dict[key]['Cond_Req'] = []
+                                data_dict[key]['Conditional Requirements'] = []
+                            attribute = list(conditional_dependencies['if']['properties'])[0]
+                            value = conditional_dependencies['if']['properties'][attribute]['enum']
+                            # Capitalize attribute if it begins with a lowercase letter, for aesthetics.
+                            if attribute[0].islower():
+                                attribute = attribute.capitalize()
+                            conditional_statement = f'{attribute} -is- "{value[0]}"'
+                            if conditional_statement not in data_dict[key]['Conditional Requirements']:
+                                data_dict[key]['Cond_Req'] = True
+                                data_dict[key]['Conditional Requirements'].extend([conditional_statement])
+                    except Exception:
+                        raise ValueError(
+                            "There is an error getting conditional requirements related "
+                            f"to the attribute: {key}. The error is likely caused by naming inconsistencies (e.g. uppercase, camelcase, ...)"
+                        )
+            for key, value in data_dict.items():
+                if 'Conditional Requirements' in value.keys():
+                    data_dict[key]['Conditional Requirements'] = ' || '.join(data_dict[key]['Conditional Requirements'])
+            df = pd.DataFrame(data_dict)
+            df = df.T
+            cols = ['Attribute', 'Label', 'Description', 'Required', 'Cond_Req', 'Valid Values', 'Conditional Requirements', 'Validation Rules', 'Component']
+            cols = [col for col in cols if col in df.columns]
+            df = df[cols]
+            df = self.convert_string_cols_to_json(df, ['Valid Values'])
+            #df.to_csv(os.path.join(csv_output_path, data_type + '.vis_data.csv'))
+            df_store.append(df)
+
+        merged_attributes_df = pd.concat(df_store, join='outer')
+        cols = ['Attribute', 'Label', 'Description', 'Required', 'Cond_Req', 'Valid Values', 'Conditional Requirements', 'Validation Rules', 'Component']
+        cols = [col for col in cols if col in merged_attributes_df.columns]
+
+        merged_attributes_df = merged_attributes_df[cols]
+        if save_file == True:
+            return merged_attributes_df.to_csv(os.path.join(self.output_path, self.schema_name + 'attributes_data.vis_data.csv'))
+        elif save_file == False:
+            return merged_attributes_df.to_csv()
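A quick sketch of how the explorer is meant to be driven directly, using the example model that ships with the tests:

    from schematic.visualization.attributes_explorer import AttributesExplorer

    explorer = AttributesExplorer("tests/data/example.model.jsonld")
    csv_str = explorer.parse_attributes(save_file=False)  # CSV returned as a string
    print(csv_str.splitlines()[0])  # header: Attribute, Label, Description, ...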
+ """ + try: + logger.debug(f"Loading config file contents in '{config}'") + ctx.obj = CONFIG.load_config(config) + except ValueError as e: + logger.error("'--config' not provided or environment variable not set.") + logger.exception(e) + sys.exit(1) + +@viz.command( + "attributes", +) +@click_log.simple_verbosity_option(logger) + +@click.pass_obj +def get_attributes(ctx): + """ + + """ + # Get JSONLD file path + path_to_jsonld = get_from_config(CONFIG.DATA, ("model", "input", "location")) + # Run attributes explorer + AttributesExplorer(path_to_jsonld).parse_attributes(save_file=True) + return + +@viz.command( + "tangled_tree_text" +) +@click_log.simple_verbosity_option(logger) +@click.option( + "-ft", + "--figure_type", + type=click.Choice(['component', 'dependency'], case_sensitive=False), + help=query_dict(viz_commands, ("visualization", "tangled_tree", "figure_type")), +) +@click.option( + "-tf", + "--text_format", + type=click.Choice(['plain', 'highlighted'], case_sensitive=False), + help=query_dict(viz_commands, ("visualization", "tangled_tree", "text_format")), +) + +@click.pass_obj +def get_tangled_tree_text(ctx, figure_type, text_format): + """ Get text to be placed on the tangled tree visualization. + """ + # Get JSONLD file path + path_to_jsonld = get_from_config(CONFIG.DATA, ("model", "input", "location")) + + # Initialize TangledTree + tangled_tree = TangledTree(path_to_jsonld, figure_type) + + # Get text for tangled tree. + text_df = tangled_tree.get_text_for_tangled_tree(text_format, save_file=True) + return + +@viz.command( + "tangled_tree_layers" +) +@click_log.simple_verbosity_option(logger) +@click.option( + "-ft", + "--figure_type", + type=click.Choice(['component', 'dependency'], case_sensitive=False), + help=query_dict(viz_commands, ("visualization", "tangled_tree", "figure_type")), +) + +@click.pass_obj +def get_tangled_tree_component_layers(ctx, figure_type): + ''' Get the components that belong in each layer of the tangled tree visualization. + ''' + # Get JSONLD file path + path_to_jsonld = get_from_config(CONFIG.DATA, ("model", "input", "location")) + + # Initialize Tangled Tree + tangled_tree = TangledTree(path_to_jsonld, figure_type) + + # Get tangled trees layers JSON. 
diff --git a/schematic/visualization/tangled_tree.py b/schematic/visualization/tangled_tree.py
new file mode 100644
index 000000000..613dfc59b
--- /dev/null
+++ b/schematic/visualization/tangled_tree.py
@@ -0,0 +1,784 @@
+from io import StringIO
+import json
+import logging
+import networkx as nx
+import numpy as np
+import os
+from os import path
+import pandas as pd
+from pathlib import Path
+
+# allows specifying explicit variable types
+from typing import Any, Dict, Optional, Text, List
+
+from schematic.utils.viz_utils import visualize
+from schematic.visualization.attributes_explorer import AttributesExplorer
+from schematic.schemas.explorer import SchemaExplorer
+from schematic.schemas.generator import SchemaGenerator
+from schematic import LOADER
+from schematic.utils.io_utils import load_json
+from copy import deepcopy
+
+# Make sure to have the newest version of the decorator package
+
+
+logger = logging.getLogger(__name__)
+#OUTPUT_DATA_DIR = str(Path('tests/data/visualization/AMPAD').resolve())
+#DATA_DIR = str(Path('tests/data').resolve())
+
+class TangledTree(object):
+    """ Generate the data needed to render a tangled tree visualization of a data model.
+    """
+
+    def __init__(self,
+                 path_to_json_ld: str,
+                 figure_type: str,
+                 ) -> None:
+        # Load jsonld
+        self.path_to_json_ld = path_to_json_ld
+        self.json_data_model = load_json(self.path_to_json_ld)
+
+        # Parse schema name
+        self.schema_name = path.basename(self.path_to_json_ld).split(".model.jsonld")[0]
+
+        # Instantiate a schema generator to retrieve db schema graph from metadata model graph
+        self.sg = SchemaGenerator(self.path_to_json_ld)
+
+        # Get metadata model schema graph
+        self.G = self.sg.se.get_nx_schema()
+
+        # Set Parameters
+        self.figure_type = figure_type.lower()
+        self.dependency_type = ''.join(('requires', self.figure_type.capitalize()))
+
+        # Get names
+        self.schema = load_json(self.path_to_json_ld)
+        self.schema_abbr = self.schema_name.split('_')[0]
+
+        # Initialize AttributesExplorer
+        self.ae = AttributesExplorer(self.path_to_json_ld)
+
+        # Create output paths.
+        self.text_csv_output_path = self.ae.create_output_path('text_csv')
+        self.json_output_path = self.ae.create_output_path('tangled_tree_json')
+
+    def strip_double_quotes(self, string):
+        # Remove double quotes from beginning and end of string.
+        if string.startswith('"') and string.endswith('"'):
+            string = string[1:-1]
+        # now remove whitespace
+        string = "".join(string.split())
+        return string
+
+    def get_text_for_tangled_tree(self, text_type, save_file=False):
+        '''Gather the text that needs to be either highlighted or plain for the tangled tree visualization.
+        Args:
+            text_type (str): Choices = ['highlighted', 'plain'], determines the type of text
+                rendering to return.
+            save_file (bool): Determines if the outputs should be saved to disk or returned.
+        Returns:
+            If save_file==True: Saves plain or highlighted text as a CSV (to disk).
+               save_file==False: Returns plain or highlighted text as a csv string.
+        '''
+        # Get nodes in the digraph, many more nodes returned if figure type is dependency
+        cdg = self.sg.se.get_digraph_by_edge_type(self.dependency_type)
+        nodes = cdg.nodes()
+
+        if self.dependency_type == 'requiresComponent':
+            component_nodes = nodes
+        else:
+            # get component nodes if making dependency figure
+            component_dg = self.sg.se.get_digraph_by_edge_type('requiresComponent')
+            component_nodes = component_dg.nodes()
+
+        # Initialize lists
+        highlighted = []
+        plain = []
+
+        # For each component node in the tangled tree gather the plain and highlighted text.
+        for node in component_nodes:
+            # Get the highlighted components based on figure_type
+            if self.figure_type == 'component':
+                highlight_descendants = self.sg.se.get_descendants_by_edge_type(node, 'requiresComponent')
+            elif self.figure_type == 'dependency':
+                highlight_descendants = [node]
+
+
+            # Format text to be highlighted and gather text to be formatted plain.
+            if not highlight_descendants:
+                # If there are no highlighted descendants just highlight the selected node (format for observable.)
+                highlighted.append([node, "id", node])
+                # Gather all the text as plain text.
+                plain_descendants = [n for n in nodes if n != node]
+            else:
+                # Format highlighted text for Observable.
+                for hd in highlight_descendants:
+                    highlighted.append([node, "id", hd])
+                # Gather the non-highlighted text as plain text descendants.
+                plain_descendants = [node for node in nodes if node not in highlight_descendants]
+
+            # Format all the plain text for observable.
+            for nd in plain_descendants:
+                plain.append([node, "id", nd])
+
+        # Prepare df depending on what type of text we need.
+        df = pd.DataFrame(locals()[text_type.lower()], columns = ['Component', 'type', 'name'])
+
+        # Depending on input either export csv locally to disk or as a string.
+        if save_file==True:
+            file_name = f"{self.schema_abbr}_{self.figure_type}_{text_type}.csv"
+            df.to_csv(os.path.join(self.text_csv_output_path, file_name))
+            return
+        elif save_file==False:
+            return df.to_csv()
+
+    def get_topological_generations(self):
+        ''' Gather topological_gen, nodes and edges based on figure type.
+        Outputs:
+            topological_gen (List(list)): list of lists. Indicates layers of nodes.
+            nodes: (Networkx NodeView) Nodes of the component or dependency graph. When iterated over it functions like a list.
+            edges: (Networkx EdgeDataView) Edges of component or dependency graph. When iterated over it works like a list of tuples.
+        '''
+        # Get nodes in the digraph
+        digraph = self.sg.se.get_digraph_by_edge_type(self.dependency_type)
+        nodes = digraph.nodes()
+
+        # Get subgraph
+        mm_graph = self.sg.se.get_nx_schema()
+        subg = self.sg.get_subgraph_by_edge_type(mm_graph, self.dependency_type)
+
+        # Get edges and topological_gen based on figure type.
+        if self.figure_type == 'component':
+            edges = digraph.edges()
+            topological_gen = list(reversed(list(nx.topological_generations(subg))))
+
+        elif self.figure_type == 'dependency':
+            rev_digraph = nx.DiGraph.reverse(digraph)
+            edges = rev_digraph.edges()
+            topological_gen = list(nx.topological_generations(subg))
+
+        return topological_gen, nodes, edges, subg
+
+    def get_ca_alias(self, conditional_requirements: list) -> dict:
+        '''Get the alias for each conditional attribute.
+
+        NOTE: Obtaining attributes (attr) and aliases (ali) in this function is specific to how formatting
+        is set in AttributesExplorer. If that formatting changes, this section
+        will likely break or in the worst case have a silent error.
+        Input:
+            conditional_requirements (list): list of strings of conditional requirements from outputs of AttributesExplorer.
+        Output:
+            ca_alias (dict):
+                key: alias (attribute response)
+                value: attribute
+        '''
+        ca_alias = {}
+        for i, req in enumerate(conditional_requirements):
+            req = req.split(' || ')
+            for r in req:
+                attr, ali = r.split(' -is- ')
+                attr = "".join(attr.split())
+                ali = self.strip_double_quotes(ali)
+                ca_alias[ali] = attr
+        return ca_alias
+
+    def gather_component_dependency_info(self, cn, attributes_df):
+        '''Gather all component dependency information.
+        Inputs:
+            cn: (str) component name
+            attributes_df: (Pandas DataFrame) Details for all attributes across all components. From AttributesExplorer.
+        Outputs:
+            conditional_attributes (list): List of conditional attributes for a particular component
+            ca_alias (dict):
+                key: alias (attribute response)
+                value: attribute
+            all_attributes (list): all attributes associated with a particular component.
+        '''
+
+        # Gather all component dependency information
+        component_attributes = self.sg.get_descendants_by_edge_type(
+            cn,
+            self.dependency_type,
+            connected=True
+        )
+
+        # Don't want to display `Component` in the figure so remove
+        if 'Component' in component_attributes:
+            component_attributes.remove('Component')
+
+        # Gather conditional attributes so they can be added to the figure.
+        if 'Cond_Req' in attributes_df.columns:
+            conditional_attributes = list(attributes_df[(attributes_df['Cond_Req']==True)
+                &(attributes_df['Component']==cn)]['Label'])
+            ca_df = attributes_df[(attributes_df['Cond_Req']==True)&(attributes_df['Component']==cn)]
+            conditional_requirements = list(attributes_df[(attributes_df['Cond_Req']==True)
+                &(attributes_df['Component']==cn)]['Conditional Requirements'])
+            ca_alias = self.get_ca_alias(conditional_requirements)
+        else:
+            # If there are no conditional attributes/requirements, initialize blank lists.
+            conditional_attributes = []
+            ca_alias = {}
+
+        # Gather a list of all attributes for the current component.
+        all_attributes = list(np.append(component_attributes,conditional_attributes))
+
+        return conditional_attributes, ca_alias, all_attributes
+
+    def find_source_nodes(self, nodes, edges, all_attributes=[]):
+        '''Find all nodes in the graph that do not have a parent node.
+        Inputs:
+            nodes: (Networkx NodeView) Nodes of the component or dependency graph. When iterated over it functions like a list.
+            edges: (Networkx EdgeDataView) Edges of component or dependency graph. When iterated over it works like a list of tuples.
+            all_attributes: (list) all attributes associated with a particular component. Only needed for a dependency figure.
+
+        Outputs:
+            source_nodes (list(str)): List of parentless nodes in the graph.
+        '''
+        # Find edges that are not source nodes.
+        not_source = []
+        for node in nodes:
+            for edge_pair in edges:
+                if node == edge_pair[0]:
+                    not_source.append(node)
+
+        # Find source nodes as nodes that are not in not_source.
+        source_nodes = []
+        for node in nodes:
+            if self.figure_type == 'dependency':
+                if node not in not_source and node in all_attributes:
+                    source_nodes.append(node)
+            else:
+                if node not in not_source:
+                    source_nodes.append(node)
+        return source_nodes
+
+    def get_parent_child_dictionary(self, nodes, edges, all_attributes=[]):
+        '''Based on the dependency type, create dictionaries between parent and child and child and parent attributes.
+        Input:
+            nodes: (Networkx NodeView) Nodes of the component or dependency graph.
+            edges: (Networkx EdgeDataView (component figure) or List(list) (dependency figure))
+                Edges of component or dependency graph.
+            all_attributes: (list) all attributes associated with a particular component. Only needed for a dependency figure.
+        Output:
+            child_parents (dict):
+                key: child
+                value: list of the child's parents
+            parent_children (dict):
+                key: parent
+                value: list of the parent's children
+        '''
+        child_parents = {}
+        parent_children = {}
+
+        if self.dependency_type == 'requiresComponent':
+
+            # Construct child_parents dictionary
+            for edge in edges:
+
+                # Add child as a key
+                if edge[0] not in child_parents.keys():
+                    child_parents[edge[0]] = []
+
+                # Add parents to list
+                child_parents[edge[0]].append(edge[1])
+
+            # Construct parent_children dictionary
+            for edge in edges:
+
+                # Add parent as a key
+                if edge[1] not in parent_children.keys():
+                    parent_children[edge[1]] = []
+
+                # Add children to list
+                parent_children[edge[1]].append(edge[0])
+
+        elif self.dependency_type == 'requiresDependency':
+
+            # Construct child_parents dictionary
+            for edge in edges:
+
+                # Check if child is an attribute for the current component
+                if edge[0] in all_attributes:
+
+                    # Add child as a key
+                    if edge[0] not in child_parents.keys():
+                        child_parents[edge[0]] = []
+
+                    # Add parent to list if it is an attribute for the current component
+                    if edge[1] in all_attributes:
+                        child_parents[edge[0]].append(edge[1])
+
+            # Construct parent_children dictionary
+            for edge in edges:
+
+                # Check if parent is an attribute for the current component
+                if edge[1] in all_attributes:
+
+                    # Add parent as a key
+                    if edge[1] not in parent_children.keys():
+                        parent_children[edge[1]] = []
+
+                    # Add child to list if it is an attribute for the current component
+                    if edge[0] in all_attributes:
+                        parent_children[edge[1]].append(edge[0])
+
+        return child_parents, parent_children
+
+    def alias_edges(self, ca_alias:dict, edges) -> List[list]:
+        '''Create new edges based on aliasing between an attribute and its response.
+        Purpose:
+            Create aliased edges.
+            For example:
+                If BiospecimenType (attribute) is AnalyteBiospecimenType (response)
+                Then ShippingConditionType (conditional requirement) is now required.
+            In the model the edges that connect these options are:
+                (AnalyteBiospecimenType, BiospecimenType)
+                (ShippingConditionType, AnalyteBiospecimenType)
+            Use the aliases defined in self.get_ca_alias to define new edges that would
+            directly link attributes to their conditional requirements, in this
+            example the new edge would be:
+                [ShippingConditionType, BiospecimenType]
+        Inputs:
+            ca_alias (dict):
+                key: alias (attribute response)
+                value: attribute
+            edges (Networkx EdgeDataView): Edges of component or dependency graph. When iterated over it works like a list of tuples.
+        Output:
+            aliased_edges (List[lists]) of aliased edges.
+        '''
+        aliased_edges = []
+        for i, edge in enumerate(edges):
+
+            # construct one set of edges at a time
+            edge_set = []
+
+            # If the first node of the edge has an alias, add the alias to the first position in the current edge set
+            if edge[0] in ca_alias.keys():
+                edge_set.append(ca_alias[edge[0]])
+
+            # Else add the non-aliased node
+            else:
+                edge_set.append(edge[0])
+
+            # If the second node of the edge has an alias, add the alias to the second position in the current edge set
+            if edge[1] in ca_alias.keys():
+                edge_set.append(ca_alias[edge[1]])
+
+            # Else add the non-aliased node
+            else:
+                edge_set.append(edge[1])
+
+            # Add the new edge set to the list of aliased edges.
+            aliased_edges.append(edge_set)
+
+        return aliased_edges
+
+    def prune_expand_topological_gen(self, topological_gen, all_attributes, conditional_attributes):
+        '''
+        Purpose:
+            Remake topological_gen with only relevant nodes.
+            This is necessary since for the figure this function is being used in we
+            only want to display a portion of the graph data.
+            In addition to only displaying relevant nodes, we want to add conditional
+            attributes to topological_gen so we can visualize them in the tangled tree
+            as well.
+        Input:
+            topological_gen (List[list]): Indicates layers of nodes.
+            all_attributes (list): all attributes associated with a particular component.
+            conditional_attributes (list): List of conditional attributes for a particular component
+        Output:
+            new_top_gen (List[list]): mimics structure of topological_gen but only
+                includes the nodes we want
+        '''
+
+        pruned_topological_gen = []
+
+        # For each layer (gen) in the topological generation list
+        for i, layer in enumerate(topological_gen):
+
+            current_layer = []
+            next_layer = []
+
+            # For each node in the layer
+            for node in layer:
+
+                # If the node is relevant to this component and is not a conditional attribute add it to the current layer.
+                if node in all_attributes and node not in conditional_attributes:
+                    current_layer.append(node)
+
+                # If it's a conditional attribute add it to a followup layer.
+                if node in conditional_attributes:
+                    next_layer.append(node)
+
+            # Add layers to the new pruned_topological_gen list
+            if current_layer:
+                pruned_topological_gen.append(current_layer)
+            if next_layer:
+                pruned_topological_gen.append(next_layer)
+
+        return pruned_topological_gen
+
+    def get_base_layers(self, topological_gen, child_parents, source_nodes, cn):
+        '''
+        Purpose:
+            Reconfigure topological gen to move things back to appropriate layers if
+            they would have a back reference.
+
+            The Tangled Tree figure requires an acyclic directed graph that has additional
+            layering rules between connected nodes.
+                - If there is a backward connection then the line connecting them will
+                break (this would suggest a cyclic connection.)
+                - Additionally if two or more nodes are connecting to a downstream node it is
+                best to put both parent nodes at the same level, if possible, to
+                prevent line breaks.
+                - Also want to move any children nodes one layer below
+                the parent node(s). If there are multiple parents, put one layer below the
+                parent that is furthest from the origin.
+
+            This is an iterative process that needs to run twice to move all the nodes to their
+            appropriate positions.
+        Input:
+            topological_gen: list of lists. Indicates layers of nodes.
+            child_parents (dict):
+                key: child
+                value: list of the child's parents
+            source_nodes: list, list of nodes that do not have a parent.
+            cn: str, component name, default=''
+        Output:
+            base_layers: dict, key: component name, value: layer
+                represents initial layering of topological_gen
+            base_layers_copy_copy: dict, key: component name, value: layer
+                represents the final layering after moving the components/attributes to
+                their desired layer.
+        '''
+        # Convert topological_gen to a dictionary
+        base_layers = {com:i for i, lev in enumerate(topological_gen)
+            for com in lev}
+
+        # Make another version to iterate on -- Can't set to equal or will overwrite the original.
+        base_layers_copy = {com:i for i, lev in enumerate(topological_gen)
+            for com in lev}
+
+        # Move child nodes one node downstream of their parents.
+        for level in topological_gen:
+            for node in level:
+
+                # Check if node has a parent.
+                if node in child_parents.keys():
+
+                    #node_level = base_layers[node]
+                    # Look at the parents for the node.
+                    parent_levels = []
+                    for par in child_parents[node]:
+
+                        # Get the layer the parent is located at.
+                        parent_levels.append(base_layers[par])
+
+                    # Get the max layer a parent of the node can be found.
+                    max_parent_level = max(parent_levels)
+
+                    # Move the node one layer beyond the max parent node position, so it will be downstream of its parents.
+                    base_layers_copy[node] = max_parent_level + 1
+
+        # Make another version of the updated positions to iterate on further.
+        base_layers_copy_copy = base_layers_copy
+
+        # Move parental source nodes if necessary.
+        for level in topological_gen:
+            for node in level:
+
+                # Check if node has any parents.
+                if node in child_parents.keys():
+                    parent_levels = []
+                    modify_par = []
+
+                    # For each parent get their position.
+                    for par in child_parents[node]:
+                        parent_levels.append(base_layers_copy[par])
+
+                    # If one of the parents is a source node move
+                    # it to the same level as the other nodes the child connects to so
+                    # that the connections will not be backwards (and result in a broken line)
+                    for par in child_parents[node]:
+
+                        # For a given parent determine if it's a source node and that the parents
+                        # are not already at level 0, and the parent is not the current component node.
+                        if (par in source_nodes and
+                            (parent_levels.count(parent_levels[0]) != len(parent_levels))
+                            and par != cn):
+
+                            # If so, remove its position from parent_levels
+                            parent_levels.remove(base_layers_copy[par])
+
+                            # Add this parent to a list of parental positions to modify later.
+                            modify_par.append(par)
+
+                    # Get the new max parent level for this node.
+                    max_parent_level = max(parent_levels)
+
+                    # Move the node one position downstream of its max parent level.
+                    base_layers_copy_copy[node] = max_parent_level + 1
+
+                    # For each parental position to modify, move the parent's level up to the max_parent_level.
+                    for par in modify_par:
+                        base_layers_copy_copy[par] = max_parent_level
+
+        return base_layers, base_layers_copy_copy
+
+    def adjust_node_placement(self, base_layers_copy_copy, base_layers, topological_gen):
+        '''Reorder nodes within topological_generations to match how they were ordered in base_layers_copy_copy
+        Input:
+            topological_gen: list of lists. Indicates layers of nodes.
+            base_layers: dict, key: component name, value: layer
+                represents initial layering of topological_gen
+            base_layers_copy_copy: dict, key: component name, value: layer
+                represents the final layering after moving the components/attributes to
+                their desired layer.
+        Output:
+            topological_gen: same format as the incoming topological_gen but
+                ordered to match base_layers_copy_copy.
+        '''
+        if self.figure_type == 'component':
+            # For each node get its new layer in the tangled tree
+            for node, i in base_layers_copy_copy.items():
+
+                # Check if node is not already in the proper layer
+                if node not in topological_gen[i]:
+
+                    # If not put it in the appropriate layer
+                    topological_gen[i].append(node)
+
+                    # Remove from inappropriate layer.
+                    topological_gen[base_layers[node]].remove(node)
+
+        elif self.figure_type == 'dependency':
+            for node, i in base_layers_copy_copy.items():
+
+                # Check if the location of the node is more than the number of
+                # layers topological gen currently handles
+                if i > len(topological_gen) - 1:
+
+                    # If so, add the node to a new layer at the end of topological_gen
+                    topological_gen.append([node])
+
+                    # Remove the node from its previous position.
+                    topological_gen[base_layers[node]].remove(node)
+
+                # Else, check if node is not already in the proper layer
+                elif node not in topological_gen[i]:
+
+                    # If not put it in the appropriate layer
+                    topological_gen[i].append(node)
+
+                    # Remove from inappropriate layer.
+                    topological_gen[base_layers[node]].remove(node)
+        return topological_gen
+
+    def move_source_nodes_to_bottom_of_layer(self, node_layers, source_nodes):
+        '''For aesthetic purposes move source nodes to the bottom of their respective layers.
+        Input:
+            node_layers (List(list)): Lists of lists of each layer and the nodes contained in that layer as strings.
+            source_nodes (list): list of nodes that do not have a parent.
+        Output:
+            node_layers (List(list)): modified to move source nodes to the bottom of each layer.
+        '''
+        for i, layer in enumerate(node_layers):
+            nodes_to_move = []
+            for node in layer:
+                if node in source_nodes:
+                    nodes_to_move.append(node)
+            for node in nodes_to_move:
+                node_layers[i].remove(node)
+                node_layers[i].append(node)
+        return node_layers
+
+    def get_layers_dict_list(self, node_layers, child_parents, parent_children, all_parent_children):
+        '''Convert node_layers to a list of lists of dictionaries that specifies each node and its parents (if applicable).
+        Inputs:
+            node_layers: list of lists of each layer and the nodes contained in that layer as strings.
+            child_parents (dict):
+                key: child
+                value: list of the child's parents
+            parent_children (dict):
+                key: parent
+                value: list of the parent's children
+        Outputs:
+            layers_list (List(list)): list of lists of dictionaries that specifies each node and its parents (if applicable)
+        '''
+        num_layers = len(node_layers)
+        layers_list = [[] for i in range(0, num_layers)]
+        for i, layer in enumerate(node_layers):
+            for node in layer:
+                if node in child_parents.keys():
+                    parents = child_parents[node]
+                else:
+                    parents = []
+
+                if node in parent_children.keys():
+                    direct_children = parent_children[node]
+                else:
+                    direct_children = []
+
+                if node in all_parent_children.keys():
+                    all_children = all_parent_children[node]
+                else:
+                    all_children = []
+                layers_list[i].append({'id': node, 'parents': parents, 'direct_children': direct_children, 'children': all_children})
+
+        return layers_list
+
+    def get_node_layers_json(self, topological_gen, source_nodes, child_parents, parent_children, cn='', all_parent_children=None):
+        '''Return all the layers of a single tangled tree as a JSON String.
+        Inputs:
+            topological_gen: list of lists. Indicates layers of nodes.
+            source_nodes: list of nodes that do not have a parent.
+            child_parents (dict):
+                key: child
+                value: list of the child's parents
+            parent_children (dict):
+                key: parent
+                value: list of the parent's children
+            all_parent_children (dict):
+                key: parent
+                value: list of the parent's children (including all downstream nodes). Defaults to an empty dictionary
+        Outputs:
+            layers_json (JSON String): Layers of nodes in the tangled tree as a json string.
+        '''
+
+        base_layers, base_layers_copy_copy = self.get_base_layers(topological_gen,
+            child_parents, source_nodes, cn)
+
+        # Rearrange node_layers to follow the pattern laid out in component layers.
+        node_layers = self.adjust_node_placement(base_layers_copy_copy,
+            base_layers, topological_gen)
+
+        # Move source nodes to the bottom of each layer.
+        node_layers = self.move_source_nodes_to_bottom_of_layer(node_layers, source_nodes)
+
+        # Convert layers to a list of dictionaries
+        if not all_parent_children:
+            # default to an empty dictionary
+            all_parent_children = dict()
+
+        layers_dicts = self.get_layers_dict_list(node_layers, child_parents, parent_children, all_parent_children)
+
+        # Convert dictionary to a JSON string
+        layers_json = json.dumps(layers_dicts)
+
+        return layers_json
+
+    def save_outputs(self, save_file, layers_json, cn='', all_layers=None):
+        '''
+        Inputs:
+            save_file (bool): Indicates whether to save a file locally or not.
+            layers_json (JSON String): Layers of nodes in the tangled tree as a json string.
+            cn (str): component name, default=''
+            all_layers (list of json strings): Each string contains the layers for a single tangled tree.
+                If a dependency figure the list is added to each time this function is called, so starts incomplete.
+                default=None (treated as an empty list, to avoid a mutable default argument).
+        Outputs:
+            all_layers (list of json strings):
+                If save_file == False: Each string contains the layers for a single tangled tree.
+                If save_file == True: the JSON string that was just saved.
+        '''
+        if all_layers is None:
+            all_layers = []
+        if save_file == True:
+            if cn:
+                output_file_name = f"{self.schema_abbr}_{self.figure_type}_{cn}_tangled_tree.json"
+            else:
+                output_file_name = f"{self.schema_abbr}_{self.figure_type}_tangled_tree.json"
+            with open(os.path.join(self.json_output_path, output_file_name), 'w') as outfile:
+                outfile.write(layers_json)
+            logger.info(f"Tangled Tree JSON String saved to {os.path.join(self.json_output_path, output_file_name)}.")
+            all_layers = layers_json
+        elif save_file == False:
+            all_layers.append(layers_json)
+        return all_layers
+
+    def get_ancestors_nodes(self, subgraph, components):
+        """
+        Inputs:
+            subgraph: networkX graph object
+            components: a list of nodes
+        Outputs:
+            all_parent_children: a dictionary mapping each given node to all of its children (including all the intermediate children)
+        """
+        all_parent_children = {}
+        for component in components:
+            all_ancestors = self.sg.se.get_nodes_ancestors(subgraph, component)
+            all_parent_children[component] = all_ancestors
+
+        return all_parent_children
+
+    def get_tangled_tree_layers(self, save_file=True):
+        '''Based on user indicated figure type, construct the layers of nodes of a tangled tree.
+        Inputs:
+            save_file (bool): Indicates whether to save a file locally or not.
+        Outputs:
+            all_layers (list of json strings):
+                If save_file == False: Each string contains the layers for a single tangled tree.
+                If save_file == True: the JSON string for the last tree saved.
+
+        Note on Dependency Tangled Tree:
+            If there are many conditional requirements associated with a dependency, and those
+            conditional requirements have overlapping attributes associated with them,
+            the tangled tree will only report one of them.
+
+        '''
+        # Gather the data model's topological generations, nodes and edges
+        topological_gen, nodes, edges, subg = self.get_topological_generations()
+
+        if self.figure_type == 'component':
+            # Gather all source nodes
+            source_nodes = self.find_source_nodes(nodes, edges)
+
+            # Map all children to their parents and vice versa
+            child_parents, parent_children = self.get_parent_child_dictionary(nodes, edges)
+
+            # find all the downstream nodes
+            all_parent_children = self.get_ancestors_nodes(subg, parent_children.keys())
+
+            # Get the layers that each node belongs to.
+            layers_json = self.get_node_layers_json(topological_gen, source_nodes, child_parents, parent_children, all_parent_children=all_parent_children)
+
+            # If indicated, save outputs locally; else gather all layers.
+            all_layers = self.save_outputs(save_file, layers_json)
+
+        if self.figure_type == 'dependency':
+            # Get component digraph and nodes.
+            component_dg = self.sg.se.get_digraph_by_edge_type('requiresComponent')
+            component_nodes = component_dg.nodes()
+
+            # Get table of attributes.
+            attributes_csv_str = self.ae.parse_attributes(save_file=False)
+            attributes_df = pd.read_table(StringIO(attributes_csv_str), sep=",")
+
+
+            all_layers = []
+            for cn in component_nodes:
+                # Gather attribute and dependency information per node
+                conditional_attributes, ca_alias, all_attributes = self.gather_component_dependency_info(cn, attributes_df)
+
+                # Gather all source nodes
+                source_nodes = self.find_source_nodes(component_nodes, edges, all_attributes)
+
+                # Alias the conditional requirement edge back to its actual parent label,
+                # then apply aliasing back to the edges
+                aliased_edges = self.alias_edges(ca_alias, edges)
+
+                # Gather relationships between children and their parents.
+                child_parents, parent_children = self.get_parent_child_dictionary(nodes,
+                    aliased_edges, all_attributes)
+
+                # Remake topological_gen so it has only relevant nodes.
+                pruned_topological_gen = self.prune_expand_topological_gen(topological_gen, all_attributes, conditional_attributes)
+
+                # Get the layers that each node belongs to.
+                layers_json = self.get_node_layers_json(pruned_topological_gen, source_nodes, child_parents, parent_children, cn)
+
+                # If indicated, save outputs locally; else gather all layers.
+                all_layers = self.save_outputs(save_file, layers_json, cn, all_layers)
+        return all_layers
+
+    
\ No newline at end of file
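For intuition, the layering above starts from networkx's topological generations, which group nodes by how early they can appear in a topological order (toy graph, illustrative only):

    import networkx as nx

    g = nx.DiGraph([("A", "C"), ("B", "C"), ("C", "D")])
    print(list(nx.topological_generations(g)))
    # [['A', 'B'], ['C'], ['D']]: parents first, children in later layers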
diff --git a/tests/data/example.model.jsonld b/tests/data/example.model.jsonld
index bce96a4dc..8d468a30e 100644
--- a/tests/data/example.model.jsonld
+++ b/tests/data/example.model.jsonld
@@ -2188,6 +2188,28 @@
       "list strict"
     ]
   },
+  {
+    "@id": "bts:ClinicalData",
+    "@type": "rdfs:Class",
+    "rdfs:comment": "TBD",
+    "rdfs:label": "ClinicalData",
+    "rdfs:subClassOf": [
+      {
+        "@id": "bts:ValidValue"
+      }
+    ],
+    "schema:isPartOf": {
+      "@id": "http://schema.biothings.io"
+    },
+    "sms:displayName": "Clinical Data",
+    "sms:required": "sms:false",
+    "sms:requiresDependency": [
+      {
+        "@id": "bts:FamilyHistory"
+      }
+    ],
+    "sms:validationRules": []
+  },
   {
     "@id": "bts:Biospecimen",
     "@type": "rdfs:Class",
diff --git a/tests/data/test_config.yml b/tests/data/test_config.yml
index 96f5ebfd2..b8c1b8202 100644
--- a/tests/data/test_config.yml
+++ b/tests/data/test_config.yml
@@ -1,7 +1,7 @@
 definitions:
   creds_path: "../../credentials.json"
   token_pickle: "token.pickle"
-  # synapse_config: "../../.synapseConfig"
+  #synapse_config: "../../.synapseConfig"
   ### Note: this key is required for people who use the Synapse token authentication approach.
   ### If you run into errors similar to KeyError: 'synapse_config', you might want to consider adding this key.
diff --git a/tests/test_viz.py b/tests/test_viz.py
new file mode 100644
index 000000000..954f45df5
--- /dev/null
+++ b/tests/test_viz.py
@@ -0,0 +1,134 @@
+from io import StringIO
+import json
+import os
+import pandas as pd
+import logging
+
+import pytest
+
+from schematic.visualization.attributes_explorer import AttributesExplorer
+from schematic.visualization.tangled_tree import TangledTree
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+@pytest.fixture
+def attributes_explorer(helpers):
+
+    # Get JSONLD file path
+    path_to_jsonld = helpers.get_data_path("example.model.jsonld")
+
+    # Initialize AttributesExplorer
+    attributes_explorer = AttributesExplorer(path_to_jsonld)
+    yield attributes_explorer
+
+@pytest.fixture
+def tangled_tree(helpers):
+    figure_type = 'component'
+
+    # Get JSONLD file path
+    path_to_jsonld = helpers.get_data_path("example.model.jsonld")
+
+    # Initialize TangledTree
+    tangled_tree = TangledTree(path_to_jsonld, figure_type)
+    yield tangled_tree
+
+class TestVisualization:
+    def test_ae(self, helpers, attributes_explorer):
+        attributes_str = attributes_explorer.parse_attributes(save_file=False)
+
+        df = pd.read_csv(StringIO(attributes_str)).drop(columns=['Unnamed: 0'])
+
+        # For the attributes df define expected columns
+        expect_col_names = ['Attribute', 'Label', 'Description',
+                            'Required', 'Cond_Req', 'Valid Values', 'Conditional Requirements',
+                            'Component']
+        expected_components = ['Biospecimen', 'Patient', 'BulkRNA-seqAssay']
+
+        # Get actual values
+        actual_column_names = df.columns.tolist()
+        actual_components = df.loc[df['Attribute'] == 'Component']['Component'].tolist()
+
+        assert actual_column_names == expect_col_names
+        assert actual_components == expected_components
+
+    def test_text(self, helpers, tangled_tree):
+        text_format = 'plain'
+
+        # Get text for tangled tree.
+        text_str = tangled_tree.get_text_for_tangled_tree(text_format, save_file=False)
+
+        df = pd.read_csv(StringIO(text_str)).drop(columns=['Unnamed: 0'])
+
+        # Define expected text associated with the 'Patient' and 'Biospecimen' trees
+        expected_patient_text = ['Biospecimen', 'BulkRNA-seqAssay']
+
+        expected_Biospecimen_text = ['BulkRNA-seqAssay']
+
+        # Get actual text
+        actual_patient_text = df.loc[df['Component'] == 'Patient']['name'].tolist()
+
+        actual_Biospecimen_text = df.loc[df['Component'] == 'Biospecimen']['name'].tolist()
+
+        # Check some random pieces of text we would assume to be in the plain text.
+        assert ((df['Component'] == 'Patient') & (df['name'] == 'Biospecimen')).any()
+
+        # Check the extracted text matches expected text.
+        assert actual_patient_text == expected_patient_text
+        assert actual_Biospecimen_text == expected_Biospecimen_text
+
+    def test_layers(self, helpers, tangled_tree):
+        layers_str = tangled_tree.get_tangled_tree_layers(save_file=False)[0]
+
+        # Define what we expect the layers list to be.
+        expected_layers_list = [
+            [
+                {
+                    "id": "Patient",
+                    "parents": [],
+                    "direct_children": [
+                        "Biospecimen"
+                    ],
+                    "children": [
+                        "BulkRNA-seqAssay",
+                        "Biospecimen",
+                    ]
+                }
+            ],
+            [
+                {
+                    "id": "Biospecimen",
+                    "parents": [
+                        "Patient"
+                    ],
+                    "direct_children": [
+                        "BulkRNA-seqAssay"
+                    ],
+                    "children": [
+                        "BulkRNA-seqAssay"
+                    ]
+                }
+            ],
+            [
+                {
+                    "id": "BulkRNA-seqAssay",
+                    "parents": [
+                        "Biospecimen"
+                    ],
+                    "direct_children": [],
+                    "children": []
+                }
+            ]
+        ]
+
+
+        # Get actual layers.
+        actual_layers_list = json.loads(layers_str)
+
+        # compare
+        for index, item in enumerate(actual_layers_list):
+            assert item[0]["id"] == expected_layers_list[index][0]["id"]
+            assert item[0]["parents"] == expected_layers_list[index][0]["parents"]
+            assert item[0]["direct_children"] == expected_layers_list[index][0]["direct_children"]
+
+            # ensure that the order of children doesn't matter
+            assert set(item[0]["children"]) == set(expected_layers_list[index][0]["children"])
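The new suite can be run on its own while iterating; it relies on the repo's `helpers` fixture from conftest.py:

    import pytest

    # Equivalent to `pytest tests/test_viz.py -v` from the repo root.
    pytest.main(["tests/test_viz.py", "-v"])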