Skip to content

Commit

Permalink
CAMEL-21300: camel-platform-http - Consumer should have option to con…
Browse files Browse the repository at this point in the history
…trol if writing response failing should cause Exchange to fail
  • Loading branch information
luigidemasi committed Oct 14, 2024
1 parent f394600 commit 0c267b2
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ protected Object createBody() {
}

public PlatformHttpMessage newInstance() {
return new PlatformHttpMessage(this.request, this.response, this.getExchange(), this.binding, this.requestRead);
PlatformHttpMessage answer = new PlatformHttpMessage(this.request, this.response, this.getExchange(), this.binding, this.requestRead);
if (answer.camelContext == null) {
answer.setCamelContext(this.camelContext);
}
return answer;
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.SuspendableService;
import org.apache.camel.component.platform.http.PlatformHttpEndpoint;
import org.apache.camel.component.platform.http.spi.PlatformHttpConsumer;
import org.apache.camel.http.common.DefaultHttpBinding;
import org.apache.camel.http.common.HttpBinding;
import org.apache.camel.http.common.HttpHelper;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
Expand All @@ -42,7 +41,8 @@ public class SpringBootPlatformHttpConsumer extends DefaultConsumer implements P

private static final Logger LOG = LoggerFactory.getLogger(SpringBootPlatformHttpConsumer.class);

private final DefaultHttpBinding binding;
private HttpBinding binding;
private final boolean handleWriteResponseError;
private Executor executor;

public SpringBootPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor) {
Expand All @@ -52,6 +52,7 @@ public SpringBootPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor p
this.binding.setMuteException(endpoint.isMuteException());
this.binding.setFileNameExtWhitelist(endpoint.getFileNameExtWhitelist());
this.binding.setUseReaderForPayload(!endpoint.isUseStreaming());
this.handleWriteResponseError = endpoint.isHandleWriteResponseError();
this.executor = Executors.newSingleThreadExecutor();
}

Expand All @@ -60,6 +61,13 @@ public SpringBootPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor p
this.executor = executor;
}

/**
* Used for testing purposes
*/
void setBinding(HttpBinding binding) {
this.binding = binding;
}

@Override
public PlatformHttpEndpoint getEndpoint() {
return (PlatformHttpEndpoint) super.getEndpoint();
Expand Down Expand Up @@ -122,36 +130,47 @@ protected void handleService(HttpServletRequest request, HttpServletResponse res
} catch (Exception e) {
exchange.setException(e);
} finally {
afterProcess(response, exchange, true);
afterProcess(response, exchange);
}
}

