Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3/3]: Implement Consumer on chainWatcher and resolvers #10

Open
wants to merge 64 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
ed2989a
multi: update to fn v2
ProofOfKeags Dec 3, 2024
117c6bc
multi: move routing.TlvTrafficShaper => htlcswitch.AuxTrafficShaper
guggero Dec 3, 2024
a2e78c3
htlcswitch: thread through packet's inbound wire records
guggero Dec 4, 2024
86b3be7
multi: thread through and use AuxTrafficShaper
guggero Dec 4, 2024
17bc882
contractcourt: refactor start up of arbitrators
ziggie1984 Dec 3, 2024
0004e31
docs: add release-notes
ziggie1984 Nov 13, 2024
8d7f085
Merge pull request #9324 from ziggie1984/non-blocking-startup-chan-ar…
Roasbeef Dec 5, 2024
366e48b
Merge pull request #9333 from guggero/aux-traffic-shaper-refactor
Roasbeef Dec 5, 2024
ac59b06
Update ruby.md
Guayaba221 Dec 6, 2024
5659c01
Merge pull request #9337 from Guayaba221/patch-1
guggero Dec 6, 2024
7374392
lnrpc: sort `Invoice.HTLCs` based on `HtlcIndex`
yyforyongyu Dec 5, 2024
c9ae63a
Merge pull request #9338 from yyforyongyu/fix-invoice-htlcs-order
guggero Dec 9, 2024
fb429d6
Merge pull request #9330 from ProofOfKeags/update/fn2
Roasbeef Dec 9, 2024
eee4dbd
sweep: add new state `TxFatal` for erroneous sweepings
yyforyongyu Apr 30, 2024
cfbfae6
sweep: add new error `ErrZeroFeeRateDelta`
yyforyongyu Oct 25, 2024
20b71a8
sweep: add new interface method `Immediate`
yyforyongyu Oct 25, 2024
fcd47e9
sweep: handle inputs locally instead of relying on the tx
yyforyongyu Oct 25, 2024
6f1511a
sweep: add `handleInitialBroadcast` to handle initial broadcast
yyforyongyu Oct 25, 2024
f4635a2
sweep: remove redundant error from `Broadcast`
yyforyongyu Apr 30, 2024
f7b301d
sweep: add method `handleBumpEventError` and fix `markInputFailed`
yyforyongyu Apr 30, 2024
b3e7f4c
sweep: add method `isMature` on `SweeperInput`
yyforyongyu Apr 30, 2024
2234937
sweep: make sure defaultDeadline is derived from the mature height
yyforyongyu Apr 30, 2024
7d4ccb7
sweep: remove redundant loopvar assign
yyforyongyu Oct 25, 2024
383b6a8
sweep: break `initialBroadcast` into two steps
yyforyongyu Nov 7, 2024
74b7e5b
sweep: make sure nil tx is handled
yyforyongyu Nov 7, 2024
8307534
chainio: introduce `chainio` to handle block synchronization
yyforyongyu Jun 27, 2024
c366e41
chainio: implement `Blockbeat`
yyforyongyu Jun 27, 2024
f8e1f2d
chainio: add helper methods to dispatch beats
yyforyongyu Oct 31, 2024
dfae94b
chainio: add `BlockbeatDispatcher` to dispatch blockbeats
yyforyongyu Jun 27, 2024
87f2f84
chainio: add partial implementation of `Consumer` interface
yyforyongyu Oct 17, 2024
5ac6198
multi: implement `Consumer` on subsystems
yyforyongyu Oct 29, 2024
798629d
sweep: remove block subscription in `UtxoSweeper` and `TxPublisher`
yyforyongyu Jun 4, 2024
9ecd072
sweep: remove redundant notifications during shutdown
yyforyongyu Nov 18, 2024
4622db2
contractcourt: remove `waitForHeight` in resolvers
yyforyongyu Jun 4, 2024
6f986a9
contractcourt: remove block subscription in chain arbitrator
yyforyongyu Oct 29, 2024
0d65d78
contractcourt: remove block subscription in channel arbitrator
yyforyongyu Oct 29, 2024
d61f054
contractcourt: remove the `immediate` param used in `Resolve`
yyforyongyu Jun 4, 2024
8261d2a
contractcourt: start channel arbitrator with blockbeat
yyforyongyu Oct 29, 2024
c1ec436
multi: start consumers with a starting blockbeat
yyforyongyu Oct 29, 2024
947286c
lnd: add new method `startLowLevelServices`
yyforyongyu Oct 17, 2024
da68b66
lnd: start `blockbeatDispatcher` and register consumers
yyforyongyu Oct 17, 2024
0579c43
contractcourt: fix linter `funlen`
yyforyongyu Oct 29, 2024
266cb6e
multi: improve loggings
yyforyongyu May 22, 2024
837d5b0
chainio: use `errgroup` to limit num of goroutines
yyforyongyu Nov 19, 2024
5f37d93
chainio: update `fn` to `v2`
yyforyongyu Dec 10, 2024
d2c96af
contractcourt: add verbose logging in resolvers
yyforyongyu Jun 20, 2024
8e69c43
contractcourt: add spend path helpers in timeout/success resolver
yyforyongyu Nov 13, 2024
5d75a0e
contractcourt: add sweep senders in `htlcSuccessResolver`
yyforyongyu Nov 14, 2024
1d89884
contractcourt: add resolver handlers in `htlcSuccessResolver`
yyforyongyu Nov 14, 2024
ba016c2
contractcourt: remove redundant return value in `claimCleanUp`
yyforyongyu Nov 14, 2024
2bec82c
contractcourt: add sweep senders in `htlcTimeoutResolver`
yyforyongyu Nov 14, 2024
cd2d709
contractcourt: add methods to checkpoint states
yyforyongyu Jul 16, 2024
60bfafc
contractcourt: add resolve handlers in `htlcTimeoutResolver`
yyforyongyu Jul 16, 2024
03b6bc6
contractcourt: add `Launch` method to anchor/breach resolver
yyforyongyu Jun 24, 2024
107a068
contractcourt: add `Launch` method to commit resolver
yyforyongyu Jun 20, 2024
05f5b7d
contractcourt: add `Launch` method to htlc success resolver
yyforyongyu Jul 15, 2024
7b99e49
contractcourt: add `Launch` method to htlc timeout resolver
yyforyongyu Jul 16, 2024
3b4e937
invoices: exit early when the subscriber chan is nil
yyforyongyu Nov 17, 2024
018a108
contractcourt: add `Launch` method to incoming contest resolver
yyforyongyu Nov 17, 2024
7b8cf8a
contractcourt: add `Launch` method to outgoing contest resolver
yyforyongyu Jun 20, 2024
5d9f5de
contractcourt: fix concurrent access to `resolved`
yyforyongyu Jul 10, 2024
8ee2b27
contractcourt: fix concurrent access to `launched`
yyforyongyu Jul 11, 2024
1fee03d
contractcourt: break `launchResolvers` into two steps
yyforyongyu Jun 25, 2024
cebad6d
contractcourt: offer outgoing htlc one block earlier before its expiry
yyforyongyu Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions aliasmgr/aliasmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"sync"

