Skip to content

Commit

Permalink
Replace more use of default slog handler with logger from context (#1253
Browse files Browse the repository at this point in the history
)

Things are getting a bit messy since while
temporalio/sdk-go#1158 claims the interfaces
are compatible in reality there's a mess about what the two different
implementations expect from `args ...interface{}`

For now use context specific logger more to mangle output less in CI
A more correct/complete solution will have to be worked out in future
  • Loading branch information
serprex authored Feb 11, 2024
1 parent 57ecc3c commit e5eb347
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 93 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
// drop the staging table
if err := bqClient.DatasetInProject(s.connector.projectID, datasetID).Table(stagingTable).Delete(ctx); err != nil {
// just log the error this isn't fatal.
slog.Error("failed to delete staging table "+stagingTable,
s.connector.logger.Warn("failed to delete staging table "+stagingTable,
slog.Any("error", err),
slog.Int64("syncBatchID", syncBatchID),
slog.String("destinationTable", rawTableName))
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err := bqClient.DatasetInProject(s.connector.projectID, stagingDatasetTable.dataset).
Table(stagingDatasetTable.table).Delete(ctx); err != nil {
// just log the error this isn't fatal.
slog.Error("failed to delete staging table "+stagingDatasetTable.string(),
s.connector.logger.Warn("failed to delete staging table "+stagingDatasetTable.string(),
slog.Any("error", err),
flowLog)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)
Expand Down Expand Up @@ -285,6 +286,6 @@ func CloseConnector(ctx context.Context, conn Connector) {

err := conn.Close(ctx)
if err != nil {
slog.Error("error closing connector", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err))
}
}
12 changes: 7 additions & 5 deletions flow/connectors/eventhub/hub_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"

"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -105,17 +106,18 @@ func (h *HubBatches) sendBatch(
return err
}

slog.InfoContext(ctx, "sendBatch",
slog.Int("events sent", int(events.NumEvents())), slog.String("event hub topic ", tblName.ToString()))
logger.LoggerFromCtx(ctx).Info("sendBatch",
slog.Int("events sent", int(events.NumEvents())), slog.String("event hub topic", tblName.ToString()))
return nil
}

