Add the Numeric metric type and add some documentation and tooling #6

Merged
merged 1 commit into from
Sep 4, 2024
22 changes: 22 additions & 0 deletions Makefile
@@ -0,0 +1,22 @@

.PHONY: help
help: ## Show this help
	@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

.PHONY: all build build-release check test clean
all: check build test ## Perform all the checks builds and testing

check: ## Ensure that the crate meets the basic formatting and structure
	cargo fmt --check
	cargo clippy

build: ## Build the crate with each set of features
	./ci/build.sh

build-release: check test ## Build the release versions of Lambdas
	./ci/build-release.sh

test: ## Run the crate's tests with each set of features
	./ci/test.sh

clean: ## Clean up resources from build
	cargo clean
7 changes: 7 additions & 0 deletions ci/build-release.sh
@@ -0,0 +1,7 @@
#!/bin/sh

if [ -f "${HOME}/.cargo/env" ]; then
. "${HOME}/.cargo/env"
fi;

exec cargo lambda build --release --output-format zip
11 changes: 11 additions & 0 deletions ci/build.sh
@@ -0,0 +1,11 @@
#!/bin/sh

if [ -f "${HOME}/.cargo/env" ]; then
. "${HOME}/.cargo/env"
fi;

set -xe

cargo fmt --check

exec cargo build
6 changes: 6 additions & 0 deletions ci/test.sh
@@ -0,0 +1,6 @@
#!/bin/sh
if [ -f "${HOME}/.cargo/env" ]; then
. "${HOME}/.cargo/env"
fi;

exec cargo test --verbose
2 changes: 1 addition & 1 deletion lambdas/query-metrics/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "query-metrics"
-version = "0.3.0"
+version = "0.4.0"
edition = "2021"

[[bin]]
50 changes: 50 additions & 0 deletions lambdas/query-metrics/README.md
@@ -0,0 +1,50 @@

# query-metrics

This Lambda executes DataFusion queries defined in a YAML file and submits
the results to CloudWatch Metrics, where they can be alerted upon or forwarded
to other tools.



## Types

### Count

This is the simplest query type: it records the number of rows returned by the query, e.g.:

```sql
SELECT id FROM source WHERE id > 1000 AND id <= 2000
```

Assuming `source` contains exactly one row for every `id` in that range, this would consistently produce a counted metric value of `1000`.
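
As a rough illustration of that arithmetic, the sketch below applies the same filter in plain Rust. It assumes a hypothetical data set with one row per integer `id` from 1 to 4096; the function name `count_matching` is made up for this example, and nothing here uses DataFusion itself.

```rust
/// Count the ids that satisfy `id > 1000 AND id <= 2000`,
/// mirroring the WHERE clause of the example query.
/// (Hypothetical helper: plain Rust standing in for the query engine.)
fn count_matching(ids: impl IntoIterator<Item = i64>) -> usize {
    ids.into_iter()
        .filter(|id| *id > 1000 && *id <= 2000)
        .count()
}
```

The lower bound is exclusive and the upper bound is inclusive, so the matching ids run from 1001 through 2000.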


### Numeric

Numeric is likely the most common and easiest to understand query type. There should be only one row in the result set, and all of its values should be numeric (the current implementation reads every column as a 64-bit integer), e.g.:

```sql
SELECT COUNT(*) AS total, SUM(CASE WHEN (id > 1000 AND id <= 2000) THEN 1 ELSE 0 END) AS valid_ids FROM source
```

This will produce a result set of:

```
+-------+-----------+
| total | valid_ids |
+-------+-----------+
| 4096 | 1000 |
+-------+-----------+
```

Which will produce metric values of:

* `total` 4096
* `valid_ids` 1000
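
The mapping from result columns to metric values can be sketched roughly as follows. This is a simplified stand-in rather than the Lambda's own code: plain vectors replace Arrow record batches, `numeric_metrics` is a hypothetical name, and every value is assumed to be a 64-bit integer. If the result set held more than one row, this sketch would add the rows together per column.

```rust
use std::collections::HashMap;

/// Sum each column of the result set into a metric keyed by column name.
/// `columns` holds the column names; each row holds one value per column.
/// (Hypothetical helper: plain vectors stand in for Arrow record batches.)
fn numeric_metrics(columns: &[&str], rows: &[Vec<i64>]) -> HashMap<String, i64> {
    let mut values: HashMap<String, i64> = HashMap::new();
    for row in rows {
        for (name, value) in columns.iter().zip(row.iter()) {
            // Start each metric at 0 and accumulate across rows.
            *values.entry(name.to_string()).or_insert(0) += value;
        }
    }
    values
}
```

Called with the columns `total` and `valid_ids` and the single row `4096, 1000`, it returns a map holding `total` = 4096 and `valid_ids` = 1000, matching the list above.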


### Dimensional Count

The dimensional count is the most advanced query type and can be used to
provide dimensional (or tagged) metrics in CloudWatch.
28 changes: 28 additions & 0 deletions lambdas/query-metrics/src/cli.rs
@@ -47,6 +47,34 @@ async fn main() -> anyhow::Result<()> {
    let count = df.count().await.expect("Failed to collect batches");
    println!("Counted {count} rows");
}
config::Measurement::Numeric => {
    println!("Need to run numeric");
    let batches = df.collect().await.expect("Failed to collect batches");
    let _ = print_batches(&batches);

    println!("I see this many batches: {}", batches.len());
    let mut dimensions: HashMap<String, i64> = HashMap::new();
    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
        let schema = batch.schema();
        let fields = schema.fields();
        for row in 0..batch.num_rows() {
            for (idx, column) in batch.columns().iter().enumerate() {
                let field = &fields[idx];
                let name = field.name();

                if !dimensions.contains_key(name) {
                    dimensions.insert(name.to_string(), 0);
                }
                let current = dimensions.get(name).expect("Failed to retrieve");
                let arr: &PrimitiveArray<Int64Type> =
                    arrow::array::cast::as_primitive_array(&column);
                let count = arr.value(row);
                dimensions.insert(name.to_string(), count + current);
            }
        }
    }
    println!("results: {dimensions:?}");
}
config::Measurement::DimensionalCount => {
    println!("Need to run dimensional count");
    let batches = df.collect().await.expect("Failed to collect batches");
1 change: 1 addition & 0 deletions lambdas/query-metrics/src/config.rs
@@ -38,6 +38,7 @@ pub struct Gauge {
#[serde(rename_all = "lowercase")]
pub enum Measurement {
Count,
Numeric,
DimensionalCount,
}

45 changes: 43 additions & 2 deletions lambdas/query-metrics/src/main.rs
@@ -29,7 +29,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
std::env::var("MANIFEST_B64").expect("The `MANIFEST_B64` variable was not defined"),
)
.expect("The `MANIFEST_B64` environment variable does not contain a valid manifest yml");
-debug!("Configuration loaded: {conf:?}");
+info!("Configuration loaded: {conf:?}");

for (name, gauges) in conf.gauges.iter() {
for gauge in gauges.iter() {
@@ -41,7 +41,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
ctx.register_table("source", Arc::new(table))
.expect("Failed to register table with datafusion");

-debug!("Running query: {}", gauge.query);
+info!("Running query: {}", gauge.query);

let df = ctx
.sql(&gauge.query)
@@ -68,6 +68,47 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
        .await?;
    debug!("Result of CloudWatch send: {res:?}");
}
config::Measurement::Numeric => {
    let batches = df.collect().await.expect("Failed to collect batches");
    let mut values: HashMap<String, i64> = HashMap::new();

    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
        let schema = batch.schema();
        let fields = schema.fields();
        for row in 0..batch.num_rows() {
            for (idx, column) in batch.columns().iter().enumerate() {
                let field = &fields[idx];
                let name = field.name();

                if !values.contains_key(name) {
                    values.insert(name.to_string(), 0);
                }
                let current = values.get(name).expect("Failed to retrieve");
                let arr: &PrimitiveArray<Int64Type> =
                    arrow::array::cast::as_primitive_array(&column);
                let count = arr.value(row);
                values.insert(name.to_string(), count + current);
            }
        }
    }
    info!("results: {values:?}");
    for (key, value) in values.into_iter() {
        let datum = MetricDatum::builder()
            .metric_name(&key)
            .timestamp(DateTime::from(SystemTime::now()))
            .unit(StandardUnit::Count)
            .value(value as f64)
            .build();

        let res = cloudwatch
            .put_metric_data()
            .namespace(format!("DataLake/{name}"))
            .metric_data(datum)
            .send()
            .await?;
        info!("submitting {key} to cloudwatch: {res:?}");
    }
}
config::Measurement::DimensionalCount => {
    let batches = df.collect().await.expect("Failed to collect batches");
    debug!("I see this many batches: {}", batches.len());