Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Avro Streaming with zstd Compression for Snowflake #527

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
return 0, err
}

numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, "17", flowJobName)
partitionID := util.RandomString(16)
numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -192,14 +193,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
flowJobName string,
) (int, string, error) {
var numRecords int
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema)
if s.config.StagingPath == "" {
ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema)
tmpDir, err := os.MkdirTemp("", "peerdb-avro")
if err != nil {
return 0, "", fmt.Errorf("failed to create temp dir: %w", err)
}

localFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, partitionID)
localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID)
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
Expand All @@ -211,6 +212,7 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(

return numRecords, localFilePath, nil
} else if strings.HasPrefix(s.config.StagingPath, "s3://") {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema)
s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
if err != nil {
return 0, "", fmt.Errorf("failed to parse staging path: %w", err)
Expand Down
52 changes: 51 additions & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/klauspost/compress/zstd"
"github.com/linkedin/goavro/v2"
log "github.com/sirupsen/logrus"
uber_atomic "go.uber.org/atomic"
Expand All @@ -22,6 +23,8 @@ type PeerDBOCFWriter struct {
ctx context.Context
stream *model.QRecordStream
avroSchema *model.QRecordAvroSchemaDefinition
compress bool
writer io.WriteCloser
}

func NewPeerDBOCFWriter(
Expand All @@ -33,17 +36,50 @@ func NewPeerDBOCFWriter(
ctx: ctx,
stream: stream,
avroSchema: avroSchema,
compress: false,
}
}

// NewPeerDBOCFWriterWithCompression returns a PeerDBOCFWriter for the given
// context, record stream and Avro schema with its compress flag switched on.
// The writer field is left at its zero value by this constructor.
func NewPeerDBOCFWriterWithCompression(
	ctx context.Context,
	stream *model.QRecordStream,
	avroSchema *model.QRecordAvroSchemaDefinition,
) *PeerDBOCFWriter {
	compressed := new(PeerDBOCFWriter)
	compressed.ctx = ctx
	compressed.stream = stream
	compressed.avroSchema = avroSchema
	compressed.compress = true
	return compressed
}

// initWriteCloser sets p.writer to the sink that wraps w.
//
// With p.compress unset, w is wrapped in a nopWriteCloser. With p.compress
// set, w is wrapped in a zstd encoder. It returns an error only when the zstd
// encoder cannot be constructed; in that case p.writer is left untouched.
func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error {
	if !p.compress {
		p.writer = &nopWriteCloser{w}
		return nil
	}

	// Go through a local instead of assigning the result straight into the
	// io.WriteCloser field: a nil *zstd.Encoder stored in an interface yields
	// a non-nil interface value, which would defeat any later nil check on
	// p.writer after a failed initialization.
	encoder, err := zstd.NewWriter(w)
	if err != nil {
		return fmt.Errorf("error while initializing zstd encoding writer: %w", err)
	}
	p.writer = encoder
	return nil
}

// createOCFWriter initializes p.writer around w and returns a goavro OCF
// writer that uses p.avroSchema.Schema and writes into p.writer.
//
// It returns an error if the wrapping writer cannot be initialized or if
// goavro rejects the OCF configuration.
func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) {
	if err := p.initWriteCloser(w); err != nil {
		return nil, fmt.Errorf("failed to create compressed writer: %w", err)
	}

	ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
		// The OCF output must go to p.writer, not to w directly; W may only be
		// specified once in the literal.
		W:      p.writer,
		Schema: p.avroSchema.Schema,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create OCF writer: %w", err)
	}

	return ocfWriter, nil
}

Expand Down Expand Up @@ -107,6 +143,9 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
if err != nil {
return 0, fmt.Errorf("failed to create OCF writer: %w", err)
}
// we have to keep a reference to the underlying writer as goavro doesn't provide any access to it
defer p.writer.Close()

numRows, err := p.writeRecordsToOCFWriter(ocfWriter)
if err != nil {
return 0, fmt.Errorf("failed to write records to OCF writer: %w", err)
Expand Down Expand Up @@ -160,3 +199,14 @@ func (p *PeerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) {
defer file.Close()
return p.WriteOCF(file)
}

// nopWriteCloser adapts an io.Writer to the io.WriteCloser interface by
// embedding the writer and supplying a Close method.
type nopWriteCloser struct {
	io.Writer
}

// Close forwards to the embedded writer's Close method when that writer also
// implements io.Closer and returns its result; for any other writer it does
// nothing and returns nil. Note that, despite the type's name, the underlying
// writer is therefore closed whenever it is closable.
func (n *nopWriteCloser) Close() error {
	underlying, closable := n.Writer.(io.Closer)
	if !closable {
		return nil
	}
	return underlying.Close()
}
2 changes: 1 addition & 1 deletion flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ require (
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand Down
Loading