Skip to content

Commit

Permalink
Overhaul tx collaboration logic
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Apr 22, 2024
1 parent 6ad3ef5 commit 60d01ac
Showing 1 changed file with 141 additions and 101 deletions.
242 changes: 141 additions & 101 deletions src/ckb/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,87 +179,106 @@ impl ChannelActor {
state.state = ChannelState::SigningCommitment(new_flags);
Ok(())
}
_ => Err(ProcessingChannelError::InvalidState(
"Unable to send commitment signed message".to_string(),
)),
_ => Err(ProcessingChannelError::InvalidState(format!(
"Unable to send commitment signed message in state {:?}",
&state.state
))),
}
}

// This is the dual of `handle_tx_collaboration_msg`. Any logic error here is likely
// to present in the other function as well.
pub fn handle_tx_collaboration_command(
&self,
state: &mut ChannelActorState,
command: TxCollaborationCommand,
) -> Result<(), ProcessingChannelError> {
debug!("Handling tx collaboration command: {:?}", &command);
match state.state {
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) => {
if state.is_acceptor {
return Err(ProcessingChannelError::InvalidState(format!(
"sending a tx collaboration command while in while we're not the initiator of the channel negotiation, state: {:?}",
state.state
)));
} else if let TxCollaborationCommand::TxAdd(ref tx) = command.clone() {
state.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG,
);
self.send_tx_collaboration_command(state, command)?;
state.add_tx_to_funding_tx(tx.clone().transaction)?;
} else {
let is_complete_command = match command {
TxCollaborationCommand::TxComplete(_) => true,
_ => false,
};
let is_waiting_for_remote = match state.state {
ChannelState::CollaboratingFundingTx(flags) => {
flags.contains(CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG)
}
_ => false,
};

// We first exclude below cases that are invalid for tx collaboration,
// and then process the commands.
let flags = match state.state {
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT)
if state.is_acceptor =>
{
return Err(ProcessingChannelError::InvalidState(format!(
"Acceptor tries to start sending tx collaboration message",
)));
}
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT)
if matches!(command, TxCollaborationCommand::TxRemove(_)) =>
{
return Err(ProcessingChannelError::InvalidState(format!(
"Trying to remove tx in the initial state",
)));
}
ChannelState::NegotiatingFunding(_) => {
debug!("Beginning processing tx collaboration command");
CollaboratingFundingTxFlags::empty()
}
ChannelState::CollaboratingFundingTx(_)
if !is_complete_command && is_waiting_for_remote =>
{
return Err(ProcessingChannelError::InvalidState(format!(
"Trying to process command {:?} while in {:?} (should only send non-complete message after received response from peer)",
&command, state.state
)));
}
ChannelState::CollaboratingFundingTx(flags) => {
if flags.contains(CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT) {
return Err(ProcessingChannelError::InvalidState(format!(
"the first message after AcceptChannel must be TxAdd, but we got {:?})",
"Trying to process a tx collaboration command {:?} while in collaboration already completed on our side",
&command
)));
}
debug!(
"Processing tx collaboration command {:?} for state {:?}",
&command, &state.state
);
flags
}
ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG,
)
| ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE,
) => {
match command {
TxCollaborationCommand::TxComplete(_) => {
self.send_tx_collaboration_command(state, command)?;
state.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE,
);
}
_ => return Err(ProcessingChannelError::InvalidState(format!(
"trying to send a message to remote while in {:?} which means we are waiting for remote to send message", state.state
)))
}
_ => {
return Err(ProcessingChannelError::InvalidState(format!(
"Invalid tx collaboration command {:?} for state {:?}",
&command, state.state
)));
}
};

self.send_tx_collaboration_command(state, command.clone())?;

// TODO: Note that we may deadlock here if send_tx_collaboration_command does successfully send the message,
// as in that case both us and the remote are waiting for each other to send the message.
match command {
TxCollaborationCommand::TxAdd(tx_add) => {
state.add_tx_to_funding_tx(tx_add.transaction)?;
state.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG,
);
}
ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
) => {
self.send_tx_collaboration_command(state, command.clone())?;
// TODO: Note that we may deadlock here if send_tx_collaboration_command does successfully send the message,
// as in that case both us and the remote are waiting for each other to send the message.
TxCollaborationCommand::TxRemove(tx_remove) => {
state.remove_tx_from_funding_tx(tx_remove.transaction)?;
state.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG,
);
match command {
TxCollaborationCommand::TxAdd(tx_add) => {
state.add_tx_to_funding_tx(tx_add.transaction)?;
}
TxCollaborationCommand::TxRemove(tx_remove) => {
state.remove_tx_from_funding_tx(tx_remove.transaction)?;
}
TxCollaborationCommand::TxComplete(_) => {
state.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE,
);
}
}
}
_ => {
return Err(ProcessingChannelError::InvalidState(format!(
"expecting state transition valid for tx collaboration, but got {:?}",
state.state
)));
TxCollaborationCommand::TxComplete(_) => {
state.state = ChannelState::CollaboratingFundingTx(
flags | CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT,
);
}
};
}

