Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wtclient: Tower Client Multiplexer #7702

Merged
merged 12 commits into from
Dec 5, 2023
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.18.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
In particular, the complexity involved in the lifecycle loop has been
decoupled into logical steps, with each step having its own responsibility,
making it easier to reason about the payment flow.

* [Add a watchtower tower client
multiplexer](https://github.com/lightningnetwork/lnd/pull/7702) to manage
tower clients of different types.

## Breaking Changes
## Performance Improvements
Expand Down
2 changes: 1 addition & 1 deletion htlcswitch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ type TowerClient interface {
// parameters within the client. This should be called during link
// startup to ensure that the client is able to support the link during
// operation.
RegisterChannel(lnwire.ChannelID) error
RegisterChannel(lnwire.ChannelID, channeldb.ChannelType) error

// BackupState initiates a request to back up a particular revoked
// state. If the method returns nil, the backup is guaranteed to be
Expand Down
4 changes: 3 additions & 1 deletion htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ func (l *channelLink) Start() error {
// If the config supplied watchtower client, ensure the channel is
// registered before trying to use it during operation.
if l.cfg.TowerClient != nil {
err := l.cfg.TowerClient.RegisterChannel(l.ChanID())
err := l.cfg.TowerClient.RegisterChannel(
l.ChanID(), l.channel.State().ChanType,
)
if err != nil {
return err
}
Expand Down
10 changes: 3 additions & 7 deletions lnrpc/wtclientrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@ type Config struct {
// Active indicates if the watchtower client is enabled.
Active bool

// Client is the backing watchtower client that we'll interact with
// through the watchtower RPC subserver.
Client wtclient.Client

// AnchorClient is the backing watchtower client for anchor channels that
// we'll interact through the watchtower RPC subserver.
AnchorClient wtclient.Client
// ClientMgr is a tower client manager that manages a set of tower
// clients.
ClientMgr wtclient.ClientManager

// Resolver is a custom resolver that will be used to resolve watchtower
// addresses to ensure we don't leak any information when running over
Expand Down
156 changes: 67 additions & 89 deletions lnrpc/wtclientrpc/wtclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower"
"github.com/lightningnetwork/lnd/watchtower/blob"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
Expand Down Expand Up @@ -208,11 +208,7 @@ func (c *WatchtowerClient) AddTower(ctx context.Context,
Address: addr,
}

// TODO(conner): make atomic via multiplexed client
guggero marked this conversation as resolved.
Show resolved Hide resolved
if err := c.cfg.Client.AddTower(towerAddr); err != nil {
return nil, err
}
if err := c.cfg.AnchorClient.AddTower(towerAddr); err != nil {
if err := c.cfg.ClientMgr.AddTower(towerAddr); err != nil {
return nil, err
}

Expand Down Expand Up @@ -247,12 +243,7 @@ func (c *WatchtowerClient) RemoveTower(ctx context.Context,
}
}

// TODO(conner): make atomic via multiplexed client
err = c.cfg.Client.RemoveTower(pubKey, addr)
if err != nil {
return nil, err
}
err = c.cfg.AnchorClient.RemoveTower(pubKey, addr)
err = c.cfg.ClientMgr.RemoveTower(pubKey, addr)
if err != nil {
return nil, err
}
Expand All @@ -272,44 +263,40 @@ func (c *WatchtowerClient) ListTowers(ctx context.Context,
req.IncludeSessions, req.ExcludeExhaustedSessions,
)

anchorTowers, err := c.cfg.AnchorClient.RegisteredTowers(opts...)
if err != nil {
return nil, err
}

// Collect all the anchor client towers.
rpcTowers := make(map[wtdb.TowerID]*Tower)
for _, tower := range anchorTowers {
rpcTower := marshallTower(
tower, PolicyType_ANCHOR, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

rpcTowers[tower.ID] = rpcTower
}

legacyTowers, err := c.cfg.Client.RegisteredTowers(opts...)
towersPerBlobType, err := c.cfg.ClientMgr.RegisteredTowers(opts...)
if err != nil {
return nil, err
}

// Collect all the legacy client towers. If it has any of the same
// towers that the anchors client has, then just add the session info
// for the legacy client to the existing tower.
for _, tower := range legacyTowers {
rpcTower := marshallTower(
tower, PolicyType_LEGACY, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

t, ok := rpcTowers[tower.ID]
if !ok {
rpcTowers[tower.ID] = rpcTower
continue
rpcTowers := make(map[wtdb.TowerID]*Tower)
for blobType, towers := range towersPerBlobType {
policyType := PolicyType_LEGACY
if blobType.IsAnchorChannel() {
policyType = PolicyType_ANCHOR
}

t.SessionInfo = append(t.SessionInfo, rpcTower.SessionInfo...)
t.Sessions = append(t.Sessions, rpcTower.Sessions...)
for _, tower := range towers {
rpcTower := marshallTower(
tower, policyType, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

t, ok := rpcTowers[tower.ID]
if !ok {
rpcTowers[tower.ID] = rpcTower
continue
}

t.SessionInfo = append(
t.SessionInfo, rpcTower.SessionInfo...,
guggero marked this conversation as resolved.
Show resolved Hide resolved
)
t.Sessions = append(
t.Sessions, rpcTower.Sessions...,
)
}
}

towers := make([]*Tower, 0, len(rpcTowers))
Expand Down Expand Up @@ -337,40 +324,42 @@ func (c *WatchtowerClient) GetTowerInfo(ctx context.Context,
req.IncludeSessions, req.ExcludeExhaustedSessions,
)

// Get the tower and its sessions from anchors client.
tower, err := c.cfg.AnchorClient.LookupTower(pubKey, opts...)
towersPerBlobType, err := c.cfg.ClientMgr.LookupTower(pubKey, opts...)
if err != nil {
return nil, err
}
rpcTower := marshallTower(
tower, PolicyType_ANCHOR, req.IncludeSessions, ackCounts,
committedUpdateCounts,
)

// Get the tower and its sessions from legacy client.
tower, err = c.cfg.Client.LookupTower(pubKey, opts...)
if err != nil {
return nil, err
}
var resTower *Tower
for blobType, tower := range towersPerBlobType {
policyType := PolicyType_LEGACY
if blobType.IsAnchorChannel() {
policyType = PolicyType_ANCHOR
}

rpcLegacyTower := marshallTower(
tower, PolicyType_LEGACY, req.IncludeSessions, ackCounts,
committedUpdateCounts,
)
rpcTower := marshallTower(
tower, policyType, req.IncludeSessions,
ackCounts, committedUpdateCounts,
)

if !bytes.Equal(rpcTower.Pubkey, rpcLegacyTower.Pubkey) {
return nil, fmt.Errorf("legacy and anchor clients returned " +
"inconsistent results for the given tower")
}
if resTower == nil {
resTower = rpcTower
continue
}

rpcTower.SessionInfo = append(
rpcTower.SessionInfo, rpcLegacyTower.SessionInfo...,
)
rpcTower.Sessions = append(
rpcTower.Sessions, rpcLegacyTower.Sessions...,
)
if !bytes.Equal(rpcTower.Pubkey, resTower.Pubkey) {
return nil, fmt.Errorf("tower clients returned " +
"inconsistent results for the given tower")
}

return rpcTower, nil
resTower.SessionInfo = append(
guggero marked this conversation as resolved.
Show resolved Hide resolved
resTower.SessionInfo, rpcTower.SessionInfo...,
)
resTower.Sessions = append(
resTower.Sessions, rpcTower.Sessions...,
)
}

return resTower, nil
}

// constructFunctionalOptions is a helper function that constructs a list of
Expand Down Expand Up @@ -422,30 +411,14 @@ func constructFunctionalOptions(includeSessions,
}

// Stats returns the in-memory statistics of the client since startup.
func (c *WatchtowerClient) Stats(ctx context.Context,
req *StatsRequest) (*StatsResponse, error) {
func (c *WatchtowerClient) Stats(_ context.Context,
_ *StatsRequest) (*StatsResponse, error) {

if err := c.isActive(); err != nil {
return nil, err
}

clientStats := []wtclient.ClientStats{
c.cfg.Client.Stats(),
c.cfg.AnchorClient.Stats(),
}

var stats wtclient.ClientStats
for i := range clientStats {
// Grab a reference to the slice index rather than copying bc
// ClientStats contains a lock which cannot be copied by value.
stat := &clientStats[i]

stats.NumTasksAccepted += stat.NumTasksAccepted
stats.NumTasksIneligible += stat.NumTasksIneligible
stats.NumTasksPending += stat.NumTasksPending
stats.NumSessionsAcquired += stat.NumSessionsAcquired
stats.NumSessionsExhausted += stat.NumSessionsExhausted
}
stats := c.cfg.ClientMgr.Stats()

return &StatsResponse{
NumBackups: uint32(stats.NumTasksAccepted),
Expand All @@ -464,17 +437,22 @@ func (c *WatchtowerClient) Policy(ctx context.Context,
return nil, err
}

var policy wtpolicy.Policy
var blobType blob.Type
switch req.PolicyType {
case PolicyType_LEGACY:
policy = c.cfg.Client.Policy()
blobType = blob.TypeAltruistCommit
case PolicyType_ANCHOR:
policy = c.cfg.AnchorClient.Policy()
blobType = blob.TypeAltruistAnchorCommit
default:
return nil, fmt.Errorf("unknown policy type: %v",
req.PolicyType)
}

policy, err := c.cfg.ClientMgr.Policy(blobType)
if err != nil {
return nil, err
}

return &PolicyResponse{
MaxUpdates: uint32(policy.MaxUpdates),
SweepSatPerVbyte: uint32(policy.SweepFeeRate.FeePerVByte()),
Expand Down
22 changes: 5 additions & 17 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,8 @@ type Config struct {
// HtlcNotifier is used when creating a ChannelLink.
HtlcNotifier *htlcswitch.HtlcNotifier

// TowerClient is used by legacy channels to backup revoked states.
TowerClient wtclient.Client

// AnchorTowerClient is used by anchor channels to backup revoked
// states.
AnchorTowerClient wtclient.Client
// TowerClient is used to backup revoked states.
TowerClient wtclient.ClientManager

// DisconnectPeer is used to disconnect this peer if the cooperative close
// process fails.
Expand Down Expand Up @@ -1040,14 +1036,8 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update)
}

chanType := lnChan.State().ChanType

// Select the appropriate tower client based on the channel type. It's
// okay if the clients are disabled altogether and these values are nil,
// as the link will check for nilness before using either.
var towerClient htlcswitch.TowerClient
switch {
case chanType.IsTaproot():
var towerClient wtclient.ClientManager
if lnChan.ChanType().IsTaproot() {
// Leave the tower client as nil for now until the tower client
// has support for taproot channels.
//
Expand All @@ -1060,9 +1050,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
"are not yet taproot channel compatible",
chanPoint)
}
case chanType.HasAnchors():
towerClient = p.cfg.AnchorTowerClient
default:
} else {
guggero marked this conversation as resolved.
Show resolved Hide resolved
towerClient = p.cfg.TowerClient
}

Expand Down
9 changes: 4 additions & 5 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,11 +743,10 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB,
s.sweeper, tower, s.towerClient, s.anchorTowerClient,
r.cfg.net.ResolveTCPAddr, genInvoiceFeatures,
genAmpInvoiceFeatures, s.getNodeAnnouncement,
s.updateAndBrodcastSelfNode, parseAddr, rpcsLog,
s.aliasMgr.GetPeerAlias,
s.sweeper, tower, s.towerClientMgr, r.cfg.net.ResolveTCPAddr,
genInvoiceFeatures, genAmpInvoiceFeatures,
s.getNodeAnnouncement, s.updateAndBrodcastSelfNode, parseAddr,
rpcsLog, s.aliasMgr.GetPeerAlias,
)
if err != nil {
return err
Expand Down
Loading
Loading