Commit

Merge branch 'main' into clickhouse-scrub

Amogh-Bharadwaj authored Mar 11, 2024
2 parents c1aaa46 + 7bb74c0 commit a5d4b32
Showing 12 changed files with 482 additions and 497 deletions.
1 change: 0 additions & 1 deletion flow/activities/flowable.go

@@ -391,7 +391,6 @@ func (a *FlowableActivity) SyncFlow(
 		StagingPath: config.CdcStagingPath,
 	})
 	if err != nil {
-		logger.Warn("failed to push records", slog.Any("error", err))
 		a.Alerter.LogFlowError(ctx, flowName, err)
 		return fmt.Errorf("failed to push records: %w", err)
 	}
1 change: 0 additions & 1 deletion flow/activities/snapshot_activity.go

@@ -63,7 +63,6 @@ func (a *SnapshotActivity) SetupReplication(
 	defer close(replicationErr)

 	closeConnectionForError := func(err error) {
-		logger.Error("failed to setup replication", slog.Any("error", err))
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
 		connectors.CloseConnector(ctx, conn)
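
The two deletions above pair with the alerting change below: Alerter.LogFlowError now emits its own log line, so call sites stop double-logging before they report. A minimal runnable sketch of the consolidated pattern; the bare Alerter type and the "demo-flow" name are illustrative stand-ins, not the real PeerDB types:

package main

import (
	"context"
	"fmt"
	"log/slog"
)

// Alerter sketches the consolidated pattern: LogFlowError logs the error
// itself, so call sites no longer need their own logger.Warn/logger.Error
// line before reporting.
type Alerter struct{}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
	slog.Error(err.Error(), slog.Any("stack", fmt.Sprintf("%+v", err)))
	// ... persist to peerdb_stats.flow_errors and send telemetry ...
}

func main() {
	a := &Alerter{}
	err := fmt.Errorf("failed to push records: %w", fmt.Errorf("connection reset"))
	a.LogFlowError(context.Background(), "demo-flow", err) // report once, log once
}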
9 changes: 7 additions & 2 deletions flow/alerting/alerting.go

@@ -224,27 +224,32 @@ func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, mor
 }

 func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
+	logger := logger.LoggerFromCtx(ctx)
 	errorWithStack := fmt.Sprintf("%+v", err)
+	logger.Error(err.Error(), slog.Any("stack", errorWithStack))
 	_, err = a.catalogPool.Exec(ctx,
 		"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
 		flowName, errorWithStack, "error")
 	if err != nil {
-		logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
+		logger.Warn("failed to insert flow error", slog.Any("error", err))
 		return
 	}
 	a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR)
 }

 func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
+	logger.LoggerFromCtx(ctx).Info(info)
 	a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
 }

 func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {
+	logger := logger.LoggerFromCtx(ctx)
+	logger.Info(info)
 	_, err := a.catalogPool.Exec(ctx,
 		"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
 		flowName, info, "info")
 	if err != nil {
-		logger.LoggerFromCtx(ctx).Warn("failed to insert flow info", slog.Any("error", err))
+		logger.Warn("failed to insert flow info", slog.Any("error", err))
 		return
 	}
 }
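
A caveat on errorWithStack := fmt.Sprintf("%+v", err): the %+v verb only yields a stack trace when the error type supports extended formatting (github.com/pkg/errors does; plain stdlib errors print the same text as %v). A short sketch, assuming the pkg/errors dependency is available; the messages are illustrative:

package main

import (
	"fmt"

	pkgerrors "github.com/pkg/errors" // errors that record a stack trace at creation
)

func main() {
	plain := fmt.Errorf("failed to push records: %w", fmt.Errorf("connection reset"))
	withStack := pkgerrors.New("connection reset")

	fmt.Printf("%+v\n", plain)     // just the message: %+v adds nothing for stdlib errors
	fmt.Printf("%+v\n", withStack) // message followed by the recorded stack trace
}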
7 changes: 5 additions & 2 deletions flow/connectors/snowflake/snowflake.go

@@ -553,8 +553,8 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
 	g.SetLimit(8) // limit parallel merges to 8

 	for _, tableName := range destinationTableNames {
-		if err := gCtx.Err(); err != nil {
-			return fmt.Errorf("canceled while normalizing records: %w", err)
+		if gCtx.Err() != nil {
+			break
 		}

 		g.Go(func() error {
@@ -597,6 +597,9 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
 	if err := g.Wait(); err != nil {
 		return fmt.Errorf("error while normalizing records: %w", err)
 	}
+	if err := ctx.Err(); err != nil {
+		return fmt.Errorf("normalize canceled: %w", err)
+	}

 	return nil
 }
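
The shape of this change: when gCtx is canceled inside the spawn loop, the loop now just stops queuing work with break instead of returning, because whatever error canceled gCtx is surfaced by g.Wait() anyway; the ctx.Err() check after Wait then distinguishes parent-context cancellation. A runnable sketch of that errgroup pattern, with a hypothetical merge callback standing in for the real per-table merge:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func mergeAll(ctx context.Context, tables []string, merge func(context.Context, string) error) error {
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(8) // limit parallel merges to 8

	for _, table := range tables {
		if gCtx.Err() != nil {
			break // a worker already failed (or ctx was canceled); stop queuing work
		}
		table := table // capture for the closure (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			return merge(gCtx, table)
		})
	}

	// Wait surfaces the first worker error, covering the case that made us break.
	if err := g.Wait(); err != nil {
		return fmt.Errorf("error while normalizing records: %w", err)
	}
	// Distinguish parent-context cancellation from a successful run.
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("normalize canceled: %w", err)
	}
	return nil
}

func main() {
	tables := []string{"t1", "t2", "t3"}
	err := mergeAll(context.Background(), tables, func(ctx context.Context, t string) error {
		fmt.Println("merging", t)
		return nil
	})
	fmt.Println("done:", err)
}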
6 changes: 3 additions & 3 deletions flow/shared/telemetry/sns_message_sender.go

@@ -42,14 +42,14 @@ func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string,
 	var messageSubjectBuilder strings.Builder
 	maxSubjectSize := 99
 	for currentLength, char := range subject {
-		if currentLength > maxSubjectSize {
-			break
-		}
 		if unicode.IsPrint(char) {
 			messageSubjectBuilder.WriteRune(char)
 		} else {
 			messageSubjectBuilder.WriteRune(' ')
 		}
+		if currentLength > maxSubjectSize {
+			break
+		}
 	}
 	publish, err := s.client.Publish(ctx, &sns.PublishInput{
 		Message: aws.String(body),
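
One subtlety this loop has either way: ranging over a Go string yields byte offsets, not rune counts, so currentLength (and therefore the maxSubjectSize cap) bounds the subject in bytes rather than characters. A tiny sketch of the offsets:

package main

import "fmt"

func main() {
	// range over a string walks runes but reports byte offsets,
	// so a multi-byte rune advances the counter by more than one.
	for offset, r := range "héllo" {
		fmt.Printf("offset=%d rune=%c\n", offset, r)
	}
	// Prints offsets 0, 1, 3, 4, 5 ('é' occupies two bytes).
}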
54 changes: 27 additions & 27 deletions nexus/Cargo.lock

Some generated files are not rendered by default.

35 changes: 12 additions & 23 deletions nexus/analyzer/src/lib.rs

@@ -511,7 +511,7 @@ fn parse_db_options(
         opts.insert(&opt.name.value, val);
     }

-    let config = match db_type {
+    Ok(Some(match db_type {
         DbType::Bigquery => {
             let pem_str = opts
                 .get("private_key")
@@ -565,8 +565,7 @@
                 .ok_or_else(|| anyhow::anyhow!("missing dataset_id in peer options"))?
                 .to_string(),
            };
-           let config = Config::BigqueryConfig(bq_config);
-           Some(config)
+           Config::BigqueryConfig(bq_config)
        }
        DbType::Snowflake => {
            let s3_int = opts
@@ -605,8 +604,7 @@
                metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()),
                s3_integration: s3_int,
            };
-           let config = Config::SnowflakeConfig(snowflake_config);
-           Some(config)
+           Config::SnowflakeConfig(snowflake_config)
        }
        DbType::Mongo => {
            let mongo_config = MongoConfig {
@@ -632,8 +630,7 @@
                    .parse::<i32>()
                    .context("unable to parse port as valid int")?,
            };
-           let config = Config::MongoConfig(mongo_config);
-           Some(config)
+           Config::MongoConfig(mongo_config)
        }
        DbType::Postgres => {
            let postgres_config = PostgresConfig {
@@ -659,8 +656,7 @@
                transaction_snapshot: "".to_string(),
                ssh_config: None,
            };
-           let config = Config::PostgresConfig(postgres_config);
-           Some(config)
+           Config::PostgresConfig(postgres_config)
        }
        DbType::Eventhub => {
            let subscription_id = opts
@@ -699,8 +695,7 @@
                partition_count,
                message_retention_in_days,
            };
-           let config = Config::EventhubConfig(eventhub_config);
-           Some(config)
+           Config::EventhubConfig(eventhub_config)
        }
        DbType::S3 => {
            let s3_config = S3Config {
@@ -714,8 +709,7 @@
                role_arn: opts.get("role_arn").map(|s| s.to_string()),
                endpoint: opts.get("endpoint").map(|s| s.to_string()),
            };
-           let config = Config::S3Config(s3_config);
-           Some(config)
+           Config::S3Config(s3_config)
        }
        DbType::Sqlserver => {
            let port_str = opts.get("port").context("port not specified")?;
@@ -736,8 +730,7 @@
                    .context("database is not specified")?
                    .to_string(),
            };
-           let config = Config::SqlserverConfig(sqlserver_config);
-           Some(config)
+           Config::SqlserverConfig(sqlserver_config)
        }
        DbType::EventhubGroup => {
            // split comma separated list of columns and trim
@@ -775,8 +768,7 @@
                eventhubs,
                unnest_columns,
            };
-           let config = Config::EventhubGroupConfig(eventhub_group_config);
-           Some(config)
+           Config::EventhubGroupConfig(eventhub_group_config)
        }
        DbType::Clickhouse => {
            let clickhouse_config = ClickhouseConfig {
@@ -817,12 +809,9 @@
                disable_tls: opts
                    .get("disable_tls")
                    .and_then(|s| s.parse::<bool>().ok())
-                   .unwrap_or_default()
+                   .unwrap_or_default(),
            };
-           let config = Config::ClickhouseConfig(clickhouse_config);
-           Some(config)
+           Config::ClickhouseConfig(clickhouse_config)
        }
-   };
-
-   Ok(config)
+   }))
 }
0 comments on commit a5d4b32