Skip to content

Commit

Permalink
better CDC error logging (#1275)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Feb 13, 2024
1 parent 9081af8 commit 01bc1a4
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 11 deletions.
12 changes: 8 additions & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,21 @@ func (p *PostgresCDCSource) consumeStream(
// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit",
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records",
p.flowJobName,
cdcRecordsStorage.Len()),
)

if !p.commitLock {
// immediate return if we are not waiting for a commit
p.logger.Info(
fmt.Sprintf("no commit lock, returning currently accumulated records - %d",
cdcRecordsStorage.Len()))
return nil
} else {
p.logger.Info(fmt.Sprintf("commit lock, waiting for commit to return records - %d",
cdcRecordsStorage.Len()))
waitingForCommit = true
}

waitingForCommit = true
} else {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
p.flowJobName),
Expand Down
24 changes: 17 additions & 7 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"io"
"log/slog"
"os"
"runtime/debug"
"sync/atomic"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/djherbis/buffer"
"github.com/djherbis/nio/v3"
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
Expand Down Expand Up @@ -190,32 +193,39 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}

r, w := io.Pipe()
buf := buffer.New(32 * 1024 * 1024) // 32MB in memory Buffer
r, w := nio.Pipe(buf)

defer r.Close()
var writeOcfError error
var numRows int
var noPanic bool

go func() {
defer w.Close()
defer func() {
if r := recover(); r != nil {
writeOcfError = fmt.Errorf("panic occurred during WriteOCF: %v", r)
stack := string(debug.Stack())
logger.Error("panic during WriteOCF", slog.Any("error", writeOcfError), slog.String("stack", stack))
}
w.Close()
}()
numRows, writeOcfError = p.WriteOCF(ctx, w)
noPanic = true
}()

_, err = manager.NewUploader(s3svc).Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: r,
})

if err != nil {
s3Path := "s3://" + bucketName + "/" + key
logger.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path))
return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
}

if !noPanic {
return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key)
}
if writeOcfError != nil {
logger.Error("failed to write records to OCF: ", slog.Any("error", writeOcfError))
return nil, writeOcfError
}

Expand Down
2 changes: 2 additions & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/djherbis/buffer v1.1.0/go.mod h1:VwN8VdFkMY0DCALdY8o00d3IZ6Amz/UNVMWcSaJT44o=
github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ=
github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE=
github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4=
github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
Expand Down

0 comments on commit 01bc1a4

Please sign in to comment.