Skip to content

Commit

Permalink
Ziga/og deduplicate events (#1628)
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj authored Nov 2, 2023
1 parent a352bed commit 368f2da
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 5 deletions.
6 changes: 2 additions & 4 deletions integration/obscurogateway/obscurogateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,9 @@ func testMultipleAccountsSubscription(t *testing.T, httpURL, wsURL string, w wal
// user0 should see two lifecycle events (1 for each interaction with setMessage2)
assert.Equal(t, 2, len(user0logs))
// user1 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage
// TODO: 5 events as expected (2*2 lifecycle events + 1 event specific to an address0 - change after deduplication
assert.Equal(t, 5, len(user1logs))
assert.Equal(t, 3, len(user1logs))
// user2 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage
// TODO: 5 events as expected (2*2 lifecycle events + 1 event specific to an address0 - change after deduplication
assert.Equal(t, 5, len(user2logs))
assert.Equal(t, 3, len(user2logs))
}

func testAreTxsMinted(t *testing.T, httpURL, wsURL string, w wallet.Wallet) { //nolint: unused
Expand Down
1 change: 1 addition & 0 deletions tools/walletextension/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
APIVersion1 = "/v1"
MethodEthSubscription = "eth_subscription"
PathVersion = "/version/"
DeduplicationBufferSize = 20
)

var ReaderHeadTimeout = 10 * time.Second
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package subscriptions

import "github.com/ethereum/go-ethereum/common"

// LogKey uniquely represents a log (consists of BlockHash, TxHash, and Index)
type LogKey struct {
BlockHash common.Hash // Not necessary, but can be helpful in edge case of block reorg.
TxHash common.Hash
Index uint
}

// CircularBuffer is a data structure that uses a single, fixed-size buffer as if it was connected end-to-end.
type CircularBuffer struct {
data []LogKey
size int
end int
}

// NewCircularBuffer initializes a new CircularBuffer of the given size.
func NewCircularBuffer(size int) *CircularBuffer {
return &CircularBuffer{
data: make([]LogKey, size),
size: size,
end: 0,
}
}

// Push adds a new LogKey to the end of the buffer. If the buffer is full,
// it overwrites the oldest data with the new LogKey.
func (cb *CircularBuffer) Push(key LogKey) {
cb.data[cb.end] = key
cb.end = (cb.end + 1) % cb.size
}

// Contains checks if the given LogKey exists in the buffer
func (cb *CircularBuffer) Contains(key LogKey) bool {
for _, item := range cb.data {
if item == key {
return true
}
}
return false
}
17 changes: 16 additions & 1 deletion tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req
go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, userSubscriptionID, sm.logger)

// iterate over all clients and subscribe for each of them
// TODO: currently we use only first client (enabling subscriptions for all of them will be part of future PR)
for _, client := range clients {
subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...)
if err != nil {
Expand All @@ -75,13 +74,29 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req
}

func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, userSubscriptionID gethrpc.ID, logger gethlog.Logger) {
buffer := NewCircularBuffer(wecommon.DeduplicationBufferSize)
for data := range channel {
// create unique identifier for current log
uniqueLogKey := LogKey{
BlockHash: data.Log.BlockHash,
TxHash: data.Log.TxHash,
Index: data.Log.Index,
}

// check if the current event is a duplicate (and skip it if it is)
if buffer.Contains(uniqueLogKey) {
continue
}

jsonResponse, err := prepareLogResponse(data, userSubscriptionID)
if err != nil {
logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err)
continue
}

// the current log is unique, and we want to add it to our buffer and proceed with forwarding to the user
buffer.Push(uniqueLogKey)

logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID)
err = userConn.WriteResponse(jsonResponse)
if err != nil {
Expand Down

0 comments on commit 368f2da

Please sign in to comment.