From 8f2b049d2ee8f508b89b377eff27ed3259ddf994 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Fri, 30 Apr 2021 15:47:53 +0530
Subject: [PATCH 1/3] Gzip compressed uploads

---
 pkg/redshiftbatcher/batch_processor.go | 15 ++++++++++++---
 pkg/util/compress.go                   | 22 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)
 create mode 100644 pkg/util/compress.go

diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go
index 910ee8157..2d2aae39f 100644
--- a/pkg/redshiftbatcher/batch_processor.go
+++ b/pkg/redshiftbatcher/batch_processor.go
@@ -17,6 +17,7 @@ import (
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/debezium"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/masker"
+	"github.com/practo/tipoca-stream/redshiftsink/pkg/util"
 	"github.com/spf13/viper"
 	"path/filepath"
 	"strings"
@@ -194,7 +195,7 @@ func constructS3key(
 	offset int64,
 ) string {
 	s3FileName := fmt.Sprintf(
-		"%d_offset_%d_partition.json",
+		"%d_offset_%d_partition.json.gz",
 		offset,
 		partition,
 	)
@@ -366,8 +367,16 @@ func (b *batchProcessor) processMessage(
 		return bytesProcessed, fmt.Errorf(
 			"Error marshalling message.Value, message: %+v", message)
 	}
-	resp.bodyBuf.Write(messageValueBytes)
-	resp.bodyBuf.Write([]byte{'\n'})
+
+	err = util.GzipWrite(resp.bodyBuf, messageValueBytes)
+	if err != nil {
+		return bytesProcessed, fmt.Errorf("Compression error: %v", err)
+	}
+	err = util.GzipWrite(resp.bodyBuf, []byte{'\n'})
+	if err != nil {
+		return bytesProcessed, fmt.Errorf("Compression error: %v", err)
+	}
+
 	bytesProcessed += message.Bytes
 
 	if b.maskMessages && len(resp.maskSchema) == 0 {
diff --git a/pkg/util/compress.go b/pkg/util/compress.go
new file mode 100644
index 000000000..7a86f4c42
--- /dev/null
+++ b/pkg/util/compress.go
@@ -0,0 +1,22 @@
+package util
+
+import (
+	"compress/gzip"
+	"io"
+)
+
+// GzipWrite writes gzipped data bytes to the writer
+func GzipWrite(w io.Writer, data []byte) error {
+	writer, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
+	if err != nil {
+		return err
+	}
+	defer writer.Close()
+
+	_, err = writer.Write(data)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

From 646b605e2c042cbc568ccf19c3a1bda679dff3e3 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Fri, 30 Apr 2021 16:03:00 +0530
Subject: [PATCH 2/3] COPY JSON loads always expect compressed option

https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-file-compression.html#copy-gzip
https://stackoverflow.com/questions/27165109/how-to-load-gzipped-json-data-from-a-copy

This is not backward compatible; we still need to take care of it, or
make all the moves first and then release this.
---
 pkg/redshift/redshift.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/redshift/redshift.go b/pkg/redshift/redshift.go
index 34c527221..2c9f7e5d1 100644
--- a/pkg/redshift/redshift.go
+++ b/pkg/redshift/redshift.go
@@ -830,7 +830,7 @@ func (r *Redshift) Copy(ctx context.Context, tx *sql.Tx,
 
 	json := ""
 	if typeJson == true {
-		json = "json 'auto'"
+		json = "json 'auto' gzip"
 	}
 
 	csv := ""

From 93a3e9025c0179856812023c25fc2c92a970be05 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Sun, 2 May 2021 14:53:33 +0530
Subject: [PATCH 3/3] Compress only before upload, not every message

Total perf improvement seen in upload is 10x.
---
 pkg/redshiftbatcher/batch_processor.go | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go
index 2d2aae39f..76721c800 100644
--- a/pkg/redshiftbatcher/batch_processor.go
+++ b/pkg/redshiftbatcher/batch_processor.go
@@ -368,14 +368,8 @@ func (b *batchProcessor) processMessage(
 			"Error marshalling message.Value, message: %+v", message)
 	}
 
-	err = util.GzipWrite(resp.bodyBuf, messageValueBytes)
-	if err != nil {
-		return bytesProcessed, fmt.Errorf("Compression error: %v", err)
-	}
-	err = util.GzipWrite(resp.bodyBuf, []byte{'\n'})
-	if err != nil {
-		return bytesProcessed, fmt.Errorf("Compression error: %v", err)
-	}
+	resp.bodyBuf.Write(messageValueBytes)
+	resp.bodyBuf.Write([]byte{'\n'})
 
 	bytesProcessed += message.Bytes
 
@@ -402,7 +396,6 @@ func (b *batchProcessor) processMessages(
 	msgBuf []*serializer.Message,
 	resp *response,
 ) (int64, error) {
-
 	var totalBytesProcessed int64
 	for messageID, message := range msgBuf {
 		select {
@@ -448,7 +441,15 @@ func (b *batchProcessor) processBatch(
 	klog.V(4).Infof("%s: batchId:%d, size:%d: uploading...",
 		b.topic, resp.batchID, len(msgBuf),
 	)
-	err = b.s3sink.Upload(resp.s3Key, resp.bodyBuf)
+
+	// Compress the accumulated batch once, just before upload
+	gzBodyBuf := bytes.NewBuffer(make([]byte, 0, 4096))
+	err = util.GzipWrite(gzBodyBuf, resp.bodyBuf.Bytes())
+	if err != nil {
+		klog.Fatalf("Error in compressing, exiting, err:%v", err)
+	}
+
+	err = b.s3sink.Upload(resp.s3Key, gzBodyBuf)
 	if err != nil {
 		resp.err = fmt.Errorf("Error writing to s3, err=%v", err)
 		return
 	}
@@ -458,6 +459,7 @@ func (b *batchProcessor) processBatch(
 		b.topic, resp.batchID, resp.startOffset, resp.endOffset,
 	)
 	resp.bodyBuf.Truncate(0)
+	gzBodyBuf.Truncate(0)
 	resp.messagesProcessed = len(msgBuf)
 }
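
Taken together, the series lands on this flow: processMessage appends plain
newline-delimited JSON to bodyBuf, processBatch gzips the whole batch once
into gzBodyBuf right before the S3 upload, and COPY loads it with
"json 'auto' gzip". Below is a minimal, self-contained Go sketch of that
flow, assuming only the standard library; it is not code from the
repository. The helper mirrors util.GzipWrite from PATCH 1/3, except that
it checks the gzip.NewWriterLevel error before using the writer and
returns the error from Close, which is the call that flushes the final
compressed bytes. The sample messages are made up; the 4096-byte initial
buffer size echoes the patch.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipWrite compresses data into w in a single pass at the fastest
// compression level. Close flushes the trailing compressed bytes,
// so its error is returned rather than dropped.
func gzipWrite(w io.Writer, data []byte) error {
	zw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return err // only reachable with an invalid level
	}
	if _, err := zw.Write(data); err != nil {
		zw.Close()
		return err
	}
	return zw.Close()
}

func main() {
	// Batching path: append plain JSON lines per message, no gzip yet.
	bodyBuf := bytes.NewBuffer(make([]byte, 0, 4096))
	for _, value := range []string{`{"id":1}`, `{"id":2}`} {
		bodyBuf.WriteString(value)
		bodyBuf.WriteByte('\n')
	}

	// Upload path: compress the accumulated batch exactly once; the
	// gzipped buffer is what would be handed to the S3 uploader.
	gzBodyBuf := bytes.NewBuffer(make([]byte, 0, 4096))
	if err := gzipWrite(gzBodyBuf, bodyBuf.Bytes()); err != nil {
		panic(err)
	}
	fmt.Printf("plain=%d bytes, gzipped=%d bytes\n",
		bodyBuf.Len(), gzBodyBuf.Len())
}

The 10x figure in PATCH 3/3 is consistent with this restructuring: the
PATCH 1/3 version constructed and flushed a fresh gzip.Writer twice per
message (once for the value, once for the newline), writing a complete
gzip stream each time, whereas compressing once per batch pays the
compressor setup and flush cost a single time per upload.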