Skip to content

Commit

Permalink
fix: overwrite entry_id if entry id is less than start_offset (Grepti…
Browse files Browse the repository at this point in the history
…meTeam#4842)

* fix: overwrite entry_id if entry id is less than start_offset

* feat: add `overwrite_entry_start_id` to options

* chore: update config.md
  • Loading branch information
WenyXu authored Oct 18, 2024
1 parent 74bdba4 commit ca6e029
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 1 deletion.
2 changes: 2 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system <br/>can still successfully replay memtable data without throwing an <br/>out-of-range error. <br/>However, enabling this option might lead to unexpected data loss, <br/>as the system will skip over missing entries instead of treating <br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
Expand Down Expand Up @@ -409,6 +410,7 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system <br/>can still successfully replay memtable data without throwing an <br/>out-of-range error. <br/>However, enabling this option might lead to unexpected data loss, <br/>as the system will skip over missing entries instead of treating <br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
Expand Down
11 changes: 11 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ create_index = true
## **It's only used when the provider is `kafka`**.
dump_index_interval = "60s"

## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
Expand Down
11 changes: 11 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
Expand Down
3 changes: 3 additions & 0 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct DatanodeKafkaConfig {
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
/// Ignore missing entries during read WAL.
pub overwrite_entry_start_id: bool,
}

impl Default for DatanodeKafkaConfig {
Expand All @@ -60,6 +62,7 @@ impl Default for DatanodeKafkaConfig {
auto_create_topics: true,
create_index: true,
dump_index_interval: Duration::from_secs(60),
overwrite_entry_start_id: false,
}
}
}
24 changes: 23 additions & 1 deletion src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct KafkaLogStore {
max_batch_bytes: usize,
/// The consumer wait timeout.
consumer_wait_timeout: Duration,
/// Ignore missing entries during read WAL.
overwrite_entry_start_id: bool,
}

impl KafkaLogStore {
Expand All @@ -64,6 +66,7 @@ impl KafkaLogStore {
client_manager,
max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
consumer_wait_timeout: config.consumer_wait_timeout,
overwrite_entry_start_id: config.overwrite_entry_start_id,
})
}
}
Expand Down Expand Up @@ -205,7 +208,7 @@ impl LogStore for KafkaLogStore {
async fn read(
&self,
provider: &Provider,
entry_id: EntryId,
mut entry_id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let provider = provider
Expand All @@ -225,6 +228,25 @@ impl LogStore for KafkaLogStore {
.client()
.clone();

if self.overwrite_entry_start_id {
let start_offset =
client
.get_offset(OffsetAt::Earliest)
.await
.context(GetOffsetSnafu {
topic: &provider.topic,
})?;

if entry_id as i64 <= start_offset {
warn!(
"The entry_id: {} is less than start_offset: {}, topic: {}. Overwriting entry_id with start_offset",
entry_id, start_offset, &provider.topic
);

entry_id = start_offset as u64;
}
}

// Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
// The read operation terminates when this record is consumed.
// Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
Expand Down

0 comments on commit ca6e029

Please sign in to comment.