Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create block filters before poll interval #152

Merged
merged 13 commits into from
Sep 6, 2024
Prev Previous commit
Next Next commit
Correct failing tests
Signed-off-by: rodion <rodion.lim@partior.com>
  • Loading branch information
rodion-lim-partior committed Sep 3, 2024
commit 103176282cfb4fdf1772fd35eee059d5e4468b8d
61 changes: 53 additions & 8 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type blockUpdateConsumer struct {
updates chan<- *ffcapi.BlockHashEvent
}

type blockFilterStatus struct {
isEstablished bool
signal chan struct{}
mux sync.Mutex
}

// blockListener has two functions:
// 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are
// 2) To feed new block information to any registered consumers
Expand All @@ -49,7 +55,7 @@ type blockListener struct {
backend rpcbackend.RPC
wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
listenLoopDone chan struct{}
blockFilterEstablished chan struct{}
blockFilterEstablished blockFilterStatus
initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
Expand All @@ -74,7 +80,7 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
ctx: log.WithLogField(ctx, "role", "blocklistener"),
c: c,
backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later
blockFilterEstablished: make(chan struct{}),
blockFilterEstablished: blockFilterStatus{isEstablished: false, signal: make(chan struct{})},
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
Expand All @@ -94,6 +100,34 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
return bl, nil
}

// setting block filter status updates that new block filter has been created
func (bl *blockListener) setBlockFilterStatus() {
bl.blockFilterEstablished.mux.Lock()
defer bl.blockFilterEstablished.mux.Unlock()
if !bl.blockFilterEstablished.isEstablished {
bl.blockFilterEstablished.isEstablished = true
close(bl.blockFilterEstablished.signal)
}
}

// un-setting block filter status updates that no block filter is present on block listener
func (bl *blockListener) unsetBlockFilterStatus() {
bl.blockFilterEstablished.mux.Lock()
defer bl.blockFilterEstablished.mux.Unlock()
if bl.blockFilterEstablished.isEstablished {
bl.blockFilterEstablished.isEstablished = false
bl.blockFilterEstablished.signal = make(chan struct{})
}
}

func (bl *blockListener) blockTillBlockFilterEstablished(ctx context.Context) {
select {
case <-bl.blockFilterEstablished.signal:
case <-bl.ctx.Done():
case <-ctx.Done():
}
}

func (bl *blockListener) newHeadsSubListener() {
for range bl.newHeadsSub.Notifications() {
select {
Expand Down Expand Up @@ -156,6 +190,9 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {

func (bl *blockListener) listenLoop() {
defer close(bl.listenLoopDone)
defer func() {
bl.unsetBlockFilterStatus()
}()

err := bl.establishBlockHeightWithRetry()
close(bl.initialBlockHeightObtained)
Expand Down Expand Up @@ -195,7 +232,7 @@ func (bl *blockListener) listenLoop() {
failCount++
continue
} else {
close(bl.blockFilterEstablished)
bl.setBlockFilterStatus()
}
}

Expand Down Expand Up @@ -481,24 +518,28 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u
}
}

func (bl *blockListener) checkStartedLocked() {
func (bl *blockListener) checkStartedLocked(ctx context.Context) {
if bl.listenLoopDone == nil {
bl.listenLoopDone = make(chan struct{})
go bl.listenLoop()
}
<-bl.blockFilterEstablished

bl.blockTillBlockFilterEstablished(ctx)
}

func (bl *blockListener) addConsumer(c *blockUpdateConsumer) {
bl.mux.Lock()
defer bl.mux.Unlock()
bl.checkStartedLocked()
bl.consumers[*c.id] = c
bl.mux.Unlock()
bl.checkStartedLocked(context.Background())
}

func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
bl.checkStartedLocked(ctx)
if err := ctx.Err(); err != nil {
return -1, false
}
bl.mux.Lock()
bl.checkStartedLocked()
highestBlock := bl.highestBlock
bl.mux.Unlock()
// if not yet initialized, wait to be initialized
Expand All @@ -512,6 +553,10 @@ func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
}
bl.mux.Lock()
highestBlock = bl.highestBlock
if bl.highestBlock == -1 {
// handle edge case when bl.initialBlockHeightObtained channel is closed
return -1, false
}
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock, true
Expand Down
Loading