Skip to content

Commit

Permalink
SIP2-110: Update edge-sip2 to Vert.x 4.2.5
Browse files Browse the repository at this point in the history
Update Vert.x from 3.9.* to 4.2.5.

Replace Vertx by WebClient for better re-use enabling
HTTP pooling and HTTP pipe-lining.

Use existing methods for random port.

(cherry picked from commit bdd2962)
  • Loading branch information
julianladisch committed Mar 9, 2022
1 parent 0d5d46b commit 610078f
Show file tree
Hide file tree
Showing 24 changed files with 167 additions and 209 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-stack-depchain</artifactId>
<version>3.9.2</version>
<version>4.2.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -331,7 +331,7 @@
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>**/module-info.class</exclude>
</excludes>
</filter>
</filters>
Expand Down
80 changes: 39 additions & 41 deletions src/main/java/org/folio/edge/sip2/MainVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.ext.web.client.WebClient;
import java.nio.charset.Charset;
import java.util.EnumMap;
import java.util.HashMap;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class MainVerticle extends AbstractVerticle {
private final Map<Integer, Metrics> metricsMap = new HashMap<>();
private JsonObject multiTenantConfig = new JsonObject();
private ConfigRetriever configRetriever;

/**
* Construct the {@code MainVerticle}.
*/
Expand All @@ -72,23 +73,23 @@ private String getSanitizedConfig() {
JsonPointer.from("/tenantConfigRetrieverOptions/stores/0/config").writeJson(sc, "******");
return sc.encodePrettily();
}

@Override
public void start(Promise<Void> startFuture) {
log.debug("Startup configuration: {}", () -> getSanitizedConfig());

// We need to reduce the complexity of this method...
if (handlers == null) {
String okapiUrl = config().getString("okapiUrl");

final WebClient webClient = WebClient.create(vertx);
final Injector injector = Guice.createInjector(
new FolioResourceProviderModule(okapiUrl, vertx),
new FolioResourceProviderModule(okapiUrl, webClient),
new ApplicationModule());
handlers = new EnumMap<>(Command.class);
handlers.put(CHECKOUT, injector.getInstance(CheckoutHandler.class));
handlers.put(CHECKIN, injector.getInstance(CheckinHandler.class));
handlers.put(SC_STATUS, HandlersFactory.getScStatusHandlerInstance(null, null,
null, null, okapiUrl, vertx));
null, null, okapiUrl, webClient));
handlers.put(REQUEST_ACS_RESEND, HandlersFactory.getACSResendHandler());
handlers.put(LOGIN, injector.getInstance(LoginHandler.class));
handlers.put(PATRON_INFORMATION, injector.getInstance(PatronInformationHandler.class));
Expand All @@ -110,11 +111,11 @@ public void start(Promise<Void> startFuture) {
metricsMap.putIfAbsent(port, metrics);

server.connectHandler(socket -> {

String clientAddress = socket.remoteAddress().host();
JsonObject tenantConfig = TenantUtils.lookupTenantConfigForIPaddress(multiTenantConfig,
JsonObject tenantConfig = TenantUtils.lookupTenantConfigForIPaddress(multiTenantConfig,
clientAddress);

final SessionData sessionData = SessionData.createSession(
tenantConfig.getString("tenant"),
tenantConfig.getString("fieldDelimiter", "|").charAt(0),
Expand All @@ -140,7 +141,7 @@ public void start(Promise<Void> startFuture) {
final Parser parser = Parser.builder()
.delimiter(sessionData.getFieldDelimiter())
.charset(Charset.forName(sessionData.getCharset()))
.errorDetectionEnaled(sessionData.isErrorDetectionEnabled())
.errorDetectionEnabled(sessionData.isErrorDetectionEnabled())
.timezone(sessionData.getTimeZone())
.build();

Expand Down Expand Up @@ -178,28 +179,26 @@ public void start(Promise<Void> startFuture) {

handler
.execute(message.getRequest(), sessionData)
.setHandler(ar -> {
if (ar.succeeded()) {
final String responseMsg;
if (message.getCommand() == REQUEST_ACS_RESEND) {
// we don't want to modify the response
responseMsg = ar.result();
} else {
responseMsg = formatResponse(ar.result(), message, sessionData,
messageDelimiter);
}
handler.writeHistory(sessionData, message, responseMsg);
log.info("Sip response {}", responseMsg);
sample.stop(metrics.commandTimer(message.getCommand()));
socket.write(responseMsg, sessionData.getCharset());
.onSuccess(result -> {
final String responseMsg;
if (message.getCommand() == REQUEST_ACS_RESEND) {
// we don't want to modify the response
responseMsg = result;
} else {
String errorMsg = "Failed to respond to request";
log.error(errorMsg, ar.cause());
sample.stop(metrics.commandTimer(message.getCommand()));
socket.write(ar.cause().getMessage() + messageDelimiter,
sessionData.getCharset());
metrics.responseError();
responseMsg = formatResponse(result, message, sessionData,
messageDelimiter);
}
handler.writeHistory(sessionData, message, responseMsg);
log.info("Sip response {}", responseMsg);
sample.stop(metrics.commandTimer(message.getCommand()));
socket.write(responseMsg, sessionData.getCharset());
}).onFailure(e -> {
String errorMsg = "Failed to respond to request";
log.error(errorMsg, e);
sample.stop(metrics.commandTimer(message.getCommand()));
socket.write(e.getMessage() + messageDelimiter,
sessionData.getCharset());
metrics.responseError();
});
} catch (Exception ex) {
String message = "Problems handling the request: " + ex.getMessage();
Expand Down Expand Up @@ -245,11 +244,11 @@ public void start(Promise<Void> startFuture) {
startFuture.fail(ar.cause());
}
});

configRetriever.listen(change -> {
multiTenantConfig = change.getNewConfiguration();
log.info("Tenant config changed: {}", () -> multiTenantConfig.encodePrettily());
});
});

}

Expand Down Expand Up @@ -279,16 +278,15 @@ private void handleInvalidMessage(
//resends validation if checksum string does not match
ISip2RequestHandler handler = handlers.get(Command.REQUEST_SC_RESEND);
handler.execute(message.getRequest(), sessionData)
.setHandler(ar -> {
if (ar.succeeded()) {
sample.stop(metrics.commandTimer(message.getCommand()));
socket.write(formatResponse(ar.result(), message, sessionData,
messageDelimiter, true), sessionData.getCharset());
} else {
log.error("Failed to send SC resend", ar.cause());
metrics.scResendError();
sample.stop(metrics.commandTimer(message.getCommand()));
}
.onSuccess(result -> {
sample.stop(metrics.commandTimer(message.getCommand()));
socket.write(formatResponse(result, message, sessionData,
messageDelimiter, true), sessionData.getCharset());
})
.onFailure(e -> {
log.error("Failed to send SC resend", e);
metrics.scResendError();
sample.stop(metrics.commandTimer(message.getCommand()));
});
} else {
sample.stop(metrics.commandTimer(message.getCommand()));
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/folio/edge/sip2/handlers/HandlersFactory.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.folio.edge.sip2.handlers;

import freemarker.template.Template;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;
import java.time.Clock;
import java.util.Objects;
import org.folio.edge.sip2.handlers.freemarker.FreemarkerRepository;
Expand Down Expand Up @@ -33,12 +33,12 @@ public static ISip2RequestHandler getScStatusHandlerInstance(
Template freeMarkerTemplate,
Clock clock,
String okapiUrl,
Vertx vertx) {
WebClient webClient) {

Objects.requireNonNull(okapiUrl, "okapiUrl is required");
Objects.requireNonNull(vertx, "vertx is required");
Objects.requireNonNull(webClient, "webClient is required");

resProvider = getResourceProvider(resProvider, okapiUrl, vertx);
resProvider = getResourceProvider(resProvider, okapiUrl, webClient);

if (configRepo == null && clock == null) {
configRepo = new ConfigurationRepository(resProvider, Clock.systemUTC());
Expand All @@ -58,9 +58,9 @@ public static ISip2RequestHandler getInvalidMessageHandler() {

@SuppressWarnings("unchecked")
private static <T> IResourceProvider<T> getResourceProvider(
IResourceProvider<T> resourceProvider, String okapiUrl, Vertx vertx) {
IResourceProvider<T> resourceProvider, String okapiUrl, WebClient webClient) {
if (resourceProvider == null) {
return (IResourceProvider<T>) new FolioResourceProvider(okapiUrl, vertx);
return (IResourceProvider<T>) new FolioResourceProvider(okapiUrl, webClient);
}

return resourceProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

/**
* Module for creating a {@code FolioResourceProvider} via Dependency injection.
Expand All @@ -12,21 +12,21 @@
*/
public class FolioResourceProviderModule extends AbstractModule {
private final String okapiUrl;
private final Vertx vertx;
private final WebClient webClient;

/**
* Build a module for dependency injection.
* @param okapiUrl the okapi url
* @param vertx the instance of vertx
* @param webClient the instance of WebClient
*/
public FolioResourceProviderModule(String okapiUrl, Vertx vertx) {
public FolioResourceProviderModule(String okapiUrl, WebClient webClient) {
this.okapiUrl = okapiUrl;
this.vertx = vertx;
this.webClient = webClient;
}

@Override
protected void configure() {
bind(String.class).annotatedWith(Names.named("okapiUrl")).toInstance(okapiUrl);
bind(Vertx.class).annotatedWith(Names.named("vertx")).toInstance(vertx);
bind(WebClient.class).annotatedWith(Names.named("webClient")).toInstance(webClient);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/folio/edge/sip2/parser/Parser.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public ParserBuilder delimiter(Character delimiter) {
return this;
}

public ParserBuilder errorDetectionEnaled(Boolean errorDetectionEnabled) {
public ParserBuilder errorDetectionEnabled(Boolean errorDetectionEnabled) {
this.errorDetectionEnabled = errorDetectionEnabled;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.folio.edge.sip2.repositories;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpRequest;
Expand Down Expand Up @@ -39,14 +37,14 @@ public class FolioResourceProvider implements IResourceProvider<IRequestData> {
/**
* Construct a FOLIO resource provider with the specified parameters.
* @param okapiUrl the URL for okapi
* @param vertx the vertx instance
* @param webClient the WebClient instance
*/
@Inject
public FolioResourceProvider(
@Named("okapiUrl") String okapiUrl,
@Named("vertx") Vertx vertx) {
@Named("webClient") WebClient webClient) {
this.okapiUrl = okapiUrl;
this.client = WebClient.create(vertx);
this.client = webClient;
}

@Override
Expand All @@ -59,8 +57,7 @@ public Future<IResource> retrieveResource(IRequestData requestData) {
setHeaders(requestData.getHeaders(), request,
Objects.requireNonNull(requestData.getSessionData(), "SessionData cannot be null"));

final Future<IResource> future = Future.future();
request
return request
.expect(ResponsePredicate.create(ResponsePredicate.SC_OK, getErrorConverter()))
// Some APIs return application/json, some return with the charset
// parameter (e.g. circulation). So we can't use the built-in JSON
Expand All @@ -69,9 +66,9 @@ public Future<IResource> retrieveResource(IRequestData requestData) {
"application/json",
"application/json; charset=utf-8")))
.as(BodyCodec.jsonObject())
.send(ar -> handleResponse(future, ar));

return future;
.send()
.map(FolioResourceProvider::toIResource)
.onFailure(e -> log.error("Request failed", e));
}

@Override
Expand All @@ -85,8 +82,7 @@ public Future<IResource> createResource(IRequestData requestData) {

setHeaders(requestData.getHeaders(), request, requestData.getSessionData());

final Future<IResource> future = Future.future();
request
return request
.expect(ResponsePredicate.create(ResponsePredicate.SC_SUCCESS, getErrorConverter()))
// Some APIs return application/json, some return with the charset
// parameter (e.g. circulation). So we can't use the built-in JSON
Expand All @@ -95,10 +91,9 @@ public Future<IResource> createResource(IRequestData requestData) {
"application/json",
"application/json; charset=utf-8")))
.as(BodyCodec.jsonObject())
.sendJsonObject(requestData.getBody(),
ar -> handleResponse(future, ar));

return future;
.sendJsonObject(requestData.getBody())
.map(FolioResourceProvider::toIResource)
.onFailure(e -> log.error("Request failed", e));
}

@Override
Expand Down Expand Up @@ -130,18 +125,9 @@ private void setHeaders(
request.putHeader(HEADER_X_OKAPI_TENANT, sessionData.getTenant());
}

private void handleResponse(
Future<IResource> future,
AsyncResult<HttpResponse<JsonObject>> ar) {
if (ar.succeeded()) {
log.debug("FOLIO response body: {}",
() -> ar.result().body().encodePrettily());

future.complete(new FolioResource(ar.result().body(), ar.result().headers()));
} else {
log.error("Request failed", ar.cause());
future.fail(ar.cause());
}
private static IResource toIResource(HttpResponse<JsonObject> httpResponse) {
log.debug("FOLIO response body: {}", () -> httpResponse.body().encodePrettily());
return new FolioResource(httpResponse.body(), httpResponse.headers());
}

private ErrorConverter getErrorConverter() {
Expand Down
Loading

0 comments on commit 610078f

Please sign in to comment.