Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/3]: implement blockbeat #9

Open
wants to merge 42 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
5486f32
multi: introduce an option for resolutions
ziggie1984 Nov 12, 2024
ed6a246
rpcserver: add robustness check
ziggie1984 Nov 13, 2024
879041b
docs: add release notes
ziggie1984 Nov 12, 2024
c809340
chanacceptor: add custom channel commitment type
guggero Nov 20, 2024
1c3caf4
docs: add release notes
guggero Nov 20, 2024
4b563e6
Merge pull request #9253 from ziggie1984/fix-chanArb-deadlock
guggero Nov 20, 2024
a4195fa
Merge pull request #9288 from lightningnetwork/channel-acceptor-commi…
guggero Nov 20, 2024
a3e87cf
fn: update fn/go.mod to v2
ProofOfKeags Nov 20, 2024
5708480
lnd: stop `graphBuilder` during shutdown
yyforyongyu Nov 21, 2024
8b99d71
Merge pull request #9284 from ProofOfKeags/update/fn2
guggero Nov 21, 2024
9fc6c99
Merge pull request #9292 from yyforyongyu/fix-graphbuiler-shutdown
guggero Nov 21, 2024
c9f8d3c
sweep: add new state `TxFatal` for erroneous sweepings
yyforyongyu Apr 30, 2024
1447179
sweep: add new error `ErrZeroFeeRateDelta`
yyforyongyu Oct 25, 2024
fb4435e
sweep: add new interface method `Immediate`
yyforyongyu Oct 25, 2024
a7fd0c5
sweep: handle inputs locally instead of relying on the tx
yyforyongyu Oct 25, 2024
93758fe
sweep: add `handleInitialBroadcast` to handle initial broadcast
yyforyongyu Oct 25, 2024
9684d20
sweep: remove redundant error from `Broadcast`
yyforyongyu Apr 30, 2024
6b32e96
sweep: add method `handleBumpEventError` and fix `markInputFailed`
yyforyongyu Apr 30, 2024
2d8ad67
sweep: add method `isMature` on `SweeperInput`
yyforyongyu Apr 30, 2024
e658f8e
sweep: make sure defaultDeadline is derived from the mature height
yyforyongyu Apr 30, 2024
7dc8072
sweep: remove redundant loopvar assign
yyforyongyu Oct 25, 2024
b1ff2fa
sweep: break `initialBroadcast` into two steps
yyforyongyu Nov 7, 2024
616399e
sweep: make sure nil tx is handled
yyforyongyu Nov 7, 2024
2bbaf6a
chainio: introduce `chainio` to handle block synchronization
yyforyongyu Jun 27, 2024
dfb5827
chainio: implement `Blockbeat`
yyforyongyu Jun 27, 2024
b072822
chainio: add helper methods to dispatch beats
yyforyongyu Oct 31, 2024
a398cf2
chainio: add `BlockbeatDispatcher` to dispatch blockbeats
yyforyongyu Jun 27, 2024
8caef2d
chainio: add partial implementation of `Consumer` interface
yyforyongyu Oct 17, 2024
35dddf2
multi: implement `Consumer` on subsystems
yyforyongyu Oct 29, 2024
cf381e6
sweep: remove block subscription in `UtxoSweeper` and `TxPublisher`
yyforyongyu Jun 4, 2024
9d4a157
sweep: remove redundant notifications during shutdown
yyforyongyu Nov 18, 2024
6dd3f65
contractcourt: remove `waitForHeight` in resolvers
yyforyongyu Jun 4, 2024
bed0577
contractcourt: remove block subscription in chain arbitrator
yyforyongyu Oct 29, 2024
f092ec0
contractcourt: remove block subscription in channel arbitrator
yyforyongyu Oct 29, 2024
4ebef3f
contractcourt: remove the `immediate` param used in `Resolve`
yyforyongyu Jun 4, 2024
631e3a8
contractcourt: start channel arbitrator with blockbeat
yyforyongyu Oct 29, 2024
9960746
multi: start consumers with a starting blockbeat
yyforyongyu Oct 29, 2024
1f91b2b
lnd: add new method `startLowLevelServices`
yyforyongyu Oct 17, 2024
c01085a
lnd: start `blockbeatDispatcher` and register consumers
yyforyongyu Oct 17, 2024
58a45cd
contractcourt: fix linter `funlen`
yyforyongyu Oct 29, 2024
122f0db
multi: improve loggings
yyforyongyu May 22, 2024
1e17c5c
chainio: use `errgroup` to limit num of goroutines
yyforyongyu Nov 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions chainio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Chainio

