Update/align with paper #11

Open · wants to merge 124 commits into base: main

Changes from 1 commit

Commits (124)
8e06855
Remove previous mirror scripts
nick-j-roberts Jun 15, 2023
e27244c
Delete removed transfer scripts from readme
nick-j-roberts Jun 15, 2023
f857484
Start scaffolding structure of mirror job
nick-j-roberts Jun 15, 2023
fd7da25
Upgrade pip
nick-j-roberts Jun 16, 2023
653707b
Add pyshacl
nick-j-roberts Jun 16, 2023
a4f8058
Reorganize commonly used classes
nick-j-roberts Jun 16, 2023
8ccd345
Finish url verification and zip download functions
nick-j-roberts Jun 16, 2023
67c3a86
Finish url verificiation, download, and geospatial extent for RFCs
nick-j-roberts Jun 16, 2023
633531d
Delete cloud utils
nick-j-roberts Jun 16, 2023
b5b137b
Moved class and create source dataset skolemizer
nick-j-roberts Jun 16, 2023
94d719a
Temporarily change compose statement
nick-j-roberts Jun 19, 2023
daaceb9
Start on handwritten TTL ontology
nick-j-roberts Jun 19, 2023
bb623fe
Make transposition region and watershed region apply only to transpos…
nick-j-roberts Jun 20, 2023
b38544f
Add AORC namespace pointing to v0.9
nick-j-roberts Jun 20, 2023
f0728a3
Update aorc ontology
nick-j-roberts Jun 20, 2023
9a6025c
Create SHACL shapes for AORC ontology
nick-j-roberts Jun 20, 2023
cc83438
Get git and docker hashes and expose them to docker container
nick-j-roberts Jun 20, 2023
c984c0b
Fix reachable git url check
nick-j-roberts Jun 20, 2023
1ff1293
Add docker push before compose
nick-j-roberts Jun 20, 2023
e84d03a
use load_dotenv
nick-j-roberts Jun 20, 2023
cc4dee4
Add graphdb to docker compose
nick-j-roberts Jun 21, 2023
2c3cde2
Add guide on adding CKAN extension within docker environment
nick-j-roberts Jun 26, 2023
97f467c
Ignore graphdb temp data changes
nick-j-roberts Jul 1, 2023
07d39e1
Get rid of extension guide
nick-j-roberts Jul 1, 2023
b0311cd
Start on integration of CKAN extensions to mirror job
nick-j-roberts Jul 1, 2023
41ebcee
Progressed on mirror upload function
nick-j-roberts Jul 2, 2023
bfc078f
Add docker ignore file
nick-j-roberts Jul 2, 2023
d8d3072
Make compose which uses graphdb a dev version
nick-j-roberts Jul 2, 2023
45bd276
Revise docker compose statement
nick-j-roberts Jul 2, 2023
74deb31
Add env vars
nick-j-roberts Jul 2, 2023
85385a1
Reduce uncessesary copies
nick-j-roberts Jul 2, 2023
2992bbf
Change to safe requests version
nick-j-roberts Jul 2, 2023
a7096d8
Update structure and provenance resources
nick-j-roberts Jul 2, 2023
216d6f7
Update initiation shell script
nick-j-roberts Jul 2, 2023
03b6744
Attempt error fix
nick-j-roberts Jul 2, 2023
f29016c
End if block
nick-j-roberts Jul 2, 2023
460e9df
Include env vars in compose
nick-j-roberts Jul 2, 2023
447f6ff
Get rid of useless push statement
nick-j-roberts Jul 2, 2023
ec9bad5
Copy reqs
nick-j-roberts Jul 2, 2023
501985e
Put in pull statement
nick-j-roberts Jul 2, 2023
2983071
Took volume out of prod compose
nick-j-roberts Jul 2, 2023
a073e3a
Move timedelta translation
nick-j-roberts Jul 2, 2023
6654c77
Fix default arg
nick-j-roberts Jul 2, 2023
8baa31a
Fix namespace for IANA
nick-j-roberts Jul 2, 2023
69e7fd4
Fix return
nick-j-roberts Jul 2, 2023
53d7877
Comment unused graph creator class
nick-j-roberts Jul 2, 2023
a8434d9
Add TODO
nick-j-roberts Jul 2, 2023
ee62d31
Add argument validator to init.sh
nick-j-roberts Jul 2, 2023
bb449e1
Undo mistaken copy
nick-j-roberts Jul 2, 2023
d68f766
Narrow copy
nick-j-roberts Jul 2, 2023
b0a6fa8
Fixed equality comparison
nick-j-roberts Jul 2, 2023
a8abc15
Change to relative imports, fix upload
nick-j-roberts Jul 3, 2023
88e01c4
Simplify geoms to allow for CKAN upload
nick-j-roberts Jul 3, 2023
bb381b4
Undid relative imports
nick-j-roberts Jul 3, 2023
7ae1adc
Added geom simplification to allow CKAN upload
nick-j-roberts Jul 3, 2023
def8126
Replace simplifier with convex hull
nick-j-roberts Jul 3, 2023
f174b90
Change JSON-LD serialization to TTL
nick-j-roberts Jul 3, 2023
121a9e2
Get rid of sys.argv extensions
nick-j-roberts Jul 3, 2023
72e5537
Do modification of upload params in main mirror.py
nick-j-roberts Jul 3, 2023
4458b33
Rename preserved 'id'
nick-j-roberts Jul 3, 2023
f866cc6
Take out left over formatting in upload
nick-j-roberts Jul 3, 2023
e21c777
Bind namespaces
nick-j-roberts Jul 3, 2023
33bf793
Add resource creation to dataset upload
nick-j-roberts Jul 3, 2023
bfcc9e7
Add resources to upload
nick-j-roberts Jul 3, 2023
e8eaa8c
Re-enable upload of mirror
nick-j-roberts Jul 3, 2023
aed4377
Reorganize general utils
nick-j-roberts Jul 3, 2023
26c699b
Started structuring composite refactor
nick-j-roberts Jul 3, 2023
a47206f
Take metadata retrieval
nick-j-roberts Jul 5, 2023
973d4b5
Create pseudocode
nick-j-roberts Jul 5, 2023
d502045
First shot at composite job
nick-j-roberts Jul 6, 2023
ea952b8
Delete done TODO
nick-j-roberts Jul 6, 2023
d359e9a
Fix exit method
nick-j-roberts Jul 6, 2023
84a15f6
Fix temporal property path
nick-j-roberts Jul 6, 2023
f0d385f
Fix source url attribution
nick-j-roberts Jul 6, 2023
f683787
Reenable json writing
nick-j-roberts Jul 6, 2023
e194a6a
Fix import, take dev limit off urls
nick-j-roberts Jul 6, 2023
4b43252
Fix bucket parameter
nick-j-roberts Jul 6, 2023
1555138
Correct dataset id and description assignments
nick-j-roberts Jul 6, 2023
eca0af1
Align ids with hourly data
nick-j-roberts Jul 6, 2023
5482650
Unnest dataset creation
nick-j-roberts Jul 7, 2023
94e55a5
Fix key misname in upload
nick-j-roberts Jul 7, 2023
4c4f0d1
Make URIs lowercase
nick-j-roberts Jul 7, 2023
670c358
Move demo to full
nick-j-roberts Jul 7, 2023
61d8497
Fix json writing
nick-j-roberts Jul 7, 2023
cd7981a
Fix sqlite syntax
nick-j-roberts Jul 7, 2023
d7e2c5c
Add required compress type param
nick-j-roberts Jul 7, 2023
5da0551
Ensure all source mirrors are recorded
nick-j-roberts Jul 7, 2023
d11a2f8
Undo JSON writes for mirror uploads
nick-j-roberts Jul 7, 2023
422fdf3
Get rid of old methods for composite RDF creation
nick-j-roberts Jul 7, 2023
497bf14
Update psuedocode for transposition job
nick-j-roberts Jul 7, 2023
f1dd99e
Reorganize composite imports
nick-j-roberts Jul 7, 2023
b62fd8d
Add ms index and dss URI
nick-j-roberts Jul 7, 2023
373d240
Remove unused import
nick-j-roberts Jul 7, 2023
0792859
First draft of transposition metadata creator
nick-j-roberts Jul 7, 2023
e1572ef
First draft of transposition metadata creator
nick-j-roberts Jul 7, 2023
bca941d
Edit pseudocode
nick-j-roberts Jul 7, 2023
23d80a6
Fix broken import, convert list to dict
nick-j-roberts Jul 7, 2023
c027c14
Fix geojson streaming, convert to convex hull
nick-j-roberts Jul 7, 2023
3167afe
Remove unneeded imports
nick-j-roberts Jul 7, 2023
0519a37
Add meilisearch and dependencies
nick-j-roberts Jul 7, 2023
772c193
Remove irrelevant ckan scripts
nick-j-roberts Jul 7, 2023
009a4b5
Remove outdated DCAT-US extension work
nick-j-roberts Jul 7, 2023
3cf4b9c
Remove unused utils
nick-j-roberts Jul 7, 2023
e297240
Add # to aorc namespace
nick-j-roberts Jul 7, 2023
9a37f31
Fix syntax errors
nick-j-roberts Jul 7, 2023
096189a
Regenerate ontology HTML
nick-j-roberts Jul 7, 2023
fa8bd4e
Get rid of logs and mirrors directories
nick-j-roberts Jul 7, 2023
e83f4a5
Get rid of outdates rdf writing utils
nick-j-roberts Jul 7, 2023
8a76b31
modify rdf2py
nick-j-roberts Jul 7, 2023
bcfbe6e
Delete pseudocode
nick-j-roberts Jul 7, 2023
62ea943
Update readmes
nick-j-roberts Jul 7, 2023
c261941
Move bucket to .env
nick-j-roberts Jul 7, 2023
93c7761
Add .env example
nick-j-roberts Jul 7, 2023
f25440b
Update usage in readme
nick-j-roberts Jul 7, 2023
ef575f6
Remove broken dependency
nick-j-roberts Jul 7, 2023
044e407
Fix filter in sparql
nick-j-roberts Jul 7, 2023
1e9a864
Remove test prefix
nick-j-roberts Jul 7, 2023
516e13e
Get rid of unused class
nick-j-roberts Jul 7, 2023
bd03917
Move ckan AORC plugin to blobfish
nick-j-roberts Jul 11, 2023
254b1be
Add logging to mirror job
nick-j-roberts Jul 11, 2023
b024d5c
Add logging to composite and transposition jobs
nick-j-roberts Jul 11, 2023
2bd811e
Add script summary headers
nick-j-roberts Jul 11, 2023
94e6e90
Finish adding docstring, fix typo
nick-j-roberts Jul 11, 2023
824abbb
Update aorc.ttl
nick-j-roberts Aug 9, 2023
Finish adding docstring, fix typo
nick-j-roberts committed Jul 11, 2023
commit 94e6e9035081a4fe6ac8eb64a796e7b95597c7c0
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
# blobfish

## Summary
This repository is a proof of concept to evaluate the usability of resource description framework (RDF) metadata in documenting extensible data pipelines. The data pipeline documented by this repository is the mirroring, transformation, and subsequent use of NOAA AORC gridded precipitation data for stochaistic storm transposition (SST) modeling.
This repository is a proof of concept to evaluate the usability of resource description framework (RDF) metadata in documenting extensible data pipelines. The data pipeline documented by this repository is the mirroring, transformation, and subsequent use of NOAA AORC gridded precipitation data for stochastic storm transposition (SST) modeling.

## Pipeline Description
This pipeline can be broken into 3 stages
2 changes: 1 addition & 1 deletion blobfish/aorc/README.md
@@ -28,4 +28,4 @@ Main script for the mirror creation task
Responsible for verifying the total available data from NOAA, its asynchronous acquisition, and its upload to s3, as well as collecting and uploading relevant metadata not only for the source data but also the mirror datasets created in the process

### transposition_meta.py
Main script for collecting, parsing, and submitting metadata created during stochaistic storm transposition models to CKAN for serialization as RDF
Main script for collecting, parsing, and submitting metadata created during stochastic storm transposition models to CKAN for serialization as RDF
27 changes: 27 additions & 0 deletions blobfish/aorc/classes/composite.py
@@ -41,6 +41,12 @@ def __exit__(self, exception_type, exception_value, traceback) -> Self:
logging.warning(f"Traceback: {traceback}")

def register_netcdfs(self, mirror_dataset: URIRef, nc_paths: list[str]) -> None:
"""Adds mirror dataset uris and netCDF paths to sqlite database for tracking

Args:
mirror_dataset (URIRef): Mirror dataset URI
nc_paths (list[str]): List of netCDF files associated with the mirror dataset
"""
insert_rows = []
for nc_path in nc_paths:
match = re.search(self.datetime_pattern, nc_path)
@@ -55,16 +61,37 @@ def register_netcdfs(self, mirror_dataset: URIRef, nc_paths: list[str]) -> None:
self.cur.executemany("insert into mirror_datasets VALUES(?, ?, ?)", insert_rows)

def get_nc_files(self, timestamp: datetime.datetime) -> list[str]:
"""Retrieve netCDF file names with a given timestamp

Args:
timestamp (datetime.datetime): Timestamp of interest

Returns:
list[str]: List of netCDF files
"""
self.cur.execute("select nc_path from mirror_datasets where t = ?", timestamp.isoformat())
nc_files = [f[0] for f in self.cur.fetchall()]
return nc_files

def get_mirror_datasets(self, timestamp: datetime.datetime) -> list[URIRef]:
"""Retrieve mirror dataset URIs with a given timestamp

Args:
timestamp (datetime.datetime): Timestamp of interest

Returns:
list[URIRef]: List of mirror dataset URIs
"""
self.cur.execute("select DISTINCT uri from mirror_datasets where t = ?", timestamp.isoformat())
mirror_datasets = [URIRef(f[0]) for f in self.cur.fetchall()]
return mirror_datasets

def group_data_by_time(self) -> list[tuple[list[str], list[URIRef], datetime.datetime]]:
"""Gets all netCDF files and mirror URIs grouped by timestamp

Returns:
list[tuple[list[str], list[URIRef], datetime.datetime]]: Tuple containing a list of netCDF files, a list of mirror dataset URIs, and the datetime which is within the temporal coverage of the mirror datasets and netCDF files, in that order
"""
results = []
for nc_path_concat, uri_concat, t in self.cur.execute(
"select GROUP_CONCAT(nc_path) AS nc_path_concat, GROUP_CONCAT(uri) AS uri_concat, t from mirror_datasets group by t"
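For orientation, a minimal sketch of how rows grouped by group_data_by_time could be unpacked downstream, run against an in-memory SQLite table; the (uri, nc_path, t) column layout is assumed from the insert statement shown above and is not taken verbatim from the repository.

```python
import datetime
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("create table mirror_datasets(uri TEXT, nc_path TEXT, t TEXT)")
cur.executemany(
    "insert into mirror_datasets VALUES(?, ?, ?)",
    [
        ("https://example.org/mirror/abrfc", "AORC_APCP_ABRFC_1979020100.nc4", "1979-02-01T00:00:00"),
        ("https://example.org/mirror/cnrfc", "AORC_APCP_CNRFC_1979020100.nc4", "1979-02-01T00:00:00"),
    ],
)
for nc_path_concat, uri_concat, t in cur.execute(
    "select GROUP_CONCAT(nc_path), GROUP_CONCAT(uri), t from mirror_datasets group by t"
):
    nc_paths = nc_path_concat.split(",")            # concatenated netCDF paths back to a list
    uris = uri_concat.split(",")                    # concatenated mirror URIs back to a list
    timestamp = datetime.datetime.fromisoformat(t)  # shared temporal coverage for the group
    print(nc_paths, uris, timestamp)
```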
8 changes: 8 additions & 0 deletions blobfish/aorc/classes/mirror.py
@@ -28,6 +28,14 @@ class AORCDataURL:
additional_args: dict = None

def s3_key(self, prefix: str = "mirrors/aorc/precip/") -> str:
"""Construct assumed s3 key for mirror dataset

Args:
prefix (str, optional): prefix to attach to zip file. Defaults to "mirrors/aorc/precip/".

Returns:
str: mirror dataset s3 key
"""
if prefix[-1] != "/":
prefix += "/"
url_parts = self.url.split("/")
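A rough usage sketch of the key construction that s3_key appears to perform; the method body is truncated in this hunk, so both the source URL and the assumption that the key reuses the zip file's basename are illustrative only.

```python
def mirror_s3_key(url: str, prefix: str = "mirrors/aorc/precip/") -> str:
    if prefix[-1] != "/":
        prefix += "/"
    # Assumption: the mirror key keeps the final path segment (the zip name) of the source URL
    zip_name = url.split("/")[-1]
    return f"{prefix}{zip_name}"


print(mirror_s3_key("https://example.noaa.gov/AORC_ABRFC_4km/AORC_APCP_4KM_ABRFC.zip"))
# mirrors/aorc/precip/AORC_APCP_4KM_ABRFC.zip
```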
22 changes: 22 additions & 0 deletions blobfish/aorc/classes/transposition.py
@@ -33,23 +33,45 @@ class TranspositionMetadata:
atlas_s3: str = field(init=False)

def __get_s3_base(self) -> tuple[str, list[str], str]:
"""Gets transposition s3 resource divided into the bucket, the parts of the uri between the bucket and basename, and the basename of the key
ex: s3://bucket/1/2/3/key.zip -> ('bucket', [1, 2, 3], 'key.zip')

Returns:
tuple[str, list[str], str]: s3 resource divided into the bucket, the parts of the uri between the bucket and basename, and the basename of the key
"""
geo_bucket, geo_key = extract_bucketname_and_keyname(self.transposition_geo_s3)
key_parts = geo_key.split("/")
return geo_bucket, key_parts[:-1], key_parts[-1]

def __create_dss_s3_path(self) -> str:
"""Creates presumed path to dss s3 resource

Returns:
str: DSS s3 URI
"""
bucket, key_parts, transpo_file_name = self.__get_s3_base()
transpo_folder = transpo_file_name.replace(".geojson", "")
path = os.path.join(bucket, *key_parts, transpo_folder, "72h", "dss", self.start_time.strftime("%Y%m%d.dss"))
return f"s3://{path}"

def __create_atlas_s3_path(self) -> str:
"""Gives the ATLAS14 path used during normalization - this uses a single .vrt file currently so just returns that value

Returns:
str: s3 URI of ATLAS dataset
"""
return "s3://tempest/noaa-atlas-14/2yr03da.vrt"

def __create_max_precip_point(self) -> Point:
"""Creates Point feature from metadata for max precip

Returns:
Point: Point conversion of max precip metadata
"""
return Point(self.max_precip_x, self.max_precip_y)

def __post_init__(self):
"""Creates dss_s3, atlas_s3, and max_precip_point attributes"""
self.max_precip_point = self.__create_max_precip_point()
self.dss_s3 = self.__create_dss_s3_path()
self.atlas_s3 = self.__create_atlas_s3_path()
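To make the path derivation in __get_s3_base and __create_dss_s3_path concrete, a standalone sketch with a hypothetical transposition geojson key; only the derivation steps mirror the code above, while the bucket and key are invented.

```python
import datetime
import os

transposition_geo_s3 = "s3://example-bucket/watersheds/duwamish/duwamish-transpo-area-v01.geojson"
start_time = datetime.datetime(1979, 2, 1)

bucket, _, key = transposition_geo_s3[5:].partition("/")  # split bucket from key
key_parts = key.split("/")
transpo_folder = key_parts[-1].replace(".geojson", "")    # basename without extension
path = os.path.join(bucket, *key_parts[:-1], transpo_folder, "72h", "dss", start_time.strftime("%Y%m%d.dss"))
print(f"s3://{path}")
# s3://example-bucket/watersheds/duwamish/duwamish-transpo-area-v01/72h/dss/19790201.dss
```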
19 changes: 19 additions & 0 deletions blobfish/aorc/composite.py
@@ -10,6 +10,16 @@
def create_composite_dataset_identifiers(
start_date: datetime.datetime, end_date: datetime.datetime, location_name: str
) -> BasicDescriptors:
"""Creates required identifiers for composite dataset

Args:
start_date (datetime.datetime): start date for composite dataset
end_date (datetime.datetime): end date for composite dataset
location_name (str): name of spatial coverage location

Returns:
BasicDescriptors: Descriptors including title, id, name, URL, and description
"""
dataset_id = f"composite_{start_date.strftime('%Y%m%d%H')}".lower()
dataset_name = dataset_id
start_time_formatted = start_date.strftime("%Y-%m-%d %H:%M")
@@ -22,6 +32,15 @@ def create_composite_dataset_identifiers(


def create_composite_s3_path(bucket: str, start_time: datetime.datetime) -> str:
"""Creates s3 URI of zarr for composite dataset

Args:
bucket (str): Target bucket
start_time (datetime.datetime): start time for dataset

Returns:
str: s3 URI
"""
return f"s3://{bucket}/transforms/aorc/precipitation/{start_time.year}/{start_time.strftime('%Y%m%d%H')}.zarr"


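A short sketch of the values the two helpers above would produce for one hourly dataset, using an assumed bucket name:

```python
import datetime

bucket = "example-bucket"
start = datetime.datetime(1979, 2, 1, 0)

dataset_id = f"composite_{start.strftime('%Y%m%d%H')}".lower()
zarr_uri = f"s3://{bucket}/transforms/aorc/precipitation/{start.year}/{start.strftime('%Y%m%d%H')}.zarr"
print(dataset_id)  # composite_1979020100
print(zarr_uri)    # s3://example-bucket/transforms/aorc/precipitation/1979/1979020100.zarr
```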
14 changes: 14 additions & 0 deletions blobfish/aorc/composite_utils/array.py
@@ -4,6 +4,14 @@


def create_composite_dataset(dataset_paths: set[str]) -> xr.Dataset:
"""Merges netCDF files provided into single dataset based on shared time coordinate

Args:
dataset_paths (set[str]): List of netCDF file paths

Returns:
xr.Dataset: Spatially merged data
"""
datasets = []
for dataset_path in dataset_paths:
ds = xr.open_dataset(dataset_path)
@@ -14,4 +22,10 @@ def create_composite_dataset(dataset_paths: set[str]) -> xr.Dataset:


def upload_zarr(zarr_s3_path: str, dataset: xr.Dataset) -> None:
"""Uploads dataset to zarr format

Args:
zarr_s3_path (str): s3 target path for zarr dataset
dataset (xr.Dataset): Dataset to upload
"""
dataset.to_zarr(store=storage.FSStore(zarr_s3_path))
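A hedged usage sketch of the two helpers above: the netCDF paths are hypothetical, the merge-on-shared-time behaviour is inferred from the docstring rather than the truncated body, and writing zarr directly to s3 assumes fsspec/s3fs credentials are configured.

```python
import xarray as xr

nc_paths = {
    "/tmp/aorc/AORC_APCP_ABRFC_1979020100.nc4",
    "/tmp/aorc/AORC_APCP_CNRFC_1979020100.nc4",
}

# Roughly what create_composite_dataset() describes: open each file and merge
# datasets that share a time coordinate but cover different spatial extents
composite = xr.merge([xr.open_dataset(p) for p in nc_paths])

# Roughly what upload_zarr() describes: persist the merged dataset as zarr
composite.to_zarr("s3://example-bucket/transforms/aorc/precipitation/1979/1979020100.zarr")
```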
19 changes: 19 additions & 0 deletions blobfish/aorc/composite_utils/cloud.py
@@ -8,6 +8,16 @@


def stream_s3_zipped(s3_resource, s3_uri: str, extract_directory: str) -> list[str]:
"""Streams zipped s3 resource to provided directory

Args:
s3_resource: s3 service resource to use in streaming
s3_uri (str): s3 uri of zipped resource (ie s3://bucket/key.zip)
extract_directory (str): Directory to which zipped resources will be extracted

Returns:
list[str]: Paths of unzipped data
"""
bucket, key = extract_bucketname_and_keyname(s3_uri)
response = s3_resource.meta.client.get_object(Bucket=bucket, Key=key)
zip_data = response["Body"].read()
@@ -18,6 +28,15 @@ def stream_s3_zipped(s3_resource, s3_uri: str, extract_directory: str) -> list[str]:


def check_zarr_modification(s3_resource, zarr_path: str) -> datetime.datetime:
"""Checks modification date of a .zmetadata resource in a zarr dataset

Args:
s3_resource: s3 service resource to use in streaming
zarr_path (str): s3 uri of zarr dataset (ie s3://bucket/zarr_dataset)

Returns:
datetime.datetime: Last modification datetime of the zarr dataset's .zmetadata resource
"""
bucket, key = extract_bucketname_and_keyname(zarr_path)
key += "/.zmetadata"
obj = s3_resource.Object(bucket, key)
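For orientation, a minimal sketch of the .zmetadata modification check described above, calling boto3 directly with a hypothetical bucket and key:

```python
import boto3

s3_resource = boto3.Session().resource("s3")
zarr_key = "transforms/aorc/precipitation/1979/1979020100.zarr"
obj = s3_resource.Object("example-bucket", zarr_key + "/.zmetadata")
print(obj.last_modified)  # timezone-aware datetime of the last write to the zarr metadata
```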
10 changes: 10 additions & 0 deletions blobfish/aorc/composite_utils/general.py
@@ -33,6 +33,8 @@ def upload_composite_to_ckan(
resources: list[dict],
**kwargs,
) -> int:
"""Uploads composite dataset JSON data to CKAN"""

if ckan_base_url.endswith("/"):
ckan_base_url = ckan_base_url[:-1]
upload_endpoint = f"{ckan_base_url}/api/3/action/package_create"
@@ -73,6 +75,14 @@


def create_composite_wkt(mirror_wkts: list[str]) -> str:
"""Converts list of polygon WKTs into a convex hull of all the provided WKTs merged together

Args:
mirror_wkts (list[str]): List of polygon WKTs

Returns:
str: Convex hull of merged polygons
"""
logging.info("Merging geometries from RFC regions into composite coverage area")
polys = [shapely.wkt.loads(wkt) for wkt in mirror_wkts]
multipoly = MultiPolygon(polys)
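A minimal, runnable illustration of the convex-hull merge that create_composite_wkt describes, with two toy square polygons standing in for real RFC coverage WKTs:

```python
import shapely.wkt
from shapely.geometry import MultiPolygon

mirror_wkts = [
    "POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))",
    "POLYGON ((2 0, 4 0, 4 2, 2 2, 2 0))",
]
polys = [shapely.wkt.loads(wkt) for wkt in mirror_wkts]
hull = MultiPolygon(polys).convex_hull
print(hull.wkt)  # a single polygon covering both squares, spanning (0 0) to (4 2)
```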
21 changes: 21 additions & 0 deletions blobfish/aorc/composite_utils/rdf.py
@@ -10,6 +10,15 @@


def retrieve_mirror_dataset_metadata(ckan_base_url: str, rfc_count: int) -> Iterator[list[RetrievedMirror]]:
"""Get mirror dataset metadata from CKAN catalog

Args:
ckan_base_url (str): Base CKAN instance URL
rfc_count (int): RFC count

Yields:
Iterator[list[RetrievedMirror]]: Yields mirror dataset metadata
"""
logging.info("Retrieving mirror dataset metadata")
if not ckan_base_url.endswith("/"):
ckan_base_url += "/"
@@ -43,6 +52,18 @@


def verify_date_rfc_count(catalog_graph: Graph, rfc_count: int) -> Iterator[tuple[Literal, Literal]]:
"""Verifies that mirror dataset count in a catalog matches expected count

Args:
catalog_graph (Graph): Graph of mirror dataset catalog
rfc_count (int): Count of RFCs that will be merged together when composite dataset is created

Raises:
ValueError: Error if count doesn't match

Yields:
Iterator[tuple[Literal, Literal]]: Yields tuple of unique start date and end date literals found in catalog associated with mirror datasets
"""
logging.info(f"Verifying match between mirror dataset count and RFC count")
query_string = """
SELECT ?sd ?ed
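A hedged sketch of the kind of grouped SPARQL check verify_date_rfc_count describes, run against a tiny in-memory graph; the dct:temporal / dcat:startDate / dcat:endDate terms are assumptions for illustration, not necessarily the exact vocabulary used in the repository query.

```python
from rdflib import Graph, Literal, URIRef
from rdflib.namespace import DCAT, DCTERMS, XSD

g = Graph()
mirror = URIRef("https://example.org/mirror/abrfc-1979-02")
period = URIRef("https://example.org/mirror/abrfc-1979-02/period")
g.add((mirror, DCTERMS.temporal, period))
g.add((period, DCAT.startDate, Literal("1979-02-01T00:00:00", datatype=XSD.dateTime)))
g.add((period, DCAT.endDate, Literal("1979-03-01T00:00:00", datatype=XSD.dateTime)))

query = """
SELECT ?sd ?ed (COUNT(?ds) AS ?n)
WHERE {
    ?ds dct:temporal ?t .
    ?t dcat:startDate ?sd ;
       dcat:endDate ?ed .
}
GROUP BY ?sd ?ed
"""
for row in g.query(query, initNs={"dct": DCTERMS, "dcat": DCAT}):
    # one row per unique start/end pair; row.n is the mirror dataset count to compare against the RFC count
    print(row.sd, row.ed, row.n)
```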
23 changes: 23 additions & 0 deletions blobfish/aorc/general_utils/ckan.py
@@ -12,6 +12,18 @@ def create_ckan_resource(
description: str,
s3: bool,
) -> dict:
"""Creates the general dict form of a dcat:Distribution instance as expected in CKAN

Args:
download_url (str): Download URL
format (str): URI of data format
compress_format (str | None): URI of compression format, if there is compression
description (str): Description of distribution
s3 (bool): True if distribution is on s3 and has implied access rights constraint

Returns:
dict: Distribution data
"""
args_dict = {}
if s3:
args_dict["access_rights"] = "Access to distribution requires access to parent s3 bucket"
@@ -28,6 +40,17 @@


def query_ckan_catalog(catalog_url: str) -> Graph:
"""Queries provided catalog for RDF data

Args:
catalog_url (str): URL of dataset catalog

Raises:
ValueError: Catalog URL doesn't have expected format

Returns:
Graph: Parsed catalog graph
"""
logging.info(f"Querying CKAN catalog {catalog_url}")
graph = Graph()
if catalog_url.endswith(".ttl"):
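To show the shape of the distribution dict create_ckan_resource describes and the TTL parsing query_ckan_catalog performs, a small sketch; the URLs, format URIs, and key names are illustrative assumptions, not values from the repository.

```python
from rdflib import Graph

# Hypothetical dcat:Distribution payload in the dict form CKAN expects
resource = {
    "url": "s3://example-bucket/mirrors/aorc/precip/AORC_APCP_4KM_ABRFC.zip",
    "format": "https://example.org/formats/netcdf",
    "compress_format": "https://example.org/formats/zip",
    "description": "Zipped mirror of AORC precipitation data for the ABRFC region",
    "access_rights": "Access to distribution requires access to parent s3 bucket",
}

# Parsing a .ttl catalog URL into a graph, as query_ckan_catalog does for turtle catalogs
graph = Graph()
graph.parse("https://ckan.example.org/catalog.ttl", format="turtle")
print(len(graph))  # number of triples parsed from the catalog
```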
21 changes: 21 additions & 0 deletions blobfish/aorc/general_utils/cloud.py
@@ -4,13 +4,34 @@


def create_s3_resource(access_key_id: str, secret_access_key: str, region_name: str):
"""Creates s3 resource

Args:
access_key_id (str): AWS access key ID
secret_access_key (str): AWS secret access key
region_name (str): Default region for AWS resource to use

Returns:
Any: s3 resource
"""
logging.info("Creating s3 resource")
session = boto3.Session(access_key_id, secret_access_key, region_name=region_name)
resource = session.resource("s3")
return resource


def extract_bucketname_and_keyname(s3path: str) -> tuple[str, str]:
"""Extracts bucket and key name from s3 URI

Args:
s3path (str): s3 URI (ex: s3://bucket/key)

Raises:
ValueError: Error if path doesn't have correct format

Returns:
tuple[str, str]: Tuple of bucket and key
"""
if not s3path.startswith("s3://"):
raise ValueError(f"s3path does not start with s3://: {s3path}")
bucket, _, key = s3path[5:].partition("/")
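A quick worked example of the bucket/key split performed by extract_bucketname_and_keyname, with a hypothetical path:

```python
s3path = "s3://example-bucket/mirrors/aorc/precip/AORC_APCP_4KM_ABRFC.zip"
bucket, _, key = s3path[5:].partition("/")
print(bucket)  # example-bucket
print(key)     # mirrors/aorc/precip/AORC_APCP_4KM_ABRFC.zip
```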
22 changes: 22 additions & 0 deletions blobfish/aorc/general_utils/provenance.py
@@ -7,6 +7,11 @@


def retrieve_meta() -> ProvenanceMetadata:
"""Retrieves provenance metadata from environment variables

Returns:
ProvenanceMetadata: Provenance metadata
"""
relative_composite_path = os.environ["COMPOSE_FILE_PATH"]
relative_docker_file_path = os.environ["DOCKER_FILE_PATH"]
git_url = os.environ["GIT_REPO"]
@@ -26,6 +31,18 @@


def create_raw_content_url(relative_path: str, git_url: str) -> str:
"""Creates the publicly accessible format of raw content at github address

Args:
relative_path (str): relative path of resource in git repo
git_url (str): github repo url

Raises:
ValueError: GitHub url is not in correct format

Returns:
str: Raw content URL
"""
if not git_url.startswith("raw."):
git_url = git_url.replace("https://github.com", "https://raw.githubusercontent.com/", 1)
git_url = git_url.replace("/commit", "", 1)
@@ -35,6 +52,11 @@


def get_command_list() -> list[str]:
"""Retrieves command list supplied to python

Returns:
list[str]: Commands, split by space delimiter
"""
command_list = [sys.executable, *sys.argv]
logging.info(f"Command list: {command_list}")
return command_list
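A simplified sketch of the raw-content rewrite in create_raw_content_url, using a hypothetical org/repo name together with the commit hash shown in this PR; the real values come from the GIT_REPO and path environment variables.

```python
git_url = "https://github.com/example-org/blobfish/commit/94e6e9035081a4fe6ac8eb64a796e7b95597c7c0"
relative_path = "Dockerfile"  # hypothetical relative path within the repo

raw_url = git_url.replace("https://github.com", "https://raw.githubusercontent.com", 1)
raw_url = raw_url.replace("/commit", "", 1)
print(f"{raw_url}/{relative_path}")
# https://raw.githubusercontent.com/example-org/blobfish/94e6e9035081a4fe6ac8eb64a796e7b95597c7c0/Dockerfile
```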