Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
feat: accept other JSON types as distinct_id, stringify them (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Dec 5, 2023
1 parent 3524d26 commit 7ff5b80
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 40 deletions.
27 changes: 7 additions & 20 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ pub async fn event(
let payload = base64::engine::general_purpose::STANDARD
.decode(input.data)
.unwrap();
RawEvent::from_bytes(&meta, payload.into())
RawEvent::from_bytes(payload.into())
}
ct => {
tracing::Span::current().record("content_type", ct);

RawEvent::from_bytes(&meta, body)
RawEvent::from_bytes(body)
}
}?;

Expand Down Expand Up @@ -165,19 +165,6 @@ pub fn process_single_event(
event: &RawEvent,
context: &ProcessingContext,
) -> Result<ProcessedEvent, CaptureError> {
let distinct_id = match &event.distinct_id {
Some(id) => id,
None => match event.properties.get("distinct_id").map(|v| v.as_str()) {
Some(Some(id)) => id,
_ => return Err(CaptureError::MissingDistinctId),
},
};
// Limit the size of distinct_id to 200 chars
let distinct_id: String = match distinct_id.len() {
0..=200 => distinct_id.to_owned(),
_ => distinct_id.chars().take(200).collect(),
};

if event.event.is_empty() {
return Err(CaptureError::MissingEventName);
}
Expand All @@ -189,7 +176,7 @@ pub fn process_single_event(

Ok(ProcessedEvent {
uuid: event.uuid.unwrap_or_else(uuid_v7),
distinct_id,
distinct_id: event.extract_distinct_id()?,
ip: context.client_ip.clone(),
data,
now: context.now.clone(),
Expand Down Expand Up @@ -252,7 +239,7 @@ mod tests {
let events = vec![
RawEvent {
token: Some(String::from("hello")),
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::new(),
Expand All @@ -263,7 +250,7 @@ mod tests {
},
RawEvent {
token: None,
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("hello"))]),
Expand All @@ -283,7 +270,7 @@ mod tests {
let events = vec![
RawEvent {
token: Some(String::from("hello")),
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::new(),
Expand All @@ -294,7 +281,7 @@ mod tests {
},
RawEvent {
token: None,
distinct_id: Some("testing".to_string()),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("goodbye"))]),
Expand Down
159 changes: 139 additions & 20 deletions capture/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub struct RawEvent {
skip_serializing_if = "Option::is_none"
)]
pub token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub distinct_id: Option<String>,
#[serde(alias = "$distinct_id", skip_serializing_if = "Option::is_none")]
pub distinct_id: Option<Value>, // posthog-js accepts arbitrary values as distinct_id
pub uuid: Option<Uuid>,
pub event: String,
#[serde(default)]
Expand Down Expand Up @@ -87,7 +87,7 @@ impl RawEvent {
/// Instead of trusting the parameter, we peek at the payload's first three bytes to
/// detect gzip, fallback to uncompressed utf8 otherwise.
#[instrument(skip_all)]
pub fn from_bytes(_query: &EventQuery, bytes: Bytes) -> Result<Vec<RawEvent>, CaptureError> {
pub fn from_bytes(bytes: Bytes) -> Result<Vec<RawEvent>, CaptureError> {
tracing::debug!(len = bytes.len(), "decoding new event");

let payload = if bytes.starts_with(&GZIP_MAGIC_NUMBERS) {
Expand Down Expand Up @@ -119,6 +119,30 @@ impl RawEvent {
.map(String::from),
}
}

/// Extracts, stringifies and trims the distinct_id to a 200 chars String.
/// SDKs send the distinct_id either in the root field or as a property,
/// and can send string, number, array, or map values. We try to best-effort
/// stringify complex values, and make sure it's not longer than 200 chars.
pub fn extract_distinct_id(&self) -> Result<String, CaptureError> {
// Breaking change compared to capture-py: None / Null is not allowed.
let value = match &self.distinct_id {
None | Some(Value::Null) => match self.properties.get("distinct_id") {
None | Some(Value::Null) => return Err(CaptureError::MissingDistinctId),
Some(id) => id,
},
Some(id) => id,
};

let distinct_id = value
.as_str()
.map(|s| s.to_owned())
.unwrap_or_else(|| value.to_string());
Ok(match distinct_id.len() {
0..=200 => distinct_id,
_ => distinct_id.chars().take(200).collect(),
})
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -150,29 +174,124 @@ impl ProcessedEvent {

#[cfg(test)]
mod tests {
use super::Compression;
use base64::Engine as _;
use bytes::Bytes;
use rand::distributions::Alphanumeric;
use rand::Rng;
use serde_json::json;

use super::{EventQuery, RawEvent};
use super::CaptureError;
use super::RawEvent;

#[test]
fn decode_bytes() {
let horrible_blob = "H4sIAAAAAAAAA31T207cMBD9lSrikSy+5bIrVX2g4oWWUlEqBEKRY08Sg4mD4+xCEf/e8XLZBSGeEp+ZOWOfmXPxkMAS+pAskp1BtmBBLiHZTQbvBvDBwJgsHpIdh5/kp1Rffp18OcMwAtUS/GhcjwFKZjSbkYjX3q1G8AgeGA+Nu4ughqVRUIX7ATDwHcbr4IYYUJP32LyavMVAF8Kw2NuzTknbuTEsSkIIHlvTf+vhLnzdizUxgslvs2JgkKHr5U1s8VS0dZ/NZSnlW7CVfTvhs7EG+vT0JJaMygP0VQem7bDTvBAbcGV06JAkIwTBpYHV4Hx4zS1FJH+FX7IFj7A1NbZZQR2b4GFbwFlWzFjETY/XCpXRiN538yt/S9mdnm7bSa+lDCY+kOalKDJGs/msZMVuos0YTK+e62hZciHqes7LnDcpoVmTg+TAaqnKMhWUaaa4TllBoCDpJn2uYK3k87xeyFjZFHWdzxmdq5Q0IstBzRXlDMiHbM/5kgnerKfs+tFZqHAolQflvDZ9W0Evawu6wveiENVoND4s+Ami2jBGZbayn/42g3xblizX4skp4FYMYfJQoSQf8DfSjrGBVMEsoWpArpMbK1vc8ItLDG1j1SDvrZM6muBxN/Eg7U1cVFw70KmyRl13bhqjYeBGGrtuFqWTSzzF/q8tRyvV9SfxHXQLoBuidXY0ekeF+KQnNCqgHXaIy7KJBncNERk6VUFhhB33j8zv5uhQ/rCTvbq9/9seH5Pj3Bf/TsuzYf9g2j+3h9N6yZ8Vfpmx4KSguSY5S0lOqc5LmgmhidoMmOaixoFvktFKOo9kK9Nrt3rPxViWk5RwIhtJykZzXohP2DjmZ08+bnH/4B1fkUnGSp2SMmNlIYTguS5ga//eERZZTSVeD8cWPTMGeTMgHSOMpyRLGftDyUKwBV9b6Dx5vPwPzQHjFwsFAAA=";
let decoded_horrible_blob = base64::engine::general_purpose::STANDARD
.decode(horrible_blob)
.unwrap();

let bytes = Bytes::from(decoded_horrible_blob);
let events = RawEvent::from_bytes(
&EventQuery {
compression: Some(Compression::Gzip),
lib_version: None,
sent_at: None,
},
bytes,
fn decode_uncompressed_raw_event() {
let base64_payload = "ewogICAgImRpc3RpbmN0X2lkIjogIm15X2lkMSIsCiAgICAiZXZlbnQiOiAibXlfZXZlbnQxIiwKICAgICJwcm9wZXJ0aWVzIjogewogICAgICAgICIkZGV2aWNlX3R5cGUiOiAiRGVza3RvcCIKICAgIH0sCiAgICAiYXBpX2tleSI6ICJteV90b2tlbjEiCn0K";
let compressed_bytes = Bytes::from(
base64::engine::general_purpose::STANDARD
.decode(base64_payload)
.expect("payload is not base64"),
);

let events = RawEvent::from_bytes(compressed_bytes).expect("failed to parse");
assert_eq!(1, events.len());
assert_eq!(Some("my_token1".to_string()), events[0].extract_token());
assert_eq!("my_event1".to_string(), events[0].event);
assert_eq!(
"my_id1".to_string(),
events[0]
.extract_distinct_id()
.expect("cannot find distinct_id")
);
}
#[test]
fn decode_gzipped_raw_event() {
let base64_payload = "H4sIADQSbmUCAz2MsQqAMAxE936FBEcnR2f/o4i9IRTb0AahiP9urcVMx3t3ucxQjxxn5bCrZUfLQEepYabpkzgRtOOWfyMpCpIyctVXY42PDifvsFoE73BF9hqFWuPu403YepT+WKNHmMnc5gENoFu2kwAAAA==";
let compressed_bytes = Bytes::from(
base64::engine::general_purpose::STANDARD
.decode(base64_payload)
.expect("payload is not base64"),
);

assert!(events.is_ok());
let events = RawEvent::from_bytes(compressed_bytes).expect("failed to parse");
assert_eq!(1, events.len());
assert_eq!(Some("my_token2".to_string()), events[0].extract_token());
assert_eq!("my_event2".to_string(), events[0].event);
assert_eq!(
"my_id2".to_string(),
events[0]
.extract_distinct_id()
.expect("cannot find distinct_id")
);
}

#[test]
fn extract_distinct_id() {
let parse_and_extract = |input: &'static str| -> Result<String, CaptureError> {
let parsed = RawEvent::from_bytes(input.into()).expect("failed to parse");
parsed[0].extract_distinct_id()
};
// Return MissingDistinctId if not found
assert!(matches!(
parse_and_extract(r#"{"event": "e"}"#),
Err(CaptureError::MissingDistinctId)
));
// Return MissingDistinctId if null, breaking compat with capture-py
assert!(matches!(
parse_and_extract(r#"{"event": "e", "distinct_id": null}"#),
Err(CaptureError::MissingDistinctId)
));

let assert_extracted_id = |input: &'static str, expected: &str| {
let id = parse_and_extract(input).expect("failed to extract");
assert_eq!(id, expected);
};
// Happy path: toplevel field present
assert_extracted_id(r#"{"event": "e", "distinct_id": "myid"}"#, "myid");
assert_extracted_id(r#"{"event": "e", "$distinct_id": "23"}"#, "23");

// Sourced from properties if not present in toplevel field, but toplevel wins if both present
assert_extracted_id(
r#"{"event": "e", "properties":{"distinct_id": "myid"}}"#,
"myid",
);
assert_extracted_id(
r#"{"event": "e", "distinct_id": 23, "properties":{"distinct_id": "myid"}}"#,
"23",
);

// Numbers are stringified
assert_extracted_id(r#"{"event": "e", "distinct_id": 23}"#, "23");
assert_extracted_id(r#"{"event": "e", "distinct_id": 23.4}"#, "23.4");

// Containers are stringified
assert_extracted_id(
r#"{"event": "e", "distinct_id": ["a", "b"]}"#,
r#"["a","b"]"#,
);
assert_extracted_id(
r#"{"event": "e", "distinct_id": {"string": "a", "number": 3}}"#,
r#"{"number":3,"string":"a"}"#,
);
}

#[test]
fn extract_distinct_id_trims_to_200_chars() {
let distinct_id: String = rand::thread_rng()
.sample_iter(Alphanumeric)
.take(222)
.map(char::from)
.collect();
let (expected_distinct_id, _) = distinct_id.split_at(200); // works because ascii chars only
let input = json!([{
"token": "mytoken",
"event": "myevent",
"distinct_id": distinct_id
}]);

let parsed = RawEvent::from_bytes(input.to_string().into()).expect("failed to parse");
assert_eq!(
parsed[0].extract_distinct_id().expect("failed to extract"),
expected_distinct_id
);
}
}

0 comments on commit 7ff5b80

Please sign in to comment.