Merge pull request #3 from jdkent/end/testing
[ENH] add pipeline configuration/structure
jdkent authored Nov 25, 2024
2 parents 5c746a6 + 8c5237f commit 830dd60
Showing 63 changed files with 9,103 additions and 172 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,36 @@
name: Install and Test

on:
push:
branches:
- main
pull_request:
branches:
- main

concurrency:
group: testing-${{ github.ref }}
cancel-in-progress: true

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.8'

- name: Install dependencies
run: pip install -e .[tests,participant_demographics,word_count]

- name: Test with pytest
env:
OPENAI_API_KEY: "fake_key"
run: |
cp .keys.example .keys
pytest
3 changes: 3 additions & 0 deletions .gitignore
@@ -72,6 +72,7 @@ ipython_config.py
dmypy.json

# Environments
.keys
.env
.venv
env/
@@ -106,3 +107,5 @@ venv.bak/
*.swp
.swo
.swn

_version.py
1 change: 1 addition & 0 deletions .keys.example
@@ -0,0 +1 @@
OPENAI_CLIENT_API_KEY=fake_key
125 changes: 117 additions & 8 deletions README.md
@@ -1,4 +1,4 @@
# neurostore-text-extraction
# ns-text-extraction-workflows

This repository contains pipelines and scripts for extracting features from text using Natural Language Processing (NLP), Large Language Models (LLMs),
and other algorithms across thousands of articles in the NeuroStore database.
@@ -11,16 +11,125 @@ To install the necessary dependencies, run:


## Usage
### Running pipelines
Executable workflows in `pipelines/{pipeline_name}/run.py` take standardized pubget-style text inputs (one row per article).

### Overview

Each executable workflow in `pipelines/{pipeline_name}/run.py` defines a pipeline class that implements a `run` method.
The `run` method takes a `Dataset` object and an output directory as input, and writes the extracted features to the output directory in the following format:

# the pipeline info file contains configuration information about the pipeline
output_dir/{pipeline_name}/{pipeline_version}/{input_hash}/pipeline_info.json
# the study results file contains the features extracted from the study by the pipeline
output_dir/{pipeline_name}/{pipeline_version}/{input_hash}/{study_id}/results.json
# the study info file contains metadata about the inputs to the pipeline
output_dir/{pipeline_name}/{pipeline_version}/{input_hash}/{study_id}/info.json

Create a `Dataset` object containing the studies you want to process, then pass it to the `run` method of the pipeline class.
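
A minimal usage sketch, assuming the extractor classes exported from `ns_pipelines` take no required constructor arguments and expose the `run(dataset, output_dir)` interface described above (the paths and study id are illustrative):

```python
from pathlib import Path

from ns_pipelines import WordCountExtractor
from ns_pipelines.dataset import Dataset

# Build a dataset from a directory of study folders (pubget/ACE inputs)
dataset = Dataset(Path("data/inputs"))

# Optionally restrict processing to a subset of study ids
subset = dataset.slice(["0123456789ab"])

# Run a pipeline; results land in the output layout described above
extractor = WordCountExtractor()
extractor.run(subset, "data/outputs")
```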

Run all available pipelines and harmonize outputs using the CLI (TODO).

Pipelines are either "dependent" or "independent".
Dependent pipelines are those whose output for an individual study depends on the outputs of other studies.
Independent pipelines are those whose output for an individual study does not depend on the outputs of other studies.

## Note(s) for self

#### Each study is independently processed

1) scenario 1: nothing changed
2) scenario 2: a study was added
3) scenario 3: a study was changed

`info.json` in the output directory records, for example:
increment (value): 0
date: 2021-09-01

ns-pond: no hashing.
We will hash based on the inputs to the pipeline and then store the hash in the `info.json` in the output directory.

Have a place for the raw output of the API/external service (`raw.json`) alongside the cleaned output (`clean.json`),
plus a clean function for each pipeline that can be used to clean its raw output.

#### Each study is processed in the context of all other studies

Have a dev version that only includes open-access papers.
Output layout: pipeline name plus version, then hashed runs, e.g. `pipeline/v1.0.0/hash_run-01`.

The hash is just the hash of the pipeline config.
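
One way such a config hash could be computed, sketched under the assumption that a pipeline configuration is a JSON-serializable dict (the helper name and digest length are illustrative):

```python
import hashlib
import json


def config_hash(config: dict, length: int = 8) -> str:
    """Hash a pipeline configuration deterministically.

    Serializing with sorted keys makes the hash independent of key order.
    """
    serialized = json.dumps(config, sort_keys=True).encode("utf-8")
    return hashlib.sha256(serialized).hexdigest()[:length]


print(config_hash({"use_cheap_option": True}))  # prints an 8-character hex digest
```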


Independent pipelines: copy over the studies that have been processed and haven't been changed.
Independent pipelines: re-run the pipeline on studies that have been changed.
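
A minimal sketch of that update step for an independent pipeline, assuming each study's `info.json` from the previous run stores an `input_hash` of its inputs (the helper name and key are illustrative):

```python
import json
import shutil
from pathlib import Path


def update_independent_pipeline(input_hashes: dict, previous_run: Path, new_run: Path) -> list:
    """Copy forward unchanged study results; return study ids that need (re-)processing.

    input_hashes maps study_id -> hash of that study's current inputs.
    """
    to_run = []
    for study_id, current_hash in input_hashes.items():
        info_file = previous_run / study_id / "info.json"
        if info_file.exists():
            previous_hash = json.loads(info_file.read_text()).get("input_hash")
            if previous_hash == current_hash:
                # Unchanged study: reuse the existing results directory
                shutil.copytree(previous_run / study_id, new_run / study_id)
                continue
        # New or changed study: must be processed by the pipeline
        to_run.append(study_id)
    return to_run
```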


## Notes

# study independent results:
/pipeline_name/v1.0.0/conf-#000A/run-01/study-01/input.json
                                        /study-02/input.json
                                                  /results.json

/pipeline_name/v1.0.0/conf-#000A/run-02/study-03/

# study dependent results:
/pipeline_name/v1.0.0/#sbqA_run-01/study-01
                                   /study-02
/pipeline_name/v1.0.0/#sbqA_run-02/study-01
                                   /study-02
                                   /study-03

Re-run study independent pipeline:
1. Update with new: create a new directory with only the updated studies.
2. Force re-run for a given set of inputs (from a particular directory; we are not using inheritance here).

Re-run study dependent pipeline:
1. Re-run all.


After update, `database.study_results_table`:

| id | study | conf  | run |
|----|-------|-------|-----|
| 0  | 01    | #000A | 01  |
| 1  | 02    | #000A | 01  |
| 2  | 03    | #000A | 02  |


After re-run, `database.study_results_table`:

| id | study | conf  | run |
|----|-------|-------|-----|
| 0  | 01    | #000A | 01  |
| 1  | 02    | #000A | 01  |
| 2  | 03    | #000A | 02  |
| 3  | 01    | #000A | 02  |
| 4  | 02    | #000A | 02  |

## TF-IDF gets its own unique table
## Participant demographics get their own unique table


## Have a table for feature names?

`database.study_results_values_table` columns: id, study_results_table_fk, feature (name), value, certainty


database.pipeline_table

| id | pipeline_name     | pipeline_description | version | study_dependent? | ace_compatible? | pubget_compatible? | derivative? |
|----|-------------------|----------------------|---------|------------------|-----------------|--------------------|-------------|
| 0  | gpt3_embed        | wat                  | 1.0.0   | False            | True            | True               | False       |
| 1  | HDBSCAN           | wat                  | 1.0.0   | True             | False           | False              | True        |
| 2  | TF-IDF            | wat                  | 1.0.0   | True             | False           | True               | False       |
| 3  | embed_and_HDBSCAN | wat                  | 1.0.0   | True             | True            | True               | False       |

database.pipeline_configs_table

| id | pipeline_fk | configuration            | configuration_hash |
|----|-------------|--------------------------|--------------------|
| 0  | 0           | {use_cheap_option: true} | #000A              |
| 1  | 1           | {dimensions: 10}         | #XXXX              |

database.pipeline_run_table columns: id, pipeline_fk, config_hash_fk, run_index, description, date
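
A minimal sketch of this schema using Python's built-in `sqlite3` (table and column names follow the notes above; the types and foreign keys are assumptions):

```python
import sqlite3

conn = sqlite3.connect("ns_pipelines.db")
conn.executescript("""
CREATE TABLE IF NOT EXISTS pipeline_table (
    id INTEGER PRIMARY KEY,
    pipeline_name TEXT,
    pipeline_description TEXT,
    version TEXT,
    study_dependent BOOLEAN,
    ace_compatible BOOLEAN,
    pubget_compatible BOOLEAN,
    derivative BOOLEAN
);

CREATE TABLE IF NOT EXISTS pipeline_configs_table (
    id INTEGER PRIMARY KEY,
    pipeline_fk INTEGER REFERENCES pipeline_table(id),
    configuration TEXT,            -- JSON-encoded configuration
    configuration_hash TEXT
);

CREATE TABLE IF NOT EXISTS pipeline_run_table (
    id INTEGER PRIMARY KEY,
    pipeline_fk INTEGER REFERENCES pipeline_table(id),
    config_hash_fk TEXT,           -- configuration_hash of the config used
    run_index INTEGER,
    description TEXT,
    date TEXT
);

CREATE TABLE IF NOT EXISTS study_results_table (
    id INTEGER PRIMARY KEY,
    study TEXT,
    conf TEXT,
    run INTEGER
);

CREATE TABLE IF NOT EXISTS study_results_values_table (
    id INTEGER PRIMARY KEY,
    study_results_table_fk INTEGER REFERENCES study_results_table(id),
    feature TEXT,
    value TEXT,
    certainty REAL
);
""")
conn.commit()
```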

### Pipeline outputs
Pipeline results are output to `data/outputs/{input_hash}/{pipeline_name}/{arghash-timestamp}`.
Outputs include extracted features `features.csv`, feature descriptions `descriptions.json`, and extraction information `info.json`.

Pipeline outputs are not stored as part of this repository.
See `ns-text-extraction-outputs` sub repository.
## TODO: how do I represent results in the database?
Empty file removed __init__.py
Empty file.
8 changes: 8 additions & 0 deletions ns_pipelines/__init__.py
@@ -0,0 +1,8 @@
from .participant_demographics import ParticipantDemographicsExtractor
from .word_count import WordCountExtractor, WordDevianceExtractor

__all__ = [
"ParticipantDemographicsExtractor",
"WordCountExtractor",
"WordDevianceExtractor",
]
182 changes: 182 additions & 0 deletions ns_pipelines/dataset.py
@@ -0,0 +1,182 @@
"""Dataset creation for processing inputs."""
from copy import deepcopy
from dataclasses import dataclass, field
from pathlib import Path
import re
import json
from typing import Union, Optional

INPUTS = [
"text",
"coordinates",
"metadata",
"html",
"xml",
"tables",
"tables_xml",
]

@dataclass
class AceRaw:
html: Path

def __post_init__(self):
# Preprocessing logic for AceRaw can be added here if needed
if not self.html.exists():
raise ValueError(f"HTML file {self.html} does not exist.")

@dataclass
class PubgetRaw:
xml: Path
tables: dict = field(default_factory=dict)
tables_xml: Path = None

def __post_init__(self):
# Load tables and assign file paths
if not self.xml.exists():
raise ValueError(f"XML file {self.xml} does not exist.")

if self.tables_xml and not self.tables_xml.exists():
raise ValueError(f"Tables XML file {self.tables_xml} does not exist.")

if self.tables_xml:
tables_files = list(self.tables_xml.parent.glob("*.xml"))
tables_files = [t for t in tables_files if t.name != self.tables_xml.name]

num_tables = len(tables_files) // 2
self.tables = {f'{t:03}': {"metadata": None, "contents": None} for t in range(num_tables)}

for tf in tables_files:
table_number = tf.stem.split("_")[1]
if tf.suffix == ".json":
key = "metadata"
else:
key = "contents"
self.tables[table_number][key] = tf

@dataclass
class ProcessedData:
coordinates: Path = None
text: Path = None
metadata: Path = None
raw: Optional[Union['PubgetRaw', 'AceRaw']] = field(default=None)

def __post_init__(self):
# Ensure the processed data files exist
if self.coordinates and not self.coordinates.exists():
raise ValueError(f"Coordinates file {self.coordinates} does not exist.")
if self.text and not self.text.exists():
raise ValueError(f"Text file {self.text} does not exist.")
if self.metadata and not self.metadata.exists():
raise ValueError(f"Metadata file {self.metadata} does not exist.")

@dataclass
class Study:
study_dir: Path
dbid: str = None
doi: str = None
pmid: str = None
pmcid: str = None
ace: ProcessedData = None
pubget: ProcessedData = None

def __post_init__(self):
self.dbid = self.study_dir.name

        # Load identifiers and assign them to the study
        # (assumes identifiers.json provides "doi", "pmid", and "pmcid" keys)
        with open((self.study_dir / "identifiers.json"), "r") as ident_fp:
            ids = json.load(ident_fp)
        self.doi = ids.get("doi")
        self.pmid = ids.get("pmid")
        self.pmcid = ids.get("pmcid")

        # Set up the raw source data objects
        source_dir = self.study_dir / "source"
        ace_raw = None
        pubget_raw = None

# Load AceRaw if available
ace_path = source_dir / "ace" / f"{self.pmid}.html"
if ace_path.exists():
ace_raw = AceRaw(html=ace_path)

# Load PubgetRaw if available
pubget_dir = source_dir / "pubget"
pubget_xml_path = pubget_dir / f"{self.pmcid}.xml"
tables_xml_path = pubget_dir / "tables" / "tables.xml"
if pubget_xml_path.exists():
pubget_raw = PubgetRaw(
xml=pubget_xml_path,
tables_xml=tables_xml_path
)

# Load processed data
for t in ["ace", "pubget"]:
processed_dir = self.study_dir / "processed" / t
if processed_dir.exists():
                processed = ProcessedData(
                    coordinates=processed_dir / "coordinates.csv",
                    text=processed_dir / "text.txt",
                    metadata=processed_dir / "metadata.json",
                    raw=ace_raw if t == "ace" else pubget_raw,
                )

setattr(self, t, processed)


class Dataset:
"""Dataset class for processing inputs."""

def __init__(self, input_directory):
"""Initialize the dataset."""
self.data = self.load_directory(input_directory)

def slice(self, ids):
"""Slice the dataset."""
deepcopy_obj = deepcopy(self)
deepcopy_obj.data = {k: v for k, v in deepcopy_obj.data.items() if k in ids}
return deepcopy_obj

def load_directory(self, input_directory):
"""Load the input directory."""
pattern = re.compile(r'^[a-zA-Z0-9]{12}$')
sub_directories = input_directory.glob("[0-9A-Za-z]*")
study_directories = [
dir_ for dir_ in sub_directories
if dir_.is_dir() and pattern.match(dir_.name)
]

dset_data = {}

for study_dir in study_directories:
study_obj = Study(study_dir=study_dir)

dset_data[study_obj.dbid] = study_obj

        return dset_data

    def __len__(self):
"""Return the length of the dataset."""
return len(self.data)

def __getitem__(self, idx):
"""Return an item from the dataset."""
return self.data[idx]



class PipelineInputFilter:
"""Filter for pipeline inputs."""

    def __init__(self, pipeline, output_directory, overwrite=False):
        """Initialize the filter.

        pipeline (Pipeline): The pipeline to filter.
        output_directory (str): The output directory where the pipeline has been previously run.
        overwrite (bool): Whether to overwrite the existing output.
        """
        self.pipeline = pipeline
        self.output_directory = output_directory
        self.overwrite = overwrite

def filter(self, dataset):
"""Filter the dataset."""
pass

def load_outputs(self):
"""Load the outputs."""
pass
5 changes: 5 additions & 0 deletions ns_pipelines/participant_demographics/__init__.py
@@ -0,0 +1,5 @@
from .model import ParticipantDemographicsExtractor

__all__ = [
"ParticipantDemographicsExtractor",
]