Skip to content

Commit

Permalink
dekaf: Add lighter field selection and avro encoding snapshot tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Feb 19, 2025
1 parent 815619a commit 3a3b1b7
Show file tree
Hide file tree
Showing 9 changed files with 560 additions and 34 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ apache-avro = { workspace = true }
async-process = { path = "../async-process" }
flowctl = { path = "../flowctl" }
locate-bin = { path = "../locate-bin" }
sources = { path = "../sources" }
validation-tests = { path = "../validation-tests" }

apache-avro = { workspace = true }
insta = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub async fn unary_materialize(

// Largely lifted from materialize-kafka
// TODO(jshearer): Expose this logic somewhere that materialize-kafka can use it
fn constraint_for_projection(
pub fn constraint_for_projection(
projection: &flow::Projection,
resource_config: &DekafResourceConfig,
endpoint_config: &DekafConfig,
Expand Down
3 changes: 2 additions & 1 deletion crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ mod topology;
use topology::{Collection, Partition};

mod read;
pub use read::extract_and_encode;
use read::Read;

mod utils;
pub mod utils;

mod session;
pub use session::Session;
Expand Down
64 changes: 32 additions & 32 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Read {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());

self.extract_and_encode(root.get(), &mut tmp)?;
extract_and_encode(self.extractors.as_slice(), root.get(), &mut tmp)?;

record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
Expand Down Expand Up @@ -402,37 +402,6 @@ impl Read {
},
))
}

/// Handles extracting and avro-encoding a particular field.
/// Note that since avro encoding can happen piecewise, there's never a need to
/// put together the whole extracted document, and instead we can build up the
/// encoded output iteratively
fn extract_and_encode<'a>(
&'a self,
original: &'a doc::ArchivedNode,
buf: &mut Vec<u8>,
) -> anyhow::Result<()> {
self.extractors
.iter()
.try_fold(buf, |buf, (schema, extractor)| {
// This is the value extracted from the original doc
if let Err(e) = match extractor.extract(original) {
Ok(value) => avro::encode(buf, schema, value),
Err(default) => avro::encode(buf, schema, &default.into_owned()),
}
.context(format!(
"Extracting field {extractor:#?}, schema: {schema:?}"
)) {
let debug_serialized = serde_json::to_string(&original.to_debug_json_value())?;
tracing::debug!(extractor=?extractor, ?schema, debug_serialized, ?e, "Failed to encode");
return Err(e);
}

Ok::<_, anyhow::Error>(buf)
})?;

Ok(())
}
}

fn compressor<Output: BufMut>(
Expand All @@ -459,3 +428,34 @@ fn compressor<Output: BufMut>(
};
Ok(())
}

/// Handles extracting and avro-encoding a particular field.
/// Note that since avro encoding can happen piecewise, there's never a need to
/// put together the whole extracted document, and instead we can build up the
/// encoded output iteratively
/// Handles extracting and avro-encoding a particular field.
/// Note that since avro encoding can happen piecewise, there's never a need to
/// put together the whole extracted document, and instead we can build up the
/// encoded output iteratively.
///
/// Each `(schema, extractor)` pair in `extractors` is applied to `original`
/// in order, and the resulting avro-encoded bytes are appended to `buf`.
///
/// # Errors
///
/// Returns an error (with the offending extractor and schema in its context)
/// if extraction or avro encoding of any field fails; the full document is
/// also logged at debug level to aid diagnosis.
pub fn extract_and_encode<'a, N: AsNode>(
    extractors: &'a [(avro::Schema, utils::CustomizableExtractor)],
    original: &'a N,
    buf: &mut Vec<u8>,
) -> anyhow::Result<()> {
    extractors
        .iter()
        .try_fold(buf, |buf, (schema, extractor)| {
            // This is the value extracted from the original doc.
            // `Err(default)` is the extractor's fallback value, not a failure.
            if let Err(e) = match extractor.extract(original) {
                Ok(value) => avro::encode(buf, schema, value),
                Err(default) => avro::encode(buf, schema, &default.into_owned()),
            }
            // `with_context` defers building the message until an error
            // actually occurs, avoiding a format!/allocation per field on
            // the (common) success path.
            .with_context(|| format!(
                "Extracting field {extractor:#?}, schema: {schema:?}"
            )) {
                let debug_serialized = serde_json::to_string(&original.to_debug_json_value())?;
                tracing::debug!(extractor=?extractor, ?schema, debug_serialized, ?e, "Failed to encode");
                return Err(e);
            }

            Ok::<_, anyhow::Error>(buf)
        })?;

    Ok(())
}
Loading

0 comments on commit 3a3b1b7

Please sign in to comment.