From f7742b8e1ad7047ad540e444fbc727a2882f01af Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 17 Oct 2023 21:42:30 -0400 Subject: [PATCH 1/2] Optimize Avro Streaming with Zstandard Compression for Snowflake - Introduced Zstandard (zstd) compression while streaming source rows, reducing post-processing time. - Generated unique `partitionID` with a random 16-character string. This enhancement not only bypasses Snowflake's automatic gzip compression but also ensures a more efficient data transfer by compressing data on-the-fly. See `AUTO_COMPRESS` and `SOURCE_COMPRESSION` here https://docs.snowflake.com/en/sql-reference/sql/put for more details --- flow/connectors/snowflake/qrep_avro_sync.go | 8 ++-- flow/connectors/utils/avro/avro_writer.go | 42 ++++++++++++++++++++- flow/go.mod | 2 +- flow/go.sum | 2 + 4 files changed, 49 insertions(+), 5 deletions(-) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 7d725fa1bc..8202d769fc 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -59,7 +59,8 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( return 0, err } - numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, "17", flowJobName) + partitionID := util.RandomString(16) + numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) if err != nil { return 0, err } @@ -192,14 +193,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( flowJobName string, ) (int, string, error) { var numRecords int - ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema) if s.config.StagingPath == "" { + ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema) tmpDir, err := os.MkdirTemp("", "peerdb-avro") if err != nil { return 0, "", fmt.Errorf("failed to create temp dir: %w", err) } - localFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, partitionID) + localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID) log.WithFields(log.Fields{ "flowName": flowJobName, "partitionID": partitionID, @@ -211,6 +212,7 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( return numRecords, localFilePath, nil } else if strings.HasPrefix(s.config.StagingPath, "s3://") { + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema) s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) if err != nil { return 0, "", fmt.Errorf("failed to parse staging path: %w", err) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 29f45e929d..b892bc904c 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/klauspost/compress/zstd" "github.com/linkedin/goavro/v2" log "github.com/sirupsen/logrus" uber_atomic "go.uber.org/atomic" @@ -22,6 +23,7 @@ type PeerDBOCFWriter struct { ctx context.Context stream *model.QRecordStream avroSchema *model.QRecordAvroSchemaDefinition + compress bool } func NewPeerDBOCFWriter( @@ -33,12 +35,39 @@ func NewPeerDBOCFWriter( ctx: ctx, stream: stream, avroSchema: avroSchema, + compress: false, } } +func NewPeerDBOCFWriterWithCompression( + ctx context.Context, + stream *model.QRecordStream, + avroSchema *model.QRecordAvroSchemaDefinition, +) *PeerDBOCFWriter { + return &PeerDBOCFWriter{ + ctx: ctx, + stream: stream, + avroSchema: avroSchema, + compress: true, + } +} + +func (p *PeerDBOCFWriter) getCompressedWriter(w io.Writer) (io.WriteCloser, error) { + if p.compress { + return zstd.NewWriter(w) + } + return &nopWriteCloser{w}, nil +} + func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) { + compressedWriter, err := p.getCompressedWriter(w) + if err != nil { + return nil, fmt.Errorf("failed to create compressed writer: %w", err) + } + defer compressedWriter.Close() + ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: w, + W: compressedWriter, Schema: p.avroSchema.Schema, }) if err != nil { @@ -160,3 +189,14 @@ func (p *PeerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) { defer file.Close() return p.WriteOCF(file) } + +type nopWriteCloser struct { + io.Writer +} + +func (n *nopWriteCloser) Close() error { + if closer, ok := n.Writer.(io.Closer); ok { + return closer.Close() + } + return nil +} diff --git a/flow/go.mod b/flow/go.mod index be495bf428..88150adf52 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -108,7 +108,7 @@ require ( github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.1 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect diff --git a/flow/go.sum b/flow/go.sum index 0da445658a..b6a1ba5b98 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -1230,6 +1230,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= +github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= From 8bd54ff23fd8ada42b9af558c8bc8232097499bf Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 18 Oct 2023 16:07:21 +0530 Subject: [PATCH 2/2] fixing flow tests pt.1 --- flow/connectors/utils/avro/avro_writer.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index b892bc904c..0b4cf09d7e 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -24,6 +24,7 @@ type PeerDBOCFWriter struct { stream *model.QRecordStream avroSchema *model.QRecordAvroSchemaDefinition compress bool + writer io.WriteCloser } func NewPeerDBOCFWriter( @@ -52,27 +53,33 @@ func NewPeerDBOCFWriterWithCompression( } } -func (p *PeerDBOCFWriter) getCompressedWriter(w io.Writer) (io.WriteCloser, error) { +func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error { + var err error if p.compress { - return zstd.NewWriter(w) + p.writer, err = zstd.NewWriter(w) + if err != nil { + return fmt.Errorf("error while initializing zstd encoding writer: %w", err) + } + } else { + p.writer = &nopWriteCloser{w} } - return &nopWriteCloser{w}, nil + return nil } func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) { - compressedWriter, err := p.getCompressedWriter(w) + err := p.initWriteCloser(w) if err != nil { return nil, fmt.Errorf("failed to create compressed writer: %w", err) } - defer compressedWriter.Close() ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: compressedWriter, + W: p.writer, Schema: p.avroSchema.Schema, }) if err != nil { return nil, fmt.Errorf("failed to create OCF writer: %w", err) } + return ocfWriter, nil } @@ -136,6 +143,9 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) { if err != nil { return 0, fmt.Errorf("failed to create OCF writer: %w", err) } + // we have to keep a reference to the underlying writer as goavro doesn't provide any access to it + defer p.writer.Close() + numRows, err := p.writeRecordsToOCFWriter(ocfWriter) if err != nil { return 0, fmt.Errorf("failed to write records to OCF writer: %w", err)