Skip to content

Commit

Permalink
fix RecordDurationFetchPrevValueFetch stat
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Apr 4, 2024
1 parent 9cbe79e commit 4536c90
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
25 changes: 13 additions & 12 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"math"
"time"

"github.com/streamingfast/substreams-sink-kv/sinker"

"github.com/streamingfast/bstream"
"github.com/streamingfast/kvdb/store"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -80,26 +78,28 @@ func (db *OperationDB) AddOperation(op *pbkv.KVOperation) {
db.pendingOperations[op.Key] = op
}

func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber uint64, finalBlockHeight uint64, step bstream.StepType, kvOps *pbkv.KVOperations, stats *sinker.Stats) error {
func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber uint64, finalBlockHeight uint64, step bstream.StepType, kvOps *pbkv.KVOperations) (time.Duration, error) {
prevValueFetchDuration := time.Duration(0)
if step == bstream.StepNew {
err := db.PurgeUndoOperations(ctx, finalBlockHeight)
if err != nil {
return fmt.Errorf("deleting LIB undo operations: %w", err)
return 0, fmt.Errorf("deleting LIB undo operations: %w", err)
}

undoOperations, err := db.GenerateUndoOperations(ctx, kvOps.Operations, stats)
undoOperations, fetchDuration, err := db.GenerateUndoOperations(ctx, kvOps.Operations)
prevValueFetchDuration = fetchDuration
if err != nil {
return fmt.Errorf("generating reverse operations: %w", err)
return 0, fmt.Errorf("generating reverse operations: %w", err)
}

err = db.AddUndosOperations(ctx, blockNumber, undoOperations)
if err != nil {
return fmt.Errorf("storing reverse operations: %w", err)
return 0, fmt.Errorf("storing reverse operations: %w", err)
}
}

db.AddOperations(kvOps)
return nil
return prevValueFetchDuration, nil
}

func (db *OperationDB) Flush(ctx context.Context, cursor *sink.Cursor) (count int, err error) {
Expand Down Expand Up @@ -161,25 +161,26 @@ func (db *OperationDB) AddUndosOperations(ctx context.Context, blockNumber uint6
return nil
}

func (db *OperationDB) GenerateUndoOperations(ctx context.Context, ops []*pbkv.KVOperation, stats *sinker.Stats) (*pbkv.KVOperations, error) {
func (db *OperationDB) GenerateUndoOperations(ctx context.Context, ops []*pbkv.KVOperation) (*pbkv.KVOperations, time.Duration, error) {
fetchDuration := time.Duration(0)
var undoOperations []*pbkv.KVOperation
for _, op := range ops {
start := time.Now()
previousValue, err := db.store.Get(ctx, userKey(op.Key))
stats.RecordDurationFetchPrevValueFetch(time.Since(start))
fetchDuration = fetchDuration + time.Since(start)

previousKeyExists := true
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
return nil, fmt.Errorf("getting previous value for key %s %T: %w", op.Key, err, err)
return nil, fetchDuration, fmt.Errorf("getting previous value for key %s %T: %w", op.Key, err, err)
}
previousKeyExists = false
}
undoOp := undoOperation(op, previousValue, previousKeyExists)
undoOperations = append([]*pbkv.KVOperation{undoOp}, undoOperations...)
}
reversedKVOperations := &pbkv.KVOperations{Operations: undoOperations}
return reversedKVOperations, nil
return reversedKVOperations, fetchDuration, nil
}

func undoOperation(op *pbkv.KVOperation, previousValue []byte, previousKeyExists bool) *pbkv.KVOperation {
Expand Down
4 changes: 2 additions & 2 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestDB_HandleOperations(t *testing.T) {
require.NoError(t, err)

for _, block := range c.blocks {
err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations, nil)
_, err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations)
require.NoError(t, err)
_, err = db.Flush(ctx, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestDB_HandleUndo(t *testing.T) {
require.NoError(t, err)

for _, block := range c.blocks {
err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations, nil)
_, err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations)
require.NoError(t, err)
_, err = db.Flush(ctx, nil)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ func (s *KVSinker) handleBlockScopedData(ctx context.Context, data *pbsubstreams
return fmt.Errorf("unmarshal database changes: %w", err)
}

err = s.operationDB.HandleOperations(ctx, data.Clock.Number, data.FinalBlockHeight, cursor.Step, kvOps, s.stats)
prevValueFetchDuration, err := s.operationDB.HandleOperations(ctx, data.Clock.Number, data.FinalBlockHeight, cursor.Step, kvOps)
if err != nil {
return fmt.Errorf("handling operation: %w", err)
}
s.stats.RecordDurationFetchPrevValueFetch(prevValueFetchDuration)

BlockCount.Inc()
if s.shouldFlushKeys(cursor) {
Expand Down

0 comments on commit 4536c90

Please sign in to comment.