protected void afterProcess(HttpServletResponse response, Exchange exchange, boolean rethrow)
throws IOException, ServletException {
protected void afterProcess(HttpServletResponse response, Exchange exchange) throws Exception {
boolean writeFailure = false;
try {
// now lets output to the res
if (LOG.isTraceEnabled()) {
LOG.trace("Writing res for exchangeId: {}", exchange.getExchangeId());
}
binding.writeResponse(exchange, response);
} catch (IOException e) {
LOG.error("Error processing request", e);
if (rethrow) {
throw e;
} else {
exchange.setException(e);
}
} catch (Exception e) {
LOG.error("Error processing request", e);
if (rethrow) {
throw new ServletException(e);
} else {
exchange.setException(e);
}
writeFailure = true;
handleFailure(exchange, e);
} finally {
doneUoW(exchange);
releaseExchange(exchange, false);
}
try {
if (writeFailure && !response.isCommitted()) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
} catch (Exception e) {
// ignore
}

}

private void handleFailure(Exchange exchange, Throwable failure) {
getExceptionHandler().handleException(
"Failed writing HTTP response url: " + getEndpoint().getPath() + " due to: " + failure.getMessage(),
failure);
if (handleWriteResponseError) {
Exception existing = exchange.getException();
if (existing != null) {
failure.addSuppressed(existing);
}
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, failure);
exchange.setException(failure);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.component.platform.http.springboot;

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.component.platform.http.PlatformHttpEndpoint;
import org.apache.camel.component.platform.http.spi.PlatformHttpConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@
abstract class PlatformHttpBase {

@Autowired
private TestRestTemplate restTemplate;
TestRestTemplate restTemplate;

@Autowired
CamelContext camelContext;

@Test
public void testGet() {
public void testGet() throws Exception {
waitUntilRouteIsStarted(1, getGetRouteId());

Assertions.assertThat(restTemplate.getForEntity("/myget", String.class).getStatusCodeValue()).isEqualTo(200);
}

@Test
public void testPost() {
public void testPost() throws Exception {
waitUntilRouteIsStarted(1, getPostRouteId());

Assertions.assertThat(restTemplate.postForEntity("/mypost", "test", String.class).getBody()).isEqualTo("TEST");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.platform.http.springboot;

import jakarta.servlet.http.HttpServletResponse;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.platform.http.PlatformHttpEndpoint;
import org.apache.camel.component.platform.http.spi.PlatformHttpConsumer;
import org.apache.camel.component.platform.http.spi.PlatformHttpEngine;
import org.apache.camel.http.common.DefaultHttpBinding;
import org.apache.camel.spring.boot.CamelAutoConfiguration;
import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;

import java.io.IOException;

@EnableAutoConfiguration
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
@CamelSpringBootTest
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = { CamelAutoConfiguration.class,
SpringBootPlatformHttpHandleWriteErrorTest.class, SpringBootPlatformHttpHandleWriteErrorTest.TestConfiguration.class,
PlatformHttpComponentAutoConfiguration.class, SpringBootPlatformHttpAutoConfiguration.class })
public class SpringBootPlatformHttpHandleWriteErrorTest extends PlatformHttpBase {

private static final String postRouteId = "SpringBootPlatformHttpHandleWriteErrorTest_mypost";
private static final String getRouteId = "SpringBootPlatformHttpHandleWriteErrorTest_myget";

@Test
@Override
public void testGet() throws Exception {
MockEndpoint me = camelContext.getEndpoint("mock:failure", MockEndpoint.class);
me.expectedMessageCount(0);

super.testGet();

me.assertIsSatisfied();
}

@Test
@Override
public void testPost() throws Exception {
MockEndpoint me = camelContext.getEndpoint("mock:failure", MockEndpoint.class);
me.expectedMessageCount(1);

waitUntilRouteIsStarted(1, getPostRouteId());
Assertions.assertThat(restTemplate.postForEntity("/mypost", "test", String.class).getStatusCode().value()).isEqualTo(500);

me.assertIsSatisfied();
}

// *************************************
// Config
// *************************************
@Configuration
public static class TestConfiguration {

@Bean(name = "platform-http-engine")
public PlatformHttpEngine myHttpEngine(Environment env) {
int port = Integer.parseInt(env.getProperty("server.port", "8080"));
return new MyEngine(port);
}

@Bean
public RouteBuilder servletPlatformHttpRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
onCompletion().onFailureOnly().to("log:failure").to("mock:failure");

from("platform-http:/myget").id(getRouteId).setBody().constant("get");
from("platform-http:/mypost?handleWriteResponseError=true").id(postRouteId).transform().body(String.class, b -> b.toUpperCase());
}
};
}
}

@Override
protected String getPostRouteId() {
return postRouteId;
}

@Override
protected String getGetRouteId() {
return getRouteId;
}

private static class MyErrorBinding extends DefaultHttpBinding {

@Override
public void writeResponse(Exchange exchange, HttpServletResponse response) throws IOException {
// force an exception during writing response to simulate error at that point
String uri = exchange.getMessage().getHeader(Exchange.HTTP_URI, String.class);
if ("/mypost".equals(uri)) {
throw new IOException("Forced error");
} else {
super.writeResponse(exchange, response);
}
}
}

private static class MyEngine extends SpringBootPlatformHttpEngine {

public MyEngine(int port) {
super(port);
}

@Override
public PlatformHttpConsumer createConsumer(PlatformHttpEndpoint endpoint, Processor processor) {
SpringBootPlatformHttpConsumer answer = new SpringBootPlatformHttpConsumer(endpoint, processor);
answer.setBinding(new MyErrorBinding());
return answer;
}
}
}

0 comments on commit 0c267b2

Please sign in to comment.