Skip to content

Commit

Permalink
better logging and also buffered pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 13, 2024
1 parent 93e0bd3 commit da1e2a6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
11 changes: 10 additions & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/djherbis/buffer"
"github.com/djherbis/nio/v3"
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
Expand Down Expand Up @@ -190,11 +192,14 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}

r, w := io.Pipe()
buf := buffer.New(32 * 1024 * 1024) // 32MB in memory Buffer
r, w := nio.Pipe(buf)

defer r.Close()
var writeOcfError error
var numRows int
var noPanic bool

go func() {
defer w.Close()
numRows, writeOcfError = p.WriteOCF(ctx, w)
Expand All @@ -206,16 +211,20 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key
Key: aws.String(key),
Body: r,
})

if err != nil {
s3Path := "s3://" + bucketName + "/" + key
logger.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path))
return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
}

if !noPanic {
logger.Error("WriteOCF panicked while writing avro to S3", slog.Any("bucket", bucketName), slog.Any("key", key))
return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key)
}

if writeOcfError != nil {
logger.Error("failed to write records to OCF: ", slog.Any("error", writeOcfError))
return nil, writeOcfError
}

Expand Down
2 changes: 2 additions & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/djherbis/buffer v1.1.0/go.mod h1:VwN8VdFkMY0DCALdY8o00d3IZ6Amz/UNVMWcSaJT44o=
github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ=
github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE=
github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4=
github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
Expand Down

0 comments on commit da1e2a6

Please sign in to comment.