Skip to content

Commit

Permalink
refine logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 28, 2023
1 parent 7c1d7a7 commit a1896a8
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 30 deletions.
30 changes: 23 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ type FlowableActivity struct {
func (a *FlowableActivity) CheckConnection(
ctx context.Context,
config *protos.Peer,
flowName string,
) (*CheckConnectionResult, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -65,14 +67,14 @@ func (a *FlowableActivity) CheckConnection(
}

// SetupMetadataTables implements SetupMetadataTables.
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.Peer) error {
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.Peer, flowName string) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config)
if err != nil {
return fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
Expand All @@ -86,6 +88,7 @@ func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (*protos.LastSyncState, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -105,6 +108,7 @@ func (a *FlowableActivity) EnsurePullability(
ctx context.Context,
config *protos.EnsurePullabilityBatchInput,
) (*protos.EnsurePullabilityBatchOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -125,6 +129,7 @@ func (a *FlowableActivity) CreateRawTable(
ctx context.Context,
config *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -147,7 +152,9 @@ func (a *FlowableActivity) CreateRawTable(
func (a *FlowableActivity) GetTableSchema(
ctx context.Context,
config *protos.GetTableSchemaBatchInput,
flowName string,
) (*protos.GetTableSchemaBatchOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -161,7 +168,9 @@ func (a *FlowableActivity) GetTableSchema(
func (a *FlowableActivity) CreateNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
flowName string,
) (*protos.SetupNormalizedTableBatchOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -170,7 +179,6 @@ func (a *FlowableActivity) CreateNormalizedTable(

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}
Expand All @@ -181,7 +189,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
slog.InfoContext(ctx, "starting flow...")
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -364,7 +372,7 @@ func (a *FlowableActivity) StartNormalize(
input *protos.StartNormalizeInput,
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.FlowNameKey, conn.FlowJobName)
dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -476,6 +484,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
last *protos.QRepPartition,
runUUID string,
) (*protos.QRepParitionResult, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return nil, fmt.Errorf("failed to get qrep pull connector: %w", err)
Expand Down Expand Up @@ -519,6 +528,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
Expand Down Expand Up @@ -549,6 +559,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down Expand Up @@ -702,6 +713,7 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q
}

func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.ShutdownRequest) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetCDCPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
Expand Down Expand Up @@ -759,7 +771,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
if !peerdbenv.PeerDBEnableWALHeartbeat() {
slog.InfoContext(ctx, "wal heartbeat is disabled")
slog.Info("wal heartbeat is disabled")
return nil
}

Expand All @@ -770,7 +782,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
for {
select {
case <-ctx.Done():
slog.InfoContext(ctx, "context is done, exiting wal heartbeat send loop")
slog.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
pgPeers, err := a.getPostgresPeerConfigs(ctx)
Expand Down Expand Up @@ -817,6 +829,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
config *protos.QRepConfig, last *protos.QRepPartition,
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
Expand Down Expand Up @@ -856,6 +869,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error,
) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down Expand Up @@ -884,6 +898,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error,
) {
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down Expand Up @@ -912,6 +927,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

Expand All @@ -34,6 +35,7 @@ func (a *SnapshotActivity) SetupReplication(
ctx context.Context,
config *protos.SetupReplicationInput,
) (*protos.SetupReplicationOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dbType := config.PeerConnectionConfig.Type
if dbType != protos.DBType_POSTGRES {
slog.InfoContext(ctx, fmt.Sprintf("setup replication is no-op for %s", dbType))
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
logs := slog.Group("shutdown-log",
slog.String("flowName", req.FlowJobName),
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("workflowId", req.WorkflowId),
)
err := h.temporalClient.SignalWorkflow(
Expand Down Expand Up @@ -417,7 +417,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
if delErr != nil {
slog.Error("unable to remove flow job entry",
slog.String("flowName", req.FlowJobName),
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.Any("error", err),
slog.String("workflowId", req.WorkflowId))
return &protos.ShutdownResponse{
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
&datasetTable{
dataset: s.connector.datasetID,
table: stagingTable,
}, stream)
}, stream, flowJobName)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %v", err)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
strings.ReplaceAll(partition.PartitionId, "-", "_")),
}
numRecords, err := s.writeToStage(partition.PartitionId, flowJobName, avroSchema,
stagingDatasetTable, stream)
stagingDatasetTable, stream, flowJobName)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %v", err)
}
Expand Down Expand Up @@ -340,6 +340,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
avroSchema *model.QRecordAvroSchemaDefinition,
stagingTable *datasetTable,
stream *model.QRecordStream,
flowName string,
) (int, error) {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
func() string {
Expand All @@ -355,6 +356,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
avro.CompressNone, qvalue.QDWHTypeBigQuery)
idLog := slog.Group("write-metadata",
slog.String(string(shared.FlowNameKey), flowName),
slog.String("batchOrPartitionID", syncID),
)
if s.gcsBucket != "" {
Expand Down Expand Up @@ -426,7 +428,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
if err := status.Err(); err != nil {
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
slog.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath))
slog.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath), idLog)

err = s.connector.waitForTableReady(stagingTable)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ func (c *PostgresConnector) GetTableSchema(
}
res[tableName] = tableSchema
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
}

return &protos.GetTableSchemaBatchOutput{
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
if c.pebbleDB == nil {
slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold),
slog.String("flowName", c.flowJobName))
slog.String(string(shared.FlowNameKey), c.flowJobName))
err := c.initPebbleDB()
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func CDCFlowWorkflowWithConfig(
return nil, fmt.Errorf("invalid connection configs")
}