Ok(())
}

Expand Down Expand Up @@ -584,8 +603,9 @@ bitflags! {
pub struct CollaboratingFundingTxFlags: u32 {
const AWAITING_REMOTE_TX_COLLABORATION_MSG = 1;
const PREPARING_LOCAL_TX_COLLABORATION_MSG = 1 << 1;
const AWAITING_REMOTE_TX_COMPLETE = 1 << 2;
const COLLABRATION_COMPLETED = 1 << 3;
const OUR_TX_COMPLETE_SENT = 1 << 2;
const THEIR_TX_COMPLETE_SENT = 1 << 3;
const COLLABRATION_COMPLETED = CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT.bits() | CollaboratingFundingTxFlags::THEIR_TX_COMPLETE_SENT.bits();
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -883,67 +903,87 @@ impl ChannelActorState {
Ok(())
}

// This is the dual of `handle_tx_collaboration_command`. Any logic error here is likely
// to present in the other function as well.
pub fn handle_tx_collaboration_msg(
&mut self,
msg: TxCollaborationMsg,
) -> ProcessingChannelResult {
debug!("Processing tx collaboration message: {:?}", &msg);
match self.state {
let is_complete_message = match msg {
TxCollaborationMsg::TxComplete(_) => true,
_ => false,
};
let is_waiting_for_remote = match self.state {
ChannelState::CollaboratingFundingTx(flags) => {
flags.contains(CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG)
}
_ => false,
};
let flags = match self.state {
// Starting transaction collaboration
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT) => {
// Only the initator should start sending tx_add messages.
if !self.is_acceptor || !matches!(msg, TxCollaborationMsg::TxAdd(_)) {
return Err(ProcessingChannelError::InvalidState(format!(
"Tx collaboration message received. It must be a TxAdd message from the initator of the channel (we are the {}, and this message is {:?})", if self.is_acceptor {"acceptor"} else {"initator"}, &msg),
));
}
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
);
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT)
if !self.is_acceptor =>
{
return Err(ProcessingChannelError::InvalidState(format!(
"Initiator received a tx collaboration message",
)));
}
// Alternate sending messages.
ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG,
) => {
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
);
ChannelState::NegotiatingFunding(NegotiatingFundingFlags::INIT_SENT)
if matches!(msg, TxCollaborationMsg::TxRemove(_)) =>
{
return Err(ProcessingChannelError::InvalidState(format!(
"Recevied a TxRemove message from the start of tx collaboration"
)));
}
ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE,
) => {
if matches!(msg, TxCollaborationMsg::TxComplete(_)) {
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::COLLABRATION_COMPLETED,
);
} else {
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
);
}
ChannelState::NegotiatingFunding(_) => {
debug!("Beginning processing tx collaboration message");
CollaboratingFundingTxFlags::empty()
}
ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
) if matches!(msg, TxCollaborationMsg::TxComplete(_)) => {
debug!("TxComplete message received, finishing tx collaboration");
ChannelState::CollaboratingFundingTx(_)
if !is_complete_message && !is_waiting_for_remote =>
{
return Err(ProcessingChannelError::InvalidState(format!(
"Trying to process message {:?} while in {:?} (should only receive non-complete message after sent response from peer)",
&msg, self.state
)));
}
ChannelState::CollaboratingFundingTx(flags) => {
if flags.contains(CollaboratingFundingTxFlags::THEIR_TX_COMPLETE_SENT) {
return Err(ProcessingChannelError::InvalidState(format!(
"Trying to process a tx collaboration message {:?} while in collaboration already completed on our side",
&msg
)));
}
debug!(
"Processing tx collaboration message {:?} for state {:?}",
&msg, &self.state
);
flags
}
_ => {
return Err(ProcessingChannelError::InvalidState(format!(
"TxAdd message received, but we're in {:?}",
self.state
"Invalid tx collaboration message {:?} for state {:?}",
&msg, &self.state
)));
}
}
};
match msg {
TxCollaborationMsg::TxAdd(msg) => {
self.add_tx_to_funding_tx(msg.tx)?;
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
);
}
TxCollaborationMsg::TxRemove(msg) => {
self.remove_tx_from_funding_tx(msg.tx)?;
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG,
);
}
TxCollaborationMsg::TxComplete(msg) => {
TxCollaborationMsg::TxComplete(_msg) => {
self.state = ChannelState::CollaboratingFundingTx(
CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COMPLETE,
flags | CollaboratingFundingTxFlags::THEIR_TX_COMPLETE_SENT,
);
}
}
Expand Down

0 comments on commit 60d01ac

Please sign in to comment.