"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -432,9 +432,9 @@ func (m *Manager) DeleteLocalAlias(alias,
}

// We'll filter the alias set and remove the alias from it.
aliasSet = fn.Filter(func(a lnwire.ShortChannelID) bool {
aliasSet = fn.Filter(aliasSet, func(a lnwire.ShortChannelID) bool {
return a.ToUint64() != alias.ToUint64()
}, aliasSet)
})

// If the alias set is empty, we'll delete the base SCID from the
// baseToSet map.
Expand Down Expand Up @@ -514,11 +514,17 @@ func (m *Manager) RequestAlias() (lnwire.ShortChannelID, error) {
// haveAlias returns true if the passed alias is already assigned to a
// channel in the baseToSet map.
haveAlias := func(maybeNextAlias lnwire.ShortChannelID) bool {
return fn.Any(func(aliasList []lnwire.ShortChannelID) bool {
return fn.Any(func(alias lnwire.ShortChannelID) bool {
return alias == maybeNextAlias
}, aliasList)
}, maps.Values(m.baseToSet))
return fn.Any(
maps.Values(m.baseToSet),
func(aliasList []lnwire.ShortChannelID) bool {
return fn.Any(
aliasList,
func(alias lnwire.ShortChannelID) bool {
return alias == maybeNextAlias
},
)
},
)
}

err := kvdb.Update(m.backend, func(tx kvdb.RwTx) error {
Expand Down
152 changes: 152 additions & 0 deletions chainio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Chainio

`chainio` is a package designed to provide blockchain data access to various
subsystems within `lnd`. When a new block is received, it is encapsulated in a
`Blockbeat` object and disseminated to all registered consumers. Consumers may
receive these updates either concurrently or sequentially, based on their
registration configuration, ensuring that each subsystem maintains a
synchronized view of the current block state.

The main components include:

- `Blockbeat`: An interface that provides information about the block.

- `Consumer`: An interface that specifies how subsystems handle the blockbeat.

- `BlockbeatDispatcher`: The core service responsible for receiving each block
and distributing it to all consumers.

Additionally, the `BeatConsumer` struct provides a partial implementation of
the `Consumer` interface. This struct helps reduce code duplication, allowing
subsystems to avoid re-implementing the `ProcessBlock` method and provides a
commonly used `NotifyBlockProcessed` method.


### Register a Consumer

Consumers within the same queue are notified **sequentially**, while all queues
are notified **concurrently**. A queue consists of a slice of consumers, which
are notified in left-to-right order. Developers are responsible for determining
dependencies in block consumption across subsystems: independent subsystems
should be notified concurrently, whereas dependent subsystems should be
notified sequentially.

To notify the consumers concurrently, put them in different queues,
```go
// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)
```

To notify the consumers sequentially, put them in the same queue,
```go
// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
consumer1,
consumer2,
consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
```

### Implement the `Consumer` Interface

Implementing the `Consumer` interface is straightforward. Below is an example
of how
[`sweep.TxPublisher`](https://github.com/lightningnetwork/lnd/blob/5cec466fad44c582a64cfaeb91f6d5fd302fcf85/sweep/fee_bumper.go#L310)
implements this interface.

To start, embed the partial implementation `chainio.BeatConsumer`, which
already provides the `ProcessBlock` implementation and commonly used
`NotifyBlockProcessed` method, and exposes `BlockbeatChan` for the consumer to
receive blockbeats.

```go
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool

chainio.BeatConsumer

...
```

We should also remember to initialize this `BeatConsumer`,

```go
...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
```

Finally, in the main event loop, read from `BlockbeatChan`, process the
received blockbeat, and, crucially, call `tp.NotifyBlockProcessed` to inform
the blockbeat dispatcher that processing is complete.

```go
for {
select {
case beat := <-tp.BlockbeatChan:
// Consume this blockbeat, usually it means updating the subsystem
// using the new block data.

// Notify we've processed the block.
tp.NotifyBlockProcessed(beat, nil)

...
```

### Existing Queues

Currently, we have a single queue of consumers dedicated to handling force
closures. This queue includes `ChainArbitrator`, `UtxoSweeper`, and
`TxPublisher`, with `ChainArbitrator` managing two internal consumers:
`chainWatcher` and `ChannelArbitrator`. The blockbeat flows sequentially
through the chain as follows: `ChainArbitrator => chainWatcher =>
ChannelArbitrator => UtxoSweeper => TxPublisher`. The following diagram
illustrates the flow within the public subsystems.

```mermaid
sequenceDiagram
autonumber
participant bb as BlockBeat
participant cc as ChainArb
participant us as UtxoSweeper
participant tp as TxPublisher

note left of bb: 0. received block x,<br>dispatching...

note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
bb->>cc: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
cc->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
bb->>us: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
us->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
bb->>tp: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
tp->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
```
54 changes: 54 additions & 0 deletions chainio/blockbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package chainio

import (
"fmt"

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/chainntnfs"
)

// Beat implements the Blockbeat interface. It contains the block epoch and a
// customized logger.
//
// TODO(yy): extend this to check for confirmation status - which serves as the
// single source of truth, to avoid the potential race between receiving blocks
// and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
type Beat struct {
// epoch is the current block epoch the blockbeat is aware of.
epoch chainntnfs.BlockEpoch

// log is the customized logger for the blockbeat which prints the
// block height.
log btclog.Logger
}

// Compile-time check to ensure Beat satisfies the Blockbeat interface.
var _ Blockbeat = (*Beat)(nil)

// NewBeat creates a new beat with the specified block epoch and a customized
// logger.
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat {
b := &Beat{
epoch: epoch,
}

// Create a customized logger for the blockbeat.
logPrefix := fmt.Sprintf("Height[%6d]:", b.Height())
b.log = clog.WithPrefix(logPrefix)

return b
}

// Height returns the height of the block epoch.
//
// NOTE: Part of the Blockbeat interface.
func (b *Beat) Height() int32 {
return b.epoch.Height
}

// logger returns the logger for the blockbeat.
//
// NOTE: Part of the private blockbeat interface.
func (b *Beat) logger() btclog.Logger {
return b.log
}
28 changes: 28 additions & 0 deletions chainio/blockbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package chainio

import (
"errors"
"testing"

"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/require"
)

var errDummy = errors.New("dummy error")

// TestNewBeat tests the NewBeat and Height functions.
func TestNewBeat(t *testing.T) {
t.Parallel()

// Create a testing epoch.
epoch := chainntnfs.BlockEpoch{
Height: 1,
}

// Create the beat and check the internal state.
beat := NewBeat(epoch)
require.Equal(t, epoch, beat.epoch)

// Check the height function.
require.Equal(t, epoch.Height, beat.Height())
}
113 changes: 113 additions & 0 deletions chainio/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package chainio

// BeatConsumer defines a supplementary component that should be used by
// subsystems which implement the `Consumer` interface. It partially implements
// the `Consumer` interface by providing the method `ProcessBlock` such that
// subsystems don't need to re-implement it.
//
// While inheritance is not commonly used in Go, subsystems embedding this
// struct cannot pass the interface check for `Consumer` because the `Name`
// method is not implemented, which gives us a "mortise and tenon" structure.
// In addition to reducing code duplication, this design allows `ProcessBlock`
// to work on the concrete type `Beat` to access its internal states.
type BeatConsumer struct {
// BlockbeatChan is a channel to receive blocks from Blockbeat. The
// received block contains the best known height and the txns confirmed
// in this block.
BlockbeatChan chan Blockbeat

// name is the name of the consumer which embeds the BlockConsumer.
name string

// quit is a channel that closes when the BlockConsumer is shutting
// down.
//
// NOTE: this quit channel should be mounted to the same quit channel
// used by the subsystem.
quit chan struct{}

// errChan is a buffered chan that receives an error returned from
// processing this block.
errChan chan error
}

// NewBeatConsumer creates a new BlockConsumer.
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
// Refuse to start `lnd` if the quit channel is not initialized. We
// treat this case as if we are facing a nil pointer dereference, as
// there's no point to return an error here, which will cause the node
// to fail to be started anyway.
if quit == nil {
panic("quit channel is nil")
}

b := BeatConsumer{
BlockbeatChan: make(chan Blockbeat),
name: name,
errChan: make(chan error, 1),
quit: quit,
}

return b
}

// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
// channel. It will send it to the subsystem's BlockbeatChan, and block until
// the processed result is received from the subsystem. The subsystem must call
// `NotifyBlockProcessed` after it has finished processing the block.
//
// NOTE: part of the `chainio.Consumer` interface.
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
// Update the current height.
beat.logger().Tracef("set current height for [%s]", b.name)

select {
// Send the beat to the blockbeat channel. It's expected that the
// consumer will read from this channel and process the block. Once
// processed, it should return the error or nil to the beat.Err chan.
case b.BlockbeatChan <- beat:
beat.logger().Tracef("Sent blockbeat to [%s]", b.name)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before sending "+
"beat", b.name)

return nil
}

// Check the consumer's err chan. We expect the consumer to call
// `beat.NotifyBlockProcessed` to send the error back here.
select {
case err := <-b.errChan:
beat.logger().Debugf("[%s] processed beat: err=%v", b.name, err)

return err

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown", b.name)
}

return nil
}

// NotifyBlockProcessed signals that the block has been processed. It takes the
// blockbeat being processed and an error resulted from processing it. This
// error is then sent back to the consumer's err chan to unblock
// `ProcessBlock`.
//
// NOTE: This method must be called by the subsystem after it has finished
// processing the block.
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
// Update the current height.
beat.logger().Debugf("[%s]: notifying beat processed", b.name)

select {
case b.errChan <- err:
beat.logger().Debugf("[%s]: notified beat processed, err=%v",
b.name, err)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before notifying "+
"beat processed", b.name)
}
}
Loading
Loading