Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed Mar 14, 2024
1 parent d6a9922 commit a6025a3
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 54 deletions.
6 changes: 6 additions & 0 deletions integration/obscurogateway/tengateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ func testSubscriptionTopics(t *testing.T, httpURL, wsURL string, w wallet.Wallet
contractReceipt, err := integrationCommon.AwaitReceiptEth(context.Background(), user0.HTTPClient, signedTx.Hash(), time.Minute)
require.NoError(t, err)

tx, _, err := user0.HTTPClient.TransactionByHash(context.Background(), signedTx.Hash())
if err != nil {
return
}
require.Equal(t, signedTx.Hash(), tx.Hash())

// user0 subscribes to all events from that smart contract, user1 only an event with a topic of his first account
var user0logs []types.Log
var user1logs []types.Log
Expand Down
1 change: 1 addition & 0 deletions lib/gethfork/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
if err != nil {
if err != io.EOF {
resp := errorMessage(&invalidMessageError{"parse error"})
println(err)
codec.writeJSON(ctx, resp, true)
}
return
Expand Down
3 changes: 1 addition & 2 deletions tools/walletextension/lib/client_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,5 @@ func (o *TGLib) HTTP() string {
}

func (o *TGLib) WS() string {
// todo - add /v1
return fmt.Sprintf("%s/?token=%s", o.wsURL, o.userID)
return fmt.Sprintf("%s/v1/?token=%s", o.wsURL, o.userID)
}
160 changes: 108 additions & 52 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sync/atomic"
"time"

gethcommon "github.com/ethereum/go-ethereum/common"
Expand All @@ -28,68 +29,103 @@ func (api *FilterAPI) NewPendingTransactionFilter(_ *bool) rpc.ID {
return "not supported"
}

/*
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
// not supported
return nil, nil
}
/*func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
// not supported
return nil, nil
}
*/

func (api *FilterAPI) NewBlockFilter() rpc.ID {
// not implemented
return ""
}

/*func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
// not implemented
return nil, nil
}
/*
func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
// not implemented
return nil, nil
}
*/
// todo - unsubscribe
func (api *FilterAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
not, ok := rpc.NotifierFromContext(ctx)
if !ok {
return nil, fmt.Errorf("invalid subscription")
}

uid, err := wecommon.GetUserIDbyte(not.UserID)
subNotifier, user, err := getUserAndNotifier(ctx, api)
if err != nil {
return nil, fmt.Errorf("invald token: %s, %w", not.UserID, err)
}

user, err := getUser(uid, api.we.Storage)

notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, fmt.Errorf("creation of subscriptions is not supported")
return nil, err
}
subscription := notifier.CreateSubscription()

// determine the accounts to use for the backend subscriptions
candidateAddresses := user.GetAllAddresses()
if len(candidateAddresses) > 1 {
candidateAddresses = searchForAddressInFilterCriteria(crit, user.GetAllAddresses())
// when we can't determine which addresses to use based on the criteria, use all of them
if len(candidateAddresses) == 0 {
candidateAddresses = user.GetAllAddresses()
}
}

inputChannels := make([]chan common.IDAndLog, 0)
clientSubscriptions := make([]*rpc.ClientSubscription, 0)
backendSubscriptions := make([]*rpc.ClientSubscription, 0)
for _, address := range candidateAddresses {
rpcWSClient, err := user.accounts[*address].connect(api.we.HostAddrWS, api.we.Logger())
if err != nil {
return nil, err
}

inCh := make(chan common.IDAndLog)
inputChannels = append(inputChannels, inCh)
clientSubscription, err := rpcWSClient.Subscribe(ctx, nil, "eth", inCh, "logs", crit)
backendSubscription, err := rpcWSClient.Subscribe(ctx, nil, "eth", inCh, "logs", crit)
if err != nil {
return nil, err
}
clientSubscriptions = append(clientSubscriptions, clientSubscription)

inputChannels = append(inputChannels, inCh)
backendSubscriptions = append(backendSubscriptions, backendSubscription)
}
go forwardMsgs(inputChannels, clientSubscriptions, subscription, notifier)

dedupeBuffer := NewCircularBuffer(wecommon.DeduplicationBufferSize)
subscription := subNotifier.CreateSubscription()

unsubscribed := atomic.Bool{}
go forwardAndDedupe(inputChannels, backendSubscriptions, subscription, subNotifier, &unsubscribed, func(data common.IDAndLog) *types.Log {
uniqueLogKey := LogKey{
BlockHash: data.Log.BlockHash,
TxHash: data.Log.TxHash,
Index: data.Log.Index,
}

if !dedupeBuffer.Contains(uniqueLogKey) {
dedupeBuffer.Push(uniqueLogKey)
return data.Log
}
return nil
})

go handleUnsubscribe(subscription, backendSubscriptions, &unsubscribed)

return subscription, err
}

func getUserAndNotifier(ctx context.Context, api *FilterAPI) (*rpc.Notifier, *GWUser, error) {
subNotifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, nil, fmt.Errorf("creation of subscriptions is not supported")
}

// todo - we might want to allow access to public logs
if len(subNotifier.UserID) == 0 {
return nil, nil, fmt.Errorf("illegal access")
}

uid, err := wecommon.GetUserIDbyte(subNotifier.UserID)
if err != nil {
return nil, nil, fmt.Errorf("invald token: %s, %w", subNotifier.UserID, err)
}

user, err := getUser(uid, api.we.Storage)
if err != nil {
return nil, nil, fmt.Errorf("illegal access: %s, %w", subNotifier.UserID, err)
}
return subNotifier, user, nil
}

