Skip to content

Commit

Permalink
fix: append instead of upsert, updated_at timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 7, 2023
1 parent d737c94 commit 1ff80fe
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
4 changes: 3 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get slot info: %w", err)
}

a.CatalogMirrorMonitor.UpsertSlotSizeInfo(ctx, input.FlowConnectionConfigs.Source.Name, slotInfo[0])
if len(slotInfo) == 1 {
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, input.FlowConnectionConfigs.Source.Name, slotInfo[0])
}

hasRecords := !recordBatch.WaitAndCheckEmpty()
log.WithFields(log.Fields{
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
return nil
}

func (c *CatalogMirrorMonitor) UpsertSlotSizeInfo(
func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(
ctx context.Context,
peerName string,
slotInfo *protos.SlotInfo,
Expand All @@ -256,9 +256,9 @@ func (c *CatalogMirrorMonitor) UpsertSlotSizeInfo(
}

_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.peer_slot_size(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+
"VALUES($1,$2,$3,$4,$5,$6) "+
"ON CONFLICT (slot_name, peer_name) DO UPDATE SET restart_lsn = $3, redo_lsn = $4, confirmed_flush_lsn = $5, slot_size = $6",
"INSERT INTO peerdb_stats.peer_slot_size"+
"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+
"VALUES($1,$2,$3,$4,$5,$6);",
peerName,
slotInfo.SlotName,
slotInfo.RestartLSN,
Expand Down
3 changes: 1 addition & 2 deletions nexus/catalog/migrations/V12__slot_size.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size (
restart_lsn TEXT,
confirmed_flush_lsn TEXT,
slot_size BIGINT,
updated_at
PRIMARY KEY (slot_name, peer_name)
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

0 comments on commit 1ff80fe

Please sign in to comment.