Skip to content

Commit

Permalink
feat: add revision check for kv and watch requests
Browse files Browse the repository at this point in the history
Refs: #188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jul 2, 2023
1 parent 07bdafd commit 504d338
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 27 deletions.
173 changes: 150 additions & 23 deletions xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,21 +195,55 @@ where
}

/// Validate range request before handle
fn check_range_request(req: &RangeRequest, current_revision: i64) -> Result<(), tonic::Status> {
fn check_range_request(
req: &RangeRequest,
compacted_revision: i64,
current_revision: i64,
) -> Result<(), tonic::Status> {
if req.key.is_empty() {
return Err(tonic::Status::invalid_argument("key is not provided"));
}
if !SortOrder::is_valid(req.sort_order) || !SortTarget::is_valid(req.sort_target) {
return Err(tonic::Status::invalid_argument("invalid sort option"));
}
if req.revision > current_revision {
return Err(tonic::Status::invalid_argument(format!(
Err(tonic::Status::invalid_argument(format!(
"required revision {} is higher than current revision {}",
req.revision, current_revision
)));
)))
} else if req.revision < compacted_revision {
Err(tonic::Status::invalid_argument(format!(
"required revision {} has been compacted, compacted revision is {}",
req.revision, compacted_revision
)))
} else {
Ok(())
}
}

Ok(())
/// Validate compact request before handle
fn check_compact_request(
req: &CompactionRequest,
compacted_revision: i64,
current_revision: i64,
) -> Result<(), tonic::Status> {
debug_assert!(
compacted_revision <= current_revision,
"compacted revision should not larger than current revision"
);
if req.revision <= compacted_revision {
Err(tonic::Status::invalid_argument(format!(
"required revision {} has been compacted, compacted revision is {}",
req.revision, compacted_revision
)))
} else if req.revision > current_revision {
Err(tonic::Status::invalid_argument(format!(
"required revision {} is higher than current revision {}",
req.revision, current_revision
)))
} else {
Ok(())
}
}

/// Validate put request before handle
Expand Down Expand Up @@ -237,7 +271,11 @@ where
}

