Skip to content

Commit

Permalink
Add the basic scaffolding for running the queries from the manifest
Browse files Browse the repository at this point in the history
This was ported over from a working binary target and is being reworked
into the lambda runtime which does introduce some complexities
  • Loading branch information
rtyler committed Jan 22, 2024
1 parent dae60d4 commit 16a7d24
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 8 deletions.
5 changes: 5 additions & 0 deletions lambdas/query-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ edition = "2021"
# and it will keep the alphabetic ordering for you.

[dependencies]
anyhow = "1.0.79"
aws_lambda_events = { version = "0.12.0", default-features = false, features = ["eventbridge"] }
deltalake = { version = "0.16.5", features = ["datafusion", "s3"] }

lambda_runtime = "0.8.3"
serde = { version = "1.0.195", features = ["derive"] }
serde_yaml = "0.9.30"
tokio = { version = "1", features = ["macros"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
url = { version = "2.5.0", features = ["serde"] }

17 changes: 17 additions & 0 deletions lambdas/query-metrics/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
# This is an example manifest fiule which configures the lambda
gauges:
# Each gauge should have a distinct name for managing inside of the lambda
important_metric:
# Then define a metric name to export to cloudwatch
metric: 'last_10_uniq'
url: 's3://example-bucket/databases/ds-partitioned-delta-table/'
# Currently only a query handler type of `count` is supported
type: count
# The example Datafusion SQL query below queries the source table, which is defined by
# the URL above, to find all the distinct uuids in the last 10 minutes of the current
# `ds` partition.
query: |
SELECT DISTINCT uuid AS total FROM source
WHERE ds = ARROW_CAST(CURRENT_DATE() , 'Utf8')
AND created_at >= (ARROW_CAST(ARROW_CAST(NOW(), 'Timestamp(Second, None)'), 'Int64') - (60 * 10))
42 changes: 42 additions & 0 deletions lambdas/query-metrics/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use serde::Deserialize;
use std::collections::HashMap;
use std::convert::AsRef;
use std::fs::File;
use std::path::Path;

use url::Url;
#[derive(Debug, Deserialize)]
pub struct Configuration {
pub gauges: HashMap<String, Gauge>,
}

impl Configuration {
pub fn from_file<S: Into<String> + AsRef<Path>>(location: S) -> Self {
serde_yaml::from_reader(File::open(location).expect("Failed to open manifest"))
.expect("Failed to deserialize")
}
}

#[derive(Debug, Deserialize)]
pub struct Gauge {
pub url: Url,
#[serde(rename = "type")]
pub measurement_type: Measurement,
pub query: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Measurement {
Count,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_config_deser() {
let _conf: Configuration = Configuration::from_file("manifest.yml");
}
}
49 changes: 41 additions & 8 deletions lambdas/query-metrics/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,45 @@
use aws_lambda_events::event::eventbridge::EventBridgeEvent;use lambda_runtime::{run, service_fn, Error, LambdaEvent};
///
/// This lambda function will run configured datafusion queries and report results CloudWatch
/// metrics.
///
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
use deltalake::datafusion::common::*;
use deltalake::datafusion::execution::context::SessionContext;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use tracing::log::*;

use std::sync::Arc;

/// This is the main body for the function.
/// Write your code inside it.
/// There are some code example in the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/
async fn function_handler(event: LambdaEvent<EventBridgeEvent>) -> Result<(), Error> {
// Extract some useful information from the request
mod config;

async fn function_handler(_event: LambdaEvent<EventBridgeEvent>) -> Result<(), Error> {
let conf: config::Configuration = config::Configuration::from_file("manifest.yml");
debug!("Configuration loaded: {conf:?}");

for (name, gauge) in conf.gauges.iter() {
debug!("Querying the {name} table");
let ctx = SessionContext::new();
let table = deltalake::open_table(&gauge.url)
.await
.expect("Failed to register table");
ctx.register_table("source", Arc::new(table))
.expect("Failed to register table with datafusion");

debug!("Running query: {}", gauge.query);

let df = ctx
.sql(&gauge.query)
.await
.expect("Failed to execute query");

match gauge.measurement_type {
config::Measurement::Count => {
let count = df.count().await.expect("Failed to collect batches");

debug!("Found {count} distinct records");
}
}
}

Ok(())
}
Expand All @@ -22,5 +54,6 @@ async fn main() -> Result<(), Error> {
.without_time()
.init();

info!("Starting the query-metrics lambda");
run(service_fn(function_handler)).await
}

0 comments on commit 16a7d24

Please sign in to comment.