Skip to content

Commit

Permalink
Namespace the metrics a little more to allow for nesting more queries
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Jan 22, 2024
1 parent 9069294 commit 775d585
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 43 deletions.
19 changes: 9 additions & 10 deletions lambdas/query-metrics/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
gauges:
# Each gauge should have a distinct name for managing inside of the lambda
important_metric:
# Then define a metric name to export to cloudwatch
metric: 'last_10_uniq'
url: 's3://example-bucket/databases/ds-partitioned-delta-table/'
# Currently only a query handler type of `count` is supported
type: count
# The example Datafusion SQL query below queries the source table, which is defined by
# the URL above, to find all the distinct uuids in the last 10 minutes of the current
# `ds` partition.
query: |
SELECT DISTINCT uuid AS total FROM source
- metric: 'last_10_uniq' # Then define a metric name to export to cloudwatch
url: 's3://example-bucket/databases/ds-partitioned-delta-table/'
# Currently only a query handler type of `count` is supported
type: count
# The example Datafusion SQL query below queries the source table, which is defined by
# the URL above, to find all the distinct uuids in the last 10 minutes of the current
# `ds` partition.
query: |
SELECT DISTINCT uuid AS total FROM source
WHERE ds = ARROW_CAST(CURRENT_DATE() , 'Utf8')
AND created_at >= (ARROW_CAST(ARROW_CAST(NOW(), 'Timestamp(Second, None)'), 'Int64') - (60 * 10))
11 changes: 8 additions & 3 deletions lambdas/query-metrics/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use url::Url;

#[derive(Debug, Deserialize)]
pub struct Configuration {
pub gauges: HashMap<String, Gauge>,
pub gauges: HashMap<String, Vec<Gauge>>,
}

impl Configuration {
Expand Down Expand Up @@ -51,10 +51,15 @@ mod tests {
}

#[test]
fn test_config_b64() {
let b = b"LS0tCiMgVGhpcyBpcyBhbiBleGFtcGxlIG1hbmlmZXN0IGZpdWxlIHdoaWNoIGNvbmZpZ3VyZXMgdGhlIGxhbWJkYQpnYXVnZXM6CiAgIyBFYWNoIGdhdWdlIHNob3VsZCBoYXZlIGEgZGlzdGluY3QgbmFtZSBmb3IgbWFuYWdpbmcgaW5zaWRlIG9mIHRoZSBsYW1iZGEKICBpbXBvcnRhbnRfbWV0cmljOgogICAgIyBUaGVuIGRlZmluZSBhIG1ldHJpYyBuYW1lIHRvIGV4cG9ydCB0byBjbG91ZHdhdGNoCiAgICBtZXRyaWM6ICdsYXN0XzEwX3VuaXEnCiAgICB1cmw6ICdzMzovL2V4YW1wbGUtYnVja2V0L2RhdGFiYXNlcy9kcy1wYXJ0aXRpb25lZC1kZWx0YS10YWJsZS8nCiAgICAjIEN1cnJlbnRseSBvbmx5IGEgcXVlcnkgaGFuZGxlciB0eXBlIG9mIGBjb3VudGAgaXMgc3VwcG9ydGVkCiAgICB0eXBlOiBjb3VudAogICAgIyBUaGUgZXhhbXBsZSBEYXRhZnVzaW9uIFNRTCBxdWVyeSBiZWxvdyBxdWVyaWVzIHRoZSBzb3VyY2UgdGFibGUsIHdoaWNoIGlzIGRlZmluZWQgYnkKICAgICMgdGhlIFVSTCBhYm92ZSwgdG8gZmluZCBhbGwgdGhlIGRpc3RpbmN0IHV1aWRzIGluIHRoZSBsYXN0IDEwIG1pbnV0ZXMgb2YgdGhlIGN1cnJlbnQKICAgICMgYGRzYCBwYXJ0aXRpb24uCiAgICBxdWVyeTogfAogICAgICBTRUxFQ1QgRElTVElOQ1QgdXVpZCBBUyB0b3RhbCBGUk9NIHNvdXJjZQogICAgICAgIFdIRVJFIGRzID0gQVJST1dfQ0FTVChDVVJSRU5UX0RBVEUoKSAsICdVdGY4JykKICAgICAgICBBTkQgY3JlYXRlZF9hdCA+PSAoQVJST1dfQ0FTVChBUlJPV19DQVNUKE5PVygpLCAnVGltZXN0YW1wKFNlY29uZCwgTm9uZSknKSwgJ0ludDY0JykgLSAoNjAgKiAxMCkpCg==";
fn test_config_b64() -> anyhow::Result<()> {
use std::io::Read;
let mut manifest = File::open("manifest.yml")?;
let mut buf = vec![];
let count = manifest.read_to_end(&mut buf)?;
let b = BASE64_STANDARD.encode(buf);
let conf: Configuration = Configuration::from_base64(&b).expect("Failed to deserialize");

assert_eq!(conf.gauges.len(), 1);
Ok(())
}
}
63 changes: 33 additions & 30 deletions lambdas/query-metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,43 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
.expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml");
debug!("Configuration loaded: {conf:?}");

for (name, gauge) in conf.gauges.iter() {
debug!("Querying the {name} table");
let ctx = SessionContext::new();
let table = deltalake::open_table(&gauge.url)
.await
.expect("Failed to register table");
ctx.register_table("source", Arc::new(table))
.expect("Failed to register table with datafusion");
for (name, gauges) in conf.gauges.iter() {
for gauge in gauges.iter() {
debug!("Querying the {name} table");
let ctx = SessionContext::new();
let table = deltalake::open_table(&gauge.url)
.await
.expect("Failed to register table");
ctx.register_table("source", Arc::new(table))
.expect("Failed to register table with datafusion");

debug!("Running query: {}", gauge.query);
debug!("Running query: {}", gauge.query);

let df = ctx
.sql(&gauge.query)
.await
.expect("Failed to execute query");
let df = ctx
.sql(&gauge.query)
.await
.expect("Failed to execute query");

match gauge.measurement_type {
config::Measurement::Count => {
let count = df.count().await.expect("Failed to collect batches");
debug!("Found {count} distinct records");
match gauge.measurement_type {
config::Measurement::Count => {
let count = df.count().await.expect("Failed to collect batches");
debug!("Found {count} distinct records");

let datum = MetricDatum::builder()
.metric_name(&gauge.name)
.timestamp(DateTime::from(SystemTime::now()))
.value(count as f64)
.unit(StandardUnit::Count)
.build();
let res = cloudwatch
.put_metric_data()
.namespace("DataLake")
.metric_data(datum)
.send()
.await?;
debug!("Result of CloudWatch send: {res:?}");
let datum = MetricDatum::builder()
.metric_name(&gauge.name)
.timestamp(DateTime::from(SystemTime::now()))
.value(count as f64)
.unit(StandardUnit::Count)
.build();

let res = cloudwatch
.put_metric_data()
.namespace(format!("DataLake/{name}"))
.metric_data(datum)
.send()
.await?;
debug!("Result of CloudWatch send: {res:?}");
}
}
}
}
Expand Down

0 comments on commit 775d585

Please sign in to comment.