diff --git a/CHANGELOG.md b/CHANGELOG.md index 2544c1dbb2..e46ce85f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file. - New `pg_stream` input supporting change data capture (CDC) from PostgreSQL (@le-vlad) - Field `metadata_max_age` added to the `redpanda_migrator_offsets` output. (@mihaitodor) +- Field `kafka_timestamp_ms` added to the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` outputs. (@mihaitodor) ### Changed @@ -16,6 +17,8 @@ All notable changes to this project will be documented in this file. - Fields `translate_schema_ids` and `schema_registry_output_resource` added to the `redpanda_migrator` output. (@mihaitodor) - Fields `backfill_dependencies` and `input_resource` added to the `schema_registry` output. (@mihaitodor) - The `schema_registry` input and output and the `schema_registry_encode` and `schema_registry_decode` processors now use the `github.com/twmb/franz-go/pkg/sr` SchemaRegistry client. (@mihaitodor) +- Metadata field `kafka_timestamp_ms` added to the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs now contains a unix timestamp with millisecond precision. (@mihaitodor) +- Metadata field `kafka_timestamp` removed from the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs. 
(@mihaitodor) ## 4.39.0 - 2024-11-07 diff --git a/docs/modules/components/pages/inputs/kafka.adoc b/docs/modules/components/pages/inputs/kafka.adoc index 84ffc88641..045f56b126 100644 --- a/docs/modules/components/pages/inputs/kafka.adoc +++ b/docs/modules/components/pages/inputs/kafka.adoc @@ -115,6 +115,7 @@ This input adds the following metadata fields to each message: - kafka_partition - kafka_offset - kafka_lag +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All existing message headers (version 0.11+) diff --git a/docs/modules/components/pages/inputs/kafka_franz.adoc b/docs/modules/components/pages/inputs/kafka_franz.adoc index 8e11e4b639..ab35851710 100644 --- a/docs/modules/components/pages/inputs/kafka_franz.adoc +++ b/docs/modules/components/pages/inputs/kafka_franz.adoc @@ -103,7 +103,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset -- kafka_timestamp +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All record headers diff --git a/docs/modules/components/pages/inputs/redpanda.adoc b/docs/modules/components/pages/inputs/redpanda.adoc index 67a97dc347..381400ead6 100644 --- a/docs/modules/components/pages/inputs/redpanda.adoc +++ b/docs/modules/components/pages/inputs/redpanda.adoc @@ -123,8 +123,8 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset +- kafka_timestamp_ms - kafka_timestamp_unix -- kafka_timestamp - kafka_tombstone_message - All record headers ``` diff --git a/docs/modules/components/pages/inputs/redpanda_common.adoc b/docs/modules/components/pages/inputs/redpanda_common.adoc index 4463de2f7b..ab0b6a912c 100644 --- a/docs/modules/components/pages/inputs/redpanda_common.adoc +++ b/docs/modules/components/pages/inputs/redpanda_common.adoc @@ -109,7 +109,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset -- 
kafka_timestamp +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All record headers diff --git a/docs/modules/components/pages/inputs/redpanda_migrator.adoc b/docs/modules/components/pages/inputs/redpanda_migrator.adoc index e87078661f..efbb5b8034 100644 --- a/docs/modules/components/pages/inputs/redpanda_migrator.adoc +++ b/docs/modules/components/pages/inputs/redpanda_migrator.adoc @@ -110,6 +110,7 @@ This input adds the following metadata fields to each message: - kafka_partition - kafka_offset - kafka_lag +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All record headers diff --git a/docs/modules/components/pages/outputs/kafka.adoc b/docs/modules/components/pages/outputs/kafka.adoc index 2f168f5400..3a24544e7d 100644 --- a/docs/modules/components/pages/outputs/kafka.adoc +++ b/docs/modules/components/pages/outputs/kafka.adoc @@ -112,7 +112,7 @@ output: initial_interval: 3s max_interval: 10s max_elapsed_time: 30s - timestamp: ${! timestamp_unix() } # No default (optional) + timestamp_ms: ${! timestamp_unix_milli() } # No default (optional) ``` -- @@ -831,9 +831,9 @@ max_elapsed_time: 1m max_elapsed_time: 1h ``` -=== `timestamp` +=== `timestamp_ms` -An optional timestamp to set for each message. When left empty, the current timestamp is used. +An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. @@ -843,9 +843,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter ```yml # Examples -timestamp: ${! timestamp_unix() } +timestamp_ms: ${! timestamp_unix_milli() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp_ms: ${! 
metadata("kafka_timestamp_ms") } ``` diff --git a/docs/modules/components/pages/outputs/kafka_franz.adoc b/docs/modules/components/pages/outputs/kafka_franz.adoc index 299f909831..25a1c019f6 100644 --- a/docs/modules/components/pages/outputs/kafka_franz.adoc +++ b/docs/modules/components/pages/outputs/kafka_franz.adoc @@ -81,7 +81,7 @@ output: metadata: include_prefixes: [] include_patterns: [] - timestamp: ${! timestamp_unix() } # No default (optional) + timestamp_ms: ${! timestamp_unix_milli() } # No default (optional) max_in_flight: 10 batching: count: 0 @@ -576,9 +576,9 @@ include_patterns: - _timestamp_unix$ ``` -=== `timestamp` +=== `timestamp_ms` -An optional timestamp to set for each message. When left empty, the current timestamp is used. +An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. @@ -588,9 +588,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter ```yml # Examples -timestamp: ${! timestamp_unix() } +timestamp_ms: ${! timestamp_unix_milli() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp_ms: ${! metadata("kafka_timestamp_ms") } ``` === `max_in_flight` diff --git a/docs/modules/components/pages/outputs/ockam_kafka.adoc b/docs/modules/components/pages/outputs/ockam_kafka.adoc index c0da06592e..d44d4cbc7b 100644 --- a/docs/modules/components/pages/outputs/ockam_kafka.adoc +++ b/docs/modules/components/pages/outputs/ockam_kafka.adoc @@ -99,7 +99,7 @@ output: metadata: include_prefixes: [] include_patterns: [] - timestamp: ${! timestamp_unix() } # No default (optional) + timestamp_ms: ${! 
timestamp_unix_milli() } # No default (optional) disable_content_encryption: false enrollment_ticket: "" # No default (optional) identity_name: "" # No default (optional) @@ -589,9 +589,9 @@ include_patterns: - _timestamp_unix$ ``` -=== `kafka.timestamp` +=== `kafka.timestamp_ms` -An optional timestamp to set for each message. When left empty, the current timestamp is used. +An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. @@ -601,9 +601,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter ```yml # Examples -timestamp: ${! timestamp_unix() } +timestamp_ms: ${! timestamp_unix_milli() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp_ms: ${! metadata("kafka_timestamp_ms") } ``` === `disable_content_encryption` diff --git a/docs/modules/components/pages/outputs/redpanda.adoc b/docs/modules/components/pages/outputs/redpanda.adoc index e422fb2729..da100ca31c 100644 --- a/docs/modules/components/pages/outputs/redpanda.adoc +++ b/docs/modules/components/pages/outputs/redpanda.adoc @@ -74,7 +74,7 @@ output: metadata: include_prefixes: [] include_patterns: [] - timestamp: ${! timestamp_unix() } # No default (optional) + timestamp_ms: ${! timestamp_unix_milli() } # No default (optional) max_in_flight: 256 partitioner: "" # No default (optional) idempotent_write: true @@ -561,9 +561,9 @@ include_patterns: - _timestamp_unix$ ``` -=== `timestamp` +=== `timestamp_ms` -An optional timestamp to set for each message. When left empty, the current timestamp is used. +An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. 
@@ -573,9 +573,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter ```yml # Examples -timestamp: ${! timestamp_unix() } +timestamp_ms: ${! timestamp_unix_milli() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp_ms: ${! metadata("kafka_timestamp_ms") } ``` === `max_in_flight` diff --git a/docs/modules/components/pages/outputs/redpanda_common.adoc b/docs/modules/components/pages/outputs/redpanda_common.adoc index 4a4e1bf21c..165afd6abb 100644 --- a/docs/modules/components/pages/outputs/redpanda_common.adoc +++ b/docs/modules/components/pages/outputs/redpanda_common.adoc @@ -67,7 +67,7 @@ output: metadata: include_prefixes: [] include_patterns: [] - timestamp: ${! timestamp_unix() } # No default (optional) + timestamp_ms: ${! timestamp_unix_milli() } # No default (optional) max_in_flight: 10 batching: count: 0 @@ -205,9 +205,9 @@ include_patterns: - _timestamp_unix$ ``` -=== `timestamp` +=== `timestamp_ms` -An optional timestamp to set for each message. When left empty, the current timestamp is used. +An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. @@ -217,9 +217,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter ```yml # Examples -timestamp: ${! timestamp_unix() } +timestamp_ms: ${! timestamp_unix_milli() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp_ms: ${! 
metadata("kafka_timestamp_ms") } ``` === `max_in_flight` diff --git a/docs/modules/components/pages/outputs/redpanda_migrator.adoc b/docs/modules/components/pages/outputs/redpanda_migrator.adoc index a1dc0cb851..5966836055 100644 --- a/docs/modules/components/pages/outputs/redpanda_migrator.adoc +++ b/docs/modules/components/pages/outputs/redpanda_migrator.adoc @@ -81,7 +81,7 @@ output: metadata: include_prefixes: [] include_patterns: [] - timestamp: ${! timestamp_unix() } # No default (optional) + timestamp_ms: ${! timestamp_unix_milli() } # No default (optional) max_in_flight: 10 batching: count: 0 @@ -138,7 +138,7 @@ output: key: ${! metadata("kafka_key") } partitioner: manual partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) } - timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) } + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } input_resource: redpanda_migrator_input max_in_flight: 1 ``` @@ -617,9 +617,9 @@ include_patterns: - _timestamp_unix$ ``` -=== `timestamp` +=== `timestamp_ms` -An optional timestamp to set for each message. When left empty, the current timestamp is used. +An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. @@ -629,9 +629,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter ```yml # Examples -timestamp: ${! timestamp_unix() } +timestamp_ms: ${! timestamp_unix_milli() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp_ms: ${! 
metadata("kafka_timestamp_ms") } ``` === `max_in_flight` diff --git a/docs/modules/components/pages/processors/redpanda_data_transform.adoc b/docs/modules/components/pages/processors/redpanda_data_transform.adoc index e2bfb49b32..e252b7a556 100644 --- a/docs/modules/components/pages/processors/redpanda_data_transform.adoc +++ b/docs/modules/components/pages/processors/redpanda_data_transform.adoc @@ -221,7 +221,7 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter timestamp: ${! timestamp_unix() } -timestamp: ${! metadata("kafka_timestamp_unix") } +timestamp: ${! metadata("kafka_timestamp_ms") } ``` === `timeout` diff --git a/internal/impl/kafka/enterprise/redpanda_common_input.go b/internal/impl/kafka/enterprise/redpanda_common_input.go index 885ddd975e..1798940c71 100644 --- a/internal/impl/kafka/enterprise/redpanda_common_input.go +++ b/internal/impl/kafka/enterprise/redpanda_common_input.go @@ -73,7 +73,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset -- kafka_timestamp +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All record headers diff --git a/internal/impl/kafka/enterprise/redpanda_common_output.go b/internal/impl/kafka/enterprise/redpanda_common_output.go index 282d21fb54..fea495f48f 100644 --- a/internal/impl/kafka/enterprise/redpanda_common_output.go +++ b/internal/impl/kafka/enterprise/redpanda_common_output.go @@ -27,14 +27,7 @@ func redpandaCommonOutputConfig() *service.ConfigSpec { Default(10), service.NewBatchPolicyField(roFieldBatching), ). - LintRule(` -root = if this.partitioner == "manual" { -if this.partition.or("") == "" { -"a partition must be specified when the partitioner is set to manual" -} -} else if this.partition.or("") != "" { -"a partition cannot be specified unless the partitioner is set to manual" -}`). + LintRule(kafka.FranzWriterConfigLints()). 
Example("Simple Output", "Data is generated and written to a topic bar, targetting the cluster configured within the redpanda block at the bottom. This is useful as it allows us to configure TLS and SASL only once for potentially multiple inputs and outputs.", ` input: generate: diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_input.tmpl.yaml b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_input.tmpl.yaml index e35ba55c1e..b21046833d 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_input.tmpl.yaml +++ b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_input.tmpl.yaml @@ -109,6 +109,7 @@ tests: redpanda_migrator: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] + consumer_group: "migrator" schema_registry: url: http://localhost:8081 @@ -133,11 +134,13 @@ tests: - redpanda_migrator: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] + consumer_group: "migrator" processors: - mapping: meta input_label = "redpanda_migrator" - kafka_franz: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "__consumer_offsets" ] + consumer_group: "migrator" processors: - mapping: meta input_label = "redpanda_migrator_offsets" @@ -146,6 +149,7 @@ tests: redpanda_migrator: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] + consumer_group: "migrator" schema_registry: url: http://localhost:8081 @@ -168,15 +172,15 @@ tests: - broker: inputs: - redpanda_migrator: - seed_brokers: - - 127.0.0.1:9092 - topics: - - foobar + seed_brokers: [ "127.0.0.1:9092" ] + topics: [ "foobar" ] + consumer_group: "migrator" processors: - mapping: meta input_label = "redpanda_migrator" - kafka_franz: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "__consumer_offsets" ] + consumer_group: "migrator" processors: - mapping: meta input_label = "redpanda_migrator_offsets" @@ -185,19 +189,20 @@ tests: redpanda_migrator: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "foobar" ] + consumer_group: "migrator" expected: broker: inputs: - redpanda_migrator: - 
seed_brokers: - - 127.0.0.1:9092 - topics: - - foobar + seed_brokers: [ "127.0.0.1:9092" ] + topics: [ "foobar" ] + consumer_group: "migrator" processors: - mapping: meta input_label = "redpanda_migrator" - kafka_franz: seed_brokers: [ "127.0.0.1:9092" ] topics: [ "__consumer_offsets" ] + consumer_group: "migrator" processors: - mapping: meta input_label = "redpanda_migrator_offsets" diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml index 7a3db7f3bc..65a5d296ec 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml +++ b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml @@ -43,7 +43,7 @@ mapping: | "key": "${! metadata(\"kafka_key\") }", "partition": "${! metadata(\"kafka_partition\").or(throw(\"missing kafka_partition metadata\")) }", "partitioner": "manual", - "timestamp": "${! metadata(\"kafka_timestamp_unix\").or(timestamp_unix()) }", + "timestamp_ms": "${! metadata(\"kafka_timestamp_ms\").or(timestamp_unix_milli()) }", "max_in_flight": $rpMigratorMaxInFlight, "metadata": { "include_patterns": [ @@ -167,7 +167,7 @@ tests: partitioner: manual seed_brokers: - 127.0.0.1:9092 - timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) } + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) } metadata: include_patterns: @@ -229,7 +229,7 @@ tests: partitioner: manual seed_brokers: - 127.0.0.1:9092 - timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) } + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } topic: ${! 
metadata("kafka_topic").or(throw("missing kafka_topic metadata")) } metadata: include_patterns: diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_input.go b/internal/impl/kafka/enterprise/redpanda_migrator_input.go index ea7b17fa37..3149929f65 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_input.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_input.go @@ -69,6 +69,7 @@ This input adds the following metadata fields to each message: - kafka_partition - kafka_offset - kafka_lag +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All record headers diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_output.go index af18daac9d..76ade738e0 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_output.go @@ -60,14 +60,8 @@ ACL migration adheres to the following principles: - Only topic ACLs are migrated, group ACLs are not migrated `). Fields(RedpandaMigratorOutputConfigFields()...). - LintRule(` -root = if this.partitioner == "manual" { -if this.partition.or("") == "" { -"a partition must be specified when the partitioner is set to manual" -} -} else if this.partition.or("") != "" { -"a partition cannot be specified unless the partitioner is set to manual" -}`).Example("Transfer data", "Writes messages to the configured broker and creates topics and topic ACLs if they don't exist. It also ensures that the message order is preserved.", ` + LintRule(kafka.FranzWriterConfigLints()). + Example("Transfer data", "Writes messages to the configured broker and creates topics and topic ACLs if they don't exist. It also ensures that the message order is preserved.", ` output: redpanda_migrator: seed_brokers: [ "127.0.0.1:9093" ] @@ -75,7 +69,7 @@ output: key: ${! metadata("kafka_key") } partitioner: manual partition: ${! 
metadata("kafka_partition").or(throw("missing kafka_partition metadata")) } - timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) } + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } input_resource: redpanda_migrator_input max_in_flight: 1 `) diff --git a/internal/impl/kafka/franz_reader.go b/internal/impl/kafka/franz_reader.go index 8fed08b962..98894805c1 100644 --- a/internal/impl/kafka/franz_reader.go +++ b/internal/impl/kafka/franz_reader.go @@ -16,7 +16,6 @@ package kafka import ( "fmt" - "time" "github.com/dustin/go-humanize" "github.com/twmb/franz-go/pkg/kgo" @@ -214,7 +213,7 @@ func FranzRecordToMessageV0(record *kgo.Record, multiHeader bool) *service.Messa msg.MetaSetMut("kafka_partition", int(record.Partition)) msg.MetaSetMut("kafka_offset", int(record.Offset)) msg.MetaSetMut("kafka_timestamp_unix", record.Timestamp.Unix()) - msg.MetaSetMut("kafka_timestamp", record.Timestamp.Format(time.RFC3339Nano)) + msg.MetaSetMut("kafka_timestamp_ms", record.Timestamp.UnixMilli()) msg.MetaSetMut("kafka_tombstone_message", record.Value == nil) if multiHeader { // in multi header mode we gather headers so we can encode them as lists @@ -245,7 +244,7 @@ func FranzRecordToMessageV1(record *kgo.Record) *service.Message { msg.MetaSetMut("kafka_partition", int(record.Partition)) msg.MetaSetMut("kafka_offset", int(record.Offset)) msg.MetaSetMut("kafka_timestamp_unix", record.Timestamp.Unix()) - msg.MetaSetMut("kafka_timestamp", record.Timestamp.Format(time.RFC3339Nano)) + msg.MetaSetMut("kafka_timestamp_ms", record.Timestamp.UnixMilli()) msg.MetaSetMut("kafka_tombstone_message", record.Value == nil) headers := map[string][]any{} diff --git a/internal/impl/kafka/franz_reader_ordered.go b/internal/impl/kafka/franz_reader_ordered.go index d644344c1e..d32f57a08e 100644 --- a/internal/impl/kafka/franz_reader_ordered.go +++ b/internal/impl/kafka/franz_reader_ordered.go @@ -250,7 +250,7 @@ func (c *partitionState) pop() *batchWithAckFn 
{ return nil } -func (c *partitionState) addRecords(ctx context.Context, topic string, partition int32, batch *batchWithRecords, bufferSize uint64) (pauseFetch bool) { +func (c *partitionState) addRecords(topic string, partition int32, batch *batchWithRecords, bufferSize uint64) (pauseFetch bool) { c.mut.Lock() defer c.mut.Unlock() @@ -288,7 +288,7 @@ func (c *partitionState) pauseFetch(topic string, partition int32, limit uint64) return partTracker.pauseFetch(limit) } -func (c *partitionState) removeTopicPartitions(ctx context.Context, m map[string][]int32) { +func (c *partitionState) removeTopicPartitions(m map[string][]int32) { c.mut.Lock() defer c.mut.Unlock() @@ -355,17 +355,17 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { if commitErr := c.CommitMarkedOffsets(rctx); commitErr != nil { f.log.Errorf("Commit error on partition revoke: %v", commitErr) } - checkpoints.removeTopicPartitions(rctx, m) + checkpoints.removeTopicPartitions(m) }), - kgo.OnPartitionsLost(func(rctx context.Context, _ *kgo.Client, m map[string][]int32) { + kgo.OnPartitionsLost(func(_ context.Context, _ *kgo.Client, m map[string][]int32) { // No point trying to commit our offsets, just clean up our topic map - checkpoints.removeTopicPartitions(rctx, m) + checkpoints.removeTopicPartitions(m) }), - kgo.OnPartitionsAssigned(func(rctx context.Context, _ *kgo.Client, m map[string][]int32) { + kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) { for topic, parts := range m { for _, part := range parts { // Adds the partition to our checkpointer - checkpoints.addRecords(rctx, topic, part, nil, f.cacheLimit) + checkpoints.addRecords(topic, part, nil, f.cacheLimit) } } }), @@ -435,7 +435,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error { pauseTopicPartitions := map[string][]int32{} fetches.EachPartition(func(p kgo.FetchTopicPartition) { if len(p.Records) > 0 { - if checkpoints.addRecords(closeCtx, p.Topic, p.Partition, 
f.recordsToBatch(p.Records), f.cacheLimit) { + if checkpoints.addRecords(p.Topic, p.Partition, f.recordsToBatch(p.Records), f.cacheLimit) { pauseTopicPartitions[p.Topic] = append(pauseTopicPartitions[p.Topic], p.Partition) } } diff --git a/internal/impl/kafka/franz_writer.go b/internal/impl/kafka/franz_writer.go index 9b49071aa9..076437fea5 100644 --- a/internal/impl/kafka/franz_writer.go +++ b/internal/impl/kafka/franz_writer.go @@ -16,6 +16,7 @@ package kafka import ( "context" + "errors" "fmt" "math" "slices" @@ -204,11 +205,12 @@ func FranzProducerOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, error) //------------------------------------------------------------------------------ const ( - kfwFieldTopic = "topic" - kfwFieldKey = "key" - kfwFieldPartition = "partition" - kfwFieldMetadata = "metadata" - kfwFieldTimestamp = "timestamp" + kfwFieldTopic = "topic" + kfwFieldKey = "key" + kfwFieldPartition = "partition" + kfwFieldMetadata = "metadata" + kfwFieldTimestamp = "timestamp" + kfwFieldTimestampMs = "timestamp_ms" ) // FranzWriterConfigFields returns a slice of config fields specifically for @@ -231,17 +233,34 @@ func FranzWriterConfigFields() []*service.ConfigField { Example(`${! timestamp_unix() }`). Example(`${! metadata("kafka_timestamp_unix") }`). Optional(). + Advanced(). + Deprecated(), + service.NewInterpolatedStringField(kfwFieldTimestampMs). + Description("An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used."). + Example(`${! timestamp_unix_milli() }`). + Example(`${! metadata("kafka_timestamp_ms") }`). + Optional(). Advanced(), } } +// FranzWriterConfigLints returns the linter rules for the writer config. 
+func FranzWriterConfigLints() string { + return `root = match { + this.partitioner == "manual" && this.partition.or("") == "" => "a partition must be specified when the partitioner is set to manual" + this.partitioner != "manual" && this.partition.or("") != "" => "a partition cannot be specified unless the partitioner is set to manual" + this.timestamp.or("") != "" && this.timestamp_ms.or("") != "" => "both timestamp and timestamp_ms cannot be specified simultaneously" +}` +} + // FranzWriter implements a Kafka writer using the franz-go library. type FranzWriter struct { - Topic *service.InterpolatedString - Key *service.InterpolatedString - Partition *service.InterpolatedString - Timestamp *service.InterpolatedString - MetaFilter *service.MetadataFilter + Topic *service.InterpolatedString + Key *service.InterpolatedString + Partition *service.InterpolatedString + Timestamp *service.InterpolatedString + IsTimestampMs bool + MetaFilter *service.MetadataFilter accessClientFn func(FranzSharedClientUseFn) error yieldClientFn func(context.Context) error @@ -279,11 +298,23 @@ func NewFranzWriterFromConfig(conf *service.ParsedConfig, accessClientFn func(Fr } } + if conf.Contains(kfwFieldTimestamp) && conf.Contains(kfwFieldTimestampMs) { + return nil, errors.New("cannot specify both timestamp and timestamp_ms fields") + } + if conf.Contains(kfwFieldTimestamp) { if w.Timestamp, err = conf.FieldInterpolatedString(kfwFieldTimestamp); err != nil { return nil, err } } + + if conf.Contains(kfwFieldTimestampMs) { + if w.Timestamp, err = conf.FieldInterpolatedString(kfwFieldTimestampMs); err != nil { + return nil, err + } + w.IsTimestampMs = true + } + return &w, nil } @@ -347,7 +378,11 @@ func (w *FranzWriter) BatchToRecords(ctx context.Context, b service.MessageBatch if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil { return nil, fmt.Errorf("failed to parse timestamp: %w", err) } else { - record.Timestamp = time.Unix(ts, 0) + if w.IsTimestampMs { + record.Timestamp = 
time.UnixMilli(ts) + } else { + record.Timestamp = time.Unix(ts, 0) + } } } } diff --git a/internal/impl/kafka/input_kafka_franz.go b/internal/impl/kafka/input_kafka_franz.go index e17f87c8d0..bb6301e760 100644 --- a/internal/impl/kafka/input_kafka_franz.go +++ b/internal/impl/kafka/input_kafka_franz.go @@ -41,7 +41,7 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset -- kafka_timestamp +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All record headers diff --git a/internal/impl/kafka/input_redpanda.go b/internal/impl/kafka/input_redpanda.go index 6c260ce15b..5c8bd94664 100644 --- a/internal/impl/kafka/input_redpanda.go +++ b/internal/impl/kafka/input_redpanda.go @@ -69,8 +69,8 @@ This input adds the following metadata fields to each message: - kafka_topic - kafka_partition - kafka_offset +- kafka_timestamp_ms - kafka_timestamp_unix -- kafka_timestamp - kafka_tombstone_message - All record headers ` + "```" + ` diff --git a/internal/impl/kafka/input_sarama_kafka.go b/internal/impl/kafka/input_sarama_kafka.go index 6451fe1d2b..4146d0e870 100644 --- a/internal/impl/kafka/input_sarama_kafka.go +++ b/internal/impl/kafka/input_sarama_kafka.go @@ -72,6 +72,7 @@ This input adds the following metadata fields to each message: - kafka_partition - kafka_offset - kafka_lag +- kafka_timestamp_ms - kafka_timestamp_unix - kafka_tombstone_message - All existing message headers (version 0.11+) @@ -417,6 +418,7 @@ func dataToPart(highestOffset int64, data *sarama.ConsumerMessage, multiHeader b part.MetaSetMut("kafka_topic", data.Topic) part.MetaSetMut("kafka_offset", int(data.Offset)) part.MetaSetMut("kafka_lag", lag) + part.MetaSetMut("kafka_timestamp_ms", data.Timestamp.UnixMilli()) part.MetaSetMut("kafka_timestamp_unix", data.Timestamp.Unix()) part.MetaSetMut("kafka_tombstone_message", data.Value == nil) @@ -526,7 +528,7 @@ func (k *kafkaReader) Connect(ctx context.Context) error { if 
len(k.topicPartitions) > 0 { return k.connectExplicitTopics(ctx, k.saramConf) } - return k.connectBalancedTopics(ctx, k.saramConf) + return k.connectBalancedTopics(k.saramConf) } // ReadBatch attempts to read a message from a kafkaReader topic. diff --git a/internal/impl/kafka/input_sarama_kafka_cg.go b/internal/impl/kafka/input_sarama_kafka_cg.go index b39d964084..7a05c2f02b 100644 --- a/internal/impl/kafka/input_sarama_kafka_cg.go +++ b/internal/impl/kafka/input_sarama_kafka_cg.go @@ -113,7 +113,7 @@ func (k *kafkaReader) ConsumeClaim(sess sarama.ConsumerGroupSession, claim saram //------------------------------------------------------------------------------ -func (k *kafkaReader) connectBalancedTopics(ctx context.Context, config *sarama.Config) error { +func (k *kafkaReader) connectBalancedTopics(config *sarama.Config) error { // Start a new consumer group group, err := sarama.NewConsumerGroup(k.addresses, k.consumerGroup, config) if err != nil { diff --git a/internal/impl/kafka/integration_franz_test.go b/internal/impl/kafka/integration_franz_test.go index c2854b69f2..9307764ee7 100644 --- a/internal/impl/kafka/integration_franz_test.go +++ b/internal/impl/kafka/integration_franz_test.go @@ -359,7 +359,7 @@ output: kafka_franz: seed_brokers: [ localhost:$PORT ] topic: topic-$ID - timestamp: 666 + timestamp_ms: 1000000000000 input: kafka_franz: @@ -368,7 +368,7 @@ input: consumer_group: "blobfish" processors: - mapping: | - root = if metadata("kafka_timestamp_unix") != 666 { "error: invalid timestamp" } + root = if metadata("kafka_timestamp_ms") != 1000000000000 { "error: invalid timestamp" } ` suite := integration.StreamTests( diff --git a/internal/impl/kafka/integration_sarama_test.go b/internal/impl/kafka/integration_sarama_test.go index 238008a4dd..1f0f324238 100644 --- a/internal/impl/kafka/integration_sarama_test.go +++ b/internal/impl/kafka/integration_sarama_test.go @@ -711,7 +711,7 @@ output: kafka: addresses: [ localhost:$PORT ] topic: topic-$ID - 
timestamp: 666 + timestamp_ms: 1000000000000 input: kafka: @@ -720,7 +720,7 @@ input: consumer_group: "blobfish" processors: - mapping: | - root = if metadata("kafka_timestamp_unix") != 666 { "error: invalid timestamp" } + root = if metadata("kafka_timestamp_ms") != 1000000000000 { "error: invalid timestamp" } ` suite := integration.StreamTests( diff --git a/internal/impl/kafka/integration_test.go b/internal/impl/kafka/integration_test.go index cfde5efa91..79fb667f87 100644 --- a/internal/impl/kafka/integration_test.go +++ b/internal/impl/kafka/integration_test.go @@ -408,7 +408,7 @@ output: redpanda: seed_brokers: [ localhost:$PORT ] topic: topic-$ID - timestamp: 666 + timestamp_ms: 1000000000000 input: redpanda: @@ -417,7 +417,7 @@ input: consumer_group: "blobfish" processors: - mapping: | - root = if metadata("kafka_timestamp_unix") != 666 { "error: invalid timestamp" } + root = if metadata("kafka_timestamp_ms") != 1000000000000 { "error: invalid timestamp" } ` suite := integration.StreamTests( diff --git a/internal/impl/kafka/output_kafka_franz.go b/internal/impl/kafka/output_kafka_franz.go index f234237f9b..0938312a5f 100644 --- a/internal/impl/kafka/output_kafka_franz.go +++ b/internal/impl/kafka/output_kafka_franz.go @@ -43,14 +43,7 @@ Writes a batch of messages to Kafka brokers and waits for acknowledgement before This output often out-performs the traditional ` + "`kafka`" + ` output as well as providing more useful logs and error messages. `). Fields(FranzKafkaOutputConfigFields()...). 
- LintRule(` -root = if this.partitioner == "manual" { -if this.partition.or("") == "" { -"a partition must be specified when the partitioner is set to manual" -} -} else if this.partition.or("") != "" { -"a partition cannot be specified unless the partitioner is set to manual" -}`) + LintRule(FranzWriterConfigLints()) } // FranzKafkaOutputConfigFields returns the full suite of config fields for a diff --git a/internal/impl/kafka/output_redpanda.go b/internal/impl/kafka/output_redpanda.go index 0d17125840..76ec443d2d 100644 --- a/internal/impl/kafka/output_redpanda.go +++ b/internal/impl/kafka/output_redpanda.go @@ -37,14 +37,7 @@ func redpandaOutputConfig() *service.ConfigSpec { Writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input. `). Fields(redpandaOutputConfigFields()...). - LintRule(` -root = if this.partitioner == "manual" { -if this.partition.or("") == "" { -"a partition must be specified when the partitioner is set to manual" -} -} else if this.partition.or("") != "" { -"a partition cannot be specified unless the partitioner is set to manual" -}`) + LintRule(FranzWriterConfigLints()) } func redpandaOutputConfigFields() []*service.ConfigField { diff --git a/internal/impl/kafka/output_sarama_kafka.go b/internal/impl/kafka/output_sarama_kafka.go index 89d5606af7..75bf2763dd 100644 --- a/internal/impl/kafka/output_sarama_kafka.go +++ b/internal/impl/kafka/output_sarama_kafka.go @@ -58,6 +58,7 @@ const ( oskFieldMaxRetries = "max_retries" oskFieldBackoff = "backoff" oskFieldTimestamp = "timestamp" + oskFieldTimestampMs = "timestamp_ms" ) // OSKConfigSpec creates a new config spec for a kafka output. @@ -171,6 +172,13 @@ Unfortunately this error message will appear for a wide range of connection prob Example(`${! timestamp_unix() }`). Example(`${! metadata("kafka_timestamp_unix") }`). Optional(). + Advanced(). + Deprecated(), + service.NewInterpolatedStringField(oskFieldTimestampMs). 
+ Description("An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used."). + Example(`${! timestamp_unix_milli() }`). + Example(`${! metadata("kafka_timestamp_ms") }`). + Optional(). Advanced(), ) } @@ -205,6 +213,7 @@ type kafkaWriter struct { topic *service.InterpolatedString partition *service.InterpolatedString timestamp *service.InterpolatedString + isTimestampMs bool staticHeaders map[string]string metaFilter *service.MetadataExcludeFilter retryAsBatch bool @@ -316,10 +325,21 @@ func NewKafkaWriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources return nil, err } - if conf.Contains("timestamp") { - if k.timestamp, err = conf.FieldInterpolatedString("timestamp"); err != nil { + if conf.Contains(oskFieldTimestamp) && conf.Contains(oskFieldTimestampMs) { + return nil, errors.New("cannot specify both timestamp and timestamp_ms fields") + } + + if conf.Contains(oskFieldTimestamp) { + if k.timestamp, err = conf.FieldInterpolatedString(oskFieldTimestamp); err != nil { + return nil, err + } + } + + if conf.Contains(oskFieldTimestampMs) { + if k.timestamp, err = conf.FieldInterpolatedString(oskFieldTimestampMs); err != nil { return nil, err } + k.isTimestampMs = true } return &k, nil @@ -581,7 +601,11 @@ func (k *kafkaWriter) WriteBatch(ctx context.Context, msg service.MessageBatch) if ts, err := strconv.ParseInt(tsStr, 10, 64); err != nil { return fmt.Errorf("failed to parse timestamp: %w", err) } else { - nextMsg.Timestamp = time.Unix(ts, 0) + if k.isTimestampMs { + nextMsg.Timestamp = time.UnixMilli(ts) + } else { + nextMsg.Timestamp = time.Unix(ts, 0) + } } } } diff --git a/internal/impl/redpanda/processor_data_transform.go b/internal/impl/redpanda/processor_data_transform.go index aca45fdd90..a52ca1fb84 100644 --- a/internal/impl/redpanda/processor_data_transform.go +++ b/internal/impl/redpanda/processor_data_transform.go @@ -69,7 +69,7 @@ You can find out about how transforms work here: 
https://docs.redpanda.com/curre Field(service.NewInterpolatedStringField(dtpFieldTimestamp). Description("An optional timestamp to set for each message. When left empty, the current timestamp is used."). Example(`${! timestamp_unix() }`). - Example(`${! metadata("kafka_timestamp_unix") }`). + Example(`${! metadata("kafka_timestamp_ms") }`). Optional(). Advanced()). Field(service.NewDurationField(dtpFieldTimeout).