diff --git a/lambdas/query-metrics/manifest.yml b/lambdas/query-metrics/manifest.yml index e077ce9..0dc6db8 100644 --- a/lambdas/query-metrics/manifest.yml +++ b/lambdas/query-metrics/manifest.yml @@ -3,15 +3,14 @@ gauges: # Each gauge should have a distinct name for managing inside of the lambda important_metric: - # Then define a metric name to export to cloudwatch - metric: 'last_10_uniq' - url: 's3://example-bucket/databases/ds-partitioned-delta-table/' - # Currently only a query handler type of `count` is supported - type: count - # The example Datafusion SQL query below queries the source table, which is defined by - # the URL above, to find all the distinct uuids in the last 10 minutes of the current - # `ds` partition. - query: | - SELECT DISTINCT uuid AS total FROM source + - metric: 'last_10_uniq' # Then define a metric name to export to cloudwatch + url: 's3://example-bucket/databases/ds-partitioned-delta-table/' + # Currently only a query handler type of `count` is supported + type: count + # The example Datafusion SQL query below queries the source table, which is defined by + # the URL above, to find all the distinct uuids in the last 10 minutes of the current + # `ds` partition. + query: | + SELECT DISTINCT uuid AS total FROM source WHERE ds = ARROW_CAST(CURRENT_DATE() , 'Utf8') AND created_at >= (ARROW_CAST(ARROW_CAST(NOW(), 'Timestamp(Second, None)'), 'Int64') - (60 * 10)) diff --git a/lambdas/query-metrics/src/config.rs b/lambdas/query-metrics/src/config.rs index ccace4a..ac7ea76 100644 --- a/lambdas/query-metrics/src/config.rs +++ b/lambdas/query-metrics/src/config.rs @@ -9,7 +9,7 @@ use url::Url; #[derive(Debug, Deserialize)] pub struct Configuration { - pub gauges: HashMap, + pub gauges: HashMap>, } impl Configuration { @@ -51,10 +51,15 @@ mod tests { } #[test] - fn test_config_b64() { - let b = b"LS0tCiMgVGhpcyBpcyBhbiBleGFtcGxlIG1hbmlmZXN0IGZpdWxlIHdoaWNoIGNvbmZpZ3VyZXMgdGhlIGxhbWJkYQpnYXVnZXM6CiAgIyBFYWNoIGdhdWdlIHNob3VsZCBoYXZlIGEgZGlzdGluY3QgbmFtZSBmb3IgbWFuYWdpbmcgaW5zaWRlIG9mIHRoZSBsYW1iZGEKICBpbXBvcnRhbnRfbWV0cmljOgogICAgIyBUaGVuIGRlZmluZSBhIG1ldHJpYyBuYW1lIHRvIGV4cG9ydCB0byBjbG91ZHdhdGNoCiAgICBtZXRyaWM6ICdsYXN0XzEwX3VuaXEnCiAgICB1cmw6ICdzMzovL2V4YW1wbGUtYnVja2V0L2RhdGFiYXNlcy9kcy1wYXJ0aXRpb25lZC1kZWx0YS10YWJsZS8nCiAgICAjIEN1cnJlbnRseSBvbmx5IGEgcXVlcnkgaGFuZGxlciB0eXBlIG9mIGBjb3VudGAgaXMgc3VwcG9ydGVkCiAgICB0eXBlOiBjb3VudAogICAgIyBUaGUgZXhhbXBsZSBEYXRhZnVzaW9uIFNRTCBxdWVyeSBiZWxvdyBxdWVyaWVzIHRoZSBzb3VyY2UgdGFibGUsIHdoaWNoIGlzIGRlZmluZWQgYnkKICAgICMgdGhlIFVSTCBhYm92ZSwgdG8gZmluZCBhbGwgdGhlIGRpc3RpbmN0IHV1aWRzIGluIHRoZSBsYXN0IDEwIG1pbnV0ZXMgb2YgdGhlIGN1cnJlbnQKICAgICMgYGRzYCBwYXJ0aXRpb24uCiAgICBxdWVyeTogfAogICAgICBTRUxFQ1QgRElTVElOQ1QgdXVpZCBBUyB0b3RhbCBGUk9NIHNvdXJjZQogICAgICAgIFdIRVJFIGRzID0gQVJST1dfQ0FTVChDVVJSRU5UX0RBVEUoKSAsICdVdGY4JykKICAgICAgICBBTkQgY3JlYXRlZF9hdCA+PSAoQVJST1dfQ0FTVChBUlJPV19DQVNUKE5PVygpLCAnVGltZXN0YW1wKFNlY29uZCwgTm9uZSknKSwgJ0ludDY0JykgLSAoNjAgKiAxMCkpCg=="; + fn test_config_b64() -> anyhow::Result<()> { + use std::io::Read; + let mut manifest = File::open("manifest.yml")?; + let mut buf = vec![]; + let count = manifest.read_to_end(&mut buf)?; + let b = BASE64_STANDARD.encode(buf); let conf: Configuration = Configuration::from_base64(&b).expect("Failed to deserialize"); assert_eq!(conf.gauges.len(), 1); + Ok(()) } } diff --git a/lambdas/query-metrics/src/main.rs b/lambdas/query-metrics/src/main.rs index 865708a..5b07fe8 100644 --- a/lambdas/query-metrics/src/main.rs +++ b/lambdas/query-metrics/src/main.rs @@ -27,40 +27,43 @@ async fn function_handler(_event: LambdaEvent) -> Result<(), Er .expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml"); debug!("Configuration loaded: {conf:?}"); - for (name, gauge) in conf.gauges.iter() { - debug!("Querying the {name} table"); - let ctx = SessionContext::new(); - let table = deltalake::open_table(&gauge.url) - .await - .expect("Failed to register table"); - ctx.register_table("source", Arc::new(table)) - .expect("Failed to register table with datafusion"); + for (name, gauges) in conf.gauges.iter() { + for gauge in gauges.iter() { + debug!("Querying the {name} table"); + let ctx = SessionContext::new(); + let table = deltalake::open_table(&gauge.url) + .await + .expect("Failed to register table"); + ctx.register_table("source", Arc::new(table)) + .expect("Failed to register table with datafusion"); - debug!("Running query: {}", gauge.query); + debug!("Running query: {}", gauge.query); - let df = ctx - .sql(&gauge.query) - .await - .expect("Failed to execute query"); + let df = ctx + .sql(&gauge.query) + .await + .expect("Failed to execute query"); - match gauge.measurement_type { - config::Measurement::Count => { - let count = df.count().await.expect("Failed to collect batches"); - debug!("Found {count} distinct records"); + match gauge.measurement_type { + config::Measurement::Count => { + let count = df.count().await.expect("Failed to collect batches"); + debug!("Found {count} distinct records"); - let datum = MetricDatum::builder() - .metric_name(&gauge.name) - .timestamp(DateTime::from(SystemTime::now())) - .value(count as f64) - .unit(StandardUnit::Count) - .build(); - let res = cloudwatch - .put_metric_data() - .namespace("DataLake") - .metric_data(datum) - .send() - .await?; - debug!("Result of CloudWatch send: {res:?}"); + let datum = MetricDatum::builder() + .metric_name(&gauge.name) + .timestamp(DateTime::from(SystemTime::now())) + .value(count as f64) + .unit(StandardUnit::Count) + .build(); + + let res = cloudwatch + .put_metric_data() + .namespace(format!("DataLake/{name}")) + .metric_data(datum) + .send() + .await?; + debug!("Result of CloudWatch send: {res:?}"); + } } } }