From d6e9219985aa1b340d42beeca2166f4ff87224e9 Mon Sep 17 00:00:00 2001 From: Tomasz Slabon Date: Mon, 22 Apr 2024 14:58:04 +0200 Subject: [PATCH] Added inactivity claim executor caching --- pkg/tbtc/node.go | 107 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 98 insertions(+), 9 deletions(-) diff --git a/pkg/tbtc/node.go b/pkg/tbtc/node.go index f197c28310..3d16e609dd 100644 --- a/pkg/tbtc/node.go +++ b/pkg/tbtc/node.go @@ -71,6 +71,18 @@ type node struct { // the map is the uncompressed public key (with 04 prefix) of the wallet. heartbeatFailureCounters map[string]*uint + inactivityClaimExecutorMutex sync.Mutex + // inactivityClaimExecutors is the cache holding inactivity claim executors + // for specific wallets. The cache key is the uncompressed public key + // (with 04 prefix) of the wallet. + // inactivityClaimExecutor encapsulates the logic of handling inactivity + // claim signing and submitting. + // + // inactivityClaimExecutors MUST NOT be used outside this struct. Please use + // wallet actions and walletDispatcher to execute an action on an existing + // wallet. + inactivityClaimExecutors map[string]*inactivityClaimExecutor + signingExecutorsMutex sync.Mutex // signingExecutors is the cache holding signing executors for specific wallets. // The cache key is the uncompressed public key (with 04 prefix) of the wallet. @@ -435,23 +447,88 @@ func (n *node) getCoordinationExecutor( return executor, true, nil } -func (n *node) getInactivityNotifier( +// getInactivityClaimExecutor gets the inactivity claim executor responsible for +// executing inactivity claim signing and submission related to a specific +// wallet whose part is controlled by this node. The second boolean return value +// indicates whether the node controls at least one signer for the given wallet. +func (n *node) getInactivityClaimExecutor( walletPublicKey *ecdsa.PublicKey, -) (*inactivityClaimExecutor, error) { +) (*inactivityClaimExecutor, bool, error) { + n.inactivityClaimExecutorMutex.Lock() + defer n.inactivityClaimExecutorMutex.Unlock() + + walletPublicKeyBytes, err := marshalPublicKey(walletPublicKey) + if err != nil { + return nil, false, fmt.Errorf("cannot marshal wallet public key: [%v]", err) + } + + executorKey := hex.EncodeToString(walletPublicKeyBytes) + + if executor, exists := n.inactivityClaimExecutors[executorKey]; exists { + return executor, true, nil + } + + executorLogger := logger.With( + zap.String("wallet", fmt.Sprintf("0x%x", walletPublicKeyBytes)), + ) + signers := n.walletRegistry.getSigners(walletPublicKey) if len(signers) == 0 { // This is not an error because the node simply does not control // the given wallet. - return nil, nil + return nil, false, nil + } + + // All signers belong to one wallet. Take that wallet from the first signer. + wallet := signers[0].wallet + + channelName := fmt.Sprintf( + "%s-%s", + ProtocolName, + hex.EncodeToString(walletPublicKeyBytes), + ) + + broadcastChannel, err := n.netProvider.BroadcastChannelFor(channelName) + if err != nil { + return nil, false, fmt.Errorf("failed to get broadcast channel: [%v]", err) + } + + // TODO: Handle unmarshallers + + // signing.RegisterUnmarshallers(broadcastChannel) + // announcer.RegisterUnmarshaller(broadcastChannel) + // broadcastChannel.SetUnmarshaler(func() net.TaggedUnmarshaler { + // return &signingDoneMessage{} + // }) + + membershipValidator := group.NewMembershipValidator( + executorLogger, + wallet.signingGroupOperators, + n.chain.Signing(), + ) + + err = broadcastChannel.SetFilter(membershipValidator.IsInGroup) + if err != nil { + return nil, false, fmt.Errorf( + "could not set filter for channel [%v]: [%v]", + broadcastChannel.Name(), + err, + ) } - inactivityNotifier := newInactivityClaimExecutor( + executorLogger.Infof( + "signing executor created; controlling [%v] signers", + len(signers), + ) + + executor := newInactivityClaimExecutor( n.chain, signers, ) - // TODO: Continue with the implementation. - return inactivityNotifier, nil + n.inactivityClaimExecutors[executorKey] = executor + + return executor, true, nil } // handleHeartbeatProposal handles an incoming heartbeat proposal by @@ -492,9 +569,21 @@ func (n *node) handleHeartbeatProposal( return } - inactivityNotifier, err := n.getInactivityNotifier(wallet.publicKey) + inactivityClaimExecutor, ok, err := n.getInactivityClaimExecutor(wallet.publicKey) if err != nil { - logger.Errorf("cannot get inactivity operator: [%v]", err) + logger.Errorf("cannot get inactivity claim executor: [%v]", err) + return + } + // This check is actually redundant. We know the node controls some + // wallet signers as we just got the wallet from the registry using their + // public key hash. However, we are doing it just in case. The API + // contract of getInactivityClaimExecutor may change one day. + if !ok { + logger.Infof( + "node does not control signers of wallet [0x%x]; "+ + "ignoring the received heartbeat request", + walletPublicKeyBytes, + ) return } @@ -520,7 +609,7 @@ func (n *node) handleHeartbeatProposal( signingExecutor, proposal, heartbeatFailureCounter, - inactivityNotifier, + inactivityClaimExecutor, startBlock, expiryBlock, n.waitForBlockHeight,