From 84a5c66a1b4b5d19a7a2657583fa78f4d692b3ca Mon Sep 17 00:00:00 2001
From: Santhosh
Date: Thu, 18 Aug 2016 18:42:55 +0530
Subject: [PATCH] upgrade kafka consumer client version to 0.10.0

---
 pom.xml                                            |  2 --
 .../pinterest/secor/common/LogFilePath.java        | 11 +++----
 .../secor/io/FileReaderWriterFactory.java          |  3 +-
 .../secor/tools/TestLogMessageProducer.java        | 12 +++----
 .../parser/PatternDateMessageParserTest.java       |  1 -
 .../secor/performance/PerformanceTest.java         | 33 +++++++++----------
 6 files changed, 27 insertions(+), 35 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6eaf66cf0..1a0a6619e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -222,7 +222,6 @@
                     true
-
             org.codehaus.mojo
@@ -381,7 +380,6 @@
-

diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java
index 382f872d2..8471cd34d 100644
--- a/src/main/java/com/pinterest/secor/common/LogFilePath.java
+++ b/src/main/java/com/pinterest/secor/common/LogFilePath.java
@@ -16,12 +16,6 @@
  */
 package com.pinterest.secor.common;
 
-import com.pinterest.secor.message.ParsedMessage;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.text.StrSubstitutor;
-
-import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,6 +25,11 @@
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.text.StrSubstitutor;
+
+import com.pinterest.secor.message.ParsedMessage;
+
 /**
  * LogFilePath represents path of a log file. It contains convenience method for building and
  * decomposing paths.
diff --git a/src/main/java/com/pinterest/secor/io/FileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/FileReaderWriterFactory.java
index 4b76b93cf..0a5436775 100644
--- a/src/main/java/com/pinterest/secor/io/FileReaderWriterFactory.java
+++ b/src/main/java/com/pinterest/secor/io/FileReaderWriterFactory.java
@@ -17,10 +17,9 @@
 
 package com.pinterest.secor.io;
 
-import com.pinterest.secor.common.LogFilePath;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
-import java.io.IOException;
+import com.pinterest.secor.common.LogFilePath;
 
 /**
  * Provides a single factory class to make FileReader and FileWriter
diff --git a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
index a8eb86023..07f3c2f60 100644
--- a/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
+++ b/src/main/java/com/pinterest/secor/tools/TestLogMessageProducer.java
@@ -16,10 +16,9 @@
  */
 package com.pinterest.secor.tools;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -64,8 +63,7 @@ public void run() {
         properties.put("key.serializer.class", "kafka.serializer.StringEncoder");
         properties.put("request.required.acks", "1");
 
-        ProducerConfig config = new ProducerConfig(properties);
-        Producer producer = new Producer(config);
+        Producer producer = new KafkaProducer(properties);
 
         TProtocolFactory protocol = null;
         if(mType.equals("json")) {
@@ -92,7 +90,7 @@ public void run() {
             } catch(TException e) {
                 throw new RuntimeException("Failed to serialize message " + testMessage, e);
             }
-            KeyedMessage data = new KeyedMessage(
+            ProducerRecord data = new ProducerRecord(
                     mTopic, Integer.toString(i), bytes);
             producer.send(data);
         }
diff --git a/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java b/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java
index 7eeb75dc7..abe0050dd 100644
--- a/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java
+++ b/src/test/java/com/pinterest/secor/parser/PatternDateMessageParserTest.java
@@ -21,7 +21,6 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.mockito.stubbing.OngoingStubbing;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import com.pinterest.secor.common.SecorConfig;
diff --git a/src/test/java/com/pinterest/secor/performance/PerformanceTest.java b/src/test/java/com/pinterest/secor/performance/PerformanceTest.java
index 59dd1e434..1f66c2998 100644
--- a/src/test/java/com/pinterest/secor/performance/PerformanceTest.java
+++ b/src/test/java/com/pinterest/secor/performance/PerformanceTest.java
@@ -24,7 +24,19 @@
 import java.util.Properties;
 import java.util.Random;
 
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.common.TopicExistsException;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -38,17 +50,6 @@
 import com.pinterest.secor.util.FileUtil;
 import com.pinterest.secor.util.RateLimitUtil;
 
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.common.TopicExistsException;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.utils.ZKStringSerializer$;
-
 /**
  * A performance test for secor
  *
@@ -94,10 +95,8 @@ public static void main(String[] args) throws Exception {
         props.put("request.required.acks", "1");
         props.put("producer.type", "async");
 
-        ProducerConfig producerConfig = new ProducerConfig(props);
-        Producer producer = new Producer(
-                producerConfig);
+        Producer producer = new KafkaProducer(props);
 
         long size = 0;
         int message_size = Integer.parseInt(args[3]);
@@ -109,7 +108,7 @@ public static void main(String[] args) throws Exception {
             Arrays.fill(payload, (byte) 1);
             String msg = new String(payload, "UTF-8");
             size += msg.length();
-            KeyedMessage data = new KeyedMessage(
+            ProducerRecord data = new ProducerRecord(
                     topic, ip, msg);
             producer.send(data);
         }
@@ -238,13 +237,13 @@ private static void createTopics(List topics, int partitions,
             String zkConfig) throws InterruptedException {
         ZkClient zkClient = createZkClient(zkConfig);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
         try {
             Properties props = new Properties();
             int replicationFactor = 1;
             for (String topic : topics) {
-                AdminUtils.createTopic(zkClient, topic, partitions,
-                        replicationFactor, props);
+                AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, props, null);
             }
         } catch (TopicExistsException e) {
             System.out.println(e.getMessage());
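
Note (not part of the patch): a minimal sketch of the new org.apache.kafka.clients.producer API that these hunks migrate to. The broker address and topic name below are assumptions for illustration only. The 0.10 client reads bootstrap.servers, key.serializer, value.serializer and acks, rather than the old metadata.broker.list / serializer.class / request.required.acks keys that still appear in the surrounding context lines, so the properties passed to KafkaProducer would likely need the same treatment.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerApiSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // New-client configuration keys; "localhost:9092" is an assumed broker address.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        try {
            // ProducerRecord(topic, key, value) replaces the old KeyedMessage;
            // "secor_test" is a hypothetical topic name for this example.
            producer.send(new ProducerRecord<String, byte[]>(
                    "secor_test", "0", "payload".getBytes()));
        } finally {
            // The new producer buffers records and sends asynchronously,
            // so close() (or flush()) is needed before exit to avoid losing data.
            producer.close();
        }
    }
}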