diff --git a/base/model/src/main/java/org/eclipse/ditto/base/model/auth/DittoAuthorizationContextType.java b/base/model/src/main/java/org/eclipse/ditto/base/model/auth/DittoAuthorizationContextType.java index a8c5258440..4d7de26334 100644 --- a/base/model/src/main/java/org/eclipse/ditto/base/model/auth/DittoAuthorizationContextType.java +++ b/base/model/src/main/java/org/eclipse/ditto/base/model/auth/DittoAuthorizationContextType.java @@ -34,6 +34,13 @@ public final class DittoAuthorizationContextType extends AuthorizationContextTyp public static final DittoAuthorizationContextType PRE_AUTHENTICATED_HTTP = new DittoAuthorizationContextType("pre-authenticated-http"); + /** + * Type indicating that the authorization context was created the pre-authenticated mechanism via CoAP which is + * setting an authenticated subject as header field. + */ + public static final DittoAuthorizationContextType PRE_AUTHENTICATED_COAP = + new DittoAuthorizationContextType("pre-authenticated-coap"); + /** * Type indicating that the authorization context was created using the pre-authenticated mechanism of connections * by having configured the contained auth subjects in a Ditto connection source/target. @@ -62,7 +69,7 @@ private DittoAuthorizationContextType(final String type) { * @return an array containing the Ditto specified authorization context types. */ public static AuthorizationContextType[] values() { - return new AuthorizationContextType[]{ PRE_AUTHENTICATED_HTTP, PRE_AUTHENTICATED_CONNECTION, JWT, UNSPECIFIED }; + return new AuthorizationContextType[]{ PRE_AUTHENTICATED_HTTP, PRE_AUTHENTICATED_COAP, PRE_AUTHENTICATED_CONNECTION, JWT, UNSPECIFIED }; } /** diff --git a/bom/pom.xml b/bom/pom.xml index d7ff2b70b1..658b11ca8c 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -59,6 +59,7 @@ 1.0.4 4.1.86.Final 2.3.0 + 3.8.0 1.7.36 1.2.11 @@ -263,6 +264,17 @@ ${cloudevents.version} + + org.eclipse.californium + californium-core + ${californium.version} + + + org.eclipse.californium + scandium + ${californium.version} + + org.scala-lang diff --git a/gateway/service/pom.xml b/gateway/service/pom.xml index a911c56a33..9fceabf704 100644 --- a/gateway/service/pom.xml +++ b/gateway/service/pom.xml @@ -49,6 +49,15 @@ runtime + + org.eclipse.californium + californium-core + + + org.eclipse.californium + scandium + + org.eclipse.ditto ditto-base-model diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapDeviceInfoSupplier.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapDeviceInfoSupplier.java new file mode 100644 index 0000000000..0a667d3934 --- /dev/null +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapDeviceInfoSupplier.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.gateway.service.coap; + +import java.security.Principal; +import java.util.HashMap; +import java.util.Map; + +import org.eclipse.californium.elements.auth.AdditionalInfo; +import org.eclipse.californium.scandium.auth.ApplicationLevelInfoSupplier; +import org.eclipse.ditto.base.model.auth.AuthorizationContext; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; +import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; + +/** + * TODO TJ doc + * TODO TJ use in scope of authenticating with PSK / Certificate + */ +final class DittoCoapDeviceInfoSupplier implements ApplicationLevelInfoSupplier { + + private static final ThreadSafeDittoLogger LOGGER = + DittoLoggerFactory.getThreadSafeLogger(DittoCoapDeviceInfoSupplier.class); + + /** + * Creates additional information for authenticated devices. + * + * @param context the {@link AuthorizationContext} of the authenticated device. + * @return additional device information. + */ + public static AdditionalInfo createDeviceInfo(final AuthorizationContext context) { + final Map result = new HashMap<>(); + result.put(DittoHeaderDefinition.AUTHORIZATION_CONTEXT.getKey(), context); + return AdditionalInfo.from(result); + } + + @Override + public AdditionalInfo getInfo(final Principal principal, final Object customArgument) { + if (customArgument instanceof AdditionalInfo additionalInfo) { + final AuthorizationContext authorizationContext = + additionalInfo.get(DittoHeaderDefinition.AUTHORIZATION_CONTEXT.getKey(), AuthorizationContext.class); + LOGGER.info("get AdditionalInfo auth context: {} - for principal: {}", authorizationContext, principal); + return additionalInfo; + } + LOGGER.debug("did not get additional info"); + return AdditionalInfo.empty(); + } +} diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapResourceFacade.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapResourceFacade.java new file mode 100644 index 0000000000..f88d54f310 --- /dev/null +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapResourceFacade.java @@ -0,0 +1,411 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.gateway.service.coap; + +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.BAD_GATEWAY; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.BAD_REQUEST; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.CHANGED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.CONFLICT; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.CONTENT; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.CREATED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.DELETED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.FORBIDDEN; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.GATEWAY_TIMEOUT; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.INTERNAL_SERVER_ERROR; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.METHOD_NOT_ALLOWED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.NOT_ACCEPTABLE; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.NOT_FOUND; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.NOT_IMPLEMENTED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.PRECONDITION_FAILED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.REQUEST_ENTITY_TOO_LARGE; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.SERVICE_UNAVAILABLE; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.UNAUTHORIZED; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.UNSUPPORTED_CONTENT_FORMAT; +import static org.eclipse.californium.core.coap.CoAP.ResponseCode.VALID; +import static org.eclipse.californium.core.coap.MediaTypeRegistry.APPLICATION_JSON; +import static org.eclipse.californium.core.coap.MediaTypeRegistry.TEXT_PLAIN; +import static org.eclipse.californium.core.coap.MediaTypeRegistry.UNDEFINED; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.eclipse.californium.core.coap.Request; +import org.eclipse.californium.core.coap.Response; +import org.eclipse.californium.core.coap.Token; +import org.eclipse.californium.core.observe.ObserveNotificationOrderer; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.eclipse.californium.core.server.resources.Resource; +import org.eclipse.californium.elements.EndpointContext; +import org.eclipse.californium.elements.auth.ExtensiblePrincipal; +import org.eclipse.ditto.base.model.auth.AuthorizationContext; +import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; +import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.http.javadsl.model.HttpHeader; +import akka.http.javadsl.model.HttpMethod; +import akka.http.javadsl.model.HttpMethods; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.http.javadsl.model.MediaTypes; +import akka.http.javadsl.model.ResponseEntity; +import akka.http.javadsl.model.StatusCode; +import akka.http.javadsl.server.Route; +import akka.http.javadsl.unmarshalling.sse.EventStreamUnmarshalling; +import akka.stream.KillSwitch; +import akka.stream.KillSwitches; +import akka.stream.UniqueKillSwitch; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; + +final class DittoCoapResourceFacade extends CoapResource { + + private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DittoCoapResourceFacade.class); + + private final ActorSystem actorSystem; + private final Flow httpFlow; + private final ObserveNotificationOrderer notificationOrderer; + private final Map activeObserveSessions; + + DittoCoapResourceFacade(final String name, final ActorSystem actorSystem, final Route rootRoute) { + + super(name, true); + this.actorSystem = actorSystem; + httpFlow = rootRoute.flow(actorSystem); + + getAttributes().setTitle("Ditto CoAP RootResource"); + getAttributes().addContentType(APPLICATION_JSON); + + setObservable(true); + setObserveType(CoAP.Type.CON); // TODO TJ which observeType to use? do I need 2 resources for 2 observeTypes? + + this.notificationOrderer = new ObserveNotificationOrderer(); + activeObserveSessions = new ConcurrentHashMap<>(); // TODO TJ concurrent or not? + } + + /** + * Gets an authenticated device's {@link AuthorizationContext} for a CoAP request. + * TODO TJ use + * + * @param exchange The CoAP exchange with AuthorizationContext of the authenticated device. + * @return The AuthorizationContext or empty optional if the request has not been authenticated. + */ + private static Optional getAuthorizationContext(final CoapExchange exchange) { + + return Optional.ofNullable(exchange.advanced().getRequest().getSourceContext()) + .map(EndpointContext::getPeerIdentity) + .filter(ExtensiblePrincipal.class::isInstance) + .map(ExtensiblePrincipal.class::cast) + .map(ExtensiblePrincipal::getExtendedInfo) + .map(info -> + info.get(DittoHeaderDefinition.AUTHORIZATION_CONTEXT.getKey(), AuthorizationContext.class)); + } + + @Override + public Executor getExecutor() { + return super.getExecutor(); // TODO TJ configure an Akka dispatcher as executor? + } + + @Override + public Resource getChild(final String name) { + // always return 'this' resource for all children in order to handle all requests with this resource + return this; + } + + /** + * Performs actions to perform when shutting down this single root resource. + */ + public void shutdown() { + activeObserveSessions.values() + .forEach(activeObserve -> activeObserve.getKillSwitch().shutdown()); + } + + @Override + public void handleGET(final CoapExchange exchange) { + if (exchange.getRequestOptions().hasObserve()) { + handleCoapObserve(exchange); + } else { + handleCoapRequest(exchange, HttpMethods.GET); + } + } + + private void handleCoapObserve(final CoapExchange exchange) { + final Token token = exchange.advanced().getRequest().getToken(); + if (exchange.advanced().getRequest().isObserve()) { + if (activeObserveSessions.containsKey(token)) { + // observe session is already active, no need to re-subscribe .. + LOGGER.withCorrelationId(token.getAsString()) + .info("Observe for token <{}> is still active, updating..", token); + final ActiveObserve updatedObserve = activeObserveSessions.get(token).withCoapExchange(exchange); + activeObserveSessions.put(token, updatedObserve); + } else { + handleCoapObserveRequest(exchange) + .thenAccept(uniqueKillSwitch -> { + if (null != uniqueKillSwitch) { + activeObserveSessions.put(token, + new ActiveObserve(token, exchange, uniqueKillSwitch) + ); + } + }); + } + // TODO TJ if observe on message path, respond with no content instead of handling "normal": + handleCoapRequest(exchange, HttpMethods.GET); + } else if (exchange.advanced().getRequest().isObserveCancel()) { + if (activeObserveSessions.containsKey(token)) { + // cancel observe + LOGGER.withCorrelationId(token.getAsString()) + .info("Unobserving for token <{}>", token); + activeObserveSessions.remove(token) + .getKillSwitch() + .shutdown(); + } + exchange.respond(VALID); // TODO TJ find out what to send back to an observe cancel .. + } + } + + @Override + public void handlePOST(final CoapExchange exchange) { + handleCoapRequest(exchange, HttpMethods.POST); + } + + @Override + public void handlePUT(final CoapExchange exchange) { + handleCoapRequest(exchange, HttpMethods.PUT); + } + + @Override + public void handleDELETE(final CoapExchange exchange) { + handleCoapRequest(exchange, HttpMethods.DELETE); + } + + @Override + public void handlePATCH(final CoapExchange exchange) { + handleCoapRequest(exchange, HttpMethods.PATCH); + } + + @Override + public void handleIPATCH(final CoapExchange exchange) { + handleCoapRequest(exchange, HttpMethods.PATCH); + } + + @Override + public int getNotificationSequenceNumber() { + return notificationOrderer.getCurrent(); + } + + private void handleCoapRequest(final CoapExchange exchange, final HttpMethod httpMethod) { + final String coapToken = exchange.advanced().getRequest().getTokenString(); + if (LOGGER.isInfoEnabled()) { + LOGGER.withCorrelationId(coapToken) + .info("Handling CoAP <{}> request with MID <{}> and token <{}>: <{}>", + httpMethod.name(), + exchange.advanced().getRequest().getMID(), + exchange.advanced().getRequest().getTokenString(), + exchange.getRequestOptions().getUriString()); + } + + final Request request = exchange.advanced().getRequest(); + final int accept; + if (request.getOptions().getAccept() == UNDEFINED) { + accept = APPLICATION_JSON; + } else { + accept = request.getOptions().getAccept(); + } + + if (accept == APPLICATION_JSON) { + Source.single(exchange) + .map(coapExchange -> translateCoapRequestToHttpRequest(coapExchange, httpMethod)) + .via(httpFlow) + .mapAsync(1, httpResponse -> + translateHttpResponseToCoapResponse(actorSystem, httpResponse, accept)) + .runWith(Sink.foreach(exchange::respond), actorSystem); + } else { + final String ct = MediaTypeRegistry.toString(accept); + exchange.respond(NOT_ACCEPTABLE, "Type \"" + ct + "\" is not supported for this resource!", TEXT_PLAIN); + } + } + + private CompletionStage handleCoapObserveRequest(final CoapExchange exchange) { + final Token token = exchange.advanced().getRequest().getToken(); + if (LOGGER.isInfoEnabled()) { + LOGGER.withCorrelationId(token.getAsString()) + .info("Handling CoAP observe request <{}> with MID <{}> and token <{}>: <{}>", + exchange.getRequestOptions().getObserve(), + exchange.advanced().getRequest().getMID(), + token.getAsString(), + exchange.getRequestOptions().getUriString()); + } + + return Source.single(exchange) + .map(coapExchange -> HttpRequest.create() + .withMethod(HttpMethods.GET) + .withUri(coapExchange.getRequestOptions().getUriString()) + .withHeaders(List.of( + HttpHeader.parse(DittoHeaderDefinition.ACCEPT.getKey(), "text/event-stream"), + HttpHeader.parse(DittoHeaderDefinition.CORRELATION_ID.getKey(), token.getAsString()), + HttpHeader.parse("ditto-coap-proxy", "true") + )) + .withEntity(coapExchange.getRequestPayload())) + .via(httpFlow) + .mapAsync(1, httpResponse -> { + final ResponseEntity entity = httpResponse.entity(); + if (entity.getContentType().mediaType().equals(MediaTypes.TEXT_EVENT_STREAM)) { + // SSE opened + return EventStreamUnmarshalling.fromEventsStream(actorSystem) + .unmarshal(entity, actorSystem) + .thenApply(source -> source + .viaMat(KillSwitches.single(), Keep.right()) + .toMat(Sink.foreach(sse -> { + notificationOrderer.getNextObserveNumber(); + Optional.ofNullable(activeObserveSessions.get(token)) + .map(ActiveObserve::getCoapExchange) + .orElse(exchange) + .respond(CONTENT, sse.getData(), APPLICATION_JSON); + }), Keep.left()) + .run(actorSystem) + ); + } else { + return CompletableFuture.completedStage(null); + } + }) + .runWith(Sink.head(), actorSystem); + } + + private static CompletionStage translateHttpResponseToCoapResponse( + final ActorSystem actorSystem, + final HttpResponse httpResponse, + final int accept) { + + final CoAP.ResponseCode responseCode = translateHttpStatusCodeToCoapResponseCode(httpResponse.status()); + final Response response = new Response(responseCode); + response.getOptions().setContentFormat(accept); + + return httpResponse.entity().getDataBytes() + .fold(ByteString.emptyByteString(), ByteString::concat) + .map(ByteString::utf8String) + .runWith(Sink.head(), actorSystem) + .thenApply(str -> (Response) response.setPayload(str)); + } + + private static CoAP.ResponseCode translateHttpStatusCodeToCoapResponseCode(final StatusCode statusCode) { + return switch (statusCode.intValue()) { + case 200: + yield CONTENT; + case 201: + yield CREATED; + case 202: + yield DELETED; + case 204: + yield CHANGED; + case 304: + yield VALID; + case 400: + yield BAD_REQUEST; + case 401: + yield UNAUTHORIZED; + case 403: + yield FORBIDDEN; + case 404: + yield NOT_FOUND; + case 405: + yield METHOD_NOT_ALLOWED; + case 406: + yield NOT_ACCEPTABLE; + case 409: + yield CONFLICT; + case 412: + yield PRECONDITION_FAILED; + case 413: + yield REQUEST_ENTITY_TOO_LARGE; + case 415: + yield UNSUPPORTED_CONTENT_FORMAT; + case 500: + yield INTERNAL_SERVER_ERROR; + case 501: + yield NOT_IMPLEMENTED; + case 502: + yield BAD_GATEWAY; + case 503: + yield SERVICE_UNAVAILABLE; + case 504: + yield GATEWAY_TIMEOUT; + default: + yield NOT_IMPLEMENTED; + }; + } + + private static HttpRequest translateCoapRequestToHttpRequest( + final CoapExchange coapExchange, + final HttpMethod httpMethod) { + + final String coapToken = coapExchange.advanced().getRequest().getTokenString(); + return HttpRequest.create() + .withMethod(httpMethod) + .withUri(coapExchange.getRequestOptions().getUriString()) + .withHeaders(List.of( + HttpHeader.parse(DittoHeaderDefinition.CORRELATION_ID.getKey(), coapToken), + HttpHeader.parse("ditto-coap-proxy", "true") + )) // TODO TJ map headers like If-Match, etc .. which CoAP supports + .withEntity(coapExchange.getRequestPayload()); + } + + private static final class ActiveObserve { + private final Token token; + private final CoapExchange coapExchange; + private final Instant lastObserveTimestamp; + private final KillSwitch killSwitch; + + private ActiveObserve(final Token token, final CoapExchange coapExchange, final KillSwitch killSwitch) { + this.token = token; + this.coapExchange = coapExchange; + lastObserveTimestamp = Instant.now(); + // TODO TJ cancel the observation after lastObserveTimestamp + "maxAge" seconds (+5?) + this.killSwitch = killSwitch; + } + + ActiveObserve withCoapExchange(final CoapExchange coapExchange) { + return new ActiveObserve(token, coapExchange, killSwitch); + } + + Token getToken() { + return token; + } + + CoapExchange getCoapExchange() { + return coapExchange; + } + + Instant getLastObserveTimestamp() { + return lastObserveTimestamp; + } + + KillSwitch getKillSwitch() { + return killSwitch; + } + } + +} diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapServer.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapServer.java new file mode 100644 index 0000000000..e10d16eaf9 --- /dev/null +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapServer.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.gateway.service.coap; + +import org.eclipse.californium.core.CoapServer; +import org.eclipse.californium.elements.config.Configuration; + +import akka.actor.ActorSystem; +import akka.http.javadsl.server.Route; + +final class DittoCoapServer extends CoapServer { + + private final DittoCoapResourceFacade dittoCoapResourceFacade; + + DittoCoapServer(final ActorSystem actorSystem, + final Route rootRoute, + final Configuration config, + final int... ports) { + + super(config, ports); + dittoCoapResourceFacade = new DittoCoapResourceFacade("api", actorSystem, rootRoute); + getRoot().add(dittoCoapResourceFacade); + } + + @Override + public synchronized void destroy() { + dittoCoapResourceFacade.shutdown(); + super.destroy(); + } + +} diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapServerActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapServerActor.java new file mode 100644 index 0000000000..be24e0c748 --- /dev/null +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/coap/DittoCoapServerActor.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.gateway.service.coap; + +import org.eclipse.californium.core.config.CoapConfig; +import org.eclipse.californium.elements.config.Configuration; +import org.eclipse.californium.elements.config.UdpConfig; +import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.http.javadsl.server.Route; +import akka.japi.pf.ReceiveBuilder; + +/** + * Wraps the {@link DittoCoapServer} created via Eclipse Californium. + */ +public final class DittoCoapServerActor extends AbstractActor { + + /** + * The name of this Actor. + */ + public static final String ACTOR_NAME = "coapServerActor"; + + private final ThreadSafeDittoLoggingAdapter logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this); + + private final DittoCoapServer coapServer; + + @SuppressWarnings("unused") + private DittoCoapServerActor(final Route rootRoute) { + Configuration.createStandardWithoutFile(); + coapServer = new DittoCoapServer(getContext().getSystem(), + rootRoute, + Configuration.getStandard()); // TODO TJ provide Coap Configuration via HOCON conf.. + } + + /** + * Creates props for {@code DittoCoapServerActor}. + * + * @param rootRoute the root Route for delegating CoAP requests to. + * @return the props. + */ + public static Props props(final Route rootRoute) { + return Props.create(DittoCoapServerActor.class, rootRoute); + } + + @Override + public Receive createReceive() { + return ReceiveBuilder.create() + .matchAny(m -> logger.warning("Received unknown message: <{}>", m)) + .build(); + } + + @Override + public void preStart() { + CoapConfig.register(); + UdpConfig.register(); + + logger.info("Starting CoAP server .."); + coapServer.start(); + } + + @Override + public void postStop() throws Exception { + super.postStop(); + coapServer.destroy(); + } +} diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java index d6fdcd3811..6cc18bb309 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestResultLoggingDirective.java @@ -26,6 +26,7 @@ import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; +import akka.http.javadsl.model.HttpHeader; import akka.http.javadsl.server.Complete; import akka.http.javadsl.server.Route; @@ -65,8 +66,12 @@ public static Route logRequestResult(final CharSequence correlationId, final Sup final ThreadSafeDittoLogger logger = LOGGER.withCorrelationId(correlationId); if (routeResult instanceof Complete complete) { final int statusCode = complete.getResponse().status().intValue(); - logger.info("StatusCode of request {} '{}' was: {}", requestMethod, filteredRelativeRequestUri, - statusCode); + final boolean isCoapRequest = request.getHeader("ditto-coap-proxy") + .map(HttpHeader::value) + .map(Boolean::parseBoolean) + .orElse(false); + logger.info("StatusCode of <{}> request <{}> '{}' was: {}", isCoapRequest ? "CoAP" : "HTTP", + requestMethod, filteredRelativeRequestUri, statusCode); if (logger.isDebugEnabled()) { final String filteredRawRequestUri = filterRawUri(HttpUtils.getRawRequestUri(request)); logger.debug("Raw request URI was: {}", filteredRawRequestUri); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java index c5a3f400ed..f81168c270 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/RequestTracingDirective.java @@ -220,6 +220,11 @@ private static void addRequestResponseTags( final HttpResponse httpResponse, @Nullable final CharSequence correlationId ) { + final boolean isCoapRequest = httpRequest.getHeader("ditto-coap-proxy") + .map(HttpHeader::value) + .map(Boolean::parseBoolean) + .orElse(false); + startedSpan.tag(SpanTagKey.REQUEST_PROTOCOL.getTagForValue(isCoapRequest ? "CoAP" : "HTTP")); startedSpan.tag(SpanTagKey.REQUEST_METHOD_NAME.getTagForValue(getRequestMethodName(httpRequest))); @Nullable final var relativeRequestUri = tryToGetRelativeRequestUri(httpRequest, correlationId); if (null != relativeRequestUri) { diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java index 72b676c96f..12a1f18631 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java @@ -14,13 +14,21 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import javax.annotation.Nullable; + +import org.eclipse.ditto.base.model.auth.AuthorizationContext; +import org.eclipse.ditto.base.model.auth.AuthorizationSubject; +import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType; +import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationChain; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationFailureAggregator; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationFailureAggregators; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationProvider; import org.eclipse.ditto.gateway.service.security.authentication.AuthenticationResult; +import org.eclipse.ditto.gateway.service.security.authentication.DefaultAuthenticationResult; import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationFactory; import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationProvider; import org.eclipse.ditto.gateway.service.security.authentication.preauth.PreAuthenticatedAuthenticationProvider; @@ -30,10 +38,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.lang.Nullable; import com.typesafe.config.Config; import akka.actor.ActorSystem; +import akka.http.javadsl.server.RequestContext; /** * Ditto's default factory for building authentication directives. @@ -47,6 +55,7 @@ public final class DittoGatewayAuthenticationDirectiveFactory implements Gateway private final Executor authenticationDispatcher; @Nullable private GatewayAuthenticationDirective gatewayHttpAuthenticationDirective; @Nullable private GatewayAuthenticationDirective gatewayWsAuthenticationDirective; + @Nullable private GatewayAuthenticationDirective gatewayCoapAuthenticationDirective; public DittoGatewayAuthenticationDirectiveFactory(final ActorSystem actorSystem, final Config config) { authConfig = DittoGatewayConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config())) @@ -89,9 +98,39 @@ public GatewayAuthenticationDirective buildWsAuthentication( return gatewayWsAuthenticationDirective; } + @Override + public GatewayAuthenticationDirective buildCoapAuthentication() { + + if (null == gatewayCoapAuthenticationDirective) { + final AuthenticationProvider coapAuthenticationProvider = new AuthenticationProvider<>() { + @Override + public boolean isApplicable(final RequestContext requestContext) { + return true; // TODO TJ extract CoAP AuthenticationProvider + } + + @Override + public CompletableFuture authenticate(final RequestContext requestContext, + final DittoHeaders dittoHeaders) { + return CompletableFuture.completedFuture( + DefaultAuthenticationResult.successful(dittoHeaders, + AuthorizationContext.newInstance( + DittoAuthorizationContextType.PRE_AUTHENTICATED_COAP, + AuthorizationSubject.newInstance("coap:some-device-id") // TODO TJ determine from coap auth + ) + ) + ); + } + }; + gatewayCoapAuthenticationDirective = + generateGatewayAuthenticationDirective(authConfig, coapAuthenticationProvider, + authenticationDispatcher); + } + return gatewayCoapAuthenticationDirective; + } + private static GatewayAuthenticationDirective generateGatewayAuthenticationDirective( final AuthenticationConfig authConfig, - final AuthenticationProvider jwtAuthenticationProvider, + @Nullable final AuthenticationProvider authenticationProvider, final Executor authenticationDispatcher) { final Collection> authenticationProviders = new ArrayList<>(); @@ -100,7 +139,9 @@ private static GatewayAuthenticationDirective generateGatewayAuthenticationDirec authenticationProviders.add(PreAuthenticatedAuthenticationProvider.getInstance()); } - authenticationProviders.add(jwtAuthenticationProvider); + if (null != authenticationProvider) { + authenticationProviders.add(authenticationProvider); + } final AuthenticationFailureAggregator authenticationFailureAggregator = AuthenticationFailureAggregators.getDefault(); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirectiveFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirectiveFactory.java index 372bf2f12c..9c66b8898e 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirectiveFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirectiveFactory.java @@ -15,8 +15,8 @@ import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationFactory; -import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint; import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds; +import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint; import com.typesafe.config.Config; @@ -41,6 +41,13 @@ public interface GatewayAuthenticationDirectiveFactory extends DittoExtensionPoi */ GatewayAuthenticationDirective buildWsAuthentication(JwtAuthenticationFactory jwtAuthenticationFactory); + /** + * Builds the {@link GatewayAuthenticationDirective authentication directive} that should be used for CoAP API. + * + * @return The built {@link GatewayAuthenticationDirective authentication directive}. + */ + GatewayAuthenticationDirective buildCoapAuthentication(); + /** * Loads the implementation of {@code GatewayAuthenticationDirectiveFactory} which is configured for the * {@code ActorSystem}. diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/AbstractRoute.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/AbstractRoute.java index 43c2f5bcf8..4891cca9bd 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/AbstractRoute.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/AbstractRoute.java @@ -352,7 +352,11 @@ protected Route withCustomRequestTimeout(@Nullable final Duration optionalTimeou customRequestTimeout = checkTimeoutFunction.apply(optionalTimeout); } - return increaseHttpRequestTimeout(inner, customRequestTimeout); + if (routeBaseProperties.isCoapRoute()) { + return inner.apply(customRequestTimeout); + } else { + return increaseHttpRequestTimeout(inner, customRequestTimeout); + } } private CompletionStage toStrict(final HttpResponse response) { diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRoute.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRoute.java index dc8a5c4e7d..3421d44cc4 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRoute.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRoute.java @@ -20,6 +20,7 @@ import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -82,28 +83,28 @@ public final class RootRoute extends AllDirectives { static final String HTTP_PATH_API_PREFIX = "api"; static final String WS_PATH_PREFIX = "ws"; - private final StatusRoute ownStatusRoute; - private final OverallStatusRoute overallStatusRoute; - private final CachingHealthRoute cachingHealthRoute; - private final DevOpsRoute devopsRoute; - - private final PoliciesRoute policiesRoute; - private final SseRouteBuilder sseThingsRouteBuilder; - private final ThingsRoute thingsRoute; - private final ThingSearchRoute thingSearchRoute; - private final ConnectionsRoute connectionsRoute; - private final WebSocketRouteBuilder websocketRouteBuilder; - private final StatsRoute statsRoute; - private final WhoamiRoute whoamiRoute; - private final CloudEventsRoute cloudEventsRoute; + @Nullable private final StatusRoute ownStatusRoute; + @Nullable private final OverallStatusRoute overallStatusRoute; + @Nullable private final CachingHealthRoute cachingHealthRoute; + @Nullable private final DevOpsRoute devopsRoute; + + @Nullable private final PoliciesRoute policiesRoute; + @Nullable private final SseRouteBuilder sseThingsRouteBuilder; + @Nullable private final ThingsRoute thingsRoute; + @Nullable private final ThingSearchRoute thingSearchRoute; + @Nullable private final ConnectionsRoute connectionsRoute; + @Nullable private final WebSocketRouteBuilder websocketRouteBuilder; + @Nullable private final StatsRoute statsRoute; + @Nullable private final WhoamiRoute whoamiRoute; + @Nullable private final CloudEventsRoute cloudEventsRoute; private final CustomApiRoutesProvider customApiRoutesProvider; private final RouteBaseProperties routeBaseProperties; private final GatewayAuthenticationDirective apiAuthenticationDirective; private final GatewayAuthenticationDirective wsAuthenticationDirective; - private final CorsEnablingDirective corsDirective; - private final HttpsEnsuringDirective httpsDirective; - private final RequestTimeoutHandlingDirective requestTimeoutHandlingDirective; + @Nullable private final CorsEnablingDirective corsDirective; + @Nullable private final HttpsEnsuringDirective httpsDirective; + @Nullable private final RequestTimeoutHandlingDirective requestTimeoutHandlingDirective; private final ExceptionHandler exceptionHandler; private final Map supportedSchemaVersions; private final ProtocolAdapterProvider protocolAdapterProvider; @@ -114,6 +115,7 @@ public final class RootRoute extends AllDirectives { private RootRoute(final Builder builder) { final HttpConfig httpConfig = builder.httpConfig; + final boolean isCoapRoute = builder.isCoapRoute; ownStatusRoute = builder.statusRoute; overallStatusRoute = builder.overallStatusRoute; cachingHealthRoute = builder.cachingHealthRoute; @@ -131,9 +133,9 @@ private RootRoute(final Builder builder) { routeBaseProperties = builder.routeBaseProperties; apiAuthenticationDirective = builder.httpAuthenticationDirective; wsAuthenticationDirective = builder.wsAuthenticationDirective; - requestTimeoutHandlingDirective = RequestTimeoutHandlingDirective.getInstance(httpConfig); - httpsDirective = HttpsEnsuringDirective.getInstance(httpConfig); - corsDirective = CorsEnablingDirective.getInstance(httpConfig); + requestTimeoutHandlingDirective = isCoapRoute ? null : RequestTimeoutHandlingDirective.getInstance(httpConfig); + httpsDirective = isCoapRoute ? null : HttpsEnsuringDirective.getInstance(httpConfig); + corsDirective = isCoapRoute ? null : CorsEnablingDirective.getInstance(httpConfig); supportedSchemaVersions = new HashMap<>(builder.supportedSchemaVersions); protocolAdapterProvider = builder.protocolAdapterProvider; dreToHttpResponse = DittoRuntimeExceptionToHttpResponse.getInstance(builder.headerTranslator); @@ -148,8 +150,8 @@ private RootRoute(final Builder builder) { builder.dittoHeadersValidator); } - public static RootRouteBuilder getBuilder(final HttpConfig httpConfig) { - return new Builder(httpConfig) + public static RootRouteBuilder getBuilder(final HttpConfig httpConfig, final boolean isCoapRoute) { + return new Builder(httpConfig, isCoapRoute) .customHeadersHandler(NoopCustomHeadersHandler.getInstance()) .rejectionHandler(DittoRejectionHandlerFactory.createInstance()); } @@ -164,14 +166,14 @@ private Route buildRoute() { parameterMap(queryParameters -> extractRequestContext(ctx -> concat( - statsRoute.buildStatsRoute(correlationId), // /stats - cachingHealthRoute.buildHealthRoute(), // /health - connections(ctx, correlationId, queryParameters), // /api/2/connections + null != statsRoute ? statsRoute.buildStatsRoute(correlationId) : reject(), // /stats + null != cachingHealthRoute ? cachingHealthRoute.buildHealthRoute() : reject(), // /health + null != connectionsRoute ? connections(ctx, correlationId, queryParameters) : reject(), // /api/2/connections api(ctx, correlationId, queryParameters), // /api - ws(ctx, correlationId, queryParameters), // /ws - ownStatusRoute.buildStatusRoute(), // /status - overallStatusRoute.buildOverallStatusRoute(), // /overall - devopsRoute.buildDevOpsRoute(ctx, queryParameters) // /devops + null != websocketRouteBuilder ? ws(ctx, correlationId, queryParameters) : reject(), // /ws + null != ownStatusRoute ? ownStatusRoute.buildStatusRoute() : reject(), // /status + null != overallStatusRoute ? overallStatusRoute.buildOverallStatusRoute() : reject(), // /overall + null != devopsRoute ? devopsRoute.buildDevOpsRoute(ctx, queryParameters) : reject() // /devops ) ) ) @@ -185,37 +187,43 @@ private Route wrapWithRootDirectives(final Function rootRoute) { (which normally should not occur */ handleExceptions(exceptionHandler, () -> CorrelationIdEnsuringDirective.ensureCorrelationId( - correlationId -> requestTimeoutHandlingDirective - .handleRequestTimeout(correlationId, () -> - RequestTracingDirective.traceRequest( - () -> RequestResultLoggingDirective.logRequestResult( - correlationId, - () -> innerRouteProvider.apply(correlationId) - ), - correlationId - ) - ) + correlationId -> { + final Supplier inner = () -> + RequestTracingDirective.traceRequest( + () -> RequestResultLoggingDirective.logRequestResult( + correlationId, + () -> innerRouteProvider.apply(correlationId) + ), + correlationId + ); + return null != requestTimeoutHandlingDirective ? + requestTimeoutHandlingDirective.handleRequestTimeout(correlationId, inner) : + inner.get(); + } ) ); final Function innerRouteProvider = correlationId -> EncodingEnsuringDirective.ensureEncoding(() -> - httpsDirective.ensureHttps(correlationId, () -> - corsDirective.enableCors(() -> - /* handling the rejections is done by akka automatically, but if we - do it here explicitly, we are able to log the status code for the - rejection (e.g. 404 or 405) in a wrapping directive. */ - handleRejections(rejectionHandler, () -> + { + /* handling the rejections is done by akka automatically, but if we + do it here explicitly, we are able to log the status code for the + rejection (e.g. 404 or 405) in a wrapping directive. */ + final RouteAdapter innerAdapter = handleRejections(rejectionHandler, () -> /* the inner handleExceptions is for handling exceptions occurring in the route route. It makes sure that the wrapping directives such as addSecurityResponseHeaders are even called in an error case in the route route. */ - handleExceptions(exceptionHandler, () -> - rootRoute.apply(correlationId) - ) - ) - ) - ) + handleExceptions(exceptionHandler, () -> + rootRoute.apply(correlationId) + ) + ); + return null != httpsDirective ? httpsDirective.ensureHttps(correlationId, () -> + null != corsDirective ? corsDirective.enableCors(() -> + innerAdapter + ) : innerAdapter + ) : innerAdapter; + } ); return outerRouteProvider.apply(innerRouteProvider); } @@ -242,8 +250,8 @@ private Route connections(final RequestContext ctx, final String correlationId, .withRequestContext(ctx) .withQueryParameters(queryParameters) .build(CustomHeadersHandler.RequestType.API), - dittoHeaders -> connectionsRoute.buildConnectionsRoute(ctx, - dittoHeaders) + dittoHeaders -> null != connectionsRoute ? + connectionsRoute.buildConnectionsRoute(ctx, dittoHeaders) : reject() ).seal() // sealing here is important as we don't want to fall back to other routes if devops auth failed ) @@ -313,25 +321,25 @@ private Route buildApiSubRoutes(final RequestContext ctx, final DittoHeaders dit return concat( // /api/{apiVersion}/policies - policiesRoute.buildPoliciesRoute(ctx, dittoHeaders, authenticationResult), + null != policiesRoute ? policiesRoute.buildPoliciesRoute(ctx, dittoHeaders, authenticationResult) : reject(), // /api/{apiVersion}/things SSE support - buildSseThingsRoute(ctx, dittoHeaders), + null != sseThingsRouteBuilder ? buildSseThingsRoute(ctx, dittoHeaders) : reject(), // /api/{apiVersion}/things - thingsRoute.buildThingsRoute(ctx, dittoHeaders), + null != thingsRoute ? thingsRoute.buildThingsRoute(ctx, dittoHeaders) : reject(), // /api/{apiVersion}/search/things - thingSearchRoute.buildSearchRoute(ctx, dittoHeaders), + null != thingSearchRoute ? thingSearchRoute.buildSearchRoute(ctx, dittoHeaders) : reject(), // /api/{apiVersion}/whoami - whoamiRoute.buildWhoamiRoute(ctx, dittoHeaders), + null != whoamiRoute ? whoamiRoute.buildWhoamiRoute(ctx, dittoHeaders) : reject(), // /api/{apiVersion}/cloudevents - cloudEventsRoute.buildCloudEventsRoute(ctx, dittoHeaders) + null != cloudEventsRoute ? cloudEventsRoute.buildCloudEventsRoute(ctx, dittoHeaders) : reject() ).orElse(customApiSubRoutes); } private Route buildSseThingsRoute(final RequestContext ctx, final DittoHeaders dittoHeaders) { return handleExceptions(exceptionHandler, - () -> sseThingsRouteBuilder.build(ctx, - () -> overwriteDittoHeadersForSse(ctx, dittoHeaders))); + () -> null != sseThingsRouteBuilder ? sseThingsRouteBuilder.build(ctx, + () -> overwriteDittoHeadersForSse(ctx, dittoHeaders)) : reject()); } private CompletionStage overwriteDittoHeadersForSse(final RequestContext ctx, @@ -364,8 +372,9 @@ private Route ws(final RequestContext ctx, final CharSequence correlationId, @Nullable final String userAgent = getUserAgentOrNull(ctx); final ProtocolAdapter chosenProtocolAdapter = protocolAdapterProvider.getProtocolAdapter(userAgent); - return websocketRouteBuilder.build(wsVersion, correlationId, dittoHeaders, - chosenProtocolAdapter, ctx); + return null != websocketRouteBuilder ? + websocketRouteBuilder.build(wsVersion, correlationId, dittoHeaders, + chosenProtocolAdapter, ctx) : reject(); }); } ) @@ -427,6 +436,7 @@ private static Function new IllegalStateException( "Expected correlation-id in SSE DittoHeaders: " + dittoHeaders)); @@ -412,7 +418,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage { final SupervisedStream.WithQueue withQueue = pair.first(); final KillSwitch killSwitch = pair.second(); + + final boolean isCoapRequest = Optional.ofNullable(dittoHeaders.get("ditto-coap-proxy")) + .map(Boolean::parseBoolean) + .orElse(false); + final String streamingType = isCoapRequest ? STREAMING_TYPE_COAP_OBSERVE : STREAMING_TYPE_SSE; + final String connectionCorrelationId = dittoHeaders.getCorrelationId() .orElseThrow(() -> new IllegalStateException( "Expected correlation-id in SSE DittoHeaders: " + @@ -474,7 +486,7 @@ private Route createMessagesSseRoute(final RequestContext ctx, final var authorizationContext = dittoHeaders.getAuthorizationContext(); final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId, - STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(), + streamingType, jsonSchemaVersion, null, Set.of(), authorizationContext, null); final String resourcePathRqlStatement; if (INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(messagePath).matches()) { diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java index ff504bb33c..782dba9538 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java @@ -20,6 +20,7 @@ import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor; import org.eclipse.ditto.edge.service.dispatching.ShardRegions; import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator; +import org.eclipse.ditto.gateway.service.coap.DittoCoapServerActor; import org.eclipse.ditto.gateway.service.endpoints.directives.auth.DevopsAuthenticationDirectiveFactory; import org.eclipse.ditto.gateway.service.endpoints.directives.auth.GatewayAuthenticationDirectiveFactory; import org.eclipse.ditto.gateway.service.endpoints.routes.CustomApiRoutesProvider; @@ -153,6 +154,10 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu healthCheckActor, pubSubMediator, healthCheckConfig, jwtAuthenticationFactory, devopsAuthenticationDirectiveFactory, protocolAdapterProvider, headerTranslator); + final Route routeForCoap = createRouteForCoap(actorSystem, gatewayConfig, proxyActor, streamingActor, + pubSubMediator, protocolAdapterProvider, headerTranslator); + startChildActor(DittoCoapServerActor.ACTOR_NAME, DittoCoapServerActor.props(routeForCoap)); + httpBinding = Http.get(actorSystem) .newServerAt(hostname, httpConfig.getPort()) .bindFlow(HttpBindFlowProvider.get(actorSystem, dittoExtensionConfig).getFlow(rootRoute)) @@ -233,7 +238,7 @@ private static Route createRoute(final ActorSystem actorSystem, final var commandConfig = gatewayConfig.getCommandConfig(); - final var routeBaseProperties = RouteBaseProperties.newBuilder() + final var routeBaseProperties = RouteBaseProperties.newBuilder(false) .actorSystem(actorSystem) .proxyActor(proxyActor) .httpConfig(httpConfig) @@ -244,7 +249,7 @@ private static Route createRoute(final ActorSystem actorSystem, final var customApiRoutesProvider = CustomApiRoutesProvider.get(actorSystem, dittoExtensionConfig); - return RootRoute.getBuilder(httpConfig) + return RootRoute.getBuilder(httpConfig, false) .statsRoute(new StatsRoute(routeBaseProperties, devopsAuthenticationDirective)) .statusRoute(new StatusRoute(clusterStateSupplier, healthCheckingActor, actorSystem)) .overallStatusRoute(new OverallStatusRoute(clusterStateSupplier, @@ -281,6 +286,63 @@ private static Route createRoute(final ActorSystem actorSystem, .build(); } + private static Route createRouteForCoap(final ActorSystem actorSystem, + final GatewayConfig gatewayConfig, + final ActorRef proxyActor, + final ActorRef streamingActor, + final ActorRef pubSubMediator, + final ProtocolAdapterProvider protocolAdapterProvider, + final HeaderTranslator headerTranslator) { + + final var dittoExtensionConfig = ScopedConfig.dittoExtension(actorSystem.settings().config()); + final var authConfig = gatewayConfig.getAuthenticationConfig(); + + final var authenticationDirectiveFactory = + GatewayAuthenticationDirectiveFactory.get(actorSystem, dittoExtensionConfig); + + final var httpConfig = gatewayConfig.getHttpConfig(); + + final var dittoHeadersValidator = + DittoHeadersValidator.get(actorSystem, dittoExtensionConfig); + + final var streamingConfig = gatewayConfig.getStreamingConfig(); + final var signalEnrichmentProvider = + GatewaySignalEnrichmentProvider.get(actorSystem, dittoExtensionConfig); + + final var commandConfig = gatewayConfig.getCommandConfig(); + + final var routeBaseProperties = RouteBaseProperties.newBuilder(true) + .actorSystem(actorSystem) + .proxyActor(proxyActor) + .httpConfig(httpConfig) + .commandConfig(commandConfig) + .headerTranslator(headerTranslator) + .build(); + + final var customApiRoutesProvider = + CustomApiRoutesProvider.get(actorSystem, dittoExtensionConfig); + + return RootRoute.getBuilder(httpConfig, true) + .policiesRoute(new PoliciesRoute(routeBaseProperties, + OAuthTokenIntegrationSubjectIdFactory.of(authConfig.getOAuthConfig()))) + .sseThingsRoute( + ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator) + .withProxyActor(proxyActor) + .withSignalEnrichmentProvider(signalEnrichmentProvider)) + .thingsRoute(new ThingsRoute(routeBaseProperties, + gatewayConfig.getMessageConfig(), + gatewayConfig.getClaimMessageConfig())) + .whoamiRoute(new WhoamiRoute(routeBaseProperties)) + .cloudEventsRoute(new CloudEventsRoute(routeBaseProperties, gatewayConfig.getCloudEventsConfig())) + .supportedSchemaVersions(httpConfig.getSupportedSchemaVersions()) + .protocolAdapterProvider(protocolAdapterProvider) + .headerTranslator(headerTranslator) + .httpAuthenticationDirective(authenticationDirectiveFactory.buildCoapAuthentication()) + .dittoHeadersValidator(dittoHeadersValidator) + .customApiRoutesProvider(customApiRoutesProvider, routeBaseProperties) + .build(); + } + private ActorRef createHealthCheckActor(final HealthCheckConfig healthCheckConfig) { final var healthCheckingActorOptions = HealthCheckingActorOptions.getBuilder(healthCheckConfig.isEnabled(), healthCheckConfig.getInterval()) diff --git a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/EndpointTestBase.java b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/EndpointTestBase.java index b2c4b31367..ba0bd4f2d4 100755 --- a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/EndpointTestBase.java +++ b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/EndpointTestBase.java @@ -159,7 +159,7 @@ public void before() { final var adapterProvider = ProtocolAdapterProvider.load(protocolConfig, system()); httpHeaderTranslator = adapterProvider.getHttpHeaderTranslator(); - routeBaseProperties = RouteBaseProperties.newBuilder() + routeBaseProperties = RouteBaseProperties.newBuilder(false) .proxyActor(createDummyResponseActor(getResponseProvider())) .actorSystem(system()) .httpConfig(httpConfig) diff --git a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRouteTest.java b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRouteTest.java index c8cf8364ac..60b3d1d75e 100755 --- a/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRouteTest.java +++ b/gateway/service/src/test/java/org/eclipse/ditto/gateway/service/endpoints/routes/RootRouteTest.java @@ -143,7 +143,7 @@ public void setUp() { final var devOpsAuthenticationDirective = devopsAuthenticationDirectiveFactory.devops(); final var dittoExtensionConfig = ScopedConfig.dittoExtension(routeBaseProperties.getActorSystem().settings().config()); - final var rootRoute = RootRoute.getBuilder(httpConfig) + final var rootRoute = RootRoute.getBuilder(httpConfig, false) .statsRoute(new StatsRoute(routeBaseProperties, devOpsAuthenticationDirective)) .statusRoute(new StatusRoute(clusterStatusSupplier, createHealthCheckingActorMock(), diff --git a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceUtils.java b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceUtils.java index 681d089045..71deb0afe1 100644 --- a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceUtils.java +++ b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/TraceUtils.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import akka.http.javadsl.model.HttpHeader; import akka.http.javadsl.model.HttpRequest; /** @@ -58,8 +59,14 @@ private TraceUtils() { * @return The prepared {@link PreparedTimer} */ public static PreparedTimer newHttpRoundTripTimer(final HttpRequest request) { + final boolean isCoapRequest = request.getHeader("ditto-coap-proxy") + .map(HttpHeader::value) + .map(Boolean::parseBoolean) + .orElse(false); + return newExpiringTimer(HTTP_ROUNDTRIP_METRIC_NAME) .tags(getTraceInformationTags(request)) + .tag(SpanTagKey.REQUEST_PROTOCOL.getTagForValue(isCoapRequest ? "CoAP" : "HTTP")) .tag(SpanTagKey.REQUEST_METHOD_NAME.getTagForValue(request.method().name())) .tag(SpanTagKey.CHANNEL.getTagForValue(determineChannel(request))); } diff --git a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/SpanTagKey.java b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/SpanTagKey.java index 243d3103c8..4d54f20c43 100644 --- a/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/SpanTagKey.java +++ b/internal/utils/tracing/src/main/java/org/eclipse/ditto/internal/utils/tracing/span/SpanTagKey.java @@ -46,6 +46,9 @@ public abstract class SpanTagKey { public static final SpanTagKey HTTP_STATUS = new HttpStatusImplementation(KEY_PREFIX + "statusCode"); + public static final SpanTagKey REQUEST_PROTOCOL = + new CharSequenceImplementation(KEY_PREFIX + "request.protocol"); + public static final SpanTagKey REQUEST_METHOD_NAME = new CharSequenceImplementation(KEY_PREFIX + "request.method");