From e4467c8ae58e1fb0e6f53432aa55a4a2c28730ec Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 28 Mar 2022 17:15:31 -0700 Subject: [PATCH] Use less ancient avro, maintain same wire format by @radai-rosenblatt (#359) --- build.gradle | 7 +++- .../linkedin/xinfra/monitor/common/Utils.java | 36 ++++++++----------- .../GraphiteMetricsReporterService.java | 2 +- .../SignalFxMetricsReporterService.java | 2 +- .../StatsdMetricsReporterService.java | 2 +- 5 files changed, 23 insertions(+), 26 deletions(-) diff --git a/build.gradle b/build.gradle index c2ae00aa..b370bb09 100644 --- a/build.gradle +++ b/build.gradle @@ -27,12 +27,15 @@ allprojects { repositories { mavenCentral() + maven { + url "https://linkedin.jfrog.io/artifactory/avro-util/" + } } dependencies { compile 'net.sourceforge.argparse4j:argparse4j:0.5.0' compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1' - compile 'org.apache.avro:avro:1.4.1' + compile 'org.apache.avro:avro:1.9.2' compile 'org.json:json:20140107' compile 'org.jolokia:jolokia-jvm:1.6.2' compile 'net.savantly:graphite-client:1.1.0-RELEASE' @@ -40,6 +43,8 @@ allprojects { compile 'com.signalfx.public:signalfx-codahale:0.0.47' compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1' + compile 'org.apache.commons:commons-lang3:3.12.0' + compile 'com.linkedin.avroutil1:helper-all:0.2.81' testCompile 'org.mockito:mockito-core:2.24.0' testCompile 'org.testng:testng:6.8.8' } diff --git a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java index 4a6db971..d920437d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java +++ b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java @@ -13,7 +13,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import java.io.ByteArrayOutputStream; +import com.linkedin.avroutil1.compatibility.AvroCodecUtil; +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.avroutil1.compatibility.AvroVersion; import java.io.IOException; import java.lang.management.ManagementFactory; import java.time.Duration; @@ -34,10 +36,9 @@ import javax.management.ObjectName; import kafka.admin.BrokerMetadata; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.JsonEncoder; +import org.apache.avro.io.Decoder; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult; @@ -45,7 +46,6 @@ import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; -import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,29 +224,21 @@ public static String jsonFromFields(String topic, long idx, long timestamp, Stri * @return GenericRecord that is de-serialized from kafka message w.r.t. expected schema. */ public static GenericRecord genericRecordFromJson(String message) { - GenericRecord record = new GenericData.Record(DefaultTopicSchema.MESSAGE_V0); - JSONObject jsonObject = new JSONObject(message); - record.put(DefaultTopicSchema.TOPIC_FIELD.name(), jsonObject.getString(DefaultTopicSchema.TOPIC_FIELD.name())); - record.put(DefaultTopicSchema.INDEX_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.INDEX_FIELD.name())); - record.put(DefaultTopicSchema.TIME_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.TIME_FIELD.name())); - record.put(DefaultTopicSchema.PRODUCER_ID_FIELD.name(), - jsonObject.getString(DefaultTopicSchema.PRODUCER_ID_FIELD.name())); - record.put(DefaultTopicSchema.CONTENT_FIELD.name(), jsonObject.getString(DefaultTopicSchema.CONTENT_FIELD.name())); - return record; + try { + Decoder jsonDecoder = AvroCompatibilityHelper.newCompatibleJsonDecoder(DefaultTopicSchema.MESSAGE_V0, message); + GenericDatumReader reader = new GenericDatumReader<>(DefaultTopicSchema.MESSAGE_V0, DefaultTopicSchema.MESSAGE_V0); + return reader.read(null, jsonDecoder); + } catch (Exception e) { + throw new IllegalStateException("unable to deserialize " + message, e); + } } public static String jsonFromGenericRecord(GenericRecord record) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - GenericDatumWriter writer = new GenericDatumWriter<>(DefaultTopicSchema.MESSAGE_V0); - try { - Encoder encoder = new JsonEncoder(DefaultTopicSchema.MESSAGE_V0, out); - writer.write(record, encoder); - encoder.flush(); + return AvroCodecUtil.serializeJson(record, AvroVersion.AVRO_1_4); } catch (IOException e) { - LOG.error("Unable to serialize avro record due to error " + e); + throw new IllegalStateException("Unable to serialize avro record due to error: " + record, e); } - return out.toString(); } public static List getMBeanAttributeValues(String mbeanExpr, String attributeExpr) { diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java index 043f34b5..24512d7d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java @@ -23,7 +23,7 @@ import net.savantly.graphite.GraphiteClient; import net.savantly.graphite.GraphiteClientFactory; import net.savantly.graphite.impl.SimpleCarbonMetric; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java index 9678ed10..e84f1200 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java @@ -19,7 +19,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java index 079beb78..77ce1307 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java @@ -20,7 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;