diff --git a/pkg/accesslogs/processor_test.go b/pkg/accesslogs/processor_test.go index 6b8f6e4c..9f76e9e7 100644 --- a/pkg/accesslogs/processor_test.go +++ b/pkg/accesslogs/processor_test.go @@ -87,58 +87,81 @@ func TestProcessor(t *testing.T) { func TestProcessorWithShipment(t *testing.T) { t.Parallel() - ctx := testcontext.New(t) - defer ctx.Cleanup() - - log := zaptest.NewLogger(t) - defer ctx.Check(log.Sync) - - s := newInMemoryStorage() - p := NewProcessor(log, Options{ - DefaultShipmentLimit: 20 * memory.B, - }) - defer ctx.Check(p.Close) - - ctx.Go(p.Run) - - uuid1, err := uuid.New() - require.NoError(t, err) - uuid2, err := uuid.New() - require.NoError(t, err) - key1 := Key{ - PublicProjectID: uuid1, - Bucket: "bucket1", - Prefix: "prefix1", - } - key2 := Key{ - PublicProjectID: uuid2, - Bucket: "bucket2", - Prefix: "prefix2/", - } - entry1 := newTestEntry("entry1") - entry2 := newTestEntry("entry2") - - for i := 0; i < 10; i++ { - require.NoError(t, p.QueueEntry(s, key1, entry1)) - require.NoError(t, p.QueueEntry(s, key2, entry1)) - require.NoError(t, p.QueueEntry(s, key1, entry2)) - require.NoError(t, p.QueueEntry(s, key2, entry2)) + tests := []struct { + name string + shipmentLimit memory.Size + expectedShipments int + }{ + { + name: "small shipment limit", + shipmentLimit: 20 * memory.B, + expectedShipments: 10, + }, + { + name: "big shipment limit", + shipmentLimit: 64 * memory.MiB, + expectedShipments: 1, + }, } - - require.NoError(t, p.Close()) // sync, don't wait until the deferred call - - for _, bucket := range []string{key1.Bucket, key2.Bucket} { - buf := bytes.NewBuffer(nil) - - for _, v := range s.getBucketContents(bucket) { - buf.Write(v) - } - - bucketContents := buf.String() - require.Equal(t, 20, strings.Count(bucketContents, "\n")) - bucketContents = strings.Replace(bucketContents, entry1.String()+"\n", "", 10) - bucketContents = strings.Replace(bucketContents, entry2.String()+"\n", "", 10) - require.Empty(t, bucketContents) + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ctx := testcontext.New(t) + defer ctx.Cleanup() + + log := zaptest.NewLogger(t) + defer ctx.Check(log.Sync) + + s := newInMemoryStorage() + p := NewProcessor(log, Options{ + DefaultShipmentLimit: tc.shipmentLimit, + }) + defer ctx.Check(p.Close) + + ctx.Go(p.Run) + + uuid1, err := uuid.New() + require.NoError(t, err) + uuid2, err := uuid.New() + require.NoError(t, err) + key1 := Key{ + PublicProjectID: uuid1, + Bucket: "bucket1", + Prefix: "prefix1", + } + key2 := Key{ + PublicProjectID: uuid2, + Bucket: "bucket2", + Prefix: "prefix2/", + } + entry1 := newTestEntry("entry1") + entry2 := newTestEntry("entry2") + + for i := 0; i < 10; i++ { + require.NoError(t, p.QueueEntry(s, key1, entry1)) + require.NoError(t, p.QueueEntry(s, key2, entry1)) + require.NoError(t, p.QueueEntry(s, key1, entry2)) + require.NoError(t, p.QueueEntry(s, key2, entry2)) + } + + require.NoError(t, p.Close()) // sync, don't wait until the deferred call + + for _, bucket := range []string{key1.Bucket, key2.Bucket} { + buf := bytes.NewBuffer(nil) + + require.Len(t, s.getBucketContents(bucket), tc.expectedShipments) + + for _, v := range s.getBucketContents(bucket) { + buf.Write(v) + } + + bucketContents := buf.String() + require.Equal(t, 20, strings.Count(bucketContents, "\n")) + bucketContents = strings.Replace(bucketContents, entry1.String()+"\n", "", 10) + bucketContents = strings.Replace(bucketContents, entry2.String()+"\n", "", 10) + require.Empty(t, bucketContents) + } + }) } }