Use temporal's activity.GetLogger in connectors (#1187)
Logs from different tests were getting mixed up.
This fixes that by obtaining a temporal logger from the current context and
using it in our connectors instead of the default slog instance.

Co-authored-by: Philip Dubé <[email protected]>
Amogh-Bharadwaj and serprex authored Feb 1, 2024
1 parent 279931b commit 2a0842e
Showing 17 changed files with 99 additions and 63 deletions.
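The connector changes below all funnel through a helper, logger.LoggerFromCtx, whose implementation is not part of this diff. A minimal sketch of what such a helper could look like, assuming it returns the Temporal activity logger when called inside an activity and falls back to a default slog-backed logger otherwise (activity.IsActivity, activity.GetLogger, and log.NewStructuredLogger are real SDK APIs; the body is an assumption):

```go
package logger

import (
	"context"
	"log/slog"

	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/log"
)

// LoggerFromCtx (sketch): inside a Temporal activity, reuse the logger the
// SDK attached to the context; otherwise wrap the default slog logger so
// callers always get a log.Logger.
func LoggerFromCtx(ctx context.Context) log.Logger {
	if activity.IsActivity(ctx) {
		// Tagged by the SDK with workflow/activity identifiers, which is
		// what keeps concurrent tests' logs separated.
		return activity.GetLogger(ctx)
	}
	return log.NewStructuredLogger(slog.Default())
}
```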
3 changes: 3 additions & 0 deletions flow/cmd/api.go
@@ -8,6 +8,7 @@ import (
"log/slog"
"net"
"net/http"
"os"
"time"

"github.com/google/uuid"
@@ -22,6 +23,7 @@ import (

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)
@@ -91,6 +93,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
clientOptions := client.Options{
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
+ Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
}
if args.TemporalCert != "" && args.TemporalKey != "" {
slog.Info("Using temporal certificate/key for authentication")
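A side note on the Logger option above: client.Options.Logger is typed as Temporal's log.Logger interface, and *slog.Logger satisfies it structurally, since its Debug/Info/Warn/Error methods all have the (msg string, args ...any) shape. An illustrative compile-time check:

```go
package main

import (
	"log/slog"

	"go.temporal.io/sdk/log"
)

// *slog.Logger implements go.temporal.io/sdk/log.Logger: Debug/Info/Warn/
// Error all take (msg string, args ...any), matching the interface.
// This is what lets slog.New(...) be assigned to client.Options.Logger.
var _ log.Logger = (*slog.Logger)(nil)
```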
4 changes: 4 additions & 0 deletions flow/cmd/snapshot_worker.go
@@ -3,12 +3,15 @@ package main
import (
"crypto/tls"
"fmt"
"log/slog"
"os"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
@@ -25,6 +28,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
+ Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
2 changes: 2 additions & 0 deletions flow/cmd/worker.go
@@ -16,6 +16,7 @@ import (

"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
@@ -90,6 +91,7 @@ func WorkerMain(opts *WorkerOptions) error {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
+ Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
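All three entrypoints wrap their JSON handler in logger.NewHandler, which is also not shown in this diff. A plausible sketch, assuming its job is to lift the flow name out of each log call's context onto the record; the contextHandler type is hypothetical, while NewHandler and shared.FlowNameKey do appear in the real code:

```go
package logger

import (
	"context"
	"log/slog"

	"github.com/PeerDB-io/peer-flow/shared"
)

// contextHandler (hypothetical) decorates records with the flow name
// stored in the context, so every log line is attributable to its flow.
type contextHandler struct{ inner slog.Handler }

func NewHandler(inner slog.Handler) slog.Handler {
	return contextHandler{inner: inner}
}

func (h contextHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
	return h.inner.Enabled(ctx, lvl)
}

func (h contextHandler) Handle(ctx context.Context, rec slog.Record) error {
	if flowName, ok := ctx.Value(shared.FlowNameKey).(string); ok {
		rec.AddAttrs(slog.String(string(shared.FlowNameKey), flowName))
	}
	return h.inner.Handle(ctx, rec)
}

func (h contextHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	return contextHandler{inner: h.inner.WithAttrs(attrs)}
}

func (h contextHandler) WithGroup(name string) slog.Handler {
	return contextHandler{inner: h.inner.WithGroup(name)}
}
```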
26 changes: 13 additions & 13 deletions flow/connectors/bigquery/bigquery.go
@@ -15,13 +15,15 @@ import (
"cloud.google.com/go/storage"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
@@ -54,7 +56,7 @@ type BigQueryConnector struct {
datasetID string
projectID string
catalogPool *pgxpool.Pool
- logger slog.Logger
+ logger log.Logger
}

// Create BigQueryServiceAccount from BigqueryConfig
@@ -222,8 +224,6 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
return nil, fmt.Errorf("failed to create catalog connection pool: %v", err)
}

- flowName, _ := ctx.Value(shared.FlowNameKey).(string)
-
return &BigQueryConnector{
ctx: ctx,
bqConfig: config,
@@ -233,7 +233,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
pgMetadata: metadataStore.NewPostgresMetadataStoreFromCatalog(ctx, catalogPool),
storageClient: storageClient,
catalogPool: catalogPool,
- logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+ logger: logger.LoggerFromCtx(ctx),
}, nil
}

@@ -279,7 +279,7 @@ func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error
return nil
}

slog.Info("waiting for table to be ready",
c.logger.Info("waiting for table to be ready",
slog.String("table", datasetTable.table), slog.Int("attempt", attempt))
attempt++
time.Sleep(sleepInterval)
@@ -651,7 +651,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
return nil, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w",
datasetTable.dataset, err)
}
c.logger.InfoContext(c.ctx, fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
err = dataset.Create(c.ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err)
@@ -758,7 +758,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
for _, renameRequest := range req.RenameTableOptions {
srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName)
dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName)
c.logger.InfoContext(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
@@ -768,11 +768,11 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
allCols := strings.Join(renameRequest.TableSchema.ColumnNames, ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")

c.logger.InfoContext(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))
c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

c.logger.InfoContext(c.ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
c.logger.Info(fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string()))
@@ -796,7 +796,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...",
srcDatasetTable.string()))

- c.logger.InfoContext(c.ctx,
+ c.logger.Info(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
query := c.client.Query(
@@ -811,7 +811,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
}
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s",
c.logger.Info(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
// drop the dst table if exists
dropQuery := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s",
@@ -823,7 +823,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
c.logger.Info(fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table))
// rename the src table to dst
query := c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
Expand All @@ -836,7 +836,7 @@ func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos
dstDatasetTable.string(), err)
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
dstDatasetTable.string()))
}

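The bulk of the BigQuery edits follow one mechanical pattern: with the field now typed as Temporal's log.Logger, slog's context-taking variants disappear, because the relevant context was already captured when the logger was obtained. For example:

```go
// Before: slog-specific API, context threaded through every call site.
c.logger.InfoContext(c.ctx, fmt.Sprintf("creating dataset %s...", dataset.DatasetID))

// After: Temporal's log.Logger interface; logger.LoggerFromCtx(ctx)
// already bound the activity context when the connector was built.
c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
```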
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep.go
@@ -34,7 +34,7 @@ func (c *BigQueryConnector) SyncQRepRecords(
}

if done {
c.logger.InfoContext(c.ctx, fmt.Sprintf("Partition %s has already been synced", partition.PartitionId))
c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId))
return 0, nil
}
c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+
16 changes: 8 additions & 8 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -109,7 +109,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String("destinationTable", rawTableName))
}

slog.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, rawTableName),
s.connector.logger.Info(fmt.Sprintf("loaded stage into %s.%s", datasetID, rawTableName),
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

@@ -162,8 +162,8 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
slog.Info("Obtained Avro schema for destination table", flowLog)
slog.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog)
s.connector.logger.Info("Obtained Avro schema for destination table", flowLog)
s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog)
// create a staging table name with partitionID replace hyphens with underscores
dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName)
stagingDatasetTable := &datasetTable{
@@ -197,7 +197,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
dstTableName, selector, stagingDatasetTable.string())

slog.Info("Performing transaction inside QRep sync function", flowLog)
s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)

query := bqClient.Query(insertStmt)
query.DefaultDatasetID = s.connector.datasetID
@@ -221,7 +221,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
flowLog)
}

slog.Info(fmt.Sprintf("loaded stage into %s", dstTableName), flowLog)
s.connector.logger.Info(fmt.Sprintf("loaded stage into %s", dstTableName), flowLog)
return numRecords, nil
}

@@ -426,7 +426,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
}

avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
slog.Info("writing records to local file", idLog)
s.connector.logger.Info("writing records to local file", idLog)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
@@ -437,7 +437,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
if avroFile.NumRecords == 0 {
return 0, nil
}
slog.Info(fmt.Sprintf("wrote %d records", avroFile.NumRecords), idLog)
s.connector.logger.Info(fmt.Sprintf("wrote %d records", avroFile.NumRecords), idLog)

bqClient := s.connector.client
var avroRef bigquery.LoadSource
@@ -472,7 +472,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
if err := status.Err(); err != nil {
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
slog.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath), idLog)
s.connector.logger.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath), idLog)

err = s.connector.waitForTableReady(stagingTable)
if err != nil {
8 changes: 4 additions & 4 deletions flow/connectors/eventhub/eventhub.go
@@ -8,13 +8,14 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

type EventHubConnector struct {
@@ -23,7 +24,7 @@ type EventHubConnector struct {
pgMetadata *metadataStore.PostgresMetadataStore
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
- logger slog.Logger
+ logger log.Logger
}

// NewEventHubConnector creates a new EventHubConnector.
@@ -46,14 +47,13 @@ func NewEventHubConnector(
return nil, err
}

- flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &EventHubConnector{
ctx: ctx,
config: config,
pgMetadata: pgMetadata,
creds: defaultAzureCreds,
hubManager: hubManager,
- logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+ logger: logger.LoggerFromCtx(ctx),
}, nil
}

4 changes: 2 additions & 2 deletions flow/connectors/postgres/normalize_stmt_generator.go
@@ -2,10 +2,10 @@ package connpostgres

import (
"fmt"
"log/slog"
"slices"
"strings"

"go.temporal.io/sdk/log"
"golang.org/x/exp/maps"

"github.com/PeerDB-io/peer-flow/connectors/utils"
@@ -28,7 +28,7 @@ type normalizeStmtGenerator struct {
// Postgres metadata schema
metadataSchema string
// to log fallback statement selection
- logger slog.Logger
+ logger log.Logger
}

func (n *normalizeStmtGenerator) generateNormalizeStatements() []string {
9 changes: 4 additions & 5 deletions flow/connectors/postgres/postgres.go
@@ -13,15 +13,16 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

@@ -35,7 +36,7 @@ type PostgresConnector struct {
replConfig *pgx.ConnConfig
customTypesMapping map[uint32]string
metadataSchema string
- logger slog.Logger
+ logger log.Logger
}

// NewPostgresConnector creates a new instance of PostgresConnector.
@@ -80,8 +81,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
metadataSchema = *pgConfig.MetadataSchema
}

- flowName, _ := ctx.Value(shared.FlowNameKey).(string)
- flowLog := slog.With(slog.String(string(shared.FlowNameKey), flowName))
return &PostgresConnector{
connStr: connectionString,
ctx: ctx,
@@ -91,7 +90,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replConfig: replConfig,
customTypesMapping: customTypeMap,
metadataSchema: metadataSchema,
- logger: *flowLog,
+ logger: logger.LoggerFromCtx(ctx),
}, nil
}

3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_partition_test.go
@@ -9,6 +9,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
@@ -174,7 +175,7 @@ func TestGetQRepPartitions(t *testing.T) {
ctx: context.Background(),
config: &protos.PostgresConfig{},
conn: conn,
- logger: *slog.With(slog.String(string(shared.FlowNameKey), "testGetQRepPartitions")),
+ logger: log.NewStructuredLogger(slog.With(slog.String(string(shared.FlowNameKey), "testGetQRepPartitions"))),
}

got, err := c.GetQRepPartitions(tc.config, tc.last)
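Tests construct the connector directly, outside any Temporal activity, so there is no context-derived logger to fetch; the test instead wraps a plain *slog.Logger in the Temporal interface, as the hunk above shows:

```go
// log.NewStructuredLogger adapts an *slog.Logger to Temporal's log.Logger,
// keeping the per-flow attribute the old code attached via slog.With.
logger := log.NewStructuredLogger(
	slog.With(slog.String(string(shared.FlowNameKey), "testGetQRepPartitions")))
```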
(7 more changed files not shown)