func (h *HubBatches) flushAllBatches(
ctx context.Context,
flowName string,
) error {
logger := logger.LoggerFromCtx(ctx)
if h.Len() == 0 {
slog.Info("no events to send", slog.String(string(shared.FlowNameKey), flowName))
logger.Info("no events to send", slog.String(string(shared.FlowNameKey), flowName))
return nil
}

Expand All @@ -132,7 +134,7 @@ func (h *HubBatches) flushAllBatches(
}

numEventsPushed.Add(numEvents)
slog.Info("flushAllBatches",
logger.Info("flushAllBatches",
slog.String(string(shared.FlowNameKey), flowName),
slog.Int("events sent", int(numEvents)),
slog.String("event hub topic ", destination.ToString()))
Expand All @@ -144,7 +146,7 @@ func (h *HubBatches) flushAllBatches(
if err != nil {
return fmt.Errorf("failed to flushAllBatches: %v", err)
}
slog.Info("hub batches flush",
logger.Info("hub batches flush",
slog.String(string(shared.FlowNameKey), flowName),
slog.Int("events sent", int(numEventsPushed.Load())))

Expand Down
15 changes: 9 additions & 6 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
)

type EventHubManager struct {
Expand Down Expand Up @@ -65,12 +66,13 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
hubTmp := hub.(*azeventhubs.ProducerClient)
_, err := hubTmp.GetEventHubProperties(ctx, nil)
if err != nil {
slog.Info(fmt.Sprintf("eventhub %s", name)+
"not reachable. Will re-establish connection and re-create it.",
logger := logger.LoggerFromCtx(ctx)
logger.Info(
fmt.Sprintf("eventhub %s not reachable. Will re-establish connection and re-create it.", name),
slog.Any("error", err))
closeError := m.closeProducerClient(ctx, hubTmp)
if closeError != nil {
slog.Error("failed to close producer client", slog.Any("error", closeError))
logger.Error("failed to close producer client", slog.Any("error", closeError))
}
m.hubs.Delete(name)
hubConnectOK = false
Expand Down Expand Up @@ -116,7 +118,7 @@ func (m *EventHubManager) Close(ctx context.Context) error {
hub := value.(*azeventhubs.ProducerClient)
err := m.closeProducerClient(ctx, hub)
if err != nil {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
logger.LoggerFromCtx(ctx).Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
numHubsClosed.Add(1)
Expand Down Expand Up @@ -167,6 +169,7 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE

partitionCount := int64(cfg.PartitionCount)
retention := int64(cfg.MessageRetentionInDays)
logger := logger.LoggerFromCtx(ctx)
if err != nil {
opts := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
Expand All @@ -181,9 +184,9 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE
return err
}

slog.Info("event hub created", slog.Any("name", name))
logger.Info("event hub created", slog.Any("name", name))
} else {
slog.Info("event hub exists already", slog.Any("name", name))
logger.Info("event hub exists already", slog.Any("name", name))
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/geo"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -232,8 +233,9 @@ func (p *PostgresCDCSource) consumeStream(
standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

logger := logger.LoggerFromCtx(ctx)
addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
err := cdcRecordsStorage.Set(key, rec)
err := cdcRecordsStorage.Set(logger, key, rec)
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) {
conn, err := c.ssh.NewPostgresConnFromConfig(ctx, c.replConfig)
if err != nil {
slog.Error("failed to create replication connection", slog.Any("error", err))
logger.LoggerFromCtx(ctx).Error("failed to create replication connection", "error", err)
return nil, fmt.Errorf("failed to create replication connection: %w", err)
}

Expand Down Expand Up @@ -914,22 +914,24 @@ func (c *PostgresConnector) HandleSlotInfo(
slotName string,
peerName string,
) error {
logger := logger.LoggerFromCtx(ctx)

// must create new connection because HandleSlotInfo is threadsafe
conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config)
if err != nil {
slog.WarnContext(ctx, "warning: failed to connect to get slot info", slog.Any("error", err))
logger.Warn("warning: failed to connect to get slot info", "error", err)
return err
}
defer conn.Close(ctx)

slotInfo, err := getSlotInfo(ctx, conn, slotName, c.config.Database)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
logger.Warn("warning: failed to get slot info", "error", err)
return err
}

if len(slotInfo) == 0 {
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
logger.Warn("warning: unable to get slot info", "slotName", slotName)
return nil
}

Expand All @@ -950,7 +952,7 @@ cc: <!channel>`,
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := getOpenConnectionsForUser(ctx, conn, c.config.User)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
logger.Warn("warning: failed to get current open connections", "error", err)
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
Expand Down
11 changes: 6 additions & 5 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/connectors/utils"
partition_utils "github.com/PeerDB-io/peer-flow/connectors/utils/partition"
Expand Down Expand Up @@ -290,7 +291,7 @@ func (c *PostgresConnector) CheckForUpdatedMaxValue(
defer func() {
deferErr := tx.Rollback(ctx)
if deferErr != pgx.ErrTxClosed && deferErr != nil {
c.logger.Error("error rolling back transaction for getting max value", slog.Any("error", err))
c.logger.Error("error rolling back transaction for getting max value", "error", err)
}
}()

Expand Down Expand Up @@ -362,7 +363,7 @@ func (c *PostgresConnector) PullQRepRecords(

// Build the query to pull records within the range from the source table
// Be sure to order the results by the watermark column to ensure consistency across pulls
query, err := BuildQuery(config.Query, config.FlowJobName)
query, err := BuildQuery(c.logger, config.Query, config.FlowJobName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -432,7 +433,7 @@ func (c *PostgresConnector) PullQRepRecordStream(

// Build the query to pull records within the range from the source table
// Be sure to order the results by the watermark column to ensure consistency across pulls
query, err := BuildQuery(config.Query, config.FlowJobName)
query, err := BuildQuery(c.logger, config.Query, config.FlowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -568,7 +569,7 @@ func (c *PostgresConnector) PullXminRecordStream(
return numRecords, currentSnapshotXmin, nil
}

func BuildQuery(query string, flowJobName string) (string, error) {
func BuildQuery(logger log.Logger, query string, flowJobName string) (string, error) {
tmpl, err := template.New("query").Parse(query)
if err != nil {
return "", err
Expand All @@ -587,7 +588,7 @@ func BuildQuery(query string, flowJobName string) (string, error) {
}
res := buf.String()

slog.Info(fmt.Sprintf("templated query: %s", res))
logger.Info(fmt.Sprintf("templated query: %s", res))
return res, nil
}

Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_query_build_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connpostgres

import (
"log/slog"
"testing"
)

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestBuildQuery(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual, err := BuildQuery(tc.query, "test_flow")
actual, err := BuildQuery(slog.Default(), tc.query, "test_flow")
if err != nil {
t.Fatalf("Error returned by BuildQuery: %v", err)
}
Expand Down
13 changes: 8 additions & 5 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
return nil, fmt.Errorf("failed to map row to QRecord: %w", err)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
return numRows, ctx.Err()
default:
// Process the row as before
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
record, err := mapRowToQRecord(qe.logger, rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
stream.Records <- model.QRecordOrError{
Expand Down Expand Up @@ -450,15 +450,18 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
return totalRecordsFetched, nil
}

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
func mapRowToQRecord(
logger log.Logger,
row pgx.Rows,
fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
) ([]qvalue.QValue, error) {
// make vals an empty array of QValue of size len(fds)
record := make([]qvalue.QValue, len(fds))

values, err := row.Values()
if err != nil {
slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
logger.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
return nil, fmt.Errorf("failed to scan row: %w", err)
}

Expand All @@ -468,7 +471,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
logger.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record[i] = tmp
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/qrep_sql_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -47,7 +48,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
startTime := time.Now()
schema, err := stream.Schema()
if err != nil {
slog.Error("failed to get schema from stream", slog.Any("error", err), syncLog)
logger.LoggerFromCtx(ctx).Error("failed to get schema from stream", slog.Any("error", err), syncLog)
return 0, fmt.Errorf("failed to get schema from stream: %w", err)
}

Expand All @@ -71,7 +72,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
defer func() {
if err := tx.Rollback(context.Background()); err != nil {
if err != pgx.ErrTxClosed {
slog.Error("failed to rollback transaction tx2", slog.Any("error", err), syncLog)
logger.LoggerFromCtx(ctx).Error("failed to rollback transaction tx2", slog.Any("error", err), syncLog)
}
}
}()
Expand Down
Loading

0 comments on commit e5eb347

Please sign in to comment.