Skip to content

Commit

Permalink
fix(kinesis): Limit Aggregated Records to 1MB (#169)
Browse files Browse the repository at this point in the history
* fix(kinesis): Limit Aggregated Records to 1MB

* docs: Add Comment
  • Loading branch information
jshlbrd authored May 3, 2024
1 parent b923598 commit 9065f1c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
49 changes: 49 additions & 0 deletions internal/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// Aggregate holds an in-progress aggregated Kinesis record together with
// bookkeeping values that are maintained by New and Add.
type Aggregate struct {
// Record is the aggregated record being built; Add appends to its Records
// and PartitionKeyTable.
Record *rec.AggregatedRecord
// Count is the number of records added so far. Add also uses the current
// value as the partition key index of the next record.
Count int
// Size is a running total that Add increases by calculateRecordSize after
// each accepted record.
// NOTE(review): calculateRecordSize includes the size of the whole current
// aggregate plus a fixed 20 bytes on every call, so this total looks like it
// over-counts rather than tracking the true serialized size; confirm intent.
Size int
// PartitionKey is the partition key recorded by Add; New resets it to "".
PartitionKey string
}

Expand All @@ -31,11 +32,49 @@ type Aggregate struct {
// New resets the aggregate to an empty state: the counters are zeroed, the
// partition key is cleared, and Record is replaced with a fresh aggregated
// record whose partition key table is an empty, non-nil slice.
func (a *Aggregate) New() {
	a.Count, a.Size = 0, 0
	a.PartitionKey = ""

	a.Record = &rec.AggregatedRecord{
		PartitionKeyTable: make([]string, 0),
	}
}

// varIntSize returns how many 7-bit groups are needed to hold i, i.e. the
// number of bytes i occupies when each byte carries 7 bits of the value.
//
// Zero still takes one byte. Negative inputs yield 0 because the loop only
// counts groups while the value is positive.
func varIntSize(i int) int {
	if i == 0 {
		return 1
	}

	// Peel off 7 bits per iteration; each iteration accounts for one byte.
	size := 0
	for v := i; v > 0; v >>= 7 {
		size++
	}

	return size
}

// calculateRecordSize estimates the size of the aggregate if a record with the
// given data and partition key were added to it. The result is the sum of the
// new record's estimated contribution, the current size reported by
// a.Record.XXX_Size(), and 20 fixed bytes (4 byte magic header + 16 byte MD5
// digest).
//
// The per-field arithmetic follows the reference Python aggregator:
// https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L344-L378
func (a *Aggregate) calculateRecordSize(data []byte, partitionKey string) int {
	keyLen, dataLen := len(partitionKey), len(data)

	// Partition key table entry: 1 byte + varint length + key bytes.
	tableEntry := 1 + varIntSize(keyLen) + keyLen

	// Inner record: the partition key index field (1 byte + varint of the
	// current count) followed by the data field (1 byte + varint length +
	// data bytes).
	indexField := 1 + varIntSize(a.Count)
	dataField := 1 + varIntSize(dataLen) + dataLen
	inner := indexField + dataField

	// Wrapper around the inner record: 1 byte + varint of the inner size.
	wrapper := 1 + varIntSize(inner)

	// input record size + current aggregated record size + 4 byte magic header + 16 byte MD5 digest
	return tableEntry + inner + wrapper + a.Record.XXX_Size() + 20
}

// Add inserts a Kinesis record into an aggregated Kinesis record
// https://github.com/awslabs/kinesis-aggregation/blob/398fbd4b430d4bf590431b301d03cbbc94279cef/python/aws_kinesis_agg/aggregator.py#L382
func (a *Aggregate) Add(data []byte, partitionKey string) bool {
Expand All @@ -49,15 +88,25 @@ func (a *Aggregate) Add(data []byte, partitionKey string) bool {
a.PartitionKey = partitionKey
}

// Verify the record size won't exceed the 1 MB limit of the Kinesis service.
// https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
if a.calculateRecordSize(data, partitionKey) > 1024*1024 {
return false
}

pki := uint64(a.Count)
r := &rec.Record{
PartitionKeyIndex: &pki,
Data: data,
}

// Append the data to the aggregated record.
a.Record.Records = append(a.Record.Records, r)
a.Record.PartitionKeyTable = append(a.Record.PartitionKeyTable, partitionKey)

// Update the record count and size. This is not used in the aggregated record.
a.Count++
a.Size += a.calculateRecordSize(data, partitionKey)

return true
}
Expand Down
2 changes: 2 additions & 0 deletions transform/send_aws_kinesis_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ func (tf *sendAWSKinesisDataStream) aggregateRecords(partitionKey string, data [
records = append(records, agg.Get())

agg.New()

// This silently drops any data that is between ~0.9999 MB and 1 MB.
_ = agg.Add(d, partitionKey)
}

Expand Down

0 comments on commit 9065f1c

Please sign in to comment.