From 60d01ac45c6c0eeaaba68a2dd9fde1353694e0c6 Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 23 Apr 2024 04:03:43 +0800 Subject: [PATCH] Overhaul tx collaboration logic --- src/ckb/channel.rs | 242 ++++++++++++++++++++++++++------------------- 1 file changed, 141 insertions(+), 101 deletions(-) diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index 0815ad7b0..602ddaad1 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -179,87 +179,106 @@ impl ChannelActor { state.state = ChannelState::SigningCommitment(new_flags); Ok(()) } - _ => Err(ProcessingChannelError::InvalidState( - "Unable to send commitment signed message".to_string(), - )), + _ => Err(ProcessingChannelError::InvalidState(format!( + "Unable to send commitment signed message in state {:?}", + &state.state + ))), } } + // This is the dual of `handle_tx_collaboration_msg`. Any logic error here is likely + // to present in the other function as well. pub fn handle_tx_collaboration_command( &self, state: &mut ChannelActorState, command: TxCollaborationCommand, ) -> Result<(), ProcessingChannelError> { debug!("Handling tx collaboration command: {:?}", &command); - match state.state { - ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) => { - if state.is_acceptor { - return Err(ProcessingChannelError::InvalidState(format!( - "sending a tx collaboration command while in while we're not the initiator of the channel negotiation, state: {:?}", - state.state - ))); - } else if let TxCollaborationCommand::TxAdd(ref tx) = command.clone() { - state.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG, - ); - self.send_tx_collaboration_command(state, command)?; - state.add_tx_to_funding_tx(tx.clone().transaction)?; - } else { + let is_complete_command = match command { + TxCollaborationCommand::TxComplete(_) => true, + _ => false, + }; + let is_waiting_for_remote = match state.state { + ChannelState::CollaboratingFundingTx(flags) => { + flags.contains(CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG) + } + _ => false, + }; + + // We first exclude below cases that are invalid for tx collaboration, + // and then process the commands. + let flags = match state.state { + ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) + if state.is_acceptor => + { + return Err(ProcessingChannelError::InvalidState(format!( + "Acceptor tries to start sending tx collaboration message", + ))); + } + ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) + if matches!(command, TxCollaborationCommand::TxRemove(_)) => + { + return Err(ProcessingChannelError::InvalidState(format!( + "Trying to remove tx in the initial state", + ))); + } + ChannelState::NegotiatingFunding(_) => { + debug!("Beginning processing tx collaboration command"); + CollaboratingFundingTxFlags::empty() + } + ChannelState::CollaboratingFundingTx(_) + if !is_complete_command && is_waiting_for_remote => + { + return Err(ProcessingChannelError::InvalidState(format!( + "Trying to process command {:?} while in {:?} (should only send non-complete message after received response from peer)", + &command, state.state + ))); + } + ChannelState::CollaboratingFundingTx(flags) => { + if flags.contains(CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT) { return Err(ProcessingChannelError::InvalidState(format!( - "the first message after AcceptChannel must be TxAdd, but we got {:?})", + "Trying to process a tx collaboration command {:?} while in collaboration already completed on our side", &command ))); } + debug!( + "Processing tx collaboration command {:?} for state {:?}", + &command, &state.state + ); + flags } - ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG, - ) - | ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE, - ) => { - match command { - TxCollaborationCommand::TxComplete(_) => { - self.send_tx_collaboration_command(state, command)?; - state.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE, - ); - } - _ => return Err(ProcessingChannelError::InvalidState(format!( - "trying to send a message to remote while in {:?} which means we are waiting for remote to send message", state.state - ))) - } + _ => { + return Err(ProcessingChannelError::InvalidState(format!( + "Invalid tx collaboration command {:?} for state {:?}", + &command, state.state + ))); + } + }; + self.send_tx_collaboration_command(state, command.clone())?; + + // TODO: Note that we may deadlock here if send_tx_collaboration_command does successfully send the message, + // as in that case both us and the remote are waiting for each other to send the message. + match command { + TxCollaborationCommand::TxAdd(tx_add) => { + state.add_tx_to_funding_tx(tx_add.transaction)?; + state.state = ChannelState::CollaboratingFundingTx( + CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG, + ); } - ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, - ) => { - self.send_tx_collaboration_command(state, command.clone())?; - // TODO: Note that we may deadlock here if send_tx_collaboration_command does successfully send the message, - // as in that case both us and the remote are waiting for each other to send the message. + TxCollaborationCommand::TxRemove(tx_remove) => { + state.remove_tx_from_funding_tx(tx_remove.transaction)?; state.state = ChannelState::CollaboratingFundingTx( CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG, ); - match command { - TxCollaborationCommand::TxAdd(tx_add) => { - state.add_tx_to_funding_tx(tx_add.transaction)?; - } - TxCollaborationCommand::TxRemove(tx_remove) => { - state.remove_tx_from_funding_tx(tx_remove.transaction)?; - } - TxCollaborationCommand::TxComplete(_) => { - state.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE, - ); - } - } } - _ => { - return Err(ProcessingChannelError::InvalidState(format!( - "expecting state transition valid for tx collaboration, but got {:?}", - state.state - ))); + TxCollaborationCommand::TxComplete(_) => { + state.state = ChannelState::CollaboratingFundingTx( + flags | CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT, + ); } - }; + } + Ok(()) } @@ -584,8 +603,9 @@ bitflags! { pub struct CollaboratingFundingTxFlags: u32 { const AWAITING_REMOTE_TX_COLLABORATION_MSG = 1; const PREPARING_LOCAL_TX_COLLABORATION_MSG = 1 << 1; - const AWAITING_REMOTE_TX_COMPLETE = 1 << 2; - const COLLABRATION_COMPLETED = 1 << 3; + const OUR_TX_COMPLETE_SENT = 1 << 2; + const THEIR_TX_COMPLETE_SENT = 1 << 3; + const COLLABRATION_COMPLETED = CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT.bits() | CollaboratingFundingTxFlags::THEIR_TX_COMPLETE_SENT.bits(); } #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -883,67 +903,87 @@ impl ChannelActorState { Ok(()) } + // This is the dual of `handle_tx_collaboration_command`. Any logic error here is likely + // to present in the other function as well. pub fn handle_tx_collaboration_msg( &mut self, msg: TxCollaborationMsg, ) -> ProcessingChannelResult { debug!("Processing tx collaboration message: {:?}", &msg); - match self.state { + let is_complete_message = match msg { + TxCollaborationMsg::TxComplete(_) => true, + _ => false, + }; + let is_waiting_for_remote = match self.state { + ChannelState::CollaboratingFundingTx(flags) => { + flags.contains(CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG) + } + _ => false, + }; + let flags = match self.state { // Starting transaction collaboration - ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) => { - // Only the initator should start sending tx_add messages. - if !self.is_acceptor || !matches!(msg, TxCollaborationMsg::TxAdd(_)) { - return Err(ProcessingChannelError::InvalidState(format!( - "Tx collaboration message received. It must be a TxAdd message from the initator of the channel (we are the {}, and this message is {:?})", if self.is_acceptor {"acceptor"} else {"initator"}, &msg), - )); - } - self.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, - ); + ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) + if !self.is_acceptor => + { + return Err(ProcessingChannelError::InvalidState(format!( + "Initiator received a tx collaboration message", + ))); } - // Alternate sending messages. - ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG, - ) => { - self.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, - ); + ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) + if matches!(msg, TxCollaborationMsg::TxRemove(_)) => + { + return Err(ProcessingChannelError::InvalidState(format!( + "Recevied a TxRemove message from the start of tx collaboration" + ))); } - ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE, - ) => { - if matches!(msg, TxCollaborationMsg::TxComplete(_)) { - self.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::COLLABRATION_COMPLETED, - ); - } else { - self.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, - ); - } + ChannelState::NegotiatingFunding(_) => { + debug!("Beginning processing tx collaboration message"); + CollaboratingFundingTxFlags::empty() } - ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, - ) if matches!(msg, TxCollaborationMsg::TxComplete(_)) => { - debug!("TxComplete message received, finishing tx collaboration"); + ChannelState::CollaboratingFundingTx(_) + if !is_complete_message && !is_waiting_for_remote => + { + return Err(ProcessingChannelError::InvalidState(format!( + "Trying to process message {:?} while in {:?} (should only receive non-complete message after sent response from peer)", + &msg, self.state + ))); + } + ChannelState::CollaboratingFundingTx(flags) => { + if flags.contains(CollaboratingFundingTxFlags::THEIR_TX_COMPLETE_SENT) { + return Err(ProcessingChannelError::InvalidState(format!( + "Trying to process a tx collaboration message {:?} while in collaboration already completed on our side", + &msg + ))); + } + debug!( + "Processing tx collaboration message {:?} for state {:?}", + &msg, &self.state + ); + flags } _ => { return Err(ProcessingChannelError::InvalidState(format!( - "TxAdd message received, but we're in {:?}", - self.state + "Invalid tx collaboration message {:?} for state {:?}", + &msg, &self.state ))); } - } + }; match msg { TxCollaborationMsg::TxAdd(msg) => { self.add_tx_to_funding_tx(msg.tx)?; + self.state = ChannelState::CollaboratingFundingTx( + CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, + ); } TxCollaborationMsg::TxRemove(msg) => { self.remove_tx_from_funding_tx(msg.tx)?; + self.state = ChannelState::CollaboratingFundingTx( + CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, + ); } - TxCollaborationMsg::TxComplete(msg) => { + TxCollaborationMsg::TxComplete(_msg) => { self.state = ChannelState::CollaboratingFundingTx( - CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE, + flags | CollaboratingFundingTxFlags::THEIR_TX_COMPLETE_SENT, ); } }