From 48f4b67b68ab800f1fb1314f0b01ad623387c1a7 Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Wed, 4 Sep 2024 08:12:36 -0400 Subject: [PATCH] transactions cache --- data/psql.go | 33 ++++++++++++++++++++------------- data/sinker.go | 1 + 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/data/psql.go b/data/psql.go index a62c05d..577fbfa 100644 --- a/data/psql.go +++ b/data/psql.go @@ -14,9 +14,10 @@ import ( ) type Psql struct { - db *sql.DB - tx *sql.Tx - logger *zap.Logger + db *sql.DB + tx *sql.Tx + logger *zap.Logger + TransactionIDs map[string]int64 } type PsqlInfo struct { @@ -66,17 +67,22 @@ func (p *Psql) HandleClock(clock *pbsubstreams.Clock) (dbBlockID int64, err erro } func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTransactionID int64, err error) { - //todo: create a transaction cache - rows, err := p.tx.Query("SELECT id FROM hivemapper.transactions WHERE hash = $1", transactionHash) - p.logger.Debug("handling transaction", zap.String("trx_hash", transactionHash)) - if err != nil { - return 0, fmt.Errorf("selecting transaction: %w", err) - } - defer rows.Close() - if rows.Next() { - err = rows.Scan(&dbTransactionID) - return + ////todo: create a transaction cache + //rows, err := p.tx.Query("SELECT id FROM hivemapper.transactions WHERE hash = $1", transactionHash) + //p.logger.Debug("handling transaction", zap.String("trx_hash", transactionHash)) + //if err != nil { + // return 0, fmt.Errorf("selecting transaction: %w", err) + //} + //defer rows.Close() + // + //if rows.Next() { + // err = rows.Scan(&dbTransactionID) + // return + //} + + if id, found := p.TransactionIDs[transactionHash]; found { + return id, nil } row := p.tx.QueryRow("INSERT INTO hivemapper.transactions (hash, block_id) VALUES ($1, $2) RETURNING id", transactionHash, dbBlockID) @@ -85,6 +91,7 @@ func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTra return 0, fmt.Errorf("inserting transaction: %w", err) } + p.TransactionIDs[transactionHash] = dbTransactionID err = row.Scan(&dbTransactionID) return } diff --git a/data/sinker.go b/data/sinker.go index 674cf81..47d8eba 100644 --- a/data/sinker.go +++ b/data/sinker.go @@ -65,6 +65,7 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp s.blockSecCount++ s.lastClock = data.Clock hasTransaction := false + s.db.TransactionIDs = map[string]int64{} defer func() { s.Sinker.AverageBlockTimeProcessing().Add(time.Since(startTime).Milliseconds())