Compress only before upload, not every message
Total perf improvement seen in upload is 10x
alok87 committed May 2, 2021
1 parent 646b605 commit 93a3e90
Showing 1 changed file with 12 additions and 10 deletions.
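
Note on the change: util.GzipWrite's implementation is not part of this diff, but a helper with that signature typically wraps its input in a fresh gzip.Writer on every call. A minimal sketch of such a helper (an assumption for illustration, not the repository's actual code):

```go
package util

import (
	"compress/gzip"
	"io"
)

// GzipWrite is a sketch of what a helper with this shape usually does:
// it creates a new gzip.Writer per call, so every call pays for a writer
// allocation, a gzip header, and a flush/footer on Close.
func GzipWrite(w io.Writer, b []byte) error {
	gzw := gzip.NewWriter(w)
	if _, err := gzw.Write(b); err != nil {
		gzw.Close()
		return err
	}
	return gzw.Close()
}
```

Before this commit, processMessage called the helper twice per Kafka message (once for the value, once for the trailing newline), so every message paid the full writer setup and flush cost and the upload body became a long chain of tiny gzip members, which also compresses poorly. After the commit, processMessage only appends raw bytes to resp.bodyBuf, and processBatch compresses the whole batch in one pass just before the S3 upload, which is where the reported 10x upload improvement comes from.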
pkg/redshiftbatcher/batch_processor.go: 22 changes (12 additions & 10 deletions)
@@ -368,14 +368,8 @@ func (b *batchProcessor) processMessage(
             "Error marshalling message.Value, message: %+v", message)
     }
 
-    err = util.GzipWrite(resp.bodyBuf, messageValueBytes)
-    if err != nil {
-        return bytesProcessed, fmt.Errorf("Compression error: %v", err)
-    }
-    err = util.GzipWrite(resp.bodyBuf, []byte{'\n'})
-    if err != nil {
-        return bytesProcessed, fmt.Errorf("Compression error: %v", err)
-    }
+    resp.bodyBuf.Write(messageValueBytes)
+    resp.bodyBuf.Write([]byte{'\n'})
 
     bytesProcessed += message.Bytes
 
@@ -402,7 +396,6 @@ func (b *batchProcessor) processMessages(
     msgBuf []*serializer.Message,
     resp *response,
 ) (int64, error) {
-
     var totalBytesProcessed int64
     for messageID, message := range msgBuf {
         select {
@@ -448,7 +441,15 @@ func (b *batchProcessor) processBatch(
     klog.V(4).Infof("%s: batchId:%d, size:%d: uploading...",
         b.topic, resp.batchID, len(msgBuf),
     )
-    err = b.s3sink.Upload(resp.s3Key, resp.bodyBuf)
+
+    // Compress
+    gzBodyBuf := bytes.NewBuffer(make([]byte, 0, 4096))
+    err = util.GzipWrite(gzBodyBuf, resp.bodyBuf.Bytes())
+    if err != nil {
+        klog.Fatalf("Error in compressing, exitting, err:%v", err)
+    }
+
+    err = b.s3sink.Upload(resp.s3Key, gzBodyBuf)
     if err != nil {
         resp.err = fmt.Errorf("Error writing to s3, err=%v", err)
         return
@@ -458,6 +459,7 @@
         b.topic, resp.batchID, resp.startOffset, resp.endOffset,
     )
     resp.bodyBuf.Truncate(0)
+    gzBodyBuf.Truncate(0)
     resp.messagesProcessed = len(msgBuf)
 }