`chainio` is a package designed to provide blockchain data access to various
subsystems within `lnd`. When a new block is received, it is encapsulated in a
`Blockbeat` object and disseminated to all registered consumers. Consumers may
receive these updates either concurrently or sequentially, based on their
registration configuration, ensuring that each subsystem maintains a
synchronized view of the current block state.

The main components include:

- `Blockbeat`: An interface that provides information about the block.

- `Consumer`: An interface that specifies how subsystems handle the blockbeat.

- `BlockbeatDispatcher`: The core service responsible for receiving each block
and distributing it to all consumers.

Additionally, the `BeatConsumer` struct provides a partial implementation of
the `Consumer` interface. This struct helps reduce code duplication, allowing
subsystems to avoid re-implementing the `ProcessBlock` method and provides a
commonly used `NotifyBlockProcessed` method.


### Register a Consumer

Consumers within the same queue are notified **sequentially**, while all queues
are notified **concurrently**. A queue consists of a slice of consumers, which
are notified in left-to-right order. Developers are responsible for determining
dependencies in block consumption across subsystems: independent subsystems
should be notified concurrently, whereas dependent subsystems should be
notified sequentially.

To notify the consumers concurrently, put them in different queues,
```go
// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)
```

To notify the consumers sequentially, put them in the same queue,
```go
// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
consumer1,
consumer2,
consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
```

### Implement the `Consumer` Interface

Implementing the `Consumer` interface is straightforward. Below is an example
of how
[`sweep.TxPublisher`](https://github.com/lightningnetwork/lnd/blob/5cec466fad44c582a64cfaeb91f6d5fd302fcf85/sweep/fee_bumper.go#L310)
implements this interface.

To start, embed the partial implementation `chainio.BeatConsumer`, which
already provides the `ProcessBlock` implementation and commonly used
`NotifyBlockProcessed` method, and exposes `BlockbeatChan` for the consumer to
receive blockbeats.

```go
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool

chainio.BeatConsumer

...
```

We should also remember to initialize this `BeatConsumer`,

```go
...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
```

Finally, in the main event loop, read from `BlockbeatChan`, process the
received blockbeat, and, crucially, call `beat.NotifyBlockProcessed` to inform
the blockbeat dispatcher that processing is complete.

```go
for {
select {
case beat := <-t.BlockbeatChan:
// Consume this blockbeat, usually it means updating the subsystem
// using the new block data.

// Notify we've processed the block.
t.NotifyBlockProcessed(beat, nil)

...
```

### Existing Queues

Currently, we have a single queue of consumers dedicated to handling force
closures. This queue includes `ChainArbitrator`, `UtxoSweeper`, and
`TxPublisher`, with `ChainArbitrator` managing two internal consumers:
`chainWatcher` and `ChannelArbitrator`. The blockbeat flows sequentially
through the chain as follows: `ChainArbitrator => chainWatcher =>
ChannelArbitrator => UtxoSweeper => TxPublisher`. The following diagram
illustrates the flow within the public subsystems.

```mermaid
sequenceDiagram
autonumber
participant bb as BlockBeat
participant cc as ChainArb
participant us as UtxoSweeper
participant tp as TxPublisher

note left of bb: 0. received block x,<br>dispatching...

note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
bb->>cc: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
cc->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
bb->>us: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
us->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
bb->>tp: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
tp->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
```
55 changes: 55 additions & 0 deletions chainio/blockbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package chainio

import (
"fmt"

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
)

// Beat implements the Blockbeat interface. It contains the block epoch and a
// customized logger.
//
// TODO(yy): extend this to check for confirmation status - which serves as the
// single source of truth, to avoid the potential race between receiving blocks
// and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
type Beat struct {
// epoch is the current block epoch the blockbeat is aware of.
epoch chainntnfs.BlockEpoch

// log is the customized logger for the blockbeat which prints the
// block height.
log btclog.Logger
}

// Compile-time check to ensure Beat satisfies the Blockbeat interface.
var _ Blockbeat = (*Beat)(nil)

// NewBeat creates a new beat with the specified block epoch and a customized
// logger.
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat {
b := &Beat{
epoch: epoch,
}

// Create a customized logger for the blockbeat.
logPrefix := fmt.Sprintf("Height[%6d]:", b.Height())
b.log = build.NewPrefixLog(logPrefix, clog)

return b
}

// Height returns the height of the block epoch.
//
// NOTE: Part of the Blockbeat interface.
func (b *Beat) Height() int32 {
return b.epoch.Height
}

// logger returns the logger for the blockbeat.
//
// NOTE: Part of the private blockbeat interface.
func (b *Beat) logger() btclog.Logger {
return b.log
}
28 changes: 28 additions & 0 deletions chainio/blockbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package chainio

import (
"errors"
"testing"

"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/require"
)

var errDummy = errors.New("dummy error")

// TestNewBeat tests the NewBeat and Height functions.
func TestNewBeat(t *testing.T) {
t.Parallel()

// Create a testing epoch.
epoch := chainntnfs.BlockEpoch{
Height: 1,
}

// Create the beat and check the internal state.
beat := NewBeat(epoch)
require.Equal(t, epoch, beat.epoch)

// Check the height function.
require.Equal(t, epoch.Height, beat.Height())
}
113 changes: 113 additions & 0 deletions chainio/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package chainio

// BeatConsumer defines a supplementary component that should be used by
// subsystems which implement the `Consumer` interface. It partially implements
// the `Consumer` interface by providing the method `ProcessBlock` such that
// subsystems don't need to re-implement it.
//
// While inheritance is not commonly used in Go, subsystems embedding this
// struct cannot pass the interface check for `Consumer` because the `Name`
// method is not implemented, which gives us a "mortise and tenon" structure.
// In addition to reducing code duplication, this design allows `ProcessBlock`
// to work on the concrete type `Beat` to access its internal states.
type BeatConsumer struct {
// BlockbeatChan is a channel to receive blocks from Blockbeat. The
// received block contains the best known height and the txns confirmed
// in this block.
BlockbeatChan chan Blockbeat

// name is the name of the consumer which embeds the BlockConsumer.
name string

// quit is a channel that closes when the BlockConsumer is shutting
// down.
//
// NOTE: this quit channel should be mounted to the same quit channel
// used by the subsystem.
quit chan struct{}

// errChan is a buffered chan that receives an error returned from
// processing this block.
errChan chan error
}

// NewBeatConsumer creates a new BlockConsumer.
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
// Refuse to start `lnd` if the quit channel is not initialized. We
// treat this case as if we are facing a nil pointer dereference, as
// there's no point to return an error here, which will cause the node
// to fail to be started anyway.
if quit == nil {
panic("quit channel is nil")
}

b := BeatConsumer{
BlockbeatChan: make(chan Blockbeat),
name: name,
errChan: make(chan error, 1),
quit: quit,
}

return b
}

// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
// channel. It will send it to the subsystem's BlockbeatChan, and block until
// the processed result is received from the subsystem. The subsystem must call
// `NotifyBlockProcessed` after it has finished processing the block.
//
// NOTE: part of the `chainio.Consumer` interface.
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
// Update the current height.
beat.logger().Tracef("set current height for [%s]", b.name)

select {
// Send the beat to the blockbeat channel. It's expected that the
// consumer will read from this channel and process the block. Once
// processed, it should return the error or nil to the beat.Err chan.
case b.BlockbeatChan <- beat:
beat.logger().Tracef("Sent blockbeat to [%s]", b.name)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before sending "+
"beat", b.name)

return nil
}

// Check the consumer's err chan. We expect the consumer to call
// `beat.NotifyBlockProcessed` to send the error back here.
select {
case err := <-b.errChan:
beat.logger().Debugf("[%s] processed beat: err=%v", b.name, err)

return err

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown", b.name)
}

return nil
}

// NotifyBlockProcessed signals that the block has been processed. It takes the
// blockbeat being processed and an error resulted from processing it. This
// error is then sent back to the consumer's err chan to unblock
// `ProcessBlock`.
//
// NOTE: This method must be called by the subsystem after it has finished
// processing the block.
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
// Update the current height.
beat.logger().Debugf("[%s]: notifying beat processed", b.name)

select {
case b.errChan <- err:
beat.logger().Debugf("[%s]: notified beat processed, err=%v",
b.name, err)

case <-b.quit:
beat.logger().Debugf("[%s] received shutdown before notifying "+
"beat processed", b.name)
}
}
Loading
Loading