Skip to content

Commit

Permalink
Merge pull request #3019 from redpanda-data/mihaitodor-adjust-kafka-t…
Browse files Browse the repository at this point in the history
…imestamp-format

Adjust Kafka timestamp format
  • Loading branch information
mihaitodor authored Nov 20, 2024
2 parents 82bb8af + fe08ed8 commit 6184108
Show file tree
Hide file tree
Showing 33 changed files with 160 additions and 116 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ All notable changes to this project will be documented in this file.

- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL (@le-vlad)
- Field `metadata_max_age` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
- Field `kafka_timestamp_ms` added to the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` outputs. (@mihaitodor)

### Changed

- `snowflake_streaming` with `schema_evolution.enabled` set to true can now autocreate tables.
- Fields `translate_schema_ids` and `schema_registry_output_resource` added to the `redpanda_migrator` output. (@mihaitodor)
- Fields `backfill_dependencies` and `input_resource` added to the `schema_registry` output. (@mihaitodor)
- The `schema_registry` input and output and the `schema_registry_encode` and `schema_registry_decode` processors now use the `github.com/twmb/franz-go/pkg/sr` SchemaRegistry client. (@mihaitodor)
- Metadata field `kafka_timestamp_ms` added to the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs now contains a unix timestamp with millisecond precision. (@mihaitodor)
- Metadata field `kafka_timestamp` removed from the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs. (@mihaitodor)

## 4.39.0 - 2024-11-07

Expand Down
1 change: 1 addition & 0 deletions docs/modules/components/pages/inputs/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ This input adds the following metadata fields to each message:
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
- All existing message headers (version 0.11+)
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/components/pages/inputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
- All record headers
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/components/pages/inputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_timestamp
- kafka_tombstone_message
- All record headers
```
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/components/pages/inputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
- All record headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ This input adds the following metadata fields to each message:
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
- All record headers
Expand Down
10 changes: 5 additions & 5 deletions docs/modules/components/pages/outputs/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ output:
initial_interval: 3s
max_interval: 10s
max_elapsed_time: 30s
timestamp: ${! timestamp_unix() } # No default (optional)
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
```
--
Expand Down Expand Up @@ -831,9 +831,9 @@ max_elapsed_time: 1m
max_elapsed_time: 1h
```
=== `timestamp`
=== `timestamp_ms`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand All @@ -843,9 +843,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp_ms: ${! timestamp_unix_milli() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```
10 changes: 5 additions & 5 deletions docs/modules/components/pages/outputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ output:
metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
max_in_flight: 10
batching:
count: 0
Expand Down Expand Up @@ -576,9 +576,9 @@ include_patterns:
- _timestamp_unix$
```
=== `timestamp`
=== `timestamp_ms`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand All @@ -588,9 +588,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp_ms: ${! timestamp_unix_milli() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```
=== `max_in_flight`
Expand Down
10 changes: 5 additions & 5 deletions docs/modules/components/pages/outputs/ockam_kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ output:
metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
disable_content_encryption: false
enrollment_ticket: "" # No default (optional)
identity_name: "" # No default (optional)
Expand Down Expand Up @@ -589,9 +589,9 @@ include_patterns:
- _timestamp_unix$
```
=== `kafka.timestamp`
=== `kafka.timestamp_ms`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand All @@ -601,9 +601,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp_ms: ${! timestamp_unix_milli() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```
=== `disable_content_encryption`
Expand Down
10 changes: 5 additions & 5 deletions docs/modules/components/pages/outputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ output:
metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
max_in_flight: 256
partitioner: "" # No default (optional)
idempotent_write: true
Expand Down Expand Up @@ -561,9 +561,9 @@ include_patterns:
- _timestamp_unix$
```
=== `timestamp`
=== `timestamp_ms`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand All @@ -573,9 +573,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp_ms: ${! timestamp_unix_milli() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```
=== `max_in_flight`
Expand Down
10 changes: 5 additions & 5 deletions docs/modules/components/pages/outputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ output:
metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
max_in_flight: 10
batching:
count: 0
Expand Down Expand Up @@ -205,9 +205,9 @@ include_patterns:
- _timestamp_unix$
```
=== `timestamp`
=== `timestamp_ms`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand All @@ -217,9 +217,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp_ms: ${! timestamp_unix_milli() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```
=== `max_in_flight`
Expand Down
12 changes: 6 additions & 6 deletions docs/modules/components/pages/outputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ output:
metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timestamp_ms: ${! timestamp_unix_milli() } # No default (optional)
max_in_flight: 10
batching:
count: 0
Expand Down Expand Up @@ -138,7 +138,7 @@ output:
key: ${! metadata("kafka_key") }
partitioner: manual
partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) }
timestamp: ${! metadata("kafka_timestamp_unix").or(timestamp_unix()) }
timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) }
input_resource: redpanda_migrator_input
max_in_flight: 1
```
Expand Down Expand Up @@ -617,9 +617,9 @@ include_patterns:
- _timestamp_unix$
```
=== `timestamp`
=== `timestamp_ms`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand All @@ -629,9 +629,9 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp_ms: ${! timestamp_unix_milli() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp_ms: ${! metadata("kafka_timestamp_ms") }
```
=== `max_in_flight`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
timestamp: ${! timestamp_unix() }
timestamp: ${! metadata("kafka_timestamp_unix") }
timestamp: ${! metadata("kafka_timestamp_ms") }
```
=== `timeout`
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/kafka/enterprise/redpanda_common_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_timestamp
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
- All record headers
Expand Down
9 changes: 1 addition & 8 deletions internal/impl/kafka/enterprise/redpanda_common_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,7 @@ func redpandaCommonOutputConfig() *service.ConfigSpec {
Default(10),
service.NewBatchPolicyField(roFieldBatching),
).
LintRule(`
root = if this.partitioner == "manual" {
if this.partition.or("") == "" {
"a partition must be specified when the partitioner is set to manual"
}
} else if this.partition.or("") != "" {
"a partition cannot be specified unless the partitioner is set to manual"
}`).
LintRule(kafka.FranzWriterConfigLints()).
Example("Simple Output", "Data is generated and written to a topic bar, targetting the cluster configured within the redpanda block at the bottom. This is useful as it allows us to configure TLS and SASL only once for potentially multiple inputs and outputs.", `
input:
generate:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ tests:
redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
schema_registry:
url: http://localhost:8081

Expand All @@ -133,11 +134,13 @@ tests:
- redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"

Expand All @@ -146,6 +149,7 @@ tests:
redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
schema_registry:
url: http://localhost:8081

Expand All @@ -168,15 +172,15 @@ tests:
- broker:
inputs:
- redpanda_migrator:
seed_brokers:
- 127.0.0.1:9092
topics:
- foobar
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"

Expand All @@ -185,19 +189,20 @@ tests:
redpanda_migrator:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"

expected:
broker:
inputs:
- redpanda_migrator:
seed_brokers:
- 127.0.0.1:9092
topics:
- foobar
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "foobar" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator"
- kafka_franz:
seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
consumer_group: "migrator"
processors:
- mapping: meta input_label = "redpanda_migrator_offsets"
Loading

0 comments on commit 6184108

Please sign in to comment.