Skip to content

Commit

Permalink
Test out using distributed map for orchestrating lambdas
Browse files Browse the repository at this point in the history
  • Loading branch information
jterry64 committed Jan 5, 2024
1 parent 1e624d7 commit 4974e8f
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/terraform_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Test with pytest
env:
ENV: test
AWS_ACCESS_KEY_ID: ${{ secrets.aws_key_dev }}
AWS_ACCESS_KEY_ID: ${{ secrets.aws_key_dev }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.aws_secret_dev }}
AWS_REGION: ${{ secrets.aws_region_dev }}
AWS_XRAY_SDK_ENABLED: false
Expand Down
20 changes: 0 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,3 @@ The GFW data lake is now in production, so this service will soon point to that
* wdpa_protected_areas
* whrc_aboveground_biomass_stock_2000
* wwf_eco_regions

### VIIRS/MODIS Alerts

These alerts are currently unsupported because we don't rasterize these layers. Instead, we store all enriched points in a document dataset. You can do on-the-fly analysis for these via SQL. (TBD: do we want to just forward that through here so there's only one endpoint?)

### Aggregation

There will be a new `String` parameter called `agg` that will accept one of `day | week | month | year` and return results aggregated by that timeline.

### Whitelist

There will be a new endpoint that will return a whitelist of whether layers intersect the input geometry. Details TBD.

### Misc Layers

Need to decide if/how we will support miscellaneous layers on GFW but not maintained in the data lake, like PRODES and Terra-i. TBD.

### Lat/Lon Coordinates

A new analysis will be added to retrieve lat/lon coordinates of points (e.g. for GLAD alerts).
8 changes: 5 additions & 3 deletions lambdas/raster_analysis/src/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def handler(event, context):
try:
LOGGER.info(f"Running analysis with parameters: {event}")
results_store = AnalysisResultsStore()
tile = event["tile"]
event = event["payload"]

if "geometry" in event:
source_geom = event["geometry"]
Expand All @@ -27,7 +29,7 @@ def handler(event, context):
else:
raise KeyError("No valid geometry field")

tile_geojson = event.get("tile", None)
tile_geojson = tile.get("tile", None)
geom_tile = GeometryTile(source_geom, tile_geojson, is_encoded)

if not geom_tile.geom:
Expand All @@ -44,10 +46,10 @@ def handler(event, context):
results: DataFrame = query_executor.execute()

LOGGER.debug(f"Ran analysis with results: {results.head(100)}")
results_store.save_result(results, event["cache_id"])
results_store.save_result(results, tile["cache_id"])
except Exception as e:
LOGGER.exception(e)

results_store = AnalysisResultsStore()
results_store.save_status(event["cache_id"], ResultStatus.error, 0, str(e))
results_store.save_status(tile["cache_id"], ResultStatus.error, 0, str(e))
raise e
12 changes: 7 additions & 5 deletions raster_analysis/boto.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import boto3
import json
from typing import Callable, Dict, Any
from typing import Any, Callable, Dict

import boto3

from raster_analysis.globals import (
LOGGER,
AWS_REGION,
S3_ENDPOINT_URL,
LAMBDA_ENDPOINT_URL,
DYNAMODB_ENDPOINT_URL,
LAMBDA_ENDPOINT_URL,
LOGGER,
S3_ENDPOINT_URL,
)


Expand Down Expand Up @@ -40,6 +41,7 @@ def client():
lambda_client = client_constructor("lambda", LAMBDA_ENDPOINT_URL)
dynamodb_client = client_constructor("dynamodb", DYNAMODB_ENDPOINT_URL)
dynamodb_resource = client_constructor("dynamodb", DYNAMODB_ENDPOINT_URL, "resource")
sfn_client = client_constructor("stepfunctions", LAMBDA_ENDPOINT_URL)


def invoke_lambda(payload: Dict[str, Any], lambda_name: str, client) -> None:
Expand Down
2 changes: 1 addition & 1 deletion raster_analysis/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_grid(cls, name: GridName):
degrees_str, pixels_str = name.split("/")
degrees = int(degrees_str)
pixels = int(pixels_str)
tile_degrees = degrees * (5000 / pixels)
tile_degrees = degrees * (2000 / pixels)
return cls(degrees, pixels, tile_degrees)

