Skip to content

Commit

Permalink
upgrade kafka client version to 0.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Santhosh authored and Santhosh committed Aug 18, 2016
1 parent 43c0d1d commit 84a5c66
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 35 deletions.
2 changes: 0 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@
<filtering>true</filtering>
</resource>
</resources>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down Expand Up @@ -381,7 +380,6 @@
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

<profiles>
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/com/pinterest/secor/common/LogFilePath.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
*/
package com.pinterest.secor.common;

import com.pinterest.secor.message.ParsedMessage;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -31,6 +25,11 @@
import java.util.Map;
import java.util.Random;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;

import com.pinterest.secor.message.ParsedMessage;

/**
* LogFilePath represents path of a log file. It contains convenience method for building and
* decomposing paths.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package com.pinterest.secor.io;


import com.pinterest.secor.common.LogFilePath;
import org.apache.hadoop.io.compress.CompressionCodec;

import java.io.IOException;
import com.pinterest.secor.common.LogFilePath;

/**
* Provides a single factory class to make FileReader and FileWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
*/
package com.pinterest.secor.tools;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand Down Expand Up @@ -64,8 +63,7 @@ public void run() {
properties.put("key.serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(properties);
Producer<String, byte[]> producer = new Producer<String, byte[]>(config);
Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(properties);

TProtocolFactory protocol = null;
if(mType.equals("json")) {
Expand All @@ -92,7 +90,7 @@ public void run() {
} catch(TException e) {
throw new RuntimeException("Failed to serialize message " + testMessage, e);
}
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(
mTopic, Integer.toString(i), bytes);
producer.send(data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.powermock.modules.junit4.PowerMockRunner;

import com.pinterest.secor.common.SecorConfig;
Expand Down
33 changes: 16 additions & 17 deletions src/test/java/com/pinterest/secor/performance/PerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,19 @@
import java.util.Properties;
import java.util.Random;

import kafka.admin.AdminUtils;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.common.TopicExistsException;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -38,17 +50,6 @@
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.RateLimitUtil;

import kafka.admin.AdminUtils;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.common.TopicExistsException;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$;

/**
* A performance test for secor
*
Expand Down Expand Up @@ -94,10 +95,8 @@ public static void main(String[] args) throws Exception {
props.put("request.required.acks", "1");
props.put("producer.type", "async");

ProducerConfig producerConfig = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(
producerConfig);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
long size = 0;
int message_size = Integer.parseInt(args[3]);

Expand All @@ -109,7 +108,7 @@ public static void main(String[] args) throws Exception {
Arrays.fill(payload, (byte) 1);
String msg = new String(payload, "UTF-8");
size += msg.length();
KeyedMessage<String, String> data = new KeyedMessage<String, String>(
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
topic, ip, msg);
producer.send(data);
}
Expand Down Expand Up @@ -238,13 +237,13 @@ private static void createTopics(List<String> topics, int partitions,
String zkConfig) throws InterruptedException {

ZkClient zkClient = createZkClient(zkConfig);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

try {
Properties props = new Properties();
int replicationFactor = 1;
for (String topic : topics) {
AdminUtils.createTopic(zkClient, topic, partitions,
replicationFactor, props);
AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, props, null);
}
} catch (TopicExistsException e) {
System.out.println(e.getMessage());
Expand Down

0 comments on commit 84a5c66

Please sign in to comment.