From 796786e9f512f2b63a9292a15c883d1a3c48ec01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 11 Mar 2024 13:15:35 +0000 Subject: [PATCH] alerting: also log to logger --- flow/activities/flowable.go | 1 - flow/activities/snapshot_activity.go | 1 - flow/alerting/alerting.go | 9 +++++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 287082dcd..7ad57fd12 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -391,7 +391,6 @@ func (a *FlowableActivity) SyncFlow( StagingPath: config.CdcStagingPath, }) if err != nil { - logger.Warn("failed to push records", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to push records: %w", err) } diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 84df6ecfa..1cd4dac3e 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -63,7 +63,6 @@ func (a *SnapshotActivity) SetupReplication( defer close(replicationErr) closeConnectionForError := func(err error) { - logger.Error("failed to setup replication", slog.Any("error", err)) a.Alerter.LogFlowError(ctx, config.FlowJobName, err) // it is important to close the connection here as it is not closed in CloseSlotKeepAlive connectors.CloseConnector(ctx, conn) diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 6fc903aa7..93cb946f7 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -224,27 +224,32 @@ func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, mor } func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { + logger := logger.LoggerFromCtx(ctx) errorWithStack := fmt.Sprintf("%+v", err) + logger.Error(err.Error(), slog.Any("stack", errorWithStack)) _, err = a.catalogPool.Exec(ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", flowName, errorWithStack, "error") if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err)) + logger.Warn("failed to insert flow error", slog.Any("error", err)) return } a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR) } func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { + logger.LoggerFromCtx(ctx).Info(info) a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO) } func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) { + logger := logger.LoggerFromCtx(ctx) + logger.Info(info) _, err := a.catalogPool.Exec(ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", flowName, info, "info") if err != nil { - logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err)) + logger.Warn("failed to insert flow info", slog.Any("error", err)) return } }