Skip to content

Commit

Permalink
refactor: retrieve latest offset while dumping indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 14, 2024
1 parent b5defba commit 1f47698
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 48 deletions.
1 change: 0 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.index_checkpoint_interval` | String | `300s` | The interval for doing WAL index checkpoints.<br/>**It's only used when the provider is `kafka`**. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
Expand Down
4 changes: 0 additions & 4 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,6 @@ create_index = true
## **It's only used when the provider is `kafka`**.
dump_index_interval = "60s"

## The interval for doing WAL index checkpoints.
## **It's only used when the provider is `kafka`**.
index_checkpoint_interval = "300s"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
Expand Down
3 changes: 0 additions & 3 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ pub struct DatanodeKafkaConfig {
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
#[serde(with = "humantime_serde")]
pub index_checkpoint_interval: Duration,
}

impl Default for DatanodeKafkaConfig {
Expand All @@ -58,7 +56,6 @@ impl Default for DatanodeKafkaConfig {
kafka_topic: KafkaTopicConfig::default(),
create_index: true,
dump_index_interval: Duration::from_secs(60),
index_checkpoint_interval: Duration::from_secs(5 * 60),
}
}
}
4 changes: 1 addition & 3 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ impl DatanodeBuilder {
let path = default_index_file(opts.node_id.unwrap());
Some(Self::build_global_index_collector(
kafka_config.dump_index_interval,
kafka_config.index_checkpoint_interval,
operator,
path,
))
Expand Down Expand Up @@ -475,11 +474,10 @@ impl DatanodeBuilder {
/// Builds [`GlobalIndexCollector`]
fn build_global_index_collector(
dump_index_interval: Duration,
checkpoint_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> GlobalIndexCollector {
GlobalIndexCollector::new(dump_index_interval, checkpoint_interval, operator, path)
GlobalIndexCollector::new(dump_index_interval, operator, path)
}
}

Expand Down
22 changes: 1 addition & 21 deletions src/log-store/src/kafka/index/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ pub struct GlobalIndexCollector {
pub struct CollectionTask {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
dump_index_interval: Duration,
checkpoint_interval: Duration,
operator: object_store::ObjectStore,
path: String,
running: Arc<AtomicBool>,
Expand Down Expand Up @@ -101,23 +100,11 @@ impl CollectionTask {
Ok(())
}

async fn checkpoint(&self) {
for (provider, sender) in self.providers.lock().await.iter() {
if sender.send(WorkerRequest::Checkpoint).await.is_err() {
error!(
"BackgroundProducerWorker is stopped, topic: {}",
provider.topic
)
}
}
}

/// The background task persists the WAL index to the specified `path` at every `dump_index_interval`.
/// There is no separate checkpoint timer: each worker refreshes its provider's latest entry ID
/// when it handles the `DumpIndex` request, right before dumping its index.
fn run(&self) {
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
let mut checkpoint_interval = tokio::time::interval(self.checkpoint_interval);
let running = self.running.clone();
let moved_self = self.clone();
common_runtime::spawn_global(async move {
Expand All @@ -132,9 +119,6 @@ impl CollectionTask {
error!(err; "Failed to persist the WAL index");
}
},
_ = checkpoint_interval.tick() => {
moved_self.checkpoint().await;
}
}
}
});
Expand All @@ -153,12 +137,9 @@ impl GlobalIndexCollector {
/// This method initializes a `GlobalIndexCollector` instance and starts a background task
/// for managing WAL (Write-Ahead Logging) indexes.
///
/// The background task performs two main operations:
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
/// The background task persists the WAL index to the specified `path` at every `dump_index_interval`.
pub fn new(
dump_index_interval: Duration,
checkpoint_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> Self {
Expand All @@ -167,7 +148,6 @@ impl GlobalIndexCollector {
let task = CollectionTask {
providers: providers.clone(),
dump_index_interval,
checkpoint_interval,
operator,
path,
running: Arc::new(AtomicBool::new(true)),
Expand Down
17 changes: 3 additions & 14 deletions src/log-store/src/kafka/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod checkpoint;
pub(crate) mod dump_index;
pub(crate) mod flush;
pub(crate) mod produce;

Expand All @@ -29,14 +29,12 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::{self};

use super::index::IndexEncoder;
use crate::error::{self, NoMaxValueSnafu, Result};
use crate::kafka::index::IndexCollector;
use crate::kafka::index::{IndexCollector, IndexEncoder};
use crate::kafka::producer::ProducerClient;

pub(crate) enum WorkerRequest {
Produce(ProduceRequest),
Checkpoint,
TruncateIndex(TruncateIndexRequest),
DumpIndex(DumpIndexRequest),
}
Expand Down Expand Up @@ -188,27 +186,18 @@ impl BackgroundProducerWorker {

async fn handle_requests(&mut self, buffer: &mut Vec<WorkerRequest>) {
let mut produce_requests = Vec::with_capacity(buffer.len());
let mut do_checkpoint = false;
for req in buffer.drain(..) {
match req {
WorkerRequest::Produce(req) => produce_requests.push(req),
WorkerRequest::Checkpoint => do_checkpoint = true,
WorkerRequest::TruncateIndex(TruncateIndexRequest {
region_id,
entry_id,
}) => self.index_collector.truncate(region_id, entry_id),
WorkerRequest::DumpIndex(req) => {
self.index_collector.dump(req.encoder.as_ref());
let _ = req.sender.send(());
}
WorkerRequest::DumpIndex(req) => self.dump_index(req).await,
}
}

let pending_requests = self.aggregate_records(&mut produce_requests, self.max_batch_bytes);
self.try_flush_pending_requests(pending_requests).await;

if do_checkpoint {
self.do_checkpoint().await;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ use common_telemetry::error;
use rskafka::client::partition::OffsetAt;
use snafu::ResultExt;

use super::DumpIndexRequest;
use crate::error;
use crate::kafka::worker::BackgroundProducerWorker;

impl BackgroundProducerWorker {
pub(crate) async fn do_checkpoint(&mut self) {
pub(crate) async fn dump_index(&mut self, req: DumpIndexRequest) {
match self
.client
.get_offset(OffsetAt::Latest)
.await
.context(error::GetOffsetSnafu {
topic: &self.provider.topic,
}) {
Ok(offset) => self.index_collector.set_latest_entry_id(offset as u64),
Ok(offset) => {
self.index_collector.set_latest_entry_id(offset as u64);
self.index_collector.dump(req.encoder.as_ref());
let _ = req.sender.send(());
}
Err(err) => error!(err; "Failed to do checkpoint"),
}
}
Expand Down

0 comments on commit 1f47698

Please sign in to comment.