diff --git a/pom.xml b/pom.xml index 06eddf8..0f5fd26 100644 --- a/pom.xml +++ b/pom.xml @@ -67,8 +67,8 @@ io.netty - netty - 3.10.6.Final + netty-all + 4.1.12.Final com.google.guava diff --git a/src/main/java/com/metamx/http/client/CredentialedHttpClient.java b/src/main/java/com/metamx/http/client/CredentialedHttpClient.java index daba088..0bed984 100644 --- a/src/main/java/com/metamx/http/client/CredentialedHttpClient.java +++ b/src/main/java/com/metamx/http/client/CredentialedHttpClient.java @@ -22,11 +22,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.metamx.http.client.auth.Credentials; import com.metamx.http.client.response.HttpResponseHandler; -import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Duration; -import java.net.URL; - /** */ public class CredentialedHttpClient extends AbstractHttpClient diff --git a/src/main/java/com/metamx/http/client/HttpClientConfig.java b/src/main/java/com/metamx/http/client/HttpClientConfig.java index 7d18c7b..76074b2 100644 --- a/src/main/java/com/metamx/http/client/HttpClientConfig.java +++ b/src/main/java/com/metamx/http/client/HttpClientConfig.java @@ -56,7 +56,7 @@ public String getEncodingString() * * @return encoding name */ - public abstract String getEncodingString(); + public abstract String getEncodingString(); /*TODO use for Content-Encoding*/ } public static final CompressionCodec DEFAULT_COMPRESSION_CODEC = CompressionCodec.GZIP; diff --git a/src/main/java/com/metamx/http/client/HttpClientInit.java b/src/main/java/com/metamx/http/client/HttpClientInit.java index 1cb5650..1e48b91 100644 --- a/src/main/java/com/metamx/http/client/HttpClientInit.java +++ b/src/main/java/com/metamx/http/client/HttpClientInit.java @@ -24,21 +24,17 @@ import com.metamx.http.client.pool.ChannelResourceFactory; import com.metamx.http.client.pool.ResourcePool; import com.metamx.http.client.pool.ResourcePoolConfig; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.socket.nio.NioClientBossPool; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioWorkerPool; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.logging.Log4JLoggerFactory; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.ThreadNameDeterminer; -import org.jboss.netty.util.Timer; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Log4JLoggerFactory; import org.joda.time.Duration; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStore; @@ -46,7 +42,6 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; /** */ @@ -55,47 +50,18 @@ public class HttpClientInit public static HttpClient createClient(HttpClientConfig config, Lifecycle lifecycle) { try { - // We need to use the full constructor in order to set a ThreadNameDeterminer. The other parameters are taken - // from the defaults in HashedWheelTimer's other constructors. - final HashedWheelTimer timer = new HashedWheelTimer( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("HttpClient-Timer-%s") - .build(), - ThreadNameDeterminer.CURRENT, - 100, - TimeUnit.MILLISECONDS, - 512 - ); - lifecycle.addMaybeStartHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - timer.start(); - } - - @Override - public void stop() - { - timer.stop(); - } - } - ); return lifecycle.addMaybeStartManagedInstance( new NettyHttpClient( new ResourcePool<>( new ChannelResourceFactory( - createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), + createBootstrap(lifecycle, config.getBossPoolSize(), config.getWorkerPoolSize()), config.getSslContext(), - timer, config.getSslHandshakeTimeout() == null ? -1 : config.getSslHandshakeTimeout().getMillis() ), new ResourcePoolConfig(config.getNumConnections()) ), config.getReadTimeout(), - config.getCompressionCodec(), - timer + config.getCompressionCodec() ) ); } @@ -114,17 +80,10 @@ public static HttpClient createClient(ResourcePoolConfig config, final SSLContex } @Deprecated // use createClient directly - public static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer) + public static Bootstrap createBootstrap(Lifecycle lifecycle) { final HttpClientConfig defaultConfig = HttpClientConfig.builder().build(); - return createBootstrap(lifecycle, timer, defaultConfig.getBossPoolSize(), defaultConfig.getWorkerPoolSize()); - } - - @Deprecated // use createClient directly - public static ClientBootstrap createBootstrap(Lifecycle lifecycle) - { - final Timer timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).build()); - return createBootstrap(lifecycle, timer); + return createBootstrap(lifecycle, defaultConfig.getBossPoolSize(), defaultConfig.getWorkerPoolSize()); } public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath, final String keyStorePassword) @@ -144,22 +103,7 @@ public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath return sslContext; } - catch (CertificateException e) { - throw Throwables.propagate(e); - } - catch (NoSuchAlgorithmException e) { - throw Throwables.propagate(e); - } - catch (KeyStoreException e) { - throw Throwables.propagate(e); - } - catch (KeyManagementException e) { - throw Throwables.propagate(e); - } - catch (FileNotFoundException e) { - throw Throwables.propagate(e); - } - catch (IOException e) { + catch (CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException | IOException e) { throw Throwables.propagate(e); } finally { @@ -167,37 +111,25 @@ public static SSLContext sslContextWithTrustedKeyStore(final String keyStorePath } } - private static ClientBootstrap createBootstrap(Lifecycle lifecycle, Timer timer, int bossPoolSize, int workerPoolSize) + private static Bootstrap createBootstrap(Lifecycle lifecycle, int bossPoolSize, int workerPoolSize) { - final NioClientBossPool bossPool = new NioClientBossPool( - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("HttpClient-Netty-Boss-%s") - .build() - ), + final NioEventLoopGroup group = new NioEventLoopGroup( bossPoolSize, - timer, - ThreadNameDeterminer.CURRENT - ); - - final NioWorkerPool workerPool = new NioWorkerPool( Executors.newCachedThreadPool( new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("HttpClient-Netty-Worker-%s") + .setNameFormat("HttpClient-Netty-Client-%s") .build() - ), - workerPoolSize, - ThreadNameDeterminer.CURRENT + ) ); - final ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool)); - - bootstrap.setOption("keepAlive", true); - bootstrap.setPipelineFactory(new HttpClientPipelineFactory()); + final Bootstrap bootstrap = new Bootstrap() + .group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new HttpClientPipelineFactory()); - InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory()); + InternalLoggerFactory.setDefaultFactory(Log4JLoggerFactory.INSTANCE); try { lifecycle.addMaybeStartHandler( @@ -211,7 +143,6 @@ public void start() throws Exception @Override public void stop() { - bootstrap.releaseExternalResources(); } } ); diff --git a/src/main/java/com/metamx/http/client/NettyHttpClient.java b/src/main/java/com/metamx/http/client/NettyHttpClient.java index eaf201b..bf95b2e 100644 --- a/src/main/java/com/metamx/http/client/NettyHttpClient.java +++ b/src/main/java/com/metamx/http/client/NettyHttpClient.java @@ -30,24 +30,21 @@ import com.metamx.http.client.pool.ResourcePool; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.HttpResponseHandler; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.handler.codec.http.DefaultHttpRequest; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpVersion; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -import org.jboss.netty.util.Timer; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AsciiString; import org.joda.time.Duration; import java.net.URL; @@ -64,33 +61,24 @@ public class NettyHttpClient extends AbstractHttpClient private static final String READ_TIMEOUT_HANDLER_NAME = "read-timeout"; private static final String LAST_HANDLER_NAME = "last-handler"; - private final Timer timer; private final ResourcePool pool; private final HttpClientConfig.CompressionCodec compressionCodec; private final Duration defaultReadTimeout; - public NettyHttpClient( - ResourcePool pool - ) + public NettyHttpClient(ResourcePool pool) { - this(pool, null, HttpClientConfig.DEFAULT_COMPRESSION_CODEC, null); + this(pool, null, HttpClientConfig.DEFAULT_COMPRESSION_CODEC); } NettyHttpClient( ResourcePool pool, Duration defaultReadTimeout, - HttpClientConfig.CompressionCodec compressionCodec, - Timer timer + HttpClientConfig.CompressionCodec compressionCodec ) { this.pool = Preconditions.checkNotNull(pool, "pool"); this.defaultReadTimeout = defaultReadTimeout; this.compressionCodec = Preconditions.checkNotNull(compressionCodec); - this.timer = timer; - - if (defaultReadTimeout != null && defaultReadTimeout.getMillis() > 0) { - Preconditions.checkNotNull(timer, "timer"); - } } @LifecycleStart @@ -106,12 +94,7 @@ public void stop() public HttpClient withReadTimeout(Duration readTimeout) { - return new NettyHttpClient(pool, readTimeout, compressionCodec, timer); - } - - public NettyHttpClient withTimer(Timer timer) - { - return new NettyHttpClient(pool, defaultReadTimeout, compressionCodec, timer); + return new NettyHttpClient(pool, readTimeout, compressionCodec); } @Override @@ -123,7 +106,7 @@ public ListenableFuture go( { final HttpMethod method = request.getMethod(); final URL url = request.getUrl(); - final Multimap headers = request.getHeaders(); + final Multimap headers = request.getHeaders(); final String requestDesc = String.format("%s %s", method, url); if (log.isDebugEnabled()) { @@ -140,112 +123,105 @@ public ListenableFuture go( return Futures.immediateFailedFuture( new ChannelException( "Faulty channel in resource pool", - channelFuture.getCause() + channelFuture.cause() ) ); } else { - channel = channelFuture.getChannel(); + channel = channelFuture.channel(); } final String urlFile = Strings.nullToEmpty(url.getFile()); - final HttpRequest httpRequest = new DefaultHttpRequest( - HttpVersion.HTTP_1_1, - method, - urlFile.isEmpty() ? "/" : urlFile - ); + String uri; + if (urlFile.isEmpty()) { + uri = "/"; + } else { + uri = urlFile; + } + final DefaultFullHttpRequest httpRequest; + if (request.hasContent()) { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, request.getContent()); + } else { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri); + } - if (!headers.containsKey(HttpHeaders.Names.HOST)) { - httpRequest.headers().add(HttpHeaders.Names.HOST, getHost(url)); + if (!headers.containsKey(HttpHeaderNames.HOST)) { + httpRequest.headers().add(HttpHeaderNames.HOST, getHost(url)); } // If Accept-Encoding is set in the Request, use that. Otherwise use the default from "compressionCodec". - if (!headers.containsKey(HttpHeaders.Names.ACCEPT_ENCODING)) { - httpRequest.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, compressionCodec.getEncodingString()); + if (!headers.containsKey(HttpHeaderNames.ACCEPT_ENCODING)) { + httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, compressionCodec.getEncodingString()); } - for (Map.Entry> entry : headers.asMap().entrySet()) { - String key = entry.getKey(); + for (Map.Entry> entry : headers.asMap().entrySet()) { + AsciiString key = entry.getKey(); for (String obj : entry.getValue()) { httpRequest.headers().add(key, obj); } } - if (request.hasContent()) { - httpRequest.setContent(request.getContent()); - } - final long readTimeout = getReadTimeout(requestReadTimeout); final SettableFuture retVal = SettableFuture.create(); if (readTimeout > 0) { - channel.getPipeline().addLast( - READ_TIMEOUT_HANDLER_NAME, - new ReadTimeoutHandler(timer, readTimeout, TimeUnit.MILLISECONDS) - ); + channel.pipeline().addLast(READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS)); } - - channel.getPipeline().addLast( + channel.pipeline().addLast( LAST_HANDLER_NAME, - new SimpleChannelUpstreamHandler() + new SimpleChannelInboundHandler() { - private volatile ClientResponse response = null; + private ClientResponse response = null; @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (log.isDebugEnabled()) { - log.debug("[%s] messageReceived: %s", requestDesc, e.getMessage()); + log.debug("[%s] messageReceived: %s", requestDesc, msg); } try { - Object msg = e.getMessage(); - if (msg instanceof HttpResponse) { HttpResponse httpResponse = (HttpResponse) msg; if (log.isDebugEnabled()) { - log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus()); + log.debug("[%s] Got response: %s", requestDesc, httpResponse.status()); } response = handler.handleResponse(httpResponse); if (response.isFinished()) { retVal.set((Final) response.getObj()); } - - if (!httpResponse.isChunked()) { - finishRequest(); - } - } else if (msg instanceof HttpChunk) { - HttpChunk httpChunk = (HttpChunk) msg; + } + if (msg instanceof HttpContent) { + HttpContent httpChunk = (HttpContent) msg; + boolean isLast = httpChunk instanceof LastHttpContent; if (log.isDebugEnabled()) { log.debug( "[%s] Got chunk: %sB, last=%s", requestDesc, - httpChunk.getContent().readableBytes(), - httpChunk.isLast() + httpChunk.content().readableBytes(), + isLast ); } - - if (httpChunk.isLast()) { + response = handler.handleChunk(response, httpChunk); + if (response.isFinished()) { + retVal.set((Final) response.getObj()); + } + if (isLast) { finishRequest(); - } else { - response = handler.handleChunk(response, httpChunk); - if (response.isFinished() && !retVal.isDone()) { - retVal.set((Final) response.getObj()); - } } - } else { + } + if (!(msg instanceof HttpContent) && !(msg instanceof HttpResponse)) { throw new IllegalStateException(String.format("Unknown message type[%s]", msg.getClass())); } + } catch (Exception ex) { log.warn(ex, "[%s] Exception thrown while processing message, closing channel.", requestDesc); - if (!retVal.isDone()) { - retVal.set(null); + retVal.setException(ex); } channel.close(); channelResourceContainer.returnResource(); - throw ex; } } @@ -270,21 +246,19 @@ private void finishRequest() } @Override - public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (log.isDebugEnabled()) { - final Throwable cause = event.getCause(); if (cause == null) { log.debug("[%s] Caught exception", requestDesc); } else { log.debug(cause, "[%s] Caught exception", requestDesc); } } - - retVal.setException(event.getCause()); + retVal.setException(cause); // response is non-null if we received initial chunk and then exception occurs if (response != null) { - handler.exceptionCaught(response, event.getCause()); + handler.exceptionCaught(response, cause); } removeHandlers(); try { @@ -296,12 +270,12 @@ public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) finally { channelResourceContainer.returnResource(); } - - context.sendUpstream(event); + super.exceptionCaught(ctx, cause); } + @Override - public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent event) throws Exception + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (log.isDebugEnabled()) { log.debug("[%s] Channel disconnected", requestDesc); @@ -316,20 +290,20 @@ public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent log.warn("[%s] Channel disconnected before response complete", requestDesc); retVal.setException(new ChannelException("Channel disconnected")); } - context.sendUpstream(event); + super.channelInactive(ctx); } private void removeHandlers() { if (readTimeout > 0) { - channel.getPipeline().remove(READ_TIMEOUT_HANDLER_NAME); + channel.pipeline().remove(READ_TIMEOUT_HANDLER_NAME); } - channel.getPipeline().remove(LAST_HANDLER_NAME); + channel.pipeline().remove(LAST_HANDLER_NAME); } } ); - channel.write(httpRequest).addListener( + channel.writeAndFlush(httpRequest).addListener( new ChannelFutureListener() { @Override @@ -342,7 +316,7 @@ public void operationComplete(ChannelFuture future) throws Exception retVal.setException( new ChannelException( String.format("[%s] Failed to write request to channel", requestDesc), - future.getCause() + future.cause() ) ); } @@ -364,13 +338,7 @@ private long getReadTimeout(Duration requestReadTimeout) } else { timeout = 0; } - - if (timeout > 0 && timer == null) { - log.warn("Cannot time out requests without a timer! Disabling timeout for this request."); - return 0; - } else { - return timeout; - } + return timeout; } private String getHost(URL url) diff --git a/src/main/java/com/metamx/http/client/Request.java b/src/main/java/com/metamx/http/client/Request.java index b846f93..3ed2ed3 100644 --- a/src/main/java/com/metamx/http/client/Request.java +++ b/src/main/java/com/metamx/http/client/Request.java @@ -22,16 +22,13 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import com.metamx.http.client.response.HttpResponseHandler; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferFactory; -import org.jboss.netty.buffer.HeapChannelBufferFactory; -import org.jboss.netty.handler.codec.base64.Base64; -import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpMethod; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.util.AsciiString; import java.net.URL; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -41,12 +38,10 @@ */ public class Request { - private static final ChannelBufferFactory factory = HeapChannelBufferFactory.getInstance(); - private final HttpMethod method; private final URL url; - private final Multimap headers = Multimaps.newListMultimap( - Maps.>newHashMap(), + private final Multimap headers = Multimaps.newListMultimap( + Maps.>newHashMap(), new Supplier>() { @Override public List get() { @@ -55,7 +50,7 @@ public List get() { } ); - private ChannelBuffer content; + private ByteBuf content; public Request( HttpMethod method, @@ -76,7 +71,7 @@ public URL getUrl() return url; } - public Multimap getHeaders() + public Multimap getHeaders() { return headers; } @@ -86,7 +81,7 @@ public boolean hasContent() return content != null; } - public ChannelBuffer getContent() + public ByteBuf getContent() { return content; } @@ -98,39 +93,39 @@ public Request copy() { return retVal; } - public Request setHeader(String header, String value) + public Request setHeader(AsciiString header, String value) { headers.replaceValues(header, Arrays.asList(value)); return this; } - public Request setHeaderValues(String header, Iterable value) + public Request setHeaderValues(AsciiString header, Iterable value) { headers.replaceValues(header, value); return this; } - public Request setHeaderValues(Multimap inHeaders) { - for (Map.Entry> entry : inHeaders.asMap().entrySet()) { + public Request setHeaderValues(Multimap inHeaders) { + for (Map.Entry> entry : inHeaders.asMap().entrySet()) { this.setHeaderValues(entry.getKey(), entry.getValue()); } return this; } - public Request addHeader(String header, String value) + public Request addHeader(AsciiString header, String value) { headers.put(header, value); return this; } - public Request addHeaderValues(String header, Iterable value) + public Request addHeaderValues(AsciiString header, Iterable value) { headers.putAll(header, value); return this; } - public Request addHeaderValues(Multimap inHeaders) { - for (Map.Entry> entry : inHeaders.asMap().entrySet()) { + public Request addHeaderValues(Multimap inHeaders) { + for (Map.Entry> entry : inHeaders.asMap().entrySet()) { this.addHeaderValues(entry.getKey(), entry.getValue()); } return this; @@ -146,7 +141,7 @@ public Request setContent(byte[] bytes, int offset, int length) return setContent(null, bytes, offset, length); } - public Request setContent(ChannelBuffer content) + public Request setContent(ByteBuf content) { return setContent(null, content); } @@ -158,18 +153,18 @@ public Request setContent(String contentType, byte[] bytes) public Request setContent(String contentType, byte[] bytes, int offset, int length) { - return setContent(contentType, factory.getBuffer(bytes, offset, length)); + return setContent(contentType, bytes, offset, length); } - public Request setContent(String contentType, ChannelBuffer content) + public Request setContent(String contentType, ByteBuf content) { if (contentType != null) { - setHeader(HttpHeaders.Names.CONTENT_TYPE, contentType); + setHeader(HttpHeaderNames.CONTENT_TYPE, contentType); } this.content = content; - setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(content.writerIndex())); + setHeader(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(content.writerIndex())); return this; } @@ -177,16 +172,14 @@ public Request setContent(String contentType, ChannelBuffer content) public Request setBasicAuthentication(String username, String password) { final String base64Value = base64Encode(String.format("%s:%s", username, password)); - setHeader(HttpHeaders.Names.AUTHORIZATION, String.format("Basic %s", base64Value)); + setHeader(HttpHeaderNames.AUTHORIZATION, String.format("Basic %s", base64Value)); return this; } private String base64Encode(final String value) { - final ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance(); - return Base64 - .encode(bufferFactory.getBuffer(ByteBuffer.wrap(value.getBytes(Charsets.UTF_8))), false) + .encode(Unpooled.wrappedBuffer(value.getBytes(Charsets.UTF_8)), false) .toString(Charsets.UTF_8); } } \ No newline at end of file diff --git a/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java b/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java index 51759ed..2a2f352 100644 --- a/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java +++ b/src/main/java/com/metamx/http/client/netty/HttpClientPipelineFactory.java @@ -16,24 +16,21 @@ package com.metamx.http.client.netty; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.DefaultChannelPipeline; -import org.jboss.netty.handler.codec.http.HttpClientCodec; -import org.jboss.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContentDecompressor; /** */ -public class HttpClientPipelineFactory implements ChannelPipelineFactory +public class HttpClientPipelineFactory extends ChannelInitializer { @Override - public ChannelPipeline getPipeline() throws Exception + protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = new DefaultChannelPipeline(); - + ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("codec", new HttpClientCodec()); pipeline.addLast("inflater", new HttpContentDecompressor()); - - return pipeline; } } diff --git a/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java b/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java index 0a50bf4..8f057b8 100644 --- a/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java +++ b/src/main/java/com/metamx/http/client/pool/ChannelResourceFactory.java @@ -18,23 +18,22 @@ import com.google.common.base.Preconditions; import com.metamx.common.logger.Logger; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.ssl.ImmediateExecutor; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.util.Timer; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; /** */ @@ -44,26 +43,19 @@ public class ChannelResourceFactory implements ResourceFactory= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT; - - if (sslContext != null) { - Preconditions.checkNotNull(timer, "timer is required when sslContext is present"); - } } @Override @@ -93,21 +85,13 @@ public ChannelFuture generate(final String hostname) sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); sslEngine.setSSLParameters(sslParameters); sslEngine.setUseClientMode(true); - final SslHandler sslHandler = new SslHandler( - sslEngine, - SslHandler.getDefaultBufferPool(), - false, - timer, - sslHandshakeTimeout - ); - - // https://github.com/netty/netty/issues/160 - sslHandler.setCloseOnSSLException(true); + final SslHandler sslHandler = new SslHandler(sslEngine); + sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeout); - final ChannelPipeline pipeline = connectFuture.getChannel().getPipeline(); + final ChannelPipeline pipeline = connectFuture.channel().pipeline(); pipeline.addFirst("ssl", sslHandler); - final ChannelFuture handshakeFuture = Channels.future(connectFuture.getChannel()); + final ChannelPromise handshakeFuture = connectFuture.channel().newPromise(); connectFuture.addListener( new ChannelFutureListener() { @@ -115,11 +99,11 @@ public ChannelFuture generate(final String hostname) public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { - sslHandler.handshake().addListener( - new ChannelFutureListener() + sslHandler.handshakeFuture().addListener( + new GenericFutureListener>() { @Override - public void operationComplete(ChannelFuture f2) throws Exception + public void operationComplete(Future f2) throws Exception { if (f2.isSuccess()) { handshakeFuture.setSuccess(); @@ -127,7 +111,7 @@ public void operationComplete(ChannelFuture f2) throws Exception handshakeFuture.setFailure( new ChannelException( String.format("Failed to handshake with host[%s]", hostname), - f2.getCause() + f2.cause() ) ); } @@ -138,7 +122,7 @@ public void operationComplete(ChannelFuture f2) throws Exception handshakeFuture.setFailure( new ChannelException( String.format("Failed to connect to host[%s]", hostname), - f.getCause() + f.cause() ) ); } @@ -157,23 +141,22 @@ public void operationComplete(ChannelFuture f2) throws Exception @Override public boolean isGood(ChannelFuture resource) { - Channel channel = resource.awaitUninterruptibly().getChannel(); + Channel channel = resource.awaitUninterruptibly().channel(); boolean isSuccess = resource.isSuccess(); - boolean isConnected = channel.isConnected(); boolean isOpen = channel.isOpen(); if (log.isTraceEnabled()) { - log.trace("isGood = isSucess[%s] && isConnected[%s] && isOpen[%s]", isSuccess, isConnected, isOpen); + log.trace("isGood = isSucess[%s] && isOpen[%s]", isSuccess, isOpen); } - return isSuccess && isConnected && isOpen; + return isSuccess && isOpen; } @Override public void close(ChannelFuture resource) { log.trace("Closing"); - resource.awaitUninterruptibly().getChannel().close(); + resource.awaitUninterruptibly().channel().close(); } } diff --git a/src/main/java/com/metamx/http/client/response/FullResponseHandler.java b/src/main/java/com/metamx/http/client/response/FullResponseHandler.java index 992bdb8..9351420 100644 --- a/src/main/java/com/metamx/http/client/response/FullResponseHandler.java +++ b/src/main/java/com/metamx/http/client/response/FullResponseHandler.java @@ -16,9 +16,10 @@ package com.metamx.http.client.response; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpResponse; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpResponse; import java.nio.charset.Charset; /** @@ -35,11 +36,12 @@ public FullResponseHandler(Charset charset) @Override public ClientResponse handleResponse(HttpResponse response) { + ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER; return ClientResponse.unfinished( new FullResponseHolder( - response.getStatus(), + response.status(), response, - new StringBuilder(response.getContent().toString(charset)) + new StringBuilder(content.toString(charset)) ) ); } @@ -47,7 +49,7 @@ public ClientResponse handleResponse(HttpResponse response) @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpContent chunk ) { final StringBuilder builder = response.getObj().getBuilder(); @@ -56,7 +58,7 @@ public ClientResponse handleChunk( return ClientResponse.finished(null); } - builder.append(chunk.getContent().toString(charset)); + builder.append(chunk.content().toString(charset)); return response; } diff --git a/src/main/java/com/metamx/http/client/response/FullResponseHolder.java b/src/main/java/com/metamx/http/client/response/FullResponseHolder.java index 4ab8af4..c16282a 100644 --- a/src/main/java/com/metamx/http/client/response/FullResponseHolder.java +++ b/src/main/java/com/metamx/http/client/response/FullResponseHolder.java @@ -16,8 +16,8 @@ package com.metamx.http.client.response; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; /** */ diff --git a/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java b/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java index 412a4a3..2f47aac 100644 --- a/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java +++ b/src/main/java/com/metamx/http/client/response/HttpResponseHandler.java @@ -16,8 +16,8 @@ package com.metamx.http.client.response; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpResponse; /** * A handler for an HTTP request. @@ -45,7 +45,7 @@ public interface HttpResponseHandler * @return */ public ClientResponse handleResponse(HttpResponse response); - public ClientResponse handleChunk(ClientResponse clientResponse, HttpChunk chunk); + public ClientResponse handleChunk(ClientResponse clientResponse, HttpContent chunk); public ClientResponse done(ClientResponse clientResponse); public void exceptionCaught(ClientResponse clientResponse,Throwable e); } diff --git a/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java b/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java index 3246180..cfdc773 100644 --- a/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java +++ b/src/main/java/com/metamx/http/client/response/InputStreamResponseHandler.java @@ -16,12 +16,11 @@ package com.metamx.http.client.response; -import com.google.common.base.Throwables; import com.metamx.http.client.io.AppendableByteArrayInputStream; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpResponse; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpResponse; import java.io.InputStream; /** @@ -31,17 +30,18 @@ public class InputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response) { + ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER; AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); - in.add(getContentBytes(response.getContent())); + in.add(getBytes(content)); return ClientResponse.finished(in); } @Override public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk + ClientResponse clientResponse, HttpContent chunk ) { - clientResponse.getObj().add(getContentBytes(chunk.getContent())); + clientResponse.getObj().add(getBytes(chunk.content())); return clientResponse; } @@ -63,10 +63,11 @@ public void exceptionCaught( obj.exceptionCaught(e); } - private byte[] getContentBytes(ChannelBuffer content) + private byte[] getBytes(ByteBuf content) { - byte[] contentBytes = new byte[content.readableBytes()]; - content.readBytes(contentBytes); - return contentBytes; + byte[] bytes = new byte[content.readableBytes()]; + int readerIndex = content.readerIndex(); + content.getBytes(readerIndex, bytes); + return bytes; } } diff --git a/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java b/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java index e7166c2..5a79daa 100644 --- a/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java +++ b/src/main/java/com/metamx/http/client/response/SequenceInputStreamResponseHandler.java @@ -19,11 +19,11 @@ import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import com.metamx.common.logger.Logger; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpResponse; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpResponse; import java.io.IOException; import java.io.InputStream; import java.io.SequenceInputStream; @@ -54,15 +54,16 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response) { + ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER; try { - queue.put(new ChannelBufferInputStream(response.getContent())); + queue.put(new ByteBufInputStream(content)); } catch (InterruptedException e) { log.error(e, "Queue appending interrupted"); Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(response.getContent().readableBytes()); + byteCount.addAndGet(content.readableBytes()); return ClientResponse.finished( new SequenceInputStream( new Enumeration() @@ -96,14 +97,12 @@ public InputStream nextElement() @Override public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk + ClientResponse clientResponse, HttpContent chunk ) { - final ChannelBuffer channelBuffer = chunk.getContent(); - final int bytes = channelBuffer.readableBytes(); - if (bytes > 0) { + if (chunk.content().readableBytes() > 0) { try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + queue.put(new ByteBufInputStream(chunk.content())); // Queue.size() can be expensive in some implementations, but LinkedBlockingQueue.size is just an AtomicLong log.debug("Added stream. Queue length %d", queue.size()); } @@ -112,7 +111,7 @@ public ClientResponse handleChunk( Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(bytes); + byteCount.addAndGet(chunk.content().readableBytes()); } else { log.debug("Skipping zero length chunk"); } diff --git a/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java b/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java index 7708dc0..5ad14c5 100644 --- a/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java +++ b/src/main/java/com/metamx/http/client/response/StatusResponseHandler.java @@ -16,9 +16,10 @@ package com.metamx.http.client.response; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpResponse; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpResponse; import java.nio.charset.Charset; /** @@ -35,10 +36,11 @@ public StatusResponseHandler(Charset charset) @Override public ClientResponse handleResponse(HttpResponse response) { + ByteBuf content = response instanceof HttpContent ? ((HttpContent) response).content() : Unpooled.EMPTY_BUFFER; return ClientResponse.unfinished( new StatusResponseHolder( - response.getStatus(), - new StringBuilder(response.getContent().toString(charset)) + response.status(), + new StringBuilder(content.toString(charset)) ) ); } @@ -46,7 +48,7 @@ public ClientResponse handleResponse(HttpResponse response @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpContent chunk ) { final StringBuilder builder = response.getObj().getBuilder(); @@ -55,7 +57,7 @@ public ClientResponse handleChunk( return ClientResponse.finished(null); } - builder.append(chunk.getContent().toString(charset)); + builder.append(chunk.content().toString(charset)); return response; } diff --git a/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java b/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java index 7ce3286..2bb280f 100644 --- a/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java +++ b/src/main/java/com/metamx/http/client/response/StatusResponseHolder.java @@ -16,7 +16,7 @@ package com.metamx.http.client.response; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpResponseStatus; /** */ diff --git a/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java b/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java index 9b2ec21..f693e9d 100644 --- a/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java +++ b/src/main/java/com/metamx/http/client/response/ToStringResponseHandler.java @@ -16,8 +16,8 @@ package com.metamx.http.client.response; -import org.jboss.netty.handler.codec.http.HttpChunk; -import org.jboss.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpResponse; import java.nio.charset.Charset; @@ -35,13 +35,19 @@ public ToStringResponseHandler(Charset charset) @Override public ClientResponse handleResponse(HttpResponse response) { - return ClientResponse.unfinished(new StringBuilder(response.getContent().toString(charset))); + final StringBuilder builder; + if (response instanceof HttpContent) { + builder = new StringBuilder(((HttpContent) response).content().toString(charset)); + } else { + builder = new StringBuilder(); + } + return ClientResponse.unfinished(builder); } @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpContent chunk ) { final StringBuilder builder = response.getObj(); @@ -49,7 +55,7 @@ public ClientResponse handleChunk( return ClientResponse.finished(null); } - builder.append(chunk.getContent().toString(charset)); + builder.append(chunk.content().toString(charset)); return response; } diff --git a/src/test/java/com/metamx/http/client/FriendlyServersTest.java b/src/test/java/com/metamx/http/client/FriendlyServersTest.java index bfa2107..52bd4bb 100644 --- a/src/test/java/com/metamx/http/client/FriendlyServersTest.java +++ b/src/test/java/com/metamx/http/client/FriendlyServersTest.java @@ -21,6 +21,21 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import io.netty.channel.ChannelException; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URL; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLHandshakeException; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; @@ -29,26 +44,10 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLHandshakeException; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.URL; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - /** * Tests with servers that are at least moderately well-behaving. */ @@ -71,7 +70,9 @@ public void run() BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); OutputStream out = clientSocket.getOutputStream() ) { - while (!in.readLine().equals("")); // skip lines + while (!in.readLine().equals("")) { + ; // skip lines + } out.write("HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhello!".getBytes(Charsets.UTF_8)); } catch (Exception e) { @@ -92,7 +93,7 @@ public void run() new StatusResponseHandler(Charsets.UTF_8) ).get(); - Assert.assertEquals(200, response.getStatus().getCode()); + Assert.assertEquals(200, response.getStatus().code()); Assert.assertEquals("hello!", response.getContent()); } finally { @@ -123,7 +124,7 @@ public void run() // Read headers String header; while (!(header = in.readLine()).equals("")) { - if (header.equals("Accept-Encoding: identity")) { + if (header.toLowerCase().equals("Accept-Encoding: identity".toLowerCase())) { foundAcceptEncoding.set(true); } } @@ -149,7 +150,7 @@ public void run() new StatusResponseHandler(Charsets.UTF_8) ).get(); - Assert.assertEquals(200, response.getStatus().getCode()); + Assert.assertEquals(200, response.getStatus().code()); Assert.assertEquals("hello!", response.getContent()); Assert.assertTrue(foundAcceptEncoding.get()); } @@ -199,17 +200,23 @@ public void testFriendlySelfSignedHttpsServer() throws Exception { final HttpResponseStatus status = trustingClient .go( - new Request(HttpMethod.GET, new URL(String.format("https://localhost:%d/", sslConnector.getLocalPort()))), + new Request( + HttpMethod.GET, + new URL(String.format("https://localhost:%d/", sslConnector.getLocalPort())) + ), new StatusResponseHandler(Charsets.UTF_8) ).get().getStatus(); - Assert.assertEquals(404, status.getCode()); + Assert.assertEquals(404, status.code()); } // Incorrect name ("127.0.0.1") { final ListenableFuture response1 = trustingClient .go( - new Request(HttpMethod.GET, new URL(String.format("https://127.0.0.1:%d/", sslConnector.getLocalPort()))), + new Request( + HttpMethod.GET, + new URL(String.format("https://127.0.0.1:%d/", sslConnector.getLocalPort())) + ), new StatusResponseHandler(Charsets.UTF_8) ); @@ -265,13 +272,13 @@ public void testHttpBin() throws Throwable final HttpClient client = HttpClientInit.createClient(config, lifecycle); { - final HttpResponseStatus status =client + final HttpResponseStatus status = client .go( new Request(HttpMethod.GET, new URL("https://httpbin.org/get")), new StatusResponseHandler(Charsets.UTF_8) ).get().getStatus(); - Assert.assertEquals(200, status.getCode()); + Assert.assertEquals(200, status.code()); } { @@ -282,7 +289,7 @@ public void testHttpBin() throws Throwable new StatusResponseHandler(Charsets.UTF_8) ).get().getStatus(); - Assert.assertEquals(200, status.getCode()); + Assert.assertEquals(200, status.code()); } } finally { diff --git a/src/test/java/com/metamx/http/client/JankyServersTest.java b/src/test/java/com/metamx/http/client/JankyServersTest.java index 9614f4d..2645dd3 100644 --- a/src/test/java/com/metamx/http/client/JankyServersTest.java +++ b/src/test/java/com/metamx/http/client/JankyServersTest.java @@ -22,9 +22,9 @@ import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import java.io.IOException; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.timeout.ReadTimeoutException; +import io.netty.channel.ChannelException; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.timeout.ReadTimeoutException; import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.Assert; @@ -305,16 +305,7 @@ public void testHttpEchoServer() throws Throwable new StatusResponseHandler(Charsets.UTF_8) ); - Throwable e = null; - try { - response.get(); - } - catch (ExecutionException e1) { - e = e1.getCause(); - } - - Assert.assertTrue("IllegalArgumentException thrown by 'get'", e instanceof IllegalArgumentException); - Assert.assertTrue("Expected error message", e.getMessage().matches(".*invalid version format:.*")); + Assert.assertEquals(999, response.get().getStatus().code()); } finally { lifecycle.stop(); diff --git a/src/test/java/com/metamx/http/client/MockHttpClient.java b/src/test/java/com/metamx/http/client/MockHttpClient.java index 63d73e3..f42edbc 100644 --- a/src/test/java/com/metamx/http/client/MockHttpClient.java +++ b/src/test/java/com/metamx/http/client/MockHttpClient.java @@ -22,7 +22,7 @@ import com.metamx.http.client.pool.ResourcePool; import com.metamx.http.client.pool.ResourcePoolConfig; import com.metamx.http.client.response.HttpResponseHandler; -import org.jboss.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFuture; import org.joda.time.Duration; /** diff --git a/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java b/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java index 2da013e..fd2408c 100644 --- a/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java +++ b/src/test/java/com/metamx/http/client/response/SequenceInputStreamResponseHandlerTest.java @@ -16,17 +16,17 @@ package com.metamx.http.client.response; -import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; -import org.jboss.netty.handler.codec.http.DefaultHttpChunk; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.codec.http.HttpVersion; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledHeapByteBuf; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -34,6 +34,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Random; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; public class SequenceInputStreamResponseHandlerTest { @@ -80,20 +84,21 @@ public void testExceptionalChunkedStream() throws IOException SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setChunked(true); + HttpUtil.setTransferEncodingChunked(response, true); ClientResponse clientResponse = responseHandler.handleResponse(response); final int failAt = Math.abs(RANDOM.nextInt()) % allBytes.length; while (it.hasNext()) { - final DefaultHttpChunk chunk = new DefaultHttpChunk( - new BigEndianHeapChannelBuffer(it.next()) + byte[] array = it.next(); + final DefaultHttpContent chunk = new DefaultHttpContent( + new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, array, array.length) { @Override - public void getBytes(int index, byte[] dst, int dstIndex, int length) + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { if (dstIndex + length >= failAt) { throw new TesterException(); } - super.getBytes(index, dst, dstIndex, length); + return super.getBytes(index, dst, dstIndex, length); } } ); @@ -114,21 +119,22 @@ public static class TesterException extends RuntimeException public void testExceptionalSingleStream() throws IOException { SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setChunked(false); - response.setContent( - new BigEndianHeapChannelBuffer(allBytes) + final HttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK, + new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, allBytes, allBytes.length) { @Override - public void getBytes(int index, byte[] dst, int dstIndex, int length) + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { if (dstIndex + length >= allBytes.length) { throw new TesterException(); } - super.getBytes(index, dst, dstIndex, length); + return super.getBytes(index, dst, dstIndex, length); } } ); + HttpUtil.setTransferEncodingChunked(response, false); ClientResponse clientResponse = responseHandler.handleResponse(response); clientResponse = responseHandler.done(clientResponse); @@ -144,10 +150,10 @@ public void simpleMultiStreamTest() throws IOException SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setChunked(true); + HttpUtil.setTransferEncodingChunked(response, true); ClientResponse clientResponse = responseHandler.handleResponse(response); while (it.hasNext()) { - final DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer(it.next())); + final DefaultHttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(it.next())); clientResponse = responseHandler.handleChunk(clientResponse, chunk); } clientResponse = responseHandler.done(clientResponse); @@ -166,7 +172,6 @@ public void simpleMultiStreamTest() throws IOException Assert.assertEquals(allBytes.length, responseHandler.getByteCount()); } - @Test public void alignedMultiStreamTest() throws IOException { @@ -174,10 +179,10 @@ public void alignedMultiStreamTest() throws IOException SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setChunked(true); + HttpUtil.setTransferEncodingChunked(response, true); ClientResponse clientResponse = responseHandler.handleResponse(response); while (it.hasNext()) { - final DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer(it.next())); + final DefaultHttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(it.next())); clientResponse = responseHandler.handleChunk(clientResponse, chunk); } clientResponse = responseHandler.done(clientResponse); @@ -185,7 +190,7 @@ public void alignedMultiStreamTest() throws IOException final InputStream stream = clientResponse.getObj(); final InputStream expectedStream = new ByteArrayInputStream(allBytes); - for(byte[] bytes : BYTE_LIST) { + for (byte[] bytes : BYTE_LIST) { final byte[] expectedBytes = new byte[bytes.length]; final byte[] actualBytes = new byte[expectedBytes.length]; fillBuff(stream, actualBytes); @@ -200,9 +205,12 @@ public void alignedMultiStreamTest() throws IOException public void simpleSingleStreamTest() throws IOException { SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); - final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.setChunked(false); - response.setContent(new BigEndianHeapChannelBuffer(allBytes)); + final HttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK, + Unpooled.wrappedBuffer(allBytes) + ); + HttpUtil.setTransferEncodingChunked(response, false); ClientResponse clientResponse = responseHandler.handleResponse(response); clientResponse = responseHandler.done(clientResponse);