diff --git a/Cargo.lock b/Cargo.lock index 4c25a12026..37029102bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2366,6 +2366,7 @@ dependencies = [ "serde_json", "simd-doc", "socket2", + "sources", "tempfile", "time 0.3.36", "tokio", @@ -2382,6 +2383,7 @@ dependencies = [ "typestate", "unseal", "url", + "validation-tests", "webpki", ] @@ -8043,6 +8045,36 @@ dependencies = [ [[package]] name = "validation" version = "0.0.0" +dependencies = [ + "anyhow", + "assemble", + "bytes", + "doc", + "extractors", + "futures", + "itertools 0.10.5", + "json", + "labels", + "lazy_static", + "models", + "pbjson-types", + "proto-flow", + "regex", + "rusqlite", + "serde", + "serde_json", + "sources", + "strsim 0.10.0", + "superslice", + "tables", + "thiserror", + "tracing", + "url", +] + +[[package]] +name = "validation-tests" +version = "0.0.0" dependencies = [ "anyhow", "assemble", @@ -8072,6 +8104,7 @@ dependencies = [ "thiserror", "tracing", "url", + "validation", ] [[package]] diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 945d4a4be4..073d1fa421 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -86,6 +86,8 @@ apache-avro = { workspace = true } async-process = { path = "../async-process" } flowctl = { path = "../flowctl" } locate-bin = { path = "../locate-bin" } +sources = { path = "../sources" } +validation-tests = { path = "../validation-tests" } apache-avro = { workspace = true } insta = { workspace = true } diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs index 805e648de2..ad889e6496 100644 --- a/crates/dekaf/src/connector.rs +++ b/crates/dekaf/src/connector.rs @@ -172,7 +172,7 @@ pub async fn unary_materialize( // Largely lifted from materialize-kafka // TODO(jshearer): Expose this logic somewhere that materialize-kafka can use it -fn constraint_for_projection( +pub fn constraint_for_projection( projection: &flow::Projection, resource_config: &DekafResourceConfig, endpoint_config: &DekafConfig, diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 5f2b09b608..181f3e91f8 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -13,9 +13,10 @@ mod topology; use topology::{Collection, Partition}; mod read; +pub use read::extract_and_encode; use read::Read; -mod utils; +pub mod utils; mod session; pub use session::Session; diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index f5973435d1..43fdbaaaa4 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -296,7 +296,7 @@ impl Read { tmp.push(0); tmp.extend(self.value_schema_id.to_be_bytes()); - self.extract_and_encode(root.get(), &mut tmp)?; + extract_and_encode(self.extractors.as_slice(), root.get(), &mut tmp)?; record_bytes += tmp.len(); buf.extend_from_slice(&tmp); @@ -402,37 +402,6 @@ impl Read { }, )) } - - /// Handles extracting and avro-encoding a particular field. 
-    /// Note that since avro encoding can happen piecewise, there's never a need to
-    /// put together the whole extracted document, and instead we can build up the
-    /// encoded output iteratively
-    fn extract_and_encode<'a>(
-        &'a self,
-        original: &'a doc::ArchivedNode,
-        buf: &mut Vec<u8>,
-    ) -> anyhow::Result<()> {
-        self.extractors
-            .iter()
-            .try_fold(buf, |buf, (schema, extractor)| {
-                // This is the value extracted from the original doc
-                if let Err(e) = match extractor.extract(original) {
-                    Ok(value) => avro::encode(buf, schema, value),
-                    Err(default) => avro::encode(buf, schema, &default.into_owned()),
-                }
-                .context(format!(
-                    "Extracting field {extractor:#?}, schema: {schema:?}"
-                )) {
-                    let debug_serialized = serde_json::to_string(&original.to_debug_json_value())?;
-                    tracing::debug!(extractor=?extractor, ?schema, debug_serialized, ?e, "Failed to encode");
-                    return Err(e);
-                }
-
-                Ok::<_, anyhow::Error>(buf)
-            })?;
-
-        Ok(())
-    }
 }
 
 fn compressor(
@@ -459,3 +428,34 @@ fn compressor(
     };
     Ok(())
 }
+
+/// Handles extracting and avro-encoding a particular field.
+/// Note that since avro encoding can happen piecewise, there's never a need to
+/// put together the whole extracted document, and instead we can build up the
+/// encoded output iteratively
+pub fn extract_and_encode<'a, N: AsNode>(
+    extractors: &'a [(avro::Schema, utils::CustomizableExtractor)],
+    original: &'a N,
+    buf: &mut Vec<u8>,
+) -> anyhow::Result<()> {
+    extractors
+        .iter()
+        .try_fold(buf, |buf, (schema, extractor)| {
+            // This is the value extracted from the original doc
+            if let Err(e) = match extractor.extract(original) {
+                Ok(value) => avro::encode(buf, schema, value),
+                Err(default) => avro::encode(buf, schema, &default.into_owned()),
+            }
+            .context(format!(
+                "Extracting field {extractor:#?}, schema: {schema:?}"
+            )) {
+                let debug_serialized = serde_json::to_string(&original.to_debug_json_value())?;
+                tracing::debug!(extractor=?extractor, ?schema, debug_serialized, ?e, "Failed to encode");
+                return Err(e);
+            }
+
+            Ok::<_, anyhow::Error>(buf)
+        })?;
+
+    Ok(())
+}
diff --git a/crates/dekaf/tests/dekaf_schema_test.rs b/crates/dekaf/tests/dekaf_schema_test.rs
new file mode 100644
index 0000000000..a69b5b8aa4
--- /dev/null
+++ b/crates/dekaf/tests/dekaf_schema_test.rs
@@ -0,0 +1,397 @@
+use anyhow::Context;
+use dekaf::connector;
+use itertools::Itertools;
+use proto_flow::{flow::materialization_spec::ConnectorType, materialize};
+use serde_json::json;
+use validation_tests::Outcome;
+
+fn run_validation(fixture: &str) -> anyhow::Result<Outcome> {
+    let outcome = validation_tests::run(fixture, "{}");
+
+    let mut errors = outcome
+        .errors
+        .iter()
+        .chain(outcome.errors_draft.iter())
+        .peekable();
+
+    if errors.peek().is_some() {
+        let formatted = errors.format("\n");
+        anyhow::bail!("Validation errors: {formatted:?}")
+    }
+
+    Ok(outcome)
+}
+
+fn json_schema_to_shape(schema: &str) -> anyhow::Result<doc::Shape> {
+    let json_schema = doc::validation::build_bundle(schema)?;
+    let validator = doc::Validator::new(json_schema)?;
+    Ok(doc::Shape::infer(
+        &validator.schemas()[0],
+        validator.schema_index(),
+    ))
+}
+
+fn build_test_fixture(
+    schema: serde_json::Value,
+    field_selection: serde_json::Value,
+    config: connector::DekafConfig,
+    bindings: Option<Vec<materialize::response::validated::Binding>>,
+) -> String {
+    let materialization = if let Some(bindings) = bindings {
+        json!({
+            "connectorType": "DEKAF",
+            "config": {
+                "variant": "foo",
+                "config": config
+            },
+            "bindings": bindings
+        })
+    } else {
+        json!({
+            "connectorType": "DEKAF",
+            "config": {
+                "variant": "foo",
"config": config + }, + "bindings": [{ + "constraints": {}, + "resourcePath": ["anything"] + }] + }) + }; + + serde_json::to_string_pretty(&json!({ + "test://example/catalog.yaml": { + "collections": { + "test/collection": schema + }, + "materializations": { + "test/materialization":{ + "endpoint": { + "dekaf": { + "variant": "foo", + "config": config + }, + }, + "bindings": [ + { + "source": "test/collection", + "resource": { + "topic_name": "foo" + }, + "fields": field_selection + } + ] + } + } + }, + "driver": { + "dataPlanes": { + "1d:1d:1d:1d:1d:1d:1d:1d": {"default": true} + }, + "materializations": { + "test/materialization": materialization + } + } + })) + .unwrap() +} + +/// Helper function to run validation and get the components needed for field extraction +async fn get_extraction_components( + schema: serde_json::Value, + field_selection: serde_json::Value, + config: connector::DekafConfig, +) -> anyhow::Result<( + doc::Shape, + proto_flow::flow::FieldSelection, + Vec, +)> { + // First run to get validated bindings + let fixture = build_test_fixture( + schema.clone(), + // Just need something that passes validation, we'll pass in the real + // field selection in the second pass once we have the bindings + json!({"recommended": true}), + config.clone(), + None, + ); + let outcome = run_validation(fixture.as_ref())?; + let built_materializations = outcome.built_materializations; + + let materialization_spec = built_materializations + .first() + .context("No materializations built")? + .spec + .as_ref() + .context("No spec")?; + + let validate_req = materialize::request::Validate { + name: "what".to_string(), + connector_type: ConnectorType::Dekaf as i32, + config_json: materialization_spec.config_json.to_owned(), + bindings: materialization_spec + .bindings + .iter() + .map( + |binding| proto_flow::materialize::request::validate::Binding { + resource_config_json: binding.resource_config_json.clone(), + collection: binding.collection.clone(), + field_config_json_map: binding + .field_selection + .as_ref() + .expect("No field selection") + .field_config_json_map + .clone(), + backfill: 0, + }, + ) + .collect_vec(), + last_materialization: None, + last_version: "foo".to_owned(), + }; + + let validate_resp = connector::unary_materialize(materialize::Request { + validate: Some(validate_req), + ..Default::default() + }) + .await?; + + let bindings = validate_resp + .validated + .as_ref() + .context("No validated response")? + .bindings + .clone(); + + // Second run with validated bindings to get final components + let fixture = build_test_fixture(schema, field_selection, config, Some(bindings)); + let outcome = run_validation(fixture.as_ref())?; + let built_materializations = outcome.built_materializations; + let built_collections = outcome.built_collections; + + let collection = built_collections.first().context("No collections built")?; + let collection_spec = collection.spec.as_ref().context("No collection spec")?; + let materialization = built_materializations + .first() + .context("No materializations built")?; + let materialization_spec = materialization.spec.as_ref().context("No spec")?; + + let schema = if collection_spec.read_schema_json.len() > 0 { + &collection_spec.read_schema_json + } else { + &collection_spec.write_schema_json + }; + + let shape = json_schema_to_shape(schema)?; + let field_selection = materialization_spec + .bindings + .first() + .context("No bindings")? + .field_selection + .as_ref() + .context("No field selection")? 
+        .clone();
+
+    Ok((shape, field_selection, collection_spec.projections.clone()))
+}
+
+async fn roundtrip(
+    endpoint_config: connector::DekafConfig,
+    schema: serde_json::Value,
+    field_selection: serde_json::Value,
+    docs: Vec<serde_json::Value>,
+) -> anyhow::Result<Vec<apache_avro::AvroResult<apache_avro::types::Value>>> {
+    let (shape, field_selection, projections) =
+        get_extraction_components(schema, field_selection, endpoint_config.clone()).await?;
+
+    let (avro_schema, extractors) = dekaf::utils::build_field_extractors(
+        shape,
+        field_selection,
+        projections,
+        endpoint_config.deletions,
+    )?;
+
+    let avro::Schema::Record(root_schema) = &avro_schema else {
+        anyhow::bail!("Invalid schema");
+    };
+
+    let field_schemas = root_schema.fields.iter().cloned().map(|f| f.schema);
+    let extractors = field_schemas
+        .zip(extractors.clone().into_iter())
+        .collect_vec();
+
+    docs.into_iter()
+        .map(|doc| {
+            // Extract and encode document
+            let mut encoded = Vec::new();
+            dekaf::extract_and_encode(extractors.as_slice(), &doc, &mut encoded)?;
+
+            // Now decode it back into a Value representation
+            Ok(apache_avro::from_avro_datum(
+                &avro_schema,
+                &mut encoded.as_slice(),
+                None,
+            ))
+        })
+        .collect::<Result<Vec<_>, _>>()
+}
+
+#[tokio::test]
+async fn test_allof_with_null_default() -> anyhow::Result<()> {
+    for output in roundtrip(
+        connector::DekafConfig {
+            deletions: connector::DeletionMode::Kafka,
+            token: "1234".to_string(),
+            strict_topic_names: false,
+        },
+        json!({
+            "schema": {
+                "allOf": [
+                    {
+                        "properties": {
+                            "id": {
+                                "title": "Id",
+                                "type": "integer"
+                            },
+                            "conflicts": {
+                                "type": ["integer", "null"],
+                                "default": null,
+                                "title": "Updatedbyuserid"
+                            }
+                        },
+                        "required": ["id"],
+                        "type": "object"
+                    },
+                    {
+                        "properties": {
+                            "id": {
+                                "title": "Id",
+                                "type": "integer"
+                            },
+                            "conflicts": {
+                                "type": "integer"
+                            }
+                        },
+                        "required": ["id"],
+                        "type": "object"
+                    }
+                ]
+            },
+            "key": ["/id"]
+        }),
+        json!({
+            "recommended": true
+        }),
+        vec![json!({
+            "id": 671963468
+        })],
+    )
+    .await?
+    {
+        insta::assert_debug_snapshot!(output?);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_field_selection_specific_fields() -> anyhow::Result<()> {
+    for output in roundtrip(
+        dekaf::connector::DekafConfig {
+            deletions: dekaf::connector::DeletionMode::Kafka,
+            strict_topic_names: false,
+            token: "1234".to_string(),
+        },
+        json!({
+            "schema": {
+                "properties": {
+                    "key": {
+                        "type": "string"
+                    },
+                    "field_a": {
+                        "type": "string",
+                    },
+                    "field_b": {
+                        "type": "string",
+                    }
+                },
+                "type": "object",
+                "required": [
+                    "key",
+                    "field_a",
+                    "field_b"
+                ],
+            },
+            "key": [
+                "/key"
+            ]
+        }),
+        json!({
+            "include": {
+                "field_a": {}
+            },
+            "recommended": false
+        }),
+        vec![json!({
+            "key": "first",
+            "field_a": "foo",
+            "field_b": "bar"
+        })],
+    )
+    .await?
+    {
+        insta::assert_debug_snapshot!(output?);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_field_selection_recommended_fields() -> anyhow::Result<()> {
+    for output in roundtrip(
+        dekaf::connector::DekafConfig {
+            deletions: dekaf::connector::DeletionMode::Kafka,
+            strict_topic_names: false,
+            token: "1234".to_string(),
+        },
+        json!({
+            "schema": {
+                "properties": {
+                    "key": {
+                        "type": "string"
+                    },
+                    "field_a": {
+                        "type": "string",
+                    },
+                    "field_b": {
+                        "type": "string",
+                    }
+                },
+                "type": "object",
+                "required": [
+                    "key",
+                    "field_a",
+                    "field_b"
+                ],
+            },
+            "key": [
+                "/key"
+            ]
+        }),
+        json!({
+            "recommended": true
+        }),
+        vec![json!({
+            "key": "first",
+            "field_a": "foo",
+            "field_b": "bar"
+        })],
+    )
+    .await?
+ { + insta::assert_debug_snapshot!(output?); + } + + Ok(()) +} diff --git a/crates/dekaf/tests/snapshots/dekaf_schema_test__allof_with_null_default.snap b/crates/dekaf/tests/snapshots/dekaf_schema_test__allof_with_null_default.snap new file mode 100644 index 0000000000..ac6a0316f9 --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_schema_test__allof_with_null_default.snap @@ -0,0 +1,37 @@ +--- +source: crates/dekaf/tests/dekaf_schema_test.rs +expression: output? +--- +Record( + [ + ( + "id", + Long( + 671963468, + ), + ), + ( + "conflicts", + Union( + 1, + Null, + ), + ), + ( + "flow_published_at", + Union( + 0, + Record( + [ + ( + "json", + String( + "null", + ), + ), + ], + ), + ), + ), + ], +) diff --git a/crates/dekaf/tests/snapshots/dekaf_schema_test__field_selection_recommended_fields.snap b/crates/dekaf/tests/snapshots/dekaf_schema_test__field_selection_recommended_fields.snap new file mode 100644 index 0000000000..c91bf69b26 --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_schema_test__field_selection_recommended_fields.snap @@ -0,0 +1,42 @@ +--- +source: crates/dekaf/tests/dekaf_schema_test.rs +expression: output? +--- +Record( + [ + ( + "key", + String( + "first", + ), + ), + ( + "field_a", + String( + "foo", + ), + ), + ( + "field_b", + String( + "bar", + ), + ), + ( + "flow_published_at", + Union( + 0, + Record( + [ + ( + "json", + String( + "null", + ), + ), + ], + ), + ), + ), + ], +) diff --git a/crates/dekaf/tests/snapshots/dekaf_schema_test__field_selection_specific_fields.snap b/crates/dekaf/tests/snapshots/dekaf_schema_test__field_selection_specific_fields.snap new file mode 100644 index 0000000000..71d86f1a35 --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_schema_test__field_selection_specific_fields.snap @@ -0,0 +1,14 @@ +--- +source: crates/dekaf/tests/dekaf_schema_test.rs +expression: output? +--- +Record( + [ + ( + "field_a", + String( + "foo", + ), + ), + ], +)
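
For reference, a minimal sketch (not part of the patch) of how the pieces this diff makes public compose outside of the `roundtrip` helper: pair each field schema of the root Avro record with its `CustomizableExtractor`, then drive the now-public `dekaf::extract_and_encode` over a document. `encode_doc` is a hypothetical name, and the sketch assumes the same workspace `avro` and `doc` crates that `dekaf_schema_test.rs` depends on.

use itertools::Itertools;

fn encode_doc<N: doc::AsNode>(
    root: &avro::Schema,
    extractors: &[dekaf::utils::CustomizableExtractor],
    doc: &N,
) -> anyhow::Result<Vec<u8>> {
    // The root schema must be a record: each record field is paired with the
    // extractor that pulls the matching value out of the source document.
    let avro::Schema::Record(record) = root else {
        anyhow::bail!("expected a record schema at the root");
    };
    let pairs = record
        .fields
        .iter()
        .map(|field| field.schema.clone())
        .zip(extractors.iter().cloned())
        .collect_vec();

    // Fields are extracted and Avro-encoded piecewise into `buf`; the full
    // document is never reassembled in between.
    let mut buf = Vec::new();
    dekaf::extract_and_encode(pairs.as_slice(), doc, &mut buf)?;
    Ok(buf)
}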