From d6911bbed03e0021ef415af6d3177910933b3a22 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 22 Jan 2024 18:58:00 +0000 Subject: [PATCH] Add preliminary cloudwatch reporting --- lambdas/query-metrics/Cargo.toml | 6 +++++- lambdas/query-metrics/src/config.rs | 19 ++++++++++++++++++- lambdas/query-metrics/src/main.rs | 23 ++++++++++++++++++++--- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/lambdas/query-metrics/Cargo.toml b/lambdas/query-metrics/Cargo.toml index 09b26b6..5abcfa0 100644 --- a/lambdas/query-metrics/Cargo.toml +++ b/lambdas/query-metrics/Cargo.toml @@ -16,7 +16,11 @@ edition = "2021" [dependencies] anyhow = "1.0.79" +aws-config = "1.1.2" +aws-sdk-cloudwatch = "1.11.0" +aws-sdk-config = "1.11.0" aws_lambda_events = { version = "0.12.0", default-features = false, features = ["eventbridge"] } +base64 = "0.21.7" deltalake = { version = "0.16.5", features = ["datafusion", "s3"] } lambda_runtime = "0.8.3" @@ -24,6 +28,6 @@ serde = { version = "1.0.195", features = ["derive"] } serde_yaml = "0.9.30" tokio = { version = "1", features = ["macros"] } tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter"] } url = { version = "2.5.0", features = ["serde"] } diff --git a/lambdas/query-metrics/src/config.rs b/lambdas/query-metrics/src/config.rs index 454eade..4158ace 100644 --- a/lambdas/query-metrics/src/config.rs +++ b/lambdas/query-metrics/src/config.rs @@ -1,10 +1,12 @@ -use serde::Deserialize; use std::collections::HashMap; use std::convert::AsRef; use std::fs::File; use std::path::Path; +use base64::prelude::*; +use serde::Deserialize; use url::Url; + #[derive(Debug, Deserialize)] pub struct Configuration { pub gauges: HashMap, @@ -15,11 +17,18 @@ impl Configuration { serde_yaml::from_reader(File::open(location).expect("Failed to open manifest")) .expect("Failed to deserialize") } + + pub fn from_base64>(buffer: S) -> Result { + let b64 = BASE64_STANDARD.decode(buffer)?; + Ok(serde_yaml::from_slice(&b64)?) + } } #[derive(Debug, Deserialize)] pub struct Gauge { pub url: Url, + #[serde(rename = "metric")] + pub name: String, #[serde(rename = "type")] pub measurement_type: Measurement, pub query: String, @@ -39,4 +48,12 @@ mod tests { fn test_config_deser() { let _conf: Configuration = Configuration::from_file("manifest.yml"); } + + #[test] + fn test_config_b64() { + let b = b"LS0tCiMgVGhpcyBpcyBhbiBleGFtcGxlIG1hbmlmZXN0IGZpdWxlIHdoaWNoIGNvbmZpZ3VyZXMgdGhlIGxhbWJkYQpnYXVnZXM6CiAgIyBFYWNoIGdhdWdlIHNob3VsZCBoYXZlIGEgZGlzdGluY3QgbmFtZSBmb3IgbWFuYWdpbmcgaW5zaWRlIG9mIHRoZSBsYW1iZGEKICBpbXBvcnRhbnRfbWV0cmljOgogICAgIyBUaGVuIGRlZmluZSBhIG1ldHJpYyBuYW1lIHRvIGV4cG9ydCB0byBjbG91ZHdhdGNoCiAgICBtZXRyaWM6ICdsYXN0XzEwX3VuaXEnCiAgICB1cmw6ICdzMzovL2V4YW1wbGUtYnVja2V0L2RhdGFiYXNlcy9kcy1wYXJ0aXRpb25lZC1kZWx0YS10YWJsZS8nCiAgICAjIEN1cnJlbnRseSBvbmx5IGEgcXVlcnkgaGFuZGxlciB0eXBlIG9mIGBjb3VudGAgaXMgc3VwcG9ydGVkCiAgICB0eXBlOiBjb3VudAogICAgIyBUaGUgZXhhbXBsZSBEYXRhZnVzaW9uIFNRTCBxdWVyeSBiZWxvdyBxdWVyaWVzIHRoZSBzb3VyY2UgdGFibGUsIHdoaWNoIGlzIGRlZmluZWQgYnkKICAgICMgdGhlIFVSTCBhYm92ZSwgdG8gZmluZCBhbGwgdGhlIGRpc3RpbmN0IHV1aWRzIGluIHRoZSBsYXN0IDEwIG1pbnV0ZXMgb2YgdGhlIGN1cnJlbnQKICAgICMgYGRzYCBwYXJ0aXRpb24uCiAgICBxdWVyeTogfAogICAgICBTRUxFQ1QgRElTVElOQ1QgdXVpZCBBUyB0b3RhbCBGUk9NIHNvdXJjZQogICAgICAgIFdIRVJFIGRzID0gQVJST1dfQ0FTVChDVVJSRU5UX0RBVEUoKSAsICdVdGY4JykKICAgICAgICBBTkQgY3JlYXRlZF9hdCA+PSAoQVJST1dfQ0FTVChBUlJPV19DQVNUKE5PVygpLCAnVGltZXN0YW1wKFNlY29uZCwgTm9uZSknKSwgJ0ludDY0JykgLSAoNjAgKiAxMCkpCg=="; + let conf: Configuration = Configuration::from_base64(&b).expect("Failed to deserialize"); + + assert_eq!(conf.gauges.len(), 1); + } } diff --git a/lambdas/query-metrics/src/main.rs b/lambdas/query-metrics/src/main.rs index 703ee24..a5f2232 100644 --- a/lambdas/query-metrics/src/main.rs +++ b/lambdas/query-metrics/src/main.rs @@ -3,6 +3,7 @@ /// metrics. /// use aws_lambda_events::event::eventbridge::EventBridgeEvent; +use aws_sdk_cloudwatch::types::MetricDatum; use deltalake::datafusion::common::*; use deltalake::datafusion::execution::context::SessionContext; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; @@ -13,7 +14,13 @@ use std::sync::Arc; mod config; async fn function_handler(_event: LambdaEvent) -> Result<(), Error> { - let conf: config::Configuration = config::Configuration::from_file("manifest.yml"); + let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let cloudwatch = aws_sdk_cloudwatch::Client::new(&aws_config); + + let conf = config::Configuration::from_base64( + std::env::var("MANIFEST_B64").expect("The `MANIFEST_B64` variable was not defined"), + ) + .expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml"); debug!("Configuration loaded: {conf:?}"); for (name, gauge) in conf.gauges.iter() { @@ -35,8 +42,18 @@ async fn function_handler(_event: LambdaEvent) -> Result<(), E match gauge.measurement_type { config::Measurement::Count => { let count = df.count().await.expect("Failed to collect batches"); - debug!("Found {count} distinct records"); + + let datum = MetricDatum::builder() + .metric_name(&gauge.name) + .value(count as f64) + .build(); + let res = cloudwatch + .put_metric_data() + .metric_data(datum) + .send() + .await?; + debug!("Result of CloudWatch send: {res:?}"); } } } @@ -47,7 +64,7 @@ async fn function_handler(_event: LambdaEvent) -> Result<(), E #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time.