Skip to content

Commit

Permalink
htlcswitch: add an always on mode to interceptable switch
Browse files Browse the repository at this point in the history
Co-authored-by: Juan Pablo Civile <[email protected]>
  • Loading branch information
joostjager and champo committed Mar 17, 2022
1 parent 169f0c0 commit ae314ec
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 54 deletions.
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ type Config struct {

RejectHTLC bool `long:"rejecthtlc" description:"If true, lnd will not forward any HTLCs that are meant as onward payments. This option will still allow lnd to send HTLCs and receive HTLCs but lnd won't be used as a hop."`

// RequireInterceptor determines whether the HTLC interceptor is
// registered regardless of whether the RPC is called or not.
RequireInterceptor bool `long:"requireinterceptor" description:"Whether to always intercept HTLCs, even if no stream is attached"`

StaggerInitialReconnect bool `long:"stagger-initial-reconnect" description:"If true, will apply a randomized staggering between 0s and 30s when reconnecting to persistent peers on startup. The first 10 reconnections will be attempted instantly, regardless of the flag's value"`

MaxOutgoingCltvExpiry uint32 `long:"max-cltv-expiry" description:"The maximum number of blocks funds could be locked up for when forwarding payments."`
Expand Down
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.15.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@
change, it allows encrypted failure messages to be returned to the sender.
Additionally it is possible to signal a malformed htlc.

* Add an [always on](https://github.com/lightningnetwork/lnd/pull/6232) mode to
the HTLC interceptor API. This enables interception applications where every
packet must be intercepted.

## Database

* [Add ForAll implementation for etcd to speed up
Expand Down
68 changes: 60 additions & 8 deletions htlcswitch/interceptable_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type InterceptableSwitch struct {
// client connect and disconnect.
interceptorRegistration chan ForwardInterceptor

// requireInterceptor indicates whether processing should block if no
// interceptor is connected.
requireInterceptor bool

// interceptor is the handler for intercepted packets.
interceptor ForwardInterceptor

Expand All @@ -58,6 +62,7 @@ type InterceptableSwitch struct {
type interceptedPackets struct {
packets []*htlcPacket
linkQuit chan struct{}
isReplay bool
}

// FwdAction defines the various resolution types.
Expand Down Expand Up @@ -101,13 +106,16 @@ type fwdResolution struct {
}

// NewInterceptableSwitch returns an instance of InterceptableSwitch.
func NewInterceptableSwitch(s *Switch) *InterceptableSwitch {
func NewInterceptableSwitch(s *Switch,
requireInterceptor bool) *InterceptableSwitch {

return &InterceptableSwitch{
htlcSwitch: s,
intercepted: make(chan *interceptedPackets),
interceptorRegistration: make(chan ForwardInterceptor),
holdForwards: make(map[channeldb.CircuitKey]InterceptedForward),
resolutionChan: make(chan *fwdResolution),
requireInterceptor: requireInterceptor,

quit: make(chan struct{}),
}
Expand Down Expand Up @@ -155,9 +163,7 @@ func (s *InterceptableSwitch) run() {
case packets := <-s.intercepted:
var notIntercepted []*htlcPacket
for _, p := range packets.packets {
if s.interceptor == nil ||
!s.interceptForward(p) {

if !s.interceptForward(p, packets.isReplay) {
notIntercepted = append(
notIntercepted, p,
)
Expand All @@ -178,7 +184,6 @@ func (s *InterceptableSwitch) run() {
}
}
}

func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
err := s.interceptor(fwd.Packet())
if err != nil {
Expand All @@ -191,12 +196,28 @@ func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) {
s.interceptor = interceptor

// Replay all currently held htlcs. When an interceptor is not required,
// there may be none because they've been cleared after the previous
// disconnect.
if interceptor != nil {
log.Debugf("Interceptor connected")

for _, fwd := range s.holdForwards {
s.sendForward(fwd)
}

return
}

// The interceptor disconnects. If an interceptor is required, keep the
// held htlcs.
if s.requireInterceptor {
log.Infof("Interceptor disconnected, retaining held packets")

return
}

// Interceptor is not required. Release held forwards.
log.Infof("Interceptor disconnected, resolving held packets")

for _, fwd := range s.holdForwards {
Expand Down Expand Up @@ -260,7 +281,7 @@ func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
// interceptor. If the interceptor signals the resume action, the htlcs are
// forwarded to the switch. The link's quit signal should be provided to allow
// cancellation of forwarding during link shutdown.
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bool,
packets ...*htlcPacket) error {

// Synchronize with the main event loop. This should be light in the
Expand All @@ -269,6 +290,7 @@ func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},
case s.intercepted <- &interceptedPackets{
packets: packets,
linkQuit: linkQuit,
isReplay: isReplay,
}:

case <-linkQuit:
Expand All @@ -283,7 +305,15 @@ func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},

// interceptForward forwards the packet to the external interceptor after
// checking the interception criteria.
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket) bool {
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
isReplay bool) bool {

// Process normally if an interceptor is not required and not
// registered.
if !s.requireInterceptor && s.interceptor == nil {
return false
}

switch htlc := packet.htlc.(type) {
case *lnwire.UpdateAddHTLC:
// We are not interested in intercepting initiated payments.
Expand All @@ -307,9 +337,31 @@ func (s *InterceptableSwitch) interceptForward(packet *htlcPacket) bool {
htlcSwitch: s.htlcSwitch,
}

if s.interceptor == nil && !isReplay {
// There is no interceptor registered, we are in
// interceptor-required mode, and this is a new packet
//
// Because the interceptor has never seen this packet
// yet, it is still safe to fail back. This limits the
// backlog of htlcs when the interceptor is down.
err := intercepted.FailWithCode(
lnwire.CodeTemporaryChannelFailure,
)
if err != nil {
log.Errorf("Cannot fail packet: %v", err)
}

return true
}

s.holdForwards[inKey] = intercepted

s.sendForward(intercepted)
// If there is no interceptor registered, we must be in
// interceptor-required mode. The packet is kept in the queue
// until the interceptor registers itself.
if s.interceptor != nil {
s.sendForward(intercepted)
}

return true

Expand Down
18 changes: 11 additions & 7 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type ChannelLinkConfig struct {
// switch. The function returns and error in case it fails to send one or
// more packets. The link's quit signal should be provided to allow
// cancellation of forwarding during link shutdown.
ForwardPackets func(chan struct{}, ...*htlcPacket) error
ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error

// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
// blobs, which are then used to inform how to forward an HTLC.
Expand Down Expand Up @@ -1720,7 +1720,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
l.uncommittedPreimages = append(l.uncommittedPreimages, pre)

// Pipeline this settle, send it to the switch.
go l.forwardBatch(settlePacket)
go l.forwardBatch(false, settlePacket)

case *lnwire.UpdateFailMalformedHTLC:
// Convert the failure type encoded within the HTLC fail
Expand Down Expand Up @@ -2744,7 +2744,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,

// Only spawn the task forward packets we have a non-zero number.
if len(switchPackets) > 0 {
go l.forwardBatch(switchPackets...)
go l.forwardBatch(false, switchPackets...)
}
}

Expand Down Expand Up @@ -3043,14 +3043,17 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
return
}

l.log.Debugf("forwarding %d packets to switch", len(switchPackets))
replay := fwdPkg.State != channeldb.FwdStateLockedIn

l.log.Debugf("forwarding %d packets to switch: replay=%v",
len(switchPackets), replay)

// NOTE: This call is made synchronous so that we ensure all circuits
// are committed in the exact order that they are processed in the link.
// Failing to do this could cause reorderings/gaps in the range of
// opened circuits, which violates assumptions made by the circuit
// trimming.
l.forwardBatch(switchPackets...)
l.forwardBatch(replay, switchPackets...)
}

// processExitHop handles an htlc for which this link is the exit hop. It
Expand Down Expand Up @@ -3184,7 +3187,7 @@ func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
// Don't forward packets for which we already have a response in our
// mailbox. This could happen if a packet fails and is buffered in the
// mailbox, and the incoming link flaps.
Expand All @@ -3197,7 +3200,8 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
filteredPkts = append(filteredPkts, pkt)
}

if err := l.cfg.ForwardPackets(l.quit, filteredPkts...); err != nil {
err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
if err != nil {
log.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err)
}
Expand Down
30 changes: 17 additions & 13 deletions htlcswitch/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,12 +1940,14 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
// the firing via force feeding.
bticker := ticker.NewForce(time.Hour)
aliceCfg := ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
Peer: alicePeer,
Switch: aliceSwitch,
BestHeight: aliceSwitch.BestHeight,
Circuits: aliceSwitch.CircuitModifier(),
ForwardPackets: aliceSwitch.ForwardPackets,
FwrdingPolicy: globalPolicy,
Peer: alicePeer,
Switch: aliceSwitch,
BestHeight: aliceSwitch.BestHeight,
Circuits: aliceSwitch.CircuitModifier(),
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
return aliceSwitch.ForwardPackets(linkQuit, packets...)
},
DecodeHopIterators: decoder.DecodeHopIterators,
ExtractErrorEncrypter: func(*btcec.PublicKey) (
hop.ErrorEncrypter, lnwire.FailCode) {
Expand Down Expand Up @@ -4491,12 +4493,14 @@ func (h *persistentLinkHarness) restartLink(
// the firing via force feeding.
bticker := ticker.NewForce(time.Hour)
aliceCfg := ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
Peer: alicePeer,
Switch: aliceSwitch,
BestHeight: aliceSwitch.BestHeight,
Circuits: aliceSwitch.CircuitModifier(),
ForwardPackets: aliceSwitch.ForwardPackets,
FwrdingPolicy: globalPolicy,
Peer: alicePeer,
Switch: aliceSwitch,
BestHeight: aliceSwitch.BestHeight,
Circuits: aliceSwitch.CircuitModifier(),
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
return aliceSwitch.ForwardPackets(linkQuit, packets...)
},
DecodeHopIterators: decoder.DecodeHopIterators,
ExtractErrorEncrypter: func(*btcec.PublicKey) (
hop.ErrorEncrypter, lnwire.FailCode) {
Expand Down Expand Up @@ -6694,7 +6698,7 @@ func TestPipelineSettle(t *testing.T) {
// erroneously forwarded. If the forwardChan is closed before the last
// step, then the test will fail.
forwardChan := make(chan struct{})
fwdPkts := func(c chan struct{}, hp ...*htlcPacket) error {
fwdPkts := func(c chan struct{}, _ bool, hp ...*htlcPacket) error {
close(forwardChan)
return nil
}
Expand Down
Loading

0 comments on commit ae314ec

Please sign in to comment.