func searchForAddressInFilterCriteria(filterCriteria filters.FilterCriteria, possibleAddresses []*gethcommon.Address) []*gethcommon.Address {
result := make([]*gethcommon.Address, 0)
addrMap := toMap(possibleAddresses)
Expand All @@ -104,41 +140,60 @@ func searchForAddressInFilterCriteria(filterCriteria filters.FilterCriteria, pos
return result
}

// todo - comment
func forwardMsgs(inputChannels []chan common.IDAndLog, _ []*rpc.ClientSubscription, outSub *rpc.Subscription, notifier *rpc.Notifier) {
buffer := NewCircularBuffer(wecommon.DeduplicationBufferSize)
cases := make([]reflect.SelectCase, len(inputChannels))
// forwardAndDedupe - reads messages from the input channel, and forwards them to the notifier only if they are new
func forwardAndDedupe[R any, T any](inputChannels []chan R, _ []*rpc.ClientSubscription, outSub *rpc.Subscription, notifier *rpc.Notifier, unsubscribed *atomic.Bool, toForward func(elem R) *T) {
inputCases := make([]reflect.SelectCase, len(inputChannels)+1)

// create a ticker to handle cleanup
inputCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(time.NewTicker(10 * time.Second).C),
}

// create a select "case" for each input channel
for i, ch := range inputChannels {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
inputCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
remaining := len(cases)
for remaining > 0 {
chosen, value, ok := reflect.Select(cases)

unclosedInputChannels := len(inputCases)
for unclosedInputChannels > 0 {
chosen, value, ok := reflect.Select(inputCases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
cases[chosen].Chan = reflect.ValueOf(nil)
remaining--
inputCases[chosen].Chan = reflect.ValueOf(nil)
unclosedInputChannels--
continue
}

data := value.Interface().(common.IDAndLog)
uniqueLogKey := LogKey{
BlockHash: data.Log.BlockHash,
TxHash: data.Log.TxHash,
Index: data.Log.Index,
}

// check if the current event is a duplicate (and skip it if it is)
if !buffer.Contains(uniqueLogKey) {
buffer.Push(uniqueLogKey)
err := notifier.Notify(outSub.ID, data.Log)
if err != nil {
switch v := value.Interface().(type) {
case time.Time:
// exit the loop
if unsubscribed.Load() {
return
}
case R:
valueToSubmit := toForward(v)
if valueToSubmit != nil {
err := notifier.Notify(outSub.ID, *valueToSubmit)
if err != nil {
return
}
}
default:
// unexpected element received
continue
}
}
}

func handleUnsubscribe(connectionSub *rpc.Subscription, backendSubscriptions []*rpc.ClientSubscription, unsubscribed *atomic.Bool) {
<-connectionSub.Err()
for _, backendSub := range backendSubscriptions {
backendSub.Unsubscribe()
unsubscribed.Store(true)
}
}

/*
func (api *FilterAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) {
// not implemented
Expand All @@ -153,6 +208,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria)
&ExecCfg{
cacheCfg: &CacheCfg{
TTLCallback: func() time.Duration {
// when the toBlock is not specified, the request is open-ended
if crit.ToBlock != nil {
return longCacheTTL
}
Expand Down

0 comments on commit a6025a3

Please sign in to comment.