Skip to content

Commit

Permalink
GTC-2928 GTC-2929 Allow id fields other than 'fid' & cleanups
Browse files Browse the repository at this point in the history
 - Allow id fields other than 'fid'
 - Check that input feature collection has id_field as a
   column/property, return error if not.
 - Change the results to identify the feature ids using the same
   id_field name they supplied, rather than 'geometry_id'
 - Changes to the step function to deal properly with errors in the
   preprocessing lambda - we need to check the status, and exit the step
   function early with the error message.
 - Include 'query' in the final result, to confirm to the user that
   these are the results for their query.
  • Loading branch information
danscales committed Aug 13, 2024
1 parent 6c7d72a commit 1c93633
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
12 changes: 6 additions & 6 deletions lambdas/aggregation/src/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from aws_xray_sdk.core import patch, xray_recorder

from raster_analysis.boto import s3_client
from raster_analysis.exceptions import QueryParseException
from raster_analysis.globals import LOGGER

patch(["boto3"])


@xray_recorder.capture("Aggregation")
def handler(event, context):
id_field = event["id_field"]
query = event["query"]
results_meta = event["distributed_map"]["ResultWriterDetails"]
try:
bucket = results_meta["Bucket"]
Expand All @@ -28,11 +29,11 @@ def handler(event, context):
result = json.loads(geom_result["Output"])
if result["status"] == "success":
combined_data.append(
{"result": result["data"], "geometry_id": result["fid"]}
{"result": result["data"], id_field: result["fid"]}
)
else:
failed_geometries.append(
{"geometry_id": result["fid"], "detail": result["message"]}
{id_field: result["fid"], "detail": result["message"]}
)

for failed_record in manifest["ResultFiles"]["FAILED"]:
Expand All @@ -42,7 +43,7 @@ def handler(event, context):
input = json.loads(error["Input"])
if error["Status"] == "FAILED":
failed_geometries.append(
{"geometry_id": input["fid"], "detail": error["Error"]}
{id_field: input["fid"], "detail": error["Error"]}
)

LOGGER.info("Successfully aggregated results")
Expand Down Expand Up @@ -87,13 +88,12 @@ def handler(event, context):

return {
"status": status,
"query": query,
"data": {
"download_link": download_link,
"failed_geometries_link": failed_geometries_link,
},
}
except QueryParseException as e:
return {"status": "failed", "message": str(e)}
except Exception as e:
LOGGER.exception(e)
return {"status": "error", "message": str(e)}
10 changes: 6 additions & 4 deletions lambdas/preprocessing/src/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from shapely.wkb import dumps as wkb_dumps

from raster_analysis.boto import s3_client
from raster_analysis.exceptions import QueryParseException
from raster_analysis.globals import LOGGER, S3_PIPELINE_BUCKET

patch(["boto3"])
Expand All @@ -32,6 +31,9 @@ def handler(event, context):
else:
raise Exception("Please specify GeoJSON via (only) one parameter!")

if id_field not in gpdf.columns.tolist():
raise Exception(f"Input feature collection is missing ID field '{id_field}'")

rows = []
for record in gpdf.itertuples():
geom_wkb = wkb_dumps(getattr(record, "geometry"), hex=True)
Expand All @@ -44,7 +46,9 @@ def handler(event, context):

with tempfile.TemporaryDirectory() as tmp_dir:
some_path = os.path.join(tmp_dir, "geometries.csv")
df = pd.DataFrame(rows, columns=[id_field, "geometry"])
# In the file sent to the distributed map, use the standard name 'fid'
# for the id field, to make the step function code simpler.
df = pd.DataFrame(rows, columns=["fid", "geometry"])
df.to_csv(some_path, index=False)

upload_to_s3(some_path, S3_PIPELINE_BUCKET, geom_prefix)
Expand All @@ -54,8 +58,6 @@ def handler(event, context):
"geometries": {"bucket": S3_PIPELINE_BUCKET, "key": geom_prefix},
"output": {"bucket": S3_PIPELINE_BUCKET, "prefix": output_prefix},
}
except QueryParseException as e:
return {"status": "failed", "message": str(e)}
except Exception as e:
LOGGER.exception(e)
return {"status": "error", "message": str(e)}
Expand Down
34 changes: 29 additions & 5 deletions step_functions/process_list.json.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,38 @@
"Preprocessing": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Comment": "This lambda takes uri, id_field, and query [other fields also passed], chooses a output folder name [maybe using hash of data and query], creates a CSV with a row for each feature id and each geometry in WKB format, and outputs geometries.bucket, geometries.key [where CSV file was put], and output.bucket, and output.prefix [where intermediate and final results will go]",
"Comment": "This lambda takes uri or feature collection, id_field, and query [other fields also passed], chooses a output folder name [maybe using hash of data and query], creates a CSV with a row for each feature id and each geometry in WKB format, and outputs geometries.bucket, geometries.key [where CSV file was put], and output.bucket, and output.prefix [where intermediate and final results will go]",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${lambda_preprocessing_name}"
},
"ResultSelector": {
"geometries.$": "$.Payload.geometries",
"output.$": "$.Payload.output"
"Payload.$": "$.Payload"
},
"ResultPath": "$.PreprocOutput",
"Next": "Check status"
},
"Check status": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.PreprocOutput.Payload.status",
"StringEquals": "error",
"Next": "Error state"
}
],
"Default": "Copy results"
},
"Error state": {
"Type": "Pass",
"OutputPath": "$.PreprocOutput.Payload",
"End": true
},
"Copy results": {
"Type": "Pass",
"Parameters": {
"geometries.$": "$.PreprocOutput.Payload.geometries",
"output.$": "$.PreprocOutput.Payload.output"
},
"ResultPath": "$.files",
"Next": "Process List"
Expand All @@ -29,7 +53,7 @@
"process_geometry": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Comment": "This lambda takes a query, data_environment, a feature id fid, and a geometry in WKB format, and returns status, data [results for this feature as CSV], and fid. The results will be written out to the output bucket by ResultWriter",
"Comment": "This lambda takes a query, data_environment, a feature id fid, and a geometry in WKB format, and returns status, data [results for this feature as dictionary], and fid. The results will be written out to the output bucket by ResultWriter",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${lambda_list_tiled_raster_analysis_name}"
Expand Down Expand Up @@ -85,7 +109,7 @@
"Aggregation": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Comment": "This function takes files.output.bucket, files.output.prefix, distributed_map, and job_id [other fields also passed], aggregates the results to a new file, and puts the S3 URI of the result in its output as download_link",
"Comment": "This function takes files.output.bucket, files.output.prefix, distributed_map, and id_field [other fields also passed], aggregates the results to a new file, and puts the S3 URI of the result in its output as download_link",
"Parameters": {
"Payload.$": "$",
"FunctionName": "${lambda_aggregation_name}"
Expand Down

0 comments on commit 1c93633

Please sign in to comment.