ctx = workflow.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
w := NewCDCFlowWorkflowExecution(ctx)

if limits.TotalSyncFlows == 0 {
Expand Down Expand Up @@ -402,7 +401,7 @@ func CDCFlowWorkflowWithConfig(
&protos.GetTableSchemaBatchInput{
PeerConnectionConfig: cfg.Source,
TableIdentifiers: modifiedSrcTables,
})
}, cfg.FlowJobName)

var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
TableIdentifiers: []string{q.config.WatermarkTable},
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput, q.config.FlowJobName)

var tblSchemaOutput *protos.GetTableSchemaBatchOutput
if err := future.Get(ctx, &tblSchemaOutput); err != nil {
Expand All @@ -128,7 +128,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
SyncedAtColName: q.config.SyncedAtColName,
}

future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig, q.config.FlowJobName)
var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
q.logger.Error("failed to create watermark table: ", err)
Expand Down
8 changes: 4 additions & 4 deletions flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
})

// first check the source peer connection
srcConnStatusFuture := workflow.ExecuteActivity(ctx, flowable.CheckConnection, config.Source)
srcConnStatusFuture := workflow.ExecuteActivity(ctx, flowable.CheckConnection, config.Source, config.FlowJobName)
var srcConnStatus activities.CheckConnectionResult
if err := srcConnStatusFuture.Get(ctx, &srcConnStatus); err != nil {
return fmt.Errorf("failed to check source peer connection: %w", err)
Expand All @@ -80,7 +80,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(

// then setup the destination peer metadata tables
if destConnStatus.NeedsSetupMetadataTables {
fDst := workflow.ExecuteActivity(ctx, flowable.SetupMetadataTables, config.Destination)
fDst := workflow.ExecuteActivity(ctx, flowable.SetupMetadataTables, config.Destination, config.FlowJobName)
if err := fDst.Get(ctx, nil); err != nil {
return fmt.Errorf("failed to setup destination peer metadata tables: %w", err)
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
TableIdentifiers: sourceTables,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput, s.CDCFlowName)

var tblSchemaOutput *protos.GetTableSchemaBatchOutput
if err := future.Get(ctx, &tblSchemaOutput); err != nil {
Expand Down Expand Up @@ -227,7 +227,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
SyncedAtColName: flowConnectionConfigs.SyncedAtColName,
}

future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig, flowConnectionConfigs.FlowJobName)
var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
s.logger.Error("failed to create normalized tables: ", err)
Expand Down
1 change: 0 additions & 1 deletion flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
}

replCtx := ctx
replCtx = workflow.WithValue(replCtx, shared.FlowNameKey, config.FlowJobName)
if config.DoInitialCopy {
sessionOpts := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
Expand Down
6 changes: 0 additions & 6 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package peerflow

import (
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)
Expand Down Expand Up @@ -69,8 +67,6 @@ func (s *SyncFlowExecution) executeSyncFlow(
HeartbeatTimeout: 30 * time.Second,
})

startFlowCtx = workflow.WithValue(startFlowCtx, shared.FlowNameKey, s.CDCFlowName)

// execute StartFlow on the peers to start the flow
startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: config,
Expand Down Expand Up @@ -113,7 +109,5 @@ func SyncFlowWorkflow(ctx workflow.Context,
CDCFlowName: config.FlowJobName,
Progress: []string{},
})
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
slog.Info("Context obtained flow name in syncflowworkflow: " + flowName)
return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping)
}
4 changes: 2 additions & 2 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (q *XminFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
TableIdentifiers: []string{q.config.WatermarkTable},
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput, q.config.FlowJobName)

var tblSchemaOutput *protos.GetTableSchemaBatchOutput
if err := future.Get(ctx, &tblSchemaOutput); err != nil {
Expand All @@ -81,7 +81,7 @@ func (q *XminFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
},
}

future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig, q.config.FlowJobName)
var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
q.logger.Error("failed to create watermark table: ", err)
Expand Down

0 comments on commit a1896a8

Please sign in to comment.