[ENH] add pipeline configuration/structure #3

Merged
merged 44 commits on Nov 25, 2024

Commits (44)
- 6fe2bfb adding testing (jdkent, Oct 22, 2024)
- bbbfc9d add dependent pipeline (jdkent, Oct 24, 2024)
- 60c0978 mark pipeline as (in)dependent (jdkent, Oct 24, 2024)
- 89d45f3 wip: start modifying the existing pipeline (jdkent, Oct 24, 2024)
- 5e8bafc merge in new changes (jdkent, Oct 24, 2024)
- 90bee39 Restructure package (adelavega, Oct 30, 2024)
- cb16fc9 add filter_inputs function (jdkent, Oct 30, 2024)
- 58bc727 Refactor init logic to dataclasses (adelavega, Oct 30, 2024)
- cfff8bb Both group and independent can use the same function name ('function'… (adelavega, Oct 30, 2024)
- 79bfdcc group_function to function (adelavega, Oct 30, 2024)
- 0625124 _hash_attrs instead (adelavega, Oct 30, 2024)
- c26ef1b Set default _hash_attrs (adelavega, Oct 30, 2024)
- e907624 refactor based on feedback (jdkent, Oct 31, 2024)
- 85b20dd add pipeline name to output path (jdkent, Oct 31, 2024)
- ddd5b67 wip: modify readme (jdkent, Oct 31, 2024)
- 23bb537 fix merge (jdkent, Oct 31, 2024)
- c99c83f add tests dependencies (jdkent, Oct 31, 2024)
- a78f241 add test for participant demographics (jdkent, Nov 14, 2024)
- cdbdec2 opensource data (jdkent, Nov 15, 2024)
- d1e2a31 remove old functions (jdkent, Nov 15, 2024)
- cd5bb83 commit the cassette (jdkent, Nov 15, 2024)
- 99095fb add dependencies (jdkent, Nov 15, 2024)
- ef0b25d allow installable pyproject (jdkent, Nov 15, 2024)
- 0839a6e move test directory and remove top level __init__ (jdkent, Nov 15, 2024)
- 585fc21 try underscores (jdkent, Nov 15, 2024)
- 5cdfc6d Revert "allow installable pyproject" (jdkent, Nov 15, 2024)
- 306e9ec Revert "Revert "allow installable pyproject"" (jdkent, Nov 15, 2024)
- 693cb76 Revert "try underscores" (jdkent, Nov 15, 2024)
- c3b5767 Revert "move test directory and remove top level __init__" (jdkent, Nov 15, 2024)
- 8e7152f remove init (jdkent, Nov 15, 2024)
- 20af580 remove old files (jdkent, Nov 15, 2024)
- 5cff6be switch to version 5 (jdkent, Nov 15, 2024)
- 194e9b1 use editable install (jdkent, Nov 15, 2024)
- 6f45fba trigger variable (jdkent, Nov 15, 2024)
- b6e26b0 add fake key (jdkent, Nov 15, 2024)
- 08e534a Update ns_pipelines/word_count/run.py (jdkent, Nov 16, 2024)
- e8108fd Update ns_pipelines/participant_demographics/run.py (jdkent, Nov 16, 2024)
- c366e61 Update ns_pipelines/word_count/run.py (jdkent, Nov 18, 2024)
- e1fcd2b Update ns_pipelines/word_count/run.py (jdkent, Nov 18, 2024)
- 01a70f0 Update ns_pipelines/participant_demographics/run.py (jdkent, Nov 18, 2024)
- ce537b8 Update ns_pipelines/word_count/run.py (jdkent, Nov 19, 2024)
- 44ad3c6 Update ns_pipelines/word_count/run.py (jdkent, Nov 19, 2024)
- 35c09aa change the names (jdkent, Nov 21, 2024)
- 8c5237f work with .keys file (jdkent, Nov 22, 2024)
36 changes: 36 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,36 @@
name: Install and Test

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

concurrency:
  group: testing-${{ github.ref }}
  cancel-in-progress: true

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.8'

      - name: Install dependencies
        run: pip install -e .[tests,participant_demographics,word_count]

      - name: Test with pytest
        env:
          OPENAI_API_KEY: "fake_key"
        run: |
          cp .keys.example .keys
          pytest
3 changes: 3 additions & 0 deletions .gitignore
@@ -72,6 +72,7 @@ ipython_config.py
dmypy.json

# Environments
.keys
.env
.venv
env/
@@ -106,3 +107,5 @@ venv.bak/
*.swp
.swo
.swn

_version.py
1 change: 1 addition & 0 deletions .keys.example
@@ -0,0 +1 @@
OPENAI_CLIENT_API_KEY=fake_key
125 changes: 117 additions & 8 deletions README.md
@@ -1,4 +1,4 @@
# neurostore-text-extraction
# ns-text-extraction-workflows

This repository contains pipelines and scripts for extracting features from text using Natural Language Processing (NLP), Large Language Models (LLMs),
and other algorithms across thousands of articles in the NeuroStore database.
@@ -11,16 +11,125 @@ To install the necessary dependencies, run:


## Usage
### Running pipelines
Executable workflows in `pipelines/{pipeline_name}/run.py` will take as input standardized pubget-style text inputs (one row per article).

### Overview

Each executable workflow in `pipelines/{pipeline_name}/run.py` defines a class that implements the `run` method.
The `run` method takes a `Dataset` object and an output directory as input, and writes the extracted features to the output directory in the following format:

# the pipeline info file contains configuration information about the pipeline
output_dir/{pipeline_name}/{pipeline_version}/{input_hash}/pipeline_info.json
# the study results file contains whatever extracted features from the study by the pipeline
output_dir/{pipeline_name}/{pipeline_version}/{input_hash}/{study_id}/results.json
# the study info file contains metadata about the inputs to the pipeline
output_dir/{pipeline_name}/{pipeline_version}/{input_hash}/{study_id}/info.json

You will need to create a dataset object that contains the studies you want to process, and then pass that dataset object to the `run` method of the pipeline class.
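
For example, a minimal usage sketch (the `Dataset` and `WordCountExtractor` classes come from this PR; the extractor's constructor arguments and the exact `run` signature are assumptions based on the description above):

```python
from pathlib import Path

from ns_pipelines import WordCountExtractor
from ns_pipelines.dataset import Dataset

# Build a Dataset from a directory of pubget/ACE-style study folders.
dataset = Dataset(Path("inputs/"))

# Optionally slice the dataset down to specific studies (12-character ids).
subset = dataset.slice(["0123456789ab"])  # hypothetical study id

# Run a pipeline; outputs land under
# output/{pipeline_name}/{pipeline_version}/{input_hash}/{study_id}/results.json
extractor = WordCountExtractor()
extractor.run(subset, Path("output/"))
```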

Run all available pipelines and harmonize outputs using the CLI (TODO).

Pipelines can be either "dependent" or "independent".
A dependent pipeline's output for an individual study depends on the outputs of other studies.
An independent pipeline's output for an individual study does not depend on any other study.

## Note(s) for self

#### Each study is independently processed

1) Scenario 1: nothing changed
2) Scenario 2: a study was added
3) Scenario 3: a study was changed

`info.json` in the output directory records, for example:
increment (value): 0
date: 2021-09-01

ns-pond: no hashing.
We will hash based on the inputs to the pipeline and then store the hash in the `info.json` in the output directory.

Have a place for the raw output of the API/external service (`raw.json`) alongside a cleaned version (`clean.json`);
each pipeline provides a clean function that can be applied to its raw output.
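
A minimal sketch of what writing those per-study files might look like (this helper is illustrative only and not part of the PR; the file names follow the notes above):

```python
import hashlib
import json
from datetime import date
from pathlib import Path


def write_study_outputs(study_dir: Path, inputs: dict, raw: dict, clean: dict) -> None:
    """Illustrative helper: store raw/clean outputs plus provenance info."""
    study_dir.mkdir(parents=True, exist_ok=True)

    # Hash the pipeline inputs so unchanged studies can be detected later.
    input_hash = hashlib.sha256(
        json.dumps(inputs, sort_keys=True).encode()
    ).hexdigest()[:12]

    (study_dir / "raw.json").write_text(json.dumps(raw, indent=2))
    (study_dir / "clean.json").write_text(json.dumps(clean, indent=2))
    (study_dir / "info.json").write_text(json.dumps({
        "inputs": inputs,
        "input_hash": input_hash,
        "date": date.today().isoformat(),
    }, indent=2))
```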

#### Each study is processed in the context of all other studies

Have a dev version that only includes open-access papers.
Output layout: pipeline name plus version, then hashed runs, e.g.
pipeline/v1.0.0/hash_run-01

The hash is just the hash of the pipeline config (see the sketch below).
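
A possible sketch of deriving that configuration hash and the run directory name (the hash length and naming scheme are assumptions):

```python
import hashlib
import json


def config_hash(config: dict, length: int = 8) -> str:
    """Hash a pipeline configuration dict deterministically."""
    serialized = json.dumps(config, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:length]


# e.g. build the run directory name used above
run_dir = f"pipeline/v1.0.0/{config_hash({'use_cheap_option': True})}_run-01"
```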


Independent studies: copy over the studies that have already been processed and haven't been changed.
Independent studies: re-run the pipeline on studies that have been changed.


## Notes

# study independent results:
/pipeline_name/v1.0.0/conf-#000A/run-01/study-01/input.json
                                        /study-02/input.json
                                                  /results.json

/pipeline_name/v1.0.0/conf-#000A/run-02/study-03/

# study dependent results:
/pipeline_name/v1.0.0/#sbqA_run-01/study-01
                                   /study-02
/pipeline_name/v1.0.0/#sbqA_run-02/study-01
                                   /study-02
                                   /study-03

Re-run a study-independent pipeline:
1. Update with new studies: create a new directory containing only the added/updated studies.
2. Force a re-run for a given set of inputs (from a particular directory; we are not using inheritance here); see the sketch below.

Re-run a study-dependent pipeline:
1. Re-run all studies.
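
A rough sketch of the update logic for the independent case (the function and its arguments are hypothetical; the idea is simply to compare stored input hashes against the current ones):

```python
def studies_to_process(current_hashes, previous_hashes, force=False):
    """Return the study ids that need to be (re)processed.

    current_hashes:  {study_id: input_hash} for the incoming dataset
    previous_hashes: {study_id: input_hash} read from a prior run's info.json files
    force:           re-run everything regardless of hashes
    """
    if force:
        return list(current_hashes)
    return [
        study_id
        for study_id, input_hash in current_hashes.items()
        if previous_hashes.get(study_id) != input_hash  # new or changed study
    ]


# Unchanged studies can simply have their previous results copied over.
```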


after update:
database.study_results_table
id, study, conf,  run
 0,    01, #000A, 01
 1,    02, #000A, 01
 2,    03, #000A, 02


after re-run:
database.study_results_table
id, study, conf,  run
 0,    01, #000A, 01
 1,    02, #000A, 01
 2,    03, #000A, 02
 3,    01, #000A, 02
 4,    02, #000A, 02

## TF-IDF gets its own unique table
## Participant demographics get their own unique table


## have a table for feature names?
database.study_results_values_table
id, study_results_table_fk, feature(name), value, certainty


database.pipeline_table
id, pipeline_name, pipeline_description, version, study_dependent?, ace_compatible?, pubget_compatible?, derivative?
 0, gpt3_embed,        wat, 1.0.0, False, True,  True,  False
 1, HDBSCAN,           wat, 1.0.0, True,  False, False, True
 2, TF-IDF,            wat, 1.0.0, True,  False, True,  False
 3, embed_and_HDBSCAN, wat, 1.0.0, True,  True,  True,  False

database.pipeline_configs_table
id, pipeline_fk, configuration,            configuration_hash
 0, 0,           {use_cheap_option: true}, #000A
 1, 1,           {dimensions: 10},         #XXXX

database.pipeline_run_table
id, pipeline_fk, config_hash_fk, run_index, description, date
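
A sketch of these tables as SQL DDL, run through Python's sqlite3 for concreteness (column names follow the notes above; the table names, types, and constraints are assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # illustrative only
conn.executescript("""
CREATE TABLE pipeline (
    id INTEGER PRIMARY KEY,
    pipeline_name TEXT,
    pipeline_description TEXT,
    version TEXT,
    study_dependent BOOLEAN,
    ace_compatible BOOLEAN,
    pubget_compatible BOOLEAN,
    derivative BOOLEAN
);

CREATE TABLE pipeline_config (
    id INTEGER PRIMARY KEY,
    pipeline_fk INTEGER REFERENCES pipeline(id),
    configuration TEXT,        -- JSON-encoded config
    configuration_hash TEXT    -- e.g. #000A
);

CREATE TABLE pipeline_run (
    id INTEGER PRIMARY KEY,
    pipeline_fk INTEGER REFERENCES pipeline(id),
    config_hash_fk TEXT,
    run_index INTEGER,
    description TEXT,
    date TEXT
);

CREATE TABLE study_result (
    id INTEGER PRIMARY KEY,
    study TEXT,
    conf TEXT,                 -- configuration hash
    run INTEGER
);

CREATE TABLE study_result_value (
    id INTEGER PRIMARY KEY,
    study_results_table_fk INTEGER REFERENCES study_result(id),
    feature TEXT,              -- feature name
    value TEXT,
    certainty REAL
);
""")
```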

### Pipeline outputs
Pipeline results are output to `data/outputs/{input_hash}/{pipeline_name}/{arghash-timestamp}`.
Outputs include extracted features `features.csv`, feature descriptions `descriptions.json`, and extraction information `info.json`.

Pipeline outputs are not stored as part of this repository.
See the `ns-text-extraction-outputs` sub-repository.
## TODO: how do I represent results in the database?
Empty file removed __init__.py
Empty file.
8 changes: 8 additions & 0 deletions ns_pipelines/__init__.py
@@ -0,0 +1,8 @@
from .participant_demographics import ParticipantDemographicsExtractor
from .word_count import WordCountExtractor, WordDevianceExtractor

__all__ = [
    "ParticipantDemographicsExtractor",
    "WordCountExtractor",
    "WordDevianceExtractor",
]
182 changes: 182 additions & 0 deletions ns_pipelines/dataset.py
@@ -0,0 +1,182 @@
"""Dataset creation for processing inputs."""
from copy import deepcopy
from dataclasses import dataclass, field
from pathlib import Path
import re
import json
from typing import Union, Optional

INPUTS = [
    "text",
    "coordinates",
    "metadata",
    "html",
    "xml",
    "tables",
    "tables_xml",
]

@dataclass
class AceRaw:
    html: Path

    def __post_init__(self):
        # Preprocessing logic for AceRaw can be added here if needed
        if not self.html.exists():
            raise ValueError(f"HTML file {self.html} does not exist.")

@dataclass
class PubgetRaw:
    xml: Path
    tables: dict = field(default_factory=dict)
    tables_xml: Path = None

    def __post_init__(self):
        # Load tables and assign file paths
        if not self.xml.exists():
            raise ValueError(f"XML file {self.xml} does not exist.")

        if self.tables_xml and not self.tables_xml.exists():
            raise ValueError(f"Tables XML file {self.tables_xml} does not exist.")

        if self.tables_xml:
            tables_files = list(self.tables_xml.parent.glob("*.xml"))
            tables_files = [t for t in tables_files if t.name != self.tables_xml.name]

            num_tables = len(tables_files) // 2
            self.tables = {f'{t:03}': {"metadata": None, "contents": None} for t in range(num_tables)}

            for tf in tables_files:
                table_number = tf.stem.split("_")[1]
                if tf.suffix == ".json":
                    key = "metadata"
                else:
                    key = "contents"
                self.tables[table_number][key] = tf

@dataclass
class ProcessedData:
    coordinates: Path = None
    text: Path = None
    metadata: Path = None
    raw: Optional[Union['PubgetRaw', 'AceRaw']] = field(default=None)

    def __post_init__(self):
        # Ensure the processed data files exist
        if self.coordinates and not self.coordinates.exists():
            raise ValueError(f"Coordinates file {self.coordinates} does not exist.")
        if self.text and not self.text.exists():
            raise ValueError(f"Text file {self.text} does not exist.")
        if self.metadata and not self.metadata.exists():
            raise ValueError(f"Metadata file {self.metadata} does not exist.")

@dataclass
class Study:
    study_dir: Path
    dbid: str = None
    doi: str = None
    pmid: str = None
    pmcid: str = None
    ace: ProcessedData = None
    pubget: ProcessedData = None

    def __post_init__(self):
        self.dbid = self.study_dir.name

        # Load identifiers
        with open((self.study_dir / "identifiers.json"), "r") as ident_fp:
            ids = json.load(ident_fp)

        # NOTE: assumes identifiers.json provides doi/pmid/pmcid keys
        self.doi = ids.get("doi")
        self.pmid = ids.get("pmid")
        self.pmcid = ids.get("pmcid")

        # Setup the processed data objects
        source_dir = self.study_dir / "source"
        ace_raw = None
        pubget_raw = None

        # Load AceRaw if available
        ace_path = source_dir / "ace" / f"{self.pmid}.html"
        if ace_path.exists():
            ace_raw = AceRaw(html=ace_path)

        # Load PubgetRaw if available
        pubget_dir = source_dir / "pubget"
        pubget_xml_path = pubget_dir / f"{self.pmcid}.xml"
        tables_xml_path = pubget_dir / "tables" / "tables.xml"
        if pubget_xml_path.exists():
            pubget_raw = PubgetRaw(
                xml=pubget_xml_path,
                tables_xml=tables_xml_path
            )

        # Load processed data
        for t in ["ace", "pubget"]:
            processed_dir = self.study_dir / "processed" / t
            if processed_dir.exists():
                processed = ProcessedData(
                    coordinates=processed_dir / "coordinates.csv",
                    text=processed_dir / "text.txt",
                    metadata=processed_dir / "metadata.json",
                    raw=ace_raw if t == "ace" else pubget_raw
                )

                setattr(self, t, processed)


class Dataset:
    """Dataset class for processing inputs."""

    def __init__(self, input_directory):
        """Initialize the dataset."""
        self.data = self.load_directory(input_directory)

    def slice(self, ids):
        """Slice the dataset."""
        deepcopy_obj = deepcopy(self)
        deepcopy_obj.data = {k: v for k, v in deepcopy_obj.data.items() if k in ids}
        return deepcopy_obj

    def load_directory(self, input_directory):
        """Load the input directory."""
        pattern = re.compile(r'^[a-zA-Z0-9]{12}$')
        sub_directories = input_directory.glob("[0-9A-Za-z]*")
        study_directories = [
            dir_ for dir_ in sub_directories
            if dir_.is_dir() and pattern.match(dir_.name)
        ]

        dset_data = {}

        for study_dir in study_directories:
            study_obj = Study(study_dir=study_dir)

            dset_data[study_obj.dbid] = study_obj

        return dset_data

    def __len__(self):
        """Return the length of the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return an item from the dataset."""
        return self.data[idx]



class PipelineInputFilter:
    """Filter for pipeline inputs."""

    def __init__(self, pipeline, output_directory, overwrite=False):
        """Initialize the filter.

        pipeline (Pipeline): The pipeline to filter.
        output_directory (str): The output directory where the pipeline has been previously run.
        overwrite (bool): Whether to overwrite the existing output.
        """

    def filter(self, dataset):
        """Filter the dataset."""
        pass

    def load_outputs(self):
        """Load the outputs."""
        pass
5 changes: 5 additions & 0 deletions ns_pipelines/participant_demographics/__init__.py
@@ -0,0 +1,5 @@
from .model import ParticipantDemographicsExtractor

__all__ = [
    "ParticipantDemographicsExtractor",
]