Skip to content

Commit

Permalink
Merge branch 'main' into update-dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Mar 11, 2024
2 parents efb6333 + 99c8dd8 commit 5a8ec36
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 44 deletions.
1 change: 0 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ func (a *FlowableActivity) SyncFlow(
StagingPath: config.CdcStagingPath,
})
if err != nil {
logger.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to push records: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (a *SnapshotActivity) SetupReplication(
defer close(replicationErr)

closeConnectionForError := func(err error) {
logger.Error("failed to setup replication", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
connectors.CloseConnector(ctx, conn)
Expand Down
9 changes: 7 additions & 2 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,27 +224,32 @@ func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, mor
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
logger := logger.LoggerFromCtx(ctx)
errorWithStack := fmt.Sprintf("%+v", err)
logger.Error(err.Error(), slog.Any("stack", errorWithStack))
_, err = a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, errorWithStack, "error")
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
logger.Warn("failed to insert flow error", slog.Any("error", err))
return
}
a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR)
}

func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
logger.LoggerFromCtx(ctx).Info(info)
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
}

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
logger := logger.LoggerFromCtx(ctx)
logger.Info(info)
_, err := a.catalogPool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, info, "info")
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err))
logger.Warn("failed to insert flow info", slog.Any("error", err))
return
}
}
6 changes: 3 additions & 3 deletions flow/shared/telemetry/sns_message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string,
var messageSubjectBuilder strings.Builder
maxSubjectSize := 99
for currentLength, char := range subject {
if currentLength > maxSubjectSize {
break
}
if unicode.IsPrint(char) {
messageSubjectBuilder.WriteRune(char)
} else {
messageSubjectBuilder.WriteRune(' ')
}
if currentLength > maxSubjectSize {
break
}
}
publish, err := s.client.Publish(ctx, &sns.PublishInput{
Message: aws.String(body),
Expand Down
37 changes: 0 additions & 37 deletions nexus/pt/src/flow_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,6 @@ pub struct FlowJobTableMapping {
pub exclude: Vec<String>,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum FlowSyncMode {
Avro,
SQL,
}

impl FlowSyncMode {
pub fn parse_string(s: &str) -> Result<FlowSyncMode, String> {
match s {
"avro" => Ok(FlowSyncMode::Avro),
"sql" => Ok(FlowSyncMode::SQL),
_ => Err(format!("{} is not a valid FlowSyncMode", s)),
}
}
}

impl std::str::FromStr for FlowSyncMode {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"avro" => Ok(FlowSyncMode::Avro),
"default" => Ok(FlowSyncMode::SQL),
_ => Err(format!("{} is not a valid FlowSyncMode", s)),
}
}
}

impl ToString for FlowSyncMode {
fn to_string(&self) -> String {
match self {
FlowSyncMode::Avro => "avro".to_string(),
FlowSyncMode::SQL => "default".to_string(),
}
}
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct FlowJob {
pub name: String,
Expand Down

0 comments on commit 5a8ec36

Please sign in to comment.