diff --git a/.github/workflows/manual-deploy-testnet-validator.yml b/.github/workflows/manual-deploy-testnet-validator.yml index cc2222787a..6f2fbb6520 100644 --- a/.github/workflows/manual-deploy-testnet-validator.yml +++ b/.github/workflows/manual-deploy-testnet-validator.yml @@ -1,4 +1,4 @@ -# Deploys an Ten validator on Azure for Testnet +# Deploys a Ten validator on Azure for Testnet # # This script uses GitHub Environments for variables (vars) and secrets - these are configured on GitHub and # the environments match the input.testnet_type options @@ -186,6 +186,7 @@ jobs: -max_batch_interval=${{ vars.L2_MAX_BATCH_INTERVAL }} \ -rollup_interval=${{ vars.L2_ROLLUP_INTERVAL }} \ -l1_chain_id=${{ vars.L1_CHAIN_ID }} \ + -postgres_db_host=postgres://tenuser:${{ secrets.TEN_POSTGRES_USER_PWD }}@postgres-ten-${{ github.event.inputs.testnet_type }}.postgres.database.azure.com:5432/ \ start' diff --git a/.github/workflows/manual-upgrade-testnet-l2.yml b/.github/workflows/manual-upgrade-testnet-l2.yml index c63a697d24..ecc8e05254 100644 --- a/.github/workflows/manual-upgrade-testnet-l2.yml +++ b/.github/workflows/manual-upgrade-testnet-l2.yml @@ -177,6 +177,7 @@ jobs: -max_batch_interval=${{ vars.L2_MAX_BATCH_INTERVAL }} \ -rollup_interval=${{ vars.L2_ROLLUP_INTERVAL }} \ -l1_chain_id=${{ vars.L1_CHAIN_ID }} \ + -postgres_db_host=postgres://tenuser:${{ secrets.TEN_POSTGRES_USER_PWD }}@postgres-ten-${{ github.event.inputs.testnet_type }}.postgres.database.azure.com:5432/ \ upgrade' check-obscuro-is-healthy: diff --git a/go/common/subscription/new_heads_manager.go b/go/common/subscription/new_heads_manager.go index 13e6b6671a..b39ebc21ef 100644 --- a/go/common/subscription/new_heads_manager.go +++ b/go/common/subscription/new_heads_manager.go @@ -5,6 +5,7 @@ import ( "math/big" "sync" "sync/atomic" + "time" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -20,7 +21,7 @@ import ( // also handles unsubscribe // Note: this is a service which must be Started and Stopped type NewHeadsService struct { - inputCh chan *common.BatchHeader + connectFunc func() (chan *common.BatchHeader, <-chan error, error) convertToEthHeader bool notifiersMutex *sync.RWMutex newHeadNotifiers map[rpc.ID]*rpc.Notifier @@ -29,9 +30,10 @@ type NewHeadsService struct { logger gethlog.Logger } -func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService { +// connect - function that returns the input channel +func NewNewHeadsService(connect func() (chan *common.BatchHeader, <-chan error, error), convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService { return &NewHeadsService{ - inputCh: inputCh, + connectFunc: connect, convertToEthHeader: convertToEthHeader, onMessage: onMessage, logger: logger, @@ -42,38 +44,68 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo } func (nhs *NewHeadsService) Start() error { - go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error { - if nhs.onMessage != nil { - err := nhs.onMessage(head) - if err != nil { - nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err) - } - } + nhs.reconnect() + return nil +} + +func (nhs *NewHeadsService) reconnect() { + // reconnect to the backend and restart the listening + newCh, errCh, err := nhs.connectFunc() + if err != nil { + nhs.logger.Crit("could not connect to new heads: ", log.ErrKey, err) + } + nhs._subscribe(newCh, errCh) +} + +func (nhs *NewHeadsService) _subscribe(inputCh chan *common.BatchHeader, errChan <-chan error) { + backedUnsub := &atomic.Bool{} + go HandleUnsubscribeErrChan([]<-chan error{errChan}, func() { + backedUnsub.Store(true) + }) + go ForwardFromChannels( + []chan *common.BatchHeader{inputCh}, + func(head *common.BatchHeader) error { + return nhs.onNewBatch(head) + }, + func() { + nhs.logger.Info("Disconnected from new head subscription. Reconnecting...") + nhs.reconnect() + }, + backedUnsub, + nhs.stopped, + 2*time.Minute, // todo - create constant + nhs.logger, + ) +} - var msg any = head - if nhs.convertToEthHeader { - msg = convertBatchHeader(head) +func (nhs *NewHeadsService) onNewBatch(head *common.BatchHeader) error { + if nhs.onMessage != nil { + err := nhs.onMessage(head) + if err != nil { + nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err) } + } + + var msg any = head + if nhs.convertToEthHeader { + msg = convertBatchHeader(head) + } - nhs.notifiersMutex.RLock() - defer nhs.notifiersMutex.RUnlock() - - // for each new head, notify all registered subscriptions - for id, notifier := range nhs.newHeadNotifiers { - if nhs.stopped.Load() { - return nil - } - err := notifier.Notify(id, msg) - if err != nil { - // on error, remove the notification - nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id) - nhs.notifiersMutex.Lock() - delete(nhs.newHeadNotifiers, id) - nhs.notifiersMutex.Unlock() - } + nhs.notifiersMutex.Lock() + defer nhs.notifiersMutex.Unlock() + + // for each new head, notify all registered subscriptions + for id, notifier := range nhs.newHeadNotifiers { + if nhs.stopped.Load() { + return nil } - return nil - }) + err := notifier.Notify(id, msg) + if err != nil { + // on error, remove the notification + nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id) + delete(nhs.newHeadNotifiers, id) + } + } return nil } @@ -82,7 +114,7 @@ func (nhs *NewHeadsService) RegisterNotifier(notifier *rpc.Notifier, subscriptio defer nhs.notifiersMutex.Unlock() nhs.newHeadNotifiers[subscription.ID] = notifier - go HandleUnsubscribe(subscription, nil, func() { + go HandleUnsubscribe(subscription, func() { nhs.notifiersMutex.Lock() defer nhs.notifiersMutex.Unlock() delete(nhs.newHeadNotifiers, subscription.ID) diff --git a/go/common/subscription/utils.go b/go/common/subscription/utils.go index 81b02aaf20..cbd055f580 100644 --- a/go/common/subscription/utils.go +++ b/go/common/subscription/utils.go @@ -5,19 +5,24 @@ import ( "sync/atomic" "time" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ten-protocol/go-ten/go/common/log" + "github.com/ten-protocol/go-ten/lib/gethfork/rpc" ) -// ForwardFromChannels - reads messages from the input channels, and calls the `onMessage` callback. -// Exits when the unsubscribed flag is true. +// ForwardFromChannels - reads messages from all input channels, and calls the `onMessage` callback. +// Exits when the "stopped" flag is true or when the connection times out. // Must be called as a go routine! -func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error) { +func ForwardFromChannels[R any](inputChannels []chan R, onMessage func(R) error, onBackendDisconnect func(), backendDisconnected *atomic.Bool, stopped *atomic.Bool, timeoutInterval time.Duration, logger gethlog.Logger) { inputCases := make([]reflect.SelectCase, len(inputChannels)+1) - // create a ticker to handle cleanup, check the "unsubscribed" flag and exit the goroutine + cleanupTicker := time.NewTicker(1 * time.Second) + defer cleanupTicker.Stop() + // create a ticker to handle cleanup, check the "stopped" flag and exit the goroutine inputCases[0] = reflect.SelectCase{ Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(time.NewTicker(2 * time.Second).C), + Chan: reflect.ValueOf(cleanupTicker.C), } // create a select "case" for each input channel @@ -25,42 +30,65 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo inputCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} } - unclosedInputChannels := len(inputCases) - for unclosedInputChannels > 0 { - chosen, value, ok := reflect.Select(inputCases) + lastMessageTime := time.Now() +loop: + for { + // this mechanism removes closed input channels. When there is none left, the subscription is considered "disconnected". + _, value, ok := reflect.Select(inputCases) if !ok { - // The chosen channel has been closed, so zero out the channel to disable the case - inputCases[chosen].Chan = reflect.ValueOf(nil) - unclosedInputChannels-- - continue + logger.Debug("Failed to read from the channel") + break loop } - if unsubscribed != nil && unsubscribed.Load() { + // flag that the service needs to stop + if stopped != nil && stopped.Load() { return } + // flag that the backend channels were disconnected + if backendDisconnected != nil && backendDisconnected.Load() { + break loop + } + switch v := value.Interface().(type) { case time.Time: - // exit the loop to avoid a goroutine leak - if unsubscribed != nil && unsubscribed.Load() { - return + // no message was received longer than the timeout. Exiting. + if time.Since(lastMessageTime) > timeoutInterval { + break loop } case R: + lastMessageTime = time.Now() err := onMessage(v) if err != nil { - // todo - log - return + logger.Error("Failed to process message", log.ErrKey, err) + break loop } default: // ignore unexpected element - continue + logger.Warn("Received unexpected message type.", "type", reflect.TypeOf(v), "value", value) + break loop } } + + if onBackendDisconnect != nil { + onBackendDisconnect() + } } // HandleUnsubscribe - when the client calls "unsubscribe" or the subscription times out, it calls `onSub` // Must be called as a go routine! -func HandleUnsubscribe(connectionSub *rpc.Subscription, unsubscribed *atomic.Bool, onUnsub func()) { +func HandleUnsubscribe(connectionSub *rpc.Subscription, onUnsub func()) { <-connectionSub.Err() onUnsub() } + +// HandleUnsubscribeErrChan - when the client calls "unsubscribe" or the subscription times out, it calls `onSub` +// Must be called as a go routine! +func HandleUnsubscribeErrChan(errChan []<-chan error, onUnsub func()) { + inputCases := make([]reflect.SelectCase, len(errChan)) + for i, ch := range errChan { + inputCases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } + reflect.Select(inputCases) + onUnsub() +} diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index 1114790aad..af4788569f 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -352,7 +352,11 @@ func (e *enclaveImpl) StopClient() common.SystemError { } func (e *enclaveImpl) sendBatch(batch *core.Batch, outChannel chan common.StreamL2UpdatesResponse) { - e.logger.Info("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo()) + if batch.SeqNo().Uint64()%10 == 0 { + e.logger.Info("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo()) + } else { + e.logger.Debug("Streaming batch to host", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SeqNo()) + } extBatch, err := batch.ToExtBatch(e.dataEncryptionService, e.dataCompressionService) if err != nil { // this error is unrecoverable diff --git a/go/enclave/l2chain/l2_chain.go b/go/enclave/l2chain/l2_chain.go index f01d825a9b..b0c34c4c32 100644 --- a/go/enclave/l2chain/l2_chain.go +++ b/go/enclave/l2chain/l2_chain.go @@ -104,7 +104,7 @@ func (oc *obscuroChain) GetBalanceAtBlock(ctx context.Context, accountAddr gethc func (oc *obscuroChain) ObsCall(ctx context.Context, apiArgs *gethapi.TransactionArgs, blockNumber *gethrpc.BlockNumber) (*gethcore.ExecutionResult, error) { result, err := oc.ObsCallAtBlock(ctx, apiArgs, blockNumber) if err != nil { - oc.logger.Info(fmt.Sprintf("Obs_Call: failed to execute contract %s.", apiArgs.To), log.CtrErrKey, err.Error()) + oc.logger.Debug(fmt.Sprintf("Obs_Call: failed to execute contract %s.", apiArgs.To), log.CtrErrKey, err.Error()) return nil, err } diff --git a/go/enclave/rpc/GetCustomQuery.go b/go/enclave/rpc/GetCustomQuery.go index a280b4c33d..148a50526e 100644 --- a/go/enclave/rpc/GetCustomQuery.go +++ b/go/enclave/rpc/GetCustomQuery.go @@ -31,14 +31,14 @@ func GetCustomQueryExecute(builder *CallBuilder[common.PrivateCustomQueryListTra return nil //nolint:nilerr } - encryptReceipts, err := rpc.storage.GetReceiptsPerAddress(builder.ctx, &builder.Param.Address, &builder.Param.Pagination) + encryptReceipts, err := rpc.storage.GetTransactionsPerAddress(builder.ctx, &builder.Param.Address, &builder.Param.Pagination) if err != nil { - return fmt.Errorf("GetReceiptsPerAddress - %w", err) + return fmt.Errorf("GetTransactionsPerAddress - %w", err) } - receiptsCount, err := rpc.storage.GetReceiptsPerAddressCount(builder.ctx, &builder.Param.Address) + receiptsCount, err := rpc.storage.CountTransactionsPerAddress(builder.ctx, &builder.Param.Address) if err != nil { - return fmt.Errorf("GetReceiptsPerAddressCount - %w", err) + return fmt.Errorf("CountTransactionsPerAddress - %w", err) } builder.ReturnValue = &common.PrivateQueryResponse{ diff --git a/go/enclave/storage/enclavedb/batch.go b/go/enclave/storage/enclavedb/batch.go index 423a35d3a8..68b783634e 100644 --- a/go/enclave/storage/enclavedb/batch.go +++ b/go/enclave/storage/enclavedb/batch.go @@ -443,13 +443,11 @@ func BatchWasExecuted(ctx context.Context, db *sql.DB, hash common.L2BatchHash) return result, nil } -func GetReceiptsPerAddress(ctx context.Context, db *sql.DB, config *params.ChainConfig, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) { - // todo - not indexed +func GetTransactionsPerAddress(ctx context.Context, db *sql.DB, config *params.ChainConfig, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) { return selectReceipts(ctx, db, config, "where tx.sender_address = ? ORDER BY height DESC LIMIT ? OFFSET ? ", address.Bytes(), pagination.Size, pagination.Offset) } -func GetReceiptsPerAddressCount(ctx context.Context, db *sql.DB, address *gethcommon.Address) (uint64, error) { - // todo - this is not indexed and will do a full table scan! +func CountTransactionsPerAddress(ctx context.Context, db *sql.DB, address *gethcommon.Address) (uint64, error) { row := db.QueryRowContext(ctx, "select count(1) from exec_tx join tx on tx.id=exec_tx.tx join batch on batch.sequence=exec_tx.batch "+" where tx.sender_address = ?", address.Bytes()) var count uint64 @@ -461,63 +459,6 @@ func GetReceiptsPerAddressCount(ctx context.Context, db *sql.DB, address *gethco return count, nil } -func GetPublicTransactionData(ctx context.Context, db *sql.DB, pagination *common.QueryPagination) ([]common.PublicTransaction, error) { - return selectPublicTxsBySender(ctx, db, " ORDER BY height DESC LIMIT ? OFFSET ? ", pagination.Size, pagination.Offset) -} - -func selectPublicTxsBySender(ctx context.Context, db *sql.DB, query string, args ...any) ([]common.PublicTransaction, error) { - var publicTxs []common.PublicTransaction - - q := "select tx.hash, batch.height, batch.header from exec_tx join batch on batch.sequence=exec_tx.batch join tx on tx.id=exec_tx.tx where batch.is_canonical=true " + query - rows, err := db.QueryContext(ctx, q, args...) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - // make sure the error is converted to obscuro-wide not found error - return nil, errutil.ErrNotFound - } - return nil, err - } - defer rows.Close() - for rows.Next() { - var txHash []byte - var batchHeight uint64 - var batchHeader string - err := rows.Scan(&txHash, &batchHeight, &batchHeader) - if err != nil { - return nil, err - } - - h := new(common.BatchHeader) - if err := rlp.DecodeBytes([]byte(batchHeader), h); err != nil { - return nil, fmt.Errorf("could not decode batch header. Cause: %w", err) - } - - publicTxs = append(publicTxs, common.PublicTransaction{ - TransactionHash: gethcommon.BytesToHash(txHash), - BatchHeight: big.NewInt(0).SetUint64(batchHeight), - BatchTimestamp: h.Time, - Finality: common.BatchFinal, - }) - } - if rows.Err() != nil { - return nil, rows.Err() - } - - return publicTxs, nil -} - -func GetPublicTransactionCount(ctx context.Context, db *sql.DB) (uint64, error) { - row := db.QueryRowContext(ctx, "select count(1) from exec_tx join batch on batch.sequence=exec_tx.batch where batch.is_canonical=true") - - var count uint64 - err := row.Scan(&count) - if err != nil { - return 0, err - } - - return count, nil -} - func FetchConvertedBatchHash(ctx context.Context, db *sql.DB, seqNo uint64) (gethcommon.Hash, error) { var hash []byte diff --git a/go/enclave/storage/enclavedb/block.go b/go/enclave/storage/enclavedb/block.go index 85a9198056..a08113e409 100644 --- a/go/enclave/storage/enclavedb/block.go +++ b/go/enclave/storage/enclavedb/block.go @@ -71,9 +71,9 @@ func updateCanonicalValue(ctx context.Context, dbtx *sql.Tx, isCanonical bool, b return nil } -func SetMissingBlockId(ctx context.Context, dbtx *sql.Tx, blockId int64, blockHash common.L1BlockHash) error { - // handle the corner case where the block wasn't available - _, err := dbtx.ExecContext(ctx, "update batch set l1_proof=? where (l1_proof is null) and l1_proof_hash=?", blockId, blockHash.Bytes()) +// HandleBlockArrivedAfterBatches- handle the corner case where the block wasn't available when the batch was received +func HandleBlockArrivedAfterBatches(ctx context.Context, dbtx *sql.Tx, blockId int64, blockHash common.L1BlockHash) error { + _, err := dbtx.ExecContext(ctx, "update batch set l1_proof=?, is_canonical=true where l1_proof_hash=?", blockId, blockHash.Bytes()) return err } @@ -96,7 +96,7 @@ func FetchBatchHeadersBetween(ctx context.Context, db *sql.DB, start *big.Int, e func GetBlockId(ctx context.Context, db *sql.Tx, hash common.L1BlockHash) (int64, error) { var id int64 - err := db.QueryRowContext(ctx, "select id from block where hash=? ", hash).Scan(&id) + err := db.QueryRowContext(ctx, "select id from block where hash=? ", hash.Bytes()).Scan(&id) if err != nil { return 0, err } diff --git a/go/enclave/storage/init/edgelessdb/001_init.sql b/go/enclave/storage/init/edgelessdb/001_init.sql index 8dc56204a6..878b2452c2 100644 --- a/go/enclave/storage/init/edgelessdb/001_init.sql +++ b/go/enclave/storage/init/edgelessdb/001_init.sql @@ -92,7 +92,8 @@ create table if not exists obsdb.batch primary key (sequence), INDEX USING HASH (hash(8)), INDEX USING HASH (l1_proof_hash(8)), - INDEX (body, l1_proof), + INDEX (body), + INDEX (l1_proof), INDEX (height) ); GRANT ALL ON obsdb.batch TO obscuro; @@ -107,6 +108,7 @@ create table if not exists obsdb.tx idx int NOT NULL, body int NOT NULL, INDEX USING HASH (hash(8)), + INDEX USING HASH (sender_address), primary key (id) ); GRANT ALL ON obsdb.tx TO obscuro; diff --git a/go/enclave/storage/init/sqlite/001_init.sql b/go/enclave/storage/init/sqlite/001_init.sql index 6f07aaa91a..9a0ea2e06b 100644 --- a/go/enclave/storage/init/sqlite/001_init.sql +++ b/go/enclave/storage/init/sqlite/001_init.sql @@ -80,7 +80,8 @@ create table if not exists batch ); create index IDX_BATCH_HASH on batch (hash); create index IDX_BATCH_BLOCK on batch (l1_proof_hash); -create index IDX_BATCH_BODY on batch (body, l1_proof); +create index IDX_BATCH_BODY on batch (body); +create index IDX_BATCH_L1 on batch (l1_proof); create index IDX_BATCH_HEIGHT on batch (height); create table if not exists tx @@ -94,6 +95,7 @@ create table if not exists tx body int NOT NULL REFERENCES batch_body ); create index IDX_TX_HASH on tx (hash); +create index IDX_TX_SENDER_ADDRESS on tx (sender_address); create table if not exists exec_tx ( diff --git a/go/enclave/storage/interfaces.go b/go/enclave/storage/interfaces.go index c182635274..4b68859a10 100644 --- a/go/enclave/storage/interfaces.go +++ b/go/enclave/storage/interfaces.go @@ -152,9 +152,7 @@ type Storage interface { type ScanStorage interface { GetContractCount(ctx context.Context) (*big.Int, error) - GetReceiptsPerAddress(ctx context.Context, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) - GetPublicTransactionData(ctx context.Context, pagination *common.QueryPagination) ([]common.PublicTransaction, error) - GetPublicTransactionCount(ctx context.Context) (uint64, error) + GetTransactionsPerAddress(ctx context.Context, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) - GetReceiptsPerAddressCount(ctx context.Context, addr *gethcommon.Address) (uint64, error) + CountTransactionsPerAddress(ctx context.Context, addr *gethcommon.Address) (uint64, error) } diff --git a/go/enclave/storage/storage.go b/go/enclave/storage/storage.go index 8a35a348ab..d5f1cc2f42 100644 --- a/go/enclave/storage/storage.go +++ b/go/enclave/storage/storage.go @@ -205,7 +205,7 @@ func (s *storageImpl) FetchCanonicalBatchesBetween(ctx context.Context, startSeq return enclavedb.ReadCanonicalBatches(ctx, s.db.GetSQLDB(), startSeq, endSeq) } -func (s *storageImpl) StoreBlock(ctx context.Context, b *types.Block, chainFork *common.ChainFork) error { +func (s *storageImpl) StoreBlock(ctx context.Context, block *types.Block, chainFork *common.ChainFork) error { defer s.logDuration("StoreBlock", measure.NewStopwatch()) dbTx, err := s.db.NewDBTransaction(ctx) if err != nil { @@ -213,17 +213,17 @@ func (s *storageImpl) StoreBlock(ctx context.Context, b *types.Block, chainFork } defer dbTx.Rollback() - if err := enclavedb.WriteBlock(ctx, dbTx, b.Header()); err != nil { - return fmt.Errorf("2. could not store block %s. Cause: %w", b.Hash(), err) + if err := enclavedb.WriteBlock(ctx, dbTx, block.Header()); err != nil { + return fmt.Errorf("2. could not store block %s. Cause: %w", block.Hash(), err) } - blockId, err := enclavedb.GetBlockId(ctx, dbTx, b.Hash()) + blockId, err := enclavedb.GetBlockId(ctx, dbTx, block.Hash()) if err != nil { - return fmt.Errorf("could not get block id - %w", err) + return fmt.Errorf("3. could not get block id - %w", err) } // In case there were any batches inserted before this block was received - err = enclavedb.SetMissingBlockId(ctx, dbTx, blockId, b.Hash()) + err = enclavedb.HandleBlockArrivedAfterBatches(ctx, dbTx, blockId, block.Hash()) if err != nil { return err } @@ -236,16 +236,11 @@ func (s *storageImpl) StoreBlock(ctx context.Context, b *types.Block, chainFork } } - err = enclavedb.UpdateCanonicalBlocks(ctx, dbTx, []common.L1BlockHash{b.Hash()}, nil, s.logger) - if err != nil { - return err - } - if err := dbTx.Commit(); err != nil { - return fmt.Errorf("3. could not store block %s. Cause: %w", b.Hash(), err) + return fmt.Errorf("4. could not store block %s. Cause: %w", block.Hash(), err) } - common.CacheValue(ctx, s.blockCache, s.logger, b.Hash(), b) + common.CacheValue(ctx, s.blockCache, s.logger, block.Hash(), block) return nil } @@ -683,24 +678,14 @@ func (s *storageImpl) BatchWasExecuted(ctx context.Context, hash common.L2BatchH return enclavedb.BatchWasExecuted(ctx, s.db.GetSQLDB(), hash) } -func (s *storageImpl) GetReceiptsPerAddress(ctx context.Context, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) { - defer s.logDuration("GetReceiptsPerAddress", measure.NewStopwatch()) - return enclavedb.GetReceiptsPerAddress(ctx, s.db.GetSQLDB(), s.chainConfig, address, pagination) -} - -func (s *storageImpl) GetReceiptsPerAddressCount(ctx context.Context, address *gethcommon.Address) (uint64, error) { - defer s.logDuration("GetReceiptsPerAddressCount", measure.NewStopwatch()) - return enclavedb.GetReceiptsPerAddressCount(ctx, s.db.GetSQLDB(), address) -} - -func (s *storageImpl) GetPublicTransactionData(ctx context.Context, pagination *common.QueryPagination) ([]common.PublicTransaction, error) { - defer s.logDuration("GetPublicTransactionData", measure.NewStopwatch()) - return enclavedb.GetPublicTransactionData(ctx, s.db.GetSQLDB(), pagination) +func (s *storageImpl) GetTransactionsPerAddress(ctx context.Context, address *gethcommon.Address, pagination *common.QueryPagination) (types.Receipts, error) { + defer s.logDuration("GetTransactionsPerAddress", measure.NewStopwatch()) + return enclavedb.GetTransactionsPerAddress(ctx, s.db.GetSQLDB(), s.chainConfig, address, pagination) } -func (s *storageImpl) GetPublicTransactionCount(ctx context.Context) (uint64, error) { - defer s.logDuration("GetPublicTransactionCount", measure.NewStopwatch()) - return enclavedb.GetPublicTransactionCount(ctx, s.db.GetSQLDB()) +func (s *storageImpl) CountTransactionsPerAddress(ctx context.Context, address *gethcommon.Address) (uint64, error) { + defer s.logDuration("CountTransactionsPerAddress", measure.NewStopwatch()) + return enclavedb.CountTransactionsPerAddress(ctx, s.db.GetSQLDB(), address) } func (s *storageImpl) logDuration(method string, stopWatch *measure.Stopwatch) { diff --git a/go/host/enclave/service.go b/go/host/enclave/service.go index cc4edc82f4..c7637c39c0 100644 --- a/go/host/enclave/service.go +++ b/go/host/enclave/service.go @@ -91,19 +91,11 @@ func (e *Service) HealthStatus(ctx context.Context) host.HealthStatus { return &host.GroupErrsHealthStatus{Errors: errors} } -func (e *Service) HealthyGuardian(ctx context.Context) *Guardian { - for _, guardian := range e.enclaveGuardians { - if guardian.HealthStatus(ctx).OK() { - return guardian - } - } - return nil -} - // LookupBatchBySeqNo is used to fetch batch data from the enclave - it is only used as a fallback for the sequencer // host if it's missing a batch (other host services should use L2Repo to fetch batch data) func (e *Service) LookupBatchBySeqNo(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) { - hg := e.HealthyGuardian(ctx) + // todo (@matt) revisit this flow to make sure it handles HA scenarios properly + hg := e.enclaveGuardians[0] state := hg.GetEnclaveState() if state.GetEnclaveL2Head().Cmp(seqNo) < 0 { return nil, errutil.ErrNotFound diff --git a/go/host/events/logs.go b/go/host/events/logs.go index b0f25a246d..97a2914be3 100644 --- a/go/host/events/logs.go +++ b/go/host/events/logs.go @@ -63,19 +63,22 @@ func (l *LogEventManager) Subscribe(id rpc.ID, encryptedLogSubscription common.E func (l *LogEventManager) Unsubscribe(id rpc.ID) { enclaveUnsubErr := l.sl.Enclaves().Unsubscribe(id) if enclaveUnsubErr != nil { - // this can happen when the client passes a invalid subscription id + // this can happen when the client passes an invalid subscription id l.logger.Debug("Could not terminate enclave subscription", log.SubIDKey, id, log.ErrKey, enclaveUnsubErr) } - l.subscriptionMutex.Lock() - defer l.subscriptionMutex.Unlock() - + l.subscriptionMutex.RLock() logSubscription, found := l.subscriptions[id] + ch := logSubscription.ch + l.subscriptionMutex.RUnlock() + if found { - close(logSubscription.ch) + l.subscriptionMutex.Lock() delete(l.subscriptions, id) + l.subscriptionMutex.Unlock() if enclaveUnsubErr != nil { l.logger.Error("The subscription management between the host and the enclave is out of sync", log.SubIDKey, id, log.ErrKey, enclaveUnsubErr) } + close(ch) } } diff --git a/go/host/rpc/clientapi/client_api_filter.go b/go/host/rpc/clientapi/client_api_filter.go index 238f5d5abf..64746208af 100644 --- a/go/host/rpc/clientapi/client_api_filter.go +++ b/go/host/rpc/clientapi/client_api_filter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync/atomic" + "time" "github.com/ten-protocol/go-ten/go/common/host" subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription" @@ -26,9 +27,16 @@ type FilterAPI struct { func NewFilterAPI(host host.Host, logger gethlog.Logger) *FilterAPI { return &FilterAPI{ - host: host, - logger: logger, - NewHeadsService: subscriptioncommon.NewNewHeadsService(host.NewHeadsChan(), false, logger, nil), + host: host, + logger: logger, + NewHeadsService: subscriptioncommon.NewNewHeadsService( + func() (chan *common.BatchHeader, <-chan error, error) { + return host.NewHeadsChan(), nil, nil + }, + false, + logger, + nil, + ), } } @@ -57,12 +65,23 @@ func (api *FilterAPI) Logs(ctx context.Context, encryptedParams common.Encrypted } var unsubscribed atomic.Bool - go subscriptioncommon.ForwardFromChannels([]chan []byte{logsFromSubscription}, &unsubscribed, func(elem []byte) error { - return notifier.Notify(subscription.ID, elem) - }) - go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() { - api.host.UnsubscribeLogs(subscription.ID) + go subscriptioncommon.ForwardFromChannels( + []chan []byte{logsFromSubscription}, + func(elem []byte) error { + return notifier.Notify(subscription.ID, elem) + }, + nil, + nil, + &unsubscribed, + 12*time.Hour, + api.logger, + ) + go subscriptioncommon.HandleUnsubscribe(subscription, func() { + // first exit the forwarding go-routine unsubscribed.Store(true) + time.Sleep(100 * time.Millisecond) + // and then close the channel + api.host.UnsubscribeLogs(subscription.ID) }) return subscription, nil } diff --git a/go/host/rpc/clientapi/client_api_scan.go b/go/host/rpc/clientapi/client_api_scan.go index 41dd7ff1bc..7b6feb5395 100644 --- a/go/host/rpc/clientapi/client_api_scan.go +++ b/go/host/rpc/clientapi/client_api_scan.go @@ -39,6 +39,11 @@ func (s *ScanAPI) GetTotalTransactionCount() (*big.Int, error) { return s.host.Storage().FetchTotalTxCount() } +// GetTotalTransactionsQuery returns the number of recorded transactions on the network. +func (s *ScanAPI) GetTotalTransactionsQuery() (*big.Int, error) { + return s.host.Storage().FetchTotalTxsQuery() +} + // GetBatchListingNew returns a paginated list of batches func (s *ScanAPI) GetBatchListingNew(pagination *common.QueryPagination) (*common.BatchListingResponse, error) { return s.host.Storage().FetchBatchListing(pagination) diff --git a/go/host/storage/hostdb/transaction.go b/go/host/storage/hostdb/transaction.go index 2091451c23..49a4b0507e 100644 --- a/go/host/storage/hostdb/transaction.go +++ b/go/host/storage/hostdb/transaction.go @@ -13,6 +13,7 @@ const ( selectTxCount = "SELECT total FROM transaction_count WHERE id = 1" selectTx = "SELECT full_hash, b_sequence FROM transaction_host WHERE hash = " selectTxs = "SELECT t.full_hash, b.ext_batch FROM transaction_host t JOIN batch_host b ON t.b_sequence = b.sequence ORDER BY b.height DESC " + countTxs = "SELECT COUNT(b_sequence) AS row_count FROM transaction_host" ) // GetTransactionListing returns a paginated list of transactions in descending order @@ -82,7 +83,7 @@ func GetTransaction(db HostDB, hash gethcommon.Hash) (*common.PublicTransaction, return tx, nil } -// GetTotalTxCount returns the total number of batched transactions +// GetTotalTxCount returns value from the transaction count table func GetTotalTxCount(db HostDB) (*big.Int, error) { var totalCount int err := db.GetSQLDB().QueryRow(selectTxCount).Scan(&totalCount) @@ -91,3 +92,13 @@ func GetTotalTxCount(db HostDB) (*big.Int, error) { } return big.NewInt(int64(totalCount)), nil } + +// GetTotalTxsQuery returns the count of the transactions table +func GetTotalTxsQuery(db HostDB) (*big.Int, error) { + var totalCount int + err := db.GetSQLDB().QueryRow(countTxs).Scan(&totalCount) + if err != nil { + return nil, fmt.Errorf("failed to count number of transactions: %w", err) + } + return big.NewInt(int64(totalCount)), nil +} diff --git a/go/host/storage/hostdb/transaction_test.go b/go/host/storage/hostdb/transaction_test.go index 2df05b5ec4..f541342ac3 100644 --- a/go/host/storage/hostdb/transaction_test.go +++ b/go/host/storage/hostdb/transaction_test.go @@ -1,6 +1,7 @@ package hostdb import ( + "fmt" "math/big" "testing" @@ -143,3 +144,28 @@ func TestCanRetrieveTotalNumberOfTransactions(t *testing.T) { t.Errorf("total number of batch transactions was not stored correctly") } } + +func TestTotalTxsQuery(t *testing.T) { + db, _ := createSQLiteDB(t) + var txHashes []common.L2TxHash + for i := 0; i < 100; i++ { + txHash := gethcommon.BytesToHash([]byte(fmt.Sprintf("magicString%d", i+1))) + txHashes = append(txHashes, txHash) + } + batchOne := createBatch(batchNumber, txHashes) + dbtx, _ := db.NewDBTransaction() + err := AddBatch(dbtx, db.GetSQLStatement(), &batchOne) + if err != nil { + t.Errorf("could not store batch. Cause: %s", err) + } + dbtx.Write() + + totalTxs, err := GetTotalTxsQuery(db) + if err != nil { + t.Errorf("was not able to count total number of transactions. Cause: %s", err) + } + + if totalTxs.Cmp(big.NewInt(100)) != 0 { + t.Errorf("total number of transactions was not counted correction") + } +} diff --git a/go/host/storage/interfaces.go b/go/host/storage/interfaces.go index e8997d8451..d7e229c36e 100644 --- a/go/host/storage/interfaces.go +++ b/go/host/storage/interfaces.go @@ -42,8 +42,10 @@ type BatchResolver interface { FetchBatchHeaderByHeight(height *big.Int) (*common.BatchHeader, error) // FetchBatchByHeight returns the `PublicBatch` with the given height FetchBatchByHeight(height *big.Int) (*common.PublicBatch, error) - // FetchTotalTxCount returns the number of transactions in the DB + // FetchTotalTxCount returns the value of the transactions_count table. Fast but inaccurate for Tenscan FetchTotalTxCount() (*big.Int, error) + // FetchTotalTxsQuery returns the number of transactions in the DB. Required for e2e tests + FetchTotalTxsQuery() (*big.Int, error) // FetchTransaction returns the transaction given its hash FetchTransaction(hash gethcommon.Hash) (*common.PublicTransaction, error) // FetchBatchTransactions returns a list of public transaction data within a given batch hash diff --git a/go/host/storage/storage.go b/go/host/storage/storage.go index 9b6d1681f7..03baadccb7 100644 --- a/go/host/storage/storage.go +++ b/go/host/storage/storage.go @@ -154,6 +154,10 @@ func (s *storageImpl) FetchTotalTxCount() (*big.Int, error) { return hostdb.GetTotalTxCount(s.db) } +func (s *storageImpl) FetchTotalTxsQuery() (*big.Int, error) { + return hostdb.GetTotalTxsQuery(s.db) +} + func (s *storageImpl) FetchTransaction(hash gethcommon.Hash) (*common.PublicTransaction, error) { return hostdb.GetTransaction(s.db, hash) } diff --git a/go/rpc/client.go b/go/rpc/client.go index 89d404b0df..81227b0266 100644 --- a/go/rpc/client.go +++ b/go/rpc/client.go @@ -34,6 +34,7 @@ const ( GetBatchByTx = "scan_getBatchByTx" GetLatestRollupHeader = "scan_getLatestRollupHeader" GetTotalTxCount = "scan_getTotalTransactionCount" + GetTotalTxsQuery = "scan_getTotalTransactionsQuery" GetTotalContractCount = "scan_getTotalContractCount" GetPublicTransactionData = "scan_getPublicTransactionData" GetBatchListing = "scan_getBatchListing" diff --git a/go/rpc/encrypted_client.go b/go/rpc/encrypted_client.go index 1675248319..764890a857 100644 --- a/go/rpc/encrypted_client.go +++ b/go/rpc/encrypted_client.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" "reflect" + "sync/atomic" + "time" "github.com/ten-protocol/go-ten/go/common/rpc" "github.com/ten-protocol/go-ten/go/common/subscription" @@ -253,31 +255,45 @@ func (c *EncRPCClient) logSubscription(ctx context.Context, namespace string, ch return nil, err } - // todo - do we need to handle unsubscribe in a special way? - // probably not, because when the inbound channel is closed, this goroutine will exit as well. - go subscription.ForwardFromChannels([]chan []byte{inboundChannel}, nil, func(encLog []byte) error { - jsonLogs, err := c.decryptResponse(encLog) - if err != nil { - c.logger.Error("could not decrypt logs received from subscription.", log.ErrKey, err) - return err - } - - var logs []*types.Log - err = json.Unmarshal(jsonLogs, &logs) - if err != nil { - c.logger.Error(fmt.Sprintf("could not unmarshal log from JSON. Received data: %s.", string(jsonLogs)), log.ErrKey, err) - return err - } - - for _, decryptedLog := range logs { - outboundChannel <- *decryptedLog - } - return nil + backendDisconnected := &atomic.Bool{} + go subscription.HandleUnsubscribeErrChan([]<-chan error{backendSub.Err()}, func() { + backendDisconnected.Store(true) }) + go subscription.ForwardFromChannels( + []chan []byte{inboundChannel}, + func(encLog []byte) error { + return c.onMessage(encLog, outboundChannel) + }, + nil, + backendDisconnected, + nil, + 12*time.Hour, + c.logger, + ) return backendSub, nil } +func (c *EncRPCClient) onMessage(encLog []byte, outboundChannel chan types.Log) error { + jsonLogs, err := c.decryptResponse(encLog) + if err != nil { + c.logger.Error("could not decrypt logs received from subscription.", log.ErrKey, err) + return err + } + + var logs []*types.Log + err = json.Unmarshal(jsonLogs, &logs) + if err != nil { + c.logger.Error(fmt.Sprintf("could not unmarshal log from JSON. Received data: %s.", string(jsonLogs)), log.ErrKey, err) + return err + } + + for _, decryptedLog := range logs { + outboundChannel <- *decryptedLog + } + return nil +} + func (c *EncRPCClient) newHeadSubscription(ctx context.Context, namespace string, ch interface{}, args ...any) (*gethrpc.ClientSubscription, error) { return nil, fmt.Errorf("not implemented") } diff --git a/tools/tenscan/backend/webserver/webserver_routes_items.go b/tools/tenscan/backend/webserver/webserver_routes_items.go index b53878faef..cacfa51dc8 100644 --- a/tools/tenscan/backend/webserver/webserver_routes_items.go +++ b/tools/tenscan/backend/webserver/webserver_routes_items.go @@ -11,23 +11,30 @@ import ( ) func routeItems(r *gin.Engine, server *WebServer) { + // info + r.GET("/info/obscuro/", server.getConfig) + r.GET("/info/health/", server.getHealthStatus) + + // batches + r.GET("/items/batches/", server.getBatchListingDeprecated) + r.GET("/items/v2/batches/", server.getBatchListingNew) r.GET("/items/batch/latest/", server.getLatestBatch) r.GET("/items/batch/:hash", server.getBatch) - r.GET("/items/rollup/latest/", server.getLatestRollupHeader) - r.GET("/items/batches/", server.getBatchListingDeprecated) - r.GET("/items/blocks/", server.getBlockListing) // Deprecated - r.GET("/items/transactions/", server.getPublicTransactions) - r.GET("/info/obscuro/", server.getConfig) - r.POST("/info/health/", server.getHealthStatus) + r.GET("/items/batch/:hash/transactions", server.getBatchTransactions) + r.GET("/items/batch/height/:height", server.getBatchByHeight) + // rollups r.GET("/items/rollups/", server.getRollupListing) // New - r.GET("/items/v2/batches/", server.getBatchListingNew) + r.GET("/items/rollup/latest/", server.getLatestRollupHeader) r.GET("/items/rollup/:hash", server.getRollup) r.GET("/items/rollup/:hash/batches", server.getRollupBatches) - r.GET("/items/batch/:hash/transactions", server.getBatchTransactions) - r.GET("/items/batch/height/:height", server.getBatchByHeight) r.GET("/items/rollup/batch/:seq", server.getRollupBySeq) + + // transactions + r.GET("/items/transactions/", server.getPublicTransactions) r.GET("/items/transaction/:hash", server.getTransaction) + r.GET("/items/transactions/count", server.getTotalTxCount) + r.GET("/items/blocks/", server.getBlockListing) // Deprecated } func (w *WebServer) getHealthStatus(c *gin.Context) { @@ -40,7 +47,7 @@ func (w *WebServer) getHealthStatus(c *gin.Context) { func (w *WebServer) getLatestBatch(c *gin.Context) { batch, err := w.backend.GetLatestBatch() if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getLatestBatch request %w", err), w.logger) return } @@ -50,7 +57,7 @@ func (w *WebServer) getLatestBatch(c *gin.Context) { func (w *WebServer) getLatestRollupHeader(c *gin.Context) { rollup, err := w.backend.GetLatestRollupHeader() if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getLatestRollupHeader request %w", err), w.logger) return } @@ -62,7 +69,7 @@ func (w *WebServer) getBatch(c *gin.Context) { parsedHash := gethcommon.HexToHash(hash) batch, err := w.backend.GetBatchByHash(parsedHash) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBatch request %w", err), w.logger) return } @@ -76,7 +83,7 @@ func (w *WebServer) getBatchByHeight(c *gin.Context) { heightBigInt.SetString(heightStr, 10) batch, err := w.backend.GetBatchByHeight(heightBigInt) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBatchByHeight request %w", err), w.logger) return } @@ -94,7 +101,7 @@ func (w *WebServer) getRollupBySeq(c *gin.Context) { batch, err := w.backend.GetRollupBySeqNo(seq) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getRollupBySeq request %w", err), w.logger) return } @@ -106,7 +113,7 @@ func (w *WebServer) getBatchHeader(c *gin.Context) { parsedHash := gethcommon.HexToHash(hash) batch, err := w.backend.GetBatchHeader(parsedHash) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBatchHeader request %w", err), w.logger) return } @@ -118,7 +125,7 @@ func (w *WebServer) getTransaction(c *gin.Context) { parsedHash := gethcommon.HexToHash(hash) batch, err := w.backend.GetTransaction(parsedHash) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getTransaction request %w", err), w.logger) return } @@ -131,44 +138,54 @@ func (w *WebServer) getPublicTransactions(c *gin.Context) { offset, err := strconv.ParseUint(offsetStr, 10, 32) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getPublicTransactions offset units %w", err), w.logger) return } - parseUint, err := strconv.ParseUint(sizeStr, 10, 64) + size, err := strconv.ParseUint(sizeStr, 10, 64) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getPublicTransactions size units %w", err), w.logger) return } - publicTxs, err := w.backend.GetPublicTransactions(offset, parseUint) + publicTxs, err := w.backend.GetPublicTransactions(offset, size) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getPublicTransactions request %w", err), w.logger) return } c.JSON(http.StatusOK, gin.H{"result": publicTxs}) } +func (w *WebServer) getTotalTxCount(c *gin.Context) { + txCount, err := w.backend.GetTotalTransactionCount() + if err != nil { + errorHandler(c, fmt.Errorf("unable to execute getTotalTxCount request %w", err), w.logger) + return + } + + c.JSON(http.StatusOK, gin.H{"result": txCount}) +} + func (w *WebServer) getBatchListingNew(c *gin.Context) { offsetStr := c.DefaultQuery("offset", "0") sizeStr := c.DefaultQuery("size", "10") offset, err := strconv.ParseUint(offsetStr, 10, 32) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getBatchListingNew offset units %w", err), w.logger) return } parseUint, err := strconv.ParseUint(sizeStr, 10, 64) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getBatchListingNew size units %w", err), w.logger) return } batchesListing, err := w.backend.GetBatchesListing(offset, parseUint) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBatchListingNew request %w", err), w.logger) return } @@ -181,19 +198,19 @@ func (w *WebServer) getBatchListingDeprecated(c *gin.Context) { offset, err := strconv.ParseUint(offsetStr, 10, 32) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getBatchListingDeprecated offset units %w", err), w.logger) return } - parseUint, err := strconv.ParseUint(sizeStr, 10, 64) + size, err := strconv.ParseUint(sizeStr, 10, 64) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getBatchListingDeprecated size units %w", err), w.logger) return } - batchesListing, err := w.backend.GetBatchesListingDeprecated(offset, parseUint) + batchesListing, err := w.backend.GetBatchesListingDeprecated(offset, size) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBatchListingDeprecated request %w", err), w.logger) return } @@ -206,19 +223,19 @@ func (w *WebServer) getRollupListing(c *gin.Context) { offset, err := strconv.ParseUint(offsetStr, 10, 32) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getRollupListing offset units %w", err), w.logger) return } - parseUint, err := strconv.ParseUint(sizeStr, 10, 64) + size, err := strconv.ParseUint(sizeStr, 10, 64) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getRollupListing size units %w", err), w.logger) return } - rollupListing, err := w.backend.GetRollupListing(offset, parseUint) + rollupListing, err := w.backend.GetRollupListing(offset, size) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getRollupListing request %w", err), w.logger) return } @@ -231,19 +248,19 @@ func (w *WebServer) getBlockListing(c *gin.Context) { offset, err := strconv.ParseUint(offsetStr, 10, 32) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getBlockListing offset units %w", err), w.logger) return } - parseUint, err := strconv.ParseUint(sizeStr, 10, 64) + size, err := strconv.ParseUint(sizeStr, 10, 64) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to parse getBlockListing size units %w", err), w.logger) return } - batchesListing, err := w.backend.GetBlockListing(offset, parseUint) + batchesListing, err := w.backend.GetBlockListing(offset, size) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBlockListing request %w", err), w.logger) return } @@ -255,7 +272,7 @@ func (w *WebServer) getRollup(c *gin.Context) { parsedHash := gethcommon.HexToHash(hash) rollup, err := w.backend.GetRollupByHash(parsedHash) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getRollup request %w", err), w.logger) return } @@ -267,7 +284,7 @@ func (w *WebServer) getRollupBatches(c *gin.Context) { parsedHash := gethcommon.HexToHash(hash) batchListing, err := w.backend.GetRollupBatches(parsedHash) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getRollupBatches request %w", err), w.logger) return } @@ -279,7 +296,7 @@ func (w *WebServer) getBatchTransactions(c *gin.Context) { parsedHash := gethcommon.HexToHash(hash) txListing, err := w.backend.GetBatchTransactions(parsedHash) if err != nil { - errorHandler(c, fmt.Errorf("unable to execute request %w", err), w.logger) + errorHandler(c, fmt.Errorf("unable to execute getBatchTransactions request %w", err), w.logger) return } diff --git a/tools/tenscan/frontend/api/general.ts b/tools/tenscan/frontend/api/general.ts index 07c2aec2e3..a9e09941dc 100644 --- a/tools/tenscan/frontend/api/general.ts +++ b/tools/tenscan/frontend/api/general.ts @@ -7,8 +7,7 @@ export const fetchTestnetStatus = async (): Promise< ResponseDataInterface > => { return await httpRequest>({ - method: "post", + method: "get", url: pathToUrl(apiRoutes.getHealthStatus), - data: { jsonrpc: "2.0", method: "obscuro_health", params: [], id: 1 }, }); }; diff --git a/tools/walletextension/frontend/src/api/general.ts b/tools/walletextension/frontend/src/api/general.ts index 24a8afd877..8df163eba2 100644 --- a/tools/walletextension/frontend/src/api/general.ts +++ b/tools/walletextension/frontend/src/api/general.ts @@ -7,8 +7,7 @@ export const fetchTestnetStatus = async (): Promise< ResponseDataInterface > => { return await httpRequest>({ - method: "post", + method: "get", url: pathToUrl(apiRoutes.getHealthStatus), - data: { jsonrpc: "2.0", method: "obscuro_health", params: [], id: 1 }, }); }; diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index a0664ae58e..b7f049833c 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/log" subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription" @@ -19,12 +22,14 @@ import ( ) type FilterAPI struct { - we *Services + we *Services + logger log.Logger } func NewFilterAPI(we *Services) *FilterAPI { return &FilterAPI{ - we: we, + we: we, + logger: we.logger, } } @@ -70,9 +75,10 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp backendWSConnections := make([]*tenrpc.EncRPCClient, 0) inputChannels := make([]chan types.Log, 0) + errorChannels := make([]<-chan error, 0) backendSubscriptions := make([]*rpc.ClientSubscription, 0) for _, address := range candidateAddresses { - rpcWSClient, err := connectWS(user.accounts[*address], api.we.Logger()) + rpcWSClient, err := connectWS(ctx, user.accounts[*address], api.we.Logger()) if err != nil { return nil, err } @@ -86,40 +92,63 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp } inputChannels = append(inputChannels, inCh) + errorChannels = append(errorChannels, backendSubscription.Err()) backendSubscriptions = append(backendSubscriptions, backendSubscription) } dedupeBuffer := NewCircularBuffer(wecommon.DeduplicationBufferSize) subscription := subNotifier.CreateSubscription() - unsubscribed := atomic.Bool{} - go subscriptioncommon.ForwardFromChannels(inputChannels, &unsubscribed, func(log types.Log) error { - uniqueLogKey := LogKey{ - BlockHash: log.BlockHash, - TxHash: log.TxHash, - Index: log.Index, - } + unsubscribedByClient := atomic.Bool{} + unsubscribedByBackend := atomic.Bool{} + go subscriptioncommon.ForwardFromChannels( + inputChannels, + func(log types.Log) error { + uniqueLogKey := LogKey{ + BlockHash: log.BlockHash, + TxHash: log.TxHash, + Index: log.Index, + } - if !dedupeBuffer.Contains(uniqueLogKey) { - dedupeBuffer.Push(uniqueLogKey) - return subNotifier.Notify(subscription.ID, log) - } - return nil + if !dedupeBuffer.Contains(uniqueLogKey) { + dedupeBuffer.Push(uniqueLogKey) + return subNotifier.Notify(subscription.ID, log) + } + return nil + }, + func() { + // release resources + api.closeConnections(backendSubscriptions, backendWSConnections) + }, // todo - we can implement reconnect logic here + &unsubscribedByBackend, + &unsubscribedByClient, + 12*time.Hour, + api.logger, + ) + + // handles any of the backend connections being closed + go subscriptioncommon.HandleUnsubscribeErrChan(errorChannels, func() { + unsubscribedByBackend.Store(true) }) - go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() { - for _, backendSub := range backendSubscriptions { - backendSub.Unsubscribe() - } - for _, connection := range backendWSConnections { - _ = returnConn(api.we.rpcWSConnPool, connection.BackingClient()) - } - unsubscribed.Store(true) + // handles "unsubscribe" from the user + go subscriptioncommon.HandleUnsubscribe(subscription, func() { + unsubscribedByClient.Store(true) + api.closeConnections(backendSubscriptions, backendWSConnections) }) return subscription, err } +func (api *FilterAPI) closeConnections(backendSubscriptions []*rpc.ClientSubscription, backendWSConnections []*tenrpc.EncRPCClient) { + for _, backendSub := range backendSubscriptions { + backendSub.Unsubscribe() + } + for _, connection := range backendWSConnections { + _ = returnConn(api.we.rpcWSConnPool, connection.BackingClient(), api.logger) + } +} + func getUserAndNotifier(ctx context.Context, api *FilterAPI) (*rpc.Notifier, *GWUser, error) { subNotifier, supported := rpc.NotifierFromContext(ctx) if !supported { diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 357e38ca18..9dbca7148a 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -8,6 +8,11 @@ import ( "fmt" "time" + "github.com/ten-protocol/go-ten/go/common/measure" + "github.com/ten-protocol/go-ten/go/enclave/core" + + "github.com/ten-protocol/go-ten/go/common/log" + gethlog "github.com/ethereum/go-ethereum/log" pool "github.com/jolestar/go-commons-pool/v2" tenrpc "github.com/ten-protocol/go-ten/go/rpc" @@ -72,7 +77,7 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac cacheArgs = append(cacheArgs, args...) res, err := withCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) { - return withPlainRPCConnection(w, func(client *rpc.Client) (*R, error) { + return withPlainRPCConnection(ctx, w, func(client *rpc.Client) (*R, error) { var resp *R var err error @@ -117,7 +122,7 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s var rpcErr error for i := range candidateAccts { acct := candidateAccts[i] - result, err := withEncRPCConnection(w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { + result, err := withEncRPCConnection(ctx, w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { var result *R adjustedArgs := args if cfg.adjustArgs != nil { @@ -272,42 +277,48 @@ func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy { return LatestBatch } -func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { - return conn(account.user.services.rpcWSConnPool, account, logger) +func connectWS(ctx context.Context, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { + return conn(ctx, account.user.services.rpcWSConnPool, account, logger) } -func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { - connectionObj, err := p.BorrowObject(context.Background()) +func conn(ctx context.Context, p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { + defer core.LogMethodDuration(logger, measure.NewStopwatch(), "get rpc connection") + connectionObj, err := p.BorrowObject(ctx) if err != nil { return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) } conn := connectionObj.(*rpc.Client) encClient, err := wecommon.CreateEncClient(conn, account.address.Bytes(), account.user.userKey, account.signature, account.signatureType, logger) if err != nil { + _ = returnConn(p, conn, logger) return nil, fmt.Errorf("error creating new client, %w", err) } return encClient, nil } -func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error { - return p.ReturnObject(context.Background(), conn) +func returnConn(p *pool.ObjectPool, conn tenrpc.Client, logger gethlog.Logger) error { + err := p.ReturnObject(context.Background(), conn) + if err != nil { + logger.Error("Error returning connection to pool", log.ErrKey, err) + } + return err } -func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { - rpcClient, err := conn(acct.user.services.rpcHTTPConnPool, acct, w.logger) +func withEncRPCConnection[R any](ctx context.Context, w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { + rpcClient, err := conn(ctx, acct.user.services.rpcHTTPConnPool, acct, w.logger) if err != nil { return nil, fmt.Errorf("could not connect to backed. Cause: %w", err) } - defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient()) + defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient(), w.logger) return execute(rpcClient) } -func withPlainRPCConnection[R any](w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { - connectionObj, err := w.rpcHTTPConnPool.BorrowObject(context.Background()) +func withPlainRPCConnection[R any](ctx context.Context, w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { + connectionObj, err := w.rpcHTTPConnPool.BorrowObject(ctx) if err != nil { return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) } rpcClient := connectionObj.(*rpc.Client) - defer returnConn(w.rpcHTTPConnPool, rpcClient) + defer returnConn(w.rpcHTTPConnPool, rpcClient, w.logger) return execute(rpcClient) } diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/rpcapi/wallet_extension.go index eb967d6d8e..0d2b742791 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/rpcapi/wallet_extension.go @@ -45,11 +45,10 @@ type Services struct { version string Cache cache.Cache // the OG maintains a connection pool of rpc connections to underlying nodes - rpcHTTPConnPool *pool.ObjectPool - rpcWSConnPool *pool.ObjectPool - Config *common.Config - backendNewHeadsSubscription *gethrpc.ClientSubscription - NewHeadsService *subscriptioncommon.NewHeadsService + rpcHTTPConnPool *pool.ObjectPool + rpcWSConnPool *pool.ObjectPool + Config *common.Config + NewHeadsService *subscriptioncommon.NewHeadsService } type NewHeadNotifier interface { @@ -91,7 +90,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage }, nil, nil, nil) cfg := pool.NewDefaultPoolConfig() - cfg.MaxTotal = 100 // todo - what is the right number + cfg.MaxTotal = 200 // todo - what is the right number services := Services{ HostAddrHTTP: hostAddrHTTP, @@ -107,42 +106,50 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage Config: config, } - connectionObj, err := services.rpcWSConnPool.BorrowObject(context.Background()) - if err != nil { - panic(fmt.Errorf("cannot fetch rpc connection to backend node %w", err)) - } - - rpcClient := connectionObj.(rpc.Client) - ch := make(chan *tencommon.BatchHeader) - clientSubscription, err := subscribeToNewHeadsWithRetry(rpcClient, ch, retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), logger) - if err != nil { - panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err)) - } - services.backendNewHeadsSubscription = clientSubscription - services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error { - services.Cache.EvictShortLiving() - return nil - }) + services.NewHeadsService = subscriptioncommon.NewNewHeadsService( + func() (chan *tencommon.BatchHeader, <-chan error, error) { + logger.Info("Connecting to new heads service...") + // clear the cache to avoid returning stale data during reconnecting. + services.Cache.EvictShortLiving() + ch := make(chan *tencommon.BatchHeader) + errCh, err := subscribeToNewHeadsWithRetry(ch, services, logger) + logger.Info("Connected to new heads service.", log.ErrKey, err) + return ch, errCh, err + }, + true, + logger, + func(newHead *tencommon.BatchHeader) error { + services.Cache.EvictShortLiving() + return nil + }) return &services } -func subscribeToNewHeadsWithRetry(rpcClient rpc.Client, ch chan *tencommon.BatchHeader, retryStrategy retry.Strategy, logger gethlog.Logger) (*gethrpc.ClientSubscription, error) { +func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) (<-chan error, error) { var sub *gethrpc.ClientSubscription - - err := retry.Do(func() error { - var err error - sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) - if err != nil { - logger.Info("could not subscribe for new head blocks", log.ErrKey, err) - } - return err - }, retryStrategy) + err := retry.Do( + func() error { + connectionObj, err := services.rpcWSConnPool.BorrowObject(context.Background()) + if err != nil { + return fmt.Errorf("cannot fetch rpc connection to backend node %w", err) + } + rpcClient := connectionObj.(rpc.Client) + sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) + if err != nil { + logger.Info("could not subscribe for new head blocks", log.ErrKey, err) + _ = returnConn(services.rpcWSConnPool, rpcClient, logger) + } + return err + }, + retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), + ) if err != nil { logger.Error("could not subscribe for new head blocks.", log.ErrKey, err) + return nil, fmt.Errorf("cannot subscribe to new heads to the backend %w", err) } - return sub, err + return sub.Err(), nil } // IsStopping returns whether the WE is stopping @@ -264,7 +271,7 @@ func (w *Services) Version() string { } func (w *Services) GetTenNodeHealthStatus() (bool, error) { - res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) { + res, err := withPlainRPCConnection[bool](context.Background(), w, func(client *gethrpc.Client) (*bool, error) { res, err := obsclient.NewObsClient(client).Health() return &res, err }) @@ -288,7 +295,6 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic } func (w *Services) Stop() { - w.backendNewHeadsSubscription.Unsubscribe() w.rpcHTTPConnPool.Close(context.Background()) w.rpcWSConnPool.Close(context.Background()) }