Skip to content

Commit

Permalink
remove sending expected position as part of replication request
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Jan 17, 2024
1 parent 5f35a21 commit 285448d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 34 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,6 @@ impl Ingester {
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
from_position_exclusive: Some(from_position_exclusive),
to_position_inclusive: Some(current_position_inclusive),
doc_batch: Some(doc_batch),
};
replicate_subrequests
Expand Down Expand Up @@ -677,6 +676,7 @@ impl Ingester {
}
};
for replicate_success in replicate_response.successes {
// TODO verify replication_position_inclusive matches what's expected locally
let persist_success = PersistSuccess {
subrequest_id: replicate_success.subrequest_id,
index_uid: replicate_success.index_uid,
Expand Down
41 changes: 17 additions & 24 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,6 @@ impl ReplicationTask {
for subrequest in replicate_request.subrequests {
let queue_id = subrequest.queue_id();
let from_position_exclusive = subrequest.from_position_exclusive().clone();
let to_position_inclusive = subrequest.to_position_inclusive().clone();

let Some(shard) = state_guard.shards.get(&queue_id) else {
let replicate_failure = ReplicateFailure {
Expand Down Expand Up @@ -653,12 +652,6 @@ impl ReplicationTask {
.replicated_num_docs_total
.inc_by(batch_num_docs);

if current_position_inclusive != to_position_inclusive {
return Err(IngestV2Error::Internal(format!(
"bad replica position: expected {to_position_inclusive:?}, got \
{current_position_inclusive:?}"
)));
}
let replica_shard = state_guard
.shards
.get_mut(&queue_id)
Expand Down Expand Up @@ -870,14 +863,23 @@ mod tests {
let replicate_successes = replicate_request
.subrequests
.iter()
.map(|subrequest| ReplicateSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid.clone(),
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
replication_position_inclusive: Some(
subrequest.to_position_inclusive().clone(),
),
.map(|subrequest| {
let batch_len = subrequest.doc_batch.as_ref().unwrap().num_docs();
let replication_position_inclusive = subrequest
.from_position_exclusive
.clone()
.unwrap_or_default()
.as_usize()
.map_or(batch_len - 1, |pos| pos + batch_len);
ReplicateSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid.clone(),
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
replication_position_inclusive: Some(Position::offset(
replication_position_inclusive,
)),
}
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -905,7 +907,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
from_position_exclusive: Some(Position::Beginning),
to_position_inclusive: Some(Position::offset(0u64)),
},
ReplicateSubrequest {
subrequest_id: 1,
Expand All @@ -914,7 +915,6 @@ mod tests {
shard_id: Some(ShardId::from(2)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-bar", "test-doc-baz"])),
from_position_exclusive: Some(Position::Beginning),
to_position_inclusive: Some(Position::offset(1u64)),
},
ReplicateSubrequest {
subrequest_id: 2,
Expand All @@ -923,7 +923,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-qux", "test-doc-tux"])),
from_position_exclusive: Some(Position::offset(0u64)),
to_position_inclusive: Some(Position::offset(2u64)),
},
];
let replicate_response = replication_stream_task_handle
Expand Down Expand Up @@ -1152,7 +1151,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
from_position_exclusive: Some(Position::Beginning),
to_position_inclusive: Some(Position::offset(0u64)),
},
ReplicateSubrequest {
subrequest_id: 1,
Expand All @@ -1161,7 +1159,6 @@ mod tests {
shard_id: Some(ShardId::from(2)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-bar", "test-doc-baz"])),
from_position_exclusive: Some(Position::Beginning),
to_position_inclusive: Some(Position::offset(1u64)),
},
ReplicateSubrequest {
subrequest_id: 2,
Expand All @@ -1170,7 +1167,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-qux", "test-doc-tux"])),
from_position_exclusive: Some(Position::Beginning),
to_position_inclusive: Some(Position::offset(1u64)),
},
],
replication_seqno: 3,
Expand Down Expand Up @@ -1246,7 +1242,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-moo"])),
from_position_exclusive: Some(Position::offset(0u64)),
to_position_inclusive: Some(Position::offset(1u64)),
}],
replication_seqno: 4,
};
Expand Down Expand Up @@ -1340,7 +1335,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
from_position_exclusive: Position::offset(0u64).into(),
to_position_inclusive: Some(Position::offset(1u64)),
}],
replication_seqno: 0,
};
Expand Down Expand Up @@ -1425,7 +1419,6 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
from_position_exclusive: Some(Position::Beginning),
to_position_inclusive: Some(Position::offset(0u64)),
}],
replication_seqno: 0,
};
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-proto/protos/quickwit/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ message ReplicateSubrequest {
string source_id = 3;
quickwit.ingest.ShardId shard_id = 4;
quickwit.ingest.Position from_position_exclusive = 5;
quickwit.ingest.Position to_position_inclusive = 6;
// used to be to_position_inclusive, now the roles are reverted and this is
// sent in the response instead of given in the request
reserved 6;
ingest.DocBatchV2 doc_batch = 7;
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions quickwit/quickwit-proto/src/ingest/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,6 @@ impl ReplicateSubrequest {
.as_ref()
.expect("`from_position_exclusive` should be a required field")
}

pub fn to_position_inclusive(&self) -> &Position {
self.to_position_inclusive
.as_ref()
.expect("`to_position_inclusive` should be a required field")
}
}

impl ReplicateSuccess {
Expand Down

0 comments on commit 285448d

Please sign in to comment.