Reduce thread counts, context switching and Dropwizard #7

Open
wants to merge 13 commits into base: main
5 changes: 3 additions & 2 deletions deploy/configs/nonclustered.yaml
@@ -8,10 +8,11 @@ topicConfig:
ringBufferSize: 1024
outputParallelism: 4
batchSizeMB: 1
batchMilliSeconds: 50
storageHandlerName: devnull
batchMilliSeconds: 1
storageHandlerName: filesystem
storageHandlerConfig:
retryTimeoutMillis: 5000
storageDirs: /Volumes/RAMDisk/val
maxAttempts: 2
disableNotifications: true
bucket: <mybucket-name>
@@ -41,7 +41,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pinterest.memq.client.producer.http.DaemonThreadFactory;
import com.pinterest.memq.client.producer.netty.MemqNettyProducer;
import com.pinterest.memq.commons.config.SSLConfig;
import com.pinterest.memq.commons.protocol.Broker;
@@ -53,6 +52,7 @@
import com.pinterest.memq.commons.protocol.TopicMetadata;
import com.pinterest.memq.commons.protocol.TopicMetadataRequestPacket;
import com.pinterest.memq.commons.protocol.TopicMetadataResponsePacket;
import com.pinterest.memq.core.utils.DaemonThreadFactory;
import com.pinterest.memq.core.utils.MemqUtils;

import io.netty.bootstrap.Bootstrap;
@@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +39,6 @@
import com.pinterest.memq.client.commons2.network.netty.ClientChannelInitializer;
import com.pinterest.memq.client.commons2.retry.ExponentialBackoffRetryStrategy;
import com.pinterest.memq.client.commons2.retry.RetryStrategy;
import com.pinterest.memq.client.producer.http.DaemonThreadFactory;
import com.pinterest.memq.commons.config.SSLConfig;
import com.pinterest.memq.commons.protocol.RequestPacket;
import com.pinterest.memq.commons.protocol.RequestType;
@@ -57,9 +57,12 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
import net.openhft.affinity.AffinityStrategies;
import net.openhft.affinity.AffinityThreadFactory;

// No thread-safety guarantees
public class NetworkClient implements Closeable {
private static final int DEFAULT_EVENT_LOOP_THREADS = 1;
private static final Logger logger = LoggerFactory.getLogger(NetworkClient.class);
public static final String CONFIG_INITIAL_RETRY_INTERVAL_MS = "initialRetryIntervalMs";
public static final String CONFIG_MAX_RETRY_COUNT = "maxRetryCount";
@@ -107,15 +110,17 @@ public NetworkClient(Properties properties, SSLConfig sslConfig) {
}
this.responseHandler = new ResponseHandler();
bootstrap = new Bootstrap();
ThreadFactory threadFactory = new AffinityThreadFactory("atf_wrk", true, AffinityStrategies.DIFFERENT_CORE);
if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(1, new DaemonThreadFactory("MemqCommonClientNettyGroup"));
eventLoopGroup = new EpollEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS, threadFactory);
bootstrap.channel(EpollSocketChannel.class);
} else {
eventLoopGroup = new NioEventLoopGroup(1, new DaemonThreadFactory("MemqCommonClientNettyGroup"));
eventLoopGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS, threadFactory);
bootstrap.channel(NioSocketChannel.class);
}
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new ClientChannelInitializer(responseHandler, sslConfig, idleTimeoutMs));
ScheduledThreadPoolExecutor tmpScheduler = new ScheduledThreadPoolExecutor(1);
tmpScheduler.setRemoveOnCancelPolicy(true);
@@ -159,7 +164,12 @@ public CompletableFuture<ResponsePacket> send(InetSocketAddress socketAddress, R
try {
buffer = PooledByteBufAllocator.DEFAULT.buffer(request.getSize(RequestType.PROTOCOL_VERSION));
request.write(buffer, RequestType.PROTOCOL_VERSION);
long ts = System.currentTimeMillis();
channelFuture.channel().writeAndFlush(buffer);
ts = System.currentTimeMillis() - ts;
if (ts>2) {
System.out.println("Network dispatch delay:"+ts+"ms size:"+request.getSize(RequestType.PROTOCOL_VERSION));
}
} catch (Exception e) {
logger.warn("Failed to write request " + request.getClientRequestId(), e);
ReferenceCountUtil.release(buffer);
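This file's changes replace the per-client DaemonThreadFactory with OpenHFT's AffinityThreadFactory, so the single event-loop thread is pinned relative to other pinned threads instead of being migrated across cores by the OS scheduler, and they enable TCP_NODELAY so small requests are flushed without Nagle delay. A minimal, self-contained sketch of the same thread model, assuming the net.openhft:affinity dependency added to memq-commons/pom.xml below (the class name here is illustrative only):

import java.util.concurrent.ThreadFactory;

import io.netty.channel.nio.NioEventLoopGroup;
import net.openhft.affinity.AffinityStrategies;
import net.openhft.affinity.AffinityThreadFactory;

public class AffinityEventLoopSketch {

  public static void main(String[] args) throws InterruptedException {
    // Daemon worker threads, each placed on a core different from the previously
    // pinned thread; this mirrors the "atf_wrk" factory used in the constructor above.
    ThreadFactory tf = new AffinityThreadFactory("atf_wrk", true, AffinityStrategies.DIFFERENT_CORE);
    // A single event-loop thread keeps all I/O for the client on one pinned core.
    NioEventLoopGroup group = new NioEventLoopGroup(1, tf);
    group.submit(() -> System.out.println("I/O runs on " + Thread.currentThread().getName())).sync();
    group.shutdownGracefully();
  }
}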
@@ -52,8 +52,8 @@
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.pinterest.memq.client.producer.http.DaemonThreadFactory;
import com.pinterest.memq.commons.MemqLogMessage;
import com.pinterest.memq.core.utils.DaemonThreadFactory;

public class KafkaNotificationSource {


This file was deleted.

@@ -40,10 +40,10 @@
import com.pinterest.memq.client.commons2.MemqCommonClient;
import com.pinterest.memq.client.producer.MemqProducer;
import com.pinterest.memq.client.producer.TaskRequest;
import com.pinterest.memq.client.producer.http.DaemonThreadFactory;
import com.pinterest.memq.commons.config.SSLConfig;
import com.pinterest.memq.commons.protocol.Broker;
import com.pinterest.memq.commons.protocol.TopicMetadata;
import com.pinterest.memq.core.utils.DaemonThreadFactory;

public class MemqNettyProducer<H, T> extends MemqProducer<H, T> {

9 changes: 6 additions & 3 deletions memq-commons/pom.xml
@@ -1,6 +1,4 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.pinterest.memq</groupId>
@@ -82,6 +80,11 @@
<artifactId>metrics-core</artifactId>
<version>4.1.17</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>affinity</artifactId>
<version>3.21ea83</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
@@ -20,6 +20,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -34,10 +36,12 @@
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.pinterest.memq.commons.mon.OpenTSDBClient.MetricsBuffer;
import com.pinterest.memq.core.utils.DaemonThreadFactory;

public class OpenTSDBReporter extends ScheduledReporter {

private static final Logger logger = Logger.getLogger(OpenTSDBClient.class.getName());
private static ScheduledExecutorService ES = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
private OpenTSDBClient client;
private String[] tags;
private String baseName;
@@ -52,7 +56,7 @@ protected OpenTSDBReporter(String baseName,
OpenTSDBClient client,
String localHostAddress,
Map<String, Object> tags) throws UnknownHostException {
super(registry, registryName, filter, rateUnit, durationUnit);
super(registry, registryName, filter, rateUnit, durationUnit, ES);
if (baseName == null || baseName.isEmpty()) {
this.baseName = "";
} else {
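The constructor now calls the ScheduledReporter overload that takes an externally supplied ScheduledExecutorService, so every OpenTSDBReporter instance shares one daemon scheduling thread instead of Dropwizard spinning up a dedicated reporter thread per instance. A minimal sketch of the same pattern against the metrics-core 4.x API (class and thread names here are illustrative):

import java.util.SortedMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Timer;

public class SharedSchedulerReporterSketch extends ScheduledReporter {

  // One shared daemon thread schedules report() for every instance of this reporter.
  private static final ScheduledExecutorService SHARED_SCHEDULER =
      Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "shared-metrics-reporter");
        t.setDaemon(true);
        return t;
      });

  public SharedSchedulerReporterSketch(MetricRegistry registry, String name) {
    super(registry, name, MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, SHARED_SCHEDULER);
  }

  @Override
  public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters,
                     SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters,
                     SortedMap<String, Timer> timers) {
    System.out.println("reporting " + timers.size() + " timers on " + Thread.currentThread().getName());
  }
}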
@@ -16,15 +16,29 @@
package com.pinterest.memq.core.utils;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactory implements ThreadFactory {

public static DaemonThreadFactory INSTANCE = new DaemonThreadFactory();

private String basename;
private AtomicInteger counter;

public DaemonThreadFactory() {
this("common");
}

public DaemonThreadFactory(String basename) {
this.basename = basename;
this.counter = new AtomicInteger();
}

@Override
public Thread newThread(Runnable r) {
Thread th = new Thread(r);
th.setDaemon(true);
th.setName(basename + "-" + counter.getAndIncrement());
return th;
}
}
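A small usage sketch of the updated factory: threads come out named and numbered, which makes them easy to attribute in thread dumps, and they remain daemon threads so they never block JVM shutdown (the pool name below is illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.pinterest.memq.core.utils.DaemonThreadFactory;

public class DaemonThreadFactorySketch {

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService es =
        Executors.newScheduledThreadPool(1, new DaemonThreadFactory("kafka-notification"));
    // Prints "kafka-notification-0" once per second until the JVM exits.
    es.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()),
        0, 1, TimeUnit.SECONDS);
    Thread.sleep(3_000);
    es.shutdownNow();
  }
}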
@@ -25,9 +25,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.pinterest.memq.client.commons.Compression;
import com.pinterest.memq.client.commons.serde.ByteArraySerializer;
import com.pinterest.memq.client.producer.MemqWriteResult;
@@ -37,23 +41,39 @@ public class ExampleMemqProducer {

public static void main(String[] args) throws IOException, InterruptedException,
ExecutionException {
MetricRegistry reg = new MetricRegistry();
Timer totalLatency = reg.timer("totalLatency");
Timer messageLatency = reg.timer("messageLatency");
int nThreads = 1;
if (args.length > 0) {
nThreads = Integer.parseInt(args[0]);
}
ExecutorService es = Executors.newFixedThreadPool(nThreads, new ThreadFactory() {
String hostport = "127.0.0.1:9092";
if (args.length > 1) {
hostport = args[1];
}
final String conn = hostport;
ThreadFactory tf = new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Thread th = new Thread(r);
th.setDaemon(true);
return th;
}
});
};
ExecutorService es = Executors.newFixedThreadPool(nThreads, tf);
ScheduledExecutorService bg = Executors.newScheduledThreadPool(1, tf);
bg.scheduleAtFixedRate(() -> {
System.out.print("\r" + messageLatency.getSnapshot().getMax() / 1000_000 + " "
+ messageLatency.getSnapshot().get99thPercentile() / 1000_000 + " "
+ messageLatency.getSnapshot().getMean() / 1000_000);
}, 1, 1, TimeUnit.SECONDS);

String pathname = "/tmp/memq_serverset";
PrintWriter pr = new PrintWriter(new File(pathname));
String s = "{\"az\": \"us-east-1a\", \"ip\": \"127.0.0.1\", \"port\": \"8080\", \"stage_name\": \"prototype\", \"version\": \"none\", \"weight\": 1}";
String s = "{\"az\": \"us-east-1a\", \"ip\": \"" + hostport.split(":")[0]
+ "\", \"port\": \"8080\", \"stage_name\": \"prototype\", \"version\": \"none\", \"weight\": 1}";
pr.println(s);
pr.close();
for (int x = 0; x < nThreads; x++) {
@@ -64,27 +84,28 @@ public Thread newThread(Runnable r) {
MemqProducer<byte[], byte[]> instance = new MemqProducer.Builder<byte[], byte[]>()
.disableAcks(false).keySerializer(new ByteArraySerializer())
.valueSerializer(new ByteArraySerializer()).topic(topicName).cluster("local")
.compression(Compression.ZSTD).maxPayloadBytes(1024 * 150)
.bootstrapServers("127.0.0.1:9092").build();
.compression(Compression.NONE).maxPayloadBytes(1024 * 10).maxInflightRequests(60)
.bootstrapServers(conn).build();
StringBuilder builder = new StringBuilder();
while (builder.length() < 1024 * 100) {
while (builder.length() < 1024 * 5) {
builder.append(UUID.randomUUID().toString());
}

byte[] bytes = builder.toString().getBytes("utf-8");
for (int i = 0; i < 5000; i++) {
long ts = System.currentTimeMillis();
for (int i = 0; i < 500000; i++) {
Context time = totalLatency.time();
List<Future<MemqWriteResult>> result = new ArrayList<>();
for (int k = 0; k < 30; k++) {
Future<MemqWriteResult> writeToTopic = instance.write(null, bytes, System.nanoTime());
result.add(writeToTopic);
}
instance.flush();
for (Future<MemqWriteResult> future : result) {
future.get();
MemqWriteResult memqWriteResult = future.get();
int ackLatency = memqWriteResult.getAckLatency();
messageLatency.update(ackLatency, TimeUnit.MILLISECONDS);
}
ts = System.currentTimeMillis() - ts;
System.out.println(ts + "ms");
time.stop();
}
} catch (Exception e) {
e.printStackTrace();
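The example now records per-message ack latency in a Dropwizard Timer and prints max/p99/mean from a background daemon thread. A minimal sketch of that measurement pattern on its own, using the same metrics-core API (registry and timer names are illustrative):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

public class LatencyTimerSketch {

  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Timer messageLatency = registry.timer("messageLatency");

    // Record synthetic latencies the same way ackLatency is recorded above.
    for (int i = 1; i <= 100; i++) {
      messageLatency.update(i, TimeUnit.MILLISECONDS);
    }

    // Snapshot values are reported in nanoseconds, hence the divide by 1,000,000
    // in the example's background printer.
    Snapshot snap = messageLatency.getSnapshot();
    System.out.println("max ms:  " + snap.getMax() / 1_000_000);
    System.out.println("p99 ms:  " + snap.get99thPercentile() / 1_000_000);
    System.out.println("mean ms: " + snap.getMean() / 1_000_000);
  }
}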
@@ -0,0 +1,37 @@
package com.pinterest.memq.client.examples;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KafkaProducerTest {

public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "-1");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 50; i++) {
builder.append(UUID.randomUUID().toString());
}
byte[] bytes = builder.toString().getBytes();
System.out.println(bytes.length);
while (true) {
long ts = System.nanoTime();
producer.send(new ProducerRecord<byte[], byte[]>("test", bytes)).get();
System.out.println((System.nanoTime() - ts) / (1000)+"us");
}
}

}