From 5dc10a94359e7c339b02b44ae34f92b11e823747 Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Thu, 24 Oct 2024 14:35:30 -0400 Subject: [PATCH 1/8] Fix a rookie mistake --- src/cladetime/util/sequence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cladetime/util/sequence.py b/src/cladetime/util/sequence.py index 85c45fa..ff88bb5 100644 --- a/src/cladetime/util/sequence.py +++ b/src/cladetime/util/sequence.py @@ -98,11 +98,11 @@ def _get_ncov_metadata( return metadata -def filter_covid_genome_metadata(metadata: pl.LazyFrame, cols: list = []) -> pl.LazyFrame: +def filter_covid_genome_metadata(metadata: pl.LazyFrame, cols: list | None = None) -> pl.LazyFrame: """Apply a standard set of filters to the GenBank genome metadata.""" # Default columns to include in the filtered metadata - if len(cols) == 0: + if not cols: cols = [ "clade_nextstrain", "country", From a8d38223c6ab6fcab39fb6e441a44e1f84aae3de Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Fri, 25 Oct 2024 11:43:36 -0400 Subject: [PATCH 2/8] Add paramater to specify state format in metadata "location" column This is a potential breaking change to anyone who was relying on full state name in the location field of sequence metadata datasets. The default is now the two letter state abbreviation, which is more commonly used in hub tasks.json files. --- src/cladetime/__init__.py | 4 +++ src/cladetime/_typing.py | 10 ------ src/cladetime/typing.py | 9 +++++ src/cladetime/util/sequence.py | 31 +++++++++++++--- tests/unit/util/test_sequence.py | 61 +++++++++++++++++++++++++++++--- 5 files changed, 97 insertions(+), 18 deletions(-) delete mode 100644 src/cladetime/_typing.py create mode 100644 src/cladetime/typing.py diff --git a/src/cladetime/__init__.py b/src/cladetime/__init__.py index 0822bea..d4bb66a 100644 --- a/src/cladetime/__init__.py +++ b/src/cladetime/__init__.py @@ -1,3 +1,4 @@ +import os import sys import structlog @@ -6,6 +7,9 @@ __all__ = ["CladeTime"] +# tells us package to consider DC a state +os.environ["DC_STATEHOOD"] = "1" + def setup_logging(): shared_processors = [ diff --git a/src/cladetime/_typing.py b/src/cladetime/_typing.py deleted file mode 100644 index 05ebc43..0000000 --- a/src/cladetime/_typing.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Type aliases for this package.""" - -from pathlib import Path -from typing import TypeAlias, Union - -from cloudpathlib import AnyPath, CloudPath - -# Data types -# Pathlike: TypeAlias = Path | AnyPath | CloudPath -Pathlike: TypeAlias = Union["Path", "AnyPath", "CloudPath"] diff --git a/src/cladetime/typing.py b/src/cladetime/typing.py new file mode 100644 index 0000000..a98bcb1 --- /dev/null +++ b/src/cladetime/typing.py @@ -0,0 +1,9 @@ +"""Type aliases for this package.""" + +from enum import StrEnum + + +class StateFormat(StrEnum): + ABBR = "abbr" + NAME = "name" + FIPS = "fips" diff --git a/src/cladetime/util/sequence.py b/src/cladetime/util/sequence.py index ff88bb5..dad0ecc 100644 --- a/src/cladetime/util/sequence.py +++ b/src/cladetime/util/sequence.py @@ -10,7 +10,9 @@ import us from requests import Session -from cladetime.util.session import _get_session +from cladetime.typing import StateFormat +from cladetime.util.reference import _get_s3_object_url +from cladetime.util.session import _check_response, _get_session from cladetime.util.timing import time_function logger = structlog.get_logger() @@ -98,9 +100,14 @@ def _get_ncov_metadata( return metadata -def filter_covid_genome_metadata(metadata: pl.LazyFrame, cols: list | None = None) -> pl.LazyFrame: +def filter_covid_genome_metadata( + metadata: pl.LazyFrame, cols: list | None = None, state_format: StateFormat = StateFormat.ABBR +) -> pl.LazyFrame: """Apply a standard set of filters to the GenBank genome metadata.""" + if state_format not in StateFormat: + raise ValueError(f"Invalid state_format. Must be one of: {list(StateFormat.__members__.items())}") + # Default columns to include in the filtered metadata if not cols: cols = [ @@ -115,7 +122,7 @@ def filter_covid_genome_metadata(metadata: pl.LazyFrame, cols: list | None = Non # There are some other odd divisions in the data, but these are 50 states, DC and PR states = [state.name for state in us.states.STATES] - states.extend(["Washington DC", "Puerto Rico"]) + states.extend(["Washington DC", "District of Columbia", "Puerto Rico"]) # Filter dataset and do some general tidying filtered_metadata = ( @@ -125,7 +132,7 @@ def filter_covid_genome_metadata(metadata: pl.LazyFrame, cols: list | None = Non pl.col("division").is_in(states), pl.col("host") == "Homo sapiens", ) - .rename({"clade_nextstrain": "clade", "division": "location"}) + .rename({"clade_nextstrain": "clade"}) .cast({"date": pl.Date}, strict=False) # date filtering at the end ensures we filter out null # values created by the above .cast operation @@ -134,6 +141,22 @@ def filter_covid_genome_metadata(metadata: pl.LazyFrame, cols: list | None = Non ) ) + # Create state mappings based on state_format parameter, including a DC alias, since + # Nextrain's metadata uses a different name than the us package + if state_format == StateFormat.FIPS: + state_dict = {state.name: state.fips for state in us.states.STATES_AND_TERRITORIES} + state_dict["Washington DC"] = us.states.DC.fips + elif state_format == StateFormat.ABBR: + state_dict = {state.name: state.abbr for state in us.states.STATES_AND_TERRITORIES} + state_dict["Washington DC"] = us.states.DC.abbr + else: + state_dict = {state.name: state.name for state in us.states.STATES_AND_TERRITORIES} + state_dict["Washington DC"] = "Washington DC" + + filtered_metadata = filtered_metadata.with_columns(pl.col("division").replace(state_dict).alias("location")).drop( + "division" + ) + return filtered_metadata diff --git a/tests/unit/util/test_sequence.py b/tests/unit/util/test_sequence.py index 646250a..f35f802 100644 --- a/tests/unit/util/test_sequence.py +++ b/tests/unit/util/test_sequence.py @@ -3,6 +3,7 @@ import polars as pl import pytest +from cladetime.typing import StateFormat from cladetime.util.sequence import ( filter_covid_genome_metadata, get_covid_genome_metadata, @@ -78,7 +79,7 @@ def test_filter_covid_genome_metadata(): "Homo sapiens", ], "country": ["USA", "Argentina", "USA", "USA", "USA", "USA", "USA"], - "division": ["Alaska", "Maine", "Guam", "Puerto Rico", "Utah", "Pennsylvania", "Pennsylvania"], + "division": ["Alaska", "Maine", "Guam", "Puerto Rico", "Utah", "Washington DC", "Pennsylvania"], "clade_nextstrain": ["AAA", "BBB", "CCC", "DDD", "EEE", "FFF", "FFF"], "location": ["Vulcan", "Reisa", "Bajor", "Deep Space 9", "Earth", "Cardassia", "Cardassia"], "genbank_accession": ["A1", "A2", "B1", "B2", "C1", "C2", "C2"], @@ -87,9 +88,13 @@ def test_filter_covid_genome_metadata(): } lf_metadata = pl.LazyFrame(test_genome_metadata) - lf_filtered = filter_covid_genome_metadata(lf_metadata) + lf_filtered = filter_covid_genome_metadata(lf_metadata).collect() - assert len(lf_filtered.collect()) == 2 + assert len(lf_filtered) == 2 + + locations = lf_filtered["location"].to_list() + locations.sort() + assert locations == ["AK", "DC"] actual_schema = lf_filtered.collect_schema() expected_schema = pl.Schema( @@ -97,15 +102,63 @@ def test_filter_covid_genome_metadata(): "clade": pl.String, "country": pl.String, "date": pl.Date, - "location": pl.String, "genbank_accession": pl.String, "genbank_accession_rev": pl.String, "host": pl.String, + "location": pl.String, } ) assert actual_schema == expected_schema +def test_filter_covid_genome_metadata_state_name(): + num_test_rows = 4 + test_genome_metadata = { + "date": ["2022-01-01"] * num_test_rows, + "host": ["Homo sapiens"] * num_test_rows, + "country": ["USA"] * num_test_rows, + "clade_nextstrain": ["AAA"] * num_test_rows, + "location": ["Earth"] * num_test_rows, + "genbank_accession": ["A1"] * num_test_rows, + "genbank_accession_rev": ["A1.1"] * num_test_rows, + "division": ["Alaska", "Puerto Rico", "Washington DC", "Fake State"], + } + + lf_metadata = pl.LazyFrame(test_genome_metadata) + lf_filtered = filter_covid_genome_metadata(lf_metadata, state_format=StateFormat.NAME) + lf_filtered = lf_filtered.collect() + + # Un-mapped states are dropped from dataset + assert len(lf_filtered) == 3 + + locations = set(lf_filtered["location"].to_list()) + assert locations == {"Alaska", "Puerto Rico", "Washington DC"} + + +def test_filter_covid_genome_metadata_state_fips(): + num_test_rows = 4 + test_genome_metadata = { + "date": ["2022-01-01"] * num_test_rows, + "host": ["Homo sapiens"] * num_test_rows, + "country": ["USA"] * num_test_rows, + "clade_nextstrain": ["AAA"] * num_test_rows, + "location": ["Earth"] * num_test_rows, + "genbank_accession": ["A1"] * num_test_rows, + "genbank_accession_rev": ["A1.1"] * num_test_rows, + "division": ["Massachusetts", "Puerto Rico", "Washington DC", "Fake State"], + } + + lf_metadata = pl.LazyFrame(test_genome_metadata) + lf_filtered = filter_covid_genome_metadata(lf_metadata, state_format=StateFormat.FIPS) + lf_filtered = lf_filtered.collect() + + # Un-mapped states are dropped from dataset + assert len(lf_filtered) == 3 + + locations = set(lf_filtered["location"].to_list()) + assert locations == {"11", "25", "72"} + + def test_parse_sequence_assignments(df_assignments): result = parse_sequence_assignments(df_assignments) From 877407889f53616068237814ef5109465f2a5028 Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Sat, 26 Oct 2024 19:46:46 -0400 Subject: [PATCH 3/8] Rename typing --- src/cladetime/types.py | 14 ++++++++++++++ src/cladetime/typing.py | 9 --------- src/cladetime/util/sequence.py | 2 +- tests/unit/util/test_sequence.py | 2 +- 4 files changed, 16 insertions(+), 11 deletions(-) create mode 100644 src/cladetime/types.py delete mode 100644 src/cladetime/typing.py diff --git a/src/cladetime/types.py b/src/cladetime/types.py new file mode 100644 index 0000000..b6272d5 --- /dev/null +++ b/src/cladetime/types.py @@ -0,0 +1,14 @@ +"""Type aliases for this package.""" + +from enum import StrEnum + + +class StateFormat(StrEnum): + """Options for formatting state names in sequence metadata""" + + ABBR = "abbr" + """Format states as two-letter abbreviations""" + NAME = "name" + """Format states as full names""" + FIPS = "fips" + """Format states as FIPS codes""" diff --git a/src/cladetime/typing.py b/src/cladetime/typing.py deleted file mode 100644 index a98bcb1..0000000 --- a/src/cladetime/typing.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Type aliases for this package.""" - -from enum import StrEnum - - -class StateFormat(StrEnum): - ABBR = "abbr" - NAME = "name" - FIPS = "fips" diff --git a/src/cladetime/util/sequence.py b/src/cladetime/util/sequence.py index dad0ecc..13b697b 100644 --- a/src/cladetime/util/sequence.py +++ b/src/cladetime/util/sequence.py @@ -10,7 +10,7 @@ import us from requests import Session -from cladetime.typing import StateFormat +from cladetime.types import StateFormat from cladetime.util.reference import _get_s3_object_url from cladetime.util.session import _check_response, _get_session from cladetime.util.timing import time_function diff --git a/tests/unit/util/test_sequence.py b/tests/unit/util/test_sequence.py index f35f802..62f06f1 100644 --- a/tests/unit/util/test_sequence.py +++ b/tests/unit/util/test_sequence.py @@ -3,7 +3,7 @@ import polars as pl import pytest -from cladetime.typing import StateFormat +from cladetime.types import StateFormat from cladetime.util.sequence import ( filter_covid_genome_metadata, get_covid_genome_metadata, From d42d58db966bc24a9f014cee48e792993d11d706 Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Sat, 26 Oct 2024 19:48:08 -0400 Subject: [PATCH 4/8] Update documentation --- docs/conf.py | 2 +- .../filter_covid_genome_metadata.rst | 6 ++ docs/reference/index.rst | 3 + docs/reference/types.rst | 7 ++ docs/user-guide.rst | 4 +- src/cladetime/util/sequence.py | 70 ++++++++++++++++++- 6 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 docs/reference/filter_covid_genome_metadata.rst create mode 100644 docs/reference/types.rst diff --git a/docs/conf.py b/docs/conf.py index e969f1f..d3d0134 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -117,7 +117,7 @@ pygments_style = "friendly" # Show typehints as content of the function or method -autodoc_typehints = "description" +autodoc_typehints = "signature" autodoc_member_order = "bysource" # Open Graph metadata diff --git a/docs/reference/filter_covid_genome_metadata.rst b/docs/reference/filter_covid_genome_metadata.rst new file mode 100644 index 0000000..d52c80f --- /dev/null +++ b/docs/reference/filter_covid_genome_metadata.rst @@ -0,0 +1,6 @@ +==================================================== +cladetime.util.sequence.filter_covid_genome_metadata +==================================================== + +.. autofunction:: cladetime.util.sequence.filter_covid_genome_metadata + diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 7660209..b1ea9ff 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -4,3 +4,6 @@ API Reference .. toctree:: cladetime + filter_covid_genome_metadata + types + diff --git a/docs/reference/types.rst b/docs/reference/types.rst new file mode 100644 index 0000000..75f8178 --- /dev/null +++ b/docs/reference/types.rst @@ -0,0 +1,7 @@ +===== +Types +===== + + +.. autoclass:: cladetime.types.StateFormat + :members: diff --git a/docs/user-guide.rst b/docs/user-guide.rst index fb1f6aa..e02f56e 100644 --- a/docs/user-guide.rst +++ b/docs/user-guide.rst @@ -1,7 +1,5 @@ -=============== User Guide -=============== - +=========== Finding Nextstrain SARS-CoV-2 sequences and sequence metadata diff --git a/src/cladetime/util/sequence.py b/src/cladetime/util/sequence.py index 13b697b..708d0aa 100644 --- a/src/cladetime/util/sequence.py +++ b/src/cladetime/util/sequence.py @@ -103,8 +103,76 @@ def _get_ncov_metadata( def filter_covid_genome_metadata( metadata: pl.LazyFrame, cols: list | None = None, state_format: StateFormat = StateFormat.ABBR ) -> pl.LazyFrame: - """Apply a standard set of filters to the GenBank genome metadata.""" + """Apply standard filters to Nextstrain's SARS-CoV-2 sequence metadata. + A helper function to apply commonly-used filters to a Polars LazyFrame + that represents Nextstrain's SARS-CoV-2 sequence metadata. It filters + on human sequences from the United States (including Puerto Rico and + Washington, DC). + + This function also performs small transformations to the metadata, + such as casting the collection date to a date type, renaming columns, + and returning alternate state formats if requested. + + Parameters + ---------- + metadata : :class:`polars.LazyFrame` + The :attr:`cladetime.CladeTime.url_sequence_metadata` + attribute of a :class:`cladetime.CladeTime` object. This parameter + represents SARS-CoV-2 sequence metadata produced by Nextstrain + as an intermediate file in their daily workflow + cols : list + Optional. A list of columns to include in the filtered metadata. + The default columns included in the filtered metadata are: + clade_nextstrain, country, date, division, genbank_accession, + genbank_accession_rev, host + state_format : :class:`cladetime.types.StateFormat` + Optional. The state name format returned in the filtered metadata's + location column. Defaults to `StateFormat.ABBR` + + Returns + ------- + :class:`polars.LazyFrame` + A Polars LazyFrame that represents the filtered SARS-CoV-2 sequence + metadata. + + Raises + ------ + ValueError + If the state_format parameter is not a valid + :class:`cladetime.types.StateFormat`. + + Notes + ----- + This function will filter out metadata rows with invalid state names or + date strings that cannot be cast to a Polars date format. + + Example: + -------- + >>> from cladetime import CladeTime + >>> from cladetime.util.sequence import filter_covid_genome_metadata + + Apply common filters to the sequence metadata of a CladeTime object: + + >>> ct = CladeTime(seq_as_of="2024-10-15") + >>> ct = CladeTime(sequence_as_of="2024-10-15") + >>> filtered_metadata = filter_covid_genome_metadata(ct.sequence_metadata) + >>> filtered_metadata.collect().head(5) + shape: (5, 7) + ┌───────┬─────────┬────────────┬────────────┬────────────┬──────────────┬──────┬ + │ clade ┆ country ┆ date ┆ genbank_ ┆ genbank_ac ┆ host ┆ loca │ + │ ┆ ┆ ┆ accession ┆ cession_rev┆ ┆ tion │ + │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ + │ str ┆ str ┆ date ┆ str ┆ str ┆ str ┆ str │ + │ ┆ ┆ ┆ ┆ ┆ ┆ │ + ╞═══════╪═════════╪════════════╪════════════╪════════════╪══════════════╪══════╡ + │ 22A ┆ USA ┆ 2022-07-07 ┆ PP223234 ┆ PP223234.1 ┆ Homo sapiens ┆ AL │ + │ 22B ┆ USA ┆ 2022-07-02 ┆ PP223435 ┆ PP223435.1 ┆ Homo sapiens ┆ AZ │ + │ 22B ┆ USA ┆ 2022-07-19 ┆ PP223235 ┆ PP223235.1 ┆ Homo sapiens ┆ AZ │ + │ 22B ┆ USA ┆ 2022-07-15 ┆ PP223236 ┆ PP223236.1 ┆ Homo sapiens ┆ AZ │ + │ 22B ┆ USA ┆ 2022-07-20 ┆ PP223237 ┆ PP223237.1 ┆ Homo sapiens ┆ AZ │ + └───────┴─────────┴────────────┴────────────┴────────────┴─────────────────────┴ + """ if state_format not in StateFormat: raise ValueError(f"Invalid state_format. Must be one of: {list(StateFormat.__members__.items())}") From 629b5325ea30c82b8abc1e6ae69b8f02a4820e58 Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Sat, 26 Oct 2024 20:39:45 -0400 Subject: [PATCH 5/8] Move sequence file and rename filter function Documenting the filter_covid_genome_metadata function made it clear that its name should better align with the CladeTime vernacular ('sequence metadata'). Additionally, it makes more sense to move sequence.py out of the utility folder, since sequence-related functions are integral to the package. util/sequence.py remains in the code base for now, so we can import the filtering function with its old name for backward compatibility. --- .../filter_covid_genome_metadata.rst | 6 - docs/reference/index.rst | 2 +- docs/reference/sequence.rst | 6 + docs/reference/types.rst | 2 +- src/cladetime/assign_clades.py | 7 +- src/cladetime/cladetime.py | 2 +- src/cladetime/sequence.py | 331 ++++++++++++++++++ src/cladetime/util/sequence.py | 266 +------------- tests/unit/util/test_sequence.py | 30 +- 9 files changed, 356 insertions(+), 296 deletions(-) delete mode 100644 docs/reference/filter_covid_genome_metadata.rst create mode 100644 docs/reference/sequence.rst create mode 100644 src/cladetime/sequence.py diff --git a/docs/reference/filter_covid_genome_metadata.rst b/docs/reference/filter_covid_genome_metadata.rst deleted file mode 100644 index d52c80f..0000000 --- a/docs/reference/filter_covid_genome_metadata.rst +++ /dev/null @@ -1,6 +0,0 @@ -==================================================== -cladetime.util.sequence.filter_covid_genome_metadata -==================================================== - -.. autofunction:: cladetime.util.sequence.filter_covid_genome_metadata - diff --git a/docs/reference/index.rst b/docs/reference/index.rst index b1ea9ff..2ddb1d7 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -4,6 +4,6 @@ API Reference .. toctree:: cladetime - filter_covid_genome_metadata + sequence types diff --git a/docs/reference/sequence.rst b/docs/reference/sequence.rst new file mode 100644 index 0000000..49f4800 --- /dev/null +++ b/docs/reference/sequence.rst @@ -0,0 +1,6 @@ +========= +sequence +========= + +.. autofunction:: cladetime.sequence.filter_sequence_metadata + diff --git a/docs/reference/types.rst b/docs/reference/types.rst index 75f8178..a577dd1 100644 --- a/docs/reference/types.rst +++ b/docs/reference/types.rst @@ -1,5 +1,5 @@ ===== -Types +types ===== diff --git a/src/cladetime/assign_clades.py b/src/cladetime/assign_clades.py index 0f26d40..4bbfdff 100644 --- a/src/cladetime/assign_clades.py +++ b/src/cladetime/assign_clades.py @@ -11,8 +11,11 @@ import structlog from cladetime import CladeTime +from cladetime.sequence import ( + get_covid_genome_data, +) from cladetime.util.config import Config -from cladetime.util.sequence import _download_from_url, filter_covid_genome_metadata +from cladetime.util.reference import _download_from_url from cladetime.util.session import _get_session from cladetime.util.timing import time_function @@ -60,7 +63,7 @@ def get_sequence_metadata(metadata: pl.DataFrame, sequence_collection_date: date ] # clean and filter metadata (same process used to generate the weekly clade list) - filtered_metadata = filter_covid_genome_metadata(metadata, cols) + filtered_metadata = get_covid_genome_data(metadata, cols) # add filters based on user input filtered_metadata = filtered_metadata.filter(pl.col("date") >= sequence_collection_date) diff --git a/src/cladetime/cladetime.py b/src/cladetime/cladetime.py index d54bb87..ee6ded3 100644 --- a/src/cladetime/cladetime.py +++ b/src/cladetime/cladetime.py @@ -7,9 +7,9 @@ import structlog from cladetime.exceptions import CladeTimeFutureDateWarning, CladeTimeInvalidDateError, CladeTimeInvalidURLError +from cladetime.sequence import _get_ncov_metadata, get_covid_genome_metadata from cladetime.util.config import Config from cladetime.util.reference import _get_s3_object_url -from cladetime.util.sequence import _get_ncov_metadata, get_covid_genome_metadata logger = structlog.get_logger() diff --git a/src/cladetime/sequence.py b/src/cladetime/sequence.py new file mode 100644 index 0000000..83a5be3 --- /dev/null +++ b/src/cladetime/sequence.py @@ -0,0 +1,331 @@ +"""Functions for retrieving and parsing SARS-CoV-2 virus genome data.""" + +import json +import lzma +import zipfile +from datetime import datetime, timezone +from pathlib import Path + +import polars as pl +import structlog +import us +from requests import Session + +from cladetime.types import StateFormat +from cladetime.util.reference import _get_s3_object_url +from cladetime.util.session import _check_response, _get_session +from cladetime.util.timing import time_function + +logger = structlog.get_logger() + + +@time_function +def get_covid_genome_data(released_since_date: str, base_url: str, filename: str): + """ + Download genome data package from NCBI. + FIXME: Download the Nextclade-processed GenBank sequence data (which originates from NCBI) + from https://data.nextstrain.org/files/ncov/open/sequences.fasta.zst instead of using + the NCBI API. + """ + headers = { + "Accept": "application/zip", + } + session = _get_session() + session.headers.update(headers) + + # TODO: this might be a better as an item in the forthcoming config file + request_body = { + "released_since": released_since_date, + "taxon": "SARS-CoV-2", + "refseq_only": False, + "annotated_only": False, + "host": "Homo sapiens", + "complete_only": False, + "table_fields": ["unspecified"], + "include_sequence": ["GENOME"], + "aux_report": ["DATASET_REPORT"], + "format": "tsv", + "use_psg": False, + } + + logger.info("NCBI API call starting", released_since_date=released_since_date) + + response = session.post(base_url, data=json.dumps(request_body), timeout=(300, 300)) + _check_response(response) + + # Originally tried saving the NCBI package via a stream call and iter_content (to prevent potential + # memory issues that can arise when download large files). However, ran into an intermittent error: + # ChunkedEncodingError(ProtocolError('Response ended prematurely'). + # We may need to revisit this at some point, depending on how much data we place to request via the + # API and what kind of machine the pipeline will run on. + with open(filename, "wb") as f: + f.write(response.content) + + +@time_function +def download_covid_genome_metadata( + session: Session, bucket: str, key: str, data_path: Path, as_of: str | None = None, use_existing: bool = False +) -> Path: + """Download the latest GenBank genome metadata data from Nextstrain.""" + + if as_of is None: + as_of_datetime = datetime.now().replace(tzinfo=timezone.utc) + else: + as_of_datetime = datetime.strptime(as_of, "%Y-%m-%d").replace(tzinfo=timezone.utc) + + (s3_version, s3_url) = _get_s3_object_url(bucket, key, as_of_datetime) + filename = data_path / f"{as_of_datetime.date().strftime('%Y-%m-%d')}-{Path(key).name}" + + if use_existing and filename.exists(): + logger.info("using existing genome metadata file", metadata_file=str(filename)) + return filename + + logger.info("starting genome metadata download", source=s3_url, destination=str(filename)) + with session.get(s3_url, stream=True) as result: + result.raise_for_status() + with open(filename, "wb") as f: + for chunk in result.iter_content(chunk_size=None): + f.write(chunk) + + return filename + + +def get_covid_genome_metadata( + metadata_path: Path | None = None, metadata_url: str | None = None, num_rows: int | None = None +) -> pl.LazyFrame: + """ + Read GenBank genome metadata into a Polars LazyFrame. + + Parameters + ---------- + metadata_path : Path | None + Path to location of a NextStrain GenBank genome metadata file. + Cannot be used with metadata_url. + metadata_url: str | None + URL to a NextStrain GenBank genome metadata file. + Cannot be used with metadata_path. + num_rows : int | None, default = None + The number of genome metadata rows to request. + When not supplied, request all rows. + """ + + path_flag = metadata_path is not None + url_flag = metadata_url is not None + + assert path_flag + url_flag == 1, "Specify metadata_path or metadata_url, but not both." + + if metadata_url: + metadata = pl.scan_csv(metadata_url, separator="\t", n_rows=num_rows) + return metadata + + if metadata_path: + if (compression_type := metadata_path.suffix) in [".tsv", ".zst"]: + metadata = pl.scan_csv(metadata_path, separator="\t", n_rows=num_rows) + elif compression_type == ".xz": + metadata = pl.read_csv( + lzma.open(metadata_path), separator="\t", n_rows=num_rows, infer_schema_length=100000 + ).lazy() + + return metadata + + +def _get_ncov_metadata( + url_ncov_metadata: str, + session: Session | None = None, +) -> dict: + """Return metadata emitted by the Nextstrain ncov pipeline.""" + if not session: + session = _get_session(retry=False) + + response = session.get(url_ncov_metadata) + if not response.ok: + logger.warn( + "Failed to retrieve ncov metadata", + status_code=response.status_code, + response_text=response.text, + request=response.request.url, + request_body=response.request.body, + ) + return {} + + metadata = response.json() + if metadata.get("nextclade_dataset_name", "").lower() == "sars-cov-2": + metadata["nextclade_dataset_name_full"] = "nextstrain/sars-cov-2/wuhan-hu-1/orfs" + + return metadata + + +def filter_sequence_metadata( + metadata: pl.LazyFrame, cols: list | None = None, state_format: StateFormat = StateFormat.ABBR +) -> pl.LazyFrame: + """Apply standard filters to Nextstrain's SARS-CoV-2 sequence metadata. + + A helper function to apply commonly-used filters to a Polars LazyFrame + that represents Nextstrain's SARS-CoV-2 sequence metadata. It filters + on human sequences from the United States (including Puerto Rico and + Washington, DC). + + This function also performs small transformations to the metadata, + such as casting the collection date to a date type, renaming columns, + and returning alternate state formats if requested. + + Parameters + ---------- + metadata : :class:`polars.LazyFrame` + The :attr:`cladetime.CladeTime.url_sequence_metadata` + attribute of a :class:`cladetime.CladeTime` object. This parameter + represents SARS-CoV-2 sequence metadata produced by Nextstrain + as an intermediate file in their daily workflow + cols : list + Optional. A list of columns to include in the filtered metadata. + The default columns included in the filtered metadata are: + clade_nextstrain, country, date, division, genbank_accession, + genbank_accession_rev, host + state_format : :class:`cladetime.types.StateFormat` + Optional. The state name format returned in the filtered metadata's + location column. Defaults to `StateFormat.ABBR` + + Returns + ------- + :class:`polars.LazyFrame` + A Polars LazyFrame that represents the filtered SARS-CoV-2 sequence + metadata. + + Raises + ------ + ValueError + If the state_format parameter is not a valid + :class:`cladetime.types.StateFormat`. + + Notes + ----- + This function will filter out metadata rows with invalid state names or + date strings that cannot be cast to a Polars date format. + + Example: + -------- + >>> from cladetime import CladeTime + >>> from cladetime.sequence import filter_covid_genome_metadata + + Apply common filters to the sequence metadata of a CladeTime object: + + >>> ct = CladeTime(seq_as_of="2024-10-15") + >>> ct = CladeTime(sequence_as_of="2024-10-15") + >>> filtered_metadata = filter_covid_genome_metadata(ct.sequence_metadata) + >>> filtered_metadata.collect().head(5) + shape: (5, 7) + ┌───────┬─────────┬────────────┬────────────┬────────────┬──────────────┬──────┬ + │ clade ┆ country ┆ date ┆ genbank_ ┆ genbank_ac ┆ host ┆ loca │ + │ ┆ ┆ ┆ accession ┆ cession_rev┆ ┆ tion │ + │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ + │ str ┆ str ┆ date ┆ str ┆ str ┆ str ┆ str │ + │ ┆ ┆ ┆ ┆ ┆ ┆ │ + ╞═══════╪═════════╪════════════╪════════════╪════════════╪══════════════╪══════╡ + │ 22A ┆ USA ┆ 2022-07-07 ┆ PP223234 ┆ PP223234.1 ┆ Homo sapiens ┆ AL │ + │ 22B ┆ USA ┆ 2022-07-02 ┆ PP223435 ┆ PP223435.1 ┆ Homo sapiens ┆ AZ │ + │ 22B ┆ USA ┆ 2022-07-19 ┆ PP223235 ┆ PP223235.1 ┆ Homo sapiens ┆ AZ │ + │ 22B ┆ USA ┆ 2022-07-15 ┆ PP223236 ┆ PP223236.1 ┆ Homo sapiens ┆ AZ │ + │ 22B ┆ USA ┆ 2022-07-20 ┆ PP223237 ┆ PP223237.1 ┆ Homo sapiens ┆ AZ │ + └───────┴─────────┴────────────┴────────────┴────────────┴─────────────────────┴ + """ + if state_format not in StateFormat: + raise ValueError(f"Invalid state_format. Must be one of: {list(StateFormat.__members__.items())}") + + # Default columns to include in the filtered metadata + if not cols: + cols = [ + "clade_nextstrain", + "country", + "date", + "division", + "genbank_accession", + "genbank_accession_rev", + "host", + ] + + # There are some other odd divisions in the data, but these are 50 states, DC and PR + states = [state.name for state in us.states.STATES] + states.extend(["Washington DC", "District of Columbia", "Puerto Rico"]) + + # Filter dataset and do some general tidying + filtered_metadata = ( + metadata.select(cols) + .filter( + pl.col("country") == "USA", + pl.col("division").is_in(states), + pl.col("host") == "Homo sapiens", + ) + .rename({"clade_nextstrain": "clade"}) + .cast({"date": pl.Date}, strict=False) + # date filtering at the end ensures we filter out null + # values created by the above .cast operation + .filter( + pl.col("date").is_not_null(), + ) + ) + + # Create state mappings based on state_format parameter, including a DC alias, since + # Nextrain's metadata uses a different name than the us package + if state_format == StateFormat.FIPS: + state_dict = {state.name: state.fips for state in us.states.STATES_AND_TERRITORIES} + state_dict["Washington DC"] = us.states.DC.fips + elif state_format == StateFormat.ABBR: + state_dict = {state.name: state.abbr for state in us.states.STATES_AND_TERRITORIES} + state_dict["Washington DC"] = us.states.DC.abbr + else: + state_dict = {state.name: state.name for state in us.states.STATES_AND_TERRITORIES} + state_dict["Washington DC"] = "Washington DC" + + filtered_metadata = filtered_metadata.with_columns(pl.col("division").replace(state_dict).alias("location")).drop( + "division" + ) + + return filtered_metadata + + +def get_clade_counts(filtered_metadata: pl.LazyFrame) -> pl.LazyFrame: + """Return a count of clades by location and date.""" + + cols = [ + "clade", + "country", + "date", + "location", + "host", + ] + + counts = filtered_metadata.select(cols).group_by("location", "date", "clade").agg(pl.len().alias("count")) + + return counts + + +def _unzip_sequence_package(filename: Path, data_path: Path): + """Unzip the downloaded virus genome data package.""" + with zipfile.ZipFile(filename, "r") as package_zip: + zip_contents = package_zip.namelist() + is_metadata = next((s for s in zip_contents if "data_report" in s), None) + is_sequence = next((s for s in zip_contents if "genomic" in s), None) + if is_metadata and is_sequence: + package_zip.extractall(data_path) + else: + logger.error("NCBI package is missing expected files", zip_contents=zip_contents) + # Exit the pipeline without displaying a traceback + raise SystemExit("Error downloading NCBI package") + + +def parse_sequence_assignments(df_assignments: pl.DataFrame) -> pl.DataFrame: + """Parse out the sequence number from the seqName column returned by the clade assignment tool.""" + + # polars apparently can't split out the sequence number from that big name column + # without resorting an apply, so here we're dropping into pandas to do that + # (might be a premature optimization, since this manoever requires both pandas and pyarrow) + seq = pl.from_pandas(df_assignments.to_pandas()["seqName"].str.split(" ").str[0].rename("seq")) + + # we're expecting one row per sequence + if seq.n_unique() != df_assignments.shape[0]: + raise ValueError("Clade assignment data contains duplicate sequence. Stopping assignment process.") + + # add the parsed sequence number as a new column + df_assignments = df_assignments.insert_column(1, seq) # type: ignore + + return df_assignments diff --git a/src/cladetime/util/sequence.py b/src/cladetime/util/sequence.py index 708d0aa..71b55e3 100644 --- a/src/cladetime/util/sequence.py +++ b/src/cladetime/util/sequence.py @@ -1,262 +1,6 @@ -"""Functions for retrieving and parsing SARS-CoV-2 virus genome data.""" +"""cladetime.util.sequence moved to cladetime.sequence.""" -import lzma -import os -from pathlib import Path -from urllib.parse import urlparse - -import polars as pl -import structlog -import us -from requests import Session - -from cladetime.types import StateFormat -from cladetime.util.reference import _get_s3_object_url -from cladetime.util.session import _check_response, _get_session -from cladetime.util.timing import time_function - -logger = structlog.get_logger() - - -@time_function -def _download_from_url(session: Session, url: str, data_path: Path) -> Path: - """Download a file from the specified URL and save it to data_path.""" - - parsed_url = urlparse(url) - url_filename = os.path.basename(parsed_url.path) - filename = data_path / url_filename - - with session.get(url, stream=True) as result: - result.raise_for_status() - with open(filename, "wb") as f: - for chunk in result.iter_content(chunk_size=None): - f.write(chunk) - - return filename - - -def get_covid_genome_metadata( - metadata_path: Path | None = None, metadata_url: str | None = None, num_rows: int | None = None -) -> pl.LazyFrame: - """ - Read GenBank genome metadata into a Polars LazyFrame. - - Parameters - ---------- - metadata_path : Path | None - Path to location of a NextStrain GenBank genome metadata file. - Cannot be used with metadata_url. - metadata_url: str | None - URL to a NextStrain GenBank genome metadata file. - Cannot be used with metadata_path. - num_rows : int | None, default = None - The number of genome metadata rows to request. - When not supplied, request all rows. - """ - - path_flag = metadata_path is not None - url_flag = metadata_url is not None - - assert path_flag + url_flag == 1, "Specify metadata_path or metadata_url, but not both." - - if metadata_url: - metadata = pl.scan_csv(metadata_url, separator="\t", n_rows=num_rows) - return metadata - - if metadata_path: - if (compression_type := metadata_path.suffix) in [".tsv", ".zst"]: - metadata = pl.scan_csv(metadata_path, separator="\t", n_rows=num_rows) - elif compression_type == ".xz": - metadata = pl.read_csv( - lzma.open(metadata_path), separator="\t", n_rows=num_rows, infer_schema_length=100000 - ).lazy() - - return metadata - - -def _get_ncov_metadata( - url_ncov_metadata: str, - session: Session | None = None, -) -> dict: - """Return metadata emitted by the Nextstrain ncov pipeline.""" - if not session: - session = _get_session(retry=False) - - response = session.get(url_ncov_metadata) - if not response.ok: - logger.warn( - "Failed to retrieve ncov metadata", - status_code=response.status_code, - response_text=response.text, - request=response.request.url, - request_body=response.request.body, - ) - return {} - - metadata = response.json() - if metadata.get("nextclade_dataset_name", "").lower() == "sars-cov-2": - metadata["nextclade_dataset_name_full"] = "nextstrain/sars-cov-2/wuhan-hu-1/orfs" - - return metadata - - -def filter_covid_genome_metadata( - metadata: pl.LazyFrame, cols: list | None = None, state_format: StateFormat = StateFormat.ABBR -) -> pl.LazyFrame: - """Apply standard filters to Nextstrain's SARS-CoV-2 sequence metadata. - - A helper function to apply commonly-used filters to a Polars LazyFrame - that represents Nextstrain's SARS-CoV-2 sequence metadata. It filters - on human sequences from the United States (including Puerto Rico and - Washington, DC). - - This function also performs small transformations to the metadata, - such as casting the collection date to a date type, renaming columns, - and returning alternate state formats if requested. - - Parameters - ---------- - metadata : :class:`polars.LazyFrame` - The :attr:`cladetime.CladeTime.url_sequence_metadata` - attribute of a :class:`cladetime.CladeTime` object. This parameter - represents SARS-CoV-2 sequence metadata produced by Nextstrain - as an intermediate file in their daily workflow - cols : list - Optional. A list of columns to include in the filtered metadata. - The default columns included in the filtered metadata are: - clade_nextstrain, country, date, division, genbank_accession, - genbank_accession_rev, host - state_format : :class:`cladetime.types.StateFormat` - Optional. The state name format returned in the filtered metadata's - location column. Defaults to `StateFormat.ABBR` - - Returns - ------- - :class:`polars.LazyFrame` - A Polars LazyFrame that represents the filtered SARS-CoV-2 sequence - metadata. - - Raises - ------ - ValueError - If the state_format parameter is not a valid - :class:`cladetime.types.StateFormat`. - - Notes - ----- - This function will filter out metadata rows with invalid state names or - date strings that cannot be cast to a Polars date format. - - Example: - -------- - >>> from cladetime import CladeTime - >>> from cladetime.util.sequence import filter_covid_genome_metadata - - Apply common filters to the sequence metadata of a CladeTime object: - - >>> ct = CladeTime(seq_as_of="2024-10-15") - >>> ct = CladeTime(sequence_as_of="2024-10-15") - >>> filtered_metadata = filter_covid_genome_metadata(ct.sequence_metadata) - >>> filtered_metadata.collect().head(5) - shape: (5, 7) - ┌───────┬─────────┬────────────┬────────────┬────────────┬──────────────┬──────┬ - │ clade ┆ country ┆ date ┆ genbank_ ┆ genbank_ac ┆ host ┆ loca │ - │ ┆ ┆ ┆ accession ┆ cession_rev┆ ┆ tion │ - │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ - │ str ┆ str ┆ date ┆ str ┆ str ┆ str ┆ str │ - │ ┆ ┆ ┆ ┆ ┆ ┆ │ - ╞═══════╪═════════╪════════════╪════════════╪════════════╪══════════════╪══════╡ - │ 22A ┆ USA ┆ 2022-07-07 ┆ PP223234 ┆ PP223234.1 ┆ Homo sapiens ┆ AL │ - │ 22B ┆ USA ┆ 2022-07-02 ┆ PP223435 ┆ PP223435.1 ┆ Homo sapiens ┆ AZ │ - │ 22B ┆ USA ┆ 2022-07-19 ┆ PP223235 ┆ PP223235.1 ┆ Homo sapiens ┆ AZ │ - │ 22B ┆ USA ┆ 2022-07-15 ┆ PP223236 ┆ PP223236.1 ┆ Homo sapiens ┆ AZ │ - │ 22B ┆ USA ┆ 2022-07-20 ┆ PP223237 ┆ PP223237.1 ┆ Homo sapiens ┆ AZ │ - └───────┴─────────┴────────────┴────────────┴────────────┴─────────────────────┴ - """ - if state_format not in StateFormat: - raise ValueError(f"Invalid state_format. Must be one of: {list(StateFormat.__members__.items())}") - - # Default columns to include in the filtered metadata - if not cols: - cols = [ - "clade_nextstrain", - "country", - "date", - "division", - "genbank_accession", - "genbank_accession_rev", - "host", - ] - - # There are some other odd divisions in the data, but these are 50 states, DC and PR - states = [state.name for state in us.states.STATES] - states.extend(["Washington DC", "District of Columbia", "Puerto Rico"]) - - # Filter dataset and do some general tidying - filtered_metadata = ( - metadata.select(cols) - .filter( - pl.col("country") == "USA", - pl.col("division").is_in(states), - pl.col("host") == "Homo sapiens", - ) - .rename({"clade_nextstrain": "clade"}) - .cast({"date": pl.Date}, strict=False) - # date filtering at the end ensures we filter out null - # values created by the above .cast operation - .filter( - pl.col("date").is_not_null(), - ) - ) - - # Create state mappings based on state_format parameter, including a DC alias, since - # Nextrain's metadata uses a different name than the us package - if state_format == StateFormat.FIPS: - state_dict = {state.name: state.fips for state in us.states.STATES_AND_TERRITORIES} - state_dict["Washington DC"] = us.states.DC.fips - elif state_format == StateFormat.ABBR: - state_dict = {state.name: state.abbr for state in us.states.STATES_AND_TERRITORIES} - state_dict["Washington DC"] = us.states.DC.abbr - else: - state_dict = {state.name: state.name for state in us.states.STATES_AND_TERRITORIES} - state_dict["Washington DC"] = "Washington DC" - - filtered_metadata = filtered_metadata.with_columns(pl.col("division").replace(state_dict).alias("location")).drop( - "division" - ) - - return filtered_metadata - - -def get_clade_counts(filtered_metadata: pl.LazyFrame) -> pl.LazyFrame: - """Return a count of clades by location and date.""" - - cols = [ - "clade", - "country", - "date", - "location", - "host", - ] - - counts = filtered_metadata.select(cols).group_by("location", "date", "clade").agg(pl.len().alias("count")) - - return counts - - -def parse_sequence_assignments(df_assignments: pl.DataFrame) -> pl.DataFrame: - """Parse out the sequence number from the seqName column returned by the clade assignment tool.""" - - # polars apparently can't split out the sequence number from that big name column - # without resorting an apply, so here we're dropping into pandas to do that - # (might be a premature optimization, since this manoever requires both pandas and pyarrow) - seq = pl.from_pandas(df_assignments.to_pandas()["seqName"].str.split(" ").str[0].rename("seq")) - - # we're expecting one row per sequence - if seq.n_unique() != df_assignments.shape[0]: - raise ValueError("Clade assignment data contains duplicate sequence. Stopping assignment process.") - - # add the parsed sequence number as a new column - df_assignments = df_assignments.insert_column(1, seq) # type: ignore - - return df_assignments +# For temporary backwards compatibility +from cladetime.sequence import _get_ncov_metadata as _get_ncov_metadata # noqa: F401 +from cladetime.sequence import filter_sequence_metadata as filter_covid_genome_metadata # noqa: F401 +from cladetime.sequence import get_clade_counts as get_clade_counts diff --git a/tests/unit/util/test_sequence.py b/tests/unit/util/test_sequence.py index 62f06f1..bc1bdf6 100644 --- a/tests/unit/util/test_sequence.py +++ b/tests/unit/util/test_sequence.py @@ -3,12 +3,11 @@ import polars as pl import pytest -from cladetime.types import StateFormat -from cladetime.util.sequence import ( - filter_covid_genome_metadata, +from cladetime.sequence import ( + filter_sequence_metadata, get_covid_genome_metadata, - parse_sequence_assignments, ) +from cladetime.types import StateFormat @pytest.fixture @@ -88,7 +87,7 @@ def test_filter_covid_genome_metadata(): } lf_metadata = pl.LazyFrame(test_genome_metadata) - lf_filtered = filter_covid_genome_metadata(lf_metadata).collect() + lf_filtered = filter_sequence_metadata(lf_metadata).collect() assert len(lf_filtered) == 2 @@ -125,7 +124,7 @@ def test_filter_covid_genome_metadata_state_name(): } lf_metadata = pl.LazyFrame(test_genome_metadata) - lf_filtered = filter_covid_genome_metadata(lf_metadata, state_format=StateFormat.NAME) + lf_filtered = filter_sequence_metadata(lf_metadata, state_format=StateFormat.NAME) lf_filtered = lf_filtered.collect() # Un-mapped states are dropped from dataset @@ -149,7 +148,7 @@ def test_filter_covid_genome_metadata_state_fips(): } lf_metadata = pl.LazyFrame(test_genome_metadata) - lf_filtered = filter_covid_genome_metadata(lf_metadata, state_format=StateFormat.FIPS) + lf_filtered = filter_sequence_metadata(lf_metadata, state_format=StateFormat.FIPS) lf_filtered = lf_filtered.collect() # Un-mapped states are dropped from dataset @@ -157,20 +156,3 @@ def test_filter_covid_genome_metadata_state_fips(): locations = set(lf_filtered["location"].to_list()) assert locations == {"11", "25", "72"} - - -def test_parse_sequence_assignments(df_assignments): - result = parse_sequence_assignments(df_assignments) - - # resulting dataframe should have an additional column called "seq" - assert Counter(result.columns) == Counter(["seqName", "clade", "seq"]) - - # check resulting sequence numbers - assert Counter(result["seq"].to_list()) == Counter(["PP782799.1", "ABCDEFG", "12345678"]) - - -def test_parse_sequence_duplicates(df_assignments): - df_duplicates = pl.concat([df_assignments, df_assignments]) - - with pytest.raises(ValueError): - parse_sequence_assignments(df_duplicates) From 520add4eb2dead6ca5ec0e1159f4dd6b2a632e38 Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Mon, 28 Oct 2024 09:45:23 -0400 Subject: [PATCH 6/8] fix up some merge conflicts --- src/cladetime/assign_clades.py | 7 +-- src/cladetime/sequence.py | 88 ++++---------------------------- tests/unit/util/test_sequence.py | 1 - 3 files changed, 11 insertions(+), 85 deletions(-) diff --git a/src/cladetime/assign_clades.py b/src/cladetime/assign_clades.py index 4bbfdff..1b48ab6 100644 --- a/src/cladetime/assign_clades.py +++ b/src/cladetime/assign_clades.py @@ -11,11 +11,8 @@ import structlog from cladetime import CladeTime -from cladetime.sequence import ( - get_covid_genome_data, -) +from cladetime.sequence import _download_from_url, filter_sequence_metadata from cladetime.util.config import Config -from cladetime.util.reference import _download_from_url from cladetime.util.session import _get_session from cladetime.util.timing import time_function @@ -63,7 +60,7 @@ def get_sequence_metadata(metadata: pl.DataFrame, sequence_collection_date: date ] # clean and filter metadata (same process used to generate the weekly clade list) - filtered_metadata = get_covid_genome_data(metadata, cols) + filtered_metadata = filter_sequence_metadata(metadata, cols) # add filters based on user input filtered_metadata = filtered_metadata.filter(pl.col("date") >= sequence_collection_date) diff --git a/src/cladetime/sequence.py b/src/cladetime/sequence.py index 83a5be3..b677abe 100644 --- a/src/cladetime/sequence.py +++ b/src/cladetime/sequence.py @@ -1,10 +1,9 @@ """Functions for retrieving and parsing SARS-CoV-2 virus genome data.""" -import json import lzma -import zipfile -from datetime import datetime, timezone +import os from pathlib import Path +from urllib.parse import urlparse import polars as pl import structlog @@ -12,76 +11,21 @@ from requests import Session from cladetime.types import StateFormat -from cladetime.util.reference import _get_s3_object_url -from cladetime.util.session import _check_response, _get_session +from cladetime.util.session import _get_session from cladetime.util.timing import time_function logger = structlog.get_logger() @time_function -def get_covid_genome_data(released_since_date: str, base_url: str, filename: str): - """ - Download genome data package from NCBI. - FIXME: Download the Nextclade-processed GenBank sequence data (which originates from NCBI) - from https://data.nextstrain.org/files/ncov/open/sequences.fasta.zst instead of using - the NCBI API. - """ - headers = { - "Accept": "application/zip", - } - session = _get_session() - session.headers.update(headers) - - # TODO: this might be a better as an item in the forthcoming config file - request_body = { - "released_since": released_since_date, - "taxon": "SARS-CoV-2", - "refseq_only": False, - "annotated_only": False, - "host": "Homo sapiens", - "complete_only": False, - "table_fields": ["unspecified"], - "include_sequence": ["GENOME"], - "aux_report": ["DATASET_REPORT"], - "format": "tsv", - "use_psg": False, - } - - logger.info("NCBI API call starting", released_since_date=released_since_date) - - response = session.post(base_url, data=json.dumps(request_body), timeout=(300, 300)) - _check_response(response) - - # Originally tried saving the NCBI package via a stream call and iter_content (to prevent potential - # memory issues that can arise when download large files). However, ran into an intermittent error: - # ChunkedEncodingError(ProtocolError('Response ended prematurely'). - # We may need to revisit this at some point, depending on how much data we place to request via the - # API and what kind of machine the pipeline will run on. - with open(filename, "wb") as f: - f.write(response.content) - - -@time_function -def download_covid_genome_metadata( - session: Session, bucket: str, key: str, data_path: Path, as_of: str | None = None, use_existing: bool = False -) -> Path: - """Download the latest GenBank genome metadata data from Nextstrain.""" +def _download_from_url(session: Session, url: str, data_path: Path) -> Path: + """Download a file from the specified URL and save it to data_path.""" - if as_of is None: - as_of_datetime = datetime.now().replace(tzinfo=timezone.utc) - else: - as_of_datetime = datetime.strptime(as_of, "%Y-%m-%d").replace(tzinfo=timezone.utc) - - (s3_version, s3_url) = _get_s3_object_url(bucket, key, as_of_datetime) - filename = data_path / f"{as_of_datetime.date().strftime('%Y-%m-%d')}-{Path(key).name}" - - if use_existing and filename.exists(): - logger.info("using existing genome metadata file", metadata_file=str(filename)) - return filename + parsed_url = urlparse(url) + url_filename = os.path.basename(parsed_url.path) + filename = data_path / url_filename - logger.info("starting genome metadata download", source=s3_url, destination=str(filename)) - with session.get(s3_url, stream=True) as result: + with session.get(url, stream=True) as result: result.raise_for_status() with open(filename, "wb") as f: for chunk in result.iter_content(chunk_size=None): @@ -299,20 +243,6 @@ def get_clade_counts(filtered_metadata: pl.LazyFrame) -> pl.LazyFrame: return counts -def _unzip_sequence_package(filename: Path, data_path: Path): - """Unzip the downloaded virus genome data package.""" - with zipfile.ZipFile(filename, "r") as package_zip: - zip_contents = package_zip.namelist() - is_metadata = next((s for s in zip_contents if "data_report" in s), None) - is_sequence = next((s for s in zip_contents if "genomic" in s), None) - if is_metadata and is_sequence: - package_zip.extractall(data_path) - else: - logger.error("NCBI package is missing expected files", zip_contents=zip_contents) - # Exit the pipeline without displaying a traceback - raise SystemExit("Error downloading NCBI package") - - def parse_sequence_assignments(df_assignments: pl.DataFrame) -> pl.DataFrame: """Parse out the sequence number from the seqName column returned by the clade assignment tool.""" diff --git a/tests/unit/util/test_sequence.py b/tests/unit/util/test_sequence.py index bc1bdf6..515d0df 100644 --- a/tests/unit/util/test_sequence.py +++ b/tests/unit/util/test_sequence.py @@ -1,4 +1,3 @@ -from collections import Counter from pathlib import Path import polars as pl From 2d7271b35b112c891112deb52df84b304769d610 Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Mon, 28 Oct 2024 11:27:08 -0400 Subject: [PATCH 7/8] Update filter_sequence_metadata docstring This function can also accept and return a polars DataFrame (in addition to a LazyFrame) --- docs/conf.py | 2 ++ src/cladetime/sequence.py | 30 ++++++++++++++++-------------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index d3d0134..45da7dd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -135,6 +135,8 @@ nitpick_ignore = [ ("py:class", "datetime"), ("py:class", "polars.LazyFrame"), + ("py:class", "polars.dataframe.frame.DataFrame"), + ("py:class", "polars.DataFrame"), ("py:class", "polars.lazyframe.frame.LazyFrame"), ] diff --git a/src/cladetime/sequence.py b/src/cladetime/sequence.py index b677abe..7d32718 100644 --- a/src/cladetime/sequence.py +++ b/src/cladetime/sequence.py @@ -100,14 +100,14 @@ def _get_ncov_metadata( def filter_sequence_metadata( - metadata: pl.LazyFrame, cols: list | None = None, state_format: StateFormat = StateFormat.ABBR -) -> pl.LazyFrame: + metadata: pl.DataFrame | pl.LazyFrame, cols: list | None = None, state_format: StateFormat = StateFormat.ABBR +) -> pl.DataFrame | pl.LazyFrame: """Apply standard filters to Nextstrain's SARS-CoV-2 sequence metadata. - A helper function to apply commonly-used filters to a Polars LazyFrame - that represents Nextstrain's SARS-CoV-2 sequence metadata. It filters - on human sequences from the United States (including Puerto Rico and - Washington, DC). + A helper function to apply commonly-used filters to a Polars DataFrame + or LazyFrame that represents Nextstrain's SARS-CoV-2 sequence metadata. + It filters on human sequences from the United States (including Puerto Rico + and Washington, DC). This function also performs small transformations to the metadata, such as casting the collection date to a date type, renaming columns, @@ -115,11 +115,12 @@ def filter_sequence_metadata( Parameters ---------- - metadata : :class:`polars.LazyFrame` - The :attr:`cladetime.CladeTime.url_sequence_metadata` - attribute of a :class:`cladetime.CladeTime` object. This parameter - represents SARS-CoV-2 sequence metadata produced by Nextstrain - as an intermediate file in their daily workflow + metadata : :class:`polars.DataFrame` or :class:`polars.LazyFrame` + A Polars DataFrame or LazyFrame that represents SARS-CoV-2 + sequence metadata produced by Nextstrain as an intermediate file in + their daily workflow. This parameter is often the + :attr:`cladetime.CladeTime.url_sequence_metadata` attribute + of a :class:`cladetime.CladeTime` object cols : list Optional. A list of columns to include in the filtered metadata. The default columns included in the filtered metadata are: @@ -131,9 +132,10 @@ def filter_sequence_metadata( Returns ------- - :class:`polars.LazyFrame` - A Polars LazyFrame that represents the filtered SARS-CoV-2 sequence - metadata. + :class:`polars.DataFrame` or :class:`polars.LazyFrame` + A Polars object that represents the filtered SARS-CoV-2 sequence + metadata. The type of returned object will match the type of the + function's metadata parameter. Raises ------ From 87c06aab042dd2007ed121de5004652fb2f2c6ac Mon Sep 17 00:00:00 2001 From: Becky Sweger Date: Mon, 28 Oct 2024 11:39:07 -0400 Subject: [PATCH 8/8] Falsy fix --- src/cladetime/sequence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cladetime/sequence.py b/src/cladetime/sequence.py index 7d32718..c06ffbc 100644 --- a/src/cladetime/sequence.py +++ b/src/cladetime/sequence.py @@ -178,7 +178,7 @@ def filter_sequence_metadata( raise ValueError(f"Invalid state_format. Must be one of: {list(StateFormat.__members__.items())}") # Default columns to include in the filtered metadata - if not cols: + if cols is None: cols = [ "clade_nextstrain", "country",