wip: add feature tables #842

Merged 12 commits on Jan 17, 2025
107 changes: 107 additions & 0 deletions store/neurostore/ingest/extracted_features.py
@@ -0,0 +1,107 @@
"""Ingest extracted features into the database."""

import json
import os.path as op
from pathlib import Path
import hashlib
from dateutil.parser import parse as parse_date

from neurostore.database import db
from neurostore.models import (
Pipeline,
PipelineConfig,
PipelineRun,
PipelineRunResult,
)


def ingest_feature(feature_directory):
"""Ingest demographics data into the database."""
# read pipeline_info.json from the base feature directory
with open(op.join(feature_directory, "pipeline_info.json")) as f:
pipeline_info = json.load(f)

# search if there is an existing pipeline with the same name and version
pipeline = (
db.session.query(Pipeline)
.filter(
Pipeline.name == pipeline_info["name"],
Pipeline.version == pipeline_info["version"],
)
.first()
)
# create a pipeline if it does not exist
if not pipeline:
pipeline = Pipeline(
name=pipeline_info["name"],
version=pipeline_info["version"],
description=pipeline_info.get("description"),
            study_dependent=pipeline_info.get("type") == "dependent",
ace_compatible="ace"
in pipeline_info.get("arguments", {}).get("input_sources", []),
pubget_compatible="pubget"
in pipeline_info.get("arguments", {}).get("input_sources", []),
derived_from=pipeline_info.get("derived_from", None),
)
        db.session.add(pipeline)
        # flush so the new pipeline is assigned an id before it is referenced below
        db.session.flush()

    # search within the pipeline for existing configs that match the
    # "arguments" field in pipeline_info.json; hash the arguments
    # (sorting keys so the hash is deterministic) and compare hashes
    config_hash = hashlib.sha256(
        json.dumps(pipeline_info["arguments"], sort_keys=True).encode()
    ).hexdigest()
pipeline_config = (
db.session.query(PipelineConfig)
.filter(
PipelineConfig.pipeline_id == pipeline.id,
PipelineConfig.config_hash == config_hash,
)
.first()
)
# create a pipeline config if it does not exist
if not pipeline_config:
pipeline_config = PipelineConfig(
pipeline_id=pipeline.id,
config=pipeline_info["arguments"],
config_hash=config_hash,
)
        db.session.add(pipeline_config)
        # flush so the new config is assigned an id before it is referenced below
        db.session.flush()

# create a new pipeline run
pipeline_run = PipelineRun(
pipeline_id=pipeline.id,
config_id=pipeline_config.id,
)

# get a list of all the paper directories in the feature directory
paper_dirs = [d for d in Path(feature_directory).iterdir() if d.is_dir()]

    # for each paper directory, read the results.json and info.json files
pipeline_run_results = []
for paper_dir in paper_dirs:
with open(op.join(paper_dir, "results.json")) as f:
results = json.load(f)

with open(op.join(paper_dir, "info.json")) as f:
info = json.load(f)

# use the directory name as the base_study_id
base_study_id = paper_dir.name
# create a new result record
pipeline_run_results.append(
PipelineRunResult(
base_study_id=base_study_id,
data=results,
date_executed=parse_date(info["date"]),
file_inputs=info["inputs"],
run=pipeline_run,
)
)

db.session.add(pipeline_run)
db.session.add_all(pipeline_run_results)

db.session.commit()
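For orientation, a minimal sketch of the directory layout ingest_feature expects, inferred from the file reads above; the directory names and JSON contents here are illustrative, not part of this PR:

# Hypothetical feature directory, inferred from ingest_feature's reads:
#
#   /data/features/word_count/      <- feature_directory
#   |-- pipeline_info.json          <- {"name": ..., "version": ..., "arguments": {...}}
#   |-- study_abc123/               <- directory name becomes base_study_id
#   |   |-- results.json            <- stored as PipelineRunResult.data
#   |   `-- info.json               <- {"date": ..., "inputs": [...]}
#   `-- study_def456/
#       |-- results.json
#       `-- info.json
from neurostore.ingest.extracted_features import ingest_feature

ingest_feature("/data/features/word_count")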
10 changes: 10 additions & 0 deletions store/neurostore/models/__init__.py
@@ -12,6 +12,11 @@
AnnotationAnalysis,
PointValue,
AnalysisConditions,
Pipeline,
PipelineConfig,
PipelineRun,
PipelineRunResult,
PipelineRunResultVote,
)
from .auth import User, Role

@@ -31,4 +36,9 @@
"AnalysisConditions",
"User",
"Role",
"Pipeline",
"PipelineConfig",
"PipelineRun",
"PipelineRunResult",
"PipelineRunResultVote",
]
70 changes: 70 additions & 0 deletions store/neurostore/models/data.py
@@ -279,6 +279,7 @@ class Study(BaseMixin, db.Model):
level = db.Column(db.String)
metadata_ = db.Column(JSONB)
source = db.Column(db.String, index=True)
base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
source_id = db.Column(db.String, index=True)
source_updated_at = db.Column(db.DateTime(timezone=True))
@@ -538,6 +539,75 @@ class PointValue(BaseMixin, db.Model):
user = relationship("User", backref=backref("point_values", passive_deletes=True))


class Pipeline(BaseMixin, db.Model):
__tablename__ = "pipelines"

name = db.Column(db.String)
description = db.Column(db.String)
version = db.Column(db.String)
study_dependent = db.Column(db.Boolean, default=False)
ace_compatible = db.Column(db.Boolean, default=False)
pubget_compatible = db.Column(db.Boolean, default=False)
derived_from = db.Column(db.Text)


class PipelineConfig(BaseMixin, db.Model):
__tablename__ = "pipeline_configs"

pipeline_id = db.Column(
db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
)
config = db.Column(JSONB)
config_hash = db.Column(db.String, index=True)
pipeline = relationship(
"Pipeline", backref=backref("configs", passive_deletes=True)
)


class PipelineRun(BaseMixin, db.Model):
__tablename__ = "pipeline_runs"

pipeline_id = db.Column(
db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
)
config_id = db.Column(
db.Text, db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"), index=True
)
config = relationship(
"PipelineConfig", backref=backref("runs", passive_deletes=True)
)
run_index = db.Column(db.Integer())


class PipelineRunResult(BaseMixin, db.Model):
__tablename__ = "pipeline_run_results"

run_id = db.Column(
db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True
)
base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
date_executed = db.Column(db.DateTime(timezone=True))
data = db.Column(JSONB)
file_inputs = db.Column(JSONB)
run = relationship("PipelineRun", backref=backref("results", passive_deletes=True))


class PipelineRunResultVote(BaseMixin, db.Model):
__tablename__ = "pipeline_run_result_votes"

run_result_id = db.Column(
db.Text,
db.ForeignKey("pipeline_run_results.id", ondelete="CASCADE"),
index=True,
)
user_id = db.Column(db.Text, db.ForeignKey("users.external_id"), index=True)
accurate = db.Column(db.Boolean)
run_result = relationship(
"PipelineRunResult", backref=backref("votes", passive_deletes=True)
)
user = relationship("User", backref=backref("votes", passive_deletes=True))


# from . import event_listeners # noqa E402

# del event_listeners
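To see how the new tables chain together, a minimal query sketch, assuming an active application context and a previously ingested run; the base_study_id value is hypothetical:

from neurostore.database import db
from neurostore.models import PipelineRunResult

# Fetch every extracted-feature record for one base study, then walk the
# relationship chain defined above: result -> run -> config -> pipeline.
results = (
    db.session.query(PipelineRunResult)
    .filter(PipelineRunResult.base_study_id == "someBaseStudyId")
    .all()
)
for result in results:
    run = result.run            # PipelineRun, via the "results" backref
    config = run.config         # PipelineConfig, via the "runs" backref
    pipeline = config.pipeline  # Pipeline, via the "configs" backref
    print(pipeline.name, pipeline.version, result.date_executed)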
2 changes: 1 addition & 1 deletion store/neurostore/openapi
Submodule openapi updated 1 file
+471 −0 neurostore-openapi.yml
13 changes: 13 additions & 0 deletions store/neurostore/resources/__init__.py
@@ -15,6 +15,14 @@
UsersView,
)

from .pipeline import (
PipelinesView,
PipelineConfigsView,
PipelineRunsView,
PipelineRunResultsView,
PipelineRunResultVotesView,
)

__all__ = [
"StudysetsView",
"AnnotationsView",
@@ -27,4 +35,9 @@
"PointsView",
"PointValuesView",
"UsersView",
"PipelinesView",
"PipelineConfigsView",
"PipelineRunsView",
"PipelineRunResultsView",
"PipelineRunResultVotesView",
]
2 changes: 1 addition & 1 deletion store/neurostore/resources/base.py
@@ -619,7 +619,7 @@ def search(self):
validate_search_query(s)
except errors.SyntaxError as e:
abort(400, description=e.args[0])
tsquery = func.to_tsquery('english', pubmed_to_tsquery(s))
tsquery = func.to_tsquery("english", pubmed_to_tsquery(s))
q = q.filter(m._ts_vector.op("@@")(tsquery))

# Alternatively (or in addition), search on individual fields.