diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 43df1b9d8c..60b6400c75 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -106,7 +106,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( // drop the staging table if err := bqClient.DatasetInProject(s.connector.projectID, datasetID).Table(stagingTable).Delete(ctx); err != nil { // just log the error this isn't fatal. - slog.Error("failed to delete staging table "+stagingTable, + s.connector.logger.Warn("failed to delete staging table "+stagingTable, slog.Any("error", err), slog.Int64("syncBatchID", syncBatchID), slog.String("destinationTable", rawTableName)) @@ -220,7 +220,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( if err := bqClient.DatasetInProject(s.connector.projectID, stagingDatasetTable.dataset). Table(stagingDatasetTable.table).Delete(ctx); err != nil { // just log the error this isn't fatal. - slog.Error("failed to delete staging table "+stagingDatasetTable.string(), + s.connector.logger.Warn("failed to delete staging table "+stagingDatasetTable.string(), slog.Any("error", err), flowLog) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index d3e4299bd9..49ba9151a0 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -16,6 +16,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared/alerting" ) @@ -285,6 +286,6 @@ func CloseConnector(ctx context.Context, conn Connector) { err := conn.Close(ctx) if err != nil { - slog.Error("error closing connector", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err)) } } diff --git a/flow/connectors/eventhub/hub_batches.go b/flow/connectors/eventhub/hub_batches.go index e96d0e6896..86cd1d52a5 100644 --- a/flow/connectors/eventhub/hub_batches.go +++ b/flow/connectors/eventhub/hub_batches.go @@ -10,6 +10,7 @@ import ( azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" ) @@ -105,8 +106,8 @@ func (h *HubBatches) sendBatch( return err } - slog.InfoContext(ctx, "sendBatch", - slog.Int("events sent", int(events.NumEvents())), slog.String("event hub topic ", tblName.ToString())) + logger.LoggerFromCtx(ctx).Info("sendBatch", + slog.Int("events sent", int(events.NumEvents())), slog.String("event hub topic", tblName.ToString())) return nil } @@ -114,8 +115,9 @@ func (h *HubBatches) flushAllBatches( ctx context.Context, flowName string, ) error { + logger := logger.LoggerFromCtx(ctx) if h.Len() == 0 { - slog.Info("no events to send", slog.String(string(shared.FlowNameKey), flowName)) + logger.Info("no events to send", slog.String(string(shared.FlowNameKey), flowName)) return nil } @@ -132,7 +134,7 @@ func (h *HubBatches) flushAllBatches( } numEventsPushed.Add(numEvents) - slog.Info("flushAllBatches", + logger.Info("flushAllBatches", slog.String(string(shared.FlowNameKey), flowName), slog.Int("events sent", int(numEvents)), slog.String("event hub topic ", destination.ToString())) @@ -144,7 +146,7 @@ func (h *HubBatches) flushAllBatches( if err != nil { return fmt.Errorf("failed to flushAllBatches: %v", err) } - slog.Info("hub batches flush", + logger.Info("hub batches flush", slog.String(string(shared.FlowNameKey), flowName), slog.Int("events sent", int(numEventsPushed.Load()))) diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 0caa6106fa..4ae8c5d044 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -17,6 +17,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" ) type EventHubManager struct { @@ -65,12 +66,13 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE hubTmp := hub.(*azeventhubs.ProducerClient) _, err := hubTmp.GetEventHubProperties(ctx, nil) if err != nil { - slog.Info(fmt.Sprintf("eventhub %s", name)+ - "not reachable. Will re-establish connection and re-create it.", + logger := logger.LoggerFromCtx(ctx) + logger.Info( + fmt.Sprintf("eventhub %s not reachable. Will re-establish connection and re-create it.", name), slog.Any("error", err)) closeError := m.closeProducerClient(ctx, hubTmp) if closeError != nil { - slog.Error("failed to close producer client", slog.Any("error", closeError)) + logger.Error("failed to close producer client", slog.Any("error", closeError)) } m.hubs.Delete(name) hubConnectOK = false @@ -116,7 +118,7 @@ func (m *EventHubManager) Close(ctx context.Context) error { hub := value.(*azeventhubs.ProducerClient) err := m.closeProducerClient(ctx, hub) if err != nil { - slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err)) allErrors = errors.Join(allErrors, err) } numHubsClosed.Add(1) @@ -167,6 +169,7 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE partitionCount := int64(cfg.PartitionCount) retention := int64(cfg.MessageRetentionInDays) + logger := logger.LoggerFromCtx(ctx) if err != nil { opts := armeventhub.Eventhub{ Properties: &armeventhub.Properties{ @@ -181,9 +184,9 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE return err } - slog.Info("event hub created", slog.Any("name", name)) + logger.Info("event hub created", slog.Any("name", name)) } else { - slog.Info("event hub exists already", slog.Any("name", name)) + logger.Info("event hub exists already", slog.Any("name", name)) } return nil diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index aef4f81c47..89805659f4 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -20,6 +20,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/geo" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -232,8 +233,9 @@ func (p *PostgresCDCSource) consumeStream( standbyMessageTimeout := req.IdleTimeout nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) + logger := logger.LoggerFromCtx(ctx) addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error { - err := cdcRecordsStorage.Set(key, rec) + err := cdcRecordsStorage.Set(logger, key, rec) if err != nil { return err } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1b0fff15af..ef0169b1b3 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -93,7 +93,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) { conn, err := c.ssh.NewPostgresConnFromConfig(ctx, c.replConfig) if err != nil { - slog.Error("failed to create replication connection", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Error("failed to create replication connection", "error", err) return nil, fmt.Errorf("failed to create replication connection: %w", err) } @@ -914,22 +914,24 @@ func (c *PostgresConnector) HandleSlotInfo( slotName string, peerName string, ) error { + logger := logger.LoggerFromCtx(ctx) + // must create new connection because HandleSlotInfo is threadsafe conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config) if err != nil { - slog.WarnContext(ctx, "warning: failed to connect to get slot info", slog.Any("error", err)) + logger.Warn("warning: failed to connect to get slot info", "error", err) return err } defer conn.Close(ctx) slotInfo, err := getSlotInfo(ctx, conn, slotName, c.config.Database) if err != nil { - slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err)) + logger.Warn("warning: failed to get slot info", "error", err) return err } if len(slotInfo) == 0 { - slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName)) + logger.Warn("warning: unable to get slot info", "slotName", slotName) return nil } @@ -950,7 +952,7 @@ cc: `, maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) res, err := getOpenConnectionsForUser(ctx, conn, c.config.User) if err != nil { - slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) + logger.Warn("warning: failed to get current open connections", "error", err) return err } if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index d07c043c3b..8a01d90cbe 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "go.temporal.io/sdk/log" "github.com/PeerDB-io/peer-flow/connectors/utils" partition_utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition" @@ -290,7 +291,7 @@ func (c *PostgresConnector) CheckForUpdatedMaxValue( defer func() { deferErr := tx.Rollback(ctx) if deferErr != pgx.ErrTxClosed && deferErr != nil { - c.logger.Error("error rolling back transaction for getting max value", slog.Any("error", err)) + c.logger.Error("error rolling back transaction for getting max value", "error", err) } }() @@ -362,7 +363,7 @@ func (c *PostgresConnector) PullQRepRecords( // Build the query to pull records within the range from the source table // Be sure to order the results by the watermark column to ensure consistency across pulls - query, err := BuildQuery(config.Query, config.FlowJobName) + query, err := BuildQuery(c.logger, config.Query, config.FlowJobName) if err != nil { return nil, err } @@ -432,7 +433,7 @@ func (c *PostgresConnector) PullQRepRecordStream( // Build the query to pull records within the range from the source table // Be sure to order the results by the watermark column to ensure consistency across pulls - query, err := BuildQuery(config.Query, config.FlowJobName) + query, err := BuildQuery(c.logger, config.Query, config.FlowJobName) if err != nil { return 0, err } @@ -568,7 +569,7 @@ func (c *PostgresConnector) PullXminRecordStream( return numRecords, currentSnapshotXmin, nil } -func BuildQuery(query string, flowJobName string) (string, error) { +func BuildQuery(logger log.Logger, query string, flowJobName string) (string, error) { tmpl, err := template.New("query").Parse(query) if err != nil { return "", err @@ -587,7 +588,7 @@ func BuildQuery(query string, flowJobName string) (string, error) { } res := buf.String() - slog.Info(fmt.Sprintf("templated query: %s", res)) + logger.Info(fmt.Sprintf("templated query: %s", res)) return res, nil } diff --git a/flow/connectors/postgres/qrep_query_build_test.go b/flow/connectors/postgres/qrep_query_build_test.go index 25c1fef6e6..fdc4dc4522 100644 --- a/flow/connectors/postgres/qrep_query_build_test.go +++ b/flow/connectors/postgres/qrep_query_build_test.go @@ -1,6 +1,7 @@ package connpostgres import ( + "log/slog" "testing" ) @@ -34,7 +35,7 @@ func TestBuildQuery(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - actual, err := BuildQuery(tc.query, "test_flow") + actual, err := BuildQuery(slog.Default(), tc.query, "test_flow") if err != nil { t.Fatalf("Error returned by BuildQuery: %v", err) } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 54294f3537..48900555f0 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -145,7 +145,7 @@ func (qe *QRepQueryExecutor) ProcessRows( qe.logger.Info("Processing rows") // Iterate over the rows for rows.Next() { - record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) + record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap) if err != nil { qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) return nil, fmt.Errorf("failed to map row to QRecord: %w", err) @@ -186,7 +186,7 @@ func (qe *QRepQueryExecutor) processRowsStream( return numRows, ctx.Err() default: // Process the row as before - record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) + record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap) if err != nil { qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) stream.Records <- model.QRecordOrError{ @@ -450,7 +450,10 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( return totalRecordsFetched, nil } -func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, +func mapRowToQRecord( + logger log.Logger, + row pgx.Rows, + fds []pgconn.FieldDescription, customTypeMap map[uint32]string, ) ([]qvalue.QValue, error) { // make vals an empty array of QValue of size len(fds) @@ -458,7 +461,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, values, err := row.Values() if err != nil { - slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) + logger.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) return nil, fmt.Errorf("failed to scan row: %w", err) } @@ -468,7 +471,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, if !ok { tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) if err != nil { - slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) + logger.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) return nil, fmt.Errorf("failed to parse field: %w", err) } record[i] = tmp diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go index 2e2aa47c49..ed88d395f1 100644 --- a/flow/connectors/postgres/qrep_sql_sync.go +++ b/flow/connectors/postgres/qrep_sql_sync.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" ) @@ -47,7 +48,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( startTime := time.Now() schema, err := stream.Schema() if err != nil { - slog.Error("failed to get schema from stream", slog.Any("error", err), syncLog) + logger.LoggerFromCtx(ctx).Error("failed to get schema from stream", slog.Any("error", err), syncLog) return 0, fmt.Errorf("failed to get schema from stream: %w", err) } @@ -71,7 +72,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( defer func() { if err := tx.Rollback(context.Background()); err != nil { if err != pgx.ErrTxClosed { - slog.Error("failed to rollback transaction tx2", slog.Any("error", err), syncLog) + logger.LoggerFromCtx(ctx).Error("failed to rollback transaction tx2", slog.Any("error", err), syncLog) } } }() diff --git a/flow/connectors/postgres/ssh_wrapped_pool.go b/flow/connectors/postgres/ssh_wrapped_pool.go index 619a9e3dbe..e0c5591a3e 100644 --- a/flow/connectors/postgres/ssh_wrapped_pool.go +++ b/flow/connectors/postgres/ssh_wrapped_pool.go @@ -5,20 +5,20 @@ import ( "fmt" "log/slog" "net" - "sync" "time" "github.com/jackc/pgx/v5" + "go.temporal.io/sdk/log" "golang.org/x/crypto/ssh" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" ) type SSHTunnel struct { sshConfig *ssh.ClientConfig sshServer string - once sync.Once sshClient *ssh.Client } @@ -34,39 +34,30 @@ func NewSSHTunnel( var err error clientConfig, err = utils.GetSSHClientConfig(sshConfig) if err != nil { - slog.Error("Failed to get SSH client config", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Error("Failed to get SSH client config", "error", err) return nil, err } } - pool := &SSHTunnel{ + tunnel := &SSHTunnel{ sshConfig: clientConfig, sshServer: sshServer, } - err := pool.connect() + err := tunnel.setupSSH(logger.LoggerFromCtx(ctx)) if err != nil { return nil, err } - return pool, nil + return tunnel, nil } -func (tunnel *SSHTunnel) connect() error { - var err error - tunnel.once.Do(func() { - err = tunnel.setupSSH() - }) - - return err -} - -func (tunnel *SSHTunnel) setupSSH() error { +func (tunnel *SSHTunnel) setupSSH(logger log.Logger) error { if tunnel.sshConfig == nil { return nil } - slog.Info("Setting up SSH connection to " + tunnel.sshServer) + logger.Info("Setting up SSH connection to " + tunnel.sshServer) var err error tunnel.sshClient, err = ssh.Dial("tcp", tunnel.sshServer, tunnel.sshConfig) @@ -112,24 +103,25 @@ func (tunnel *SSHTunnel) NewPostgresConnFromConfig( } } + logger := logger.LoggerFromCtx(ctx) conn, err := pgx.ConnectConfig(ctx, connConfig) if err != nil { - slog.Error("Failed to create pool:", slog.Any("error", err)) + logger.Error("Failed to create pool:", slog.Any("error", err)) return nil, err } host := connConfig.Host - err = retryWithBackoff(func() error { + err = retryWithBackoff(logger, func() error { err = conn.Ping(ctx) if err != nil { - slog.Error("Failed to ping pool", slog.Any("error", err), slog.String("host", host)) + logger.Error("Failed to ping pool", slog.Any("error", err), slog.String("host", host)) return err } return nil }, 5, 5*time.Second) if err != nil { - slog.Error("Failed to create pool", slog.Any("error", err), slog.String("host", host)) + logger.Error("Failed to create pool", slog.Any("error", err), slog.String("host", host)) conn.Close(ctx) return nil, err } @@ -139,7 +131,7 @@ func (tunnel *SSHTunnel) NewPostgresConnFromConfig( type retryFunc func() error -func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) error { +func retryWithBackoff(logger log.Logger, fn retryFunc, maxRetries int, backoff time.Duration) error { i := 0 for { err := fn() @@ -148,7 +140,7 @@ func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) error } i += 1 if i < maxRetries { - slog.Info(fmt.Sprintf("Attempt #%d failed, retrying in %s", i+1, backoff)) + logger.Info(fmt.Sprintf("Attempt #%d failed, retrying in %s", i+1, backoff)) time.Sleep(backoff) } else { return err diff --git a/flow/connectors/sqlserver/qrep.go b/flow/connectors/sqlserver/qrep.go index af9386cfd5..e4ad60104d 100644 --- a/flow/connectors/sqlserver/qrep.go +++ b/flow/connectors/sqlserver/qrep.go @@ -4,12 +4,12 @@ import ( "bytes" "context" "fmt" - "log/slog" "text/template" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "github.com/jmoiron/sqlx" + "go.temporal.io/sdk/log" utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -158,7 +158,7 @@ func (c *SQLServerConnector) PullQRepRecords( ) (*model.QRecordBatch, error) { // Build the query to pull records within the range from the source table // Be sure to order the results by the watermark column to ensure consistency across pulls - query, err := BuildQuery(config.Query) + query, err := BuildQuery(c.logger, config.Query) if err != nil { return nil, err } @@ -191,7 +191,7 @@ func (c *SQLServerConnector) PullQRepRecords( return c.NamedExecuteAndProcessQuery(ctx, query, rangeParams) } -func BuildQuery(query string) (string, error) { +func BuildQuery(logger log.Logger, query string) (string, error) { tmpl, err := template.New("query").Parse(query) if err != nil { return "", err @@ -210,6 +210,6 @@ func BuildQuery(query string) (string, error) { } res := buf.String() - slog.Info("templated query: " + res) + logger.Info("templated query: " + res) return res, nil } diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 3dcd7c3373..3a280c38f7 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -17,6 +17,7 @@ import ( "github.com/linkedin/goavro/v2" "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -116,9 +117,10 @@ func (p *peerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error } func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter *goavro.OCFWriter) (int, error) { + logger := logger.LoggerFromCtx(ctx) schema, err := p.stream.Schema() if err != nil { - slog.Error("failed to get schema from stream", slog.Any("error", err)) + logger.Error("failed to get schema from stream", slog.Any("error", err)) return 0, fmt.Errorf("failed to get schema from stream: %w", err) } @@ -136,7 +138,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter for qRecordOrErr := range p.stream.Records { if qRecordOrErr.Err != nil { - slog.Error("[avro] failed to get record from stream", slog.Any("error", qRecordOrErr.Err)) + logger.Error("[avro] failed to get record from stream", slog.Any("error", qRecordOrErr.Err)) return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err) } @@ -149,13 +151,13 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter avroMap, err := avroConverter.Convert() if err != nil { - slog.Error("failed to convert QRecord to Avro compatible map: ", slog.Any("error", err)) + logger.Error("failed to convert QRecord to Avro compatible map: ", slog.Any("error", err)) return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err) } err = ocfWriter.Append([]interface{}{avroMap}) if err != nil { - slog.Error("failed to write record to OCF: ", slog.Any("error", err)) + logger.Error("failed to write record to OCF: ", slog.Any("error", err)) return 0, fmt.Errorf("failed to write record to OCF: %w", err) } @@ -181,9 +183,10 @@ func (p *peerDBOCFWriter) WriteOCF(ctx context.Context, w io.Writer) (int, error } func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) { + logger := logger.LoggerFromCtx(ctx) s3svc, err := utils.CreateS3Client(s3Creds) if err != nil { - slog.Error("failed to create S3 client: ", slog.Any("error", err)) + logger.Error("failed to create S3 client: ", slog.Any("error", err)) return nil, fmt.Errorf("failed to create S3 client: %w", err) } @@ -205,7 +208,7 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key }) if err != nil { s3Path := "s3://" + bucketName + "/" + key - slog.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path)) + logger.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path)) return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index b46a4ad9b8..914d5e9159 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -12,6 +12,7 @@ import ( "time" "github.com/cockroachdb/pebble" + "go.temporal.io/sdk/log" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" @@ -103,14 +104,14 @@ func (c *cdcRecordsStore) diskSpillThresholdsExceeded() bool { return false } -func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error { +func (c *cdcRecordsStore) Set(logger log.Logger, key *model.TableWithPkey, rec model.Record) error { if key != nil { _, ok := c.inMemoryRecords[*key] if ok || !c.diskSpillThresholdsExceeded() { c.inMemoryRecords[*key] = rec } else { if c.pebbleDB == nil { - slog.Info(c.thresholdReason, + logger.Info(c.thresholdReason, slog.String(string(shared.FlowNameKey), c.flowJobName)) err := c.initPebbleDB() if err != nil { diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index c585541054..c972092536 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -2,6 +2,7 @@ package cdc_records import ( "crypto/rand" + "log/slog" "math/big" "testing" "time" @@ -81,7 +82,7 @@ func TestSingleRecord(t *testing.T) { cdcRecordsStore.numRecordsSwitchThreshold = 10 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(&key, rec) + err := cdcRecordsStore.Set(slog.Default(), &key, rec) require.NoError(t, err) // should not spill into DB require.Len(t, cdcRecordsStore.inMemoryRecords, 1) @@ -103,7 +104,7 @@ func TestRecordsTillSpill(t *testing.T) { // add records upto set limit for i := 1; i <= 10; i++ { key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(&key, rec) + err := cdcRecordsStore.Set(slog.Default(), &key, rec) require.NoError(t, err) require.Len(t, cdcRecordsStore.inMemoryRecords, i) require.Nil(t, cdcRecordsStore.pebbleDB) @@ -111,7 +112,7 @@ func TestRecordsTillSpill(t *testing.T) { // this record should be spilled to DB key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(&key, rec) + err := cdcRecordsStore.Set(slog.Default(), &key, rec) require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] require.False(t, ok) @@ -132,7 +133,7 @@ func TestTimeAndRatEncoding(t *testing.T) { cdcRecordsStore.numRecordsSwitchThreshold = 0 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(&key, rec) + err := cdcRecordsStore.Set(slog.Default(), &key, rec) require.NoError(t, err) retreived, ok, err := cdcRecordsStore.Get(key) @@ -153,7 +154,7 @@ func TestNullKeyDoesntStore(t *testing.T) { cdcRecordsStore.numRecordsSwitchThreshold = 0 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(nil, rec) + err := cdcRecordsStore.Set(slog.Default(), nil, rec) require.NoError(t, err) retreived, ok, err := cdcRecordsStore.Get(key) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 5412323bfc..77ff006510 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -14,6 +14,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" ) @@ -115,7 +116,7 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa defer func() { err = insertBatchTablesTx.Rollback(ctx) if err != pgx.ErrTxClosed && err != nil { - slog.Error("error during transaction rollback", + logger.LoggerFromCtx(ctx).Error("error during transaction rollback", slog.Any("error", err), slog.String(string(shared.FlowNameKey), flowJobName)) } @@ -225,7 +226,7 @@ func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName runUUID string, partition *protos.QRepPartition, ) error { if partition.Range == nil && partition.FullTablePartition { - slog.Info("partition"+partition.PartitionId+ + logger.LoggerFromCtx(ctx).Info("partition"+partition.PartitionId+ " is a full table partition. Metrics logging is skipped.", slog.String(string(shared.FlowNameKey), flowJobName)) return nil diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go index 047b614925..5225c400b1 100644 --- a/flow/dynamicconf/dynamicconf.go +++ b/flow/dynamicconf/dynamicconf.go @@ -2,7 +2,6 @@ package dynamicconf import ( "context" - "log/slog" "strconv" "time" @@ -10,6 +9,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/logger" ) func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool { @@ -17,7 +17,7 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)" err := conn.QueryRow(ctx, query, key).Scan(&exists) if err != nil { - slog.Error("Failed to check if key exists: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err) return false } @@ -27,7 +27,7 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 { conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { - slog.Error("Failed to get catalog connection pool: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) return defaultValue } @@ -39,13 +39,13 @@ func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uin query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" err = conn.QueryRow(ctx, query, key).Scan(&value) if err != nil { - slog.Error("Failed to get key: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) return defaultValue } result, err := strconv.ParseUint(value.String, 10, 32) if err != nil { - slog.Error("Failed to parse uint32: %v", err) + logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) return defaultValue } diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 67d83a666e..d7f25c7f5d 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -22,8 +22,6 @@ type QRecordBatch struct { func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) { stream := NewQRecordStream(buffer) - slog.Info(fmt.Sprintf("Converting %d records to QRecordStream", len(q.Records))) - go func() { err := stream.SetSchema(q.Schema) if err != nil { diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 98922d22bc..a817dd285f 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -11,12 +11,12 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/PeerDB-io/peer-flow/dynamicconf" + "github.com/PeerDB-io/peer-flow/logger" ) // alerting service, no cool name :( type Alerter struct { catalogPool *pgxpool.Pool - logger *slog.Logger } func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { @@ -49,15 +49,12 @@ func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([] // doesn't take care of closing pool, needs to be done externally. func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { - logger := slog.Default() if catalogPool == nil { - logger.Error("catalog pool is nil for Alerter") return nil, fmt.Errorf("catalog pool is nil for Alerter") } return &Alerter{ catalogPool: catalogPool, - logger: logger, }, nil } @@ -66,7 +63,7 @@ func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) if dur == 0 { - a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning") + logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") return } @@ -78,7 +75,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str var createdTimestamp time.Time err = row.Scan(&createdTimestamp) if err != nil && err != pgx.ErrNoRows { - a.logger.Warn("failed to send alert: ", slog.String("err", err.Error())) + logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) return } @@ -91,7 +88,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) { slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool) if err != nil { - a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) return } @@ -99,7 +96,7 @@ func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessag err = slackAlertSender.sendAlert(ctx, fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) if err != nil { - a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return } } @@ -110,7 +107,7 @@ func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertM "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", alertKey, alertMessage) if err != nil { - a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to insert alert", slog.Any("error", err)) return } } @@ -121,7 +118,7 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", flowName, errorWithStack, "error") if err != nil { - a.logger.WarnContext(ctx, "failed to insert flow error", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err)) return } } @@ -131,7 +128,7 @@ func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", flowName, info, "info") if err != nil { - a.logger.WarnContext(ctx, "failed to insert flow info", slog.Any("error", err)) + logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err)) return } }