diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 09bfb4c5df..88a435767f 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -8,6 +8,7 @@ import ( "log/slog" "net" "net/http" + "os" "time" "github.com/google/uuid" @@ -22,6 +23,7 @@ import ( utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -91,6 +93,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error { clientOptions := client.Options{ HostPort: args.TemporalHostPort, Namespace: args.TemporalNamespace, + Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), } if args.TemporalCert != "" && args.TemporalKey != "" { slog.Info("Using temporal certificate/key for authentication") diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index d76a095ace..e917818ea4 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -3,12 +3,15 @@ package main import ( "crypto/tls" "fmt" + "log/slog" + "os" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -25,6 +28,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { clientOptions := client.Options{ HostPort: opts.TemporalHostPort, Namespace: opts.TemporalNamespace, + Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), } if opts.TemporalCert != "" && opts.TemporalKey != "" { diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index aacc7d85bc..7a45f2306b 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -16,6 +16,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -90,6 +91,7 @@ func WorkerMain(opts *WorkerOptions) error { clientOptions := client.Options{ HostPort: opts.TemporalHostPort, Namespace: opts.TemporalNamespace, + Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))), } if opts.TemporalCert != "" && opts.TemporalKey != "" { diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 876e437a11..fb9b7c5e4b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -15,6 +15,7 @@ import ( "cloud.google.com/go/storage" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/log" "google.golang.org/api/iterator" "google.golang.org/api/option" @@ -22,6 +23,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -54,7 +56,7 @@ type BigQueryConnector struct { datasetID string projectID string catalogPool *pgxpool.Pool - logger slog.Logger + logger log.Logger } // Create BigQueryServiceAccount from BigqueryConfig @@ -222,8 +224,6 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to create catalog connection pool: %v", err) } - flowName, _ := ctx.Value(shared.FlowNameKey).(string) - return &BigQueryConnector{ ctx: ctx, bqConfig: config, @@ -233,7 +233,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* pgMetadata: metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool), storageClient: storageClient, catalogPool: catalogPool, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + logger: logger.LoggerFromCtx(ctx), }, nil } @@ -279,7 +279,7 @@ func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error return nil } - slog.Info("waiting for table to be ready", + c.logger.Info("waiting for table to be ready", slog.String("table", datasetTable.table), slog.Int("attempt", attempt)) attempt++ time.Sleep(sleepInterval) @@ -651,7 +651,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( return nil, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w", datasetTable.dataset, err) } - c.logger.InfoContext(c.ctx, fmt.Sprintf("creating dataset %s...", dataset.DatasetID)) + c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID)) err = dataset.Create(c.ctx, nil) if err != nil { return nil, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err) @@ -758,7 +758,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos for _, renameRequest := range req.RenameTableOptions { srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName) dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName) - c.logger.InfoContext(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), + c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), dstDatasetTable.string())) activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), @@ -768,11 +768,11 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos allCols := strings.Join(renameRequest.TableSchema.ColumnNames, ",") pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",") - c.logger.InfoContext(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) + c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) - c.logger.InfoContext(c.ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", + c.logger.Info(fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), allCols, *req.SoftDeleteColName, dstDatasetTable.string(), pkeyCols, pkeyCols, srcDatasetTable.string())) @@ -796,7 +796,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string())) - c.logger.InfoContext(c.ctx, + c.logger.Info( fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(), *req.SyncedAtColName, *req.SyncedAtColName)) query := c.client.Query( @@ -811,7 +811,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos } } - c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", + c.logger.Info(fmt.Sprintf("DROP TABLE IF EXISTS %s", dstDatasetTable.string())) // drop the dst table if exists dropQuery := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s", @@ -823,7 +823,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err) } - c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + c.logger.Info(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", srcDatasetTable.string(), dstDatasetTable.table)) // rename the src table to dst query := c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", @@ -836,7 +836,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos dstDatasetTable.string(), err) } - c.logger.InfoContext(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), + c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), dstDatasetTable.string())) } diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 8978f82a11..bea5d210e7 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -34,7 +34,7 @@ func (c *BigQueryConnector) SyncQRepRecords( } if done { - c.logger.InfoContext(c.ctx, fmt.Sprintf("Partition %s has already been synced", partition.PartitionId)) + c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId)) return 0, nil } c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+ diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 894475c6b7..a53ec481c6 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -109,7 +109,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.String("destinationTable", rawTableName)) } - slog.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, rawTableName), + s.connector.logger.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, rawTableName), slog.String(string(shared.FlowNameKey), req.FlowJobName), slog.String("dstTableName", rawTableName)) @@ -162,8 +162,8 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } - slog.Info("Obtained Avro schema for destination table", flowLog) - slog.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog) + s.connector.logger.Info("Obtained Avro schema for destination table", flowLog) + s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog) // create a staging table name with partitionID replace hyphens with underscores dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName) stagingDatasetTable := &datasetTable{ @@ -197,7 +197,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;", dstTableName, selector, stagingDatasetTable.string()) - slog.Info("Performing transaction inside QRep sync function", flowLog) + s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog) query := bqClient.Query(insertStmt) query.DefaultDatasetID = s.connector.datasetID @@ -221,7 +221,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( flowLog) } - slog.Info(fmt.Sprintf("loaded stage into %s", dstTableName), flowLog) + s.connector.logger.Info(fmt.Sprintf("loaded stage into %s", dstTableName), flowLog) return numRecords, nil } @@ -426,7 +426,7 @@ func (s *QRepAvroSyncMethod) writeToStage( } avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID) - slog.Info("writing records to local file", idLog) + s.connector.logger.Info("writing records to local file", idLog) avroFile, err = ocfWriter.WriteRecordsToAvroFile(avroFilePath) if err != nil { return 0, fmt.Errorf("failed to write records to local Avro file: %w", err) @@ -437,7 +437,7 @@ func (s *QRepAvroSyncMethod) writeToStage( if avroFile.NumRecords == 0 { return 0, nil } - slog.Info(fmt.Sprintf("wrote %d records", avroFile.NumRecords), idLog) + s.connector.logger.Info(fmt.Sprintf("wrote %d records", avroFile.NumRecords), idLog) bqClient := s.connector.client var avroRef bigquery.LoadSource @@ -472,7 +472,7 @@ func (s *QRepAvroSyncMethod) writeToStage( if err := status.Err(); err != nil { return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err) } - slog.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath), idLog) + s.connector.logger.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath), idLog) err = s.connector.waitForTableReady(stagingTable) if err != nil { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 5386eb11a4..382beabf04 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -8,13 +8,14 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "go.temporal.io/sdk/log" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" - "github.com/PeerDB-io/peer-flow/shared" ) type EventHubConnector struct { @@ -23,7 +24,7 @@ type EventHubConnector struct { pgMetadata *metadataStore.PostgresMetadataStore creds *azidentity.DefaultAzureCredential hubManager *EventHubManager - logger slog.Logger + logger log.Logger } // NewEventHubConnector creates a new EventHubConnector. @@ -46,14 +47,13 @@ func NewEventHubConnector( return nil, err } - flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &EventHubConnector{ ctx: ctx, config: config, pgMetadata: pgMetadata, creds: defaultAzureCreds, hubManager: hubManager, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + logger: logger.LoggerFromCtx(ctx), }, nil } diff --git a/flow/connectors/postgres/normalize_stmt_generator.go b/flow/connectors/postgres/normalize_stmt_generator.go index 47da11f0c4..629a252572 100644 --- a/flow/connectors/postgres/normalize_stmt_generator.go +++ b/flow/connectors/postgres/normalize_stmt_generator.go @@ -2,10 +2,10 @@ package connpostgres import ( "fmt" - "log/slog" "slices" "strings" + "go.temporal.io/sdk/log" "golang.org/x/exp/maps" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -28,7 +28,7 @@ type normalizeStmtGenerator struct { // Postgres metadata schema metadataSchema string // to log fallback statement selection - logger slog.Logger + logger log.Logger } func (n *normalizeStmtGenerator) generateNormalizeStatements() []string { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 8a1c4eb1bc..e40de85ec4 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -13,15 +13,16 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/log" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/peerdbenv" - "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/alerting" ) @@ -35,7 +36,7 @@ type PostgresConnector struct { replConfig *pgx.ConnConfig customTypesMapping map[uint32]string metadataSchema string - logger slog.Logger + logger log.Logger } // NewPostgresConnector creates a new instance of PostgresConnector. @@ -80,8 +81,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) metadataSchema = *pgConfig.MetadataSchema } - flowName, _ := ctx.Value(shared.FlowNameKey).(string) - flowLog := slog.With(slog.String(string(shared.FlowNameKey), flowName)) return &PostgresConnector{ connStr: connectionString, ctx: ctx, @@ -91,7 +90,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) replConfig: replConfig, customTypesMapping: customTypeMap, metadataSchema: metadataSchema, - logger: *flowLog, + logger: logger.LoggerFromCtx(ctx), }, nil } diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index 8ac3a207c1..b1a29c80eb 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -9,6 +9,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/stretchr/testify/assert" + "go.temporal.io/sdk/log" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" @@ -174,7 +175,7 @@ func TestGetQRepPartitions(t *testing.T) { ctx: context.Background(), config: &protos.PostgresConfig{}, conn: conn, - logger: *slog.With(slog.String(string(shared.FlowNameKey), "testGetQRepPartitions")), + logger: log.NewStructuredLogger(slog.With(slog.String(string(shared.FlowNameKey), "testGetQRepPartitions"))), } got, err := c.GetQRepPartitions(tc.config, tc.last) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index ce4cb0d253..8b422574b6 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -9,9 +9,11 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/log" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/geo" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -25,7 +27,7 @@ type QRepQueryExecutor struct { flowJobName string partitionID string customTypeMap map[uint32]string - logger slog.Logger + logger log.Logger } func NewQRepQueryExecutor(conn *pgx.Conn, ctx context.Context, @@ -37,18 +39,16 @@ func NewQRepQueryExecutor(conn *pgx.Conn, ctx context.Context, snapshot: "", flowJobName: flowJobName, partitionID: partitionID, - logger: *slog.With( - slog.String(string(shared.FlowNameKey), flowJobName), - slog.String(string(shared.PartitionIDKey), partitionID)), + logger: log.With( + logger.LoggerFromCtx(ctx), + slog.String(string(shared.PartitionIDKey), partitionID), + ), } } func NewQRepQueryExecutorSnapshot(conn *pgx.Conn, ctx context.Context, snapshot string, flowJobName string, partitionID string, ) (*QRepQueryExecutor, error) { - qrepLog := slog.Group("qrep-metadata", slog.String(string(shared.FlowNameKey), flowJobName), - slog.String(string(shared.PartitionIDKey), partitionID)) - slog.Info("Declared new qrep executor for snapshot", qrepLog) CustomTypeMap, err := utils.GetCustomDataTypes(ctx, conn) if err != nil { return nil, fmt.Errorf("failed to get custom data types: %w", err) @@ -60,7 +60,10 @@ func NewQRepQueryExecutorSnapshot(conn *pgx.Conn, ctx context.Context, snapshot flowJobName: flowJobName, partitionID: partitionID, customTypeMap: CustomTypeMap, - logger: *slog.With(qrepLog), + logger: log.With( + logger.LoggerFromCtx(ctx), + slog.String(string(shared.PartitionIDKey), partitionID), + ), }, nil } diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go index 1ca15ef43f..932af63be8 100644 --- a/flow/connectors/postgres/qrep_sql_sync.go +++ b/flow/connectors/postgres/qrep_sql_sync.go @@ -117,7 +117,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( dstTableIdentifier.Sanitize(), ) - slog.Info(fmt.Sprintf("Creating staging table %s - '%s'", + s.connector.logger.Info(fmt.Sprintf("Creating staging table %s - '%s'", stagingTableName, createStagingTableStmt), syncLog) _, err = tx.Exec(context.Background(), createStagingTableStmt) @@ -169,7 +169,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( strings.Join(writeMode.UpsertKeyColumns, ", "), setClause, ) - slog.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog) + s.connector.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog) res, err := tx.Exec(context.Background(), upsertStmt) if err != nil { return -1, fmt.Errorf("failed to perform upsert operation: %v", err) @@ -182,14 +182,14 @@ func (s *QRepStagingTableSync) SyncQRepRecords( "DROP TABLE %s;", stagingTableIdentifier.Sanitize(), ) - slog.Info("Dropping staging table", slog.String("stagingTable", stagingTableName), syncLog) + s.connector.logger.Info("Dropping staging table", slog.String("stagingTable", stagingTableName), syncLog) _, err = tx.Exec(context.Background(), dropStagingTableStmt) if err != nil { return -1, fmt.Errorf("failed to drop staging table: %v", err) } } - slog.Info(fmt.Sprintf("pushed %d records to %s", numRowsSynced, dstTableName), syncLog) + s.connector.logger.Info(fmt.Sprintf("pushed %d records to %s", numRowsSynced, dstTableName), syncLog) // marshal the partition to json using protojson pbytes, err := protojson.Marshal(partition) @@ -202,7 +202,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( "INSERT INTO %s VALUES ($1, $2, $3, $4, $5);", metadataTableIdentifier.Sanitize(), ) - slog.Info("Executing transaction inside Qrep sync", syncLog) + s.connector.logger.Info("Executing transaction inside Qrep sync", syncLog) _, err = tx.Exec( context.Background(), insertMetadataStmt, @@ -222,6 +222,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords( } numRowsInserted := copySource.NumRecords() - slog.Info(fmt.Sprintf("pushed %d records to %s", numRowsInserted, dstTableName), syncLog) + s.connector.logger.Info(fmt.Sprintf("pushed %d records to %s", numRowsInserted, dstTableName), syncLog) return numRowsInserted, nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index 95213c6123..9a1877ca32 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -10,12 +10,13 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "go.temporal.io/sdk/log" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/shared" ) const ( @@ -28,7 +29,7 @@ type S3Connector struct { pgMetadata *metadataStore.PostgresMetadataStore client s3.Client creds utils.S3PeerCredentials - logger slog.Logger + logger log.Logger } func NewS3Connector(ctx context.Context, @@ -70,14 +71,13 @@ func NewS3Connector(ctx context.Context, slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) return nil, err } - flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &S3Connector{ ctx: ctx, url: config.Url, pgMetadata: pgMetadata, client: *s3Client, creds: s3PeerCreds, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + logger: logger.LoggerFromCtx(ctx), }, nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7394110b86..28761513a1 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -15,11 +15,13 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/log" "golang.org/x/sync/errgroup" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -76,7 +78,7 @@ type SnowflakeConnector struct { database *sql.DB pgMetadata *metadataStore.PostgresMetadataStore rawSchema string - logger slog.Logger + logger log.Logger } // creating this to capture array results from snowflake. @@ -112,7 +114,7 @@ func TableCheck(ctx context.Context, database *sql.DB) error { defer func() { deferErr := tx.Rollback() if deferErr != sql.ErrTxDone && deferErr != nil { - activity.GetLogger(ctx).Error("error while rolling back transaction for table check", + logger.LoggerFromCtx(ctx).Error("error while rolling back transaction for table check", slog.Any("error", deferErr)) } }() @@ -207,13 +209,12 @@ func NewSnowflakeConnector(ctx context.Context, return nil, fmt.Errorf("could not connect to metadata store: %w", err) } - flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &SnowflakeConnector{ ctx: ctx, database: database, pgMetadata: pgMetadata, rawSchema: rawSchema, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + logger: logger.LoggerFromCtx(ctx), }, nil } diff --git a/flow/connectors/sqlserver/sqlserver.go b/flow/connectors/sqlserver/sqlserver.go index 805dcfb849..cf99a98d0e 100644 --- a/flow/connectors/sqlserver/sqlserver.go +++ b/flow/connectors/sqlserver/sqlserver.go @@ -3,14 +3,14 @@ package connsqlserver import ( "context" "fmt" - "log/slog" "github.com/jmoiron/sqlx" _ "github.com/microsoft/go-mssqldb" + "go.temporal.io/sdk/log" peersql "github.com/PeerDB-io/peer-flow/connectors/sql" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/logger" ) type SQLServerConnector struct { @@ -19,7 +19,7 @@ type SQLServerConnector struct { ctx context.Context config *protos.SqlServerConfig db *sqlx.DB - logger slog.Logger + logger log.Logger } // NewSQLServerConnector creates a new SQL Server connection @@ -40,14 +40,12 @@ func NewSQLServerConnector(ctx context.Context, config *protos.SqlServerConfig) genericExecutor := *peersql.NewGenericSQLQueryExecutor( ctx, db, sqlServerTypeToQValueKindMap, qValueKindToSQLServerTypeMap) - flowName, _ := ctx.Value(shared.FlowNameKey).(string) - return &SQLServerConnector{ GenericSQLQueryExecutor: genericExecutor, ctx: ctx, config: config, db: db, - logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + logger: logger.LoggerFromCtx(ctx), }, nil } diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index a8b5c41be8..52dfe86209 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -211,7 +211,6 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils slog.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path)) return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err) } - slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key)) if !noPanic { return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key) diff --git a/flow/logger/from_ctx.go b/flow/logger/from_ctx.go new file mode 100644 index 0000000000..5551110761 --- /dev/null +++ b/flow/logger/from_ctx.go @@ -0,0 +1,26 @@ +package logger + +import ( + "context" + "log/slog" + + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/log" + + "github.com/PeerDB-io/peer-flow/shared" +) + +func LoggerFromCtx(ctx context.Context) log.Logger { + flowName, hasName := ctx.Value(shared.FlowNameKey).(string) + if activity.IsActivity(ctx) { + if hasName { + return log.With(activity.GetLogger(ctx), string(shared.FlowNameKey), flowName) + } else { + return activity.GetLogger(ctx) + } + } else if hasName { + return slog.With(string(shared.FlowNameKey), flowName) + } else { + return slog.Default() + } +}