Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FRAME] MQ processor should be transactional #5198

Merged
merged 17 commits into from
Sep 2, 2024
Merged
10 changes: 10 additions & 0 deletions prdoc/pr_5198.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "MQ processor should be transactional"

doc:
- audience: Runtime User
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- audience: Runtime User
- audience: [Runtime User, Runtime Dev]

description: |
Enforce transactional processing on pallet Message Queue Processor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Enforce transactional processing on pallet Message Queue Processor
Enforce transactional processing on pallet Message Queue Processor.
Storage changes that were done while processing a message will now be rolled back
when the processing returns an error. `Ok(false)` will not revert, only `Err(_)`.


crates:
- name: pallet-message-queue
bump: patch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bump: patch
bump: major

Its a logical breaking change, better safe than sorry for these things.

19 changes: 17 additions & 2 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
traits::{One, Zero},
SaturatedConversion, Saturating,
SaturatedConversion, Saturating, TransactionOutcome,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;
Expand Down Expand Up @@ -1447,7 +1447,22 @@ impl<T: Config> Pallet<T> {
use ProcessMessageError::*;
let prev_consumed = meter.consumed();

match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) {
let transaction =
storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
let res =
T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
match &res {
Ok(_) => TransactionOutcome::Commit(Ok(res)),
Err(_) => TransactionOutcome::Rollback(Ok(res)),
}
});

let transaction = match transaction {
Ok(result) => result,
_ => unreachable!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With current storage::with_transaction implementation, this is indeed unreachable, but if the internals of that function change, this will crash the runtime.

I know, highly unlikely, but still I would prefer to:

  • debug_assert!() to have it crash in tests if the invariant is ever broken,
  • return some error MessageExecutionStatus in prod without crashing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it should be defensive and then just return an error or so. Having panics in the runtime is never good.

};

match transaction {
Err(Overweight(w)) if w.any_gt(overweight_limit) => {
// Permanently overweight.
Self::deposit_event(Event::<T>::OverweightEnqueued {
Expand Down
1 change: 1 addition & 0 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl ProcessMessage for RecordingMessageProcessor {
meter: &mut WeightMeter,
_id: &mut [u8; 32],
) -> Result<bool, ProcessMessageError> {
sp_io::storage::set(b"transactional_storage", &vec![1, 2, 3]);
Copy link
Member

@ggwpez ggwpez Aug 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mocked process supports callbacks - no need to hard code it here.

Check out the tests that use it:

Callback::set(Box::new(|_, _| {
	// whatever youwant
}));

processing_message(message, &origin)?;

let weight = if message.starts_with(&b"weight="[..]) {
Expand Down
81 changes: 81 additions & 0 deletions substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1975,3 +1975,84 @@ fn execute_overweight_keeps_stack_ov_message() {
System::reset_events();
});
}

/// Test that process_message is transactional
#[test]
fn test_process_message_transactional() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// We need to create a mocked message that first reports insufficient weight, and then
// `StackLimitReached`:
IgnoreStackOvError::set(true);
MessageQueue::enqueue_message(msg("stacklimitreached"), Here);
MessageQueue::service_queues(0.into_weight());

assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"stacklimitreached"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
}
.into(),
);
// Does not count as 'processed':
assert!(MessagesProcessed::take().is_empty());
assert_pages(&[0]);

// create a storage
let vec_to_set = vec![1, 2, 3, 4, 5];
sp_io::storage::set(b"transactional_storage", &vec_to_set);

// Now let it return `StackLimitReached`. Note that this case would normally not happen,
// since we assume that the top-level execution is the one with the most remaining stack
// depth.
IgnoreStackOvError::set(false);
// Ensure that trying to execute the message does not change any state (besides events).
System::reset_events();
let storage_noop = StorageNoopGuard::new();
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 0, 0)),
Err(ExecuteOverweightError::Other)
);
assert_last_event::<Test>(
Event::ProcessingFailed {
id: blake2_256(b"stacklimitreached").into(),
origin: MessageOrigin::Here,
error: ProcessMessageError::StackLimitReached,
}
.into(),
);
System::reset_events();
drop(storage_noop);

// because the message was processed with an error, transactional_storage changes wasn't
// commited this means storage was rolled back
let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap();
assert_eq!(stored_vec, vec![1, 2, 3, 4, 5]);

// Now let's process it normally:
IgnoreStackOvError::set(true);
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(1.into_weight(), (Here, 0, 0))
.unwrap(),
1.into_weight()
);

// transactional storage changes, this means storage was committed
let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap();
assert_eq!(stored_vec, vec![1, 2, 3]);

assert_last_event::<Test>(
Event::Processed {
id: blake2_256(b"stacklimitreached").into(),
origin: MessageOrigin::Here,
weight_used: 1.into_weight(),
success: true,
}
.into(),
);
assert_pages(&[]);
System::reset_events();
});
}
Loading