diff --git a/.github/workflows/terraform_build.yaml b/.github/workflows/terraform_build.yaml
index 8eef729..41b1cda 100644
--- a/.github/workflows/terraform_build.yaml
+++ b/.github/workflows/terraform_build.yaml
@@ -26,7 +26,7 @@ jobs:
       - name: Test with pytest
        env:
          ENV: test
-          AWS_ACCESS_KEY_ID: ${{ secrets.aws_key_dev }}
+          AWS_ACCESS_KEY_ID: ${{ secrets.aws_key_dev }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.aws_secret_dev }}
          AWS_REGION: ${{ secrets.aws_region_dev }}
          AWS_XRAY_SDK_ENABLED: false
diff --git a/README.md b/README.md
index 118c93f..e00db20 100644
--- a/README.md
+++ b/README.md
@@ -275,23 +275,3 @@ The GFW data lake is now in production, so this service will soon point to that
 * wdpa_protected_areas
 * whrc_aboveground_biomass_stock_2000
 * wwf_eco_regions
-
-### VIIRS/MODIS Alerts
-
-These alerts are currently unsupported because we don't rasterize these layers. Instead, we store all enriched points in an document dataset. You can do on-the-fly analysis for these via SQL. (TBD: do we want to just forward that through here so there's only one endpoint?)
-
-### Aggregation
-
-There will be a new `String` parameter called `agg` that will accept one of `day | week | month | year` and return results aggregated by that timeline.
-
-### Whitelist
-
-There will be a new endpoint that will return a whitelist of whether layers intersect the input geometry. Details TBD.
-
-### Misc Layers
-
-Need to decide if/how we will support miscellaneous layers on GFW but not maintained in the data lake, like PRODES and Terra-i. TBD.
-
-### Lat/Lon Coordinates
-
-A new analysis will be added to retrieve lat/lon coordinates of points (e.g. for GLAD alerts).
diff --git a/lambdas/raster_analysis/src/lambda_function.py b/lambdas/raster_analysis/src/lambda_function.py
index 7f3bd5d..1aa96db 100644
--- a/lambdas/raster_analysis/src/lambda_function.py
+++ b/lambdas/raster_analysis/src/lambda_function.py
@@ -17,6 +17,8 @@ def handler(event, context):
     try:
         LOGGER.info(f"Running analysis with parameters: {event}")
         results_store = AnalysisResultsStore()
+        tile = event["tile"]
+        event = event["payload"]

         if "geometry" in event:
             source_geom = event["geometry"]
@@ -27,7 +29,7 @@ def handler(event, context):
         else:
             raise KeyError("No valid geometry field")

-        tile_geojson = event.get("tile", None)
+        tile_geojson = tile.get("tile", None)
         geom_tile = GeometryTile(source_geom, tile_geojson, is_encoded)

         if not geom_tile.geom:
@@ -44,10 +46,10 @@ def handler(event, context):
         results: DataFrame = query_executor.execute()

         LOGGER.debug(f"Ran analysis with results: {results.head(100)}")
-        results_store.save_result(results, event["cache_id"])
+        results_store.save_result(results, tile["cache_id"])

     except Exception as e:
         LOGGER.exception(e)
         results_store = AnalysisResultsStore()
-        results_store.save_status(event["cache_id"], ResultStatus.error, 0, str(e))
+        results_store.save_status(tile["cache_id"], ResultStatus.error, 0, str(e))

         raise e
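The handler change above means each worker invocation now receives a two-part event: the shared analysis `payload` plus a per-tile record carrying the tile footprint and its cache key. Below is a minimal sketch of that shape, with illustrative geometry and query values; only the `payload`/`tile` nesting and the `cache_id`/`tile` keys are taken from this diff.

```python
# Minimal sketch of one per-tile event for handler(event, context).
# Field values are illustrative; the nesting mirrors the
# {"payload": ..., "tiles": [...]} input assembled in tiling.py below.
example_event = {
    "payload": {
        "query": "SELECT sum(area__ha) FROM data",  # hypothetical query
        "geometry": {  # or "encoded_geometry" when the geometry is compressed
            "type": "Polygon",
            "coordinates": [[[0.0, 0.0], [0.5, 0.0], [0.5, 0.5], [0.0, 0.0]]],
        },
    },
    "tile": {
        "cache_id": "deadbeef",  # results-store cache key for this tile
        "tile": {  # GeoJSON footprint of the tile itself
            "type": "Polygon",
            "coordinates": [[[0.0, 0.0], [0.5, 0.0], [0.5, 0.5], [0.0, 0.0]]],
        },
    },
}

# The handler's two new lines split the wrapper apart:
tile = example_event["tile"]
payload = example_event["payload"]
assert tile.get("tile") is not None and "cache_id" in tile
```

One behavior visible in the hunk: `tile` is bound before `event` is re-pointed at the payload, so if the `"tile"` key is missing the `except` block's `tile["cache_id"]` lookup would raise a `NameError` of its own; the assertion above just makes the assumed shape explicit.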
client_constructor("dynamodb", DYNAMODB_ENDPOINT_URL) dynamodb_resource = client_constructor("dynamodb", DYNAMODB_ENDPOINT_URL, "resource") +sfn_client = client_constructor("stepfunctions", LAMBDA_ENDPOINT_URL) def invoke_lambda(payload: Dict[str, Any], lambda_name: str, client) -> None: diff --git a/raster_analysis/grid.py b/raster_analysis/grid.py index 453a219..eba07f8 100644 --- a/raster_analysis/grid.py +++ b/raster_analysis/grid.py @@ -33,7 +33,7 @@ def get_grid(cls, name: GridName): degrees_str, pixels_str = name.split("/") degrees = int(degrees_str) pixels = int(pixels_str) - tile_degrees = degrees * (5000 / pixels) + tile_degrees = degrees * (2000 / pixels) return cls(degrees, pixels, tile_degrees) def get_pixel_width(self) -> float: diff --git a/raster_analysis/tiling.py b/raster_analysis/tiling.py index 2ce704f..acfc73c 100644 --- a/raster_analysis/tiling.py +++ b/raster_analysis/tiling.py @@ -1,6 +1,5 @@ import json import sys -from copy import deepcopy from datetime import datetime from io import StringIO from typing import Any, Dict, List, Tuple @@ -8,15 +7,12 @@ from pandas import DataFrame from shapely.geometry import Polygon, box, mapping, shape -from raster_analysis.boto import invoke_lambda, lambda_client +from raster_analysis.boto import sfn_client from raster_analysis.data_environment import DataEnvironment from raster_analysis.geometry import encode_geometry from raster_analysis.globals import ( - FANOUT_LAMBDA_NAME, - FANOUT_NUM, LAMBDA_ASYNC_PAYLOAD_LIMIT_BYTES, LOGGER, - RASTER_ANALYSIS_LAMBDA_NAME, BasePolygon, Numeric, ) @@ -157,31 +153,49 @@ def _execute_tiles(self) -> DataFrame: LOGGER.info(f"Processing {geom_count} tiles") - if geom_count <= FANOUT_NUM: - for tile in tiles_for_lambda: - tile_payload = deepcopy(payload) - tile_id = results_store.get_cache_key(tile, self.geom, self.raw_query) - tile_payload["cache_id"] = tile_id - tile_payload["tile"] = mapping(tile) - invoke_lambda( - tile_payload, RASTER_ANALYSIS_LAMBDA_NAME, lambda_client() - ) - else: - tile_geojsons = [ - ( - results_store.get_cache_key(tile, self.geom, self.raw_query), - mapping(tile), - ) - for tile in tiles - ] - tile_chunks = [ - tile_geojsons[x : x + FANOUT_NUM] - for x in range(0, len(tile_geojsons), FANOUT_NUM) - ] - - for chunk in tile_chunks: - event = {"payload": payload, "tiles": chunk} - invoke_lambda(event, FANOUT_LAMBDA_NAME, lambda_client()) + sfn_client().start_execution( + stateMachineArn="arn:aws:states:us-east-1:274931322839:stateMachine:raster-analysis-distributed", + input=json.dumps( + { + "payload": payload, + "tiles": [ + { + "cache_id": results_store.get_cache_key( + tile, self.geom, self.raw_query + ), + "tile": mapping(tile), + } + for tile in tiles_for_lambda + ], + } + ), + ) + + # if geom_count <= FANOUT_NUM: + # for tile in tiles_for_lambda: + # tile_payload = deepcopy(payload) + # tile_id = results_store.get_cache_key(tile, self.geom, self.raw_query) + # tile_payload["cache_id"] = tile_id + # tile_payload["tile"] = mapping(tile) + # invoke_lambda( + # tile_payload, RASTER_ANALYSIS_LAMBDA_NAME, lambda_client() + # ) + # else: + # tile_geojsons = [ + # ( + # results_store.get_cache_key(tile, self.geom, self.raw_query), + # mapping(tile), + # ) + # for tile in tiles + # ] + # tile_chunks = [ + # tile_geojsons[x : x + FANOUT_NUM] + # for x in range(0, len(tile_geojsons), FANOUT_NUM) + # ] + # + # for chunk in tile_chunks: + # event = {"payload": payload, "tiles": chunk} + # invoke_lambda(event, FANOUT_LAMBDA_NAME, lambda_client()) LOGGER.info( f"Geom count: going 
diff --git a/terraform/policies/raster_analysis.json b/terraform/policies/raster_analysis.json
index c74e4da..6b6a077 100644
--- a/terraform/policies/raster_analysis.json
+++ b/terraform/policies/raster_analysis.json
@@ -33,7 +33,8 @@
         "xray:PutTelemetryRecords",
         "xray:GetSamplingRules",
         "xray:GetSamplingTargets",
-        "xray:GetSamplingStatisticSummaries"
+        "xray:GetSamplingStatisticSummaries",
+        "states:*"
       ],
       "Resource": "*"
     }
diff --git a/terraform/variables.tf b/terraform/variables.tf
index fc19f0f..9ed60dd 100644
--- a/terraform/variables.tf
+++ b/terraform/variables.tf
@@ -23,7 +23,7 @@ variable "lambda_fanout_runtime" {

 variable "lambda_raster_analysis_memory_size" {
   type        = number
-  default     = 3008
+  default     = 1024
   description = "Memory size version for AWS Lambda"
 }
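Two sizing changes travel together in this diff: grid.py drops the `tile_degrees` multiplier from 5000 to 2000, and variables.tf cuts the worker Lambda's memory from 3008 MB to 1024 MB, consistent with each invocation now processing a smaller tile while the state machine absorbs the larger tile count. A quick check of the new arithmetic, using illustrative `degrees/pixels` grid names in the format `get_grid()` parses (the actual grids in use are not shown in this diff):

```python
# Quick check of grid.py's new formula: tile_degrees = degrees * (2000 / pixels).
def tile_degrees(name: str, scale: int) -> float:
    degrees_str, pixels_str = name.split("/")
    return int(degrees_str) * (scale / int(pixels_str))

for grid in ("10/40000", "10/100000"):  # illustrative grid names
    old = tile_degrees(grid, 5000)  # old multiplier
    new = tile_degrees(grid, 2000)  # new multiplier
    print(f"{grid}: {old} deg -> {new} deg per tile")

# 10/40000:  1.25 deg -> 0.5 deg
# 10/100000: 0.5 deg  -> 0.2 deg
# Each tile covers a smaller area, so a geometry splits into more,
# lighter tile tasks per invocation.
```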