Skip to content

Commit

Permalink
add logging to track flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Nov 24, 2023
1 parent 85b0c25 commit 55e44fd
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 0 deletions.
1 change: 1 addition & 0 deletions db/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (l *Loader) InsertCursor(ctx context.Context, moduleHash string, c *sink.Cu
// ErrCursorNotFound. If the update was not successful on the database, returns an error.
// You can use tx=nil to run the query outside of a transaction.
func (l *Loader) UpdateCursor(ctx context.Context, tx Tx, moduleHash string, c *sink.Cursor) error {
l.logger.Debug("updating cursor", zap.String("module_hash", moduleHash), zap.Stringer("cursor", c))
_, err := l.runModifiyQuery(ctx, tx, "update", l.getDialect().GetUpdateCursorQuery(
l.cursorTable.identifier, moduleHash, c, c.Block().Num(), c.Block().ID(),
))
Expand Down
2 changes: 2 additions & 0 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
}

if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 {
s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive))

flushStart := time.Now()
rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight)
if err != nil {
Expand Down

0 comments on commit 55e44fd

Please sign in to comment.