From 6582f6927011d30b8bc16f96e7e9f6b2c45c26b3 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 4 Dec 2023 17:55:02 +0100 Subject: [PATCH] feat: accept other JSON types as distinct_id, stringify them --- capture/src/capture.rs | 27 ++----- capture/src/event.rs | 159 +++++++++++++++++++++++++++++++++++------ 2 files changed, 146 insertions(+), 40 deletions(-) diff --git a/capture/src/capture.rs b/capture/src/capture.rs index 8cc37e0..37e2872 100644 --- a/capture/src/capture.rs +++ b/capture/src/capture.rs @@ -79,12 +79,12 @@ pub async fn event( let payload = base64::engine::general_purpose::STANDARD .decode(input.data) .unwrap(); - RawEvent::from_bytes(&meta, payload.into()) + RawEvent::from_bytes(payload.into()) } ct => { tracing::Span::current().record("content_type", ct); - RawEvent::from_bytes(&meta, body) + RawEvent::from_bytes(body) } }?; @@ -165,19 +165,6 @@ pub fn process_single_event( event: &RawEvent, context: &ProcessingContext, ) -> Result { - let distinct_id = match &event.distinct_id { - Some(id) => id, - None => match event.properties.get("distinct_id").map(|v| v.as_str()) { - Some(Some(id)) => id, - _ => return Err(CaptureError::MissingDistinctId), - }, - }; - // Limit the size of distinct_id to 200 chars - let distinct_id: String = match distinct_id.len() { - 0..=200 => distinct_id.to_owned(), - _ => distinct_id.chars().take(200).collect(), - }; - if event.event.is_empty() { return Err(CaptureError::MissingEventName); } @@ -189,7 +176,7 @@ pub fn process_single_event( Ok(ProcessedEvent { uuid: event.uuid.unwrap_or_else(uuid_v7), - distinct_id, + distinct_id: event.extract_distinct_id()?, ip: context.client_ip.clone(), data, now: context.now.clone(), @@ -252,7 +239,7 @@ mod tests { let events = vec![ RawEvent { token: Some(String::from("hello")), - distinct_id: Some("testing".to_string()), + distinct_id: Some(json!("testing")), uuid: None, event: String::new(), properties: HashMap::new(), @@ -263,7 +250,7 @@ mod tests { }, RawEvent { token: None, - distinct_id: Some("testing".to_string()), + distinct_id: Some(json!("testing")), uuid: None, event: String::new(), properties: HashMap::from([(String::from("token"), json!("hello"))]), @@ -283,7 +270,7 @@ mod tests { let events = vec![ RawEvent { token: Some(String::from("hello")), - distinct_id: Some("testing".to_string()), + distinct_id: Some(json!("testing")), uuid: None, event: String::new(), properties: HashMap::new(), @@ -294,7 +281,7 @@ mod tests { }, RawEvent { token: None, - distinct_id: Some("testing".to_string()), + distinct_id: Some(json!("testing")), uuid: None, event: String::new(), properties: HashMap::from([(String::from("token"), json!("goodbye"))]), diff --git a/capture/src/event.rs b/capture/src/event.rs index 92b0f0b..ea71a3f 100644 --- a/capture/src/event.rs +++ b/capture/src/event.rs @@ -44,8 +44,8 @@ pub struct RawEvent { skip_serializing_if = "Option::is_none" )] pub token: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub distinct_id: Option, + #[serde(alias = "$distinct_id", skip_serializing_if = "Option::is_none")] + pub distinct_id: Option, // posthog-js accepts arbitrary values as distinct_id pub uuid: Option, pub event: String, #[serde(default)] @@ -87,7 +87,7 @@ impl RawEvent { /// Instead of trusting the parameter, we peek at the payload's first three bytes to /// detect gzip, fallback to uncompressed utf8 otherwise. #[instrument(skip_all)] - pub fn from_bytes(_query: &EventQuery, bytes: Bytes) -> Result, CaptureError> { + pub fn from_bytes(bytes: Bytes) -> Result, CaptureError> { tracing::debug!(len = bytes.len(), "decoding new event"); let payload = if bytes.starts_with(&GZIP_MAGIC_NUMBERS) { @@ -119,6 +119,30 @@ impl RawEvent { .map(String::from), } } + + /// Extracts, stringifies and trims the distinct_id to a 200 chars String. + /// SDKs send the distinct_id either in the root field or as a property, + /// and can send string, number, array, or map values. We try to best-effort + /// stringify complex values, and make sure it's not longer than 200 chars. + pub fn extract_distinct_id(&self) -> Result { + // Breaking change compared to capture-py: None / Null is not allowed. + let value = match &self.distinct_id { + None | Some(Value::Null) => match self.properties.get("distinct_id") { + None | Some(Value::Null) => return Err(CaptureError::MissingDistinctId), + Some(id) => id, + }, + Some(id) => id, + }; + + let distinct_id = value + .as_str() + .map(|s| s.to_owned()) + .unwrap_or_else(|| value.to_string()); + Ok(match distinct_id.len() { + 0..=200 => distinct_id, + _ => distinct_id.chars().take(200).collect(), + }) + } } #[derive(Debug)] @@ -150,29 +174,124 @@ impl ProcessedEvent { #[cfg(test)] mod tests { - use super::Compression; use base64::Engine as _; use bytes::Bytes; + use rand::distributions::Alphanumeric; + use rand::Rng; + use serde_json::json; - use super::{EventQuery, RawEvent}; + use super::CaptureError; + use super::RawEvent; #[test] - fn decode_bytes() { - let horrible_blob = "H4sIAAAAAAAAA31T207cMBD9lSrikSy+5bIrVX2g4oWWUlEqBEKRY08Sg4mD4+xCEf/e8XLZBSGeEp+ZOWOfmXPxkMAS+pAskp1BtmBBLiHZTQbvBvDBwJgsHpIdh5/kp1Rffp18OcMwAtUS/GhcjwFKZjSbkYjX3q1G8AgeGA+Nu4ughqVRUIX7ATDwHcbr4IYYUJP32LyavMVAF8Kw2NuzTknbuTEsSkIIHlvTf+vhLnzdizUxgslvs2JgkKHr5U1s8VS0dZ/NZSnlW7CVfTvhs7EG+vT0JJaMygP0VQem7bDTvBAbcGV06JAkIwTBpYHV4Hx4zS1FJH+FX7IFj7A1NbZZQR2b4GFbwFlWzFjETY/XCpXRiN538yt/S9mdnm7bSa+lDCY+kOalKDJGs/msZMVuos0YTK+e62hZciHqes7LnDcpoVmTg+TAaqnKMhWUaaa4TllBoCDpJn2uYK3k87xeyFjZFHWdzxmdq5Q0IstBzRXlDMiHbM/5kgnerKfs+tFZqHAolQflvDZ9W0Evawu6wveiENVoND4s+Ami2jBGZbayn/42g3xblizX4skp4FYMYfJQoSQf8DfSjrGBVMEsoWpArpMbK1vc8ItLDG1j1SDvrZM6muBxN/Eg7U1cVFw70KmyRl13bhqjYeBGGrtuFqWTSzzF/q8tRyvV9SfxHXQLoBuidXY0ekeF+KQnNCqgHXaIy7KJBncNERk6VUFhhB33j8zv5uhQ/rCTvbq9/9seH5Pj3Bf/TsuzYf9g2j+3h9N6yZ8Vfpmx4KSguSY5S0lOqc5LmgmhidoMmOaixoFvktFKOo9kK9Nrt3rPxViWk5RwIhtJykZzXohP2DjmZ08+bnH/4B1fkUnGSp2SMmNlIYTguS5ga//eERZZTSVeD8cWPTMGeTMgHSOMpyRLGftDyUKwBV9b6Dx5vPwPzQHjFwsFAAA="; - let decoded_horrible_blob = base64::engine::general_purpose::STANDARD - .decode(horrible_blob) - .unwrap(); - - let bytes = Bytes::from(decoded_horrible_blob); - let events = RawEvent::from_bytes( - &EventQuery { - compression: Some(Compression::Gzip), - lib_version: None, - sent_at: None, - }, - bytes, + fn decode_uncompressed_raw_event() { + let base64_payload = "ewogICAgImRpc3RpbmN0X2lkIjogIm15X2lkMSIsCiAgICAiZXZlbnQiOiAibXlfZXZlbnQxIiwKICAgICJwcm9wZXJ0aWVzIjogewogICAgICAgICIkZGV2aWNlX3R5cGUiOiAiRGVza3RvcCIKICAgIH0sCiAgICAiYXBpX2tleSI6ICJteV90b2tlbjEiCn0K"; + let compressed_bytes = Bytes::from( + base64::engine::general_purpose::STANDARD + .decode(base64_payload) + .expect("payload is not base64"), + ); + + let events = RawEvent::from_bytes(compressed_bytes).expect("failed to parse"); + assert_eq!(1, events.len()); + assert_eq!(Some("my_token1".to_string()), events[0].extract_token()); + assert_eq!("my_event1".to_string(), events[0].event); + assert_eq!( + "my_id1".to_string(), + events[0] + .extract_distinct_id() + .expect("cannot find distinct_id") + ); + } + #[test] + fn decode_gzipped_raw_event() { + let base64_payload = "H4sIADQSbmUCAz2MsQqAMAxE936FBEcnR2f/o4i9IRTb0AahiP9urcVMx3t3ucxQjxxn5bCrZUfLQEepYabpkzgRtOOWfyMpCpIyctVXY42PDifvsFoE73BF9hqFWuPu403YepT+WKNHmMnc5gENoFu2kwAAAA=="; + let compressed_bytes = Bytes::from( + base64::engine::general_purpose::STANDARD + .decode(base64_payload) + .expect("payload is not base64"), ); - assert!(events.is_ok()); + let events = RawEvent::from_bytes(compressed_bytes).expect("failed to parse"); + assert_eq!(1, events.len()); + assert_eq!(Some("my_token2".to_string()), events[0].extract_token()); + assert_eq!("my_event2".to_string(), events[0].event); + assert_eq!( + "my_id2".to_string(), + events[0] + .extract_distinct_id() + .expect("cannot find distinct_id") + ); + } + + #[test] + fn extract_distinct_id() { + let parse_and_extract = |input: &'static str| -> Result { + let parsed = RawEvent::from_bytes(input.into()).expect("failed to parse"); + parsed[0].extract_distinct_id() + }; + // Return MissingDistinctId if not found + assert!(matches!( + parse_and_extract(r#"{"event": "e"}"#), + Err(CaptureError::MissingDistinctId) + )); + // Return MissingDistinctId if null, breaking compat with capture-py + assert!(matches!( + parse_and_extract(r#"{"event": "e", "distinct_id": null}"#), + Err(CaptureError::MissingDistinctId) + )); + + let assert_extracted_id = |input: &'static str, expected: &str| { + let id = parse_and_extract(input).expect("failed to extract"); + assert_eq!(id, expected); + }; + // Happy path: toplevel field present + assert_extracted_id(r#"{"event": "e", "distinct_id": "myid"}"#, "myid"); + assert_extracted_id(r#"{"event": "e", "$distinct_id": "23"}"#, "23"); + + // Sourced from properties if not present in toplevel field, but toplevel wins if both present + assert_extracted_id( + r#"{"event": "e", "properties":{"distinct_id": "myid"}}"#, + "myid", + ); + assert_extracted_id( + r#"{"event": "e", "distinct_id": 23, "properties":{"distinct_id": "myid"}}"#, + "23", + ); + + // Numbers are stringified + assert_extracted_id(r#"{"event": "e", "distinct_id": 23}"#, "23"); + assert_extracted_id(r#"{"event": "e", "distinct_id": 23.4}"#, "23.4"); + + // Containers are stringified + assert_extracted_id( + r#"{"event": "e", "distinct_id": ["a", "b"]}"#, + r#"["a","b"]"#, + ); + assert_extracted_id( + r#"{"event": "e", "distinct_id": {"string": "a", "number": 3}}"#, + r#"{"number":3,"string":"a"}"#, + ); + } + + #[test] + fn extract_distinct_id_trims_to_200_chars() { + let distinct_id: String = rand::thread_rng() + .sample_iter(Alphanumeric) + .take(222) + .map(char::from) + .collect(); + let (expected_distinct_id, _) = distinct_id.split_at(200); // works because ascii chars only + let input = json!([{ + "token": "mytoken", + "event": "myevent", + "distinct_id": distinct_id + }]); + + let parsed = RawEvent::from_bytes(input.to_string().into()).expect("failed to parse"); + assert_eq!( + parsed[0].extract_distinct_id().expect("failed to extract"), + expected_distinct_id + ); } }