Skip to content

Commit

Permalink
use recover
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 13, 2024
1 parent da1e2a6 commit c0e3f6f
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,16 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key
defer r.Close()
var writeOcfError error
var numRows int
var noPanic bool

go func() {
defer w.Close()
defer func() {
if r := recover(); r != nil {
writeOcfError = fmt.Errorf("panic occurred during WriteOCF: %v", r)
logger.Error("panic during WriteOCF", slog.Any("error", writeOcfError))
}
}()
numRows, writeOcfError = p.WriteOCF(ctx, w)
noPanic = true
}()

_, err = manager.NewUploader(s3svc).Upload(ctx, &s3.PutObjectInput{
Expand All @@ -218,11 +222,6 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key
return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
}

if !noPanic {
logger.Error("WriteOCF panicked while writing avro to S3", slog.Any("bucket", bucketName), slog.Any("key", key))
return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key)
}

if writeOcfError != nil {
logger.Error("failed to write records to OCF: ", slog.Any("error", writeOcfError))
return nil, writeOcfError
Expand Down

0 comments on commit c0e3f6f

Please sign in to comment.