def get_pixel_width(self) -> float:
Expand Down
74 changes: 44 additions & 30 deletions raster_analysis/tiling.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import json
import sys
from copy import deepcopy
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Tuple

from pandas import DataFrame
from shapely.geometry import Polygon, box, mapping, shape

from raster_analysis.boto import invoke_lambda, lambda_client
from raster_analysis.boto import sfn_client
from raster_analysis.data_environment import DataEnvironment
from raster_analysis.geometry import encode_geometry
from raster_analysis.globals import (
FANOUT_LAMBDA_NAME,
FANOUT_NUM,
LAMBDA_ASYNC_PAYLOAD_LIMIT_BYTES,
LOGGER,
RASTER_ANALYSIS_LAMBDA_NAME,
BasePolygon,
Numeric,
)
Expand Down Expand Up @@ -157,31 +153,49 @@ def _execute_tiles(self) -> DataFrame:

LOGGER.info(f"Processing {geom_count} tiles")

if geom_count <= FANOUT_NUM:
for tile in tiles_for_lambda:
tile_payload = deepcopy(payload)
tile_id = results_store.get_cache_key(tile, self.geom, self.raw_query)
tile_payload["cache_id"] = tile_id
tile_payload["tile"] = mapping(tile)
invoke_lambda(
tile_payload, RASTER_ANALYSIS_LAMBDA_NAME, lambda_client()
)
else:
tile_geojsons = [
(
results_store.get_cache_key(tile, self.geom, self.raw_query),
mapping(tile),
)
for tile in tiles
]
tile_chunks = [
tile_geojsons[x : x + FANOUT_NUM]
for x in range(0, len(tile_geojsons), FANOUT_NUM)
]

for chunk in tile_chunks:
event = {"payload": payload, "tiles": chunk}
invoke_lambda(event, FANOUT_LAMBDA_NAME, lambda_client())
sfn_client().start_execution(
stateMachineArn="arn:aws:states:us-east-1:274931322839:stateMachine:raster-analysis-distributed",
input=json.dumps(
{
"payload": payload,
"tiles": [
{
"cache_id": results_store.get_cache_key(
tile, self.geom, self.raw_query
),
"tile": mapping(tile),
}
for tile in tiles_for_lambda
],
}
),
)

# if geom_count <= FANOUT_NUM:
# for tile in tiles_for_lambda:
# tile_payload = deepcopy(payload)
# tile_id = results_store.get_cache_key(tile, self.geom, self.raw_query)
# tile_payload["cache_id"] = tile_id
# tile_payload["tile"] = mapping(tile)
# invoke_lambda(
# tile_payload, RASTER_ANALYSIS_LAMBDA_NAME, lambda_client()
# )
# else:
# tile_geojsons = [
# (
# results_store.get_cache_key(tile, self.geom, self.raw_query),
# mapping(tile),
# )
# for tile in tiles
# ]
# tile_chunks = [
# tile_geojsons[x : x + FANOUT_NUM]
# for x in range(0, len(tile_geojsons), FANOUT_NUM)
# ]
#
# for chunk in tile_chunks:
# event = {"payload": payload, "tiles": chunk}
# invoke_lambda(event, FANOUT_LAMBDA_NAME, lambda_client())

LOGGER.info(
f"Geom count: going to lambda: {geom_count}, fetched from catch: {len(tile_keys) - geom_count}"
Expand Down
3 changes: 2 additions & 1 deletion terraform/policies/raster_analysis.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"xray:PutTelemetryRecords",
"xray:GetSamplingRules",
"xray:GetSamplingTargets",
"xray:GetSamplingStatisticSummaries"
"xray:GetSamplingStatisticSummaries",
"states:*"
],
"Resource": "*"
}
Expand Down
2 changes: 1 addition & 1 deletion terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ variable "lambda_fanout_runtime" {

variable "lambda_raster_analysis_memory_size" {
type = number
default = 3008
default = 1024
description = "Memory size version for AWS Lambda"
}

Expand Down

0 comments on commit 4974e8f

Please sign in to comment.