From 4671de5e29a6b644c7c1fa9621a9c0c5b5cd8010 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan
Date: Mon, 17 Aug 2020 11:51:53 -0700
Subject: [PATCH 1/3] Passing Kafka headers to downstream systems in KafkaConnectorTask

---
 .../datastream/connectors/kafka/KafkaConnectorTask.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java
index 980593c34..c3c7e4aa7 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java
@@ -142,7 +142,8 @@ protected DatastreamProducerRecord translate(ConsumerRecord fromKafka, Ins
       eventsSourceTimestamp = fromKafka.timestamp();
     }
 
-    BrooklinEnvelope envelope = new BrooklinEnvelope(fromKafka.key(), fromKafka.value(), null, metadata);
+    BrooklinEnvelope envelope = new BrooklinEnvelope(fromKafka.key(), fromKafka.value(), null,
+        fromKafka.headers(), metadata);
     DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
     builder.addEvent(envelope);
     builder.setEventsSourceTimestamp(eventsSourceTimestamp);
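The change above threads the source record's Kafka headers into the BrooklinEnvelope so they travel with the event to the destination. As a rough illustration of what forwarding headers means at the Kafka API level, the sketch below copies a consumed record's headers onto the record produced to the destination topic. It uses only the public Kafka client API; the class name and topic parameter are invented for the example, and this is not the Brooklin transport code itself.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

final class HeaderForwardingSketch {
  /** Builds a destination record that carries the key, value and headers of the consumed record. */
  static ProducerRecord<byte[], byte[]> forward(ConsumerRecord<byte[], byte[]> fromKafka, String destinationTopic) {
    // ConsumerRecord.headers() returns Headers, an Iterable<Header> accepted by this ProducerRecord constructor.
    return new ProducerRecord<>(destinationTopic, null, fromKafka.key(), fromKafka.value(), fromKafka.headers());
  }
}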
From cbc0df9f198f4c83954877bddf094a81771d0824 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan
Date: Thu, 1 Oct 2020 15:05:35 -0700
Subject: [PATCH 2/3] Added committed offsets in KafkaTopicPartitionTracker and diag endpoint response

---
 .../AbstractKafkaBasedConnectorTask.java      |  1 +
 .../kafka/AbstractKafkaConnector.java         |  4 +-
 .../kafka/KafkaConnectorDiagUtils.java        | 17 ++++--
 .../kafka/KafkaConsumerOffsetsResponse.java   | 23 +++++---
 .../kafka/KafkaTopicPartitionTracker.java     | 51 ++++++++++++++----
 .../mirrormaker/TestKafkaConsumerOffsets.java | 53 +++++++++++++------
 6 files changed, 106 insertions(+), 43 deletions(-)

diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
index bccc18c34..3648499c9 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
@@ -598,6 +598,7 @@ protected void commitWithRetries(Consumer consumer, Optional
       KafkaTopicPartitionTracker tracker = connectorTaskEntry.getConnectorTask().getKafkaTopicPartitionTracker();
-      KafkaConsumerOffsetsResponse response = new KafkaConsumerOffsetsResponse(tracker.getConsumerOffsets(),
-          tracker.getConsumerGroupId());
+      KafkaConsumerOffsetsResponse response = new KafkaConsumerOffsetsResponse(tracker.getConsumedOffsets(),
+          tracker.getCommittedOffsets(), tracker.getConsumerGroupId());
       serializedResponses.add(response);
     });
   }
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorDiagUtils.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorDiagUtils.java
index 873827a65..780f65918 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorDiagUtils.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorDiagUtils.java
@@ -61,7 +61,7 @@ public static String reduceConsumerOffsetsResponses(Map response
     }
 
     responseList.forEach(response -> {
-      if (response.getConsumerOffsets() == null || response.getConsumerOffsets().isEmpty()) {
+      if (response.getConsumedOffsets() == null || response.getConsumedOffsets().isEmpty()) {
        logger.warn("Empty consumer offset map from instance {}. Ignoring the result", instance);
       } else if (StringUtils.isBlank(response.getConsumerGroupId())) {
         logger.warn("Invalid consumer group id from instance {}, Ignoring the result", instance);
@@ -69,10 +69,17 @@ public static String reduceConsumerOffsetsResponses(Map response
         KafkaConsumerOffsetsResponse reducedResponse = result.computeIfAbsent(response.getConsumerGroupId(),
             k -> new KafkaConsumerOffsetsResponse(response.getConsumerGroupId()));
 
-        Map> consumerOffsets = response.getConsumerOffsets();
-        consumerOffsets.forEach((topic, partitionOffsets) -> {
-          Map> reducedConsumerOffsets = reducedResponse.getConsumerOffsets();
-          Map reducedPartitionOffsets = reducedConsumerOffsets.computeIfAbsent(topic, k -> new HashMap<>());
+        Map> consumedOffsets = response.getConsumedOffsets();
+        consumedOffsets.forEach((topic, partitionOffsets) -> {
+          Map> reducedConsumedOffsets = reducedResponse.getConsumedOffsets();
+          Map reducedPartitionOffsets = reducedConsumedOffsets.computeIfAbsent(topic, k -> new HashMap<>());
+          reducedPartitionOffsets.putAll(partitionOffsets);
+        });
+
+        Map> committedOffsets = response.getCommittedOffsets();
+        committedOffsets.forEach((topic, partitionOffsets) -> {
+          Map> reducedCommittedOffsets = reducedResponse.getCommittedOffsets();
+          Map reducedPartitionOffsets = reducedCommittedOffsets.computeIfAbsent(topic, k -> new HashMap<>());
           reducedPartitionOffsets.putAll(partitionOffsets);
         });
       }
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConsumerOffsetsResponse.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConsumerOffsetsResponse.java
index bdf0a4afa..be29f7ffc 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConsumerOffsetsResponse.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConsumerOffsetsResponse.java
@@ -16,17 +16,21 @@
  */
 public class KafkaConsumerOffsetsResponse {
   private final String _consumerGroupId;
-  private final Map> _consumerOffsets;
+  private final Map> _consumedOffsets;
+  private final Map> _committedOffsets;
 
   /**
    * Constructor for {@link KafkaConsumerOffsetsResponse}
-   * @param consumerOffsets Consumer offsets for all topic partitions
+   * @param consumedOffsets Consumed offsets for all topic partitions
+   * @param committedOffsets Committed offsets for all topic partitions
    * @param consumerGroupId Consumer group ID
    */
-  public KafkaConsumerOffsetsResponse(@JsonProperty("consumerOffsets") Map> consumerOffsets,
+  public KafkaConsumerOffsetsResponse(@JsonProperty("consumedOffsets") Map> consumedOffsets,
+      @JsonProperty("committedOffsets") Map> committedOffsets,
       @JsonProperty("consumerGroupId") String consumerGroupId) {
     _consumerGroupId = consumerGroupId;
-    _consumerOffsets = consumerOffsets;
+    _consumedOffsets = consumedOffsets;
+    _committedOffsets = committedOffsets;
   }
 
   /**
@@ -34,15 +38,18 @@ public KafkaConsumerOffsetsResponse(@JsonProperty("consumerOffsets") Map
-    this(new HashMap<>(), consumerGroupId);
+    this(new HashMap<>(),
+        new HashMap<>(), consumerGroupId);
   }
 
-  public Map> getConsumerOffsets() {
-    return _consumerOffsets;
+  public Map> getConsumedOffsets() {
+    return _consumedOffsets;
+  }
+
+  public Map> getCommittedOffsets() {
+    return _committedOffsets;
   }
 
   public String getConsumerGroupId() {
     return _consumerGroupId;
   }
-
 }
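KafkaConnectorDiagUtils.reduceConsumerOffsetsResponses now folds each instance's response into one response per consumer group, taking the union of topics and partitions for both the consumed and the committed maps; a partition reported by more than one instance keeps the last value written, matching the putAll calls above. A minimal stand-alone sketch of that merge step, using plain nested maps rather than the Brooklin classes, looks like this:

import java.util.HashMap;
import java.util.Map;

final class OffsetMergeSketch {
  /** Folds one instance's topic -> (partition -> offset) map into the aggregate map. */
  static void merge(Map<String, Map<Integer, Long>> aggregate, Map<String, Map<Integer, Long>> fromInstance) {
    fromInstance.forEach((topic, partitionOffsets) ->
        aggregate.computeIfAbsent(topic, k -> new HashMap<>()).putAll(partitionOffsets));
  }
}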
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaTopicPartitionTracker.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaTopicPartitionTracker.java
index b74bccbe7..4fedf41b0 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaTopicPartitionTracker.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaTopicPartitionTracker.java
@@ -16,6 +16,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.jetbrains.annotations.NotNull;
 
@@ -32,8 +33,8 @@ public class KafkaTopicPartitionTracker {
   private final String _consumerGroupId;
 
   private final Map> _topicPartitions = new ConcurrentHashMap<>();
-
-  private final Map> _consumerOffsets = new ConcurrentHashMap<>();
+  private final Map> _consumedOffsets = new ConcurrentHashMap<>();
+  private final Map> _committedOffsets = new ConcurrentHashMap<>();
 
   /**
    * Constructor for KafkaTopicPartitionTracker
@@ -75,22 +76,30 @@ public void onPartitionsRevoked(@NotNull Collection topicPartiti
       }
     });
 
-    // Remove consumer offsets for partitions that have been revoked. The reason to remove the consumer offsets
+    // Remove consumed offsets for partitions that have been revoked. The reason to remove the consumed offsets
     // here is that another host may handle these partitions due to rebalance, and we don't want to have duplicate
     // consumer offsets for affected partitions (even though the ones with larger offsets wins).
+    removeOffsetsForTopicPartition(topicPartitions, _consumedOffsets);
+
+    // Remove committed offsets for partitions that have been revoked.
+    removeOffsetsForTopicPartition(topicPartitions, _committedOffsets);
+  }
+
+  private void removeOffsetsForTopicPartition(@NotNull Collection topicPartitions,
+      Map> committedOffsets) {
     topicPartitions.forEach(topicPartition -> {
-      Map partitions = _consumerOffsets.get(topicPartition.topic());
+      Map partitions = committedOffsets.get(topicPartition.topic());
       if (partitions != null) {
         partitions.remove(topicPartition.partition());
         if (partitions.isEmpty()) {
-          _consumerOffsets.remove(topicPartition.topic());
+          committedOffsets.remove(topicPartition.topic());
         }
       }
     });
   }
 
   /**
-   * Updates consumer offsets for partitions that have been polled
+   * Updates consumed offsets for partitions that have been polled
    * @param consumerRecords consumer records that have been the result of the poll
    */
   public void onPartitionsPolled(@NotNull ConsumerRecords consumerRecords) {
@@ -100,22 +109,42 @@ public void onPartitionsPolled(@NotNull ConsumerRecords consumerRecords) {
       List> partitionRecords = consumerRecords.records(topicPartition);
       ConsumerRecord lastRecord = partitionRecords.get(partitionRecords.size() - 1);
 
-      Map partitionOffsetMap = _consumerOffsets.computeIfAbsent(topicPartition.topic(),
+      Map partitionOffsetMap = _consumedOffsets.computeIfAbsent(topicPartition.topic(),
           k -> new ConcurrentHashMap<>());
       partitionOffsetMap.put(topicPartition.partition(), lastRecord.offset());
     });
   }
 
+  /**
+   * Updates committed offsets for topic partitions
+   * @param offsetMap offsets for topic partitions that have been committed
+   */
+  public void onOffsetsCommitted(Map offsetMap) {
+    Collection topicPartitions = offsetMap.keySet();
+
+    topicPartitions.forEach(topicPartition -> {
+      Map partitionOffsetMap = _committedOffsets.computeIfAbsent(topicPartition.topic(),
+          k -> new ConcurrentHashMap<>());
+      partitionOffsetMap.put(topicPartition.partition(), offsetMap.get(topicPartition).offset());
+    });
+  }
+
   public Map> getTopicPartitions() {
     return Collections.unmodifiableMap(_topicPartitions);
   }
 
   /**
-   * Returns a map of consumer offsets for all topic partitions
-   * @return A map of consumer offsets for all topic partitions.
+   * Returns a map of consumed offsets for all topic partitions
+   */
+  public Map> getConsumedOffsets() {
+    return Collections.unmodifiableMap(_consumedOffsets);
+  }
+
+  /**
+   * Returns a map of committed offsets for all topic partitions
    */
-  public Map> getConsumerOffsets() {
-    return Collections.unmodifiableMap(_consumerOffsets);
+  public Map> getCommittedOffsets() {
+    return Collections.unmodifiableMap(_committedOffsets);
   }
 
   public final String getConsumerGroupId() {
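The tracker's new onOffsetsCommitted hook is fed from the connector task's commit path (the one-line change to commitWithRetries in AbstractKafkaBasedConnectorTask above); the exact call site is not shown in this excerpt. A plausible wiring, sketched against the plain Kafka consumer API rather than the actual Brooklin task code, is:

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitTrackingSketch {
  /** Commits the given offsets, then records them in the tracker once the commit has succeeded. */
  static void commitAndTrack(Consumer<?, ?> consumer, KafkaTopicPartitionTracker tracker,
      Map<TopicPartition, OffsetAndMetadata> offsets) {
    consumer.commitSync(offsets); // commitSync throws on failure, so only successful commits reach the tracker
    tracker.onOffsetsCommitted(offsets);
  }
}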
diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaConsumerOffsets.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaConsumerOffsets.java
index d8ec121eb..9d1eeeb5e 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaConsumerOffsets.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaConsumerOffsets.java
@@ -102,7 +102,7 @@ public void testConsumerOffsetsReducer() {
     partitionOffsets2.put(1, 10L);
     topicPartitionOffsets1.put(topic2, partitionOffsets2);
 
-    responseList1.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets1, consumerGroup1));
+    responseList1.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets1, topicPartitionOffsets1, consumerGroup1));
 
     // instance 1 consumer group 2
     Map> topicPartitionOffsets2 = new HashMap<>();
@@ -117,7 +117,7 @@ public void testConsumerOffsetsReducer() {
     partitionOffsets4.put(1, 20L);
     topicPartitionOffsets2.put(topic2, partitionOffsets4);
 
-    responseList1.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets2, consumerGroup2));
+    responseList1.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets2, topicPartitionOffsets2, consumerGroup2));
 
     // constructing instance2 consumer offsets
     List responseList2 = new ArrayList<>();
@@ -130,7 +130,7 @@ public void testConsumerOffsetsReducer() {
     partitionOffsets5.put(3, 10L);
     topicPartitionOffsets3.put(topic1, partitionOffsets5);
 
-    responseList2.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets3, consumerGroup1));
+    responseList2.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets3, topicPartitionOffsets3, consumerGroup1));
 
     // instance 2 consumer group 3
     Map> topicPartitionOffsets4 = new HashMap<>();
@@ -138,7 +138,7 @@ public void testConsumerOffsetsReducer() {
     Map partitionOffsets6 = new HashMap<>();
     partitionOffsets6.put(0, 30L);
     topicPartitionOffsets4.put(topic2, partitionOffsets6);
-    responseList2.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets4, consumerGroup3));
+    responseList2.add(new KafkaConsumerOffsetsResponse(topicPartitionOffsets4, topicPartitionOffsets4, consumerGroup3));
 
     // reducing responses and asserting correctness
     Map responseMap = new HashMap<>();
@@ -154,22 +154,30 @@ public void testConsumerOffsetsReducer() {
     KafkaConsumerOffsetsResponse cg1Response = responseList.stream().
        filter(r -> r.getConsumerGroupId().equals(consumerGroup1)).findAny().orElse(null);
     Assert.assertNotNull(cg1Response);
-    Assert.assertEquals(cg1Response.getConsumerOffsets().keySet().size(), 2); // cg1 consumes both topics
-    Assert.assertEquals(cg1Response.getConsumerOffsets().get(topic1).keySet().size(), 4); // cg1 consumes 4 partitions for topic 1
-    Assert.assertEquals(cg1Response.getConsumerOffsets().get(topic2).keySet().size(), 2); // cg1 consumes 2 partitions for topic 2
+    Assert.assertEquals(cg1Response.getConsumedOffsets().keySet().size(), 2); // cg1 consumes both topics
+    Assert.assertEquals(cg1Response.getCommittedOffsets().keySet().size(), 2);
+    Assert.assertEquals(cg1Response.getConsumedOffsets().get(topic1).keySet().size(), 4); // cg1 consumes 4 partitions for topic 1
+    Assert.assertEquals(cg1Response.getCommittedOffsets().get(topic1).keySet().size(), 4);
+    Assert.assertEquals(cg1Response.getConsumedOffsets().get(topic2).keySet().size(), 2); // cg1 consumes 2 partitions for topic 2
+    Assert.assertEquals(cg1Response.getCommittedOffsets().get(topic2).keySet().size(), 2);
 
     KafkaConsumerOffsetsResponse cg2Response = responseList.stream().
         filter(r -> r.getConsumerGroupId().equals(consumerGroup2)).findAny().orElse(null);
     Assert.assertNotNull(cg2Response);
-    Assert.assertEquals(cg2Response.getConsumerOffsets().keySet().size(), 2); // cg2 consumers both topics
-    Assert.assertEquals(cg2Response.getConsumerOffsets().get(topic1).keySet().size(), 2); // cg2 consumes 2 partitions for topic 1
-    Assert.assertEquals(cg2Response.getConsumerOffsets().get(topic2).keySet().size(), 2); // cg2 consumes 2 partitions for topic 2
+    Assert.assertEquals(cg2Response.getConsumedOffsets().keySet().size(), 2); // cg2 consumes both topics
+    Assert.assertEquals(cg2Response.getCommittedOffsets().keySet().size(), 2);
+    Assert.assertEquals(cg2Response.getConsumedOffsets().get(topic1).keySet().size(), 2); // cg2 consumes 2 partitions for topic 1
+    Assert.assertEquals(cg2Response.getCommittedOffsets().get(topic1).keySet().size(), 2);
+    Assert.assertEquals(cg2Response.getConsumedOffsets().get(topic2).keySet().size(), 2); // cg2 consumes 2 partitions for topic 2
+    Assert.assertEquals(cg2Response.getCommittedOffsets().get(topic2).keySet().size(), 2);
 
     KafkaConsumerOffsetsResponse cg3Response = responseList.stream().
        filter(r -> r.getConsumerGroupId().equals(consumerGroup3)).findAny().orElse(null);
     Assert.assertNotNull(cg3Response);
-    Assert.assertEquals(cg3Response.getConsumerOffsets().keySet().size(), 1); // cg3 consumes only topic 2
-    Assert.assertEquals(cg3Response.getConsumerOffsets().get(topic2).size(), 1); // cg3 consumes 1 partition for topic 2
+    Assert.assertEquals(cg3Response.getConsumedOffsets().keySet().size(), 1); // cg3 consumes only topic 2
+    Assert.assertEquals(cg3Response.getCommittedOffsets().keySet().size(), 1);
+    Assert.assertEquals(cg3Response.getConsumedOffsets().get(topic2).size(), 1); // cg3 consumes 1 partition for topic 2
+    Assert.assertEquals(cg3Response.getCommittedOffsets().get(topic2).size(), 1);
   }
 
   private boolean testConsumerOffsetsAreUpdated(KafkaMirrorMakerConnector connector) {
@@ -184,19 +192,30 @@ private boolean testConsumerOffsetsAreUpdated(KafkaMirrorMakerConnecto
     KafkaConsumerOffsetsResponse offsetResponse = responseList.get(0);
 
     // check that all topic partitions were polled and offsets were updated
-    boolean allTopicsWerePolled = offsetResponse.getConsumerOffsets().size() == TOPIC_COUNT;
-    boolean allPartitionsWerePolled = offsetResponse.getConsumerOffsets().values().stream().
+    boolean allTopicsWerePolled = offsetResponse.getConsumedOffsets().size() == TOPIC_COUNT;
+    boolean allPartitionsWerePolled = offsetResponse.getConsumedOffsets().values().stream().
         allMatch(m -> m.keySet().size() == PARTITION_COUNT);
 
     if (!allTopicsWerePolled || !allPartitionsWerePolled) {
       return false;
     }
 
-    for (String topic : offsetResponse.getConsumerOffsets().keySet()) {
-      Map partitionOffsets = offsetResponse.getConsumerOffsets().get(topic);
+    for (String topic : offsetResponse.getConsumedOffsets().keySet()) {
+      Map partitionOffsets = offsetResponse.getConsumedOffsets().get(topic);
 
       for (Integer partition : partitionOffsets.keySet()) {
-        // check consumer offsets. Note that offsets are zero based
+        // check consumed offsets. Note that offsets are zero based
         if (partitionOffsets.get(partition) != PARTITION_MESSAGE_COUNT - 1) {
           return false;
         }
       }
     }
+
+    for (String topic : offsetResponse.getCommittedOffsets().keySet()) {
+      Map partitionOffsets = offsetResponse.getCommittedOffsets().get(topic);
+
+      for (Integer partition : partitionOffsets.keySet()) {
+        // check committed offsets. Note that offsets are zero based
+        if (partitionOffsets.get(partition) != PARTITION_MESSAGE_COUNT - 1) {
+          return false;
+        }
+      }
+    }
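For reference, the new three-argument constructor takes the consumed map first and the committed map second; the reducer test above passes the same topic-to-partition-offset map for both, which is why every consumed assertion is mirrored by an identical committed assertion. A small usage sketch with invented topic names and offsets:

import java.util.HashMap;
import java.util.Map;

final class ResponseConstructionSketch {
  static KafkaConsumerOffsetsResponse sample() {
    // topic -> (partition -> offset); the values here are made up for illustration
    Map<Integer, Long> partitionOffsets = new HashMap<>();
    partitionOffsets.put(0, 42L);

    Map<String, Map<Integer, Long>> consumed = new HashMap<>();
    Map<String, Map<Integer, Long>> committed = new HashMap<>();
    consumed.put("exampleTopic", partitionOffsets);
    committed.put("exampleTopic", new HashMap<>(partitionOffsets));

    return new KafkaConsumerOffsetsResponse(consumed, committed, "exampleConsumerGroup");
  }
}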
From bd3434a87f76fa1953265ef22241547ac821265d Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan
Date: Tue, 10 Nov 2020 13:47:35 -0800
Subject: [PATCH 3/3] Reduced amount of logs printed in ServerComponentHealthAggregator

---
 .../server/diagnostics/ServerComponentHealthAggregator.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerComponentHealthAggregator.java b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerComponentHealthAggregator.java
index 0579321e0..5b68695be 100644
--- a/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerComponentHealthAggregator.java
+++ b/datastream-server-restli/src/main/java/com/linkedin/datastream/server/diagnostics/ServerComponentHealthAggregator.java
@@ -92,8 +92,7 @@ public List getResponses(String componentType, String com
         if (!errorMessage.isEmpty()) {
           errorResponses.put(hostName, errorMessage);
         } else {
-          String message = "Received REST response from the host: " + dmsUri + " with status: " + response.getStatus();
-          LOG.info(message);
+          LOG.info("Received REST response from the host: " + dmsUri);
           responses.put(hostName, response.getStatus());
         }
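The reworked log line above deliberately drops the response status to keep the info-level output small. If the status is still occasionally useful, a common alternative is to log it at debug level with parameterized logging; the following is only a sketch of that option, assuming LOG is an SLF4J logger, and is not part of the patch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ResponseLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ResponseLoggingSketch.class);

  static void logResponse(String dmsUri, int status) {
    // Parameterized placeholders avoid building the message string when the level is disabled.
    LOG.info("Received REST response from the host: {}", dmsUri);
    LOG.debug("Response status from {}: {}", dmsUri, status);
  }
}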