From 8c7dc4774f5bae5619918521c6cea5e41f6ae106 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Wed, 11 Dec 2024 14:01:21 +0000 Subject: [PATCH] Switch the `redpanda_migrator` to the new ack mechanism Signed-off-by: Mihai Todor --- CHANGELOG.md | 6 +- .../components/pages/inputs/redpanda.adoc | 15 + .../pages/inputs/redpanda_common.adoc | 15 + .../pages/inputs/redpanda_migrator.adoc | 42 +- .../inputs/redpanda_migrator_offsets.adoc | 10 + .../pages/outputs/redpanda_migrator.adoc | 119 +---- internal/asyncroutine/periodic.go | 2 +- .../impl/kafka/enterprise/integration_test.go | 227 +++++---- .../kafka/enterprise/redpanda_common_input.go | 7 +- .../enterprise/redpanda_common_output.go | 2 +- .../enterprise/redpanda_migrator_input.go | 452 ++++-------------- .../redpanda_migrator_offsets_input.go | 2 +- .../enterprise/redpanda_migrator_output.go | 353 ++++++-------- internal/impl/kafka/franz_reader_ordered.go | 127 ++++- internal/impl/kafka/franz_writer.go | 43 +- internal/impl/kafka/input_redpanda.go | 7 +- internal/impl/kafka/output_kafka_franz.go | 2 +- internal/impl/kafka/output_redpanda.go | 2 +- internal/impl/ockam/output_kafka.go | 2 +- 19 files changed, 580 insertions(+), 855 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index adb85768c1..f95276872b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,17 +22,21 @@ All notable changes to this project will be documented in this file. - The `redpanda_migrator_bundle` output now skips schema ID translation when `translate_schema_ids: false` and `schema_registry` is configured. (@mihaitodor) - The `redpanda_migrator` output no longer rejects messages if it can't perform schema ID translation. (@mihaitodor) - The `redpanda_migrator` input no longer converts the kafka key to string. (@mihaitodor) -- Field `multi_header` for the `redpanda_migrator` input is now deprecated. (@mihaitodor) ### Added - New `redpanda_migrator_offsets` input. (@mihaitodor) - Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output. (@mihaitodor) - Fields `kafka_key` and `max_in_flight` for the `redpanda_migrator_offsets` output are now deprecated. (@mihaitodor) +- Fields `batching` for the `redpanda_migrator` output is now deprecated. (@mihaitodor) +- Field `topic_lag_refresh_period` added to the `redpanda` and `redpanda_common` inputs. (@mihaitodor) +- Metric `redpanda_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor) +- Metadata `kafka_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor) ### Changed - The `kafka_key` and `max_in_flight` fields of the `redpanda_migrator_offsets` output have been deprecated. +- Fields `batch_size` and `multi_header` for the `redpanda_migrator` input are now deprecated. (@mihaitodor) ## 4.43.0 - 2024-12-05 diff --git a/docs/modules/components/pages/inputs/redpanda.adoc b/docs/modules/components/pages/inputs/redpanda.adoc index 381400ead6..2b6673bb65 100644 --- a/docs/modules/components/pages/inputs/redpanda.adoc +++ b/docs/modules/components/pages/inputs/redpanda.adoc @@ -75,6 +75,7 @@ input: consumer_group: "" # No default (optional) commit_period: 5s partition_buffer_bytes: 1MB + topic_lag_refresh_period: 5s auto_replay_nacks: true ``` @@ -114,6 +115,10 @@ output: Records are processed and delivered from each partition in batches as received from brokers. 
These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor. +== Metrics + +Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic. + == Metadata This input adds the following metadata fields to each message: @@ -123,6 +128,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset +- kafka_lag - kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message @@ -635,6 +641,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que *Default*: `"1MB"` +=== `topic_lag_refresh_period` + +The period of time between each topic lag refresh cycle. + + +*Type*: `string` + +*Default*: `"5s"` + === `auto_replay_nacks` Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. diff --git a/docs/modules/components/pages/inputs/redpanda_common.adoc b/docs/modules/components/pages/inputs/redpanda_common.adoc index ab0b6a912c..e93ea13687 100644 --- a/docs/modules/components/pages/inputs/redpanda_common.adoc +++ b/docs/modules/components/pages/inputs/redpanda_common.adoc @@ -63,6 +63,7 @@ input: consumer_group: "" # No default (optional) commit_period: 5s partition_buffer_bytes: 1MB + topic_lag_refresh_period: 5s auto_replay_nacks: true ``` @@ -100,6 +101,10 @@ output: Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor. +== Metrics + +Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic. + == Metadata This input adds the following metadata fields to each message: @@ -109,6 +114,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset +- kafka_lag - kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message @@ -235,6 +241,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que *Default*: `"1MB"` +=== `topic_lag_refresh_period` + +The period of time between each topic lag refresh cycle. + + +*Type*: `string` + +*Default*: `"5s"` + === `auto_replay_nacks` Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. 
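The `redpanda` and `redpanda_common` input docs above introduce the `topic_lag_refresh_period` field, the `redpanda_lag` metric and the `kafka_lag` metadata. As a rough sketch of how they fit together (the broker address, topic, consumer group and 10s interval below are placeholders, and the pipeline is illustrative rather than taken from this patch), a config might look like:

```yml
input:
  redpanda:
    seed_brokers: [ "localhost:9092" ] # placeholder broker address
    topics: [ example_topic ]          # placeholder topic
    consumer_group: example_cg         # placeholder consumer group
    topic_lag_refresh_period: 10s      # lag values are re-fetched on this interval (default 5s)

pipeline:
  processors:
    - mapping: |
        root = this
        # kafka_lag carries the consumer group lag cached at the last refresh cycle
        root.lag = @kafka_lag

output:
  stdout: {}
```

The `redpanda_lag` gauge is updated on the same refresh cycle, labelled by `topic` and `partition`.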
diff --git a/docs/modules/components/pages/inputs/redpanda_migrator.adoc b/docs/modules/components/pages/inputs/redpanda_migrator.adoc index e127a2b19d..f5d24a1fc4 100644 --- a/docs/modules/components/pages/inputs/redpanda_migrator.adoc +++ b/docs/modules/components/pages/inputs/redpanda_migrator.adoc @@ -76,13 +76,12 @@ input: fetch_max_partition_bytes: 1MiB consumer_group: "" # No default (optional) commit_period: 5s - batch_size: 1024 - auto_replay_nacks: true + partition_buffer_bytes: 1MB topic_lag_refresh_period: 5s + auto_replay_nacks: true output_resource: redpanda_migrator_output replication_factor_override: true replication_factor: 3 - multi_header: false ``` -- @@ -94,7 +93,9 @@ This input should be used in combination with a `redpanda_migrator` output which When a consumer group is specified this input consumes one or more topics where partitions will automatically balance across any other connected clients with the same consumer group. When a consumer group is not specified topics can either be consumed in their entirety or with explicit partitions. -It attempts to create all selected topics it along with their associated ACLs in the broker that the `redpanda_migrator` output points to identified by the label specified in `output_resource`. +It attempts to create all selected topics along with their associated ACLs in the broker that the `redpanda_migrator` output points to identified by the label specified in `output_resource`. + +It provides the same delivery guarantees and ordering semantics as the `redpanda` input. == Metrics @@ -613,32 +614,32 @@ The period of time between each commit of the current partition offsets. Offsets *Default*: `"5s"` -=== `batch_size` +=== `partition_buffer_bytes` -The maximum number of messages that should be accumulated into each batch. +A buffer size (in bytes) for each consumed partition, allowing records to be queued internally before flushing. Increasing this may improve throughput at the cost of higher memory utilisation. Note that each buffer can grow slightly beyond this value. -*Type*: `int` +*Type*: `string` -*Default*: `1024` +*Default*: `"1MB"` -=== `auto_replay_nacks` +=== `topic_lag_refresh_period` -Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. +The period of time between each topic lag refresh cycle. -*Type*: `bool` +*Type*: `string` -*Default*: `true` +*Default*: `"5s"` -=== `topic_lag_refresh_period` +=== `auto_replay_nacks` -The period of time between each topic lag refresh cycle. +Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. -*Type*: `string` +*Type*: `bool` -*Default*: `"5s"` +*Default*: `true` === `output_resource` @@ -667,13 +668,4 @@ Replication factor for created topics. 
This is only used when `replication_facto *Default*: `3` -=== `multi_header` - -Decode headers into lists to allow handling of multiple values with the same key - - -*Type*: `bool` - -*Default*: `false` - diff --git a/docs/modules/components/pages/inputs/redpanda_migrator_offsets.adoc b/docs/modules/components/pages/inputs/redpanda_migrator_offsets.adoc index 4b5a7a6e4c..eb0fdfe5f3 100644 --- a/docs/modules/components/pages/inputs/redpanda_migrator_offsets.adoc +++ b/docs/modules/components/pages/inputs/redpanda_migrator_offsets.adoc @@ -76,6 +76,7 @@ input: consumer_group: "" # No default (optional) commit_period: 5s partition_buffer_bytes: 1MB + topic_lag_refresh_period: 5s auto_replay_nacks: true ``` @@ -585,6 +586,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que *Default*: `"1MB"` +=== `topic_lag_refresh_period` + +The period of time between each topic lag refresh cycle. + + +*Type*: `string` + +*Default*: `"5s"` + === `auto_replay_nacks` Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation. diff --git a/docs/modules/components/pages/outputs/redpanda_migrator.adoc b/docs/modules/components/pages/outputs/redpanda_migrator.adoc index 5966836055..fec8751fd5 100644 --- a/docs/modules/components/pages/outputs/redpanda_migrator.adoc +++ b/docs/modules/components/pages/outputs/redpanda_migrator.adoc @@ -46,12 +46,7 @@ output: metadata: include_prefixes: [] include_patterns: [] - max_in_flight: 10 - batching: - count: 0 - byte_size: 0 - period: "" - check: "" + max_in_flight: 256 ``` -- @@ -82,13 +77,7 @@ output: include_prefixes: [] include_patterns: [] timestamp_ms: ${! timestamp_unix_milli() } # No default (optional) - max_in_flight: 10 - batching: - count: 0 - byte_size: 0 - period: "" - check: "" - processors: [] # No default (optional) + max_in_flight: 256 input_resource: redpanda_migrator_input replication_factor_override: true replication_factor: 3 @@ -641,109 +630,7 @@ The maximum number of batches to be sending in parallel at any given time. *Type*: `int` -*Default*: `10` - -=== `batching` - -Allows you to configure a xref:configuration:batching.adoc[batching policy]. - - -*Type*: `object` - - -```yml -# Examples - -batching: - byte_size: 5000 - count: 0 - period: 1s - -batching: - count: 10 - period: 1s - -batching: - check: this.contains("END BATCH") - count: 0 - period: 1m -``` - -=== `batching.count` - -A number of messages at which the batch should be flushed. If `0` disables count based batching. - - -*Type*: `int` - -*Default*: `0` - -=== `batching.byte_size` - -An amount of bytes at which the batch should be flushed. If `0` disables size based batching. - - -*Type*: `int` - -*Default*: `0` - -=== `batching.period` - -A period in which an incomplete batch should be flushed regardless of its size. - - -*Type*: `string` - -*Default*: `""` - -```yml -# Examples - -period: 1s - -period: 1m - -period: 500ms -``` - -=== `batching.check` - -A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch. 
- - -*Type*: `string` - -*Default*: `""` - -```yml -# Examples - -check: this.type == "end_of_transaction" -``` - -=== `batching.processors` - -A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. - - -*Type*: `array` - - -```yml -# Examples - -processors: - - archive: - format: concatenate - -processors: - - archive: - format: lines - -processors: - - archive: - format: json_array -``` +*Default*: `256` === `input_resource` diff --git a/internal/asyncroutine/periodic.go b/internal/asyncroutine/periodic.go index 53a922e5e0..1d278d85ee 100644 --- a/internal/asyncroutine/periodic.go +++ b/internal/asyncroutine/periodic.go @@ -54,7 +54,7 @@ func NewPeriodicWithContext(duration time.Duration, work func(context.Context)) // Start starts the `Periodic` work. // -// It does not do work immedately, only after the time has passed. +// It does not do work immediately, only after the time has passed. func (p *Periodic) Start() { if p.cancel != nil { return diff --git a/internal/impl/kafka/enterprise/integration_test.go b/internal/impl/kafka/enterprise/integration_test.go index 99ac43fd61..942a7be438 100644 --- a/internal/impl/kafka/enterprise/integration_test.go +++ b/internal/impl/kafka/enterprise/integration_test.go @@ -678,8 +678,9 @@ func TestRedpandaMigratorIntegration(t *testing.T) { createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil) // Produce one message - streamBuilder := service.NewStreamBuilder() - require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` + { + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` pipeline: processors: - schema_registry_encode: @@ -691,31 +692,33 @@ output: seed_brokers: [ %s ] topic: %s `, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic))) - require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) - inFunc, err := streamBuilder.AddProducerFunc() - require.NoError(t, err) + inFunc, err := streamBuilder.AddProducerFunc() + require.NoError(t, err) - stream, err := streamBuilder.Build() - require.NoError(t, err) + stream, err := streamBuilder.Build() + require.NoError(t, err) - license.InjectTestService(stream.Resources()) + license.InjectTestService(stream.Resources()) - ctx, done := context.WithTimeout(context.Background(), 3*time.Second) - t.Cleanup(done) + ctx, done := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(done) - go func() { - require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`)))) + go func() { + require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`)))) - require.NoError(t, stream.StopWithin(1*time.Second)) - }() + require.NoError(t, stream.StopWithin(2*time.Second)) + }() - err = stream.Run(ctx) - require.NoError(t, err) + err = stream.Run(ctx) + require.NoError(t, err) + } // Run the Redpanda Migrator bundle - streamBuilder = service.NewStreamBuilder() - require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` + { + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` input: redpanda_migrator_bundle: 
redpanda_migrator: @@ -727,6 +730,11 @@ input: replication_factor: -1 schema_registry: url: %s + processors: + - log: + message: meta ${! @ } + - log: + message: content ${! content() } output: redpanda_migrator_bundle: @@ -737,38 +745,40 @@ output: schema_registry: url: %s `, source.brokerAddr, dummyTopic, source.schemaRegistryURL, destination.brokerAddr, destination.schemaRegistryURL))) - // require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`)) - stream, err = streamBuilder.Build() - require.NoError(t, err) + stream, err := streamBuilder.Build() + require.NoError(t, err) - license.InjectTestService(stream.Resources()) + license.InjectTestService(stream.Resources()) - t.Log("Running migrator") + // Run stream in the background and shut it down when the test is finished + migratorCloseChan := make(chan struct{}) + go func() { + err = stream.Run(context.Background()) + require.NoError(t, err) - // Run stream in the background and shut it down when the test is finished - migratorCloseChan := make(chan struct{}) - go func() { - err = stream.Run(context.Background()) - require.NoError(t, err) + t.Log("Migrator shut down") - close(migratorCloseChan) - }() - t.Cleanup(func() { - require.NoError(t, stream.StopWithin(3*time.Second)) + close(migratorCloseChan) + }() + t.Cleanup(func() { + require.NoError(t, stream.StopWithin(3*time.Second)) - <-migratorCloseChan - }) + <-migratorCloseChan + }) + } + // Wait for Migrator to sync the existing message time.Sleep(10 * time.Second) - // Read the message using a consumer group + // Read the message from source using a consumer group dummyConsumerGroup := "test" - - streamBuilder = service.NewStreamBuilder() - require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` + { + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` input: - kafka_franz: + redpanda: seed_brokers: [ %s ] topics: [ %s ] consumer_group: %s @@ -778,41 +788,44 @@ input: url: %s avro_raw_json: true `, source.brokerAddr, dummyTopic, dummyConsumerGroup, source.schemaRegistryURL))) - require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) - recvChan := make(chan struct{}, 1) - err = streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error { - b, err := m.AsBytes() - require.NoError(t, err) - assert.Equal(t, `{"test":"foobar"}`, string(b)) + recvChan := make(chan struct{}) + err = streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error { + b, err := m.AsBytes() + require.NoError(t, err) - close(recvChan) - return nil - }) - require.NoError(t, err) + assert.Equal(t, `{"test":"foobar"}`, string(b)) - stream, err = streamBuilder.Build() - require.NoError(t, err) + close(recvChan) + return nil + }) + require.NoError(t, err) - license.InjectTestService(stream.Resources()) + stream, err := streamBuilder.Build() + require.NoError(t, err) - ctx, done = context.WithTimeout(context.Background(), 10*time.Second) - t.Cleanup(done) + license.InjectTestService(stream.Resources()) - go func() { - <-recvChan + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(done) - require.NoError(t, stream.StopWithin(5*time.Second)) - }() + go func() { + require.NoError(t, stream.Run(ctx)) + }() - err = stream.Run(ctx) - require.NoError(t, err) + <-recvChan + require.NoError(t, stream.StopWithin(3*time.Second)) + } time.Sleep(10 * time.Second) + 
t.Log("Finished reading message foobar with CG") + // Wait for Migrator to sync the consumer group - // Produce one message - streamBuilder = service.NewStreamBuilder() - require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` + // Produce one message in the source + { + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` pipeline: processors: - schema_registry_encode: @@ -824,74 +837,78 @@ output: seed_brokers: [ %s ] topic: %s `, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic))) - require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) - inFunc, err = streamBuilder.AddProducerFunc() - require.NoError(t, err) + inFunc, err := streamBuilder.AddProducerFunc() + require.NoError(t, err) - stream, err = streamBuilder.Build() - require.NoError(t, err) + stream, err := streamBuilder.Build() + require.NoError(t, err) - license.InjectTestService(stream.Resources()) + license.InjectTestService(stream.Resources()) - ctx, done = context.WithTimeout(context.Background(), 3*time.Second) - t.Cleanup(done) + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(done) - go func() { - require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`)))) + go func() { + require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"barfoo"}`)))) - require.NoError(t, stream.StopWithin(1*time.Second)) - }() + require.NoError(t, stream.StopWithin(3*time.Second)) + }() - err = stream.Run(ctx) - require.NoError(t, err) + require.NoError(t, stream.Run(ctx)) + } - t.Log("Produced message") - time.Sleep(10 * time.Second) //////////////////////////////// TODOOOOOOOOOOOOOOOOOOOO + // Wait for Migrator to sync consumer groups and the new message + time.Sleep(30 * time.Second) + t.Log("Finished producing barfoo in source") - // Read the message using a consumer group - streamBuilder = service.NewStreamBuilder() - require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` + t.Log("Source broker: " + source.brokerAddr) + t.Log("Dest broker: " + destination.brokerAddr) + + // Read the new message from the destination using a consumer group + { + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(` input: kafka_franz: seed_brokers: [ %s ] topics: [ %s ] consumer_group: %s - start_from_oldest: true + start_from_oldest: false processors: - schema_registry_decode: url: %s avro_raw_json: true -`, destination.brokerAddr, dummyTopic, dummyConsumerGroup, destination.schemaRegistryURL))) - require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) +`, source.brokerAddr, dummyTopic, dummyConsumerGroup, source.schemaRegistryURL))) + require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`)) - recvChan = make(chan struct{}) - err = streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error { - b, err := m.AsBytes() + recvChan := make(chan struct{}) + err = streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error { + b, err := m.AsBytes() + require.NoError(t, err) + assert.Equal(t, `{"test":"barfoo"}`, string(b)) + + close(recvChan) + return nil + }) require.NoError(t, err) - assert.Equal(t, `{"test":"foobar"}`, string(b)) - close(recvChan) - return nil - }) - require.NoError(t, err) + stream, err := streamBuilder.Build() + require.NoError(t, err) - stream, err = streamBuilder.Build() - require.NoError(t, err) + license.InjectTestService(stream.Resources()) - 
license.InjectTestService(stream.Resources()) + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(done) - ctx, done = context.WithTimeout(context.Background(), 5*time.Second) - t.Cleanup(done) + go func() { + require.NoError(t, stream.Run(ctx)) + }() - go func() { <-recvChan - require.NoError(t, stream.StopWithin(3*time.Second)) - }() - err = stream.Run(ctx) - require.NoError(t, err) - - t.Log("Read message") + t.Log("Read message") + } } diff --git a/internal/impl/kafka/enterprise/redpanda_common_input.go b/internal/impl/kafka/enterprise/redpanda_common_input.go index a6d062f6a6..733a444172 100644 --- a/internal/impl/kafka/enterprise/redpanda_common_input.go +++ b/internal/impl/kafka/enterprise/redpanda_common_input.go @@ -65,6 +65,10 @@ output: Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields ` + "`fetch_max_partition_bytes` and `fetch_max_bytes`" + `. Batches can be further broken down using the ` + "xref:components:processors/split.adoc[`split`] processor" + `. +== Metrics + +Emits a ` + "`redpanda_lag`" + ` metric with ` + "`topic`" + ` and ` + "`partition`" + ` labels for each consumed topic. + == Metadata This input adds the following metadata fields to each message: @@ -74,6 +78,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset +- kafka_lag - kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message @@ -119,7 +124,7 @@ func init() { time.Sleep(time.Millisecond * 100) } return - }, nil) + }, nil, nil, nil) if err != nil { return nil, err } diff --git a/internal/impl/kafka/enterprise/redpanda_common_output.go b/internal/impl/kafka/enterprise/redpanda_common_output.go index 8b3fdcc109..488cd907f3 100644 --- a/internal/impl/kafka/enterprise/redpanda_common_output.go +++ b/internal/impl/kafka/enterprise/redpanda_common_output.go @@ -81,7 +81,7 @@ func init() { } output, err = kafka.NewFranzWriterFromConfig(conf, func(fn kafka.FranzSharedClientUseFn) error { return kafka.FranzSharedClientUse(sharedGlobalRedpandaClientKey, mgr, fn) - }, func(context.Context) error { return nil }) + }, func(context.Context) error { return nil }, nil, nil) return }) if err != nil { diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_input.go b/internal/impl/kafka/enterprise/redpanda_migrator_input.go index 462b1f4fb4..3ca4666fa6 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_input.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_input.go @@ -10,16 +10,9 @@ package enterprise import ( "context" - "errors" - "fmt" - "regexp" "slices" - "strconv" - "sync" "time" - "github.com/Jeffail/shutdown" - "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/redpanda-data/benthos/v4/public/service" @@ -29,16 +22,13 @@ import ( ) const ( - rmiFieldConsumerGroup = "consumer_group" - rmiFieldCommitPeriod = "commit_period" - rmiFieldBatchSize = "batch_size" - rmiFieldTopicLagRefreshPeriod = "topic_lag_refresh_period" rmiFieldOutputResource = "output_resource" rmiFieldReplicationFactorOverride = "replication_factor_override" rmiFieldReplicationFactor = "replication_factor" // Deprecated rmiFieldMultiHeader = "multi_header" + rmiFieldBatchSize = "batch_size" rmiResourceDefaultLabel = "redpanda_migrator_input" ) @@ -56,7 +46,9 @@ This input should be used in combination with a ` + 
"`redpanda_migrator`" + ` ou When a consumer group is specified this input consumes one or more topics where partitions will automatically balance across any other connected clients with the same consumer group. When a consumer group is not specified topics can either be consumed in their entirety or with explicit partitions. -It attempts to create all selected topics it along with their associated ACLs in the broker that the ` + "`redpanda_migrator`" + ` output points to identified by the label specified in ` + "`output_resource`" + `. +It attempts to create all selected topics along with their associated ACLs in the broker that the ` + "`redpanda_migrator`" + ` output points to identified by the label specified in ` + "`output_resource`" + `. + +It provides the same delivery guarantees and ordering semantics as the ` + "`redpanda`" + ` input. == Metrics @@ -78,7 +70,7 @@ This input adds the following metadata fields to each message: - All record headers ` + "```" + ` `). - Fields(RedpandaMigratorInputConfigFields()...). + Fields(redpandaMigratorInputConfigFields()...). LintRule(` let has_topic_partitions = this.topics.any(t -> t.contains(":")) root = if $has_topic_partitions { @@ -95,29 +87,13 @@ root = if $has_topic_partitions { `) } -// RedpandaMigratorInputConfigFields returns the full suite of config fields for a `redpanda_migrator` input using the -// franz-go client library. -func RedpandaMigratorInputConfigFields() []*service.ConfigField { +func redpandaMigratorInputConfigFields() []*service.ConfigField { return slices.Concat( kafka.FranzConnectionFields(), kafka.FranzConsumerFields(), + kafka.FranzReaderOrderedConfigFields(), []*service.ConfigField{ - service.NewStringField(rmiFieldConsumerGroup). - Description("An optional consumer group to consume as. When specified the partitions of specified topics are automatically distributed across consumers sharing a consumer group, and partition offsets are automatically committed and resumed under this name. Consumer groups are not supported when specifying explicit partitions to consume from in the `topics` field."). - Optional(), - service.NewDurationField(rmiFieldCommitPeriod). - Description("The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown."). - Default("5s"). - Advanced(), - service.NewIntField(rmiFieldBatchSize). - Description("The maximum number of messages that should be accumulated into each batch."). - Default(1024). - Advanced(), service.NewAutoRetryNacksToggleField(), - service.NewDurationField(rmiFieldTopicLagRefreshPeriod). - Description("The period of time between each topic lag refresh cycle."). - Default("5s"). - Advanced(), service.NewStringField(rmiFieldOutputResource). Description("The label of the redpanda_migrator output in which the currently selected topics need to be created before attempting to read messages."). Default(rmoResourceDefaultLabel). @@ -135,7 +111,13 @@ func RedpandaMigratorInputConfigFields() []*service.ConfigField { service.NewBoolField(rmiFieldMultiHeader). Description("Decode headers into lists to allow handling of multiple values with the same key"). Default(false). - Advanced(), + Advanced(). + Deprecated(), + service.NewIntField(rmiFieldBatchSize). + Description("The maximum number of messages that should be accumulated into each batch."). + Default(1024). + Advanced(). 
+ Deprecated(), }, ) } @@ -147,358 +129,100 @@ func init() { return nil, err } - rdr, err := NewRedpandaMigratorReaderFromConfig(conf, mgr) + tmpOpts, err := kafka.FranzConnectionOptsFromConfig(conf, mgr.Logger()) if err != nil { return nil, err } - return service.AutoRetryNacksBatchedToggled(conf, rdr) - }) - if err != nil { - panic(err) - } -} - -//------------------------------------------------------------------------------ - -// RedpandaMigratorReader implements a kafka reader using the franz-go library. -type RedpandaMigratorReader struct { - clientDetails *kafka.FranzConnectionDetails - consumerDetails *kafka.FranzConsumerDetails - - clientLabel string - - topicPatterns []*regexp.Regexp - - consumerGroup string - commitPeriod time.Duration - batchSize int - topicLagRefreshPeriod time.Duration - outputResource string - replicationFactorOverride bool - replicationFactor int - - connMut sync.Mutex - readMut sync.Mutex - client *kgo.Client - topicLagGauge *service.MetricGauge - topicLagCache sync.Map - outputTopicsCreated bool + clientOpts := append([]kgo.Opt{}, tmpOpts...) - mgr *service.Resources - shutSig *shutdown.Signaller -} - -// NewRedpandaMigratorReaderFromConfig attempts to instantiate a new RedpandaMigratorReader -// from a parsed config. -func NewRedpandaMigratorReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*RedpandaMigratorReader, error) { - r := RedpandaMigratorReader{ - mgr: mgr, - shutSig: shutdown.NewSignaller(), - topicLagGauge: mgr.Metrics().NewGauge("input_redpanda_migrator_lag", "topic", "partition"), - } - - var err error - - if r.clientDetails, err = kafka.FranzConnectionDetailsFromConfig(conf, mgr.Logger()); err != nil { - return nil, err - } - if r.consumerDetails, err = kafka.FranzConsumerDetailsFromConfig(conf); err != nil { - return nil, err - } - - if r.consumerDetails.RegexPattern { - r.topicPatterns = make([]*regexp.Regexp, 0, len(r.consumerDetails.Topics)) - for _, topic := range r.consumerDetails.Topics { - tp, err := regexp.Compile(topic) - if err != nil { - return nil, fmt.Errorf("failed to compile topic regex %q: %s", topic, err) - } - r.topicPatterns = append(r.topicPatterns, tp) - } - } - - if conf.Contains(rmiFieldConsumerGroup) { - if r.consumerGroup, err = conf.FieldString(rmiFieldConsumerGroup); err != nil { - return nil, err - } - } - - if r.batchSize, err = conf.FieldInt(rmiFieldBatchSize); err != nil { - return nil, err - } - - if r.commitPeriod, err = conf.FieldDuration(rmiFieldCommitPeriod); err != nil { - return nil, err - } - - if r.topicLagRefreshPeriod, err = conf.FieldDuration(rmiFieldTopicLagRefreshPeriod); err != nil { - return nil, err - } - - if r.replicationFactorOverride, err = conf.FieldBool(rmiFieldReplicationFactorOverride); err != nil { - return nil, err - } - - if r.replicationFactor, err = conf.FieldInt(rmiFieldReplicationFactor); err != nil { - return nil, err - } - - if r.outputResource, err = conf.FieldString(rmiFieldOutputResource); err != nil { - return nil, err - } - - if r.clientLabel = mgr.Label(); r.clientLabel == "" { - r.clientLabel = rmiResourceDefaultLabel - } - - return &r, nil -} - -func (r *RedpandaMigratorReader) recordToMessage(record *kgo.Record) *service.Message { - msg := kafka.FranzRecordToMessageV1(record) - - lag := int64(0) - if val, ok := r.topicLagCache.Load(fmt.Sprintf("%s_%d", record.Topic, record.Partition)); ok { - lag = val.(int64) - } - msg.MetaSetMut("kafka_lag", lag) - - // The record lives on for checkpointing, but we don't need the contents - // going forward so 
discard these. This looked fine to me but could - // potentially be a source of problems so treat this as sus. - record.Key = nil - record.Value = nil - - return msg -} - -//------------------------------------------------------------------------------ - -// Connect to the kafka seed brokers. -func (r *RedpandaMigratorReader) Connect(ctx context.Context) error { - r.connMut.Lock() - defer r.connMut.Unlock() - - if r.client != nil { - return nil - } - - if r.shutSig.IsSoftStopSignalled() { - r.shutSig.TriggerHasStopped() - return service.ErrEndOfInput - } - - clientOpts := append([]kgo.Opt{}, r.clientDetails.FranzOpts()...) - clientOpts = append(clientOpts, r.consumerDetails.FranzOpts()...) - if r.consumerGroup != "" { - clientOpts = append(clientOpts, - // TODO: Do we need to do anything in `kgo.OnPartitionsRevoked()` / `kgo.OnPartitionsLost()` - kgo.ConsumerGroup(r.consumerGroup), - kgo.AutoCommitMarks(), - kgo.BlockRebalanceOnPoll(), - kgo.AutoCommitInterval(r.commitPeriod), - kgo.WithLogger(&kafka.KGoLogger{L: r.mgr.Logger()}), - ) - } - - var err error - if r.client, err = kgo.NewClient(clientOpts...); err != nil { - return err - } - - // Check connectivity to cluster - if err = r.client.Ping(ctx); err != nil { - return fmt.Errorf("failed to connect to cluster: %s", err) - } - - if err = kafka.FranzSharedClientSet(r.clientLabel, &kafka.FranzSharedClientInfo{ - Client: r.client, - ConnDetails: r.clientDetails, - }, r.mgr); err != nil { - r.mgr.Logger().With("error", err).Warn("Failed to store client connection for sharing") - } - - go func() { - closeCtx, done := r.shutSig.SoftStopCtx(context.Background()) - defer done() - - adminClient := kadm.NewClient(r.client) - - for { - ctx, done = context.WithTimeout(closeCtx, r.topicLagRefreshPeriod) - var lags kadm.DescribedGroupLags - var err error - if lags, err = adminClient.Lag(ctx, r.consumerGroup); err != nil { - r.mgr.Logger().Errorf("Failed to fetch group lags: %s", err) + if tmpOpts, err = kafka.FranzConsumerOptsFromConfig(conf); err != nil { + return nil, err } - done() - - lags.Each(func(gl kadm.DescribedGroupLag) { - for _, gl := range gl.Lag { - for _, pl := range gl { - lag := pl.Lag - if lag < 0 { - lag = 0 - } - - r.topicLagGauge.Set(lag, pl.Topic, strconv.Itoa(int(pl.Partition))) - r.topicLagCache.Store(fmt.Sprintf("%s_%d", pl.Topic, pl.Partition), lag) - } - } - }) + clientOpts = append(clientOpts, tmpOpts...) - select { - case <-r.shutSig.SoftStopChan(): - return - case <-time.After(r.topicLagRefreshPeriod): + replicationFactorOverride, err := conf.FieldBool(rmiFieldReplicationFactorOverride) + if err != nil { + return nil, err } - } - }() - return nil -} - -// ReadBatch attempts to extract a batch of messages from the target topics. -func (r *RedpandaMigratorReader) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { - r.connMut.Lock() - defer r.connMut.Unlock() - - r.readMut.Lock() - defer r.readMut.Unlock() - - if r.client == nil { - return nil, nil, service.ErrNotConnected - } - - // TODO: Is there a way to wait a while until we actually get f.batchSize messages instead of returning as many as - // we have right now? Otherwise, maybe switch back to `PollFetches()` and have `batch_byte_size` and `batch_period` - // via `FetchMinBytes`, `FetchMaxBytes` and `FetchMaxWait()`? - - // TODO: Looks like when using `regexp_topics: true`, franz-go takes over a minute to discover topics which were - // created after `PollRecords()` was called for the first time. 
Might need to adjust the timeout for the internal - // topic cache. - fetches := r.client.PollRecords(ctx, r.batchSize) - if errs := fetches.Errors(); len(errs) > 0 { - // Any non-temporal error sets this true and we close the client - // forcing a reconnect. - nonTemporalErr := false - - for _, kerr := range errs { - // TODO: The documentation from franz-go is top-tier, it - // should be straight forward to expand this to include more - // errors that are safe to disregard. - if errors.Is(kerr.Err, context.DeadlineExceeded) || - errors.Is(kerr.Err, context.Canceled) { - continue + replicationFactor, err := conf.FieldInt(rmiFieldReplicationFactor) + if err != nil { + return nil, err } - nonTemporalErr = true - - if !errors.Is(kerr.Err, kgo.ErrClientClosed) { - r.mgr.Logger().Errorf("Kafka poll error on topic %v, partition %v: %v", kerr.Topic, kerr.Partition, kerr.Err) + outputResource, err := conf.FieldString(rmiFieldOutputResource) + if err != nil { + return nil, err } - } - - if nonTemporalErr { - r.client.Close() - r.client = nil - return nil, nil, service.ErrNotConnected - } - } - // TODO: Is there a way to get the actual selected topics instead of all of them? - topics := r.client.GetConsumeTopics() - if r.consumerDetails.RegexPattern { - topics = slices.DeleteFunc(topics, func(topic string) bool { - for _, tp := range r.topicPatterns { - if tp.MatchString(topic) { - return false - } + clientLabel := mgr.Label() + if clientLabel == "" { + clientLabel = rmiResourceDefaultLabel } - return true - }) - } - if len(topics) > 0 { - r.mgr.Logger().Debugf("Consuming from topics: %s", topics) - } else if r.consumerDetails.RegexPattern { - r.mgr.Logger().Warn("No matching topics found") - } + rdr, err := kafka.NewFranzReaderOrderedFromConfig(conf, mgr, + func() ([]kgo.Opt, error) { + return clientOpts, nil + }, + nil, + func(ctx context.Context, res *service.Resources, client *kgo.Client) { + if err = kafka.FranzSharedClientSet(clientLabel, &kafka.FranzSharedClientInfo{ + Client: client, + }, res); err != nil { + res.Logger().With("error", err).Warn("Failed to store client connection for sharing") + } - if !r.outputTopicsCreated { - if err := kafka.FranzSharedClientUse(r.outputResource, r.mgr, func(details *kafka.FranzSharedClientInfo) error { - for _, topic := range topics { - if err := createTopic(ctx, topic, r.replicationFactorOverride, r.replicationFactor, r.client, details.Client); err != nil && err != errTopicAlreadyExists { - // We could end up attempting to create a topic which doesn't have any messages in it, so if that - // fails, we can just log an error and carry on. If it does contain messages, the output will - // attempt to create it again anyway and will trigger and error if it can't. - // The output `topicCache` could be populated here to avoid the redundant call to create topics, but - // it's not worth the complexity. - r.mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err) - } else { - if err == errTopicAlreadyExists { - r.mgr.Logger().Debugf("Topic %q already exists", topic) + topics := client.GetConsumeTopics() + if len(topics) > 0 { + mgr.Logger().Debugf("Consuming from topics: %s", topics) } else { - r.mgr.Logger().Infof("Created topic %q in output cluster", topic) + mgr.Logger().Warn("No matching topics found") + return } - if err := createACLs(ctx, topic, r.client, details.Client); err != nil { - r.mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err) + + // Make multiple attempts until the output connects in the background. 
+ // TODO: It would be nicer to somehow get notified when the output is ready. + for { + if err = kafka.FranzSharedClientUse(outputResource, res, func(details *kafka.FranzSharedClientInfo) error { + for _, topic := range topics { + if err := createTopic(ctx, topic, replicationFactorOverride, replicationFactor, client, details.Client); err != nil && err != errTopicAlreadyExists { + // We could end up attempting to create a topic which doesn't have any messages in it, so if that + // fails, we can just log an error and carry on. If it does contain messages, the output will + // attempt to create it again anyway and will trigger and error if it can't. + // The output `topicCache` could be populated here to avoid the redundant call to create topics, but + // it's not worth the complexity. + mgr.Logger().Errorf("Failed to create topic %q and ACLs: %s", topic, err) + } else { + if err == errTopicAlreadyExists { + mgr.Logger().Debugf("Topic %q already exists", topic) + } else { + mgr.Logger().Infof("Created topic %q in output cluster", topic) + } + if err := createACLs(ctx, topic, client, details.Client); err != nil { + mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", topic, err) + } + } + } + return nil + }); err == nil { + break + } + + time.Sleep(time.Millisecond * 100) } - } + }, + func(res *service.Resources) { + _, _ = kafka.FranzSharedClientPop(clientLabel, res) + }) + if err != nil { + return nil, err } - r.outputTopicsCreated = true - return nil - }); err != nil { - r.mgr.Logger().With("error", err, "resource", r.outputResource).Warn("Failed to access shared client for given resource identifier") - } + return service.AutoRetryNacksBatchedToggled(conf, rdr) + }) + if err != nil { + panic(err) } - - resBatch := make(service.MessageBatch, 0, fetches.NumRecords()) - fetches.EachRecord(func(rec *kgo.Record) { - resBatch = append(resBatch, r.recordToMessage(rec)) - }) - - return resBatch, func(ctx context.Context, res error) error { - r.readMut.Lock() - defer r.readMut.Unlock() - - // TODO: What should happen when `auto_replay_nacks: false` and a batch gets rejected followed by another one - // which gets acked? - // Also see "Res will always be nil because we initialize with service.AutoRetryNacks" comment in - // `input_kafka_franz.go` - if res != nil { - return res - } - - r.client.MarkCommitRecords(fetches.Records()...) - r.client.AllowRebalance() - - return nil - }, nil } -// Close underlying connections. 
-func (r *RedpandaMigratorReader) Close(ctx context.Context) error { - r.connMut.Lock() - defer r.connMut.Unlock() - - go func() { - r.shutSig.TriggerSoftStop() - if r.client != nil { - _, _ = kafka.FranzSharedClientPop(r.clientLabel, r.mgr) - - r.client.Close() - r.client = nil - - r.shutSig.TriggerHasStopped() - } - }() - - select { - case <-r.shutSig.HasStoppedChan(): - case <-ctx.Done(): - return ctx.Err() - } - return nil -} +//------------------------------------------------------------------------------ diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_offsets_input.go b/internal/impl/kafka/enterprise/redpanda_migrator_offsets_input.go index 81da522e35..79aa7ccc9d 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_offsets_input.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_offsets_input.go @@ -194,7 +194,7 @@ func init() { msg.MetaSetMut("kafka_offset_metadata", offsetCommitValue.Metadata) return msg, nil - }) + }, nil, nil) if err != nil { return nil, err } diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_output.go index cf4a5ae53c..5e2bb08a7c 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_output.go @@ -60,7 +60,7 @@ ACL migration adheres to the following principles: - `+"`ALLOW ALL`"+` ACLs for topics are downgraded to `+"`ALLOW READ`"+` - Only topic ACLs are migrated, group ACLs are not migrated `). - Fields(RedpandaMigratorOutputConfigFields()...). + Fields(redpandaMigratorOutputConfigFields()...). LintRule(kafka.FranzWriterConfigLints()). Example("Transfer data", "Writes messages to the configured broker and creates topics and topic ACLs if they don't exist. It also ensures that the message order is preserved.", ` output: @@ -76,17 +76,14 @@ output: `) } -// RedpandaMigratorOutputConfigFields returns the full suite of config fields for a `redpanda_migrator` output using -// the franz-go client library. -func RedpandaMigratorOutputConfigFields() []*service.ConfigField { +func redpandaMigratorOutputConfigFields() []*service.ConfigField { return slices.Concat( kafka.FranzConnectionFields(), kafka.FranzWriterConfigFields(), []*service.ConfigField{ service.NewIntField(rmoFieldMaxInFlight). Description("The maximum number of batches to be sending in parallel at any given time."). - Default(10), - service.NewBatchPolicyField(rmoFieldBatching), + Default(256), service.NewStringField(rmoFieldInputResource). Description("The label of the redpanda_migrator input from which to read the configurations for topics and ACLs which need to be created."). Default(rmiResourceDefaultLabel). 
@@ -107,6 +104,7 @@ func RedpandaMigratorOutputConfigFields() []*service.ConfigField { // Deprecated service.NewStringField(rmoFieldRackID).Deprecated(), + service.NewBatchPolicyField(rmoFieldBatching).Deprecated(), }, kafka.FranzProducerFields(), ) @@ -127,220 +125,169 @@ func init() { if maxInFlight, err = conf.FieldInt(rmoFieldMaxInFlight); err != nil { return } - if batchPolicy, err = conf.FieldBatchPolicy(rmoFieldBatching); err != nil { + + var inputResource string + if inputResource, err = conf.FieldString(rmoFieldInputResource); err != nil { return } - output, err = NewRedpandaMigratorWriterFromConfig(conf, mgr) - return - }) - if err != nil { - panic(err) - } -} - -//------------------------------------------------------------------------------ - -// RedpandaMigratorWriter implements a Kafka writer using the franz-go library. -type RedpandaMigratorWriter struct { - recordConverter *kafka.FranzWriter - replicationFactorOverride bool - replicationFactor int - translateSchemaIDs bool - inputResource string - schemaRegistryOutputResource srResourceKey - - clientDetails *kafka.FranzConnectionDetails - clientOpts []kgo.Opt - connMut sync.Mutex - client *kgo.Client - topicCache sync.Map - // Stores the source to destination SchemaID mapping. - schemaIDCache sync.Map - schemaRegistryOutput *schemaRegistryOutput - - clientLabel string - - mgr *service.Resources -} - -// NewRedpandaMigratorWriterFromConfig attempts to instantiate a RedpandaMigratorWriter from a parsed config. -func NewRedpandaMigratorWriterFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*RedpandaMigratorWriter, error) { - w := RedpandaMigratorWriter{ - mgr: mgr, - } - - var err error - - // NOTE: We do not provide closures for client access and yielding because - // this writer is only used for its BatchToRecords method. If we ever expand - // in order to use this as a full writer then we need to provide a full - // suite of arguments here. - if w.recordConverter, err = kafka.NewFranzWriterFromConfig(conf, nil, nil); err != nil { - return nil, err - } - - if w.clientDetails, err = kafka.FranzConnectionDetailsFromConfig(conf, mgr.Logger()); err != nil { - return nil, err - } - w.clientOpts = w.clientDetails.FranzOpts() - - var tmpOpts []kgo.Opt - if tmpOpts, err = kafka.FranzProducerOptsFromConfig(conf); err != nil { - return nil, err - } - w.clientOpts = append(w.clientOpts, tmpOpts...) - - if w.inputResource, err = conf.FieldString(rmoFieldInputResource); err != nil { - return nil, err - } - - if w.replicationFactorOverride, err = conf.FieldBool(rmoFieldRepFactorOverride); err != nil { - return nil, err - } - - if w.replicationFactor, err = conf.FieldInt(rmoFieldRepFactor); err != nil { - return nil, err - } - - if w.translateSchemaIDs, err = conf.FieldBool(rmoFieldTranslateSchemaIDs); err != nil { - return nil, err - } - - if w.translateSchemaIDs { - var res string - if res, err = conf.FieldString(rmoFieldSchemaRegistryOutputResource); err != nil { - return nil, err - } - w.schemaRegistryOutputResource = srResourceKey(res) - } - - if w.clientLabel = mgr.Label(); w.clientLabel == "" { - w.clientLabel = rmoResourceDefaultLabel - } - - return &w, nil -} - -//------------------------------------------------------------------------------ - -// Connect to the target seed brokers. 
-func (w *RedpandaMigratorWriter) Connect(ctx context.Context) error { - w.connMut.Lock() - defer w.connMut.Unlock() - - if w.client != nil { - return nil - } - - var err error - if w.client, err = kgo.NewClient(w.clientOpts...); err != nil { - return err - } - // Check connectivity to cluster - if err := w.client.Ping(ctx); err != nil { - return fmt.Errorf("failed to connect to cluster: %s", err) - } - - if err = kafka.FranzSharedClientSet(w.clientLabel, &kafka.FranzSharedClientInfo{ - Client: w.client, - ConnDetails: w.clientDetails, - }, w.mgr); err != nil { - w.mgr.Logger().With("error", err).Warn("Failed to store client connection for sharing") - } - - if w.translateSchemaIDs { - if res, ok := w.mgr.GetGeneric(w.schemaRegistryOutputResource); ok { - w.schemaRegistryOutput = res.(*schemaRegistryOutput) - } else { - w.mgr.Logger().Warnf("schema_registry output resource %q not found; skipping schema ID translation", w.schemaRegistryOutputResource) - } - } - - return nil -} + var replicationFactorOverride bool + if replicationFactorOverride, err = conf.FieldBool(rmoFieldRepFactorOverride); err != nil { + return + } -// WriteBatch attempts to write a batch of messages to the target topics. -func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.MessageBatch) error { - w.connMut.Lock() - defer w.connMut.Unlock() + var replicationFactor int + if replicationFactor, err = conf.FieldInt(rmoFieldRepFactor); err != nil { + return + } - if w.client == nil { - return service.ErrNotConnected - } + var translateSchemaIDs bool + if translateSchemaIDs, err = conf.FieldBool(rmoFieldTranslateSchemaIDs); err != nil { + return + } - records, err := w.recordConverter.BatchToRecords(ctx, b) - if err != nil { - return err - } + var schemaRegistryOutputResource srResourceKey + if translateSchemaIDs { + var res string + if res, err = conf.FieldString(rmoFieldSchemaRegistryOutputResource); err != nil { + return + } + schemaRegistryOutputResource = srResourceKey(res) + } - var ch franz_sr.ConfluentHeader - if w.translateSchemaIDs && w.schemaRegistryOutput != nil { - for recordIdx, record := range records { - schemaID, _, err := ch.DecodeID(record.Value) - if err != nil { - w.mgr.Logger().Warnf("Failed to extract schema ID from message index %d on topic %q: %s", recordIdx, record.Topic, err) - continue + var clientLabel string + if clientLabel = mgr.Label(); clientLabel == "" { + clientLabel = rmoResourceDefaultLabel } - var destSchemaID int - if cachedID, ok := w.schemaIDCache.Load(schemaID); !ok { - destSchemaID, err = w.schemaRegistryOutput.GetDestinationSchemaID(ctx, schemaID) - if err != nil { - w.mgr.Logger().Warnf("Failed to fetch destination schema ID from message index %d on topic %q: %s", recordIdx, record.Topic, err) - continue - } - w.schemaIDCache.Store(schemaID, destSchemaID) - } else { - destSchemaID = cachedID.(int) + var tmpOpts, clientOpts []kgo.Opt + + var connDetails *kafka.FranzConnectionDetails + if connDetails, err = kafka.FranzConnectionDetailsFromConfig(conf, mgr.Logger()); err != nil { + return } + clientOpts = append(clientOpts, connDetails.FranzOpts()...) - err = sr.UpdateID(record.Value, destSchemaID) - if err != nil { - w.mgr.Logger().Warnf("Failed to update schema ID in message index %d on topic %s: %q", recordIdx, record.Topic, err) - continue + if tmpOpts, err = kafka.FranzProducerOptsFromConfig(conf); err != nil { + return } - } - } + clientOpts = append(clientOpts, tmpOpts...) 
+ + clientOpts = append(clientOpts, kgo.AllowAutoTopicCreation()) // TODO: Configure this? + + var client *kgo.Client + var clientMut sync.Mutex + // Stores the source to destination SchemaID mapping. + var schemaIDCache sync.Map + var topicCache sync.Map + output, err = kafka.NewFranzWriterFromConfig(conf, + func(fn kafka.FranzSharedClientUseFn) error { + clientMut.Lock() + defer clientMut.Unlock() + + if client == nil { + var err error + if client, err = kgo.NewClient(clientOpts...); err != nil { + return err + } + } - if err := kafka.FranzSharedClientUse(w.inputResource, w.mgr, func(details *kafka.FranzSharedClientInfo) error { - for _, record := range records { - if _, ok := w.topicCache.Load(record.Topic); !ok { - if err := createTopic(ctx, record.Topic, w.replicationFactorOverride, w.replicationFactor, details.Client, w.client); err != nil && err != errTopicAlreadyExists { - return fmt.Errorf("failed to create topic %q: %s", record.Topic, err) - } else { - if err == errTopicAlreadyExists { - w.mgr.Logger().Debugf("Topic %q already exists", record.Topic) - } else { - w.mgr.Logger().Infof("Created topic %q", record.Topic) + return fn(&kafka.FranzSharedClientInfo{ + Client: client, + ConnDetails: connDetails, + }) + }, + func(context.Context) error { + clientMut.Lock() + defer clientMut.Unlock() + + if client == nil { + return nil } - if err := createACLs(ctx, record.Topic, details.Client, w.client); err != nil { - w.mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", record.Topic, err) + + _, _ = kafka.FranzSharedClientPop(clientLabel, mgr) + + client.Close() + client = nil + return nil + }, + func(client *kgo.Client) { + if err = kafka.FranzSharedClientSet(clientLabel, &kafka.FranzSharedClientInfo{ + Client: client, + }, mgr); err != nil { + mgr.Logger().With("error", err).Warn("Failed to store client connection for sharing") } + }, + func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error { + if translateSchemaIDs { + + if res, ok := mgr.GetGeneric(schemaRegistryOutputResource); ok { + srOutput := res.(*schemaRegistryOutput) + + var ch franz_sr.ConfluentHeader + for recordIdx, record := range records { + schemaID, _, err := ch.DecodeID(record.Value) + if err != nil { + mgr.Logger().Warnf("Failed to extract schema ID from message index %d on topic %q: %s", recordIdx, record.Topic, err) + continue + } + + var destSchemaID int + if cachedID, ok := schemaIDCache.Load(schemaID); !ok { + destSchemaID, err = srOutput.GetDestinationSchemaID(ctx, schemaID) + if err != nil { + mgr.Logger().Warnf("Failed to fetch destination schema ID from message index %d on topic %q: %s", recordIdx, record.Topic, err) + continue + } + schemaIDCache.Store(schemaID, destSchemaID) + } else { + destSchemaID = cachedID.(int) + } + + err = sr.UpdateID(record.Value, destSchemaID) + if err != nil { + mgr.Logger().Warnf("Failed to update schema ID in message index %d on topic %s: %q", recordIdx, record.Topic, err) + continue + } + } + } else { + mgr.Logger().Warnf("schema_registry output resource %q not found; skipping schema ID translation", schemaRegistryOutputResource) + return nil + } - w.topicCache.Store(record.Topic, struct{}{}) - } - } - } - return nil - }); err != nil { - w.mgr.Logger().With("error", err, "resource", w.inputResource).Warn("Failed to access shared client for given resource identifier") - } + } - return w.client.ProduceSync(ctx, records...).FirstErr() -} + // Once we get here, the input should already be initialised and its pre-flight hook should have + // been 
called already. Thus, we don't need to loop until the input is ready. + if err := kafka.FranzSharedClientUse(inputResource, mgr, func(details *kafka.FranzSharedClientInfo) error { + for _, record := range records { + if _, ok := topicCache.Load(record.Topic); !ok { + if err := createTopic(ctx, record.Topic, replicationFactorOverride, replicationFactor, details.Client, client); err != nil && err != errTopicAlreadyExists { + return fmt.Errorf("failed to create topic %q: %s", record.Topic, err) + } else { + if err == errTopicAlreadyExists { + mgr.Logger().Debugf("Topic %q already exists", record.Topic) + } else { + mgr.Logger().Infof("Created topic %q", record.Topic) + } + if err := createACLs(ctx, record.Topic, details.Client, client); err != nil { + mgr.Logger().Errorf("Failed to create ACLs for topic %q: %s", record.Topic, err) + } + + topicCache.Store(record.Topic, struct{}{}) + } + } + } + return nil + }); err != nil { + mgr.Logger().With("error", err, "resource", inputResource).Warn("Failed to access shared client for given resource identifier") + } -func (w *RedpandaMigratorWriter) disconnect() { - if w.client == nil { - return + return nil + }) + return + }) + if err != nil { + panic(err) } - _, _ = kafka.FranzSharedClientPop(w.clientLabel, w.mgr) - w.client.Close() - w.client = nil -} - -// Close underlying connections. -func (w *RedpandaMigratorWriter) Close(ctx context.Context) error { - w.disconnect() - return nil } diff --git a/internal/impl/kafka/franz_reader_ordered.go b/internal/impl/kafka/franz_reader_ordered.go index 9e628a59b3..ad7f4605fd 100644 --- a/internal/impl/kafka/franz_reader_ordered.go +++ b/internal/impl/kafka/franz_reader_ordered.go @@ -17,6 +17,8 @@ package kafka import ( "context" "errors" + "fmt" + "strconv" "sync" "sync/atomic" "time" @@ -24,17 +26,20 @@ import ( "github.com/Jeffail/checkpoint" "github.com/Jeffail/shutdown" "github.com/cenkalti/backoff/v4" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/asyncroutine" "github.com/redpanda-data/connect/v4/internal/dispatch" ) const ( - kroFieldConsumerGroup = "consumer_group" - kroFieldCommitPeriod = "commit_period" - kroFieldPartitionBuffer = "partition_buffer_bytes" + kroFieldConsumerGroup = "consumer_group" + kroFieldCommitPeriod = "commit_period" + kroFieldPartitionBuffer = "partition_buffer_bytes" + kroFieldTopicLagRefreshPeriod = "topic_lag_refresh_period" ) // FranzReaderOrderedConfigFields returns config fields for customising the @@ -52,26 +57,40 @@ func FranzReaderOrderedConfigFields() []*service.ConfigField { Description("A buffer size (in bytes) for each consumed partition, allowing records to be queued internally before flushing. Increasing this may improve throughput at the cost of higher memory utilisation. Note that each buffer can grow slightly beyond this value."). Default("1MB"). Advanced(), + service.NewDurationField(kroFieldTopicLagRefreshPeriod). + Description("The period of time between each topic lag refresh cycle."). + Default("5s"). + Advanced(), } } //------------------------------------------------------------------------------ -// RecordToMessageFn is a function that converts a Kafka record into a Message. -type RecordToMessageFn func(record *kgo.Record) (*service.Message, error) +// recordToMessageFn is a function that converts a Kafka record into a Message. 
+type recordToMessageFn func(record *kgo.Record) (*service.Message, error) + +// preflightHookFn is a function which is executed once before the first batch of messages is emitted. +type preflightHookFn func(ctx context.Context, res *service.Resources, client *kgo.Client) // FranzReaderOrdered implements a kafka reader using the franz-go library. type FranzReaderOrdered struct { clientOpts func() ([]kgo.Opt, error) - partState *partitionState - - consumerGroup string - commitPeriod time.Duration - cacheLimit uint64 - recordToMessageFn RecordToMessageFn - - readBackOff backoff.BackOff + partState *partitionState + lagUpdater *asyncroutine.Periodic + topicLagGauge *service.MetricGauge + topicLagCache sync.Map + preflightHookExecuted bool + client *kgo.Client + + consumerGroup string + commitPeriod time.Duration + topicLagRefreshPeriod time.Duration + cacheLimit uint64 + recordToMessageFn recordToMessageFn + preflightHookFn preflightHookFn + closeHookFn func(res *service.Resources) + readBackOff backoff.BackOff res *service.Resources log *service.Logger @@ -80,7 +99,7 @@ type FranzReaderOrdered struct { // NewFranzReaderOrderedFromConfig attempts to instantiate a new // FranzReaderOrdered reader from a parsed config. -func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn RecordToMessageFn) (*FranzReaderOrdered, error) { +func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn func(res *service.Resources)) (*FranzReaderOrdered, error) { readBackOff := backoff.NewExponentialBackOff() readBackOff.InitialInterval = time.Millisecond readBackOff.MaxInterval = time.Millisecond * 100 @@ -95,7 +114,10 @@ func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Re log: res.Logger(), shutSig: shutdown.NewSignaller(), clientOpts: optsFn, + topicLagGauge: res.Metrics().NewGauge("redpanda_lag", "topic", "partition"), recordToMessageFn: recordToMessageFn, + preflightHookFn: preflightHookFn, + closeHookFn: closeHookFn, } f.consumerGroup, _ = conf.FieldString(kroFieldConsumerGroup) @@ -109,6 +131,10 @@ func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Re return nil, err } + if f.topicLagRefreshPeriod, err = conf.FieldDuration(kroFieldTopicLagRefreshPeriod); err != nil { + return nil, err + } + return &f, nil } @@ -122,13 +148,20 @@ func (f *FranzReaderOrdered) recordsToBatch(records []*kgo.Record) *batchWithRec var length uint64 var batch service.MessageBatch for _, r := range records { - record, err := f.recordToMessageFn(r) + msg, err := f.recordToMessageFn(r) if err != nil { f.log.Debugf("Failed to convert kafka record to message: %s", err) continue } length += uint64(len(r.Value) + len(r.Key)) - batch = append(batch, record) + batch = append(batch, msg) + + lag := int64(0) + if val, ok := f.topicLagCache.Load(fmt.Sprintf("%s_%d", r.Topic, r.Partition)); ok { + lag = val.(int64) + } + msg.MetaSetMut("kafka_lag", lag) + // The record lives on for checkpointing, but we don't need the contents // going forward so discard these. This looked fine to me but could // potentially be a source of problems so treat this as sus. 
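The reader now accepts two optional hooks alongside the record-conversion function: a pre-flight hook that runs exactly once, just before the first batch is handed out, and a close hook that runs as the underlying client is torn down. Below is a minimal sketch of how a plugin inside this repository might wire them, assuming a hypothetical shared-client label and hook bodies; it is not the migrator input's actual implementation.

```go
package example // illustrative only; mirrors plugins under internal/impl/kafka

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/redpanda-data/benthos/v4/public/service"

	"github.com/redpanda-data/connect/v4/internal/impl/kafka"
)

// newReaderWithHooks shows the extended NewFranzReaderOrderedFromConfig
// signature: the record converter is followed by a pre-flight hook (runs once
// before the first batch is emitted, with access to the consumer client) and a
// close hook (runs before the client is closed). The hook bodies and the
// "my_shared_client" label are assumptions made for this sketch.
func newReaderWithHooks(conf *service.ParsedConfig, mgr *service.Resources, clientOpts []kgo.Opt) (*kafka.FranzReaderOrdered, error) {
	return kafka.NewFranzReaderOrderedFromConfig(conf, mgr,
		func() ([]kgo.Opt, error) { return clientOpts, nil },
		// Record conversion: nil, as the plain redpanda input passes, keeping the default behaviour.
		nil,
		// Pre-flight hook: e.g. register the consumer client so a paired output can reuse it.
		func(ctx context.Context, res *service.Resources, client *kgo.Client) {
			if err := kafka.FranzSharedClientSet("my_shared_client", &kafka.FranzSharedClientInfo{Client: client}, res); err != nil {
				res.Logger().With("error", err).Warn("Failed to store client connection for sharing")
			}
		},
		// Close hook: drop the shared handle before the client is closed.
		func(res *service.Resources) {
			_, _ = kafka.FranzSharedClientPop("my_shared_client", res)
		},
	)
}
```

Passing `nil` for either hook, as the plain `redpanda` input does, simply disables that behaviour.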
@@ -349,14 +382,13 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { return err } - var cl *kgo.Client commitFn := func(r *kgo.Record) {} if f.consumerGroup != "" { commitFn = func(r *kgo.Record) { - if cl == nil { + if f.client == nil { return } - cl.MarkCommitRecords(r) + f.client.MarkCommitRecords(r) } } @@ -389,13 +421,50 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { ) } - if cl, err = kgo.NewClient(clientOpts...); err != nil { + if f.client, err = kgo.NewClient(clientOpts...); err != nil { return err } + // Check connectivity to cluster + if err = f.client.Ping(ctx); err != nil { + return fmt.Errorf("failed to connect to cluster: %s", err) + } + + if f.lagUpdater != nil { + f.lagUpdater.Stop() + } + adminClient := kadm.NewClient(f.client) + f.lagUpdater = asyncroutine.NewPeriodic(f.topicLagRefreshPeriod, func() { + ctx, done := context.WithTimeout(context.Background(), f.topicLagRefreshPeriod) + defer done() + + lags, err := adminClient.Lag(ctx, f.consumerGroup) + if err != nil { + f.log.Debugf("Failed to fetch group lags: %s", err) + } + + lags.Each(func(gl kadm.DescribedGroupLag) { + for _, gl := range gl.Lag { + for _, pl := range gl { + lag := pl.Lag + if lag < 0 { + lag = 0 + } + + f.topicLagGauge.Set(lag, pl.Topic, strconv.Itoa(int(pl.Partition))) + f.topicLagCache.Store(fmt.Sprintf("%s_%d", pl.Topic, pl.Partition), lag) + } + } + }) + }) + f.lagUpdater.Start() + go func() { defer func() { - cl.Close() + if f.closeHookFn != nil { + f.closeHookFn(f.res) + } + f.client.Close() if f.shutSig.IsSoftStopSignalled() { f.shutSig.TriggerHasStopped() } @@ -412,7 +481,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { // In this case we don't want to actually resume any of them yet so // I add a forced timeout to deal with it. stallCtx, pollDone := context.WithTimeout(closeCtx, time.Second) - fetches := cl.PollFetches(stallCtx) + fetches := f.client.PollFetches(stallCtx) pollDone() if errs := fetches.Errors(); len(errs) > 0 { @@ -437,7 +506,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { } if nonTemporalErr { - cl.Close() + f.client.Close() return } } @@ -449,6 +518,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { fetches.EachPartition(func(p kgo.FetchTopicPartition) { if len(p.Records) > 0 { batch := f.recordsToBatch(p.Records) + // TODO: do we have to ack? if len(batch.b) == 0 { return } @@ -457,11 +527,11 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { } } }) - _ = cl.PauseFetchPartitions(pauseTopicPartitions) + _ = f.client.PauseFetchPartitions(pauseTopicPartitions) noActivePartitions: for { - pausedPartitionTopics := cl.PauseFetchPartitions(nil) + pausedPartitionTopics := f.client.PauseFetchPartitions(nil) // Walk all the disabled topic partitions and check whether any // of them can be resumed. 
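Pulled out of the surrounding hunks, the lag refresh cycle added to `Connect` has the following shape: a `kadm` admin client wraps the consumer, a periodic routine queries consumer-group lag, and each per-partition result feeds both the `redpanda_lag` gauge and the `<topic>_<partition>` cache that `recordsToBatch` reads when it attaches the `kafka_lag` metadata. The condensed sketch below uses an invented helper name and simplifies error handling to an early return.

```go
package example // illustrative only

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/redpanda-data/benthos/v4/public/service"

	"github.com/redpanda-data/connect/v4/internal/asyncroutine"
)

// startLagRefresher condenses the refresh cycle wired up in Connect: kadm
// reports per-partition group lag, which is exported via the redpanda_lag gauge
// and cached under a "<topic>_<partition>" key so that recordsToBatch can set
// the kafka_lag metadata on each message. Negative lag values are clamped to zero.
func startLagRefresher(client *kgo.Client, group string, period time.Duration, gauge *service.MetricGauge, cache *sync.Map, log *service.Logger) *asyncroutine.Periodic {
	adminClient := kadm.NewClient(client)
	updater := asyncroutine.NewPeriodic(period, func() {
		ctx, done := context.WithTimeout(context.Background(), period)
		defer done()

		lags, err := adminClient.Lag(ctx, group)
		if err != nil {
			log.Debugf("Failed to fetch group lags: %s", err)
			return
		}

		lags.Each(func(dl kadm.DescribedGroupLag) {
			for _, topicLags := range dl.Lag {
				for _, pl := range topicLags {
					lag := pl.Lag
					if lag < 0 {
						lag = 0
					}
					gauge.Set(lag, pl.Topic, strconv.Itoa(int(pl.Partition)))
					cache.Store(fmt.Sprintf("%s_%d", pl.Topic, pl.Partition), lag)
				}
			}
		})
	})
	updater.Start()
	return updater
}
```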
@@ -474,7 +544,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { } } if len(resumeTopicPartitions) > 0 { - cl.ResumeFetchPartitions(resumeTopicPartitions) + f.client.ResumeFetchPartitions(resumeTopicPartitions) } if len(f.consumerGroup) == 0 || len(resumeTopicPartitions) > 0 || checkpoints.tallyActivePartitions(pausedPartitionTopics) > 0 { @@ -496,6 +566,11 @@ func (f *FranzReaderOrdered) ReadBatch(ctx context.Context) (service.MessageBatc for { if mAck := f.partState.pop(); mAck != nil { + if f.preflightHookFn != nil && !f.preflightHookExecuted { + f.preflightHookFn(ctx, f.res, f.client) + f.preflightHookExecuted = true + } + f.readBackOff.Reset() return mAck.batch, func(ctx context.Context, res error) error { // Res will always be nil because we initialize with service.AutoRetryNacks diff --git a/internal/impl/kafka/franz_writer.go b/internal/impl/kafka/franz_writer.go index 076437fea5..84cac22f90 100644 --- a/internal/impl/kafka/franz_writer.go +++ b/internal/impl/kafka/franz_writer.go @@ -253,6 +253,15 @@ func FranzWriterConfigLints() string { }` } +// accessClientFn is a function which is executed to fetch the client. +type accessClientFn func(FranzSharedClientUseFn) error + +// yieldClientFn is a function which is executed during close to yield the client. +type yieldClientFn func(context.Context) error + +// onWriteHookFn is a function which is executed before a message batch is written. +type onWriteHookFn func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error + // FranzWriter implements a Kafka writer using the franz-go library. type FranzWriter struct { Topic *service.InterpolatedString @@ -262,17 +271,21 @@ type FranzWriter struct { IsTimestampMs bool MetaFilter *service.MetadataFilter - accessClientFn func(FranzSharedClientUseFn) error - yieldClientFn func(context.Context) error + accessClientFn accessClientFn + yieldClientFn yieldClientFn + onConnectHookFn func(client *kgo.Client) + onWriteHookFn onWriteHookFn } // NewFranzWriterFromConfig uses a parsed config to extract customisation for // writing data to a Kafka broker. A closure function must be provided that is // responsible for granting access to a connected client. -func NewFranzWriterFromConfig(conf *service.ParsedConfig, accessClientFn func(FranzSharedClientUseFn) error, yieldClientFn func(context.Context) error) (*FranzWriter, error) { +func NewFranzWriterFromConfig(conf *service.ParsedConfig, accessClientFn accessClientFn, yieldClientFn yieldClientFn, onConnectHookFn func(client *kgo.Client), onWriteHookFn onWriteHookFn) (*FranzWriter, error) { w := FranzWriter{ - accessClientFn: accessClientFn, - yieldClientFn: yieldClientFn, + accessClientFn: accessClientFn, + yieldClientFn: yieldClientFn, + onConnectHookFn: onConnectHookFn, + onWriteHookFn: onWriteHookFn, } var err error @@ -394,13 +407,23 @@ func (w *FranzWriter) BatchToRecords(ctx context.Context, b service.MessageBatch // Connect to the target seed brokers. 
func (w *FranzWriter) Connect(ctx context.Context) error { - return w.accessClientFn(func(details *FranzSharedClientInfo) error { + var client *kgo.Client + if err := w.accessClientFn(func(details *FranzSharedClientInfo) error { // Check connectivity to cluster if err := details.Client.Ping(ctx); err != nil { return fmt.Errorf("failed to connect to cluster: %s", err) } + client = details.Client return nil - }) + }); err != nil { + return err + } + + if w.onConnectHookFn != nil { + w.onConnectHookFn(client) + } + + return nil } // WriteBatch attempts to write a batch of messages to the target topics. @@ -414,6 +437,12 @@ func (w *FranzWriter) WriteBatch(ctx context.Context, b service.MessageBatch) er return err } + if w.onWriteHookFn != nil { + if err := w.onWriteHookFn(ctx, details.Client, records); err != nil { + return fmt.Errorf("on write hook failed: %s", err) + } + } + var ( wg sync.WaitGroup results = make(kgo.ProduceResults, 0, len(records)) diff --git a/internal/impl/kafka/input_redpanda.go b/internal/impl/kafka/input_redpanda.go index e1aecbec5b..6eb3a8f222 100644 --- a/internal/impl/kafka/input_redpanda.go +++ b/internal/impl/kafka/input_redpanda.go @@ -60,6 +60,10 @@ output: Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields ` + "`fetch_max_partition_bytes` and `fetch_max_bytes`" + `. Batches can be further broken down using the ` + "xref:components:processors/split.adoc[`split`] processor" + `. +== Metrics + +Emits a ` + "`redpanda_lag`" + ` metric with ` + "`topic`" + ` and ` + "`partition`" + ` labels for each consumed topic. + == Metadata This input adds the following metadata fields to each message: @@ -69,6 +73,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset +- kafka_lag - kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message @@ -119,7 +124,7 @@ func init() { rdr, err := NewFranzReaderOrderedFromConfig(conf, mgr, func() ([]kgo.Opt, error) { return clientOpts, nil - }, nil) + }, nil, nil, nil) if err != nil { return nil, err } diff --git a/internal/impl/kafka/output_kafka_franz.go b/internal/impl/kafka/output_kafka_franz.go index 0938312a5f..609a737335 100644 --- a/internal/impl/kafka/output_kafka_franz.go +++ b/internal/impl/kafka/output_kafka_franz.go @@ -115,7 +115,7 @@ func init() { client.Close() client = nil return nil - }) + }, nil, nil) return }) if err != nil { diff --git a/internal/impl/kafka/output_redpanda.go b/internal/impl/kafka/output_redpanda.go index 76ec443d2d..a93e122aa5 100644 --- a/internal/impl/kafka/output_redpanda.go +++ b/internal/impl/kafka/output_redpanda.go @@ -107,7 +107,7 @@ func init() { client.Close() client = nil return nil - }) + }, nil, nil) return }) if err != nil { diff --git a/internal/impl/ockam/output_kafka.go b/internal/impl/ockam/output_kafka.go index f07e5d6c66..eb2cf82b19 100644 --- a/internal/impl/ockam/output_kafka.go +++ b/internal/impl/ockam/output_kafka.go @@ -221,7 +221,7 @@ func newOckamKafkaOutput(conf *service.ParsedConfig, log *service.Logger) (*ocka client.Close() client = nil return nil - }) + }, nil, nil) if err != nil { return nil, err }
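On the writer side, `NewFranzWriterFromConfig` now takes two extra hooks after the existing access/yield closures: an on-connect hook invoked once the client has been pinged successfully, and an on-write hook invoked with the converted records just before they are produced, where returning an error aborts the batch. Existing callers such as `kafka_franz`, `redpanda` and the Ockam output simply pass `nil, nil`. The sketch below shows the full five-argument wiring with placeholder hook bodies rather than the migrator's schema-translation and topic-creation logic.

```go
package example // illustrative only; mirrors outputs under internal/impl/kafka

import (
	"context"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/redpanda-data/benthos/v4/public/service"

	"github.com/redpanda-data/connect/v4/internal/impl/kafka"
)

// newWriterWithHooks shows the extended NewFranzWriterFromConfig signature: the
// access/yield closures manage the client lifecycle as before, while the two
// new hooks run after a successful connect and immediately before each batch of
// records is produced. The hook bodies here are placeholders.
func newWriterWithHooks(conf *service.ParsedConfig, mgr *service.Resources, clientOpts []kgo.Opt) (*kafka.FranzWriter, error) {
	var client *kgo.Client
	var clientMut sync.Mutex

	return kafka.NewFranzWriterFromConfig(conf,
		// accessClientFn: lazily create the client and hand it to the caller.
		func(fn kafka.FranzSharedClientUseFn) error {
			clientMut.Lock()
			defer clientMut.Unlock()
			if client == nil {
				var err error
				if client, err = kgo.NewClient(clientOpts...); err != nil {
					return err
				}
			}
			return fn(&kafka.FranzSharedClientInfo{Client: client})
		},
		// yieldClientFn: close the client on shutdown.
		func(context.Context) error {
			clientMut.Lock()
			defer clientMut.Unlock()
			if client != nil {
				client.Close()
				client = nil
			}
			return nil
		},
		// onConnectHookFn: runs once the client has been pinged successfully,
		// e.g. to publish the connection for other components to reuse.
		func(client *kgo.Client) {
			mgr.Logger().Debugf("Kafka client connected")
		},
		// onWriteHookFn: runs before the records are produced; the migrator uses
		// this point for schema ID translation and topic/ACL creation. Returning
		// an error aborts the write.
		func(ctx context.Context, client *kgo.Client, records []*kgo.Record) error {
			return nil
		},
	)
}
```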