Merge pull request #209 from practo/compression
Compressed JSON uploads and downloads
alok87 authored May 3, 2021
2 parents ddf528a + 93a3e90 commit c42a7c0
Showing 3 changed files with 37 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/redshift/redshift.go
@@ -830,7 +830,7 @@ func (r *Redshift) Copy(ctx context.Context, tx *sql.Tx,

json := ""
if typeJson == true {
json = "json 'auto'"
json = "json 'auto' gzip"
}

csv := ""
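For reference, the json option string built above is spliced into the Redshift COPY statement, and the added gzip keyword makes Redshift decompress the S3 objects while loading. A minimal sketch of the resulting statement, with hypothetical table, bucket, and IAM role values (the real Copy method assembles these from its arguments):

package main

import "fmt"

func main() {
	// Hypothetical values; Copy derives these from its arguments.
	table := "schema.customers"
	location := "s3://my-bucket/topics/db.customers/250_offset_0_partition.json.gz"
	creds := "aws_iam_role=arn:aws:iam::123456789012:role/redshift-copy"
	json := "json 'auto' gzip"

	fmt.Printf("COPY %s FROM '%s' CREDENTIALS '%s' %s;\n",
		table, location, creds, json)
}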
17 changes: 14 additions & 3 deletions pkg/redshiftbatcher/batch_processor.go
@@ -17,6 +17,7 @@ import (
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/debezium"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/masker"
"github.com/practo/tipoca-stream/redshiftsink/pkg/util"
"github.com/spf13/viper"
"path/filepath"
"strings"
@@ -194,7 +195,7 @@ func constructS3key(
 	offset int64,
 ) string {
 	s3FileName := fmt.Sprintf(
-		"%d_offset_%d_partition.json",
+		"%d_offset_%d_partition.json.gz",
 		offset,
 		partition,
 	)
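With the new suffix, a batch at offset 250 for partition 0 yields the file name below; a runnable sketch of just the format string (offset and partition values are hypothetical, and the full S3 key also carries directory components not shown in this hunk):

package main

import "fmt"

func main() {
	var offset int64 = 250
	var partition int32 = 0

	s3FileName := fmt.Sprintf(
		"%d_offset_%d_partition.json.gz",
		offset,
		partition,
	)
	fmt.Println(s3FileName) // 250_offset_0_partition.json.gz
}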
@@ -366,8 +367,10 @@ func (b *batchProcessor) processMessage(
 		return bytesProcessed, fmt.Errorf(
 			"Error marshalling message.Value, message: %+v", message)
 	}
+
 	resp.bodyBuf.Write(messageValueBytes)
 	resp.bodyBuf.Write([]byte{'\n'})
+
 	bytesProcessed += message.Bytes
 
 	if b.maskMessages && len(resp.maskSchema) == 0 {
@@ -393,7 +396,6 @@ func (b *batchProcessor) processMessages(
 	msgBuf []*serializer.Message,
 	resp *response,
 ) (int64, error) {
-
 	var totalBytesProcessed int64
 	for messageID, message := range msgBuf {
 		select {
@@ -439,7 +441,15 @@ func (b *batchProcessor) processBatch(
 	klog.V(4).Infof("%s: batchId:%d, size:%d: uploading...",
 		b.topic, resp.batchID, len(msgBuf),
 	)
-	err = b.s3sink.Upload(resp.s3Key, resp.bodyBuf)
+
+	// Compress
+	gzBodyBuf := bytes.NewBuffer(make([]byte, 0, 4096))
+	err = util.GzipWrite(gzBodyBuf, resp.bodyBuf.Bytes())
+	if err != nil {
+		klog.Fatalf("Error in compressing, exiting, err:%v", err)
+	}
+
+	err = b.s3sink.Upload(resp.s3Key, gzBodyBuf)
 	if err != nil {
 		resp.err = fmt.Errorf("Error writing to s3, err=%v", err)
 		return
@@ -449,6 +459,7 @@
 		b.topic, resp.batchID, resp.startOffset, resp.endOffset,
 	)
 	resp.bodyBuf.Truncate(0)
+	gzBodyBuf.Truncate(0)
 	resp.messagesProcessed = len(msgBuf)
 }

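The shape of the change above is: accumulate newline-delimited JSON in bodyBuf, gzip the whole batch into a second buffer, upload the compressed buffer, then reset both. A standalone sketch of that flow, with a local gzipWrite mirroring util.GzipWrite and a hypothetical stubUpload standing in for b.s3sink.Upload:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
)

// gzipWrite mirrors util.GzipWrite so the sketch runs standalone.
func gzipWrite(w *bytes.Buffer, data []byte) error {
	gz, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return err
	}
	if _, err := gz.Write(data); err != nil {
		gz.Close()
		return err
	}
	return gz.Close()
}

// stubUpload is a hypothetical stand-in for b.s3sink.Upload.
func stubUpload(key string, body *bytes.Buffer) error {
	fmt.Printf("upload %s: %d compressed bytes\n", key, body.Len())
	return nil
}

func main() {
	// Batch body: newline-delimited JSON, as written by processMessage.
	bodyBuf := bytes.NewBuffer(nil)
	bodyBuf.WriteString(`{"id":1}` + "\n" + `{"id":2}` + "\n")

	// Compress the whole batch into a fresh buffer, then upload it.
	gzBodyBuf := bytes.NewBuffer(make([]byte, 0, 4096))
	if err := gzipWrite(gzBodyBuf, bodyBuf.Bytes()); err != nil {
		log.Fatalf("compress: %v", err)
	}
	if err := stubUpload("250_offset_0_partition.json.gz", gzBodyBuf); err != nil {
		log.Fatalf("upload: %v", err)
	}

	// Reset both buffers for the next batch, as processBatch does.
	bodyBuf.Truncate(0)
	gzBodyBuf.Truncate(0)
}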
22 changes: 22 additions & 0 deletions pkg/util/compress.go
@@ -0,0 +1,22 @@
+package util
+
+import (
+	"compress/gzip"
+	"io"
+)
+
+// GzipWrite gzip-compresses data and writes it to w.
+func GzipWrite(w io.Writer, data []byte) error {
+	writer, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
+	if err != nil {
+		return err
+	}
+	defer writer.Close()
+
+	_, err = writer.Write(data)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
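A quick round trip with the helper above; gzip.NewReader here plays the role of the decompression Redshift performs when COPY is given the gzip keyword (the pkg/util import path is the one added in batch_processor.go):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"

	"github.com/practo/tipoca-stream/redshiftsink/pkg/util"
)

func main() {
	payload := []byte(`{"id":1}` + "\n")

	var compressed bytes.Buffer
	if err := util.GzipWrite(&compressed, payload); err != nil {
		log.Fatalf("compress: %v", err)
	}

	r, err := gzip.NewReader(&compressed)
	if err != nil {
		log.Fatalf("reader: %v", err)
	}
	defer r.Close()

	out, err := io.ReadAll(r)
	if err != nil {
		log.Fatalf("read: %v", err)
	}
	fmt.Printf("round trip: %s", out) // prints: round trip: {"id":1}
}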
