Skip to content

Commit

Permalink
Optimize Avro Streaming with zstd Compression for Snowflake (#527)
Browse files Browse the repository at this point in the history
- Introduced Zstandard (`zstd`) compression while streaming source rows,
reducing post-processing time.
- Generated unique `partitionID` with a random 16-character string.

This enhancement not only bypasses Snowflake's automatic gzip
compression but also ensures a more efficient data transfer by
compressing data on-the-fly. `zstd` is also faster than Snowflake's
default `gzip`.

See `AUTO_COMPRESS` and `SOURCE_COMPRESSION` here
https://docs.snowflake.com/en/sql-reference/sql/put for more details
  • Loading branch information
iskakaushik authored Oct 18, 2023
1 parent ddf0abc commit e6003de
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 5 deletions.
8 changes: 5 additions & 3 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
return 0, err
}

numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, "17", flowJobName)
partitionID := util.RandomString(16)
numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -192,14 +193,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
flowJobName string,
) (int, string, error) {
var numRecords int
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema)
if s.config.StagingPath == "" {
ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema)
tmpDir, err := os.MkdirTemp("", "peerdb-avro")
if err != nil {
return 0, "", fmt.Errorf("failed to create temp dir: %w", err)
}

localFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, partitionID)
localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID)
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
Expand All @@ -211,6 +212,7 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(

return numRecords, localFilePath, nil
} else if strings.HasPrefix(s.config.StagingPath, "s3://") {
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema)
s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
if err != nil {
return 0, "", fmt.Errorf("failed to parse staging path: %w", err)
Expand Down
52 changes: 51 additions & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/klauspost/compress/zstd"
"github.com/linkedin/goavro/v2"
log "github.com/sirupsen/logrus"
uber_atomic "go.uber.org/atomic"
Expand All @@ -22,6 +23,8 @@ type PeerDBOCFWriter struct {
ctx context.Context
stream *model.QRecordStream
avroSchema *model.QRecordAvroSchemaDefinition
compress bool
writer io.WriteCloser
}

func NewPeerDBOCFWriter(
Expand All @@ -33,17 +36,50 @@ func NewPeerDBOCFWriter(
ctx: ctx,
stream: stream,
avroSchema: avroSchema,
compress: false,
}
}

// NewPeerDBOCFWriterWithCompression returns a PeerDBOCFWriter for the given
// stream and Avro schema with the compress flag set. The writer field is left
// unset here; it is populated later by initWriteCloser, which layers a zstd
// encoder over the destination when compress is true.
func NewPeerDBOCFWriterWithCompression(
	ctx context.Context,
	stream *model.QRecordStream,
	avroSchema *model.QRecordAvroSchemaDefinition,
) *PeerDBOCFWriter {
	ocf := new(PeerDBOCFWriter)
	ocf.ctx = ctx
	ocf.stream = stream
	ocf.avroSchema = avroSchema
	ocf.compress = true
	return ocf
}

// initWriteCloser sets p.writer to the sink that OCF output is written to.
//
// When p.compress is true the sink is a zstd encoder layered over w; otherwise
// w is wrapped in a nopWriteCloser. An error is returned only when the zstd
// encoder cannot be created, in which case p.writer is left untouched.
func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error {
	if !p.compress {
		p.writer = &nopWriteCloser{w}
		return nil
	}

	// Hold the encoder in a concretely typed local and assign the interface
	// field only on success. Assigning the call result straight into p.writer
	// would store a typed-nil *zstd.Encoder on failure, which makes the
	// interface value compare as non-nil.
	encoder, err := zstd.NewWriter(w)
	if err != nil {
		return fmt.Errorf("error while initializing zstd encoding writer: %w", err)
	}
	p.writer = encoder
	return nil
}

// createOCFWriter prepares p.writer for w via initWriteCloser and then builds
// a goavro OCF writer using p.avroSchema.Schema. A failure in either step is
// wrapped and returned together with a nil writer.
func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) {
err := p.initWriteCloser(w)
if err != nil {
return nil, fmt.Errorf("failed to create compressed writer: %w", err)
}

ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
// NOTE(review): this view interleaves removed and added diff lines. `W: w,`
// looks like the pre-change line and `W: p.writer,` its replacement, so only
// the latter should exist in the committed file — confirm against the repo.
W: w,
W: p.writer,
Schema: p.avroSchema.Schema,
})
if err != nil {
return nil, fmt.Errorf("failed to create OCF writer: %w", err)
}

return ocfWriter, nil
}

Expand Down Expand Up @@ -107,6 +143,9 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
if err != nil {
return 0, fmt.Errorf("failed to create OCF writer: %w", err)
}
// we have to keep a reference to the underlying writer as goavro doesn't provide any access to it
defer p.writer.Close()

numRows, err := p.writeRecordsToOCFWriter(ocfWriter)
if err != nil {
return 0, fmt.Errorf("failed to write records to OCF writer: %w", err)
Expand Down Expand Up @@ -160,3 +199,14 @@ func (p *PeerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) {
defer file.Close()
return p.WriteOCF(file)
}

// nopWriteCloser adapts an io.Writer to io.WriteCloser. Writes are forwarded
// to the embedded writer unchanged.
type nopWriteCloser struct {
	io.Writer
}

// Close closes the embedded writer when it also implements io.Closer and
// returns that result; for any other writer it does nothing and returns nil.
func (n *nopWriteCloser) Close() error {
	closer, closable := n.Writer.(io.Closer)
	if !closable {
		return nil
	}
	return closer.Close()
}
2 changes: 1 addition & 1 deletion flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ require (
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand Down

0 comments on commit e6003de

Please sign in to comment.