diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..69a16c8 --- /dev/null +++ b/Makefile @@ -0,0 +1,22 @@ + +.PHONY: help +help: ## Show this help + @egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' + +.PHONY: all build build-release check test clean +all: check build test ## Perform all the checks builds and testing + +check: ## Ensure that the crate meets the basic formatting and structure + cargo fmt --check + cargo clippy + +build: ## Build the crate with each set of features + ./ci/build.sh + +build-release: check test ## Build the release versions of Lambdas + ./ci/build-release.sh +test: ## Run the crate's tests with each set of features + ./ci/test.sh + +clean: ## Clean up resources from build + cargo clean diff --git a/ci/build-release.sh b/ci/build-release.sh new file mode 100755 index 0000000..f0411c7 --- /dev/null +++ b/ci/build-release.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +if [ -f "${HOME}/.cargo/env" ]; then + . "${HOME}/.cargo/env" +fi; + +exec cargo lambda build --release --output-format zip diff --git a/ci/build.sh b/ci/build.sh new file mode 100755 index 0000000..dd926e8 --- /dev/null +++ b/ci/build.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +if [ -f "${HOME}/.cargo/env" ]; then + . "${HOME}/.cargo/env" +fi; + +set -xe + +cargo fmt --check + +exec cargo build diff --git a/ci/test.sh b/ci/test.sh new file mode 100755 index 0000000..3852d80 --- /dev/null +++ b/ci/test.sh @@ -0,0 +1,6 @@ +#!/bin/sh +if [ -f "${HOME}/.cargo/env" ]; then + . 
"${HOME}/.cargo/env" +fi; + +exec cargo test --verbose diff --git a/lambdas/query-metrics/Cargo.toml b/lambdas/query-metrics/Cargo.toml index e1cda12..b62cd1d 100644 --- a/lambdas/query-metrics/Cargo.toml +++ b/lambdas/query-metrics/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "query-metrics" -version = "0.3.0" +version = "0.4.0" edition = "2021" [[bin]] diff --git a/lambdas/query-metrics/README.md b/lambdas/query-metrics/README.md new file mode 100644 index 0000000..e087644 --- /dev/null +++ b/lambdas/query-metrics/README.md @@ -0,0 +1,50 @@ + +# query-metrics + +This Lambda will execute DataFusion queries defined in a YAML file and submit +the results to CloudWatch Metrics which can be alerted upon or forwarded into +other tools + + + +## Types + +### Count + +This is the simplest type of query and will simply record the number of rows from the query, e.g.: + +```sql +SELECT id FROM source WHERE id > 1000 AND id <= 2000 +``` + +Would consistently produce a counted metric value of `1000`. + + +### Numeric + +Numeric is likely the most common and easy to understand query. 
There should only be one row in the result set and all of its values should be numeric values, e.g.: + +```sql +SELECT COUNT(*) AS total, SUM(CASE WHEN (id > 1000 AND id <= 2000) THEN 1 ELSE 0 END) AS valid_ids FROM source +``` + +This will produce a result set of: + +``` ++-------+-----------+ +| total | valid_ids | ++-------+-----------+ +| 4096 | 1000 | ++-------+-----------+ +``` + +Which will produce metric values of: + +* `total` 4096 +* `valid_ids` 1000 + + +### Dimensional Count + +The dimensional count is the most advanced query type and can be used to +provide dimensional (or tagged) metrics in CloudWatch diff --git a/lambdas/query-metrics/src/cli.rs b/lambdas/query-metrics/src/cli.rs index c0a42a7..6ff180a 100644 --- a/lambdas/query-metrics/src/cli.rs +++ b/lambdas/query-metrics/src/cli.rs @@ -47,6 +47,34 @@ async fn main() -> anyhow::Result<()> { let count = df.count().await.expect("Failed to collect batches"); println!("Counted {count} rows"); } + config::Measurement::Numeric => { + println!("Need to run dimensional count"); + let batches = df.collect().await.expect("Failed to collect batches"); + let _ = print_batches(&batches); + + println!("I see this many batches: {}", batches.len()); + let mut dimensions: HashMap = HashMap::new(); + for batch in batches.iter().filter(|b| b.num_rows() > 0) { + let schema = batch.schema(); + let fields = schema.fields(); + for row in 0..batch.num_rows() { + for (idx, column) in batch.columns().iter().enumerate() { + let field = &fields[idx]; + let name = field.name(); + + if !dimensions.contains_key(name) { + dimensions.insert(name.to_string(), 0); + } + let current = dimensions.get(name).expect("Failed to retrieve"); + let arr: &PrimitiveArray = + arrow::array::cast::as_primitive_array(&column); + let count = arr.value(row); + dimensions.insert(name.to_string(), count + current); + } + } + } + println!("results: {dimensions:?}"); + } config::Measurement::DimensionalCount => { println!("Need to run dimensional 
count"); let batches = df.collect().await.expect("Failed to collect batches"); diff --git a/lambdas/query-metrics/src/config.rs b/lambdas/query-metrics/src/config.rs index 247e52d..29be7cf 100644 --- a/lambdas/query-metrics/src/config.rs +++ b/lambdas/query-metrics/src/config.rs @@ -38,6 +38,7 @@ pub struct Gauge { #[serde(rename_all = "lowercase")] pub enum Measurement { Count, + Numeric, DimensionalCount, } diff --git a/lambdas/query-metrics/src/main.rs b/lambdas/query-metrics/src/main.rs index ac3472a..bba39c5 100644 --- a/lambdas/query-metrics/src/main.rs +++ b/lambdas/query-metrics/src/main.rs @@ -29,7 +29,7 @@ async fn function_handler(_event: LambdaEvent) -> Result<(), Er std::env::var("MANIFEST_B64").expect("The `MANIFEST_B64` variable was not defined"), ) .expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml"); - debug!("Configuration loaded: {conf:?}"); + info!("Configuration loaded: {conf:?}"); for (name, gauges) in conf.gauges.iter() { for gauge in gauges.iter() { @@ -41,7 +41,7 @@ async fn function_handler(_event: LambdaEvent) -> Result<(), Er ctx.register_table("source", Arc::new(table)) .expect("Failed to register table with datafusion"); - debug!("Running query: {}", gauge.query); + info!("Running query: {}", gauge.query); let df = ctx .sql(&gauge.query) @@ -68,6 +68,47 @@ async fn function_handler(_event: LambdaEvent) -> Result<(), Er .await?; debug!("Result of CloudWatch send: {res:?}"); } + config::Measurement::Numeric => { + let batches = df.collect().await.expect("Failed to collect batches"); + let mut values: HashMap = HashMap::new(); + + for batch in batches.iter().filter(|b| b.num_rows() > 0) { + let schema = batch.schema(); + let fields = schema.fields(); + for row in 0..batch.num_rows() { + for (idx, column) in batch.columns().iter().enumerate() { + let field = &fields[idx]; + let name = field.name(); + + if !values.contains_key(name) { + values.insert(name.to_string(), 0); + } + let current = 
values.get(name).expect("Failed to retrieve"); + let arr: &PrimitiveArray = + arrow::array::cast::as_primitive_array(&column); + let count = arr.value(row); + values.insert(name.to_string(), count + current); + } + } + } + info!("results: {values:?}"); + for (key, value) in values.into_iter() { + let datum = MetricDatum::builder() + .metric_name(&key) + .timestamp(DateTime::from(SystemTime::now())) + .unit(StandardUnit::Count) + .value(value as f64) + .build(); + + let res = cloudwatch + .put_metric_data() + .namespace(format!("DataLake/{name}")) + .metric_data(datum) + .send() + .await?; + info!("submitting {key} to cloudwatch: {res:?}"); + } + } config::Measurement::DimensionalCount => { let batches = df.collect().await.expect("Failed to collect batches"); debug!("I see this many batches: {}", batches.len());