diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs index aff29f7482ef..a2840f808f57 100644 --- a/src/log-store/src/kafka/util/record.rs +++ b/src/log-store/src/kafka/util/record.rs @@ -59,16 +59,13 @@ impl From for i32 { } } -/// The minimal storage unit in the Kafka log store. -/// Our own Record will be converted into a Kafka record during producing. +/// The metadata of a record. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct Record { +pub struct RecordMeta { /// The version of the record. Used for backward compatibility. version: u32, /// The type of the record. pub tp: RecordType, - /// The payload of the record. - data: Vec, /// The id of the entry the record associated with. pub entry_id: EntryId, /// The namespace of the entry the record associated with. @@ -77,14 +74,24 @@ pub struct Record { checksum: u32, } -impl TryInto for Record { +/// The minimal storage unit in the Kafka log store. +/// Our own Record will be converted into a Kafka record during producing. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Record { + /// The metadata of the record. + meta: RecordMeta, + /// The payload of the record. + data: Vec, +} + +impl TryFrom for KafkaRecord { type Error = crate::error::Error; - fn try_into(self) -> Result { - let value = serde_json::to_vec(&self).context(EncodeJsonSnafu)?; + fn try_from(record: Record) -> Result { + let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?; Ok(KafkaRecord { - key: None, - value: Some(value), + key: Some(key), + value: Some(record.data), timestamp: rskafka::chrono::Utc::now(), headers: Default::default(), }) @@ -95,8 +102,12 @@ impl TryFrom for Record { type Error = crate::error::Error; fn try_from(kafka_record: KafkaRecord) -> Result { - let value = kafka_record.value.context(MissingValueSnafu)?; - serde_json::from_slice(&value).context(DecodeJsonSnafu) + let key = kafka_record.key.unwrap(); + let meta = serde_json::from_slice(&key) + .context(DecodeJsonSnafu) + .unwrap(); + let data = kafka_record.value.context(MissingValueSnafu)?; + Ok(Self { meta, data }) } } @@ -106,8 +117,8 @@ impl TryFrom> for EntryImpl { fn try_from(records: Vec) -> Result { check_records(&records)?; - let entry_id = records[0].entry_id; - let ns = records[0].ns.clone(); + let entry_id = records[0].meta.entry_id; + let ns = records[0].meta.ns.clone(); let data = records.into_iter().flat_map(|record| record.data).collect(); Ok(EntryImpl { data, @@ -197,10 +208,10 @@ fn check_records(records: &[Record]) -> Result<()> { let mut namespaces = HashSet::with_capacity(len); let mut checksum_matched = HashSet::with_capacity(len); for record in records { - sequence.push(i32::from(record.tp)); - entry_ids.insert(record.entry_id); - namespaces.insert(&record.ns); - checksum_matched.insert(record.checksum == crc32fast::hash(&record.data)); + sequence.push(i32::from(record.meta.tp)); + entry_ids.insert(record.meta.entry_id); + namespaces.insert(&record.meta.ns); + checksum_matched.insert(record.meta.checksum == crc32fast::hash(&record.data)); } ensure!( @@ -259,12 +270,14 @@ fn record_type(seq: usize, num_records: usize) -> RecordType { fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec { if entry.data.len() <= max_record_size { let record = Record { - version: VERSION, - tp: RecordType::Full, - checksum: crc32fast::hash(&entry.data), + meta: RecordMeta { + version: VERSION, + tp: RecordType::Full, + entry_id: entry.id, + ns: entry.ns, + checksum: crc32fast::hash(&entry.data), + }, data: entry.data, - entry_id: entry.id, - ns: entry.ns, }; return vec![record]; } @@ -274,12 +287,14 @@ fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec { chunks .enumerate() .map(|(i, chunk)| Record { - version: VERSION, - tp: record_type(i, num_chunks), + meta: RecordMeta { + version: VERSION, + tp: record_type(i, num_chunks), + entry_id: entry.id, + ns: entry.ns.clone(), + checksum: crc32fast::hash(chunk), + }, data: chunk.to_vec(), - entry_id: entry.id, - ns: entry.ns.clone(), - checksum: crc32fast::hash(chunk), }) .collect() } @@ -290,7 +305,7 @@ pub fn maybe_emit_entry( entry_records: &mut HashMap>, ) -> Result> { let mut entry = None; - match record.tp { + match record.meta.tp { RecordType::Full => { entry = Some(EntryImpl::try_from(vec![record])?); } @@ -298,7 +313,7 @@ pub fn maybe_emit_entry( // There must have a sequence prefix before a Last record is read. let mut records = entry_records - .remove(&record.entry_id) + .remove(&record.meta.entry_id) .context(IllegalSequenceSnafu { error: "Missing sequence prefix", })?; @@ -307,7 +322,7 @@ pub fn maybe_emit_entry( } _ => { entry_records - .entry(record.entry_id) + .entry(record.meta.entry_id) .or_default() .push(record); } @@ -324,21 +339,30 @@ pub fn maybe_emit_entry( #[cfg(test)] mod tests { + use std::sync::Arc; + + use common_base::readable_size::ReadableSize; + use common_config::wal::KafkaConfig; + use rand::Rng; + use super::*; + use crate::kafka::client_manager::ClientManager; // Implements some utility methods for testing. impl Default for Record { fn default() -> Self { Self { - version: VERSION, - tp: RecordType::Full, - ns: NamespaceImpl { - region_id: 0, - topic: "greptimedb_wal_topic".to_string(), + meta: RecordMeta { + version: VERSION, + tp: RecordType::Full, + ns: NamespaceImpl { + region_id: 0, + topic: "greptimedb_wal_topic".to_string(), + }, + entry_id: 0, + checksum: 0, }, data: Vec::new(), - entry_id: 0, - checksum: 0, } } } @@ -346,7 +370,13 @@ mod tests { impl Record { /// Overrides tp. fn with_tp(&self, tp: RecordType) -> Self { - Self { tp, ..self.clone() } + Self { + meta: RecordMeta { + tp, + ..self.meta.clone() + }, + ..self.clone() + } } /// Overrides data with the given data. @@ -360,26 +390,34 @@ mod tests { /// Overrides entry id. fn with_entry_id(&self, entry_id: EntryId) -> Self { Self { - entry_id, + meta: RecordMeta { + entry_id, + ..self.meta.clone() + }, ..self.clone() } } /// Overrides namespace. fn with_ns(&self, ns: NamespaceImpl) -> Self { - Self { ns, ..self.clone() } + Self { + meta: RecordMeta { ns, ..self.meta }, + ..self.clone() + } } /// Overrides checksum. fn with_checksum(&self, checksum: u32) -> Self { Self { - checksum, + meta: RecordMeta { + checksum, + ..self.meta.clone() + }, ..self.clone() } } } - #[allow(unused)] fn new_test_entry>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl { EntryImpl { data: data.as_ref().to_vec(), @@ -557,15 +595,17 @@ mod tests { #[test] fn test_record_conversion() { let record = Record { - version: VERSION, - tp: RecordType::Full, - data: b"12345".to_vec(), - entry_id: 1, - ns: NamespaceImpl { - region_id: 1, - topic: "greptimedb_wal_topic".to_string(), + meta: RecordMeta { + version: VERSION, + tp: RecordType::Full, + entry_id: 1, + ns: NamespaceImpl { + region_id: 1, + topic: "greptimedb_wal_topic".to_string(), + }, + checksum: crc32fast::hash(b"12345".as_slice()), }, - checksum: crc32fast::hash(b"12345".as_slice()), + data: b"12345".to_vec(), }; let kafka_record: KafkaRecord = record.clone().try_into().unwrap(); let got = Record::try_from(kafka_record).unwrap(); @@ -591,8 +631,8 @@ mod tests { .with_tp(RecordType::Last), ]; let entry = EntryImpl::try_from(records.clone()).unwrap(); - assert_eq!(records[0].entry_id, entry.id); - assert_eq!(records[0].ns, entry.ns); + assert_eq!(records[0].meta.entry_id, entry.id); + assert_eq!(records[0].meta.ns, entry.ns); assert_eq!( entry.data, records @@ -666,4 +706,24 @@ mod tests { assert!(entry_records.contains_key(&1)); assert!(entry_records.contains_key(&2)); } + + #[tokio::test] + async fn test_produce_large_entry() { + let max_record_size = 1024; + let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::()); + let ns = NamespaceImpl { + region_id: 1, + topic, + }; + let entry = new_test_entry([b'1'; 4096], 0, ns.clone()); + let producer = RecordProducer::new(ns.clone(), max_record_size).with_entries(vec![entry]); + + let config = KafkaConfig { + broker_endpoints: vec!["localhost:9092".to_string()], + max_batch_size: ReadableSize::kb(1), + ..Default::default() + }; + let manager = Arc::new(ClientManager::try_new(&config).await.unwrap()); + producer.produce(&manager).await.unwrap(); + } }