diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 7d725fa1bc..8202d769fc 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -59,7 +59,8 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( return 0, err } - numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, "17", flowJobName) + partitionID := util.RandomString(16) + numRecords, localFilePath, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) if err != nil { return 0, err } @@ -192,14 +193,14 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( flowJobName string, ) (int, string, error) { var numRecords int - ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema) if s.config.StagingPath == "" { + ocfWriter := avro.NewPeerDBOCFWriterWithCompression(s.connector.ctx, stream, avroSchema) tmpDir, err := os.MkdirTemp("", "peerdb-avro") if err != nil { return 0, "", fmt.Errorf("failed to create temp dir: %w", err) } - localFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, partitionID) + localFilePath := fmt.Sprintf("%s/%s.avro.zst", tmpDir, partitionID) log.WithFields(log.Fields{ "flowName": flowJobName, "partitionID": partitionID, @@ -211,6 +212,7 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile( return numRecords, localFilePath, nil } else if strings.HasPrefix(s.config.StagingPath, "s3://") { + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema) s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) if err != nil { return 0, "", fmt.Errorf("failed to parse staging path: %w", err) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 29f45e929d..0b4cf09d7e 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/klauspost/compress/zstd" "github.com/linkedin/goavro/v2" log "github.com/sirupsen/logrus" uber_atomic "go.uber.org/atomic" @@ -22,6 +23,8 @@ type PeerDBOCFWriter struct { ctx context.Context stream *model.QRecordStream avroSchema *model.QRecordAvroSchemaDefinition + compress bool + writer io.WriteCloser } func NewPeerDBOCFWriter( @@ -33,17 +36,50 @@ func NewPeerDBOCFWriter( ctx: ctx, stream: stream, avroSchema: avroSchema, + compress: false, } } +func NewPeerDBOCFWriterWithCompression( + ctx context.Context, + stream *model.QRecordStream, + avroSchema *model.QRecordAvroSchemaDefinition, +) *PeerDBOCFWriter { + return &PeerDBOCFWriter{ + ctx: ctx, + stream: stream, + avroSchema: avroSchema, + compress: true, + } +} + +func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error { + var err error + if p.compress { + p.writer, err = zstd.NewWriter(w) + if err != nil { + return fmt.Errorf("error while initializing zstd encoding writer: %w", err) + } + } else { + p.writer = &nopWriteCloser{w} + } + return nil +} + func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) { + err := p.initWriteCloser(w) + if err != nil { + return nil, fmt.Errorf("failed to create compressed writer: %w", err) + } + ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: w, + W: p.writer, Schema: p.avroSchema.Schema, }) if err != nil { return nil, fmt.Errorf("failed to create OCF writer: %w", err) } + return ocfWriter, nil } @@ -107,6 +143,9 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) { if err != nil { return 0, fmt.Errorf("failed to create OCF writer: %w", err) } + // we have to keep a reference to the underlying writer as goavro doesn't provide any access to it + defer p.writer.Close() + numRows, err := p.writeRecordsToOCFWriter(ocfWriter) if err != nil { return 0, fmt.Errorf("failed to write records to OCF writer: %w", err) @@ -160,3 +199,14 @@ func (p *PeerDBOCFWriter) WriteRecordsToAvroFile(filePath string) (int, error) { defer file.Close() return p.WriteOCF(file) } + +type nopWriteCloser struct { + io.Writer +} + +func (n *nopWriteCloser) Close() error { + if closer, ok := n.Writer.(io.Closer); ok { + return closer.Close() + } + return nil +} diff --git a/flow/go.mod b/flow/go.mod index be495bf428..88150adf52 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -108,7 +108,7 @@ require ( github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.1 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect diff --git a/flow/go.sum b/flow/go.sum index 0da445658a..b6a1ba5b98 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -1230,6 +1230,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= +github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=