Skip to content

Commit

Permalink
re-enables worker db logs, adds more err wraps to benthos insert (#3199)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Jan 29, 2025
1 parent 9882b4d commit be0adca
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 14 deletions.
2 changes: 1 addition & 1 deletion backend/internal/neosyncdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/tracelog"
db_queries "github.com/nucleuscloud/neosync/backend/gen/go/db"
pgxslog "github.com/nucleuscloud/neosync/backend/internal/pgx-slog"
pgxslog "github.com/nucleuscloud/neosync/internal/pgx-slog"
)

type DBTX interface {
Expand Down
4 changes: 3 additions & 1 deletion backend/pkg/sqlconnect/sql-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio

func getPgConnectorFn(dsn string, config *mgmtv1alpha1.PostgresConnectionConfig, logger *slog.Logger) stdlibConnectorGetter {
return func() (driver.Connector, func(), error) {
connectorOpts := []postgrestunconnector.Option{}
connectorOpts := []postgrestunconnector.Option{
postgrestunconnector.WithLogger(logger),
}
closers := []func(){}

if config.GetClientTls() != nil {
Expand Down
File renamed without changes.
22 changes: 21 additions & 1 deletion internal/sshtunnel/connectors/postgrestunconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"crypto/tls"
"database/sql/driver"
"log/slog"
"net"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
"github.com/jackc/pgx/v5/tracelog"
pgxslog "github.com/nucleuscloud/neosync/internal/pgx-slog"
"github.com/nucleuscloud/neosync/internal/sshtunnel"
)

Expand All @@ -23,6 +26,7 @@ type Option func(*connectorConfig) error
type connectorConfig struct {
dialer sshtunnel.ContextDialer
tlsConfig *tls.Config
logger *slog.Logger
}

// WithDialer sets a custom dialer for the connector
Expand All @@ -41,11 +45,20 @@ func WithTLSConfig(tlsConfig *tls.Config) Option {
}
}

// WithLogger sets the logger stored on the connector config, which New hands
// to the pgx tracelog tracer.
//
// A nil logger is ignored so that the default installed by New
// (slog.Default()) stays in place instead of a nil logger reaching the tracer.
func WithLogger(logger *slog.Logger) Option {
	return func(cfg *connectorConfig) error {
		if logger != nil {
			cfg.logger = logger
		}
		return nil
	}
}

func New(
dsn string,
opts ...Option,
) (*Connector, func(), error) {
cfg := &connectorConfig{}
cfg := &connectorConfig{
logger: slog.Default(),
}
for _, opt := range opts {
if err := opt(cfg); err != nil {
return nil, nil, err
Expand All @@ -66,6 +79,13 @@ func New(
pgxConfig.TLSConfig = cfg.tlsConfig
}

pgxConfig.Tracer = &tracelog.TraceLog{
Logger: pgxslog.NewLogger(cfg.logger, pgxslog.GetShouldOmitArgs()),
LogLevel: pgxslog.GetDatabaseLogLevel(),
}
// todo: We may need to re-enable this to support pg bouncer
// pgxConfig.DefaultQueryExecMode = pgx.QueryExecModeExec

// RegisterConnConfig returns a unique connection string on every call, so even if the same dsn is passed to multiple calls to New(),
// unregistering this one will not interfere with any other Connector instances that use the same input dsn.
connStr := stdlib.RegisterConnConfig(pgxConfig)
Expand Down
25 changes: 16 additions & 9 deletions worker/pkg/benthos/sql/output_sql_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,18 +254,19 @@ func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.Messa

insertQuery, args, err := s.queryBuilder.BuildInsertQuery(rows)
if err != nil {
return err
return fmt.Errorf("failed to build insert query: %w", err)
}

if _, err := s.db.ExecContext(ctx, insertQuery, args...); err != nil {
shouldRetry := s.skipForeignKeyViolations && neosync_benthos.IsForeignKeyViolationError(err.Error())
shouldRetry := neosync_benthos.ShouldRetryInsert(err.Error(), s.skipForeignKeyViolations)
if !shouldRetry {
return err
return fmt.Errorf("failed to execute insert query: %w", err)
}
s.logger.Infof("received error during batch write that is retryable, proceeding with row by row insert: %s", err.Error())

err = s.RetryInsertRowByRow(ctx, s.queryBuilder, rows)
if err != nil {
return err
return fmt.Errorf("failed to retry insert query: %w", err)
}
}
return nil
Expand All @@ -277,23 +278,29 @@ func (s *pooledInsertOutput) RetryInsertRowByRow(
rows []map[string]any,
) error {
fkErrorCount := 0
otherErrorCount := 0
insertCount := 0
for _, row := range rows {
insertQuery, args, err := builder.BuildInsertQuery([]map[string]any{row})
if err != nil {
return err
}
_, err = s.db.ExecContext(ctx, insertQuery, args...)
if err != nil && neosync_benthos.IsForeignKeyViolationError(err.Error()) {
fkErrorCount++
} else if err != nil && !neosync_benthos.IsForeignKeyViolationError(err.Error()) {
return err
if err != nil {
if !neosync_benthos.ShouldRetryInsert(err.Error(), s.skipForeignKeyViolations) {
return fmt.Errorf("failed to retry insert query: %w", err)
} else if neosync_benthos.IsForeignKeyViolationError(err.Error()) {
fkErrorCount++
} else {
otherErrorCount++
s.logger.Warnf("received retryable error during row by row insert. skipping row: %s", err.Error())
}
}
if err == nil {
insertCount++
}
}
s.logger.Infof("Completed batch insert with %d foreign key violations. Skipped rows: %d, Successfully inserted: %d", fkErrorCount, fkErrorCount, insertCount)
s.logger.Infof("Completed batch insert with %d foreign key violations. Total Skipped rows: %d, Successfully inserted: %d", fkErrorCount, otherErrorCount, insertCount)
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions worker/pkg/benthos/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func IsCriticalError(errMsg string) bool {
"incorrect date value",
"incorrect time value",
"does not exist",
"syntax error at or near",
"ON CONFLICT DO UPDATE requires inference specification or constraint name",
}

for _, errStr := range criticalErrors {
Expand All @@ -62,6 +64,7 @@ func IsGenerateJobCriticalError(errMsg string) bool {
"incorrect date value",
"incorrect time value",
"does not exist",
"syntax error at or near",
}

for _, errStr := range criticalErrors {
Expand Down Expand Up @@ -90,3 +93,18 @@ func IsForeignKeyViolationError(errMsg string) bool {
}
return false
}

// ShouldRetryInsert reports whether errMsg describes an insert failure that
// is eligible for a retry.
//
// A foreign key violation (as detected by IsForeignKeyViolationError) counts
// only when shouldCheckForForeignKeyViolation is true. Independently of that
// flag, any message matched by containsIgnoreCase against the list of other
// retryable errors below also qualifies.
func ShouldRetryInsert(errMsg string, shouldCheckForForeignKeyViolation bool) bool {
	if shouldCheckForForeignKeyViolation {
		if IsForeignKeyViolationError(errMsg) {
			return true
		}
	}

	// Non-foreign-key errors that are always considered retryable.
	for _, retryable := range []string{
		"ON CONFLICT DO UPDATE command cannot affect row a second time",
	} {
		if containsIgnoreCase(errMsg, retryable) {
			return true
		}
	}
	return false
}
16 changes: 14 additions & 2 deletions worker/pkg/query-builder/insert-query-builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func (d *PostgresDriver) BuildInsertQuery(rows []map[string]any) (query string,
if len(rows) == 0 {
return "", []any{}, errors.New("no rows to insert")
}
if len(d.options.conflictConfig.onConflictDoUpdate.conflictColumns) == 0 {
d.logger.Warn("no conflict columns specified for on conflict do update, defaulting to on conflict do nothing")
onConflictDoNothing := true
insertQuery, args, err := BuildInsertQuery(d.driver, d.schema, d.table, goquRows, &onConflictDoNothing)
if err != nil {
return "", nil, fmt.Errorf("failed to build insert query on conflict do nothing fallback: %w", err)
}
if d.options.shouldOverrideColumnDefault {
insertQuery = sqlmanager_postgres.BuildPgInsertIdentityAlwaysSql(insertQuery)
}
return insertQuery, args, nil
}

columns := make([]string, 0, len(rows[0]))
for col := range rows[0] {
Expand All @@ -146,12 +158,12 @@ func (d *PostgresDriver) BuildInsertQuery(rows []map[string]any) (query string,
onConflictDoNothing := d.options.conflictConfig.onConflictDoNothing != nil
insertQuery, args, err := BuildInsertQuery(d.driver, d.schema, d.table, goquRows, &onConflictDoNothing)
if err != nil {
return "", nil, err
return "", nil, fmt.Errorf("failed to build insert query: %w", err)
}
if d.options.shouldOverrideColumnDefault {
insertQuery = sqlmanager_postgres.BuildPgInsertIdentityAlwaysSql(insertQuery)
}
return insertQuery, args, err
return insertQuery, args, nil
}

func (d *PostgresDriver) buildInsertOnConflictDoUpdateQuery(
Expand Down

0 comments on commit be0adca

Please sign in to comment.