
Commit

WIP
guilload committed Feb 7, 2024
1 parent f024de2 commit 756d37c
Showing 6 changed files with 336 additions and 107 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/failpoints/mod.rs
@@ -211,7 +211,7 @@ async fn aux_test_failpoints() -> anyhow::Result<()> {
Ok(())
}

const TEST_TEXT: &'static str = r#"His sole child, my lord, and bequeathed to my
const TEST_TEXT: &str = r#"His sole child, my lord, and bequeathed to my
overlooking. I have those hopes of her good that
her education promises; her dispositions she
inherits, which makes fair gifts fairer; for where
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/Cargo.toml
@@ -11,7 +11,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
dyn-clone = { workspace = true }
fail = { workspace = true }
fail = { workspace = true, optional = true }
flume = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
@@ -53,4 +53,5 @@ quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-codegen = { workspace = true }

[features]
failpoints = ["fail/failpoints"]
testsuite = ["mockall"]
77 changes: 38 additions & 39 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -152,24 +152,25 @@ impl Ingester {
) -> IngestV2Result<()> {
let queue_id = shard.queue_id();
info!(
index_uid = shard.index_uid,
source = shard.source_id,
shard = %shard.shard_id(),
index_uid=shard.index_uid,
source_id=shard.source_id,
shard_id=%shard.shard_id(),
"init primary shard"
);
let Entry::Vacant(entry) = state.shards.entry(queue_id.clone()) else {
return Ok(());
};
match mrecordlog.create_queue(&queue_id).await {
Ok(_) => {}
Err(CreateQueueError::AlreadyExists) => panic!("queue should not exist"),
Err(CreateQueueError::AlreadyExists) => {
error!("WAL queue `{queue_id}` already exists");
let message = format!("WAL queue `{queue_id}` already exists");
return Err(IngestV2Error::Internal(message));
}
Err(CreateQueueError::IoError(io_error)) => {
// TODO: Close all shards and set readiness to false.
error!(
"failed to create mrecordlog queue `{}`: {}",
queue_id, io_error
);
return Err(IngestV2Error::Internal(format!("Io Error: {io_error}")));
error!("failed to create WAL queue `{queue_id}`: {io_error}",);
let message = format!("failed to create WAL queue `{queue_id}`: {io_error}");
return Err(IngestV2Error::Internal(message));
}
};
let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
@@ -330,7 +331,8 @@ impl Ingester {

// first verify if we would locally accept each subrequest
{
let mut sum_of_requested_capacity = bytesize::ByteSize::b(0);
let mut total_requested_capacity = bytesize::ByteSize::b(0);

for subrequest in persist_request.subrequests {
let queue_id = subrequest.queue_id();

@@ -380,29 +382,26 @@ impl Ingester {
};
let requested_capacity = estimate_size(&doc_batch);

match check_enough_capacity(
if let Err(error) = check_enough_capacity(
&state_guard.mrecordlog,
self.disk_capacity,
self.memory_capacity,
requested_capacity + sum_of_requested_capacity,
requested_capacity + total_requested_capacity,
) {
Ok(_usage) => (),
Err(error) => {
rate_limited_warn!(
limit_per_min = 10,
"failed to persist records to ingester `{}`: {error}",
self.self_node_id
);
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ResourceExhausted as i32,
};
persist_failures.push(persist_failure);
continue;
}
rate_limited_warn!(
limit_per_min = 10,
"failed to persist records to ingester `{}`: {error}",
self.self_node_id
);
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ResourceExhausted as i32,
};
persist_failures.push(persist_failure);
continue;
};

let (rate_limiter, rate_meter) = state_guard
@@ -426,7 +425,7 @@ impl Ingester {

let batch_num_bytes = doc_batch.num_bytes() as u64;
rate_meter.update(batch_num_bytes);
sum_of_requested_capacity += requested_capacity;
total_requested_capacity += requested_capacity;

if let Some(follower_id) = follower_id_opt {
let replicate_subrequest = ReplicateSubrequest {
@@ -618,18 +617,15 @@ impl Ingester {

shard.shard_state = ShardState::Closed;
shard.notify_shard_status();
warn!("closed shard `{queue_id}` following IO error");
}
info!(
"closed {} shard(s) following IO error(s)",
shards_to_close.len()
);
}
if !shards_to_delete.is_empty() {
for queue_id in &shards_to_delete {
state_guard.shards.remove(queue_id);
state_guard.rate_trackers.remove(queue_id);
warn!("deleted dangling shard `{queue_id}`");
}
info!("deleted {} dangling shard(s)", shards_to_delete.len());
}

INGEST_V2_METRICS
@@ -1535,10 +1531,13 @@ mod tests {
);
}

// This test should be run manually and independently of other tests with the `fail/failpoints`
// feature enabled.
// This test should be run manually and independently of other tests with the `failpoints`
// feature enabled:
// ```sh
// cargo test -p quickwit-ingest --features failpoints -- test_ingester_persist_closes_shard_on_io_error
// ```
#[cfg(feature = "failpoints")]
#[tokio::test]
#[ignore]
async fn test_ingester_persist_closes_shard_on_io_error() {
let scenario = fail::FailScenario::setup();
fail::cfg("ingester:append_records", "return").unwrap();
30 changes: 15 additions & 15 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -22,6 +22,7 @@ use std::iter::once;
use std::ops::RangeInclusive;

use bytesize::ByteSize;
#[cfg(feature = "failpoints")]
use fail::fail_point;
use mrecordlog::error::{AppendError, DeleteQueueError};
use mrecordlog::MultiRecordLog;
@@ -54,19 +55,25 @@ pub(super) async fn append_non_empty_doc_batch(
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));

#[cfg(feature = "failpoints")]
fail_point!("ingester:append_records", |_| {
let io_error = io::Error::from(io::ErrorKind::PermissionDenied);
Err(AppendDocBatchError::Io(io_error))
});

mrecordlog
.append_records(queue_id, None, encoded_mrecords)
.await
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());

#[cfg(feature = "failpoints")]
fail_point!("ingester:append_records", |_| {
let io_error = io::Error::from(io::ErrorKind::PermissionDenied);
Err(AppendDocBatchError::Io(io_error))
});

mrecordlog
.append_records(queue_id, None, encoded_mrecords)
.await
Expand All @@ -84,12 +91,6 @@ pub(super) async fn append_non_empty_doc_batch(
}
}

#[derive(Debug, Clone, Copy)]
pub(super) struct MRecordLogUsage {
pub disk: ByteSize,
pub memory: ByteSize,
}

/// Error returned when the mrecordlog does not have enough capacity to store some records.
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub(super) enum NotEnoughCapacityError {
@@ -118,7 +119,7 @@ pub(super) fn check_enough_capacity(
disk_capacity: ByteSize,
memory_capacity: ByteSize,
requested_capacity: ByteSize,
) -> Result<MRecordLogUsage, NotEnoughCapacityError> {
) -> Result<(), NotEnoughCapacityError> {
let disk_usage = ByteSize(mrecordlog.disk_usage() as u64);

if disk_usage + requested_capacity > disk_capacity {
@@ -137,11 +138,7 @@ pub(super) fn check_enough_capacity(
requested: requested_capacity,
});
}
let usage = MRecordLogUsage {
disk: disk_usage,
memory: memory_usage,
};
Ok(usage)
Ok(())
}

/// Deletes a queue from the WAL. Returns without error if the queue does not exist.
@@ -210,10 +207,13 @@ mod tests {
assert_eq!(position, Position::offset(2u64));
}

// This test should be run manually and independently of other tests with the `fail/failpoints`
// feature enabled.
// This test should be run manually and independently of other tests with the `failpoints`
// feature enabled:
// ```sh
// cargo test -p quickwit-ingest --features failpoints -- test_append_non_empty_doc_batch_io_error
// ```
#[cfg(feature = "failpoints")]
#[tokio::test]
#[ignore]
async fn test_append_non_empty_doc_batch_io_error() {
let scenario = fail::FailScenario::setup();
fail::cfg("ingester:append_records", "return").unwrap();
14 changes: 10 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.md
@@ -9,14 +9,20 @@ Two gRPC streams back the independent streams of requests and responses between
### Life of a happy persist request
1. Leader receives a persist request pre-assigned to a shard from a router.

1. Leader writes the data to the corresponding mrecordlog queue and records the new position of the queue called `primary_position`.

1. Leader sends replicate request to follower of the shard via the SYN replication stream.
1. Leader forwards the replicate request to the follower of the shard via the SYN replication stream.

1. Follower receives the replicate request, writes the data to its replica queue, and records the new position of the queue called `replica_position`.

1. Follower returns replicate response to leader via the ACK replication stream.

1. Leader records the new position of the replica queue. It should match the `primary_position`.
1. Leader records the new position of the replica queue.

1. Leader writes the data to its local mrecordlog queue and records the new position of the queue called `primary_position`. It should match the `replica_position`.

1. Leader returns a success persist response to the router (see the sketch below).
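
For illustration, here is a minimal, self-contained sketch of the happy path above. The `Wal`, `Position`, and `persist_happy_path` names are hypothetical stand-ins rather than the actual ingester API, and the two replication streams are collapsed into direct calls:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a leader or follower WAL; not the Quickwit API.
type Position = u64;

#[derive(Default)]
struct Wal {
    queues: HashMap<String, Vec<Vec<u8>>>,
}

impl Wal {
    /// Appends records to a queue and returns the new position of the queue.
    fn append_records(&mut self, queue_id: &str, docs: &[Vec<u8>]) -> Position {
        let queue = self.queues.entry(queue_id.to_string()).or_default();
        queue.extend_from_slice(docs);
        queue.len() as Position
    }
}

/// Happy-path persist: replicate to the follower first, then write locally,
/// and check that both queues end up at the same position.
fn persist_happy_path(
    leader_wal: &mut Wal,
    follower_wal: &mut Wal,
    queue_id: &str,
    docs: &[Vec<u8>],
) -> Result<Position, String> {
    // Leader forwards the replicate request (SYN stream); the follower appends
    // the records to its replica queue and acks `replica_position` (ACK stream).
    let replica_position = follower_wal.append_records(queue_id, docs);

    // Leader then appends the records to its local mrecordlog queue; the
    // resulting `primary_position` should match `replica_position`.
    let primary_position = leader_wal.append_records(queue_id, docs);
    if primary_position != replica_position {
        return Err(format!(
            "primary position {primary_position} != replica position {replica_position}"
        ));
    }
    // Leader returns a success persist response to the router.
    Ok(primary_position)
}
```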

### Replication stream errors

- When a replication request fails, the leader and follower close the shard(s) targeted by the request.

- When a replication stream fails (transport error, timeout), the leader and follower close the shard(s) targeted by the stream. Then, the leader opens a new stream if necessary.
