diff --git a/pom.xml b/pom.xml
index 06eddf8..0f5fd26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,8 +67,8 @@
io.netty
- netty
- 3.10.6.Final
+ netty-all
+ 4.1.12.Final
com.google.guava
diff --git a/src/main/java/com/metamx/http/client/CredentialedHttpClient.java b/src/main/java/com/metamx/http/client/CredentialedHttpClient.java
index daba088..0bed984 100644
--- a/src/main/java/com/metamx/http/client/CredentialedHttpClient.java
+++ b/src/main/java/com/metamx/http/client/CredentialedHttpClient.java
@@ -22,11 +22,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.http.client.auth.Credentials;
import com.metamx.http.client.response.HttpResponseHandler;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
-import java.net.URL;
-
/**
*/
public class CredentialedHttpClient extends AbstractHttpClient
diff --git a/src/main/java/com/metamx/http/client/HttpClientConfig.java b/src/main/java/com/metamx/http/client/HttpClientConfig.java
index 7d18c7b..76074b2 100644
--- a/src/main/java/com/metamx/http/client/HttpClientConfig.java
+++ b/src/main/java/com/metamx/http/client/HttpClientConfig.java
@@ -56,7 +56,7 @@ public String getEncodingString()
*
* @return encoding name
*/
- public abstract String getEncodingString();
+ public abstract String getEncodingString(); /*TODO use for Content-Encoding*/
}
public static final CompressionCodec DEFAULT_COMPRESSION_CODEC = CompressionCodec.GZIP;
diff --git a/src/main/java/com/metamx/http/client/HttpClientInit.java b/src/main/java/com/metamx/http/client/HttpClientInit.java
index 1cb5650..1e48b91 100644
--- a/src/main/java/com/metamx/http/client/HttpClientInit.java
+++ b/src/main/java/com/metamx/http/client/HttpClientInit.java
@@ -24,21 +24,17 @@
import com.metamx.http.client.pool.ChannelResourceFactory;
import com.metamx.http.client.pool.ResourcePool;
import com.metamx.http.client.pool.ResourcePoolConfig;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.socket.nio.NioClientBossPool;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.Timer;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Log4JLoggerFactory;
import org.joda.time.Duration;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
@@ -46,7 +42,6 @@
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
/**
*/
@@ -55,47 +50,18 @@ public class HttpClientInit
public static HttpClient createClient(HttpClientConfig config, Lifecycle lifecycle)
{
try {
- // We need to use the full constructor in order to set a ThreadNameDeterminer. The other parameters are taken
- // from the defaults in HashedWheelTimer's other constructors.
- final HashedWheelTimer timer = new HashedWheelTimer(
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("HttpClient-Timer-%s")
- .build(),
- ThreadNameDeterminer.CURRENT,
- 100,
- TimeUnit.MILLISECONDS,
- 512
- );
- lifecycle.addMaybeStartHandler(
- new Lifecycle.Handler()
- {
- @Override
- public void start() throws Exception
- {
- timer.start();
- }
-
- @Override
- public void stop()
- {
- timer.stop();
- }
- }
- );
return lifecycle.addMaybeStartManagedInstance(
new NettyHttpClient(
new ResourcePool<>(
new ChannelResourceFactory(
- createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()),
+ createBootstrap(lifecycle, config.getBossPoolSize(), config.getWorkerPoolSize()),
config.getSslContext(),
- timer,
config.getSslHandshakeTimeout() == null ? -1 : config.getSslHandshakeTimeout().getMillis()
),
new ResourcePoolConfig(config.getNumConnections())
),
config.getReadTimeout(),
- config.getCompressionCodec(),
- timer
+ config.getCompressionCodec()
)
);
}
@@ -114,17 +80,10 @@ public static HttpClient createClient(ResourcePoolConfig config, final SSLContex
}
@Deprecated // use createClient directly
- public static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer)
+ public static Bootstrap createBootstrap(Lifecycle lifecycle)
{
final HttpClientConfig defaultConfig = HttpClientConfig.builder().build();
- return createBootstrap(lifecycle, timer, defaultConfig.getBossPoolSize(), defaultConfig.getWorkerPoolSize());
- }
-
- @Deprecated // use createClient directly
- public static ClientBootstrap createBootstrap(Lifecycle lifecycle)
- {
- final Timer timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build());
- return createBootstrap(lifecycle, timer);
+ return createBootstrap(lifecycle, defaultConfig.getBossPoolSize(), defaultConfig.getWorkerPoolSize());
}
public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword)
@@ -144,22 +103,7 @@ public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath
return sslContext;
}
- catch (CertificateException e) {
- throw Throwables.propagate(e);
- }
- catch (NoSuchAlgorithmException e) {
- throw Throwables.propagate(e);
- }
- catch (KeyStoreException e) {
- throw Throwables.propagate(e);
- }
- catch (KeyManagementException e) {
- throw Throwables.propagate(e);
- }
- catch (FileNotFoundException e) {
- throw Throwables.propagate(e);
- }
- catch (IOException e) {
+ catch (CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException | IOException e) {
throw Throwables.propagate(e);
}
finally {
@@ -167,37 +111,25 @@ public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath
}
}
- private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize)
+ private static Bootstrap createBootstrap(Lifecycle lifecycle, int bossPoolSize, int workerPoolSize)
{
- final NioClientBossPool bossPool = new NioClientBossPool(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("HttpClient-Netty-Boss-%s")
- .build()
- ),
+ final NioEventLoopGroup group = new NioEventLoopGroup(
bossPoolSize,
- timer,
- ThreadNameDeterminer.CURRENT
- );
-
- final NioWorkerPool workerPool = new NioWorkerPool(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat("HttpClient-Netty-Worker-%s")
+ .setNameFormat("HttpClient-Netty-Client-%s")
.build()
- ),
- workerPoolSize,
- ThreadNameDeterminer.CURRENT
+ )
);
- final ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool));
-
- bootstrap.setOption("keepAlive", true);
- bootstrap.setPipelineFactory(new HttpClientPipelineFactory());
+ final Bootstrap bootstrap = new Bootstrap()
+ .group(group)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(new HttpClientPipelineFactory());
- InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+ InternalLoggerFactory.setDefaultFactory(Log4JLoggerFactory.INSTANCE);
try {
lifecycle.addMaybeStartHandler(
@@ -211,7 +143,6 @@ public void start() throws Exception
@Override
public void stop()
{
- bootstrap.releaseExternalResources();
}
}
);
diff --git a/src/main/java/com/metamx/http/client/NettyHttpClient.java b/src/main/java/com/metamx/http/client/NettyHttpClient.java
index eaf201b..bf95b2e 100644
--- a/src/main/java/com/metamx/http/client/NettyHttpClient.java
+++ b/src/main/java/com/metamx/http/client/NettyHttpClient.java
@@ -30,24 +30,21 @@
import com.metamx.http.client.pool.ResourcePool;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.Timer;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.AsciiString;
import org.joda.time.Duration;
import java.net.URL;
@@ -64,33 +61,24 @@ public class NettyHttpClient extends AbstractHttpClient
private static final String READ_TIMEOUT_HANDLER_NAME = "read-timeout";
private static final String LAST_HANDLER_NAME = "last-handler";
- private final Timer timer;
private final ResourcePool pool;
private final HttpClientConfig.CompressionCodec compressionCodec;
private final Duration defaultReadTimeout;
- public NettyHttpClient(
- ResourcePool pool
- )
+ public NettyHttpClient(ResourcePool pool)
{
- this(pool, null, HttpClientConfig.DEFAULT_COMPRESSION_CODEC, null);
+ this(pool, null, HttpClientConfig.DEFAULT_COMPRESSION_CODEC);
}
NettyHttpClient(
ResourcePool pool,
Duration defaultReadTimeout,
- HttpClientConfig.CompressionCodec compressionCodec,
- Timer timer
+ HttpClientConfig.CompressionCodec compressionCodec
)
{
this.pool = Preconditions.checkNotNull(pool, "pool");
this.defaultReadTimeout = defaultReadTimeout;
this.compressionCodec = Preconditions.checkNotNull(compressionCodec);
- this.timer = timer;
-
- if (defaultReadTimeout != null && defaultReadTimeout.getMillis() > 0) {
- Preconditions.checkNotNull(timer, "timer");
- }
}
@LifecycleStart
@@ -106,12 +94,7 @@ public void stop()
public HttpClient withReadTimeout(Duration readTimeout)
{
- return new NettyHttpClient(pool, readTimeout, compressionCodec, timer);
- }
-
- public NettyHttpClient withTimer(Timer timer)
- {
- return new NettyHttpClient(pool, defaultReadTimeout, compressionCodec, timer);
+ return new NettyHttpClient(pool, readTimeout, compressionCodec);
}
@Override
@@ -123,7 +106,7 @@ public ListenableFuture go(
{
final HttpMethod method = request.getMethod();
final URL url = request.getUrl();
- final Multimap headers = request.getHeaders();
+ final Multimap headers = request.getHeaders();
final String requestDesc = String.format("%s %s", method, url);
if (log.isDebugEnabled()) {
@@ -140,112 +123,105 @@ public ListenableFuture go(
return Futures.immediateFailedFuture(
new ChannelException(
"Faulty channel in resource pool",
- channelFuture.getCause()
+ channelFuture.cause()
)
);
} else {
- channel = channelFuture.getChannel();
+ channel = channelFuture.channel();
}
final String urlFile = Strings.nullToEmpty(url.getFile());
- final HttpRequest httpRequest = new DefaultHttpRequest(
- HttpVersion.HTTP_1_1,
- method,
- urlFile.isEmpty() ? "/" : urlFile
- );
+ String uri;
+ if (urlFile.isEmpty()) {
+ uri = "/";
+ } else {
+ uri = urlFile;
+ }
+ final DefaultFullHttpRequest httpRequest;
+ if (request.hasContent()) {
+ httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, request.getContent());
+ } else {
+ httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri);
+ }
- if (!headers.containsKey(HttpHeaders.Names.HOST)) {
- httpRequest.headers().add(HttpHeaders.Names.HOST, getHost(url));
+ if (!headers.containsKey(HttpHeaderNames.HOST)) {
+ httpRequest.headers().add(HttpHeaderNames.HOST, getHost(url));
}
// If Accept-Encoding is set in the Request, use that. Otherwise use the default from "compressionCodec".
- if (!headers.containsKey(HttpHeaders.Names.ACCEPT_ENCODING)) {
- httpRequest.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, compressionCodec.getEncodingString());
+ if (!headers.containsKey(HttpHeaderNames.ACCEPT_ENCODING)) {
+ httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, compressionCodec.getEncodingString());
}
- for (Map.Entry> entry : headers.asMap().entrySet()) {
- String key = entry.getKey();
+ for (Map.Entry> entry : headers.asMap().entrySet()) {
+ AsciiString key = entry.getKey();
for (String obj : entry.getValue()) {
httpRequest.headers().add(key, obj);
}
}
- if (request.hasContent()) {
- httpRequest.setContent(request.getContent());
- }
-
final long readTimeout = getReadTimeout(requestReadTimeout);
final SettableFuture retVal = SettableFuture.create();
if (readTimeout > 0) {
- channel.getPipeline().addLast(
- READ_TIMEOUT_HANDLER_NAME,
- new ReadTimeoutHandler(timer, readTimeout, TimeUnit.MILLISECONDS)
- );
+ channel.pipeline().addLast(READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
}
-
- channel.getPipeline().addLast(
+ channel.pipeline().addLast(
LAST_HANDLER_NAME,
- new SimpleChannelUpstreamHandler()
+ new SimpleChannelInboundHandler()
{
- private volatile ClientResponse response = null;
+ private ClientResponse response = null;
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
{
if (log.isDebugEnabled()) {
- log.debug("[%s] messageReceived: %s", requestDesc, e.getMessage());
+ log.debug("[%s] messageReceived: %s", requestDesc, msg);
}
try {
- Object msg = e.getMessage();
-
if (msg instanceof HttpResponse) {
HttpResponse httpResponse = (HttpResponse) msg;
if (log.isDebugEnabled()) {
- log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus());
+ log.debug("[%s] Got response: %s", requestDesc, httpResponse.status());
}
response = handler.handleResponse(httpResponse);
if (response.isFinished()) {
retVal.set((Final) response.getObj());
}
-
- if (!httpResponse.isChunked()) {
- finishRequest();
- }
- } else if (msg instanceof HttpChunk) {
- HttpChunk httpChunk = (HttpChunk) msg;
+ }
+ if (msg instanceof HttpContent) {
+ HttpContent httpChunk = (HttpContent) msg;
+ boolean isLast = httpChunk instanceof LastHttpContent;
if (log.isDebugEnabled()) {
log.debug(
"[%s] Got chunk: %sB, last=%s",
requestDesc,
- httpChunk.getContent().readableBytes(),
- httpChunk.isLast()
+ httpChunk.content().readableBytes(),
+ isLast
);
}
-
- if (httpChunk.isLast()) {
+ response = handler.handleChunk(response, httpChunk);
+ if (response.isFinished()) {
+ retVal.set((Final) response.getObj());
+ }
+ if (isLast) {
finishRequest();
- } else {
- response = handler.handleChunk(response, httpChunk);
- if (response.isFinished() && !retVal.isDone()) {
- retVal.set((Final) response.getObj());
- }
}
- } else {
+ }
+ if (!(msg instanceof HttpContent) && !(msg instanceof HttpResponse)) {
throw new IllegalStateException(String.format("Unknown message type[%s]", msg.getClass()));
}
+
}
catch (Exception ex) {
log.warn(ex, "[%s] Exception thrown while processing message, closing channel.", requestDesc);
-
if (!retVal.isDone()) {
- retVal.set(null);
+ retVal.setException(ex);
}
channel.close();
channelResourceContainer.returnResource();
-
throw ex;
}
}
@@ -270,21 +246,19 @@ private void finishRequest()
}
@Override
- public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
if (log.isDebugEnabled()) {
- final Throwable cause = event.getCause();
if (cause == null) {
log.debug("[%s] Caught exception", requestDesc);
} else {
log.debug(cause, "[%s] Caught exception", requestDesc);
}
}
-
- retVal.setException(event.getCause());
+ retVal.setException(cause);
// response is non-null if we received initial chunk and then exception occurs
if (response != null) {
- handler.exceptionCaught(response, event.getCause());
+ handler.exceptionCaught(response, cause);
}
removeHandlers();
try {
@@ -296,12 +270,12 @@ public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event)
finally {
channelResourceContainer.returnResource();
}
-
- context.sendUpstream(event);
+ super.exceptionCaught(ctx, cause);
}
+
@Override
- public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent event) throws Exception
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
if (log.isDebugEnabled()) {
log.debug("[%s] Channel disconnected", requestDesc);
@@ -316,20 +290,20 @@ public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent
log.warn("[%s] Channel disconnected before response complete", requestDesc);
retVal.setException(new ChannelException("Channel disconnected"));
}
- context.sendUpstream(event);
+ super.channelInactive(ctx);
}
private void removeHandlers()
{
if (readTimeout > 0) {
- channel.getPipeline().remove(READ_TIMEOUT_HANDLER_NAME);
+ channel.pipeline().remove(READ_TIMEOUT_HANDLER_NAME);
}
- channel.getPipeline().remove(LAST_HANDLER_NAME);
+ channel.pipeline().remove(LAST_HANDLER_NAME);
}
}
);
- channel.write(httpRequest).addListener(
+ channel.writeAndFlush(httpRequest).addListener(
new ChannelFutureListener()
{
@Override
@@ -342,7 +316,7 @@ public void operationComplete(ChannelFuture future) throws Exception
retVal.setException(
new ChannelException(
String.format("[%s] Failed to write request to channel", requestDesc),
- future.getCause()
+ future.cause()
)
);
}
@@ -364,13 +338,7 @@ private long getReadTimeout(Duration requestReadTimeout)
} else {
timeout = 0;
}
-
- if (timeout > 0 && timer == null) {
- log.warn("Cannot time out requests without a timer! Disabling timeout for this request.");
- return 0;
- } else {
- return timeout;
- }
+ return timeout;
}
private String getHost(URL url)
diff --git a/src/main/java/com/metamx/http/client/Request.java b/src/main/java/com/metamx/http/client/Request.java
index b846f93..3ed2ed3 100644
--- a/src/main/java/com/metamx/http/client/Request.java
+++ b/src/main/java/com/metamx/http/client/Request.java
@@ -22,16 +22,13 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-import com.metamx.http.client.response.HttpResponseHandler;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferFactory;
-import org.jboss.netty.buffer.HeapChannelBufferFactory;
-import org.jboss.netty.handler.codec.base64.Base64;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.base64.Base64;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.util.AsciiString;
import java.net.URL;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -41,12 +38,10 @@
*/
public class Request
{
- private static final ChannelBufferFactory factory = HeapChannelBufferFactory.getInstance();
-
private final HttpMethod method;
private final URL url;
- private final Multimap headers = Multimaps.newListMultimap(
- Maps.>newHashMap(),
+ private final Multimap headers = Multimaps.newListMultimap(
+ Maps.>newHashMap(),
new Supplier>() {
@Override
public List get() {
@@ -55,7 +50,7 @@ public List get() {
}
);
- private ChannelBuffer content;
+ private ByteBuf content;
public Request(
HttpMethod method,
@@ -76,7 +71,7 @@ public URL getUrl()
return url;
}
- public Multimap getHeaders()
+ public Multimap getHeaders()
{
return headers;
}
@@ -86,7 +81,7 @@ public boolean hasContent()
return content != null;
}
- public ChannelBuffer getContent()
+ public ByteBuf getContent()
{
return content;
}
@@ -98,39 +93,39 @@ public Request copy() {
return retVal;
}
- public Request setHeader(String header, String value)
+ public Request setHeader(AsciiString header, String value)
{
headers.replaceValues(header, Arrays.asList(value));
return this;
}
- public Request setHeaderValues(String header, Iterable value)
+ public Request setHeaderValues(AsciiString header, Iterable value)
{
headers.replaceValues(header, value);
return this;
}
- public Request setHeaderValues(Multimap inHeaders) {
- for (Map.Entry> entry : inHeaders.asMap().entrySet()) {
+ public Request setHeaderValues(Multimap inHeaders) {
+ for (Map.Entry> entry : inHeaders.asMap().entrySet()) {
this.setHeaderValues(entry.getKey(), entry.getValue());
}
return this;
}
- public Request addHeader(String header, String value)
+ public Request addHeader(AsciiString header, String value)
{
headers.put(header, value);
return this;
}
- public Request addHeaderValues(String header, Iterable value)
+ public Request addHeaderValues(AsciiString header, Iterable value)
{
headers.putAll(header, value);
return this;
}
- public Request addHeaderValues(Multimap inHeaders) {
- for (Map.Entry> entry : inHeaders.asMap().entrySet()) {
+ public Request addHeaderValues(Multimap inHeaders) {
+ for (Map.Entry> entry : inHeaders.asMap().entrySet()) {
this.addHeaderValues(entry.getKey(), entry.getValue());
}
return this;
@@ -146,7 +141,7 @@ public Request setContent(byte[] bytes, int offset, int length)
return setContent(null, bytes, offset, length);
}
- public Request setContent(ChannelBuffer content)
+ public Request setContent(ByteBuf content)
{
return setContent(null, content);
}
@@ -158,18 +153,18 @@ public Request setContent(String contentType, byte[] bytes)
public Request setContent(String contentType, byte[] bytes, int offset, int length)
{
- return setContent(contentType, factory.getBuffer(bytes, offset, length));
+ return setContent(contentType, bytes, offset, length);
}
- public Request setContent(String contentType, ChannelBuffer content)
+ public Request setContent(String contentType, ByteBuf content)
{
if (contentType != null) {
- setHeader(HttpHeaders.Names.CONTENT_TYPE, contentType);
+ setHeader(HttpHeaderNames.CONTENT_TYPE, contentType);
}
this.content = content;
- setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(content.writerIndex()));
+ setHeader(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(content.writerIndex()));
return this;
}
@@ -177,16 +172,14 @@ public Request setContent(String contentType, ChannelBuffer content)
public Request setBasicAuthentication(String username, String password)
{
final String base64Value = base64Encode(String.format("%s:%s", username, password));
- setHeader(HttpHeaders.Names.AUTHORIZATION, String.format("Basic %s", base64Value));
+ setHeader(HttpHeaderNames.AUTHORIZATION, String.format("Basic %s", base64Value));
return this;
}
private String base64Encode(final String value)
{
- final ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
-
return Base64
- .encode(bufferFactory.getBuffer(ByteBuffer.wrap(value.getBytes(Charsets.UTF_8))), false)
+ .encode(Unpooled.wrappedBuffer(value.getBytes(Charsets.UTF_8)), false)
.toString(Charsets.UTF_8);
}
}
\ No newline at end of file
diff --git a/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java b/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java
index 51759ed..2a2f352 100644
--- a/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java
+++ b/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java
@@ -16,24 +16,21 @@
package com.metamx.http.client.netty;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.handler.codec.http.HttpClientCodec;
-import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpContentDecompressor;
/**
*/
-public class HttpClientPipelineFactory implements ChannelPipelineFactory
+public class HttpClientPipelineFactory extends ChannelInitializer
{
@Override
- public ChannelPipeline getPipeline() throws Exception
+ protected void initChannel(SocketChannel ch) throws Exception
{
- ChannelPipeline pipeline = new DefaultChannelPipeline();
-
+ ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec", new HttpClientCodec());
pipeline.addLast("inflater", new HttpContentDecompressor());
-
- return pipeline;
}
}
diff --git a/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java b/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java
index 0a50bf4..8f057b8 100644
--- a/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java
+++ b/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java
@@ -18,23 +18,22 @@
import com.google.common.base.Preconditions;
import com.metamx.common.logger.Logger;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.ssl.ImmediateExecutor;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.util.Timer;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
/**
*/
@@ -44,26 +43,19 @@ public class ChannelResourceFactory implements ResourceFactory= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT;
-
- if (sslContext != null) {
- Preconditions.checkNotNull(timer, "timer is required when sslContext is present");
- }
}
@Override
@@ -93,21 +85,13 @@ public ChannelFuture generate(final String hostname)
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParameters);
sslEngine.setUseClientMode(true);
- final SslHandler sslHandler = new SslHandler(
- sslEngine,
- SslHandler.getDefaultBufferPool(),
- false,
- timer,
- sslHandshakeTimeout
- );
-
- // https://github.com/netty/netty/issues/160
- sslHandler.setCloseOnSSLException(true);
+ final SslHandler sslHandler = new SslHandler(sslEngine);
+ sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeout);
- final ChannelPipeline pipeline = connectFuture.getChannel().getPipeline();
+ final ChannelPipeline pipeline = connectFuture.channel().pipeline();
pipeline.addFirst("ssl", sslHandler);
- final ChannelFuture handshakeFuture = Channels.future(connectFuture.getChannel());
+ final ChannelPromise handshakeFuture = connectFuture.channel().newPromise();
connectFuture.addListener(
new ChannelFutureListener()
{
@@ -115,11 +99,11 @@ public ChannelFuture generate(final String hostname)
public void operationComplete(ChannelFuture f) throws Exception
{
if (f.isSuccess()) {
- sslHandler.handshake().addListener(
- new ChannelFutureListener()
+ sslHandler.handshakeFuture().addListener(
+ new GenericFutureListener>()
{
@Override
- public void operationComplete(ChannelFuture f2) throws Exception
+ public void operationComplete(Future f2) throws Exception
{
if (f2.isSuccess()) {
handshakeFuture.setSuccess();
@@ -127,7 +111,7 @@ public void operationComplete(ChannelFuture f2) throws Exception
handshakeFuture.setFailure(
new ChannelException(
String.format("Failed to handshake with host[%s]", hostname),
- f2.getCause()
+ f2.cause()
)
);
}
@@ -138,7 +122,7 @@ public void operationComplete(ChannelFuture f2) throws Exception
handshakeFuture.setFailure(
new ChannelException(
String.format("Failed to connect to host[%s]", hostname),
- f.getCause()
+ f.cause()
)
);
}
@@ -157,23 +141,22 @@ public void operationComplete(ChannelFuture f2) throws Exception
@Override
public boolean isGood(ChannelFuture resource)
{
- Channel channel = resource.awaitUninterruptibly().getChannel();
+ Channel channel = resource.awaitUninterruptibly().channel();
boolean isSuccess = resource.isSuccess();
- boolean isConnected = channel.isConnected();
boolean isOpen = channel.isOpen();
if (log.isTraceEnabled()) {
- log.trace("isGood = isSucess[%s] && isConnected[%s] && isOpen[%s]", isSuccess, isConnected, isOpen);
+ log.trace("isGood = isSucess[%s] && isOpen[%s]", isSuccess, isOpen);
}
- return isSuccess && isConnected && isOpen;
+ return isSuccess && isOpen;
}
@Override
public void close(ChannelFuture resource)
{
log.trace("Closing");
- resource.awaitUninterruptibly().getChannel().close();
+ resource.awaitUninterruptibly().channel().close();
}
}
diff --git a/src/main/java/com/metamx/http/client/response/FullResponseHandler.java b/src/main/java/com/metamx/http/client/response/FullResponseHandler.java
index 992bdb8..9351420 100644
--- a/src/main/java/com/metamx/http/client/response/FullResponseHandler.java
+++ b/src/main/java/com/metamx/http/client/response/FullResponseHandler.java
@@ -16,9 +16,10 @@
package com.metamx.http.client.response;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
import java.nio.charset.Charset;
/**
@@ -35,11 +36,12 @@ public FullResponseHandler(Charset charset)
@Override
public ClientResponse handleResponse(HttpResponse response)
{
+ ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER;
return ClientResponse.unfinished(
new FullResponseHolder(
- response.getStatus(),
+ response.status(),
response,
- new StringBuilder(response.getContent().toString(charset))
+ new StringBuilder(content.toString(charset))
)
);
}
@@ -47,7 +49,7 @@ public ClientResponse handleResponse(HttpResponse response)
@Override
public ClientResponse handleChunk(
ClientResponse response,
- HttpChunk chunk
+ HttpContent chunk
)
{
final StringBuilder builder = response.getObj().getBuilder();
@@ -56,7 +58,7 @@ public ClientResponse handleChunk(
return ClientResponse.finished(null);
}
- builder.append(chunk.getContent().toString(charset));
+ builder.append(chunk.content().toString(charset));
return response;
}
diff --git a/src/main/java/com/metamx/http/client/response/FullResponseHolder.java b/src/main/java/com/metamx/http/client/response/FullResponseHolder.java
index 4ab8af4..c16282a 100644
--- a/src/main/java/com/metamx/http/client/response/FullResponseHolder.java
+++ b/src/main/java/com/metamx/http/client/response/FullResponseHolder.java
@@ -16,8 +16,8 @@
package com.metamx.http.client.response;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
/**
*/
diff --git a/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java b/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java
index 412a4a3..2f47aac 100644
--- a/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java
+++ b/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java
@@ -16,8 +16,8 @@
package com.metamx.http.client.response;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
/**
* A handler for an HTTP request.
@@ -45,7 +45,7 @@ public interface HttpResponseHandler
* @return
*/
public ClientResponse handleResponse(HttpResponse response);
- public ClientResponse handleChunk(ClientResponse clientResponse, HttpChunk chunk);
+ public ClientResponse handleChunk(ClientResponse clientResponse, HttpContent chunk);
public ClientResponse done(ClientResponse clientResponse);
public void exceptionCaught(ClientResponse clientResponse,Throwable e);
}
diff --git a/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java b/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java
index 3246180..cfdc773 100644
--- a/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java
+++ b/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java
@@ -16,12 +16,11 @@
package com.metamx.http.client.response;
-import com.google.common.base.Throwables;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
import java.io.InputStream;
/**
@@ -31,17 +30,18 @@ public class InputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response)
{
+ ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER;
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
- in.add(getContentBytes(response.getContent()));
+ in.add(getBytes(content));
return ClientResponse.finished(in);
}
@Override
public ClientResponse handleChunk(
- ClientResponse clientResponse, HttpChunk chunk
+ ClientResponse clientResponse, HttpContent chunk
)
{
- clientResponse.getObj().add(getContentBytes(chunk.getContent()));
+ clientResponse.getObj().add(getBytes(chunk.content()));
return clientResponse;
}
@@ -63,10 +63,11 @@ public void exceptionCaught(
obj.exceptionCaught(e);
}
- private byte[] getContentBytes(ChannelBuffer content)
+ private byte[] getBytes(ByteBuf content)
{
- byte[] contentBytes = new byte[content.readableBytes()];
- content.readBytes(contentBytes);
- return contentBytes;
+ byte[] bytes = new byte[content.readableBytes()];
+ int readerIndex = content.readerIndex();
+ content.getBytes(readerIndex, bytes);
+ return bytes;
}
}
diff --git a/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java b/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java
index e7166c2..5a79daa 100644
--- a/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java
+++ b/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java
@@ -19,11 +19,11 @@
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.metamx.common.logger.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
@@ -54,15 +54,16 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response)
{
+ ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER;
try {
- queue.put(new ChannelBufferInputStream(response.getContent()));
+ queue.put(new ByteBufInputStream(content));
}
catch (InterruptedException e) {
log.error(e, "Queue appending interrupted");
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
- byteCount.addAndGet(response.getContent().readableBytes());
+ byteCount.addAndGet(content.readableBytes());
return ClientResponse.finished(
new SequenceInputStream(
new Enumeration()
@@ -96,14 +97,12 @@ public InputStream nextElement()
@Override
public ClientResponse handleChunk(
- ClientResponse clientResponse, HttpChunk chunk
+ ClientResponse clientResponse, HttpContent chunk
)
{
- final ChannelBuffer channelBuffer = chunk.getContent();
- final int bytes = channelBuffer.readableBytes();
- if (bytes > 0) {
+ if (chunk.content().readableBytes() > 0) {
try {
- queue.put(new ChannelBufferInputStream(channelBuffer));
+ queue.put(new ByteBufInputStream(chunk.content()));
// Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong
log.debug("Added stream. Queue length %d", queue.size());
}
@@ -112,7 +111,7 @@ public ClientResponse handleChunk(
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
- byteCount.addAndGet(bytes);
+ byteCount.addAndGet(chunk.content().readableBytes());
} else {
log.debug("Skipping zero length chunk");
}
diff --git a/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java b/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java
index 7708dc0..5ad14c5 100644
--- a/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java
+++ b/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java
@@ -16,9 +16,10 @@
package com.metamx.http.client.response;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
import java.nio.charset.Charset;
/**
@@ -35,10 +36,11 @@ public StatusResponseHandler(Charset charset)
@Override
public ClientResponse handleResponse(HttpResponse response)
{
+ ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER;
return ClientResponse.unfinished(
new StatusResponseHolder(
- response.getStatus(),
- new StringBuilder(response.getContent().toString(charset))
+ response.status(),
+ new StringBuilder(content.toString(charset))
)
);
}
@@ -46,7 +48,7 @@ public ClientResponse handleResponse(HttpResponse response
@Override
public ClientResponse handleChunk(
ClientResponse response,
- HttpChunk chunk
+ HttpContent chunk
)
{
final StringBuilder builder = response.getObj().getBuilder();
@@ -55,7 +57,7 @@ public ClientResponse handleChunk(
return ClientResponse.finished(null);
}
- builder.append(chunk.getContent().toString(charset));
+ builder.append(chunk.content().toString(charset));
return response;
}
diff --git a/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java b/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java
index 7ce3286..2bb280f 100644
--- a/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java
+++ b/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java
@@ -16,7 +16,7 @@
package com.metamx.http.client.response;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpResponseStatus;
/**
*/
diff --git a/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java b/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java
index 9b2ec21..f693e9d 100644
--- a/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java
+++ b/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java
@@ -16,8 +16,8 @@
package com.metamx.http.client.response;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
import java.nio.charset.Charset;
@@ -35,13 +35,19 @@ public ToStringResponseHandler(Charset charset)
@Override
public ClientResponse handleResponse(HttpResponse response)
{
- return ClientResponse.unfinished(new StringBuilder(response.getContent().toString(charset)));
+ final StringBuilder builder;
+ if (response instanceof HttpContent) {
+ builder = new StringBuilder(((HttpContent) response).content().toString(charset));
+ } else {
+ builder = new StringBuilder();
+ }
+ return ClientResponse.unfinished(builder);
}
@Override
public ClientResponse handleChunk(
ClientResponse response,
- HttpChunk chunk
+ HttpContent chunk
)
{
final StringBuilder builder = response.getObj();
@@ -49,7 +55,7 @@ public ClientResponse handleChunk(
return ClientResponse.finished(null);
}
- builder.append(chunk.getContent().toString(charset));
+ builder.append(chunk.content().toString(charset));
return response;
}
diff --git a/src/test/java/com/metamx/http/client/FriendlyServersTest.java b/src/test/java/com/metamx/http/client/FriendlyServersTest.java
index bfa2107..52bd4bb 100644
--- a/src/test/java/com/metamx/http/client/FriendlyServersTest.java
+++ b/src/test/java/com/metamx/http/client/FriendlyServersTest.java
@@ -21,6 +21,21 @@
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
+import io.netty.channel.ChannelException;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URL;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLHandshakeException;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -29,26 +44,10 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLHandshakeException;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.URL;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* Tests with servers that are at least moderately well-behaving.
*/
@@ -71,7 +70,9 @@ public void run()
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
OutputStream out = clientSocket.getOutputStream()
) {
- while (!in.readLine().equals("")); // skip lines
+ while (!in.readLine().equals("")) {
+ ; // skip lines
+ }
out.write("HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhello!".getBytes(Charsets.UTF_8));
}
catch (Exception e) {
@@ -92,7 +93,7 @@ public void run()
new StatusResponseHandler(Charsets.UTF_8)
).get();
- Assert.assertEquals(200, response.getStatus().getCode());
+ Assert.assertEquals(200, response.getStatus().code());
Assert.assertEquals("hello!", response.getContent());
}
finally {
@@ -123,7 +124,7 @@ public void run()
// Read headers
String header;
while (!(header = in.readLine()).equals("")) {
- if (header.equals("Accept-Encoding: identity")) {
+ if (header.toLowerCase().equals("Accept-Encoding: identity".toLowerCase())) {
foundAcceptEncoding.set(true);
}
}
@@ -149,7 +150,7 @@ public void run()
new StatusResponseHandler(Charsets.UTF_8)
).get();
- Assert.assertEquals(200, response.getStatus().getCode());
+ Assert.assertEquals(200, response.getStatus().code());
Assert.assertEquals("hello!", response.getContent());
Assert.assertTrue(foundAcceptEncoding.get());
}
@@ -199,17 +200,23 @@ public void testFriendlySelfSignedHttpsServer() throws Exception
{
final HttpResponseStatus status = trustingClient
.go(
- new Request(HttpMethod.GET, new URL(String.format("https://localhost:%d/", sslConnector.getLocalPort()))),
+ new Request(
+ HttpMethod.GET,
+ new URL(String.format("https://localhost:%d/", sslConnector.getLocalPort()))
+ ),
new StatusResponseHandler(Charsets.UTF_8)
).get().getStatus();
- Assert.assertEquals(404, status.getCode());
+ Assert.assertEquals(404, status.code());
}
// Incorrect name ("127.0.0.1")
{
final ListenableFuture response1 = trustingClient
.go(
- new Request(HttpMethod.GET, new URL(String.format("https://127.0.0.1:%d/", sslConnector.getLocalPort()))),
+ new Request(
+ HttpMethod.GET,
+ new URL(String.format("https://127.0.0.1:%d/", sslConnector.getLocalPort()))
+ ),
new StatusResponseHandler(Charsets.UTF_8)
);
@@ -265,13 +272,13 @@ public void testHttpBin() throws Throwable
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
{
- final HttpResponseStatus status =client
+ final HttpResponseStatus status = client
.go(
new Request(HttpMethod.GET, new URL("https://httpbin.org/get")),
new StatusResponseHandler(Charsets.UTF_8)
).get().getStatus();
- Assert.assertEquals(200, status.getCode());
+ Assert.assertEquals(200, status.code());
}
{
@@ -282,7 +289,7 @@ public void testHttpBin() throws Throwable
new StatusResponseHandler(Charsets.UTF_8)
).get().getStatus();
- Assert.assertEquals(200, status.getCode());
+ Assert.assertEquals(200, status.code());
}
}
finally {
diff --git a/src/test/java/com/metamx/http/client/JankyServersTest.java b/src/test/java/com/metamx/http/client/JankyServersTest.java
index 9614f4d..2645dd3 100644
--- a/src/test/java/com/metamx/http/client/JankyServersTest.java
+++ b/src/test/java/com/metamx/http/client/JankyServersTest.java
@@ -22,9 +22,9 @@
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import java.io.IOException;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import io.netty.channel.ChannelException;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -305,16 +305,7 @@ public void testHttpEchoServer() throws Throwable
new StatusResponseHandler(Charsets.UTF_8)
);
- Throwable e = null;
- try {
- response.get();
- }
- catch (ExecutionException e1) {
- e = e1.getCause();
- }
-
- Assert.assertTrue("IllegalArgumentException thrown by 'get'", e instanceof IllegalArgumentException);
- Assert.assertTrue("Expected error message", e.getMessage().matches(".*invalid version format:.*"));
+ Assert.assertEquals(999, response.get().getStatus().code());
}
finally {
lifecycle.stop();
diff --git a/src/test/java/com/metamx/http/client/MockHttpClient.java b/src/test/java/com/metamx/http/client/MockHttpClient.java
index 63d73e3..f42edbc 100644
--- a/src/test/java/com/metamx/http/client/MockHttpClient.java
+++ b/src/test/java/com/metamx/http/client/MockHttpClient.java
@@ -22,7 +22,7 @@
import com.metamx.http.client.pool.ResourcePool;
import com.metamx.http.client.pool.ResourcePoolConfig;
import com.metamx.http.client.response.HttpResponseHandler;
-import org.jboss.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFuture;
import org.joda.time.Duration;
/**
diff --git a/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java b/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java
index 2da013e..fd2408c 100644
--- a/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java
+++ b/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java
@@ -16,17 +16,17 @@
package com.metamx.http.client.response;
-import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.buffer.UnpooledHeapByteBuf;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -34,6 +34,10 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
public class SequenceInputStreamResponseHandlerTest
{
@@ -80,20 +84,21 @@ public void testExceptionalChunkedStream() throws IOException
SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler();
final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.setChunked(true);
+ HttpUtil.setTransferEncodingChunked(response, true);
ClientResponse clientResponse = responseHandler.handleResponse(response);
final int failAt = Math.abs(RANDOM.nextInt()) % allBytes.length;
while (it.hasNext()) {
- final DefaultHttpChunk chunk = new DefaultHttpChunk(
- new BigEndianHeapChannelBuffer(it.next())
+ byte[] array = it.next();
+ final DefaultHttpContent chunk = new DefaultHttpContent(
+ new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, array, array.length)
{
@Override
- public void getBytes(int index, byte[] dst, int dstIndex, int length)
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length)
{
if (dstIndex + length >= failAt) {
throw new TesterException();
}
- super.getBytes(index, dst, dstIndex, length);
+ return super.getBytes(index, dst, dstIndex, length);
}
}
);
@@ -114,21 +119,22 @@ public static class TesterException extends RuntimeException
public void testExceptionalSingleStream() throws IOException
{
SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler();
- final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.setChunked(false);
- response.setContent(
- new BigEndianHeapChannelBuffer(allBytes)
+ final HttpResponse response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ HttpResponseStatus.OK,
+ new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, allBytes, allBytes.length)
{
@Override
- public void getBytes(int index, byte[] dst, int dstIndex, int length)
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length)
{
if (dstIndex + length >= allBytes.length) {
throw new TesterException();
}
- super.getBytes(index, dst, dstIndex, length);
+ return super.getBytes(index, dst, dstIndex, length);
}
}
);
+ HttpUtil.setTransferEncodingChunked(response, false);
ClientResponse clientResponse = responseHandler.handleResponse(response);
clientResponse = responseHandler.done(clientResponse);
@@ -144,10 +150,10 @@ public void simpleMultiStreamTest() throws IOException
SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler();
final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.setChunked(true);
+ HttpUtil.setTransferEncodingChunked(response, true);
ClientResponse clientResponse = responseHandler.handleResponse(response);
while (it.hasNext()) {
- final DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer(it.next()));
+ final DefaultHttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(it.next()));
clientResponse = responseHandler.handleChunk(clientResponse, chunk);
}
clientResponse = responseHandler.done(clientResponse);
@@ -166,7 +172,6 @@ public void simpleMultiStreamTest() throws IOException
Assert.assertEquals(allBytes.length, responseHandler.getByteCount());
}
-
@Test
public void alignedMultiStreamTest() throws IOException
{
@@ -174,10 +179,10 @@ public void alignedMultiStreamTest() throws IOException
SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler();
final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.setChunked(true);
+ HttpUtil.setTransferEncodingChunked(response, true);
ClientResponse clientResponse = responseHandler.handleResponse(response);
while (it.hasNext()) {
- final DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer(it.next()));
+ final DefaultHttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(it.next()));
clientResponse = responseHandler.handleChunk(clientResponse, chunk);
}
clientResponse = responseHandler.done(clientResponse);
@@ -185,7 +190,7 @@ public void alignedMultiStreamTest() throws IOException
final InputStream stream = clientResponse.getObj();
final InputStream expectedStream = new ByteArrayInputStream(allBytes);
- for(byte[] bytes : BYTE_LIST) {
+ for (byte[] bytes : BYTE_LIST) {
final byte[] expectedBytes = new byte[bytes.length];
final byte[] actualBytes = new byte[expectedBytes.length];
fillBuff(stream, actualBytes);
@@ -200,9 +205,12 @@ public void alignedMultiStreamTest() throws IOException
public void simpleSingleStreamTest() throws IOException
{
SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler();
- final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- response.setChunked(false);
- response.setContent(new BigEndianHeapChannelBuffer(allBytes));
+ final HttpResponse response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ HttpResponseStatus.OK,
+ Unpooled.wrappedBuffer(allBytes)
+ );
+ HttpUtil.setTransferEncodingChunked(response, false);
ClientResponse clientResponse = responseHandler.handleResponse(response);
clientResponse = responseHandler.done(clientResponse);