diff --git a/infra/lib/alerting.ts b/infra/lib/alerting.ts index 1113e4bf..d381ba5a 100644 --- a/infra/lib/alerting.ts +++ b/infra/lib/alerting.ts @@ -17,10 +17,14 @@ interface MatanoAlertingProps { } export class MatanoAlerting extends Construct { + ruleMatchesTopic: sns.Topic; alertingTopic: sns.Topic; constructor(scope: Construct, id: string, props: MatanoAlertingProps) { super(scope, id); + this.ruleMatchesTopic = new sns.Topic(this, "RuleMatchesTopic", { + displayName: "MatanoRuleMatchesTopic", + }); this.alertingTopic = new sns.Topic(this, "Topic", { displayName: "MatanoAlertingTopic", }); @@ -29,14 +33,16 @@ export class MatanoAlerting extends Construct { const alertForwarder = new AlertForwarder(this, "Forwarder", { integrationsStore: props.integrationsStore, alertTrackerTable: props.alertTrackerTable, - alertsSnsTopic: this.alertingTopic, + ruleMatchesSnsTopic: this.ruleMatchesTopic, + alertingSnsTopic: this.alertingTopic, }); } } } interface AlertForwarderProps { - alertsSnsTopic: sns.Topic; + ruleMatchesSnsTopic: sns.Topic; + alertingSnsTopic: sns.Topic; integrationsStore: IntegrationsStore; alertTrackerTable: ddb.Table; } @@ -64,6 +70,7 @@ export class AlertForwarder extends Construct { environment: { RUST_LOG: "warn,alert_forwarder=info", ALERT_TRACKER_TABLE_NAME: props.alertTrackerTable.tableName, + ALERTING_SNS_TOPIC_ARN: props.alertingSnsTopic.topicArn, DESTINATION_TO_CONFIGURATION_MAP: JSON.stringify(props.integrationsStore?.integrationsInfoMap ?? {}), DESTINATION_TO_SECRET_ARN_MAP: JSON.stringify(destinationToSecretArnMap), }, @@ -90,11 +97,12 @@ export class AlertForwarder extends Construct { visibilityTimeout: cdk.Duration.seconds(Math.max(this.function.timeout!.toSeconds(), 30)), }); - props.alertsSnsTopic.addSubscription( + props.ruleMatchesSnsTopic.addSubscription( new SqsSubscription(this.queue, { rawMessageDelivery: true, }) ); + props.alertingSnsTopic.grantPublish(this.function); this.function.addEventSource( new SqsEventSource(this.queue, { diff --git a/infra/lib/lake-writer.ts b/infra/lib/lake-writer.ts index 689e37c9..d2d59d89 100644 --- a/infra/lib/lake-writer.ts +++ b/infra/lib/lake-writer.ts @@ -10,7 +10,7 @@ interface LakeWriterProps { realtimeBucket: s3.IBucket; outputBucket: s3.IBucket; outputObjectPrefix: string; - alertingSnsTopic: sns.Topic; + ruleMatchesSnsTopic: sns.Topic; } export class LakeWriter extends Construct { @@ -43,7 +43,7 @@ export class LakeWriter extends Construct { RUST_LOG: "warn,lake_writer=info", OUT_BUCKET_NAME: props.outputBucket.bucketName, OUT_KEY_PREFIX: props.outputObjectPrefix, - ALERTING_SNS_TOPIC_ARN: props.alertingSnsTopic.topicArn, + RULE_MATCHES_SNS_TOPIC_ARN: props.ruleMatchesSnsTopic.topicArn, }, timeout: cdk.Duration.seconds(120), // prevent concurrency @@ -51,6 +51,6 @@ export class LakeWriter extends Construct { }); props.realtimeBucket.grantRead(this.alertsLakeWriterLambda); props.outputBucket.grantReadWrite(this.alertsLakeWriterLambda); - props.alertingSnsTopic.grantPublish(this.alertsLakeWriterLambda); + props.ruleMatchesSnsTopic.grantPublish(this.alertsLakeWriterLambda); } } diff --git a/infra/src/DPMainStack.ts b/infra/src/DPMainStack.ts index b6d6f096..a1aea5c2 100644 --- a/infra/src/DPMainStack.ts +++ b/infra/src/DPMainStack.ts @@ -82,7 +82,7 @@ export class DPMainStack extends MatanoStack { const lakeWriter = new LakeWriter(this, "LakeWriter", { realtimeBucket: props.realtimeBucket, - alertingSnsTopic: matanoAlerting.alertingTopic, + ruleMatchesSnsTopic: matanoAlerting.ruleMatchesTopic, outputBucket: props.lakeStorageBucket.bucket, outputObjectPrefix: "lake", }); diff --git a/lib/rust/Cargo.lock b/lib/rust/Cargo.lock index 55f4cfce..b211a3a3 100644 --- a/lib/rust/Cargo.lock +++ b/lib/rust/Cargo.lock @@ -81,6 +81,7 @@ dependencies = [ "aws-config 0.51.0", "aws-sdk-dynamodb", "aws-sdk-secretsmanager 0.21.0", + "aws-sdk-sns", "aws-sdk-sqs", "aws_lambda_events", "base64 0.13.1", diff --git a/lib/rust/alert_forwarder/Cargo.toml b/lib/rust/alert_forwarder/Cargo.toml index 52653517..95696dbc 100644 --- a/lib/rust/alert_forwarder/Cargo.toml +++ b/lib/rust/alert_forwarder/Cargo.toml @@ -31,6 +31,7 @@ lambda_runtime = "0.7.0" aws-config = "0.51.0" aws_lambda_events = "0.7.2" aws-sdk-sqs = "0.21.0" +aws-sdk-sns = "0.21.0" aws-sdk-dynamodb = "0.21.0" serde_dynamo = { version = "4", features = ["aws-sdk-dynamodb+0_21"] } diff --git a/lib/rust/alert_forwarder/src/main.rs b/lib/rust/alert_forwarder/src/main.rs index 04218c14..4ca6fcb6 100644 --- a/lib/rust/alert_forwarder/src/main.rs +++ b/lib/rust/alert_forwarder/src/main.rs @@ -1,18 +1,14 @@ +use aws_lambda_events::sqs::SqsEvent; use aws_sdk_dynamodb::model::AttributeValue; -use bytes::Bytes; use serde_dynamo::aws_sdk_dynamodb_0_21::{from_item, to_item}; +use serde_json::json; use std::collections::HashMap; use std::env::var; -use std::pin::Pin; +use std::sync::Arc; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Context, Ok, Result}; use async_once::AsyncOnce; -use async_stream::stream; -use aws_lambda_events::event::s3::{S3Event, S3EventRecord}; -use aws_lambda_events::event::sqs::SqsEvent; -use aws_sdk_sqs::model::SendMessageBatchRequestEntry; use base64::decode; -use futures::future::join_all; use futures::{Stream, StreamExt, TryStreamExt}; use lambda_runtime::{ run, service_fn, Context as LambdaContext, Error as LambdaError, LambdaEvent, @@ -20,11 +16,8 @@ use lambda_runtime::{ use lazy_static::lazy_static; use log::{debug, error, info}; use serde::{Deserialize, Serialize}; -use serde_json::json; -use std::cell::RefCell; use std::collections::BTreeMap; -use tokio::io::AsyncReadExt; use tokio_util::codec::{FramedRead, LinesCodec}; use tokio_util::io::StreamReader; // use chrono_tz::Tz; @@ -34,12 +27,12 @@ use async_compression::tokio::bufread::ZstdDecoder; use ::value::Value; use shared::secrets::load_secret; +use shared::setup_logging; use shared::vrl_util::vrl; -use shared::{setup_logging, LOG_SOURCES_CONFIG}; use vrl::{state, value}; pub mod slack; -use slack::{post_message, post_message_to_thread}; +use slack::publish_alert_to_slack; #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; @@ -49,6 +42,8 @@ lazy_static! { AsyncOnce::new(async { aws_config::load_from_env().await }); static ref SQS_CLIENT: AsyncOnce = AsyncOnce::new(async { aws_sdk_sqs::Client::new(AWS_CONFIG.get().await) }); + static ref SNS_CLIENT: AsyncOnce = + AsyncOnce::new(async { aws_sdk_sns::Client::new(AWS_CONFIG.get().await) }); static ref DYNAMODB_CLIENT: AsyncOnce = AsyncOnce::new(async { aws_sdk_dynamodb::Client::new(AWS_CONFIG.get().await) }); static ref DESTINATION_TO_CONFIGURATION_MAP: HashMap = { @@ -66,63 +61,30 @@ lazy_static! { } const FLATTENED_CONTEXT_EXPANDER: &str = r#" -key_to_label = { - "related.ip": ":mag: IP", - "related.user": ":bust_in_silhouette: User", - "related.hosts": ":globe_with_meridians: Host", - "related.hash": ":hash: Hash", -} - context = . . = {} for_each(context) -> |k, v| { - label = get(key_to_label, [k]) ?? null values = array!(v) - value_str_prefix = if label != null { label } else { k } - - value_str_prefix, err = "*" + to_string(value_str_prefix) + ":* " - vals, err = map_values(values) -> |v| { - "`" + to_string(v) + "`" - } - more_count_short = length(vals) - 5 - values_short = slice!(vals, 0, 5) - value_short_str = value_str_prefix + join!(values_short, " ") - if more_count_short > 0 { - value_short_str = value_short_str + " +" + to_string(more_count_short) + " more..." - } - - more_count_long = length(vals) - 25 - values_long = slice!(vals, 0, 25) - value_long_str = value_str_prefix + join!(values_long, " ") - if more_count_long > 0 { - value_long_str = value_long_str + " +" + to_string(more_count_long) + " more..." - } k_parts = split(k, ".") - .long_fmt = set!(object!(.long_fmt || {}), k_parts, value_long_str) - .short_fmt = set!(object!(.short_fmt || {}), k_parts, value_short_str) .values = set!(object!(.values || {}), k_parts, values) } "#; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Alert { - // pub alert_info: Value, pub id: String, pub creation_time: DateTime, pub title: String, - pub title_fmt: String, pub severity: String, pub severity_icon_url: String, pub runbook: String, - pub false_positives_str: String, + pub false_positives: Vec, pub destinations: Vec, - pub related_strs: Vec, - pub context_strs: Vec, - pub context_values: Value, + pub context: Value, pub tables: Vec, pub match_count: i64, @@ -130,6 +92,13 @@ pub struct Alert { pub destination_to_alert_info: HashMap, } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct AlertCDCPayload { + pub updated_alert: Alert, + pub incoming_rule_matches_context: Value, + pub context_diff: Value, +} + #[tokio::main] async fn main() -> Result<(), LambdaError> { setup_logging(); @@ -137,7 +106,7 @@ async fn main() -> Result<(), LambdaError> { let func = service_fn(handler); run(func).await?; - Ok(()) + core::result::Result::Ok(()) } /// Checks if object is definitely compressed (false negatives) @@ -171,7 +140,8 @@ async fn handler(event: LambdaEvent) -> Result<()> { .collect::>(); let stream: tokio_stream::Iter>> = - tokio_stream::iter(vec![Ok(bytes::Bytes::from(bytes))]); + tokio_stream::iter(vec![Ok(bytes::Bytes::from(bytes)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))]); let reader = StreamReader::new(stream); let mut reader = ZstdDecoder::new(reader); @@ -179,486 +149,339 @@ async fn handler(event: LambdaEvent) -> Result<()> { let lines = FramedRead::new(reader, LinesCodec::new()).map_err(|e| anyhow!(e)); - let _ = lines.try_for_each_concurrent(50, |l| async move { - let mut ecs_alert_info_agg: Value = serde_json::from_str::(l.as_str())?.into(); - - let wrapped_prog = r#" - . = compact(.) - - combined_context = {} - - rule_matches, err = map_values(array!(.rule_matches.new)) -> |v| { - context = del(v.matano.alert.context) - # del(v.matano.alert.original_event) - # del(v.matano.alert.original_event) - # del(v.matano.alert.rule.match.id) - # del(v.matano.alert.rule.destinations) - # del(v.matano.alert.rule.deduplication_window) - # del(v.matano.alert.dedupe) - # del(v.matano.alert.activated) - # del(matano.alert.first_matched_at) - matano_alert_info = del(v.matano.alert) - del(v.ts) - del(v.event.id) - del(v.event.created) - del(v.event.kind) - del(v.event.duration) - # del(v.event.created) - - flatten(v) - } + let _ = lines + .try_for_each_concurrent(50, |l| async move { + let mut ecs_alert_info_agg: Value = serde_json::from_str::(l.as_str())?.into(); - for_each(rule_matches) -> |_i, m| { - for_each(object!(m)) -> |k,v| { - merged = get(combined_context, [k]) ?? [] - merged = flatten([merged, v]) - combined_context = set!(combined_context, [k], merged) - } - } + let mut alert_cdc_payload = + merge_incoming_rule_matches_to_alert_and_compute_cdc(&mut ecs_alert_info_agg) + .await?; + // arc it - combined_context = map_values(combined_context) -> |v| { - v = unique(v) - } - - combined_context = compact(combined_context, recursive: true) - - . = combined_context - - matano_alert_info.match_count = length(rule_matches) - matano_alert_info - "#; - let mut matano_alert_info= vrl(&wrapped_prog, &mut ecs_alert_info_agg)?.0; - vrl(FLATTENED_CONTEXT_EXPANDER, &mut ecs_alert_info_agg)?; + let mut updated_alert = alert_cdc_payload.updated_alert.clone(); - let alert_id = vrl(".id", &mut matano_alert_info)?.0.as_str().context("missing id")?.to_string(); + println!("destinations: {:?}", updated_alert.destinations); - let alert_creation_time = vrl(r#"to_timestamp!(int!(.created) * 1000, "nanoseconds")"#, &mut matano_alert_info)?.0.as_timestamp_unwrap().to_owned(); - let alert_title = vrl(".title", &mut matano_alert_info)?.0.as_str().context("missing title")?.to_string(); - let alert_severity = vrl(".severity", &mut matano_alert_info)?.0.as_str().context("missing severity")?.to_string(); - let alert_runbook = vrl(".runbook || \"\"", &mut matano_alert_info)?.0.as_str().context("missing runbook")?.to_string(); - let alert_false_positives_str = vrl( - r#" - fps = array(.rule.false_positives) ?? [] - fps_str = join!(fps, "\n• ") - if fps_str != "" && length(fps) > 1 { - fps_str = "\n• " + fps_str - } - fps_str - "#, - &mut matano_alert_info - )?.0.as_str().context("missing fp's")?.to_string(); - let alert_destinations = vrl(".destinations || []", &mut matano_alert_info)?.0.as_array_unwrap().into_iter().map(|v| v.as_str().unwrap().to_string()).collect::>(); - let match_count = vrl(".match_count", &mut matano_alert_info)?.0.as_integer().unwrap(); - - let title_fmt = match alert_severity.as_str() { - "critical" => format!("💥 🚨 [{}] {}", alert_severity.to_uppercase(), alert_title), - "high" => format!("🚨 [{}] {}", alert_severity.to_uppercase(), alert_title), - "notice" | "info" => format!("📢 {}", alert_title), - _ => format!("{}", alert_title), - }; - - let severity_icon_label = match alert_severity.as_str() { - "critical" => "high", - "notice" => "info", - s => &s, - }; - // TODO(shaeq): host these images properly - let severity_icon_url = format!("https://gist.githubusercontent.com/shaeqahmed/6c38fc5f0c3adb7e1a3fe6c5f78bbc4f/raw/9a12ff8d23592b31f224f9e27503e77b843b075c/apple-sev-{}-icon.png", severity_icon_label); - - let related_strs = vrl(".short_fmt.related", &mut ecs_alert_info_agg)?.0; - let related_strs = match related_strs { - Value::Object(related_strs) => related_strs.into_values().map(|v| v.as_str().unwrap().to_string()).collect::>(), - _ => vec![], - }; - let mut context_values = vrl(".values", &mut ecs_alert_info_agg)?.0; - let context_strs = vrl("flatten!(.long_fmt)", &mut ecs_alert_info_agg)?.0; - let context_strs = match context_strs { - Value::Object(context_strs) => context_strs.into_values().map(|v| v.as_str().unwrap().to_string()).collect::>(), - _ => vec![], - }; - - let alert_tables = vrl(".matano.table", &mut context_values)?.0 - .as_array() - .context("missing matano.table")? - .into_iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect::>(); - - let dynamodb = DYNAMODB_CLIENT.get().await; - - let alert_tracker_table_name = var("ALERT_TRACKER_TABLE_NAME")?; - let existing_alert_item = dynamodb.get_item().table_name(&alert_tracker_table_name).key("id", AttributeValue::S(alert_id.clone())).send().await?.item; - let mut existing_alert = match existing_alert_item { - Some(existing_alert_item) => { - let existing_alert = from_item::(existing_alert_item).context("failed to deserialize existing alert")?; - Some(existing_alert) - }, - None => None, - }; - let mut new_alert = Alert { - // alert_info: matano_alert_info, - id: alert_id, - creation_time: alert_creation_time, - title: alert_title, - title_fmt: title_fmt, - severity: alert_severity, - severity_icon_url: severity_icon_url, - runbook: alert_runbook, - false_positives_str: alert_false_positives_str, - destinations: alert_destinations, - tables: alert_tables, - match_count: match_count, - update_count: 1, - related_strs: related_strs, - context_strs: context_strs, - context_values: context_values, - destination_to_alert_info: HashMap::new(), - }; - - println!("destinations: {:?}", new_alert.destinations); - - for destination in &new_alert.destinations { - let dest_config = DESTINATION_TO_CONFIGURATION_MAP.get(destination); - if dest_config.is_none() { - continue; - } - let dest_config = dest_config.unwrap(); - let dest_type = dest_config["type"].as_str().unwrap(); - let dest_name = dest_config["name"].as_str().unwrap(); - println!("processing alert"); - - if dest_type == "slack" { - println!("sending slack alert"); - let channel = dest_config["properties"]["channel"].as_str().unwrap(); - let client_id = dest_config["properties"]["client_id"].as_str().unwrap(); - let res = publish_alert_to_slack(dest_name, &existing_alert, &new_alert, channel, client_id).await.unwrap(); - println!("published to slack: {:?}", res); - new_alert.destination_to_alert_info.entry(destination.to_string()).or_insert(res); + for destination in &updated_alert.destinations { + let dest_config = DESTINATION_TO_CONFIGURATION_MAP.get(destination); + if dest_config.is_none() { + continue; + } + let dest_config = dest_config.unwrap(); + let dest_type = dest_config["type"].as_str().unwrap(); + let dest_name = dest_config["name"].as_str().unwrap(); + println!("processing alert"); + + if dest_type == "slack" { + println!("sending slack alert"); + let channel = dest_config["properties"]["channel"].as_str().unwrap(); + let client_id = dest_config["properties"]["client_id"].as_str().unwrap(); + let res = publish_alert_to_slack( + &mut alert_cdc_payload, + dest_name, + channel, + client_id, + ) + .await + .unwrap(); + + updated_alert + .destination_to_alert_info + .entry(destination.to_string()) + .or_insert(res); + } } - } - - if existing_alert.is_none() { - existing_alert = Some(new_alert.clone()); - } else { - existing_alert = existing_alert.map(|mut a| { - a.match_count += new_alert.match_count; - a.update_count += 1; - a - }); - } - - let existing_alert_item = Some(to_item(&existing_alert)?); - let res = dynamodb.put_item().table_name(&alert_tracker_table_name).set_item(existing_alert_item).send().await?; - Ok(()) - }).await?; + alert_cdc_payload.updated_alert = updated_alert; + + // update alert in dynamodb + let dynamodb = DYNAMODB_CLIENT.get().await; + let alert_tracker_table_name = var("ALERT_TRACKER_TABLE_NAME")?; + let updated_alert_item = Some(to_item(&alert_cdc_payload.updated_alert)?); + let res = dynamodb + .put_item() + .table_name(&alert_tracker_table_name) + .set_item(updated_alert_item) + .send() + .await?; + + // cast each Value to String in alert_cdc_payload.updated_alert.destination_to_alert_info + let mut destination_to_alert_info = HashMap::new(); + for (k, v) in alert_cdc_payload.updated_alert.destination_to_alert_info { + destination_to_alert_info.insert(k, json!(v.to_string())); + } + let updated_alert = Alert { + destination_to_alert_info, + ..alert_cdc_payload.updated_alert + }; + let alert_cdc_payload = AlertCDCPayload { + updated_alert, + ..alert_cdc_payload + }; + // send alert change stream payload to alerting sns topic + let sns = SNS_CLIENT.get().await; + let alerting_sns_topic_arn = var("ALERTING_SNS_TOPIC_ARN")?; + let res = sns + .publish() + .topic_arn(&alerting_sns_topic_arn) + .message(serde_json::to_string(&alert_cdc_payload)?) + .send() + .await?; + + info!("published to sns: {:?}", res); + + Ok(()) + }) + .await?; println!("DONE"); Ok(()) } -async fn publish_alert_to_slack( - dest_name: &str, - existing_alert: &Option, - alert: &Alert, - channel: &str, - client_id: &str, -) -> Result { - let api_token = &get_secret_for_destination(dest_name).await?["bot_user_oauth_token"]; - - let mut res = json!(null); - - match existing_alert { - Some(existing_alert) => { - println!("existing alert"); - - let existing_dest_info = existing_alert - .destination_to_alert_info - .get(dest_name) - .unwrap(); - let thread_ts = existing_dest_info["ts"].as_str().unwrap(); - - let compute_new_context = r#" - new_context = {} - - .a = flatten(object!(.a)) - .b = flatten(object!(.b)) - - for_each(.b) -> |k, v| { - a_v = array!(get(.a, [k]) ?? []) - a_v_set = {} - for_each(a_v) -> |_i, f| { - a_v_set = set!(a_v_set, [f], true) - } - v_new = [] - for_each(array!(v)) -> |_i, f| { - exists = get(a_v_set, [f]) ?? false - exists = if exists == true { - true - } else { - false - } - if !exists { - v_new = push(v_new, f) - } - } - new_context = set!(new_context, [k], v_new) - } - compact(new_context)"#; - let existing_context = existing_alert.context_values.to_owned(); - let incoming_context = alert.context_values.to_owned(); - let mut new_context = vrl( - compute_new_context, - &mut value!({ - "a": existing_context, - "b": incoming_context, - }), - )? - .0; - vrl(FLATTENED_CONTEXT_EXPANDER, &mut new_context)?; - let new_context_strs = vrl("flatten(.long_fmt) ?? {}", &mut new_context)?.0; - let new_context_strs = match new_context_strs { - Value::Object(context_strs) => context_strs - .into_values() - .map(|v| v.as_str().unwrap().to_string()) - .collect::>(), - _ => vec![], - }; - // let new_context_values = vrl(".values || {}", &mut new_context)?.0; - // let new_context_values_json: serde_json::Value = new_context_values.try_into().unwrap(); - // let new_context_values_json_str = serde_json::to_string_pretty(&new_context_values_json).unwrap(); - let mut blocks = json!([ - { - "type": "header", - "text": { - "type": "plain_text", - "emoji": true, - "text": format!("➕ {} new rule matches", alert.match_count) - } - }, - { - "type": "divider" - } - ]); - - if new_context_strs.len() > 0 { - blocks.as_array_mut().unwrap().insert( - 2, - json!({ - "type": "section", - "text": { - "type": "mrkdwn", - "text": "*New context*" - }, - }), - ); - blocks.as_array_mut().unwrap().insert( - 3, - json!({ - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": new_context_strs.join("\n\n") - } - ] - }), - ); - } else { - blocks.as_array_mut().unwrap().insert( - 2, - json!({ - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": "No new context" - } - ] - }), - ); - } - - let blocks_str = serde_json::to_string(&blocks).unwrap(); +// returns AlertCDCPayload { merged_alert, incoming_rule_matches_context, context_diff } +async fn merge_incoming_rule_matches_to_alert_and_compute_cdc( + incoming_ecs_alert_info_agg: &mut Value, +) -> Result { + let mut merged_ecs_alert_context: Value = incoming_ecs_alert_info_agg.clone(); + + // TODO(shaeq): make this cleaner by not modifying the input value + let wrapped_prog = r#" + . = compact(.) + + combined_context = flatten(.existing_context) ?? {} + + rule_matches, err = map_values(array!(.rule_matches.new)) -> |v| { + context = del(v.matano.alert.context) + # del(v.matano.alert.original_event) + # del(v.matano.alert.original_event) + # del(v.matano.alert.rule.match.id) + # del(v.matano.alert.rule.destinations) + # del(v.matano.alert.rule.deduplication_window) + # del(v.matano.alert.dedupe) + # del(v.matano.alert.activated) + # del(matano.alert.first_matched_at) + matano_alert_info = del(v.matano.alert) + del(v.ts) + del(v.event.id) + del(v.event.created) + del(v.event.kind) + del(v.event.duration) + + flatten(v) + } - println!("{}", blocks.to_string()); + for_each(rule_matches) -> |_i, m| { + for_each(object!(m)) -> |k,v| { + merged = get(combined_context, [k]) ?? [] + merged = flatten([merged, v]) + combined_context = set!(combined_context, [k], merged) + } + } - res = post_message_to_thread(api_token, channel, thread_ts, &blocks_str).await?; + combined_context = map_values(combined_context) -> |v| { + v = unique(array!(v)) + } - if !res["ok"].as_bool().unwrap() { - return Err(anyhow!( - "Failed to publish alert context to Slack: {}", - res["error"].as_str().unwrap() - )); - } + combined_context = compact(combined_context, recursive: true) + + . = combined_context + + matano_alert_info.match_count = length(rule_matches) + matano_alert_info + "#; + let mut matano_alert_info = vrl(&wrapped_prog, incoming_ecs_alert_info_agg)?.0; + vrl(FLATTENED_CONTEXT_EXPANDER, incoming_ecs_alert_info_agg)?; + + let alert_id = vrl(".id", &mut matano_alert_info)? + .0 + .as_str() + .context("missing id")? + .to_string(); + + // lookup existing alert + let dynamodb = DYNAMODB_CLIENT.get().await; + + let alert_tracker_table_name = var("ALERT_TRACKER_TABLE_NAME")?; + let merged_alert_item = dynamodb + .get_item() + .table_name(&alert_tracker_table_name) + .key("id", AttributeValue::S(alert_id.clone())) + .send() + .await? + .item; + let mut merged_alert = match merged_alert_item { + Some(merged_alert_item) => { + let merged_alert = from_item::(merged_alert_item) + .context("failed to deserialize existing alert")?; + Some(merged_alert) } - None => { - let alert_creation_time_str = format!( - " ", - alert.creation_time.timestamp(), - alert.creation_time.to_rfc2822() - ); - - println!("new alert"); - - let mut blocks = json!([ - { - "type": "header", - "text": { - "type": "plain_text", - "emoji": true, - "text": alert.title_fmt, - } - }, - { - "type": "context", - "elements": [ - { - "type": "image", - "image_url": alert.severity_icon_url, - "alt_text": &alert.severity - }, - { - "type": "mrkdwn", - "text": format!("Severity: *{}*", &alert.severity) - }, - { - "type": "mrkdwn", - "text": format!("Match count: *{}*", alert.match_count) - }, - { - "type": "mrkdwn", - "text": format!("Table: *{}*", alert.tables.join(",")) - } - ] - }, - { - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": format!("*Alert ID:* {}", alert.id) - }, - { - "type": "mrkdwn", - "text": format!("*Created:* {}", alert_creation_time_str) - } - ] - }, - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": format!("*Runbook:* {}", &alert.runbook) - } - }, - { - "type": "divider" - }, - { - "type": "section", - "text": { - "type": "mrkdwn", - "text": "*Context*" - }, - // "accessory": { - // "type": "button", - // "text": { - // "type": "plain_text", - // "emoji": true, - // "text": "View alert details" - // }, - // "value": "click_me_123" - // } - }, - { - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": alert.related_strs.join("\n\n") - } - ] - } - ]); - - if alert.false_positives_str != "" { - blocks.as_array_mut().unwrap().insert( - 5, - json!({ - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": format!("*False positives:* {}", alert.false_positives_str) - } - ] - }), - ); - } + None => None, + }; - let blocks_str = serde_json::to_string(&blocks).unwrap(); + // then computed merged context + merged_ecs_alert_context + .as_object_mut_unwrap() + .entry("combined_context".to_owned()) + .or_insert(match merged_alert { + Some(ref merged_alert) => merged_alert.context.clone(), + None => Value::Null, + }); + vrl(&wrapped_prog, &mut merged_ecs_alert_context)?.0; + vrl(FLATTENED_CONTEXT_EXPANDER, &mut merged_ecs_alert_context)?; + let merged_context_values = vrl(".values", &mut merged_ecs_alert_context)?.0; + + // now we have the alert info, the new context, and the merged context + let alert_creation_time = vrl( + r#"to_timestamp!(int!(.created) * 1000, "nanoseconds")"#, + &mut matano_alert_info, + )? + .0 + .as_timestamp_unwrap() + .to_owned(); + let alert_title = vrl(".title", &mut matano_alert_info)? + .0 + .as_str() + .context("missing title")? + .to_string(); + let alert_severity = vrl(".severity", &mut matano_alert_info)? + .0 + .as_str() + .context("missing severity")? + .to_string(); + let alert_runbook = vrl(".runbook || \"\"", &mut matano_alert_info)? + .0 + .as_str() + .context("missing runbook")? + .to_string(); + let alert_false_positives = vrl(".false_positives || []", &mut matano_alert_info)? + .0 + .as_array_unwrap() + .into_iter() + .map(|v| v.as_str().unwrap().to_string()) + .collect::>(); + let alert_false_positives_str = vrl( + r#" + fps = array(.rule.false_positives) ?? [] + fps_str = join!(fps, "\n• ") + if fps_str != "" && length(fps) > 1 { + fps_str = "\n• " + fps_str + } + fps_str + "#, + &mut matano_alert_info, + )? + .0 + .as_str() + .context("missing fp's")? + .to_string(); + let alert_destinations = vrl(".destinations || []", &mut matano_alert_info)? + .0 + .as_array_unwrap() + .into_iter() + .map(|v| v.as_str().unwrap().to_string()) + .collect::>(); + let match_count = vrl(".match_count", &mut matano_alert_info)? + .0 + .as_integer() + .unwrap(); + + let severity_icon_label = match alert_severity.as_str() { + "critical" => "high", + "notice" => "info", + s => &s, + }; + // TODO(shaeq): host these images properly + let severity_icon_url = format!("https://gist.githubusercontent.com/shaeqahmed/6c38fc5f0c3adb7e1a3fe6c5f78bbc4f/raw/9a12ff8d23592b31f224f9e27503e77b843b075c/apple-sev-{}-icon.png", severity_icon_label); - println!("{}", blocks.to_string()); + let mut context_values = vrl(".values", incoming_ecs_alert_info_agg)?.0; - res = post_message(api_token, channel, &blocks_str).await?; + let alert_tables = vrl(".matano.table", &mut context_values)? + .0 + .as_array() + .context("missing matano.table")? + .into_iter() + .map(|v| v.as_str().unwrap().to_string()) + .collect::>(); + + let new_alert = Alert { + id: alert_id, + creation_time: alert_creation_time, + title: alert_title, + severity: alert_severity, + severity_icon_url: severity_icon_url, + runbook: alert_runbook, + false_positives: alert_false_positives, + destinations: alert_destinations, + tables: alert_tables, + match_count: match_count, + update_count: 0, + context: context_values, + destination_to_alert_info: HashMap::new(), + }; - if !res["ok"].as_bool().unwrap() { - return Err(anyhow!( - "Failed to publish alert to Slack: {}", - res["error"].as_str().unwrap() - )); - } + // compute a contextual diff + let compute_context_diff_prog = r#" + context_diff = {} - let context_values_json: serde_json::Value = - alert.context_values.to_owned().try_into().unwrap(); - let context_values_json_str = - serde_json::to_string_pretty(&context_values_json).unwrap(); - - let blocks = json!([ - { - "type": "header", - "text": { - "type": "plain_text", - "emoji": true, - "text": format!("ℹ️ Context details for initial rule matches") - } - }, - // { - // "type": "divider" - // }, - { - "type": "context", - "elements": [ - { - "type": "mrkdwn", - "text": alert.context_strs.join("\n\n") - } - ] - } - ]); - - let blocks_str = serde_json::to_string(&blocks).unwrap(); - - println!("{}", blocks.to_string()); - - let res2 = post_message_to_thread( - api_token, - channel, - res["ts"].as_str().unwrap(), - &blocks_str, - ) - .await?; + .a = flatten(object!(.a)) + .b = flatten(object!(.b)) - if !res2["ok"].as_bool().unwrap() { - return Err(anyhow!( - "Failed to publish alert context to Slack: {}", - res2["error"].as_str().unwrap() - )); - } + for_each(.b) -> |k, v| { + a_v = array!( (get(.a, [k]) ?? []) || [] ) + a_v_set = {} + for_each(a_v) -> |_i, f| { + a_v_set = set!(a_v_set, [f], true) + } + v_new = [] + for_each(array!(v)) -> |_i, f| { + exists = get(a_v_set, [f]) ?? false + exists = if exists == true { + true + } else { + false + } + if !exists { + v_new = push(v_new, f) + } } + context_diff = set!(context_diff, [k], v_new) } + compact(context_diff)"#; + let existing_context = if merged_alert.is_some() { + merged_alert.as_ref().unwrap().context.clone() + } else { + Value::Object(BTreeMap::new()) + }; + let incoming_context = new_alert.context.clone(); + let mut context_diff = vrl( + compute_context_diff_prog, + &mut value!({ + "a": existing_context, + "b": incoming_context, + }), + )? + .0; + vrl(FLATTENED_CONTEXT_EXPANDER, &mut context_diff)?; + let context_diff_values = vrl(".values || {}", &mut context_diff)?.0; + + // merge the new rule matches into the existing alert to get a merged alert + if merged_alert.is_none() { + merged_alert = Some(new_alert.clone()); + } else { + merged_alert = merged_alert.map(|mut a| { + a.match_count += new_alert.match_count; + a.update_count += 1; + a.context = merged_context_values; + a + }); + } + + let merged_alert = merged_alert.unwrap(); - Ok(res) + Ok(AlertCDCPayload { + updated_alert: merged_alert, + incoming_rule_matches_context: new_alert.context, + context_diff: context_diff_values, + }) } diff --git a/lib/rust/alert_forwarder/src/slack.rs b/lib/rust/alert_forwarder/src/slack.rs index b8df6c2d..c1101c21 100644 --- a/lib/rust/alert_forwarder/src/slack.rs +++ b/lib/rust/alert_forwarder/src/slack.rs @@ -1,8 +1,56 @@ -use anyhow::Result; +use crate::{get_secret_for_destination, AlertCDCPayload}; +use ::value::Value; +use anyhow::{anyhow, Context, Ok, Result}; use log::{debug, error, info}; -use serde_json::Value; +use serde_json::json; +use shared::vrl_util::vrl; use std::collections::HashMap; +// Slack formatting helpers +const CONTEXT_TO_STR_FMT: &str = r#" +key_to_label = { + "related.ip": ":mag: IP", + "related.user": ":bust_in_silhouette: User", + "related.hosts": ":globe_with_meridians: Host", + "related.hash": ":hash: Hash", +} + +context = flatten(.) +ret = {} + +for_each(context) -> |k, v| { + label = get(key_to_label, [k]) ?? null + values = array!(v) + value_str_prefix = if label != null { label } else { k } + + value_str_prefix, err = "*" + to_string(value_str_prefix) + ":* " + vals, err = map_values(values) -> |v| { + "`" + to_string(v) + "`" + } + more_count_short = length(vals) - 5 + values_short = slice!(vals, 0, 5) + value_short_str = value_str_prefix + join!(values_short, " ") + if more_count_short > 0 { + value_short_str = value_short_str + " +" + to_string(more_count_short) + " more..." + } + + more_count_long = length(vals) - 25 + values_long = slice!(vals, 0, 25) + value_long_str = value_str_prefix + join!(values_long, " ") + if more_count_long > 0 { + value_long_str = value_long_str + " +" + to_string(more_count_long) + " more..." + } + + k_parts = split(k, ".") + + ret.long_fmt = set!(object!(ret.long_fmt || {}), k_parts, value_long_str) + ret.short_fmt = set!(object!(ret.short_fmt || {}), k_parts, value_short_str) +} + +ret +"#; + +// Slack API helpers async fn post(api_token: &str, body: HashMap<&str, &str>, uri: &str) -> Result { debug!("{:?}", body); let client = reqwest::Client::new() @@ -24,7 +72,11 @@ async fn post(api_token: &str, body: HashMap<&str, &str>, uri: &str) -> Result Result { +pub async fn post_message( + api_token: &str, + channel: &str, + blocks: &str, +) -> Result { let mut body = HashMap::new(); body.insert("channel", channel); body.insert("blocks", blocks); @@ -37,7 +89,7 @@ pub async fn post_message_to_thread( channel: &str, ts: &str, blocks: &str, -) -> Result { +) -> Result { let mut body = HashMap::new(); body.insert("channel", channel); body.insert("blocks", blocks); @@ -51,7 +103,7 @@ pub async fn post_ephemeral_attachments( channel: &str, user: &str, attachments: serde_json::Value, -) -> Result { +) -> Result { let attachments_str = attachments.to_string(); let mut body = HashMap::new(); body.insert("channel", channel); @@ -66,10 +118,323 @@ pub async fn add_reaction( channel: &str, ts: &str, reaction: &str, -) -> Result { +) -> Result { let mut body = HashMap::new(); body.insert("name", reaction); body.insert("channel", channel); body.insert("timestamp", ts); post(api_token, body, "https://slack.com/api/reactions.add").await } + +// Main Slack alert publishing function +pub async fn publish_alert_to_slack( + alert_payload: &mut AlertCDCPayload, + dest_name: &str, + channel: &str, + client_id: &str, +) -> Result { + let api_token = &get_secret_for_destination(dest_name).await?["bot_user_oauth_token"]; + + let mut res = json!(null); + + let alert = &alert_payload.updated_alert; + + let alert_title = &alert.title; + let alert_severity = &alert.severity; + let title_fmt = match alert_severity.as_str() { + "critical" => format!("💥 🚨 [{}] {}", alert_severity.to_uppercase(), alert_title), + "high" => format!("🚨 [{}] {}", alert_severity.to_uppercase(), alert_title), + "notice" | "info" => format!("📢 {}", alert_title), + _ => format!("{}", alert_title), + }; + + if alert.update_count > 0 { + println!("existing alert"); + + let existing_dest_info = alert.destination_to_alert_info.get(dest_name).unwrap(); + let thread_ts = existing_dest_info["ts"].as_str().unwrap(); + + let mut context_diff_fmt = vrl(CONTEXT_TO_STR_FMT, &mut alert_payload.context_diff)?.0; + + let new_context_strs = vrl("flatten(.long_fmt) ?? {}", &mut context_diff_fmt)?.0; + let new_context_strs = match new_context_strs { + Value::Object(context_strs) => context_strs + .into_values() + .map(|v| v.as_str().unwrap().to_string()) + .collect::>(), + _ => vec![], + }; + // let new_context_values = vrl(".values || {}", &mut new_context)?.0; + // let new_context_values_json: serde_json::Value = new_context_values.try_into().unwrap(); + // let new_context_values_json_str = serde_json::to_string_pretty(&new_context_values_json).unwrap(); + let mut blocks = json!([ + { + "type": "header", + "text": { + "type": "plain_text", + "emoji": true, + "text": format!("➕ {} new rule matches", alert.match_count) + } + }, + { + "type": "divider" + } + ]); + + if new_context_strs.len() > 0 { + blocks.as_array_mut().unwrap().insert( + 2, + json!({ + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*New context*" + }, + }), + ); + blocks.as_array_mut().unwrap().insert( + 3, + json!({ + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": new_context_strs.join("\n\n") + } + ] + }), + ); + } else { + blocks.as_array_mut().unwrap().insert( + 2, + json!({ + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": "No new context" + } + ] + }), + ); + } + + let blocks_str = serde_json::to_string(&blocks).unwrap(); + + println!("{}", blocks.to_string()); + + res = post_message_to_thread(api_token, channel, thread_ts, &blocks_str).await?; + + if !res["ok"].as_bool().unwrap() { + return Err(anyhow!( + "Failed to publish alert context to Slack: {}", + res["error"].as_str().unwrap() + )); + } + } else { + let alert_creation_time_str = format!( + " ", + alert.creation_time.timestamp(), + alert.creation_time.to_rfc2822() + ); + + let mut context_fmt = vrl( + CONTEXT_TO_STR_FMT, + &mut alert_payload.incoming_rule_matches_context, + )? + .0; + + let related_strs = vrl(".short_fmt.related", &mut context_fmt)?.0; + let related_strs = match related_strs { + Value::Object(related_strs) => related_strs + .into_values() + .map(|v| v.as_str().unwrap().to_string()) + .collect::>(), + _ => vec![], + }; + let context_values = vrl(".values", &mut context_fmt)?.0; + + let context_strs = vrl("flatten!(.long_fmt)", &mut context_fmt)?.0; + let context_strs = match context_strs { + Value::Object(context_strs) => context_strs + .into_values() + .map(|v| v.as_str().unwrap().to_string()) + .collect::>(), + _ => vec![], + }; + + let mut blocks = json!([ + { + "type": "header", + "text": { + "type": "plain_text", + "emoji": true, + "text": title_fmt, + } + }, + { + "type": "context", + "elements": [ + { + "type": "image", + "image_url": alert.severity_icon_url, + "alt_text": &alert.severity + }, + { + "type": "mrkdwn", + "text": format!("Severity: *{}*", &alert.severity) + }, + { + "type": "mrkdwn", + "text": format!("Match count: *{}*", alert.match_count) + }, + { + "type": "mrkdwn", + "text": format!("Table: *{}*", alert.tables.join(",")) + } + ] + }, + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": format!("*Alert ID:* {}", alert.id) + }, + { + "type": "mrkdwn", + "text": format!("*Created:* {}", alert_creation_time_str) + } + ] + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": format!("*Runbook:* {}", &alert.runbook) + } + }, + { + "type": "divider" + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*Context*" + }, + // "accessory": { + // "type": "button", + // "text": { + // "type": "plain_text", + // "emoji": true, + // "text": "View alert details" + // }, + // "value": "click_me_123" + // } + }, + ]); + + if related_strs.len() > 0 { + blocks.as_array_mut().unwrap().push( + json!({ + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": related_strs.join("\n\n") + } + ] + }), + ); + } + + let mut alert_false_positives_value: Value = alert.false_positives.clone().into(); + let alert_false_positives_str = vrl( + r#" + fps = array(.) ?? [] + fps_str = join!(fps, "\n• ") + if fps_str != "" && length(fps) > 1 { + fps_str = "\n• " + fps_str + } + fps_str + "#, + &mut alert_false_positives_value, + )? + .0 + .as_str() + .context("missing fp's")? + .to_string(); + + if alert_false_positives_str != "" { + blocks.as_array_mut().unwrap().insert( + 5, + json!({ + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": format!("*False positives:* {}", alert_false_positives_str) + } + ] + }), + ); + } + + let blocks_str = serde_json::to_string(&blocks).unwrap(); + + println!("{}", blocks.to_string()); + + res = post_message(api_token, channel, &blocks_str).await?; + + if !res["ok"].as_bool().unwrap() { + return Err(anyhow!( + "Failed to publish alert to Slack: {}", + res["error"].as_str().unwrap() + )); + } + + let context_values_json: serde_json::Value = alert.context.to_owned().try_into().unwrap(); + let context_values_json_str = serde_json::to_string_pretty(&context_values_json).unwrap(); + + let blocks = json!([ + { + "type": "header", + "text": { + "type": "plain_text", + "emoji": true, + "text": format!("ℹ️ Context details for initial rule matches") + } + }, + // { + // "type": "divider" + // }, + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": context_strs.join("\n\n") + } + ] + } + ]); + + let blocks_str = serde_json::to_string(&blocks).unwrap(); + + println!("{}", blocks.to_string()); + + let res2 = + post_message_to_thread(api_token, channel, res["ts"].as_str().unwrap(), &blocks_str) + .await?; + + if !res2["ok"].as_bool().unwrap() { + return Err(anyhow!( + "Failed to publish alert context to Slack: {}", + res2["error"].as_str().unwrap() + )); + } + } + + Ok(res) +} diff --git a/lib/rust/lake_writer/src/matano_alerts.rs b/lib/rust/lake_writer/src/matano_alerts.rs index 4213316c..c6a63fb5 100644 --- a/lib/rust/lake_writer/src/matano_alerts.rs +++ b/lib/rust/lake_writer/src/matano_alerts.rs @@ -536,7 +536,7 @@ pub async fn process_alerts(s3: aws_sdk_s3::Client, data: Vec>) -> Resul let alert_futures = alerts_to_deliver_compressed_chunks.into_iter().map(|s| { sns.publish() - .topic_arn(var("ALERTING_SNS_TOPIC_ARN").unwrap().to_string()) + .topic_arn(var("RULE_MATCHES_SNS_TOPIC_ARN").unwrap().to_string()) .message(s) .message_attributes( "destination",