Skip to content

Commit

Permalink
Add preliminary cloudwatch reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Jan 22, 2024
1 parent 16a7d24 commit d6911bb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
6 changes: 5 additions & 1 deletion lambdas/query-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@ edition = "2021"

[dependencies]
anyhow = "1.0.79"
aws-config = "1.1.2"
aws-sdk-cloudwatch = "1.11.0"
aws-sdk-config = "1.11.0"
aws_lambda_events = { version = "0.12.0", default-features = false, features = ["eventbridge"] }
base64 = "0.21.7"
deltalake = { version = "0.16.5", features = ["datafusion", "s3"] }

lambda_runtime = "0.8.3"
serde = { version = "1.0.195", features = ["derive"] }
serde_yaml = "0.9.30"
tokio = { version = "1", features = ["macros"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter"] }
url = { version = "2.5.0", features = ["serde"] }

19 changes: 18 additions & 1 deletion lambdas/query-metrics/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use serde::Deserialize;
use std::collections::HashMap;
use std::convert::AsRef;
use std::fs::File;
use std::path::Path;

use base64::prelude::*;
use serde::Deserialize;
use url::Url;

#[derive(Debug, Deserialize)]
pub struct Configuration {
pub gauges: HashMap<String, Gauge>,
Expand All @@ -15,11 +17,18 @@ impl Configuration {
serde_yaml::from_reader(File::open(location).expect("Failed to open manifest"))
.expect("Failed to deserialize")
}

pub fn from_base64<S: AsRef<[u8]>>(buffer: S) -> Result<Self, anyhow::Error> {
let b64 = BASE64_STANDARD.decode(buffer)?;
Ok(serde_yaml::from_slice(&b64)?)
}
}

#[derive(Debug, Deserialize)]
pub struct Gauge {
pub url: Url,
#[serde(rename = "metric")]
pub name: String,
#[serde(rename = "type")]
pub measurement_type: Measurement,
pub query: String,
Expand All @@ -39,4 +48,12 @@ mod tests {
fn test_config_deser() {
let _conf: Configuration = Configuration::from_file("manifest.yml");
}

#[test]
fn test_config_b64() {
let b = b"LS0tCiMgVGhpcyBpcyBhbiBleGFtcGxlIG1hbmlmZXN0IGZpdWxlIHdoaWNoIGNvbmZpZ3VyZXMgdGhlIGxhbWJkYQpnYXVnZXM6CiAgIyBFYWNoIGdhdWdlIHNob3VsZCBoYXZlIGEgZGlzdGluY3QgbmFtZSBmb3IgbWFuYWdpbmcgaW5zaWRlIG9mIHRoZSBsYW1iZGEKICBpbXBvcnRhbnRfbWV0cmljOgogICAgIyBUaGVuIGRlZmluZSBhIG1ldHJpYyBuYW1lIHRvIGV4cG9ydCB0byBjbG91ZHdhdGNoCiAgICBtZXRyaWM6ICdsYXN0XzEwX3VuaXEnCiAgICB1cmw6ICdzMzovL2V4YW1wbGUtYnVja2V0L2RhdGFiYXNlcy9kcy1wYXJ0aXRpb25lZC1kZWx0YS10YWJsZS8nCiAgICAjIEN1cnJlbnRseSBvbmx5IGEgcXVlcnkgaGFuZGxlciB0eXBlIG9mIGBjb3VudGAgaXMgc3VwcG9ydGVkCiAgICB0eXBlOiBjb3VudAogICAgIyBUaGUgZXhhbXBsZSBEYXRhZnVzaW9uIFNRTCBxdWVyeSBiZWxvdyBxdWVyaWVzIHRoZSBzb3VyY2UgdGFibGUsIHdoaWNoIGlzIGRlZmluZWQgYnkKICAgICMgdGhlIFVSTCBhYm92ZSwgdG8gZmluZCBhbGwgdGhlIGRpc3RpbmN0IHV1aWRzIGluIHRoZSBsYXN0IDEwIG1pbnV0ZXMgb2YgdGhlIGN1cnJlbnQKICAgICMgYGRzYCBwYXJ0aXRpb24uCiAgICBxdWVyeTogfAogICAgICBTRUxFQ1QgRElTVElOQ1QgdXVpZCBBUyB0b3RhbCBGUk9NIHNvdXJjZQogICAgICAgIFdIRVJFIGRzID0gQVJST1dfQ0FTVChDVVJSRU5UX0RBVEUoKSAsICdVdGY4JykKICAgICAgICBBTkQgY3JlYXRlZF9hdCA+PSAoQVJST1dfQ0FTVChBUlJPV19DQVNUKE5PVygpLCAnVGltZXN0YW1wKFNlY29uZCwgTm9uZSknKSwgJ0ludDY0JykgLSAoNjAgKiAxMCkpCg==";
let conf: Configuration = Configuration::from_base64(&b).expect("Failed to deserialize");

assert_eq!(conf.gauges.len(), 1);
}
}
23 changes: 20 additions & 3 deletions lambdas/query-metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/// metrics.
///
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
use aws_sdk_cloudwatch::types::MetricDatum;
use deltalake::datafusion::common::*;
use deltalake::datafusion::execution::context::SessionContext;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
Expand All @@ -13,7 +14,13 @@ use std::sync::Arc;
mod config;

async fn function_handler(_event: LambdaEvent<EventBridgeEvent>) -> Result<(), Error> {
let conf: config::Configuration = config::Configuration::from_file("manifest.yml");
let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let cloudwatch = aws_sdk_cloudwatch::Client::new(&aws_config);

let conf = config::Configuration::from_base64(
std::env::var("MANIFEST_B64").expect("The `MANIFEST_B64` variable was not defined"),
)
.expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml");
debug!("Configuration loaded: {conf:?}");

for (name, gauge) in conf.gauges.iter() {
Expand All @@ -35,8 +42,18 @@ async fn function_handler(_event: LambdaEvent<EventBridgeEvent>) -> Result<(), E
match gauge.measurement_type {
config::Measurement::Count => {
let count = df.count().await.expect("Failed to collect batches");

debug!("Found {count} distinct records");

let datum = MetricDatum::builder()
.metric_name(&gauge.name)
.value(count as f64)
.build();
let res = cloudwatch
.put_metric_data()
.metric_data(datum)
.send()
.await?;
debug!("Result of CloudWatch send: {res:?}");
}
}
}
Expand All @@ -47,7 +64,7 @@ async fn function_handler(_event: LambdaEvent<EventBridgeEvent>) -> Result<(), E
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
Expand Down

0 comments on commit d6911bb

Please sign in to comment.