Skip to content

Commit

Permalink
RFC 12 Implementation: Coordination layer groundwork (#3744)
Browse files Browse the repository at this point in the history
Refs: keep-network/tbtc-v2#737

Here we present the first part of the changes meant to implement [RFC
12: Decentralized wallet
coordination](https://github.com/keep-network/tbtc-v2/blob/main/docs/rfc/rfc-12.adoc)
in the tBTC wallet client. This pull request focuses on the groundwork
necessary to run the off-chain coordination layer.

### Coordination layer orchestration

The node orchestrates the coordination layer upon startup. Specifically,
it runs two separate goroutines:
- Coordination window watch
- Coordination result processor

Both goroutines are steered by the node's root context and communicate
through a dedicated channel.

### Coordination window watch

The coordination window watch goroutine is responsible for detecting
coordination windows that occur every `900` blocks. Once a window is
detected, it runs the window handler function and passes the window data
as an argument.
The window watch process guarantees that the window handler is called
only once for the given coordination window. Moreover, it also
guarantees that the window handler is called for all coordination
windows. This is achieved by calling each handler in a separate
goroutine so the watch loop does not block for long and the chance of
missing a coordination window signal is negligible.

The window handler function triggers the window processing logic.
Specific steps of that logic are:
1. Determine the list of wallets controlled by the given node
2. Take a coordination executor for each wallet
3. Use the coordination executors to run the coordination procedure for
each wallet (in parallel)
4. Collect coordination results and push them to the processing channel

### Coordination executor

The coordination executor is a component that is responsible for running
the coordination procedure for the given wallet and coordination window.
It is designed to encapsulate the logic of the procedure (coordination
seed, communication, and so on). It also ensures that only one instance
of the procedure is executed at a time. The executor is also responsible
for assembling the coordination procedure's result and reporting all
coordination faults detected during execution.

The design of the coordination executor is inspired by the existing
signing and DKG executor. It attempts to fit the coordination
procedure's logic into the existing codebase in an elegant way.

### Coordination result processor

The coordination result processor goroutine listens for incoming
coordination results and triggers the result handler function in a
separate goroutine to ensure all results are processed independently.

The result handler function triggers the result processing logic.
Specific steps of that logic are:
1. Record coordination faults reported in the result
2. Detect the type of the action proposal being part of the result
3. Fire the appropriate proposal handler

The proposal handlers are part of the existing codebase. They are
responsible for the orchestration and execution of the proposed wallet
actions.

### Intersection with the existing chain-based coordination mechanism

The presented groundwork was built alongside the existing chain-based
coordination mechanism. Some initial integration steps around data types
were done. The existing mechanism will be gradually removed and replaced
in the follow-up pull requests.

### Next steps

The next steps (coarse-grained) on the way towards RFC 12 implementation
are:

- Implement coordination procedure logic (i.e. implement the
`coordinationExecutor.coordinate` method)
- Finalize coordination result processing (i.e. implement the
`processCoordinationResult` function and refactor `node`'s handlers
appropriately)
- Remove the existing chain-based mechanism (i.e. detach
`WalletCoordinator`'s events handlers and remove unnecessary code from
`chain.go`)
- Modify the SPV maintainter to not rely on `WalletCoordinator`'s events
during unproven transactions lookup
  • Loading branch information
tomaszslabon authored Nov 27, 2023
2 parents 73697d1 + 5c7513d commit 35f9ceb
Show file tree
Hide file tree
Showing 15 changed files with 1,040 additions and 45 deletions.
18 changes: 13 additions & 5 deletions pkg/chain/local_v1/blockcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type watcher struct {
channel chan uint64
}

var blockTime = time.Duration(500 * time.Millisecond)
var defaultBlockTime = 500 * time.Millisecond

func (lbc *localBlockCounter) WaitForBlockHeight(blockNumber uint64) error {
waiter, err := lbc.BlockHeightWaiter(blockNumber)
Expand Down Expand Up @@ -88,8 +88,16 @@ func (lbc *localBlockCounter) WatchBlocks(ctx context.Context) <-chan uint64 {

// count is an internal function that counts up time to simulate the generation
// of blocks.
func (lbc *localBlockCounter) count() {
ticker := time.NewTicker(blockTime)
func (lbc *localBlockCounter) count(blockTime ...time.Duration) {
var resolvedBlockTime time.Duration
switch len(blockTime) {
case 1:
resolvedBlockTime = blockTime[0]
default:
resolvedBlockTime = defaultBlockTime
}

ticker := time.NewTicker(resolvedBlockTime)

for range ticker.C {
lbc.structMutex.Lock()
Expand Down Expand Up @@ -127,10 +135,10 @@ func (lbc *localBlockCounter) count() {
// BlockCounter creates a BlockCounter that runs completely locally. It is
// designed to simply increase block height at a set time interval in the
// background.
func BlockCounter() (chain.BlockCounter, error) {
func BlockCounter(blockTime ...time.Duration) (chain.BlockCounter, error) {
counter := localBlockCounter{blockHeight: 0, waiters: make(map[uint64][]chan uint64)}

go counter.count()
go counter.count(blockTime...)

return &counter, nil
}
6 changes: 3 additions & 3 deletions pkg/chain/local_v1/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,13 @@ func TestLocalBlockHeightWaiter(t *testing.T) {
},
"returns immediately for block height already reached": {
blockHeight: 2,
initialDelay: 3 * blockTime,
initialDelay: 3 * defaultBlockTime,
expectedWaitTime: 0,
},
"waits for block height not yet reached": {
blockHeight: 5,
initialDelay: 2 * blockTime,
expectedWaitTime: 3 * blockTime,
initialDelay: 2 * defaultBlockTime,
expectedWaitTime: 3 * defaultBlockTime,
},
}

Expand Down
24 changes: 6 additions & 18 deletions pkg/tbtc/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,26 +363,19 @@ type WalletCoordinatorChain interface {

// HeartbeatRequestSubmittedEvent represents a wallet heartbeat request
// submitted to the chain.
//
// TODO: Remove this type and all related code.
type HeartbeatRequestSubmittedEvent struct {
WalletPublicKeyHash [20]byte
Message []byte
Coordinator chain.Address
BlockNumber uint64
}

// DepositSweepProposal represents a deposit sweep proposal submitted to the chain.
type DepositSweepProposal struct {
WalletPublicKeyHash [20]byte
DepositsKeys []struct {
FundingTxHash bitcoin.Hash
FundingOutputIndex uint32
}
SweepTxFee *big.Int
DepositsRevealBlocks []*big.Int
}

// DepositSweepProposalSubmittedEvent represents a deposit sweep proposal
// submission event.
//
// TODO: Remove this type and all related code.
type DepositSweepProposalSubmittedEvent struct {
Proposal *DepositSweepProposal
Coordinator chain.Address
Expand All @@ -404,6 +397,8 @@ type DepositSweepProposalSubmittedEventFilter struct {

// RedemptionProposalSubmittedEvent represents a redemption proposal
// submission event.
//
// TODO: Remove this type and all related code.
type RedemptionProposalSubmittedEvent struct {
Proposal *RedemptionProposal
Coordinator chain.Address
Expand All @@ -423,13 +418,6 @@ type RedemptionProposalSubmittedEventFilter struct {
WalletPublicKeyHash [20]byte
}

// RedemptionProposal represents a redemption proposal submitted to the chain.
type RedemptionProposal struct {
WalletPublicKeyHash [20]byte
RedeemersOutputScripts []bitcoin.Script
RedemptionTxFee *big.Int
}

// RedemptionRequestedEvent represents a redemption requested event.
type RedemptionRequestedEvent struct {
WalletPublicKeyHash [20]byte
Expand Down
11 changes: 7 additions & 4 deletions pkg/tbtc/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,19 +794,22 @@ func buildRedemptionProposalValidationKey(
}

// Connect sets up the local chain.
func Connect() *localChain {
func Connect(blockTime ...time.Duration) *localChain {
operatorPrivateKey, _, err := operator.GenerateKeyPair(local_v1.DefaultCurve)
if err != nil {
panic(err)
}

return ConnectWithKey(operatorPrivateKey)
return ConnectWithKey(operatorPrivateKey, blockTime...)
}

// ConnectWithKey sets up the local chain using the provided operator private
// key.
func ConnectWithKey(operatorPrivateKey *operator.PrivateKey) *localChain {
blockCounter, _ := local_v1.BlockCounter()
func ConnectWithKey(
operatorPrivateKey *operator.PrivateKey,
blockTime ...time.Duration,
) *localChain {
blockCounter, _ := local_v1.BlockCounter(blockTime...)

localChain := &localChain{
dkgResultSubmissionHandlers: make(
Expand Down
257 changes: 257 additions & 0 deletions pkg/tbtc/coordination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package tbtc

import (
"context"
"fmt"
"github.com/keep-network/keep-core/pkg/chain"
"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/net"
"github.com/keep-network/keep-core/pkg/protocol/group"
"golang.org/x/sync/semaphore"
)

const (
// coordinationFrequencyBlocks is the number of blocks between two
// consecutive coordination windows.
coordinationFrequencyBlocks = 900
// coordinationActivePhaseDurationBlocks is the number of blocks in the
// active phase of the coordination window. The active phase is the
// phase during which the communication between the coordination leader and
// their followers is allowed.
coordinationActivePhaseDurationBlocks = 80
// coordinationPassivePhaseDurationBlocks is the number of blocks in the
// passive phase of the coordination window. The passive phase is the
// phase during which communication is not allowed. Participants are
// expected to validate the result of the coordination and prepare for
// execution of the proposed wallet action.
coordinationPassivePhaseDurationBlocks = 20
// coordinationDurationBlocks is the number of blocks in a single
// coordination window.
coordinationDurationBlocks = coordinationActivePhaseDurationBlocks +
coordinationPassivePhaseDurationBlocks
)

// errCoordinationExecutorBusy is an error returned when the coordination
// executor cannot execute the requested coordination due to an ongoing one.
var errCoordinationExecutorBusy = fmt.Errorf("coordination executor is busy")

// coordinationWindow represents a single coordination window. The coordination
// block is the first block of the window.
type coordinationWindow struct {
// coordinationBlock is the first block of the coordination window.
coordinationBlock uint64
}

// newCoordinationWindow creates a new coordination window for the given
// coordination block.
func newCoordinationWindow(coordinationBlock uint64) *coordinationWindow {
return &coordinationWindow{
coordinationBlock: coordinationBlock,
}
}

// ActivePhaseEndBlock returns the block number at which the active phase
// of the coordination window ends.
func (cw *coordinationWindow) activePhaseEndBlock() uint64 {
return cw.coordinationBlock + coordinationActivePhaseDurationBlocks
}

// EndBlock returns the block number at which the coordination window ends.
func (cw *coordinationWindow) endBlock() uint64 {
return cw.coordinationBlock + coordinationDurationBlocks
}

// isAfter returns true if this coordination window is after the other
// window.
func (cw *coordinationWindow) isAfter(other *coordinationWindow) bool {
if other == nil {
return true
}

return cw.coordinationBlock > other.coordinationBlock
}

// watchCoordinationWindows watches for new coordination windows and runs
// the given callback when a new window is detected. The callback is run
// in a separate goroutine. It is guaranteed that the callback is not run
// twice for the same window. The context passed as the first parameter
// is used to cancel the watch.
func watchCoordinationWindows(
ctx context.Context,
watchBlocksFn func(ctx context.Context) <-chan uint64,
onWindowFn func(window *coordinationWindow),
) {
blocksChan := watchBlocksFn(ctx)
var lastWindow *coordinationWindow

for {
select {
case block := <-blocksChan:
if block%coordinationFrequencyBlocks == 0 {
// Make sure the current window is not the same as the last one.
// There is no guarantee that the block channel will not emit
// the same block again.
if window := newCoordinationWindow(block); window.isAfter(lastWindow) {
lastWindow = window
// Run the callback in a separate goroutine to avoid blocking
// this loop and potentially missing the next block.
go onWindowFn(window)
}
}
case <-ctx.Done():
return
}
}
}

// CoordinationFaultType represents a type of the coordination fault.
type CoordinationFaultType uint8

const (
// FaultUnknown is a fault type used when the fault type is unknown.
FaultUnknown CoordinationFaultType = iota
// FaultLeaderIdleness is a fault type used when the leader was idle, i.e.
// missed their turn to propose a wallet action.
FaultLeaderIdleness
// FaultLeaderMistake is a fault type used when the leader's proposal
// turned out to be invalid.
FaultLeaderMistake
// FaultLeaderImpersonation is a fault type used when the leader was
// impersonated by another operator who raised their own proposal.
FaultLeaderImpersonation
)

func (cft CoordinationFaultType) String() string {
switch cft {
case FaultUnknown:
return "Unknown"
case FaultLeaderIdleness:
return "LeaderIdleness"
case FaultLeaderMistake:
return "FaultLeaderMistake"
case FaultLeaderImpersonation:
return "LeaderImpersonation"
default:
panic("unknown coordination fault type")
}
}

// coordinationFault represents a single coordination fault.
type coordinationFault struct {
// culprit is the address of the operator that is responsible for the fault.
culprit chain.Address
// faultType is the type of the fault.
faultType CoordinationFaultType
}

func (cf *coordinationFault) String() string {
return fmt.Sprintf(
"operator [%s], fault [%s]",
cf.culprit,
cf.faultType,
)
}

// coordinationProposal represents a single action proposal for the given wallet.
type coordinationProposal interface {
// actionType returns the specific type of the walletAction being subject
// of this proposal.
actionType() WalletActionType
// validityBlocks returns the number of blocks for which the proposal is
// valid.
validityBlocks() uint64
}

// noopProposal is a proposal that does not propose any action.
type noopProposal struct{}

func (np *noopProposal) actionType() WalletActionType {
return ActionNoop
}

func (np *noopProposal) validityBlocks() uint64 {
// Panic to make sure that the proposal is not processed by the node.
panic("noop proposal does not have validity blocks")
}

// coordinationResult represents the result of the coordination procedure
// executed for the given wallet in the given coordination window.
type coordinationResult struct {
wallet wallet
window *coordinationWindow
leader chain.Address
proposal coordinationProposal
faults []*coordinationFault
}

func (cr *coordinationResult) String() string {
return fmt.Sprintf(
"wallet [%s], window [%v], leader [%s], proposal [%s], faults [%s]",
&cr.wallet,
cr.window.coordinationBlock,
cr.leader,
cr.proposal.actionType(),
cr.faults,
)
}

// coordinationExecutor is responsible for executing the coordination
// procedure for the given wallet.
type coordinationExecutor struct {
lock *semaphore.Weighted

signers []*signer // TODO: Do we need whole signers?
broadcastChannel net.BroadcastChannel
membershipValidator *group.MembershipValidator
protocolLatch *generator.ProtocolLatch
}

// newCoordinationExecutor creates a new coordination executor for the
// given wallet.
func newCoordinationExecutor(
signers []*signer,
broadcastChannel net.BroadcastChannel,
membershipValidator *group.MembershipValidator,
protocolLatch *generator.ProtocolLatch,
) *coordinationExecutor {
return &coordinationExecutor{
lock: semaphore.NewWeighted(1),
signers: signers,
broadcastChannel: broadcastChannel,
membershipValidator: membershipValidator,
protocolLatch: protocolLatch,
}
}

// wallet returns the wallet this executor is responsible for.
func (ce *coordinationExecutor) wallet() wallet {
// All signers belong to one wallet. Take that wallet from the
// first signer.
return ce.signers[0].wallet
}

// coordinate executes the coordination procedure for the given coordination
// window.
func (ce *coordinationExecutor) coordinate(
window *coordinationWindow,
) (*coordinationResult, error) {
if lockAcquired := ce.lock.TryAcquire(1); !lockAcquired {
return nil, errCoordinationExecutorBusy
}
defer ce.lock.Release(1)

// TODO: Implement coordination logic. Remember about:
// - Setting up the right context
// - Using the protocol latch
// - Using the membership validator
// Example result:
result := &coordinationResult{
wallet: ce.wallet(),
window: window,
leader: ce.wallet().signingGroupOperators[0],
proposal: &noopProposal{},
faults: nil,
}

return result, nil
}
Loading

0 comments on commit 35f9ceb

Please sign in to comment.