From 55e44fd3986b8670a306fcf45dc326fd436cd39e Mon Sep 17 00:00:00 2001 From: colindickson Date: Thu, 23 Nov 2023 22:05:34 -0500 Subject: [PATCH] add logging to track flushing --- db/cursor.go | 1 + sinker/sinker.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/db/cursor.go b/db/cursor.go index a946a8b..44a1a62 100644 --- a/db/cursor.go +++ b/db/cursor.go @@ -119,6 +119,7 @@ func (l *Loader) InsertCursor(ctx context.Context, moduleHash string, c *sink.Cu // ErrCursorNotFound. If the update was not successful on the database, returns an error. // You can use tx=nil to run the query outside of a transaction. func (l *Loader) UpdateCursor(ctx context.Context, tx Tx, moduleHash string, c *sink.Cursor) error { + l.logger.Debug("updating cursor", zap.String("module_hash", moduleHash), zap.Stringer("cursor", c)) _, err := l.runModifiyQuery(ctx, tx, "update", l.getDialect().GetUpdateCursorQuery( l.cursorTable.identifier, moduleHash, c, c.Block().Num(), c.Block().ID(), )) diff --git a/sinker/sinker.go b/sinker/sinker.go index f7da20b..24b0639 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -119,6 +119,8 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream } if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 { + s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive)) + flushStart := time.Now() rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight) if err != nil {