From 93a3e9025c0179856812023c25fc2c92a970be05 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sun, 2 May 2021 14:53:33 +0530 Subject: [PATCH] Compress only before upload, not every message Total perf improvement seen in upload is 10x --- pkg/redshiftbatcher/batch_processor.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index 2d2aae39f..76721c800 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -368,14 +368,8 @@ func (b *batchProcessor) processMessage( "Error marshalling message.Value, message: %+v", message) } - err = util.GzipWrite(resp.bodyBuf, messageValueBytes) - if err != nil { - return bytesProcessed, fmt.Errorf("Compression error: %v", err) - } - err = util.GzipWrite(resp.bodyBuf, []byte{'\n'}) - if err != nil { - return bytesProcessed, fmt.Errorf("Compression error: %v", err) - } + resp.bodyBuf.Write(messageValueBytes) + resp.bodyBuf.Write([]byte{'\n'}) bytesProcessed += message.Bytes @@ -402,7 +396,6 @@ func (b *batchProcessor) processMessages( msgBuf []*serializer.Message, resp *response, ) (int64, error) { - var totalBytesProcessed int64 for messageID, message := range msgBuf { select { @@ -448,7 +441,15 @@ func (b *batchProcessor) processBatch( klog.V(4).Infof("%s: batchId:%d, size:%d: uploading...", b.topic, resp.batchID, len(msgBuf), ) - err = b.s3sink.Upload(resp.s3Key, resp.bodyBuf) + + // Compress + gzBodyBuf := bytes.NewBuffer(make([]byte, 0, 4096)) + err = util.GzipWrite(gzBodyBuf, resp.bodyBuf.Bytes()) + if err != nil { + klog.Fatalf("Error in compressing, exitting, err:%v", err) + } + + err = b.s3sink.Upload(resp.s3Key, gzBodyBuf) if err != nil { resp.err = fmt.Errorf("Error writing to s3, err=%v", err) return @@ -458,6 +459,7 @@ func (b *batchProcessor) processBatch( b.topic, resp.batchID, resp.startOffset, resp.endOffset, ) resp.bodyBuf.Truncate(0) + gzBodyBuf.Truncate(0) resp.messagesProcessed = len(msgBuf) }