/// Validate txn request before handle
fn check_txn_request(req: &TxnRequest, current_revision: i64) -> Result<(), tonic::Status> {
fn check_txn_request(
req: &TxnRequest,
compacted_revision: i64,
current_revision: i64,
) -> Result<(), tonic::Status> {
let opc = req
.compare
.len()
Expand All @@ -256,10 +294,14 @@ where
for op in req.success.iter().chain(req.failure.iter()) {
if let Some(ref request) = op.request {
match *request {
Request::RequestRange(ref r) => Self::check_range_request(r, current_revision),
Request::RequestRange(ref r) => {
Self::check_range_request(r, compacted_revision, current_revision)
}
Request::RequestPut(ref r) => Self::check_put_request(r),
Request::RequestDeleteRange(ref r) => Self::check_delete_range_request(r),
Request::RequestTxn(ref r) => Self::check_txn_request(r, current_revision),
Request::RequestTxn(ref r) => {
Self::check_txn_request(r, compacted_revision, current_revision)
}
}?;
} else {
return Err(tonic::Status::invalid_argument("key not found"));
Expand Down Expand Up @@ -392,7 +434,11 @@ where
) -> Result<tonic::Response<RangeResponse>, tonic::Status> {
let range_req = request.get_ref();
debug!("Receive grpc request: {:?}", range_req);
Self::check_range_request(range_req, self.kv_storage.revision())?;
Self::check_range_request(
range_req,
self.kv_storage.compacted_revision(),
self.kv_storage.revision(),
)?;
let is_serializable = range_req.serializable;
let token = get_token(request.metadata());
let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token);
Expand Down Expand Up @@ -469,7 +515,11 @@ where
) -> Result<tonic::Response<TxnResponse>, tonic::Status> {
let txn_req = request.get_ref();
debug!("Receive grpc request: {:?}", txn_req);
Self::check_txn_request(txn_req, self.kv_storage.revision())?;
Self::check_txn_request(
txn_req,
self.kv_storage.compacted_revision(),
self.kv_storage.revision(),
)?;
let is_fast_path = false; // lock need revision of txn
let (cmd_res, sync_res) = self.propose(request, is_fast_path).await?;

Expand All @@ -494,8 +544,10 @@ where
&self,
request: tonic::Request<CompactionRequest>,
) -> Result<tonic::Response<CompactionResponse>, tonic::Status> {
let compact_req = request.get_ref();
debug!("Receive grpc request: {:?}", compact_req);
debug!("Receive CompactionRequest {:?}", request);
let compacted_revision = self.kv_storage.compacted_revision();
let current_revision = self.kv_storage.revision();
Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?;
Err(tonic::Status::new(
tonic::Code::Unimplemented,
"Not Implemented".to_owned(),
Expand Down Expand Up @@ -549,38 +601,113 @@ mod test {
],
failure: vec![],
};
let result = KvServer::<DB>::check_txn_request(&txn_req, 0);
let result = KvServer::<DB>::check_txn_request(&txn_req, 1, 0);
assert!(result.is_ok());
}

#[tokio::test]
async fn test_future_revision() {
async fn test_range_invalid_revision() {
let current_revision = 10;
let range_request = RangeRequest {
let compacted_revision = 5;
let range_request_with_future_rev = RangeRequest {
key: b"foo".to_vec(),
revision: 20,
..Default::default()
};

let expected_err_message = tonic::Status::invalid_argument(format!(
"required revision {} is higher than current revision {}",
range_request.revision, current_revision
range_request_with_future_rev.revision, current_revision
))
.to_string();
let message = KvServer::<DB>::check_range_request(&range_request, current_revision)
.unwrap_err()
.to_string();
let message = KvServer::<DB>::check_range_request(
&range_request_with_future_rev,
compacted_revision,
current_revision,
)
.unwrap_err()
.to_string();
assert_eq!(message, expected_err_message);

let txn_req = TxnRequest {
let range_request_with_compacted_rev = RangeRequest {
key: b"foo".to_vec(),
revision: 2,
..Default::default()
};

let expected_err_message = tonic::Status::invalid_argument(format!(
"required revision {} has been compacted, compacted revision is {}",
range_request_with_compacted_rev.revision, compacted_revision
))
.to_string();

let message = KvServer::<DB>::check_range_request(
&range_request_with_compacted_rev,
compacted_revision,
current_revision,
)
.unwrap_err()
.to_string();
assert_eq!(message, expected_err_message);
assert_eq!(message, expected_err_message);
}

#[tokio::test]
async fn test_txn_invalid_revision() {
let current_revision = 10;
let compacted_revision = 5;
let txn_request_with_future_revision = TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(range_request)),
request: Some(Request::RequestRange(RangeRequest {
key: b"foo".to_vec(),
revision: 20,
..Default::default()
})),
}],
failure: vec![],
};
let message = KvServer::<DB>::check_txn_request(&txn_req, current_revision)
.unwrap_err()
.to_string();

let expected_err_message = tonic::Status::invalid_argument(format!(
"required revision {} is higher than current revision {}",
20, current_revision
))
.to_string();

let message = KvServer::<DB>::check_txn_request(
&txn_request_with_future_revision,
compacted_revision,
current_revision,
)
.unwrap_err()
.to_string();
assert_eq!(message, expected_err_message);

let txn_request_with_compacted_revision = TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"foo".to_vec(),
revision: 3,
..Default::default()
})),
}],
failure: vec![],
};

let expected_err_message = tonic::Status::invalid_argument(format!(
"required revision {} has been compacted, compacted revision is {}",
3, compacted_revision
))
.to_string();

let message = KvServer::<DB>::check_txn_request(
&txn_request_with_compacted_revision,
compacted_revision,
current_revision,
)
.unwrap_err()
.to_string();
assert_eq!(message, expected_err_message);
}

Expand Down
Loading

0 comments on commit 504d338

Please sign in to comment.