Add more information logs for UI logs table #2220

Open · wants to merge 8 commits into base: main
Changes from 7 commits
21 changes: 21 additions & 0 deletions flow/activities/flowable.go
@@ -219,6 +219,7 @@
) (*protos.SetupNormalizedTableBatchOutput, error) {
logger := activity.GetLogger(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables")
conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
@@ -247,6 +248,7 @@
})
defer shutdown()

a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables")
Member: why do we have this twice?

Contributor: I think it's meant to say that conn.StartupSetupNormalizedTables has created the tables and ingestion is beginning. The wording needs to be updated.
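A possible suggested change for the second call, following the comment above; the message below is only a hypothetical illustration, not part of this PR:

	a.Alerter.LogFlowInfo(ctx, config.FlowName, "Created destination tables, beginning ingestion")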

tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
for tableIdentifier, tableSchema := range tableNameSchemaMapping {
existing, err := conn.SetupNormalizedTable(
@@ -265,15 +267,19 @@
numTablesSetup.Add(1)
if !existing {
logger.Info("created table " + tableIdentifier)
a.Alerter.LogFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination")
Member: Might be too noisy for a large number of tables.
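One illustrative way to address the noise concern, purely as a sketch (the size check and threshold are assumptions, not part of this PR): emit the per-table UI log only for small batches and rely on the summary log at the end otherwise.

	// sketch only: skip per-table UI logs when many tables are being set up (threshold is an assumption)
	if len(tableNameSchemaMapping) <= 50 {
		a.Alerter.LogFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination")
	}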

} else {
logger.Info("table already exists " + tableIdentifier)
}

}

Check failure on line 275 in flow/activities/flowable.go (GitHub Actions / lint): unnecessary trailing newline (whitespace)
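A minimal sketch of the fix, assuming the linter is flagging the blank line left before the loop's closing brace:

		} else {
			logger.Info("table already exists " + tableIdentifier)
		}
	}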

if err := conn.FinishSetupNormalizedTables(ctx, tx); err != nil {
return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err)
}

a.Alerter.LogFlowInfo(ctx, config.FlowName, "All destination tables have been setup")

return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: tableExistsMapping,
}, nil
@@ -509,6 +515,8 @@
}
}

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("obtained partitions for table %s", config.WatermarkTable))

Check failure on line 518 in flow/activities/flowable.go (GitHub Actions / lint): fmt.Sprintf can be replaced with string concatenation (perfsprint)
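A minimal sketch of the perfsprint fix, replacing the single-argument fmt.Sprintf with plain string concatenation:

	a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "obtained partitions for table "+config.WatermarkTable)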
Member: Maybe just log instead of sending an alert? (too noisy)

Contributor (author): These are not alerts; this is for our logs table in the UI. I think LogFlowEvent is for alerts?

Member: My bad, but still, do we want to do it for all tables?

Contributor (author): Yeah, because we have a scarcity of user-facing info during the initial load; this is common feedback from customers. cc @iskakaushik


return &protos.QRepParitionResult{
Partitions: partitions,
}, nil
@@ -576,6 +584,7 @@
}
}

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "replicated all rows to destination for table "+config.DestinationTableIdentifier)
return nil
}

@@ -637,6 +646,8 @@
return pullCleanupErr
}

a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects. Any replication slot or publication created by PeerDB has been removed.")

return nil
}

@@ -656,6 +667,8 @@
return syncFlowCleanupErr
}

a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up destination peer replication objects. Any PeerDB metadata storage has been dropped.")

return nil
}

@@ -900,6 +913,8 @@
}
}

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables")

return renameOutput, tx.Commit(ctx)
}

@@ -973,6 +988,9 @@
if err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}

a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("ensured %d tables exist in publication %s",
len(additionalTableMappings), cfg.PublicationName))
return err
}

@@ -996,6 +1014,9 @@
if err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}

a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("removed %d tables from publication %s",
len(removedTablesMapping), cfg.PublicationName))
return err
}

7 changes: 6 additions & 1 deletion flow/activities/flowable_core.go
@@ -228,6 +228,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

numberOfSchemaChanges := len(recordBatchSync.SchemaDeltas)
if numberOfSchemaChanges > 0 {
a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("synced %d schema changes from source to destination", numberOfSchemaChanges))
}

return &model.SyncCompositeResponse{
SyncResponse: &model.SyncResponse{
CurrentSyncBatchID: -1,
@@ -321,7 +326,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
}

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
pushedRecordsWithCount := fmt.Sprintf("pushed %d records into intermediate storage", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount)

4 changes: 3 additions & 1 deletion flow/activities/snapshot_activity.go
@@ -58,7 +58,7 @@ func (a *SnapshotActivity) SetupReplication(
) (*protos.SetupReplicationOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Setting up replication slot and publication")
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")
Contributor: Suggested change (remove this line):
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

Contributor (author): This is for alerting, we need this.


conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName)
@@ -98,6 +98,8 @@ func (a *SnapshotActivity) SetupReplication(
connector: conn,
}

a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Replication slot and publication setup complete")

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,