feat(remote_wal): split an entry if it's too large #3092

Merged: 20 commits, Jan 5, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

3 changes: 2 additions & 1 deletion config/datanode.example.toml
@@ -51,7 +51,8 @@ sync_write = false

# Kafka wal options, see `standalone.example.toml`.
# broker_endpoints = ["127.0.0.1:9092"]
# max_batch_size = "4MB"
# Warning: Kafka has a default limit of 1MB per message in a topic.
# max_batch_size = "1MB"
# linger = "200ms"
# consumer_wait_timeout = "100ms"
# backoff_init = "500ms"
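The 1MB value is not arbitrary: Kafka brokers reject messages larger than `message.max.bytes`, which defaults to roughly 1MB, so keeping `max_batch_size` at or below that ceiling means an oversized WAL entry has to be split across several records, which is the point of this PR. A back-of-the-envelope sketch of the resulting record count, assuming the limit applies to the payload alone (a simplification; real records also carry key and header overhead):

/// Hypothetical helper: how many records an entry of `entry_size` bytes needs
/// when each record may carry at most `max_batch_size` bytes of payload.
fn estimated_record_count(entry_size: usize, max_batch_size: usize) -> usize {
    // Ceiling division: a 2.5MB entry under a 1MB limit needs 3 records.
    (entry_size + max_batch_size - 1) / max_batch_size
}

fn main() {
    let max_batch_size = 1024 * 1024; // 1MB, matching the default above
    assert_eq!(estimated_record_count(2_621_440, max_batch_size), 3); // 2.5MB entry
    assert_eq!(estimated_record_count(512 * 1024, max_batch_size), 1); // fits in one record
}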
3 changes: 2 additions & 1 deletion config/standalone.example.toml
@@ -108,7 +108,8 @@ provider = "raft_engine"
# replication_factor = 1

# The max size of a single producer batch.
# max_batch_size = "4MB"
# Warning: Kafka has a default limit of 1MB per message in a topic.
# max_batch_size = "1MB"
# The linger duration.
# linger = "200ms"
# The consumer wait timeout.
18 changes: 16 additions & 2 deletions src/common/config/src/wal.rs
@@ -90,9 +90,10 @@ mod tests {

#[test]
fn test_serde_kafka_config() {
// With all fields.
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_size = "4MB"
max_batch_size = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
@@ -104,7 +105,7 @@
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig {
@@ -115,6 +116,19 @@
},
};
assert_eq!(decoded, expected);

// With some fields missing.
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9092"]
linger = "200ms"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
linger: Duration::from_millis(200),
..Default::default()
};
assert_eq!(decoded, expected);
}

#[test]
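The second case in the test above works because fields missing from the TOML fall back to the struct's defaults. A common way to get that behavior with serde is `#[serde(default)]` on the struct plus a `Default` impl; the snippet below is a standalone stand-in illustrating the pattern, not the actual `KafkaConfig` definition:

use serde::Deserialize;

// Stand-in struct: with `#[serde(default)]`, any field absent from the TOML
// is taken from the `Default` implementation.
#[derive(Deserialize)]
#[serde(default)]
struct DemoConfig {
    broker_endpoints: Vec<String>,
    max_batch_size_bytes: u64,
}

impl Default for DemoConfig {
    fn default() -> Self {
        Self {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            max_batch_size_bytes: 1024 * 1024,
        }
    }
}

fn main() {
    // Only one field is present; the other comes from the default.
    let decoded: DemoConfig = toml::from_str(r#"broker_endpoints = ["10.0.0.1:9092"]"#).unwrap();
    assert_eq!(decoded.broker_endpoints, vec!["10.0.0.1:9092".to_string()]);
    assert_eq!(decoded.max_batch_size_bytes, 1024 * 1024);
}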
3 changes: 2 additions & 1 deletion src/common/config/src/wal/kafka.rs
@@ -60,7 +60,8 @@ impl Default for KafkaConfig {
Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::NoCompression,
max_batch_size: ReadableSize::mb(4),
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_size: ReadableSize::mb(1),
linger: Duration::from_millis(200),
consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig::default(),
2 changes: 2 additions & 0 deletions src/log-store/Cargo.toml
@@ -14,6 +14,7 @@ async-stream.workspace = true
async-trait.workspace = true
byteorder = "1.4"
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
@@ -24,6 +25,7 @@ common-telemetry.workspace = true
dashmap.workspace = true
futures-util.workspace = true
futures.workspace = true
itertools.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rskafka.workspace = true
41 changes: 26 additions & 15 deletions src/log-store/src/error.rs
@@ -18,6 +18,7 @@ use common_config::wal::KafkaWalTopic
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};

use crate::kafka::NamespaceImpl as KafkaNamespace;
@@ -123,20 +124,6 @@ pub enum Error {
error: String,
},

#[snafu(display("Failed to encode a record meta"))]
EncodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},

#[snafu(display("Failed to decode a record meta"))]
DecodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},

#[snafu(display("Missing required key in a record"))]
MissingKey { location: Location },

@@ -146,9 +133,16 @@ pub enum Error {
#[snafu(display("Cannot build a record from empty entries"))]
EmptyEntries { location: Location },

#[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
#[snafu(display(
"Failed to produce records to Kafka, topic: {}, size: {}, limit: {}",
topic,
size,
limit,
))]
ProduceRecord {
topic: KafkaWalTopic,
size: usize,
limit: usize,
location: Location,
#[snafu(source)]
error: rskafka::client::producer::Error,
@@ -172,6 +166,23 @@

#[snafu(display("Failed to do a cast"))]
Cast { location: Location },

#[snafu(display("Failed to encode object into json"))]
EncodeJson {
location: Location,
#[snafu(source)]
error: JsonError,
},

#[snafu(display("Failed to decode object from json"))]
DecodeJson {
location: Location,
#[snafu(source)]
error: JsonError,
},

#[snafu(display("The record sequence is not legal, error: {}", error))]
IllegalSequence { location: Location, error: String },
}

impl ErrorExt for Error {
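These two variants replace the earlier `EncodeMeta`/`DecodeMeta` pair and, following the snafu selector convention already used in this crate (e.g. `ConsumeRecordSnafu`, `IllegalSequenceSnafu`), they are presumably attached at the serialization call sites via `.context(...)`. A self-contained sketch of that pattern with stand-in types (`DemoError` and `DemoMeta` are illustrative, not the real definitions):

use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};

// Minimal stand-ins; the selector names (EncodeJsonSnafu / DecodeJsonSnafu)
// are what snafu derives from variants named like the ones added above.
#[derive(Debug, Snafu)]
enum DemoError {
    #[snafu(display("Failed to encode object into json"))]
    EncodeJson { source: serde_json::Error },
    #[snafu(display("Failed to decode object from json"))]
    DecodeJson { source: serde_json::Error },
}

#[derive(Serialize, Deserialize)]
struct DemoMeta {
    entry_id: u64,
    topic: String,
}

fn encode_meta(meta: &DemoMeta) -> Result<Vec<u8>, DemoError> {
    serde_json::to_vec(meta).context(EncodeJsonSnafu)
}

fn decode_meta(bytes: &[u8]) -> Result<DemoMeta, DemoError> {
    serde_json::from_slice(bytes).context(DecodeJsonSnafu)
}

fn main() -> Result<(), DemoError> {
    let meta = DemoMeta { entry_id: 42, topic: "demo_topic".to_string() };
    let bytes = encode_meta(&meta)?;
    let decoded = decode_meta(&bytes)?;
    assert_eq!(decoded.entry_id, 42);
    Ok(())
}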
9 changes: 4 additions & 5 deletions src/log-store/src/kafka.rs
@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod client_manager;
pub(crate) mod client_manager;
pub mod log_store;
mod offset;
mod record_utils;
pub(crate) mod util;

use std::fmt::Display;

@@ -29,8 +28,8 @@ use crate::error::Error;
/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
region_id: u64,
topic: Topic,
pub region_id: u64,
pub topic: Topic,
}

impl Namespace for NamespaceImpl {
2 changes: 1 addition & 1 deletion src/log-store/src/kafka/client_manager.rs
@@ -62,7 +62,7 @@ impl Client {
/// Manages client construction and accesses.
#[derive(Debug)]
pub(crate) struct ClientManager {
config: KafkaConfig,
pub(crate) config: KafkaConfig,
/// Top-level client in kafka. All clients are constructed by this client.
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
94 changes: 58 additions & 36 deletions src/log-store/src/kafka/log_store.rs
@@ -26,10 +26,10 @@ use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::Id as NamespaceId;
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result};
use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, IllegalSequenceSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::offset::Offset;
use crate::kafka::record_utils::{decode_from_record, RecordProducer};
use crate::kafka::util::offset::Offset;
use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer};
use crate::kafka::{EntryImpl, NamespaceImpl};

/// A log store backed by Kafka.
@@ -85,8 +85,6 @@ impl LogStore for KafkaLogStore {
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// while the value is the id of the last successfully written entry of the region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
debug!("LogStore handles append_batch with entries {:?}", entries);

if entries.is_empty() {
return Ok(AppendBatchResponse::default());
}
@@ -96,7 +94,7 @@
for entry in entries {
producers
.entry(entry.ns.region_id)
.or_insert(RecordProducer::new(entry.ns.clone()))
.or_insert_with(|| RecordProducer::new(entry.ns.clone()))
.push(entry);
}

@@ -115,8 +113,6 @@
.into_iter()
.collect::<HashMap<_, _>>();

debug!("Append batch result: {:?}", last_entry_ids);

Ok(AppendBatchResponse { last_entry_ids })
}
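Each `RecordProducer` gathers the entries of one region; splitting an oversized entry into multiple records presumably happens on the produce path in `kafka::util::record`, which is not shown in this file. A simplified, standalone sketch of the chunking idea (types and field names are illustrative assumptions, not the PR's implementation):

/// Illustrative stand-in for one record produced from a chunk of an entry.
struct DemoRecord {
    seq: usize,   // position of this chunk within the entry
    total: usize, // how many chunks the entry was split into
    payload: Vec<u8>,
}

/// Split an entry's payload so that each record stays under the per-record limit,
/// tagging every chunk with its position so the reader can reassemble the entry.
fn split_entry(data: &[u8], max_batch_size: usize) -> Vec<DemoRecord> {
    let chunks: Vec<&[u8]> = data.chunks(max_batch_size.max(1)).collect();
    let total = chunks.len();
    chunks
        .into_iter()
        .enumerate()
        .map(|(seq, chunk)| DemoRecord {
            seq,
            total,
            payload: chunk.to_vec(),
        })
        .collect()
}

fn main() {
    // A ~2.4MB entry split under a 1MB limit yields three records.
    let records = split_entry(&vec![0u8; 2_500_000], 1024 * 1024);
    assert_eq!(records.len(), 3);
    assert_eq!((records[0].seq, records[0].total), (0, 3));
    assert!(records.iter().all(|r| r.payload.len() <= 1024 * 1024));
}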

@@ -127,13 +123,10 @@
ns: &Self::Namespace,
entry_id: EntryId,
) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
let topic = ns.topic.clone();
let region_id = ns.region_id;

// Gets the client associated with the topic.
let client = self
.client_manager
.get_or_insert(&topic)
.get_or_insert(&ns.topic)
.await?
.raw_client
.clone();
@@ -148,13 +141,13 @@
.context(GetOffsetSnafu { ns: ns.clone() })?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset).
let start_offset = Offset::try_from(entry_id)?.0;
let start_offset: i64 = Offset::try_from(entry_id)?.0;

// Abort if there're no new entries.
// FIXME(niebayes): how can this case happen?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {})",
"No new entries for ns {} in range [{}, {}]",
ns, start_offset, end_offset
);
return Ok(futures_util::stream::empty().boxed());
@@ -166,44 +159,52 @@
.build();

debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {})",
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
ns, start_offset, end_offset
);

// Key: entry id, Value: the records associated with the entry.
let mut entry_records: HashMap<_, Vec<_>> = HashMap::new();
let ns_clone = ns.clone();
let stream = async_stream::stream!({
while let Some(consume_result) = stream_consumer.next().await {
// Each next will prdoce a `RecordAndOffset` and a high watermark offset.
// Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset.
// The `RecordAndOffset` contains the record data and its start offset.
// The high watermark offset is the end offset of the latest record in the partition.
let (record, high_watermark) = consume_result.context(ConsumeRecordSnafu {
ns: ns_clone.clone(),
})?;
let record_offset = record.offset;
// The high watermark offset is the offset of the last record plus one.
let (record_and_offset, high_watermark) =
consume_result.with_context(|_| ConsumeRecordSnafu {
ns: ns_clone.clone(),
})?;
let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);

debug!(
"Read a record at offset {} for ns {}, high watermark: {}",
record_offset, ns_clone, high_watermark
offset, ns_clone, high_watermark
);

// Ignores noop records.
if record.record.value.is_none() {
// Ignores no-op records.
if kafka_record.value.is_none() {
if check_termination(offset, end_offset, &entry_records)? {
break;
}
continue;
}

// Filters records by namespace.
let record = Record::try_from(kafka_record)?;
if record.meta.ns != ns_clone {
if check_termination(offset, end_offset, &entry_records)? {
break;
}
continue;
}
let entries = decode_from_record(record.record)?;

// Filters entries by region id.
if let Some(entry) = entries.first()
&& entry.ns.region_id == region_id
{
yield Ok(entries);
// Tries to construct an entry from records consumed so far.
if let Some(entry) = maybe_emit_entry(record, &mut entry_records)? {
yield Ok(vec![entry]);
}

// Terminates the stream if the entry with the end offset was read.
if record_offset >= end_offset {
debug!(
"Stream consumer for ns {} terminates at offset {}",
ns_clone, record_offset
);
if check_termination(offset, end_offset, &entry_records)? {
break;
}
}
@@ -252,3 +253,24 @@ impl LogStore for KafkaLogStore {
Ok(())
}
}

fn check_termination(
offset: i64,
end_offset: i64,
entry_records: &HashMap<EntryId, Vec<Record>>,
) -> Result<bool> {
// Terminates the stream if the entry with the end offset was read.
if offset >= end_offset {
debug!("Stream consumer terminates at offset {}", offset);
// There must be no leftover records when the stream terminates.
if !entry_records.is_empty() {
return IllegalSequenceSnafu {
error: "Found records leftover",
}
.fail();
}
Ok(true)
} else {
Ok(false)
}
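`maybe_emit_entry` is defined in `kafka::util::record` (not shown here). Judging from the call sites above, it buffers the records belonging to a multi-record entry in `entry_records` and emits the reassembled entry only once the whole sequence has arrived, which is also why `check_termination` treats leftover records at the end offset as an `IllegalSequence`. A simplified, self-contained sketch of that buffering logic (names and the metadata layout are assumptions, not the PR's actual code):

use std::collections::HashMap;

/// Illustrative record metadata: which entry a record belongs to, its position
/// within that entry, and how many records the entry was split into.
struct DemoRecord {
    entry_id: u64,
    seq: usize,
    total: usize,
    payload: Vec<u8>,
}

/// Buffer `record`; if it completes its entry, return the reassembled payload.
fn maybe_emit_entry_sketch(
    record: DemoRecord,
    entry_records: &mut HashMap<u64, Vec<DemoRecord>>,
) -> Option<(u64, Vec<u8>)> {
    let entry_id = record.entry_id;
    let total = record.total;
    let buffered = entry_records.entry(entry_id).or_default();
    buffered.push(record);
    if buffered.len() < total {
        return None; // still waiting for the rest of the sequence
    }
    // All records arrived: restore the original order and concatenate payloads.
    let mut records = entry_records.remove(&entry_id).unwrap();
    records.sort_by_key(|r| r.seq);
    let payload: Vec<u8> = records.into_iter().flat_map(|r| r.payload).collect();
    Some((entry_id, payload))
}

fn main() {
    let mut entry_records = HashMap::new();
    let part = |seq| DemoRecord { entry_id: 7, seq, total: 2, payload: vec![seq as u8; 4] };
    // First half of the entry: nothing is emitted yet.
    assert!(maybe_emit_entry_sketch(part(0), &mut entry_records).is_none());
    // Second half completes the sequence and the entry is reassembled.
    let (id, payload) = maybe_emit_entry_sketch(part(1), &mut entry_records).unwrap();
    assert_eq!((id, payload.len()), (7, 8));
}

On this reading, the "Found records leftover" error corresponds to reaching the end offset while some entry is still missing part of its record sequence.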
}