From c965112c50153a488c06370ad1211b949fdbf6e6 Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Tue, 1 Aug 2023 15:49:06 +0200 Subject: [PATCH] [#19] Enhance device communication API * added support for device states * added automatic Pub/Sub tenant topic & subscription creation and deletion * added automatically sending of a config in case a config is requested or a new one created * extended config table with error field Signed-off-by: Matthias Kaemmer --- .../api/DeviceCommunicationHttpServer.java | 31 +- .../api/config/ApiCommonConstants.java | 36 ++ .../api/config/DeviceConfigsConstants.java | 13 +- .../api/config/DeviceStatesConstants.java | 42 ++ .../communication/api/data/DeviceConfig.java | 2 - .../api/data/DeviceConfigEntity.java | 29 +- .../data/DeviceConfigInternalResponse.java | 121 ++++++ .../communication/api/data/DeviceState.java | 106 +++++ .../api/data/DeviceStateEntity.java | 107 +++++ .../api/data/ListDeviceStatesResponse.java | 90 ++++ .../api/handler/ConfigTopicEventHandler.java | 43 ++ .../api/handler/DeviceConfigsHandler.java | 12 +- .../api/handler/DeviceStatesHandler.java | 70 ++++ .../api/handler/StateTopicEventHandler.java | 35 ++ .../api/mapper/DeviceStateMapper.java | 59 +++ .../api/repository/DeviceStateRepository.java | 48 +++ .../repository/DeviceStateRepositoryImpl.java | 173 ++++++++ .../api/service/DeviceConfigServiceImpl.java | 84 ---- .../VertxHttpHandlerManagerService.java | 10 +- .../InternalTopicManager.java} | 14 +- .../InternalTopicManagerImpl.java | 187 +++++++++ .../{ => config}/DeviceConfigService.java | 21 +- .../config/DeviceConfigServiceImpl.java | 284 +++++++++++++ .../DatabaseSchemaCreatorImpl.java | 28 +- .../api/service/state/DeviceStateService.java | 37 ++ .../service/state/DeviceStateServiceImpl.java | 101 +++++ .../core/app/DatabaseConfig.java | 12 + .../core/utils/StringValidateUtils.java | 44 ++ .../api/hono-device-communication-v1.yaml | 79 +++- .../src/main/resources/api/hono-endpoint.yaml | 77 +++- .../resources/db/migration/V1.0__create.sql | 11 - ...g_table.sql => v1_create_config_table.sql} | 8 +- .../resources/db/v1_create_state_table.sql | 28 ++ .../communication/api/ApplicationTest.java | 2 - .../DeviceCommunicationHttpServerTest.java | 33 +- .../api/handler/DeviceConfigsHandlerTest.java | 44 +- .../api/handler/DeviceStatesHandlerTest.java | 150 +++++++ .../service/DeviceConfigServiceImplTest.java | 127 ------ .../InternalTopicManagerImplTest.java | 157 +++++++ .../config/DeviceConfigServiceImplTest.java | 388 ++++++++++++++++++ .../DatabaseSchemaCreatorImplTest.java | 27 +- .../DatabaseServiceImplTest.java | 9 +- .../state/DeviceStateServiceImplTest.java | 152 +++++++ .../core/utils/StringValidateUtilsTest.java | 38 ++ 44 files changed, 2828 insertions(+), 341 deletions(-) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/config/ApiCommonConstants.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceStatesConstants.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigInternalResponse.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceState.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceStateEntity.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/data/ListDeviceStatesResponse.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/handler/ConfigTopicEventHandler.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandler.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/handler/StateTopicEventHandler.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceStateMapper.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepository.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepositoryImpl.java delete mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImpl.java rename device-communication/src/main/java/org/eclipse/hono/communication/api/service/{DeviceCommandService.java => communication/InternalTopicManager.java} (66%) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java rename device-communication/src/main/java/org/eclipse/hono/communication/api/service/{ => config}/DeviceConfigService.java (69%) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImpl.java rename device-communication/src/main/java/org/eclipse/hono/communication/api/service/{ => database}/DatabaseSchemaCreatorImpl.java (70%) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateService.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImpl.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/core/utils/StringValidateUtils.java delete mode 100644 device-communication/src/main/resources/db/migration/V1.0__create.sql rename device-communication/src/main/resources/db/{create_device_config_table.sql => v1_create_config_table.sql} (73%) create mode 100644 device-communication/src/main/resources/db/v1_create_state_table.sql create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandlerTest.java delete mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImplTest.java create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImplTest.java rename device-communication/src/test/java/org/eclipse/hono/communication/api/service/{ => database}/DatabaseSchemaCreatorImplTest.java (75%) rename device-communication/src/test/java/org/eclipse/hono/communication/api/service/{ => database}/DatabaseServiceImplTest.java (90%) create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImplTest.java create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/core/utils/StringValidateUtilsTest.java diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServer.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServer.java index fdfe6925..0e1dd8e0 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServer.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServer.java @@ -22,9 +22,10 @@ import javax.inject.Singleton; -import org.eclipse.hono.communication.api.service.DatabaseSchemaCreator; -import org.eclipse.hono.communication.api.service.DatabaseService; import org.eclipse.hono.communication.api.service.VertxHttpHandlerManagerService; +import org.eclipse.hono.communication.api.service.communication.InternalTopicManager; +import org.eclipse.hono.communication.api.service.database.DatabaseSchemaCreator; +import org.eclipse.hono.communication.api.service.database.DatabaseService; import org.eclipse.hono.communication.core.app.ApplicationConfig; import org.eclipse.hono.communication.core.app.ServerConfig; import org.eclipse.hono.communication.core.http.AbstractVertxHttpServer; @@ -45,12 +46,12 @@ import io.vertx.ext.web.openapi.RouterBuilder; import io.vertx.ext.web.validation.BadRequestException; - /** * Vertx HTTP Server for the device communication api. */ @Singleton public class DeviceCommunicationHttpServer extends AbstractVertxHttpServer implements HttpServer { + private final Logger log = LoggerFactory.getLogger(DeviceCommunicationHttpServer.class); private final String serverStartedMsg = "HTTP Server is listening at http://{}:{}"; private final String serverFailedMsg = "HTTP Server failed to start: {}"; @@ -58,9 +59,9 @@ public class DeviceCommunicationHttpServer extends AbstractVertxHttpServer imple private final DatabaseService db; private final DatabaseSchemaCreator databaseSchemaCreator; + private final InternalTopicManager internalTopicManager; private List httpEndpointHandlers; - /** * Creates a new DeviceCommunicationHttpServer with all dependencies. * @@ -69,26 +70,27 @@ public class DeviceCommunicationHttpServer extends AbstractVertxHttpServer imple * @param httpHandlerManager The http handler manager * @param databaseService The database connection * @param databaseSchemaCreator The database migrations service + * @param internalTopicManager The internal topic manager */ public DeviceCommunicationHttpServer(final ApplicationConfig appConfigs, final Vertx vertx, final VertxHttpHandlerManagerService httpHandlerManager, final DatabaseService databaseService, - final DatabaseSchemaCreator databaseSchemaCreator) { + final DatabaseSchemaCreator databaseSchemaCreator, final InternalTopicManager internalTopicManager) { super(appConfigs, vertx); this.httpHandlerManager = httpHandlerManager; this.databaseSchemaCreator = databaseSchemaCreator; this.httpEndpointHandlers = new ArrayList<>(); this.db = databaseService; + this.internalTopicManager = internalTopicManager; } - @Override public void start() { - //Create Database Tables databaseSchemaCreator.createDBTables(); - // Create Endpoints Router + internalTopicManager.initPubSub(); + this.httpEndpointHandlers = httpHandlerManager.getAvailableHandlerServices(); RouterBuilder.create(this.vertx, appConfigs.getServerConfig().getOpenApiFilePath()) .onSuccess(routerBuilder -> { @@ -97,7 +99,7 @@ public void start() { }) .onFailure(error -> { if (error != null) { - log.error("Can not create Router: {}", error.getMessage()); + log.error("Can not create Router {}", error.getMessage()); } else { log.error("Can not create Router"); } @@ -106,7 +108,6 @@ public void start() { }); - // Wait until application is stopped Quarkus.waitForExit(); } @@ -118,7 +119,8 @@ public void start() { * @param httpEndpointHandlers All available http endpoint handlers * @return The created Router object */ - Router createRouterWithEndpoints(final RouterBuilder routerBuilder, final List httpEndpointHandlers) { + Router createRouterWithEndpoints(final RouterBuilder routerBuilder, + final List httpEndpointHandlers) { for (HttpEndpointHandler handlerService : httpEndpointHandlers) { handlerService.addRoutes(routerBuilder); } @@ -165,7 +167,6 @@ private void addReadinessHandlers(final Router router, final String readinessPat router.get(readinessPath).handler(healthCheckHandler); } - private void addLivenessHandlers(final Router router, final String livenessPath) { log.info("Adding liveness path: {}", livenessPath); final HealthCheckHandler healthCheckHandler = HealthCheckHandler.create(vertx); @@ -190,8 +191,8 @@ void startVertxServer(final Router router) { .listen(); serverCreationFuture - .onSuccess(server -> log.info(this.serverStartedMsg, serverConfigs.getServerUrl() - , serverConfigs.getServerPort())) + .onSuccess(server -> log.info(this.serverStartedMsg, serverConfigs.getServerUrl(), + serverConfigs.getServerPort())) .onFailure(error -> log.info(this.serverFailedMsg, error.getMessage())); } @@ -229,10 +230,8 @@ void addDefault404ExceptionHandler(final RoutingContext routingContext) { } } - @Override public void stop() { - // stop server custom functionality db.close(); } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/ApiCommonConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/ApiCommonConstants.java new file mode 100644 index 00000000..c0b82170 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/ApiCommonConstants.java @@ -0,0 +1,36 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.config; + +/** + * Device constant values. + */ +public final class ApiCommonConstants { + + /** + * Path parameter name for tenantId. + */ + public static final String TENANT_PATH_PARAMS = "tenantid"; + /** + * Path parameter name for deviceId. + */ + public static final String DEVICE_PATH_PARAMS = "deviceid"; + + private ApiCommonConstants() { + // avoid instantiation + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceConfigsConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceConfigsConstants.java index 86ff6ed2..7122e23b 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceConfigsConstants.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceConfigsConstants.java @@ -21,33 +21,24 @@ */ public final class DeviceConfigsConstants { + public static ApiCommonConstants API_COMMON; /** * OpenApi GET device configs operation id. */ public static final String LIST_CONFIG_VERSIONS_OP_ID = "listConfigVersions"; - /** - * Path parameter name for tenantId. - */ - public static final String TENANT_PATH_PARAMS = "tenantid"; - /** - * Path parameter name for deviceId. - */ - public static final String DEVICE_PATH_PARAMS = "deviceid"; /** * Path parameter name for number of versions. */ public static final String NUM_VERSION_QUERY_PARAMS = "numVersions"; - /** * Sql migrations script path. */ - public static final String CREATE_SQL_SCRIPT_PATH = "db/create_device_config_table.sql"; + public static final String CREATE_SQL_SCRIPT_PATH = "db/v1_create_config_table.sql"; /** * OpenApi POST device configs operation id. */ public static final String POST_MODIFY_DEVICE_CONFIG_OP_ID = "modifyCloudToDeviceConfig"; private DeviceConfigsConstants() { - // avoid instantiation } } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceStatesConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceStatesConstants.java new file mode 100644 index 00000000..f70cc710 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceStatesConstants.java @@ -0,0 +1,42 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.config; + +/** + * Device states constant values. + */ +public final class DeviceStatesConstants { + + public static ApiCommonConstants API_COMMON; + /** + * OpenApi GET device states operation id. + */ + public static final String LIST_STATES_OP_ID = "listStates"; + /** + * Path parameter name for number of states. + */ + + public static final String NUM_STATES_QUERY_PARAMS = "numStates"; + /** + * Sql migrations script path. + */ + public static final String CREATE_SQL_SCRIPT_PATH = "db/v1_create_state_table.sql"; + + private DeviceStatesConstants() { + // avoid instantiation + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfig.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfig.java index 47425745..6f24e780 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfig.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfig.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; - - /** * The device configuration. **/ diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigEntity.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigEntity.java index 186ccc44..14b4ab5b 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigEntity.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigEntity.java @@ -18,6 +18,8 @@ import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonProperty; + /** * The device configuration entity object. **/ @@ -31,6 +33,7 @@ public class DeviceConfigEntity { private String cloudUpdateTime; private String deviceAckTime; private String binaryData; + private String deviceAckError; /** @@ -48,47 +51,66 @@ public void setVersion(final int version) { this.version = version; } + @JsonProperty("tenantId") public String getTenantId() { return tenantId; } + @JsonProperty("tenant_id") public void setTenantId(final String tenantId) { this.tenantId = tenantId; } + @JsonProperty("deviceId") public String getDeviceId() { return deviceId; } + @JsonProperty("device_id") public void setDeviceId(final String deviceId) { this.deviceId = deviceId; } - + @JsonProperty("cloudUpdateTime") public String getCloudUpdateTime() { return cloudUpdateTime; } + @JsonProperty("cloud_update_time") public void setCloudUpdateTime(final String cloudUpdateTime) { this.cloudUpdateTime = cloudUpdateTime; } + @JsonProperty("deviceAckTime") public String getDeviceAckTime() { return deviceAckTime; } + @JsonProperty("device_ack_time") public void setDeviceAckTime(final String deviceAckTime) { this.deviceAckTime = deviceAckTime; } + @JsonProperty("binaryData") public String getBinaryData() { return binaryData; } + @JsonProperty("binary_data") public void setBinaryData(final String binaryData) { this.binaryData = binaryData; } + @JsonProperty("device_ack_error") + public String getDeviceAckError() { + return deviceAckError; + } + + @JsonProperty("device_ack_error") + public void setDeviceAckError(final String deviceAckError) { + this.deviceAckError = deviceAckError; + } + @Override public String toString() { @@ -99,6 +121,7 @@ public String toString() { ", cloudUpdateTime='" + cloudUpdateTime + '\'' + ", deviceAckTime='" + deviceAckTime + '\'' + ", binaryData='" + binaryData + '\'' + + ", device_ack_error='" + deviceAckError + '\'' + '}'; } @@ -111,12 +134,12 @@ public boolean equals(final Object o) { return false; } final var that = (DeviceConfigEntity) o; - return version == that.version && tenantId.equals(that.tenantId) && deviceId.equals(that.deviceId) && cloudUpdateTime.equals(that.cloudUpdateTime) && Objects.equals(deviceAckTime, that.deviceAckTime) && binaryData.equals(that.binaryData); + return version == that.version && tenantId.equals(that.tenantId) && deviceId.equals(that.deviceId) && cloudUpdateTime.equals(that.cloudUpdateTime) && Objects.equals(deviceAckTime, that.deviceAckTime) && binaryData.equals(that.binaryData) && deviceAckError.equals(that.deviceAckError); } @Override public int hashCode() { - return Objects.hash(version, tenantId, deviceId, cloudUpdateTime, deviceAckTime, binaryData); + return Objects.hash(version, tenantId, deviceId, cloudUpdateTime, deviceAckTime, binaryData, deviceAckError); } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigInternalResponse.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigInternalResponse.java new file mode 100644 index 00000000..d5a93246 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceConfigInternalResponse.java @@ -0,0 +1,121 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.data; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Device config response object. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class DeviceConfigInternalResponse { + + private String version; + private String tenantId; + private String deviceId; + private String deviceAckError; + + /** + * Creates a new DeviceConfigInternalResponse. + */ + public DeviceConfigInternalResponse() { + } + + /** + * Creates a new DeviceConfigInternalResponse. + * + * @param tenantId Tenant id + * @param deviceId Device id + * @param version Device config version + * @param deviceAckError Device acknowledgement error + */ + public DeviceConfigInternalResponse(final String tenantId, final String deviceId, final String version, + final String deviceAckError) { + this.version = version; + this.tenantId = tenantId; + this.deviceId = deviceId; + this.deviceAckError = deviceAckError; + } + + @JsonProperty("version") + public String getVersion() { + return version; + } + + public void setVersion(final String version) { + this.version = version; + } + + @JsonProperty("tenantId") + public String getTenantId() { + return tenantId; + } + + public void setTenantId(final String tenantId) { + this.tenantId = tenantId; + } + + @JsonProperty("deviceId") + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(final String deviceId) { + this.deviceId = deviceId; + } + + @JsonProperty("device_ack_error") + public String getDeviceAckError() { + return deviceAckError; + } + + public void setDeviceAckError(final String deviceAckError) { + this.deviceAckError = deviceAckError; + } + + @Override + public String toString() { + return "DeviceConfigAckResponse{" + + "version='" + version + '\'' + + ", tenantId='" + tenantId + '\'' + + ", deviceId='" + deviceId + '\'' + + ", device_ack_error='" + deviceAckError + '\'' + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DeviceConfigInternalResponse that = (DeviceConfigInternalResponse) o; + return version.equals(that.version) && tenantId.equals(that.tenantId) && deviceId.equals(that.deviceId) + && deviceAckError.equals(that.deviceAckError); + } + + @Override + public int hashCode() { + return Objects.hash(version, tenantId, deviceId, deviceAckError); + } + +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceState.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceState.java new file mode 100644 index 00000000..1c063272 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceState.java @@ -0,0 +1,106 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.data; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The device state. + **/ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class DeviceState { + + private String updateTime; + private String binaryData; + + /** + * Creates a new device state. + */ + public DeviceState() { + } + + /** + * Creates a new device state. + * + * @param updateTime Cloud update time + * @param binaryData Binary data + */ + public DeviceState(final String updateTime, final String binaryData) { + this.updateTime = updateTime; + this.binaryData = binaryData; + } + + @JsonProperty("updateTime") + public String getUpdateTime() { + return updateTime; + } + + @JsonProperty("update_time") + public void setUpdateTime(final String updateTime) { + this.updateTime = updateTime; + } + + @JsonProperty("binaryData") + public String getBinaryData() { + return binaryData; + } + + @JsonProperty("binary_data") + public void setBinaryData(final String binaryData) { + this.binaryData = binaryData; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DeviceState deviceState = (DeviceState) o; + return Objects.equals(updateTime, deviceState.updateTime) && + Objects.equals(binaryData, deviceState.binaryData); + } + + @Override + public int hashCode() { + return Objects.hash(updateTime, binaryData); + } + + @Override + public String toString() { + + return "class DeviceState {\n" + + " updateTime: " + toIndentedString(updateTime) + "\n" + + " binaryData: " + toIndentedString(binaryData) + "\n" + + "}"; + } + + /** + * Convert the given object to string with each line indented by 4 spaces (except the first line). + */ + private String toIndentedString(final Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceStateEntity.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceStateEntity.java new file mode 100644 index 00000000..cbf7c976 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/DeviceStateEntity.java @@ -0,0 +1,107 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.data; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The device state entity object. + **/ +public class DeviceStateEntity { + + private String id; + private String tenantId; + private String deviceId; + private String updateTime; + private String binaryData; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getTenantId() { + return tenantId; + } + + public void setTenantId(final String tenantId) { + this.tenantId = tenantId; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(final String deviceId) { + this.deviceId = deviceId; + } + + @JsonProperty("updateTime") + public String getUpdateTime() { + return updateTime; + } + + @JsonProperty("update_time") + public void setUpdateTime(final String updateTime) { + this.updateTime = updateTime; + } + + @JsonProperty("binaryData") + public String getBinaryData() { + return binaryData; + } + + @JsonProperty("binary_data") + public void setBinaryData(final String binaryData) { + this.binaryData = binaryData; + } + + @Override + public String toString() { + return "DeviceStateEntity{" + + "id=" + id + + ", tenantId='" + tenantId + '\'' + + ", deviceId='" + deviceId + '\'' + + ", updateTime='" + updateTime + '\'' + + ", binaryData='" + binaryData + '\'' + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DeviceStateEntity that = (DeviceStateEntity) o; + return id.equals(that.id) && tenantId.equals(that.tenantId) && deviceId.equals(that.deviceId) + && updateTime.equals(that.updateTime) && binaryData.equals(that.binaryData); + } + + @Override + public int hashCode() { + return Objects.hash(id, tenantId, deviceId, updateTime, binaryData); + } + +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/data/ListDeviceStatesResponse.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/ListDeviceStatesResponse.java new file mode 100644 index 00000000..50f36280 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/data/ListDeviceStatesResponse.java @@ -0,0 +1,90 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.data; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A list of a device states. + **/ +public class ListDeviceStatesResponse { + + private List deviceStates = new ArrayList<>(); + + /** + * Creates a new ListDeviceStatesResponse. + */ + public ListDeviceStatesResponse() { + } + + /** + * Creates a new ListDeviceStatesResponse. + * + * @param deviceStates The device states + */ + public ListDeviceStatesResponse(final List deviceStates) { + this.deviceStates = deviceStates; + } + + @JsonProperty("deviceStates") + public List getDeviceStates() { + return deviceStates; + } + + public void setDeviceStates(final List deviceStates) { + this.deviceStates = deviceStates; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final var listDeviceStatesResponse = (ListDeviceStatesResponse) o; + return Objects.equals(deviceStates, listDeviceStatesResponse.deviceStates); + } + + @Override + public int hashCode() { + return Objects.hash(deviceStates); + } + + @Override + public String toString() { + + return "class ListDeviceStatesResponse {\n" + + " deviceStates: " + toIndentedString(deviceStates) + "\n" + + "}"; + } + + /** + * Convert the given object to string with each line indented by 4 spaces (except the first line). + */ + private String toIndentedString(final Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/ConfigTopicEventHandler.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/ConfigTopicEventHandler.java new file mode 100644 index 00000000..e547df2e --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/ConfigTopicEventHandler.java @@ -0,0 +1,43 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.handler; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +/** + * Config topic event handler interface. + */ +public interface ConfigTopicEventHandler { + + /** + * Handle incoming config send requests. + * + * @param pubsubMessage The message to handle + * @param consumer The message consumer + */ + void onDeviceConfigRequest(PubsubMessage pubsubMessage, AckReplyConsumer consumer); + + /** + * Handle incoming config response message. + * + * @param pubsubMessage The message to handle + * @param consumer The message consumer + */ + + void onDeviceConfigResponse(PubsubMessage pubsubMessage, AckReplyConsumer consumer); +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandler.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandler.java index a1649e23..05ac2cc0 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandler.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandler.java @@ -22,7 +22,7 @@ import org.eclipse.hono.communication.api.data.DeviceConfig; import org.eclipse.hono.communication.api.data.DeviceConfigRequest; import org.eclipse.hono.communication.api.data.ListDeviceConfigVersionsResponse; -import org.eclipse.hono.communication.api.service.DeviceConfigService; +import org.eclipse.hono.communication.api.service.config.DeviceConfigService; import org.eclipse.hono.communication.core.http.HttpEndpointHandler; import org.eclipse.hono.communication.core.utils.ResponseUtils; @@ -31,8 +31,6 @@ import io.vertx.ext.web.openapi.RouterBuilder; - - /** * Handler for device config endpoints. */ @@ -66,8 +64,8 @@ public void addRoutes(final RouterBuilder routerBuilder) { * @return Future of DeviceConfig */ public Future handleModifyCloudToDeviceConfig(final RoutingContext routingContext) { - final var tenantId = routingContext.pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS); - final var deviceId = routingContext.pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS); + final var tenantId = routingContext.pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS); + final var deviceId = routingContext.pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS); final DeviceConfigRequest deviceConfig = routingContext.body() .asJsonObject() @@ -88,8 +86,8 @@ public Future handleListConfigVersions(final R final var numVersions = routingContext.queryParams().get(DeviceConfigsConstants.NUM_VERSION_QUERY_PARAMS); final var limit = numVersions == null ? 0 : Integer.parseInt(numVersions); - final var tenantId = routingContext.pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS); - final var deviceId = routingContext.pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS); + final var tenantId = routingContext.pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS); + final var deviceId = routingContext.pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS); return configService.listAll(deviceId, tenantId, limit) .onSuccess(result -> ResponseUtils.successResponse(routingContext, result)) diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandler.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandler.java new file mode 100644 index 00000000..c1c6c8f3 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandler.java @@ -0,0 +1,70 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.handler; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.hono.communication.api.config.DeviceStatesConstants; +import org.eclipse.hono.communication.api.data.ListDeviceStatesResponse; +import org.eclipse.hono.communication.api.service.state.DeviceStateService; +import org.eclipse.hono.communication.core.http.HttpEndpointHandler; +import org.eclipse.hono.communication.core.utils.ResponseUtils; + +import io.vertx.core.Future; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.openapi.RouterBuilder; + +/** + * Handler for device state endpoints. + */ +@ApplicationScoped +public class DeviceStatesHandler implements HttpEndpointHandler { + + private final DeviceStateService stateService; + + /** + * Creates a new DeviceStatesHandler. + * + * @param stateService The device states service. + */ + public DeviceStatesHandler(final DeviceStateService stateService) { + this.stateService = stateService; + } + + @Override + public void addRoutes(final RouterBuilder routerBuilder) { + routerBuilder.operation(DeviceStatesConstants.LIST_STATES_OP_ID).handler(this::handleListStates); + } + + /** + * Handles get device states. + * + * @param routingContext The RoutingContext + * @return Future of ListDeviceStatesResponse + */ + public Future handleListStates(final RoutingContext routingContext) { + final var numStates = routingContext.queryParams().get(DeviceStatesConstants.NUM_STATES_QUERY_PARAMS); + + final var limit = numStates == null ? 0 : Integer.parseInt(numStates); + final var tenantId = routingContext.pathParam(DeviceStatesConstants.API_COMMON.TENANT_PATH_PARAMS); + final var deviceId = routingContext.pathParam(DeviceStatesConstants.API_COMMON.DEVICE_PATH_PARAMS); + + return stateService.listAll(deviceId, tenantId, limit) + .onSuccess(result -> ResponseUtils.successResponse(routingContext, result)) + .onFailure(err -> ResponseUtils.errorResponse(routingContext, err)); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/StateTopicEventHandler.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/StateTopicEventHandler.java new file mode 100644 index 00000000..3c75c101 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/StateTopicEventHandler.java @@ -0,0 +1,35 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.handler; + + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +/** + * State topic event handler interface. + */ +public interface StateTopicEventHandler { + + /** + * Handle incoming messages on state topics. + * + * @param pubsubMessage The message to handle. + * @param consumer The message consumer. + */ + void onStateMessage(PubsubMessage pubsubMessage, AckReplyConsumer consumer); +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceStateMapper.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceStateMapper.java new file mode 100644 index 00000000..b7d82b38 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceStateMapper.java @@ -0,0 +1,59 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.mapper; + +import java.time.Instant; + +import org.eclipse.hono.communication.api.data.DeviceState; +import org.eclipse.hono.communication.api.data.DeviceStateEntity; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.NullValuePropertyMappingStrategy; + +import com.google.pubsub.v1.PubsubMessage; + +/** + * Mapper for device state objects. + */ +@Mapper(componentModel = "cdi", nullValuePropertyMappingStrategy = NullValuePropertyMappingStrategy.IGNORE) +public interface DeviceStateMapper { + + /** + * Convert device state entity to device state. + * + * @param entity The device state entity + * @return The device state + */ + DeviceState deviceStateEntityToDeviceState(DeviceStateEntity entity); + + /** + * Convert Pub/Sub message to device state entity. + * + * @param pubsubMessage The Pub/Sub message. + * @return The device state entity. + */ + @Mapping(target = "id", ignore = true) + @Mapping(target = "updateTime", expression = "java(getDateTime())") + @Mapping(target = "tenantId", expression = "java(pubsubMessage.getAttributesMap().get(\"tenantId\"))") + @Mapping(target = "deviceId", expression = "java(pubsubMessage.getAttributesMap().get(\"deviceId\"))") + @Mapping(target = "binaryData", expression = "java(pubsubMessage.getData().toStringUtf8())") + DeviceStateEntity pubSubMessageToDeviceStateEntity(PubsubMessage pubsubMessage); + + default String getDateTime() { + return Instant.now().toString(); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepository.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepository.java new file mode 100644 index 00000000..cb64ba5e --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepository.java @@ -0,0 +1,48 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.repository; + +import java.util.List; + +import org.eclipse.hono.communication.api.data.DeviceState; +import org.eclipse.hono.communication.api.data.DeviceStateEntity; + +import io.vertx.core.Future; + +/** + * Device state repository interface. + */ +public interface DeviceStateRepository { + + /** + * Lists all states for a specific device. Result is ordered by version desc + * + * @param deviceId The device id + * @param tenantId The tenant id + * @param limit The number of states to show + * @return A Future with a List of DeviceStates + */ + Future> listAll(String deviceId, String tenantId, int limit); + + /** + * Creates a new state and deletes the oldest one if the total number of versions in the DB is bigger than the MAX_LIMIT. + * + * @param entity The instance to insert. + * @return A Future of the created DeviceStateEntity. + */ + Future createNew(DeviceStateEntity entity); +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepositoryImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepositoryImpl.java new file mode 100644 index 00000000..88f9b9cc --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceStateRepositoryImpl.java @@ -0,0 +1,173 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.repository; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.hono.communication.api.data.DeviceState; +import org.eclipse.hono.communication.api.data.DeviceStateEntity; +import org.eclipse.hono.communication.api.exception.DeviceNotFoundException; +import org.eclipse.hono.communication.api.service.database.DatabaseService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.sqlclient.RowIterator; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.templates.RowMapper; +import io.vertx.sqlclient.templates.SqlTemplate; + +/** + * Repository class for making CRUD operations for device state entities. + */ +@ApplicationScoped +public class DeviceStateRepositoryImpl implements DeviceStateRepository { + + private static final Logger log = LoggerFactory.getLogger(DeviceStateRepositoryImpl.class); + private static final String SQL_COUNT_STATES_WITH_PK_FILTER = "SELECT COUNT(*) AS total FROM device_status WHERE tenant_id = #{tenantId} AND device_id = #{deviceId}"; + private static final String SQL_INSERT = "INSERT INTO device_status (id, tenant_id, device_id, update_time, binary_data) " + + + "VALUES (#{id}, #{tenantId}, #{deviceId}, #{updateTime}, #{binaryData}) RETURNING id"; + private static final String SQL_LIST = "SELECT update_time, binary_data FROM device_status " + + "WHERE device_id = #{deviceId} and tenant_id = #{tenantId} ORDER BY update_time DESC LIMIT #{limit}"; + + private static final String SQL_DELETE = "DELETE FROM device_status WHERE device_id = #{deviceId} AND tenant_id = #{tenantId} AND id NOT IN " + + + "(SELECT id FROM device_status WHERE device_id = #{deviceId} AND tenant_id = #{tenantId} ORDER BY update_time DESC LIMIT 9)"; + private static final String DEVICE_ID_CAPTION = "deviceId"; + private static final String TENANT_ID_CAPTION = "tenantId"; + private static final int MAX_LIMIT = 10; + private final DatabaseService db; + private final DeviceRepository deviceRepository; + + /** + * Creates a new DeviceStateRepositoryImpl. + * + * @param db The database connection + * @param deviceRepository The device repository interface + */ + public DeviceStateRepositoryImpl(final DatabaseService db, + final DeviceRepository deviceRepository) { + this.db = db; + this.deviceRepository = deviceRepository; + } + + @Override + public Future> listAll(final String deviceId, final String tenantId, final int limit) { + final int queryLimit = limit == 0 ? MAX_LIMIT : limit; + return db.getDbClient().withConnection( + sqlConnection -> deviceRepository.searchForDevice(deviceId, tenantId) + .compose( + counter -> { + if (counter < 1) { + throw new DeviceNotFoundException( + String.format("Device with id %s and tenant id %s doesn't exist", + deviceId, + tenantId)); + } + return SqlTemplate + .forQuery(sqlConnection, SQL_LIST) + .mapTo(DeviceStateEntity.class) + .execute(Map.of(DEVICE_ID_CAPTION, deviceId, TENANT_ID_CAPTION, tenantId, "limit", + queryLimit)) + .map(rowSet -> { + final List states = new ArrayList<>(); + rowSet.forEach( + entity -> states.add(new DeviceState(entity.getUpdateTime(), + entity.getBinaryData()))); + return states; + }) + .onSuccess(success -> log.debug( + String.format("Listing all states for device %s and tenant %s", + deviceId, tenantId))) + .onFailure(throwable -> log.error("Error: {}", throwable.getMessage())); + })); + } + + @Override + public Future createNew(final DeviceStateEntity entity) { + return db.getDbClient().withTransaction( + sqlConnection -> deviceRepository.searchForDevice(entity.getDeviceId(), entity.getTenantId()) + .compose( + deviceCounter -> { + if (deviceCounter < 1) { + throw new DeviceNotFoundException( + String.format( + "Device with id %s and tenant id %s doesn't exist", + entity.getDeviceId(), + entity.getTenantId())); + } + return countStates(entity.getDeviceId(), entity.getTenantId()).compose( + stateCounter -> { + if (stateCounter >= 10) { + deleteStates(sqlConnection, entity); + } + return insert(sqlConnection, entity); + }); + }) + .onFailure(error -> log.error(error.getMessage()))); + } + + private Future countStates(final String deviceId, final String tenantId) { + final RowMapper rowMapper = row -> row.getInteger("total"); + return db.getDbClient().withConnection( + sqlConnection -> SqlTemplate + .forQuery(sqlConnection, SQL_COUNT_STATES_WITH_PK_FILTER) + .mapTo(rowMapper) + .execute(Map.of(DEVICE_ID_CAPTION, deviceId, TENANT_ID_CAPTION, tenantId)).map(rowSet -> { + final RowIterator iterator = rowSet.iterator(); + return iterator.next(); + })); + } + + private void deleteStates(final SqlConnection sqlConnection, final DeviceStateEntity entity) { + SqlTemplate.forQuery(sqlConnection, SQL_DELETE) + .execute(Map.of(DEVICE_ID_CAPTION, entity.getDeviceId(), TENANT_ID_CAPTION, entity.getTenantId())); + } + + /** + * Inserts a new entity in to the db. + * + * @param sqlConnection The sql connection instance. + * @param entity The instance to insert. + * @return A Future of the created DeviceStateEntity. + */ + private Future insert(final SqlConnection sqlConnection, final DeviceStateEntity entity) { + entity.setId(UUID.randomUUID().toString()); + return SqlTemplate + .forUpdate(sqlConnection, SQL_INSERT) + .mapFrom(DeviceStateEntity.class) + .mapTo(DeviceStateEntity.class) + .execute(entity) + .map(rowSet -> { + final RowIterator iterator = rowSet.iterator(); + if (iterator.hasNext()) { + return entity; + } else { + throw new IllegalStateException(String.format("Can't create device state: %s", entity)); + } + }) + .onSuccess(success -> log.debug("Device state created successfully: {}", success)) + .onFailure(throwable -> log.error(throwable.getMessage())); + + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImpl.java deleted file mode 100644 index a8abca20..00000000 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImpl.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * *********************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - *

- * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - *

- * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - *

- * SPDX-License-Identifier: EPL-2.0 - * ********************************************************** - * - */ - -package org.eclipse.hono.communication.api.service; - -import javax.enterprise.context.ApplicationScoped; - -import org.eclipse.hono.communication.api.data.DeviceConfig; -import org.eclipse.hono.communication.api.data.DeviceConfigRequest; -import org.eclipse.hono.communication.api.data.ListDeviceConfigVersionsResponse; -import org.eclipse.hono.communication.api.mapper.DeviceConfigMapper; -import org.eclipse.hono.communication.api.repository.DeviceConfigsRepository; - -import io.vertx.core.Future; - -/** - * Service for device commands. - */ - -@ApplicationScoped -public class DeviceConfigServiceImpl implements DeviceConfigService { - - private final DeviceConfigsRepository repository; - private final DatabaseService db; - private final DeviceConfigMapper mapper; - - /** - * Creates a new DeviceConfigServiceImpl with all dependencies. - * - * @param repository The DeviceConfigsRepository - * @param db The database service - * @param mapper The DeviceConfigMapper - */ - public DeviceConfigServiceImpl(final DeviceConfigsRepository repository, - final DatabaseService db, - final DeviceConfigMapper mapper) { - - this.repository = repository; - this.db = db; - this.mapper = mapper; - } - - @Override - public Future modifyCloudToDeviceConfig(final DeviceConfigRequest deviceConfig, final String deviceId, final String tenantId) { - - final var entity = mapper.configRequestToDeviceConfigEntity(deviceConfig); - entity.setDeviceId(deviceId); - entity.setTenantId(tenantId); - - return db.getDbClient().withTransaction( - sqlConnection -> - repository.createNew(sqlConnection, entity)) - .map(mapper::deviceConfigEntityToConfig); - } - - - @Override - public Future listAll(final String deviceId, final String tenantId, final int limit) { - return db.getDbClient().withConnection( - sqlConnection -> repository.listAll(sqlConnection, deviceId, tenantId, limit) - .map( - result -> { - final var listConfig = new ListDeviceConfigVersionsResponse(); - listConfig.setDeviceConfigs(result); - return listConfig; - } - ) - ); - - } -} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/VertxHttpHandlerManagerService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/VertxHttpHandlerManagerService.java index f7909951..ef1b5ac2 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/VertxHttpHandlerManagerService.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/VertxHttpHandlerManagerService.java @@ -22,6 +22,7 @@ import org.eclipse.hono.communication.api.handler.DeviceCommandHandler; import org.eclipse.hono.communication.api.handler.DeviceConfigsHandler; +import org.eclipse.hono.communication.api.handler.DeviceStatesHandler; import org.eclipse.hono.communication.core.http.HttpEndpointHandler; @@ -39,11 +40,12 @@ public class VertxHttpHandlerManagerService { /** * Creates a new VertxHttpHandlerManagerService with all dependencies. * - * @param configHandler The configuration handler - * @param commandHandler The command handler + * @param configHandler The configuration handler. + * @param commandHandler The command handler. + * @param stateHandler The state handler. */ - public VertxHttpHandlerManagerService(final DeviceConfigsHandler configHandler, final DeviceCommandHandler commandHandler) { - this.availableHandlerServices = List.of(configHandler, commandHandler); + public VertxHttpHandlerManagerService(final DeviceConfigsHandler configHandler, final DeviceCommandHandler commandHandler, final DeviceStatesHandler stateHandler) { + this.availableHandlerServices = List.of(configHandler, commandHandler, stateHandler); } public List getAvailableHandlerServices() { diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManager.java similarity index 66% rename from device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandService.java rename to device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManager.java index 316d47b5..91b3b23e 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandService.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManager.java @@ -14,19 +14,15 @@ * */ -package org.eclipse.hono.communication.api.service; - -import io.vertx.ext.web.RoutingContext; +package org.eclipse.hono.communication.api.service.communication; /** - * Device commands interface. + * Internal topic manager interface. */ -public interface DeviceCommandService { +public interface InternalTopicManager { /** - * Post device command. - * - * @param routingContext The RoutingContext + * Initializes topics and subscriptions. */ - void postCommand(RoutingContext routingContext); + void initPubSub(); } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java new file mode 100644 index 00000000..62b36dac --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImpl.java @@ -0,0 +1,187 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +import java.util.List; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; +import org.eclipse.hono.communication.api.config.PubSubConstants; +import org.eclipse.hono.communication.api.handler.ConfigTopicEventHandler; +import org.eclipse.hono.communication.api.handler.StateTopicEventHandler; +import org.eclipse.hono.communication.api.repository.DeviceRepository; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.eclipse.hono.notification.deviceregistry.LifecycleChange; +import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.common.base.Strings; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; + +import io.vertx.core.Vertx; + +/** + * Internal topic manager interface. + */ +@ApplicationScoped +public class InternalTopicManagerImpl implements InternalTopicManager { + + static ObjectMapper objectMapper = new ObjectMapper(); + private final Logger log = LoggerFactory.getLogger(InternalTopicManagerImpl.class); + private final DeviceRepository deviceRepository; + private final ConfigTopicEventHandler configTopicEventHandler; + private final StateTopicEventHandler stateTopicEventHandler; + private final InternalMessaging internalMessaging; + private final InternalMessagingConfig internalMessagingConfig; + private final Vertx vertx; + + /** + * Creates a new InternalTopicManagerImpl. + * + * @param deviceRepository The device repository + * @param configTopicEventHandler The config topic event handler. + * @param stateTopicEventHandler The state topic event handler. + * @param internalMessaging The internal messaging interface. + * @param internalMessagingConfig The internal messaging config. + * @param vertx The quarkus Vertx instance. + */ + public InternalTopicManagerImpl(final DeviceRepository deviceRepository, + final ConfigTopicEventHandler configTopicEventHandler, + final StateTopicEventHandler stateTopicEventHandler, final InternalMessaging internalMessaging, + final InternalMessagingConfig internalMessagingConfig, final Vertx vertx) { + this.deviceRepository = deviceRepository; + this.configTopicEventHandler = configTopicEventHandler; + this.stateTopicEventHandler = stateTopicEventHandler; + this.internalMessaging = internalMessaging; + this.internalMessagingConfig = internalMessagingConfig; + this.vertx = vertx; + } + + @Override + public void initPubSub() { + log.info("Initialize tenant topics and subscriptions."); + internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges); + deviceRepository.listDistinctTenants() + .onSuccess(tenants -> { + vertx.executeBlocking(promise -> { + tenants.forEach(tenant -> { + createPubSubResourceForTenant(tenant, PubSubResourceType.TOPIC); + createPubSubResourceForTenant(tenant, PubSubResourceType.SUBSCRIPTION); + subscribeToTenantTopics(tenant); + }); + promise.complete(); + }); + log.info("Initialization of tenant topics and subscriptions completed."); + }) + .onFailure(err -> log.error("Error getting tenants for topic creation: {}", err.getMessage())); + } + + /** + * Creates Pub/Sub resources (topics or subscriptions) for the provided tenant. + * + * @param tenantId The tenant. + * @param pubSubResourceType Which Pub Sub resource type should be created (topic or subscription). + */ + private void createPubSubResourceForTenant(final String tenantId, final PubSubResourceType pubSubResourceType) { + final String projectId = internalMessagingConfig.getProjectId(); + if (projectId == null) { + log.error("project ID is null"); + return; + } + PubSubMessageHelper.getCredentialsProvider() + .ifPresentOrElse(provider -> { + final List topics = PubSubConstants.getTenantTopics(); + topics.forEach(topic -> { + final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager(projectId, provider); + if (pubSubResourceType == PubSubResourceType.TOPIC) { + pubSubBasedAdminClientManager.getOrCreateTopic(topic, tenantId); + } else { + pubSubBasedAdminClientManager.getOrCreateSubscription(topic, tenantId); + } + pubSubBasedAdminClientManager.closeAdminClients(); + }); + log.info("All {}s created for {}", pubSubResourceType.toString().toLowerCase(), tenantId); + }, () -> log.error("credentials provider is empty")); + } + + private void subscribeToTenantTopics(final String tenant) { + internalMessaging.subscribe( + internalMessagingConfig.getEventTopicFormat().formatted(tenant), + configTopicEventHandler::onDeviceConfigRequest); + internalMessaging.subscribe( + internalMessagingConfig.getCommandAckTopicFormat().formatted(tenant), + configTopicEventHandler::onDeviceConfigResponse); + internalMessaging.subscribe( + internalMessagingConfig.getStateTopicFormat().formatted(tenant), + stateTopicEventHandler::onStateMessage); + } + + /** + * Handle incoming tenant CREATE notifications. + * + * @param pubsubMessage The message to handle + * @param consumer The message consumer + */ + public void onTenantChanges(final PubsubMessage pubsubMessage, final AckReplyConsumer consumer) { + consumer.ack(); + final String jsonString = pubsubMessage.getData().toStringUtf8(); + final TenantChangeNotification notification; + log.info("Handle tenant change notification {}", jsonString); + try { + notification = objectMapper.readValue(jsonString, TenantChangeNotification.class); + } catch (JsonProcessingException e) { + log.error("Can't deserialize tenant change notification: {}", e.getMessage()); + return; + } + final String tenant = notification.getTenantId(); + if (notification.getChange() == LifecycleChange.CREATE && !Strings.isNullOrEmpty(tenant)) { + log.info("Tenant {} was created. All its topics and subscriptions will be created.", tenant); + createPubSubResourceForTenant(tenant, PubSubResourceType.TOPIC); + createPubSubResourceForTenant(tenant, PubSubResourceType.SUBSCRIPTION); + subscribeToTenantTopics(tenant); + } else if (notification.getChange() == LifecycleChange.DELETE && !Strings.isNullOrEmpty(tenant)) { + log.info("Tenant {} was deleted. All its topics and subscriptions will be deleted.", tenant); + cleanupPubSubResources(tenant); + } + } + + private void cleanupPubSubResources(final String tenant) { + final String projectId = internalMessagingConfig.getProjectId(); + final List pubSubTopicsToDelete = PubSubConstants.getTenantTopics().stream().map(id -> TopicName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList(); + final List pubSubSubscriptionsToDelete = PubSubConstants.getTenantTopics().stream().map(id -> SubscriptionName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList(); + PubSubMessageHelper.getCredentialsProvider() + .ifPresentOrElse(provider -> { + final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager(projectId, provider); + pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete); + pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete); + log.info("All topics and subscriptions for tenant {} were deleted successfully.", tenant); + pubSubBasedAdminClientManager.closeAdminClients(); + }, () -> log.error("credentials provider is empty")); + } + + private enum PubSubResourceType { + TOPIC, SUBSCRIPTION + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigService.java similarity index 69% rename from device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigService.java rename to device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigService.java index cf282858..4907833b 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceConfigService.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigService.java @@ -14,14 +14,17 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.config; import org.eclipse.hono.communication.api.data.DeviceConfig; +import org.eclipse.hono.communication.api.data.DeviceConfigEntity; +import org.eclipse.hono.communication.api.data.DeviceConfigInternalResponse; import org.eclipse.hono.communication.api.data.DeviceConfigRequest; import org.eclipse.hono.communication.api.data.ListDeviceConfigVersionsResponse; import io.vertx.core.Future; + /** * Device config interface. */ @@ -46,4 +49,20 @@ public interface DeviceConfigService { * @return Future of ListDeviceConfigVersionsResponse */ Future listAll(String deviceId, String tenantId, int limit); + + + /** + * Update field deviceAckTime. + * + * @param config Device config + * @param deviceAckTime Time of ack + */ + void updateDeviceAckTime(DeviceConfigEntity config, String deviceAckTime); + + /** + * Update field error when delivery failure received from the protocol adapter. + * + * @param configErrorResponse Device config error response + */ + void updateDeviceConfigError(DeviceConfigInternalResponse configErrorResponse); } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImpl.java new file mode 100644 index 00000000..3793decb --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImpl.java @@ -0,0 +1,284 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.config; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import javax.inject.Singleton; + +import org.apache.commons.codec.binary.Base64; +import org.eclipse.hono.communication.api.data.DeviceConfig; +import org.eclipse.hono.communication.api.data.DeviceConfigEntity; +import org.eclipse.hono.communication.api.data.DeviceConfigInternalResponse; +import org.eclipse.hono.communication.api.data.DeviceConfigRequest; +import org.eclipse.hono.communication.api.data.ListDeviceConfigVersionsResponse; +import org.eclipse.hono.communication.api.handler.ConfigTopicEventHandler; +import org.eclipse.hono.communication.api.mapper.DeviceConfigMapper; +import org.eclipse.hono.communication.api.repository.DeviceConfigRepository; +import org.eclipse.hono.communication.api.service.DeviceServiceAbstract; +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.eclipse.hono.communication.core.app.InternalMessagingConstants; +import org.eclipse.hono.communication.core.utils.StringValidateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.common.base.Strings; +import com.google.pubsub.v1.PubsubMessage; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +/** + * Service for device commands. + */ + +@Singleton +public class DeviceConfigServiceImpl extends DeviceServiceAbstract + implements DeviceConfigService, ConfigTopicEventHandler { + + private static final String TENANT_ID_NOTIFICATION_PROPERTY = "tenantId"; + private static final String DEVICE_ID_NOTIFICATION_PROPERTY = "deviceId"; + private static final String SUBJECT = "config"; + private static final String DEVICE_CONNECTION_TTD = "-1"; + + private final Logger log = LoggerFactory.getLogger(DeviceConfigServiceImpl.class); + private final DeviceConfigRepository repository; + private final DeviceConfigMapper mapper; + private final Vertx vertx; + + /** + * Creates a new DeviceConfigServiceImpl. + * + * @param repository The device config repository + * @param mapper The device config mapper + * @param internalMessagingConfig The internal messaging config + * @param internalMessaging The internal messaging interface + * @param vertx The vertx instance + */ + public DeviceConfigServiceImpl(final DeviceConfigRepository repository, + final DeviceConfigMapper mapper, + final InternalMessagingConfig internalMessagingConfig, + final InternalMessaging internalMessaging, final Vertx vertx) { + + super(internalMessagingConfig, internalMessaging); + + this.repository = repository; + this.mapper = mapper; + this.vertx = vertx; + } + + /** + * Create and publish new device configs. + * + * @param deviceConfig The device configs + * @param deviceId The device id + * @param tenantId The tenant id + * @return Future of device config + */ + public Future modifyCloudToDeviceConfig(final DeviceConfigRequest deviceConfig, final String deviceId, + final String tenantId) { + + if (!StringValidateUtils.isBase64(deviceConfig.getBinaryData())) { + return Future + .failedFuture(new IllegalStateException("Field binaryData type should be String base64 encoded.")); + } + + final var entity = mapper.configRequestToDeviceConfigEntity(deviceConfig); + entity.setDeviceId(deviceId); + entity.setTenantId(tenantId); + + return repository.createNew(entity) + .map(mapper::deviceConfigEntityToConfig) + .onSuccess(result -> { + final var topicToPublish = String.format(messagingConfig.getCommandTopicFormat(), + entity.getTenantId()); + final var messageAttributes = Map.of( + InternalMessagingConstants.DEVICE_ID, entity.getDeviceId(), + InternalMessagingConstants.TENANT_ID, entity.getTenantId(), + InternalMessagingConstants.CORRELATION_ID, result.getVersion(), + InternalMessagingConstants.SUBJECT, SUBJECT, + InternalMessagingConstants.RESPONSE_REQUIRED, "true"); + publish(topicToPublish, result, messageAttributes); + }); + } + + /** + * List all device configs. + * + * @param deviceId The device id + * @param tenantId The tenant id + * @param limit The limit max=10 + * @return Future of ListDeviceConfigVersionsResponse + */ + public Future listAll(final String deviceId, final String tenantId, + final int limit) { + return repository.listAll(deviceId, tenantId, limit) + .map( + result -> { + final var listConfig = new ListDeviceConfigVersionsResponse(); + listConfig.setDeviceConfigs(result); + return listConfig; + }); + } + + @Override + public void onDeviceConfigResponse(final PubsubMessage pubsubMessage, final AckReplyConsumer consumer) { + consumer.ack(); // message was received and processed only once + + final var messageAttributes = pubsubMessage.getAttributesMap(); + + if (!isErrorResponse(messageAttributes)) { + return; + } + + final var deviceId = messageAttributes.get(InternalMessagingConstants.DEVICE_ID); + final var tenantId = messageAttributes.get(InternalMessagingConstants.TENANT_ID); + final var version = messageAttributes.get(InternalMessagingConstants.CORRELATION_ID); + final var status = messageAttributes.get(InternalMessagingConstants.STATUS); + final var errorPayload = new JsonObject(pubsubMessage.getData().toStringUtf8()); + final var error = "error %s: %s".formatted(status, errorPayload.getString("error")); + + final var configErrorResponse = new DeviceConfigInternalResponse(tenantId, deviceId, version, error); + + log.info("Config returned an error: {}", configErrorResponse); + updateDeviceConfigError(configErrorResponse); + + } + + private boolean isErrorResponse(final Map messageAttributes) { + return Objects.equals(messageAttributes.get(messagingConfig.getContentTypeKey()), + messagingConfig.getDeliveryFailureNotificationContentType()); + } + + @Override + public void onDeviceConfigRequest(final PubsubMessage pubsubMessage, final AckReplyConsumer consumer) { + + consumer.ack(); // message was received and processed only once + + final var messageAttributes = pubsubMessage.getAttributesMap(); + final var deviceId = messageAttributes.get(DEVICE_ID_NOTIFICATION_PROPERTY); + final var tenantId = messageAttributes.get(TENANT_ID_NOTIFICATION_PROPERTY); + + if (isNotConfigRequest(messageAttributes, tenantId, deviceId)) { + return; + } + + repository.getDeviceLatestConfig(tenantId, deviceId) + .onSuccess(res -> { + final var config = mapper.deviceConfigEntityToConfig(res); + final var topicToPublish = String.format(messagingConfig.getCommandTopicFormat(), tenantId); + final var attributes = new HashMap(); + attributes.put(InternalMessagingConstants.CORRELATION_ID, config.getVersion()); + attributes.put(InternalMessagingConstants.DEVICE_ID, deviceId); + attributes.put(InternalMessagingConstants.TENANT_ID, tenantId); + attributes.put(InternalMessagingConstants.SUBJECT, SUBJECT); + attributes.put(InternalMessagingConstants.RESPONSE_REQUIRED, "true"); + log.debug("Handle config request event"); + publish(topicToPublish, config, attributes); + }) + .onFailure(err -> log.error("Can't publish configs: {}", err.getMessage())); + } + + private boolean isNotConfigRequest(final Map messageAttributes, final String tenantId, + final String deviceId) { + if (Strings.isNullOrEmpty(deviceId) || Strings.isNullOrEmpty(tenantId)) { + log.warn("Skip device onConnect event: deviceId or tenantId is empty"); + return true; + } + if (!isMqttConfigRequest(messageAttributes) && !isHttpConfigRequest(messageAttributes)) { + log.debug("Skip device event"); + return true; + } + return false; + } + + private boolean isMqttConfigRequest(final Map messageAttributes) { + return Objects.equals(messageAttributes.get(messagingConfig.getContentTypeKey()), + messagingConfig.getEmptyNotificationEventContentType()) && + Objects.equals(messageAttributes.get(messagingConfig.getTtdKey()), DEVICE_CONNECTION_TTD); + } + + private boolean isHttpConfigRequest(final Map messageAttributes) { + return Objects.equals(messageAttributes.get(messagingConfig.getOrigAdapterKey()), "hono-http") + && !messageAttributes.get(messagingConfig.getTtdKey()).isBlank() + && messageAttributes.get(messagingConfig.getOrigAddressKey()).contains(SUBJECT); + } + + private void publish(final String topicToPublish, final DeviceConfig deviceConfig, + final Map attributes) { + final var context = vertx.getOrCreateContext(); + context.executeBlocking(promise -> { + try { + final var configPayload = Base64.decodeBase64(deviceConfig.getBinaryData().getBytes()); + final var tenantId = attributes.get(InternalMessagingConstants.TENANT_ID); + final var deviceId = attributes.get(InternalMessagingConstants.DEVICE_ID); + final var configVersion = deviceConfig.getVersion(); + + resetError(tenantId, deviceId, configVersion); + internalMessaging.publish(topicToPublish, configPayload, attributes); + log.info("Publish device config {}", configPayload); + vertx.setTimer( + messagingConfig.getConfigAckDelay(), id -> configSendSuccess(tenantId, deviceId, configVersion) + .onSuccess(config -> updateDeviceAckTime(config, Instant.now().toString()))); + } catch (Exception ex) { + log.error("Error serialize config {}", deviceConfig); + } finally { + promise.complete(); + } + }); + } + + private Future configSendSuccess(final String tenantId, final String deviceId, + final String configVersion) { + + return repository.getDeviceConfig(tenantId, deviceId, Integer.parseInt(configVersion)) + .compose(config -> { + if (config.getDeviceAckError() == null) { + return Future.succeededFuture(config); + } else { + return Future.failedFuture("Config could not successfully be send"); + } + }); + } + + private void resetError(final String tenantId, final String deviceId, final String configVersion) { + updateDeviceConfigError(new DeviceConfigInternalResponse(tenantId, deviceId, configVersion, null)); + } + + @Override + public void updateDeviceAckTime(final DeviceConfigEntity config, final String deviceAckTime) { + repository.updateDeviceAckTime(config, deviceAckTime) + .onSuccess(ok -> log.info( + "Successfully updated device acknowledged time for config version {} for device {} in tenant {}", + config.getVersion(), config.getDeviceId(), config.getTenantId())); + } + + @Override + public void updateDeviceConfigError(final DeviceConfigInternalResponse configErrorResponse) { + repository.updateDeviceConfigError(configErrorResponse) + .onSuccess( + ok -> log.debug("Successfully updated error for config version {} for device {} in tenant {}", + configErrorResponse.getVersion(), configErrorResponse.getDeviceId(), + configErrorResponse.getTenantId())); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreatorImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreatorImpl.java similarity index 70% rename from device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreatorImpl.java rename to device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreatorImpl.java index d890db0b..f13ebe3a 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreatorImpl.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreatorImpl.java @@ -14,13 +14,14 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.database; import java.util.Map; import javax.enterprise.context.ApplicationScoped; import org.eclipse.hono.communication.api.config.DeviceConfigsConstants; +import org.eclipse.hono.communication.api.config.DeviceStatesConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,15 +32,15 @@ import io.vertx.sqlclient.templates.SqlTemplate; /** - * Creates all Database tables if they are not exist. + * Creates all Database tables if they do not exist. */ @ApplicationScoped public class DatabaseSchemaCreatorImpl implements DatabaseSchemaCreator { private static final Logger log = LoggerFactory.getLogger(DatabaseSchemaCreatorImpl.class); private final Vertx vertx; - private final String tableCreationErrorMsg = "Table deviceConfig can not be created {}"; - private final String tableCreationSuccessMsg = "Successfully migrate Table: deviceConfig."; + private final String tableCreationErrorMsg = "Table %s can not be created {}"; + private final String tableCreationSuccessMsg = "Successfully migrate Table: %s."; private final DatabaseService db; @@ -56,15 +57,20 @@ public DatabaseSchemaCreatorImpl(final Vertx vertx, final DatabaseService db) { @Override public void createDBTables() { - createDeviceConfigTable(); + createTable(DeviceConfigsConstants.CREATE_SQL_SCRIPT_PATH, "device_config"); + createTable(DeviceStatesConstants.CREATE_SQL_SCRIPT_PATH, "device_status"); } - private void createDeviceConfigTable() { - log.info("Running database migration from file {}", DeviceConfigsConstants.CREATE_SQL_SCRIPT_PATH); + private void createTable(final String filePath, final String tableName) { + log.info("Running database migration from file {}", filePath); final Promise loadScriptTracker = Promise.promise(); - vertx.fileSystem().readFile(DeviceConfigsConstants.CREATE_SQL_SCRIPT_PATH, loadScriptTracker); + vertx.fileSystem().readFile(filePath, loadScriptTracker); + createTableIfNotExist(loadScriptTracker, tableName); + } + + private void createTableIfNotExist(final Promise loadScriptTracker, final String tableName) { db.getDbClient().withTransaction( sqlConnection -> loadScriptTracker.future() @@ -72,13 +78,11 @@ private void createDeviceConfigTable() { .compose(script -> SqlTemplate .forQuery(sqlConnection, script) .execute(Map.of()))) - .onSuccess(ok -> log.info(tableCreationSuccessMsg)) + .onSuccess(ok -> log.info(tableCreationSuccessMsg.formatted(tableName))) .onFailure(error -> { - log.error(tableCreationErrorMsg, error.getMessage()); + log.error(tableCreationErrorMsg.formatted(tableName), error.getMessage()); db.close(); Quarkus.asyncExit(-1); }); - - } } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateService.java new file mode 100644 index 00000000..9308467e --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateService.java @@ -0,0 +1,37 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.state; + +import org.eclipse.hono.communication.api.data.ListDeviceStatesResponse; + +import io.vertx.core.Future; + +/** + * Device state interface. + */ +public interface DeviceStateService { + + /** + * Lists all the states for a specific device. + * + * @param deviceId Device Id + * @param tenantId Tenant Id + * @param limit Limit between 1 and 10 + * @return Future of ListDeviceStatesResponse + */ + Future listAll(String deviceId, String tenantId, int limit); +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImpl.java new file mode 100644 index 00000000..a3afff4b --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImpl.java @@ -0,0 +1,101 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.state; + +import javax.inject.Singleton; + +import org.eclipse.hono.communication.api.data.ListDeviceStatesResponse; +import org.eclipse.hono.communication.api.handler.StateTopicEventHandler; +import org.eclipse.hono.communication.api.mapper.DeviceStateMapper; +import org.eclipse.hono.communication.api.repository.DeviceStateRepository; +import org.eclipse.hono.communication.api.service.DeviceServiceAbstract; +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.common.base.Strings; +import com.google.pubsub.v1.PubsubMessage; + +import io.vertx.core.Future; + +/** + * Service for device commands. + */ + +@Singleton +public class DeviceStateServiceImpl extends DeviceServiceAbstract implements DeviceStateService, StateTopicEventHandler { + + private final Logger log = LoggerFactory.getLogger(DeviceStateServiceImpl.class); + private final DeviceStateRepository repository; + private final DeviceStateMapper mapper; + private final String TENANT_ID_EVENT_PROPERTY = "tenantId"; + private final String DEVICE_ID_EVENT_PROPERTY = "deviceId"; + + /** + * Creates a new DeviceStateServiceImpl. + * + * @param repository The device state repository + * @param mapper The device state mapper + * @param internalMessagingConfig The internal messaging config + * @param internalMessaging The internal messaging interface + */ + protected DeviceStateServiceImpl(final DeviceStateRepository repository, final DeviceStateMapper mapper, + final InternalMessagingConfig internalMessagingConfig, final InternalMessaging internalMessaging) { + super(internalMessagingConfig, internalMessaging); + this.repository = repository; + this.mapper = mapper; + } + + @Override + public Future listAll(final String deviceId, final String tenantId, final int limit) { + return repository.listAll(deviceId, tenantId, limit) + .map( + result -> { + final var listState = new ListDeviceStatesResponse(); + listState.setDeviceStates(result); + return listState; + }); + + } + + @Override + public void onStateMessage(final PubsubMessage pubsubMessage, final AckReplyConsumer consumer) { + + consumer.ack(); + + final var messageAttributes = pubsubMessage.getAttributesMap(); + final var deviceId = messageAttributes.get(DEVICE_ID_EVENT_PROPERTY); + final var tenantId = messageAttributes.get(TENANT_ID_EVENT_PROPERTY); + + if (Strings.isNullOrEmpty(deviceId) || Strings.isNullOrEmpty(tenantId)) { + log.warn("Skip device state message: deviceId or tenantId is empty"); + return; + } + + final String msg = pubsubMessage.getData().toStringUtf8(); + + if (Strings.isNullOrEmpty(msg)) { + log.debug("Skip state: payload is empty"); + return; + } + + repository.createNew(mapper.pubSubMessageToDeviceStateEntity(pubsubMessage)) + .onFailure(err -> log.error("Can't save state in DB: {}", err.getMessage())); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/app/DatabaseConfig.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/DatabaseConfig.java index 0819106a..c3c79f83 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/core/app/DatabaseConfig.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/DatabaseConfig.java @@ -42,6 +42,10 @@ public class DatabaseConfig { int poolMaxSize; @ConfigProperty(name = "vertx.device-registration.table") String deviceRegistrationTableName; + @ConfigProperty(name = "vertx.tenant.table") + String tenantTableName; + @ConfigProperty(name = "vertx.tenant.tenant-id-column") + String tenantTableIdColumn; @ConfigProperty(name = "vertx.device-registration.tenant-id-column") String deviceRegistrationTenantIdColumn; @ConfigProperty(name = "vertx.device-registration.device-id-column") @@ -51,6 +55,14 @@ public String getDeviceRegistrationTableName() { return deviceRegistrationTableName; } + public String getTenantTableName() { + return tenantTableName; + } + + public String getTenantTableIdColumn() { + return tenantTableIdColumn; + } + public String getDeviceRegistrationTenantIdColumn() { return deviceRegistrationTenantIdColumn; } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/StringValidateUtils.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/StringValidateUtils.java new file mode 100644 index 00000000..95e44b45 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/StringValidateUtils.java @@ -0,0 +1,44 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.core.utils; + +import java.util.Base64; + +/** + * String validation utils. + */ +public abstract class StringValidateUtils { + + private StringValidateUtils() { + throw new IllegalStateException("Utility class"); + } + + /** + * Checks if a given string is base64 encoded. + * + * @param stringBase64 String to validate + * @return True if string is base64 else False + */ + public static boolean isBase64(final String stringBase64) { + try { + Base64.getDecoder().decode(stringBase64); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } +} diff --git a/device-communication/src/main/resources/api/hono-device-communication-v1.yaml b/device-communication/src/main/resources/api/hono-device-communication-v1.yaml index 63764c51..e70c8b5c 100644 --- a/device-communication/src/main/resources/api/hono-device-communication-v1.yaml +++ b/device-communication/src/main/resources/api/hono-device-communication-v1.yaml @@ -5,7 +5,7 @@ info: version: 1.0.0 description: Device commands and configs API. servers: - - url: http://localhost:8080/api/v1 + - url: http://localhost:8080/v1 paths: /commands/{tenantid}/{deviceid}: @@ -122,6 +122,51 @@ paths: type: string in: path required: true + /states/{tenantid}/{deviceid}: + summary: Device states + description: List states for a specific device + get: + tags: + - STATES + parameters: + - name: numStates + description: "The number of states to list. States are listed in decreasing + order of the state id. The maximum number of states saved in Database is + 10. If this value is zero, it will return all the states available." + schema: + type: integer + minimum: 0 + maximum: 10 + in: query + required: false + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/ListDeviceStatesResponse' + description: Lists the device states + "404": + description: Device not found + "500": + description: Internal server error + operationId: listStates + summary: List the device states + description: "Lists the last few states of the device in descending + order (i.e.: newest first)." + parameters: + - name: tenantid + description: Unique registry id + schema: + type: string + in: path + required: true + - name: deviceid + description: Unique device id + schema: + type: string + in: path + required: true components: schemas: DeviceCommandRequest: @@ -213,3 +258,35 @@ components: cloudUpdateTime: string deviceAckTime: string binaryData: string + ListDeviceStatesResponse: + title: Root Type for ListDeviceStatesResponse + description: A list of a device states + type: object + properties: + deviceStates: + description: List of DeviceState objects + type: array + items: + $ref: '#/components/schemas/DeviceState' + example: + deviceStates: + - object: DeviceState + DeviceState: + title: Root Type for DeviceState + description: The device state. + type: object + properties: + updateTime: + description: "String (Timestamp format) [Output only] The time at + which this state version was updated. This + timestamp is set by the server. Timestamp in + RFC3339 UTC \"Zulu\" format, accurate to nanoseconds. + Example: \"2014-10-02T15:01:23.045123456Z\"." + type: string + binaryData: + description: "string (bytes format) The device state data + in string base64-encoded format." + type: string + example: + updateTime: string + binaryData: string \ No newline at end of file diff --git a/device-communication/src/main/resources/api/hono-endpoint.yaml b/device-communication/src/main/resources/api/hono-endpoint.yaml index e2e1491b..967451ea 100644 --- a/device-communication/src/main/resources/api/hono-endpoint.yaml +++ b/device-communication/src/main/resources/api/hono-endpoint.yaml @@ -7,7 +7,7 @@ info: version: 1.0.0 schemes: - http -basePath: /api/v1 +basePath: /v1 definitions: DeviceCommandRequest: description: Command json object structure @@ -87,7 +87,6 @@ definitions: required: - binaryData type: object - ListDeviceConfigVersionsResponse: description: A list of a device config versions example: @@ -101,6 +100,39 @@ definitions: type: array title: Root Type for ListDeviceConfigVersionsResponse type: object + DeviceState: + description: The device state. + example: + binaryData: string + updateTime: string + properties: + binaryData: + description: >- + string (bytes format) The device state data in string + base64-encoded format. + type: string + updateTime: + description: >- + String (Timestamp format) [Output only] The time at which this + state version was updated. This timestamp is + set by the server. Timestamp in RFC3339 UTC "Zulu" format, accurate + to nanoseconds. Example: "2014-10-02T15:01:23.045123456Z". + type: string + title: Root Type for DeviceState + type: object + ListDeviceStatesResponse: + description: A list of a device states. + example: + deviceStates: + - object: DeviceState + properties: + deviceStates: + description: List of DeviceState objects. + items: + $ref: "#/definitions/DeviceState" + type: array + title: Root Type for ListDeviceStatesResponse + type: object paths: "/commands/{tenantid}/{deviceid}": parameters: @@ -202,4 +234,43 @@ paths: description: Internal Server error summary: Modify cloud to device config tags: - - CONFIGS \ No newline at end of file + - CONFIGS + "/states/{tenantid}/{deviceid}": + get: + description: >- + Lists the last few versions of the device state in descending + order (i.e.: newest first). + operationId: listStates + parameters: + - description: >- + The number of states to list. States are listed in decreasing + order of the state id. The maximum number of states saved in + Database is 10. If this value is zero, it will return all the + states available. + in: query + maximum: 10 + minimum: 0 + name: numStates + required: false + type: integer + produces: + - application/json + responses: + "200": + description: Lists the device states + schema: + $ref: "#/definitions/ListDeviceStatesResponse" + summary: List the device states + tags: + - STATES + parameters: + - description: Unique registry id + in: path + name: tenantid + required: true + type: string + - description: Unique device id + in: path + name: deviceid + required: true + type: string \ No newline at end of file diff --git a/device-communication/src/main/resources/db/migration/V1.0__create.sql b/device-communication/src/main/resources/db/migration/V1.0__create.sql deleted file mode 100644 index 69fb9487..00000000 --- a/device-communication/src/main/resources/db/migration/V1.0__create.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE IF NOT EXISTS public."deviceConfig" -( - version INT not null, - "tenantId" VARCHAR(100) not null, - "deviceId" VARCHAR(100) not null, - "cloudUpdateTime" VARCHAR(100) not null, - "deviceAckTime" VARCHAR(100), - "binaryData" VARCHAR not null, - - PRIMARY KEY (version, "tenantId", "deviceId") -) \ No newline at end of file diff --git a/device-communication/src/main/resources/db/create_device_config_table.sql b/device-communication/src/main/resources/db/v1_create_config_table.sql similarity index 73% rename from device-communication/src/main/resources/db/create_device_config_table.sql rename to device-communication/src/main/resources/db/v1_create_config_table.sql index ae425132..994c5302 100644 --- a/device-communication/src/main/resources/db/create_device_config_table.sql +++ b/device-communication/src/main/resources/db/v1_create_config_table.sql @@ -18,11 +18,13 @@ CREATE TABLE IF NOT EXISTS device_configs ( version INT not null, - tenant_id VARCHAR(100) not null, - device_id VARCHAR(100) not null, + tenant_id VARCHAR(256) not null, + device_id VARCHAR(256) not null, cloud_update_time VARCHAR(100) not null, device_ack_time VARCHAR(100), binary_data VARCHAR not null, + device_ack_error VARCHAR(100), - PRIMARY KEY (version, tenant_id, device_id) + PRIMARY KEY (version, tenant_id, device_id), + FOREIGN KEY (tenant_id, device_id) REFERENCES device_registrations (TENANT_ID, DEVICE_ID) ON DELETE CASCADE ) \ No newline at end of file diff --git a/device-communication/src/main/resources/db/v1_create_state_table.sql b/device-communication/src/main/resources/db/v1_create_state_table.sql new file mode 100644 index 00000000..ce100e41 --- /dev/null +++ b/device-communication/src/main/resources/db/v1_create_state_table.sql @@ -0,0 +1,28 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + + +CREATE TABLE IF NOT EXISTS device_status +( + id VARCHAR(100) not null, + tenant_id VARCHAR(256) not null, + device_id VARCHAR(256) not null, + update_time VARCHAR(100) not null, + binary_data VARCHAR not null, + + PRIMARY KEY (id, tenant_id, device_id), + FOREIGN KEY (tenant_id, device_id) REFERENCES device_registrations (TENANT_ID, DEVICE_ID) ON DELETE CASCADE +); \ No newline at end of file diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/ApplicationTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/ApplicationTest.java index c45b74ed..c0af61c9 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/ApplicationTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/ApplicationTest.java @@ -28,10 +28,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - import io.vertx.core.Vertx; - class ApplicationTest { private HttpServer httpServerMock; diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServerTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServerTest.java index eee9115b..de3506c5 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServerTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/DeviceCommunicationHttpServerTest.java @@ -17,17 +17,26 @@ package org.eclipse.hono.communication.api; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; - +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.List; import org.eclipse.hono.communication.api.handler.DeviceCommandHandler; -import org.eclipse.hono.communication.api.service.DatabaseSchemaCreator; -import org.eclipse.hono.communication.api.service.DatabaseSchemaCreatorImpl; -import org.eclipse.hono.communication.api.service.DatabaseService; -import org.eclipse.hono.communication.api.service.DatabaseServiceImpl; import org.eclipse.hono.communication.api.service.VertxHttpHandlerManagerService; +import org.eclipse.hono.communication.api.service.communication.InternalTopicManager; +import org.eclipse.hono.communication.api.service.database.DatabaseSchemaCreator; +import org.eclipse.hono.communication.api.service.database.DatabaseSchemaCreatorImpl; +import org.eclipse.hono.communication.api.service.database.DatabaseService; +import org.eclipse.hono.communication.api.service.database.DatabaseServiceImpl; import org.eclipse.hono.communication.core.app.ApplicationConfig; import org.eclipse.hono.communication.core.app.ServerConfig; import org.junit.jupiter.api.AfterEach; @@ -52,7 +61,6 @@ import io.vertx.ext.web.openapi.RouterBuilder; import io.vertx.ext.web.validation.BadRequestException; - class DeviceCommunicationHttpServerTest { private ApplicationConfig appConfigsMock; @@ -67,6 +75,7 @@ class DeviceCommunicationHttpServerTest { private HttpServerRequest httpServerRequestMock; private BadRequestException badRequestExceptionMock; private DatabaseSchemaCreator databaseSchemaCreatorMock; + private InternalTopicManager internalTopicManager; private Route routeMock; private JsonObject jsonObjMock; @@ -89,12 +98,13 @@ void setUp() { jsonObjMock = mock(JsonObject.class); dbMock = mock(DatabaseServiceImpl.class); databaseSchemaCreatorMock = mock(DatabaseSchemaCreatorImpl.class); + internalTopicManager = mock(InternalTopicManager.class); routeMock = mock(Route.class); deviceCommunicationHttpServer = new DeviceCommunicationHttpServer(appConfigsMock, vertxMock, handlerServiceMock, dbMock, - databaseSchemaCreatorMock); + databaseSchemaCreatorMock, internalTopicManager); } @@ -115,7 +125,7 @@ void tearDown() { dbMock, serverConfigMock, databaseSchemaCreatorMock, - routeMock); + routeMock, internalTopicManager); } @@ -178,6 +188,7 @@ void startSucceeded() { verify(routeMock, times(2)).handler(any()); verify(routerMock, times(1)).route(anyString()); verify(routeMock, times(1)).subRouter(any()); + verify(internalTopicManager).initPubSub(); quarkusMockedStatic.verify(Quarkus::waitForExit, times(1)); routerMockedStatic.verifyNoMoreInteractions(); mockedRouterBuilderStatic.verifyNoMoreInteractions(); @@ -213,6 +224,7 @@ void createRouterFailed() { verify(handlerServiceMock, times(1)).getAvailableHandlerServices(); verify(appConfigsMock, times(1)).getServerConfig(); verify(serverConfigMock, times(1)).getOpenApiFilePath(); + verify(internalTopicManager).initPubSub(); quarkusMockedStatic.verify(() -> Quarkus.asyncExit(-1), times(1)); quarkusMockedStatic.verify(Quarkus::waitForExit, times(1)); quarkusMockedStatic.verifyNoMoreInteractions(); @@ -245,7 +257,7 @@ void createServerFailed() { when(routeMock.handler(any())).thenReturn(routeMock); when(vertxMock.createHttpServer(any(HttpServerOptions.class))).thenReturn(httpServerMock); when(httpServerMock.requestHandler(routerMock)).thenReturn(httpServerMock); - when(httpServerMock.listen()).thenReturn(Future.failedFuture(new Throwable("Test error on listen()"))); + when(httpServerMock.listen()).thenReturn(Future.failedFuture(new Throwable())); when(serverConfigMock.getOpenApiFilePath()).thenReturn("/myPath"); when(serverConfigMock.getBasePath()).thenReturn("/basePath"); when(routerMock.route(any())).thenReturn(routeMock); @@ -276,6 +288,7 @@ void createServerFailed() { verify(routeMock, times(2)).handler(any()); verify(routeMock, times(1)).subRouter(any()); verify(routerMock, times(1)).route(anyString()); + verify(internalTopicManager).initPubSub(); routerMockedStatic.verify(() -> Router.router(vertxMock), times(1)); quarkusMockedStatic.verify(Quarkus::waitForExit, times(1)); mockedRouterBuilderStatic.verifyNoMoreInteractions(); diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandlerTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandlerTest.java index e1a65770..e70a2b92 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandlerTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceConfigsHandlerTest.java @@ -18,7 +18,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.util.List; @@ -26,8 +31,8 @@ import org.eclipse.hono.communication.api.data.DeviceConfig; import org.eclipse.hono.communication.api.data.DeviceConfigRequest; import org.eclipse.hono.communication.api.data.ListDeviceConfigVersionsResponse; -import org.eclipse.hono.communication.api.service.DeviceConfigService; -import org.eclipse.hono.communication.api.service.DeviceConfigServiceImpl; +import org.eclipse.hono.communication.api.service.config.DeviceConfigService; +import org.eclipse.hono.communication.api.service.config.DeviceConfigServiceImpl; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -43,7 +48,6 @@ import io.vertx.ext.web.openapi.Operation; import io.vertx.ext.web.openapi.RouterBuilder; - class DeviceConfigsHandlerTest { private final DeviceConfigService configServiceMock; @@ -113,8 +117,8 @@ void addRoutes() { @Test void handleModifyCloudToDeviceConfig_success() { - when(routingContextMock.pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS)).thenReturn(tenantID); - when(routingContextMock.pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS)).thenReturn(deviceID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS)).thenReturn(tenantID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS)).thenReturn(deviceID); when(routingContextMock.body()).thenReturn(requestBodyMock); when(requestBodyMock.asJsonObject()).thenReturn(jsonObjMock); when(jsonObjMock.mapTo(DeviceConfigRequest.class)).thenReturn(deviceConfigRequest); @@ -127,8 +131,8 @@ void handleModifyCloudToDeviceConfig_success() { final var results = deviceConfigsHandler.handleModifyCloudToDeviceConfig(routingContextMock); verify(configServiceMock).modifyCloudToDeviceConfig(deviceConfigRequest, deviceID, tenantID); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS); verify(routingContextMock, times(1)).body(); verify(requestBodyMock, times(1)).asJsonObject(); verify(jsonObjMock, times(1)).mapTo(DeviceConfigRequest.class); @@ -139,8 +143,8 @@ void handleModifyCloudToDeviceConfig_success() { @Test void handleModifyCloudToDeviceConfig_failure() { - when(routingContextMock.pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS)).thenReturn(tenantID); - when(routingContextMock.pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS)).thenReturn(deviceID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS)).thenReturn(tenantID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS)).thenReturn(deviceID); when(routingContextMock.body()).thenReturn(requestBodyMock); when(requestBodyMock.asJsonObject()).thenReturn(jsonObjMock); when(jsonObjMock.mapTo(DeviceConfigRequest.class)).thenReturn(deviceConfigRequest); @@ -154,9 +158,9 @@ void handleModifyCloudToDeviceConfig_failure() { final var results = deviceConfigsHandler.handleModifyCloudToDeviceConfig(routingContextMock); verify(configServiceMock).modifyCloudToDeviceConfig(deviceConfigRequest, deviceID, tenantID); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS); verify(routingContextMock, times(1)).body(); verify(requestBodyMock, times(1)).asJsonObject(); verify(jsonObjMock, times(1)).mapTo(DeviceConfigRequest.class); @@ -171,8 +175,8 @@ void handleListConfigVersions_success() { final var listDeviceConfigVersionsResponse = new ListDeviceConfigVersionsResponse(List.of(deviceConfig)); final MultiMap queryParams = MultiMap.caseInsensitiveMultiMap().add(DeviceConfigsConstants.NUM_VERSION_QUERY_PARAMS, String.valueOf(10)); when(routingContextMock.queryParams()).thenReturn(queryParams); - when(routingContextMock.pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS)).thenReturn(tenantID); - when(routingContextMock.pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS)).thenReturn(deviceID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS)).thenReturn(tenantID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS)).thenReturn(deviceID); when(routingContextMock.response()).thenReturn(httpServerResponseMock); when(httpServerResponseMock.setStatusCode(anyInt())).thenReturn(httpServerResponseMock); when(httpServerResponseMock.putHeader("Content-Type", @@ -183,8 +187,8 @@ void handleListConfigVersions_success() { verify(configServiceMock, times(1)).listAll(deviceID, tenantID, 10); verify(routingContextMock, times(1)).queryParams(); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS); verifySuccessResponse(results, listDeviceConfigVersionsResponse); @@ -195,8 +199,8 @@ void handleListConfigVersions_success() { void handleListConfigVersions_failed() { final MultiMap queryParams = MultiMap.caseInsensitiveMultiMap().add(DeviceConfigsConstants.NUM_VERSION_QUERY_PARAMS, String.valueOf(10)); when(routingContextMock.queryParams()).thenReturn(queryParams); - when(routingContextMock.pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS)).thenReturn(tenantID); - when(routingContextMock.pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS)).thenReturn(deviceID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS)).thenReturn(tenantID); + when(routingContextMock.pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS)).thenReturn(deviceID); when(routingContextMock.response()).thenReturn(httpServerResponseMock); when(httpServerResponseMock.setStatusCode(anyInt())).thenReturn(httpServerResponseMock); when(illegalArgumentExceptionMock.getMessage()).thenReturn(errorMsg); @@ -208,9 +212,9 @@ void handleListConfigVersions_failed() { verify(configServiceMock, times(1)).listAll(deviceID, tenantID, 10); verify(routingContextMock, times(1)).queryParams(); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.TENANT_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.TENANT_PATH_PARAMS); - verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.DEVICE_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceConfigsConstants.API_COMMON.DEVICE_PATH_PARAMS); verifyErrorResponse(results); diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandlerTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandlerTest.java new file mode 100644 index 00000000..a80ef2e0 --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceStatesHandlerTest.java @@ -0,0 +1,150 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.handler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; + +import org.eclipse.hono.communication.api.config.DeviceStatesConstants; +import org.eclipse.hono.communication.api.data.DeviceState; +import org.eclipse.hono.communication.api.data.ListDeviceStatesResponse; +import org.eclipse.hono.communication.api.service.state.DeviceStateService; +import org.eclipse.hono.communication.api.service.state.DeviceStateServiceImpl; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.openapi.Operation; +import io.vertx.ext.web.openapi.RouterBuilder; + +class DeviceStatesHandlerTest { + + private final RouterBuilder routerBuilderMock; + private final RoutingContext routingContextMock; + private final Operation operationMock; + private final HttpServerResponse httpServerResponseMock; + private final IllegalArgumentException illegalArgumentExceptionMock; + private final DeviceStateService stateServiceMock; + private final DeviceStatesHandler deviceStatesHandler; + private final String tenantID = "tenant_ID"; + private final String deviceID = "device_ID"; + private final String errorMsg = "test_error"; + private final DeviceState deviceState = new DeviceState(); + + DeviceStatesHandlerTest() { + routerBuilderMock = mock(RouterBuilder.class); + routingContextMock = mock(RoutingContext.class); + operationMock = mock(Operation.class); + httpServerResponseMock = mock(HttpServerResponse.class); + illegalArgumentExceptionMock = mock(IllegalArgumentException.class); + stateServiceMock = mock(DeviceStateServiceImpl.class); + deviceStatesHandler = new DeviceStatesHandler(stateServiceMock); + } + + @Test + void testAddRoutes() { + when(routerBuilderMock.operation(anyString())).thenReturn(operationMock); + when(operationMock.handler(any())).thenReturn(operationMock); + + deviceStatesHandler.addRoutes(routerBuilderMock); + + verify(routerBuilderMock, times(1)).operation(DeviceStatesConstants.LIST_STATES_OP_ID); + verify(operationMock, times(1)).handler(any()); + } + + @Test + void testHandleListStates_success() { + final var listDeviceStatesResponse = new ListDeviceStatesResponse(List.of(deviceState)); + final MultiMap queryParams = MultiMap.caseInsensitiveMultiMap() + .add(DeviceStatesConstants.NUM_STATES_QUERY_PARAMS, String.valueOf(10)); + when(routingContextMock.queryParams()).thenReturn(queryParams); + when(routingContextMock.pathParam(DeviceStatesConstants.API_COMMON.TENANT_PATH_PARAMS)).thenReturn(tenantID); + when(routingContextMock.pathParam(DeviceStatesConstants.API_COMMON.DEVICE_PATH_PARAMS)).thenReturn(deviceID); + when(routingContextMock.response()).thenReturn(httpServerResponseMock); + when(httpServerResponseMock.setStatusCode(anyInt())).thenReturn(httpServerResponseMock); + when(httpServerResponseMock.putHeader("Content-Type", + "application/json")).thenReturn(httpServerResponseMock); + when(stateServiceMock.listAll(deviceID, tenantID, 10)) + .thenReturn(Future.succeededFuture(listDeviceStatesResponse)); + + final var results = deviceStatesHandler.handleListStates(routingContextMock); + + verify(stateServiceMock, times(1)).listAll(deviceID, tenantID, 10); + verify(routingContextMock, times(1)).queryParams(); + verify(routingContextMock, times(1)).pathParam(DeviceStatesConstants.API_COMMON.TENANT_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(DeviceStatesConstants.API_COMMON.DEVICE_PATH_PARAMS); + + verifySuccessResponse(results, listDeviceStatesResponse); + } + + @Test + void testHandleListStates_failed() { + final MultiMap queryParams = MultiMap.caseInsensitiveMultiMap() + .add(DeviceStatesConstants.NUM_STATES_QUERY_PARAMS, String.valueOf(10)); + when(routingContextMock.queryParams()).thenReturn(queryParams); + when(routingContextMock.pathParam(DeviceStatesConstants.API_COMMON.TENANT_PATH_PARAMS)).thenReturn(tenantID); + when(routingContextMock.pathParam(DeviceStatesConstants.API_COMMON.DEVICE_PATH_PARAMS)).thenReturn(deviceID); + when(routingContextMock.response()).thenReturn(httpServerResponseMock); + when(httpServerResponseMock.setStatusCode(anyInt())).thenReturn(httpServerResponseMock); + when(illegalArgumentExceptionMock.getMessage()).thenReturn(errorMsg); + when(httpServerResponseMock.putHeader("Content-Type", + "application/json")).thenReturn(httpServerResponseMock); + when(stateServiceMock.listAll(deviceID, tenantID, 10)) + .thenReturn(Future.failedFuture(illegalArgumentExceptionMock)); + + final var results = deviceStatesHandler.handleListStates(routingContextMock); + + verify(stateServiceMock, times(1)).listAll(deviceID, tenantID, 10); + verify(routingContextMock, times(1)).queryParams(); + verify(routingContextMock, times(1)).pathParam(DeviceStatesConstants.API_COMMON.TENANT_PATH_PARAMS); + + verify(routingContextMock, times(1)).pathParam(DeviceStatesConstants.API_COMMON.DEVICE_PATH_PARAMS); + verifyErrorResponse(results); + } + + void verifyErrorResponse(final Future results) { + verify(routingContextMock, times(1)).response(); + verify(httpServerResponseMock).setStatusCode(400); + verify(illegalArgumentExceptionMock).getMessage(); + verify(httpServerResponseMock).putHeader("Content-Type", + "application/json"); + verify(httpServerResponseMock).end(new JsonObject().put("error", errorMsg).encodePrettily()); + Assertions.assertTrue(results.failed()); + } + + void verifySuccessResponse(final Future results, final Object responseObj) { + verify(routingContextMock, times(1)).response(); + verify(httpServerResponseMock).setStatusCode(200); + verify(httpServerResponseMock).putHeader("Content-Type", + "application/json"); + verify(httpServerResponseMock).end(Json.encodePrettily(responseObj)); + Assertions.assertTrue(results.succeeded()); + } + +} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImplTest.java deleted file mode 100644 index 56b767fa..00000000 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DeviceConfigServiceImplTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * *********************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - *

- * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - *

- * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - *

- * SPDX-License-Identifier: EPL-2.0 - * ********************************************************** - * - */ - -package org.eclipse.hono.communication.api.service; - -import static org.mockito.Mockito.*; - -import org.eclipse.hono.communication.api.data.DeviceConfig; -import org.eclipse.hono.communication.api.data.DeviceConfigEntity; -import org.eclipse.hono.communication.api.data.DeviceConfigRequest; -import org.eclipse.hono.communication.api.data.ListDeviceConfigVersionsResponse; -import org.eclipse.hono.communication.api.mapper.DeviceConfigMapper; -import org.eclipse.hono.communication.api.repository.DeviceConfigsRepository; -import org.eclipse.hono.communication.api.repository.DeviceConfigsRepositoryImpl; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import io.vertx.core.Future; -import io.vertx.pgclient.PgPool; - - -class DeviceConfigServiceImplTest { - - private final DeviceConfigsRepository repositoryMock; - private final DatabaseService dbMock; - private final PgPool poolMock; - private final DeviceConfigMapper mapperMock; - private final DeviceConfigService deviceConfigService; - - private final String tenantId = "tenant_ID"; - private final String deviceId = "device_ID"; - - DeviceConfigServiceImplTest() { - this.repositoryMock = mock(DeviceConfigsRepositoryImpl.class); - this.dbMock = mock(DatabaseServiceImpl.class); - this.mapperMock = mock(DeviceConfigMapper.class); - this.poolMock = mock(PgPool.class); - this.deviceConfigService = new DeviceConfigServiceImpl(repositoryMock, dbMock, mapperMock); - } - - - @AfterEach - void tearDown() { - verifyNoMoreInteractions(repositoryMock, mapperMock, dbMock); - } - - @Test - void modifyCloudToDeviceConfig_success() { - final var deviceConfigRequest = new DeviceConfigRequest(); - final var deviceConfigEntity = new DeviceConfigEntity(); - final var deviceConfigEntityResponse = new DeviceConfig(); - when(mapperMock.configRequestToDeviceConfigEntity(deviceConfigRequest)).thenReturn(deviceConfigEntity); - when(mapperMock.deviceConfigEntityToConfig(deviceConfigEntity)).thenReturn(deviceConfigEntityResponse); - when(dbMock.getDbClient()).thenReturn(poolMock); - when(poolMock.withTransaction(any())).thenReturn(Future.succeededFuture(deviceConfigEntity)); - - - final var results = deviceConfigService.modifyCloudToDeviceConfig(deviceConfigRequest, deviceId, tenantId); - - verify(mapperMock, times(1)).configRequestToDeviceConfigEntity(deviceConfigRequest); - verify(mapperMock, times(1)).deviceConfigEntityToConfig(deviceConfigEntity); - verify(dbMock, times(1)).getDbClient(); - verify(poolMock, times(1)).withTransaction(any()); - Assertions.assertTrue(results.succeeded()); - } - - @Test - void modifyCloudToDeviceConfig_failure() { - final var deviceConfigRequest = new DeviceConfigRequest(); - final var deviceConfigEntity = new DeviceConfigEntity(); - when(mapperMock.configRequestToDeviceConfigEntity(deviceConfigRequest)).thenReturn(deviceConfigEntity); - when(dbMock.getDbClient()).thenReturn(poolMock); - when(poolMock.withTransaction(any())).thenReturn(Future.failedFuture(new Throwable("test_error"))); - - - final var results = deviceConfigService.modifyCloudToDeviceConfig(deviceConfigRequest, deviceId, tenantId); - - verify(mapperMock, times(1)).configRequestToDeviceConfigEntity(deviceConfigRequest); - verify(dbMock, times(1)).getDbClient(); - verify(poolMock, times(1)).withTransaction(any()); - Assertions.assertTrue(results.failed()); - } - - @Test - void listAll_success() { - final var deviceConfigVersions = new ListDeviceConfigVersionsResponse(); - when(dbMock.getDbClient()).thenReturn(poolMock); - when(poolMock.withConnection(any())).thenReturn(Future.succeededFuture(deviceConfigVersions)); - - final var results = deviceConfigService.listAll(deviceId, tenantId, 10); - - verify(dbMock, times(1)).getDbClient(); - verify(poolMock, times(1)).withConnection(any()); - Assertions.assertTrue(results.succeeded()); - - - } - - - @Test - void listAll_failed() { - when(dbMock.getDbClient()).thenReturn(poolMock); - when(poolMock.withConnection(any())).thenReturn(Future.failedFuture(new Throwable("test_error"))); - - final var results = deviceConfigService.listAll(deviceId, tenantId, 10); - - verify(dbMock, times(1)).getDbClient(); - verify(poolMock, times(1)).withConnection(any()); - Assertions.assertTrue(results.failed()); - - - } -} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java new file mode 100644 index 00000000..915780ae --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/InternalTopicManagerImplTest.java @@ -0,0 +1,157 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; + +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; +import org.eclipse.hono.communication.api.handler.ConfigTopicEventHandler; +import org.eclipse.hono.communication.api.handler.StateTopicEventHandler; +import org.eclipse.hono.communication.api.mapper.DeviceConfigMapper; +import org.eclipse.hono.communication.api.repository.DeviceRepository; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.eclipse.hono.notification.deviceregistry.LifecycleChange; +import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; + +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +class InternalTopicManagerImplTest { + + private final DeviceRepository deviceRepositoryMock; + private final DeviceConfigMapper mapperMock; + private final InternalMessagingConfig communicationConfigMock; + private final InternalMessaging internalCommunicationMock; + private final String tenantId = "tenant_ID"; + private final PubsubMessage pubsubMessageMock; + private final Context contextMock; + private final AckReplyConsumer ackReplyConsumerMock; + private final ByteString byteStringMock; + private final ConfigTopicEventHandler configTopicEventHandler; + private final StateTopicEventHandler stateTopicEventHandler; + private final InternalTopicManagerImpl internalTopicManager; + private final Vertx vertxMock; + + InternalTopicManagerImplTest() { + this.deviceRepositoryMock = mock(DeviceRepository.class); + this.mapperMock = mock(DeviceConfigMapper.class); + this.communicationConfigMock = mock(InternalMessagingConfig.class); + this.internalCommunicationMock = mock(InternalMessaging.class); + this.pubsubMessageMock = mock(PubsubMessage.class); + this.ackReplyConsumerMock = mock(AckReplyConsumer.class); + this.contextMock = mock(Context.class); + this.byteStringMock = mock(ByteString.class); + this.configTopicEventHandler = mock(ConfigTopicEventHandler.class); + this.stateTopicEventHandler = mock(StateTopicEventHandler.class); + this.vertxMock = mock(Vertx.class); + this.internalTopicManager = new InternalTopicManagerImpl(deviceRepositoryMock, configTopicEventHandler, + stateTopicEventHandler, internalCommunicationMock, communicationConfigMock, vertxMock); + + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(deviceRepositoryMock, + mapperMock, + communicationConfigMock, + internalCommunicationMock, + pubsubMessageMock, + ackReplyConsumerMock, + byteStringMock, + configTopicEventHandler, + stateTopicEventHandler, + contextMock, + vertxMock); + } + + @Test + public void testOnTenantChanges_ChangeIsUpdate() throws IOException { + final TenantChangeNotification notification = new TenantChangeNotification(LifecycleChange.UPDATE, tenantId, + Instant.now(), false, false); + when(pubsubMessageMock.getData()).thenReturn(byteStringMock); + when(byteStringMock.toStringUtf8()).thenReturn(new ObjectMapper().writeValueAsString(notification)); + + internalTopicManager.onTenantChanges(pubsubMessageMock, ackReplyConsumerMock); + + verify(ackReplyConsumerMock).ack(); + verify(pubsubMessageMock).getData(); + verify(byteStringMock).toStringUtf8(); + } + + @Test + public void testOnTenantChanges_tenantIdIsEmpty() throws IOException { + final TenantChangeNotification notification = new TenantChangeNotification(LifecycleChange.CREATE, "", + Instant.now(), false, false); + when(pubsubMessageMock.getData()).thenReturn(byteStringMock); + when(byteStringMock.toStringUtf8()).thenReturn(new ObjectMapper().writeValueAsString(notification)); + + internalTopicManager.onTenantChanges(pubsubMessageMock, ackReplyConsumerMock); + + verify(ackReplyConsumerMock).ack(); + verify(pubsubMessageMock).getData(); + verify(byteStringMock).toStringUtf8(); + + } + + @Test + public void testOnTenantChanges_success() throws IOException { + try (MockedStatic mockedPubSubMessageHelper = mockStatic(PubSubMessageHelper.class)) { + final var credMock = mock(FixedCredentialsProvider.class); + mockedPubSubMessageHelper.when(PubSubMessageHelper::getCredentialsProvider) + .thenReturn(Optional.of(credMock)); + + final TenantChangeNotification notification = new TenantChangeNotification(LifecycleChange.CREATE, tenantId, + Instant.now(), false, false); + when(pubsubMessageMock.getData()).thenReturn(byteStringMock); + when(byteStringMock.toStringUtf8()).thenReturn(new ObjectMapper().writeValueAsString(notification)); + when(communicationConfigMock.getEventTopicFormat()).thenReturn("%s.event"); + when(communicationConfigMock.getStateTopicFormat()).thenReturn("%s.event.state"); + when(communicationConfigMock.getCommandAckTopicFormat()).thenReturn("%s.command_response"); + + internalTopicManager.onTenantChanges(pubsubMessageMock, ackReplyConsumerMock); + + verify(ackReplyConsumerMock).ack(); + verify(pubsubMessageMock).getData(); + verify(byteStringMock).toStringUtf8(); + verify(communicationConfigMock).getEventTopicFormat(); + verify(communicationConfigMock).getStateTopicFormat(); + verify(communicationConfigMock).getCommandAckTopicFormat(); + verify(communicationConfigMock, times(2)).getProjectId(); + verify(internalCommunicationMock, times(3)).subscribe(anyString(), any()); + } + } +} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImplTest.java new file mode 100644 index 00000000..0c06ee85 --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/config/DeviceConfigServiceImplTest.java @@ -0,0 +1,388 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.config; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.eclipse.hono.communication.api.data.DeviceConfig; +import org.eclipse.hono.communication.api.data.DeviceConfigEntity; +import org.eclipse.hono.communication.api.data.DeviceConfigRequest; +import org.eclipse.hono.communication.api.mapper.DeviceConfigMapper; +import org.eclipse.hono.communication.api.repository.DeviceConfigRepository; +import org.eclipse.hono.communication.api.repository.DeviceConfigRepositoryImpl; +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.eclipse.hono.communication.core.app.InternalMessagingConstants; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; + +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +class DeviceConfigServiceImplTest { + + public static final String CONFIG_BASE_64 = "dGVzdCBjb25maWcgMjIyMjIy"; + private final DeviceConfigRepository repositoryMock; + private final DeviceConfigMapper mapperMock; + private final InternalMessagingConfig communicationConfigMock; + private final InternalMessaging internalCommunicationMock; + private final String tenantId = "tenant_ID"; + private final String deviceId = "device_ID"; + private final PubsubMessage pubsubMessageMock; + private final Context contextMock; + private final AckReplyConsumer ackReplyConsumerMock; + private final ByteString byteStringMock; + private final DeviceConfigServiceImpl deviceConfigService; + private final Vertx vertxMock; + + DeviceConfigServiceImplTest() { + this.repositoryMock = mock(DeviceConfigRepositoryImpl.class); + this.mapperMock = mock(DeviceConfigMapper.class); + this.communicationConfigMock = mock(InternalMessagingConfig.class); + this.internalCommunicationMock = mock(InternalMessaging.class); + this.pubsubMessageMock = mock(PubsubMessage.class); + this.ackReplyConsumerMock = mock(AckReplyConsumer.class); + this.contextMock = mock(Context.class); + this.byteStringMock = mock(ByteString.class); + this.vertxMock = mock(Vertx.class); + this.deviceConfigService = createServiceObj(); + } + + private static Handler anyHandler() { + @SuppressWarnings("unchecked") + final Handler result = ArgumentMatchers.any(Handler.class); + return result; + } + + DeviceConfigServiceImpl createServiceObj() { + return new DeviceConfigServiceImpl(repositoryMock, + mapperMock, + communicationConfigMock, + internalCommunicationMock, + vertxMock); + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(repositoryMock, + mapperMock, + communicationConfigMock, + internalCommunicationMock, + pubsubMessageMock, + ackReplyConsumerMock, + byteStringMock, + contextMock); + } + + @Test + void modifyCloudToDeviceConfig_success() throws Exception { + final var deviceConfigRequest = new DeviceConfigRequest(); + deviceConfigRequest.setBinaryData(CONFIG_BASE_64); + + final var deviceConfigEntity = new DeviceConfigEntity(); + + final var deviceConfigEntityResponse = new DeviceConfig(); + deviceConfigEntityResponse.setVersion("1"); + deviceConfigEntityResponse.setBinaryData(CONFIG_BASE_64); + + when(repositoryMock.createNew(any())).thenReturn(Future.succeededFuture(deviceConfigEntity)); + when(repositoryMock.updateDeviceConfigError(any())).thenReturn(Future.succeededFuture()); + when(repositoryMock.getDeviceConfig(tenantId, deviceId, 1)) + .thenReturn(Future.succeededFuture(deviceConfigEntity)); + + when(mapperMock.configRequestToDeviceConfigEntity(deviceConfigRequest)).thenReturn(deviceConfigEntity); + when(mapperMock.deviceConfigEntityToConfig(deviceConfigEntity)).thenReturn(deviceConfigEntityResponse); + when(mapperMock.configRequestToDeviceConfigEntity(any())).thenReturn(deviceConfigEntity); + + when(communicationConfigMock.getCommandTopicFormat()).thenReturn("%s.config"); + when(communicationConfigMock.getCommandTopicFormat()).thenReturn("version"); + + doAnswer(invocation -> { + final Promise result = Promise.promise(); + final Handler> handler = invocation.getArgument(0); + handler.handle(result); + return result.future(); + }).when(contextMock).executeBlocking(any()); + + when(vertxMock.getOrCreateContext()).thenReturn(contextMock); + + doAnswer(invocation -> { + final Handler task = invocation.getArgument(1); + task.handle(1L); + return 1L; + }).when(vertxMock).setTimer(anyLong(), anyHandler()); + + doNothing().when(internalCommunicationMock).publish(anyString(), any(), any()); + + final var results = deviceConfigService.modifyCloudToDeviceConfig(deviceConfigRequest, deviceId, tenantId); + + verify(repositoryMock).createNew(any()); + verify(repositoryMock).updateDeviceConfigError(any()); + verify(repositoryMock).getDeviceConfig(tenantId, deviceId, 1); + verify(repositoryMock).updateDeviceAckTime(any(), any()); + verify(mapperMock, times(1)).configRequestToDeviceConfigEntity(deviceConfigRequest); + verify(mapperMock, times(1)).deviceConfigEntityToConfig(deviceConfigEntity); + verify(communicationConfigMock).getCommandTopicFormat(); + verify(communicationConfigMock).getConfigAckDelay(); + verify(contextMock).executeBlocking(any()); + verify(internalCommunicationMock).publish(anyString(), any(), any()); + Assertions.assertTrue(results.succeeded()); + } + + @Test + void modifyCloudToDeviceConfig_failure() { + final var deviceConfigRequest = new DeviceConfigRequest(); + deviceConfigRequest.setBinaryData(CONFIG_BASE_64); + final var deviceConfigEntity = new DeviceConfigEntity(); + deviceConfigEntity.setBinaryData(CONFIG_BASE_64); + + when(repositoryMock.createNew(any())).thenReturn(Future.failedFuture(new NoSuchElementException())); + when(mapperMock.configRequestToDeviceConfigEntity(deviceConfigRequest)).thenReturn(deviceConfigEntity); + + final var results = deviceConfigService.modifyCloudToDeviceConfig(deviceConfigRequest, deviceId, tenantId); + + verify(mapperMock).configRequestToDeviceConfigEntity(any()); + verify(repositoryMock).createNew(any()); + Assertions.assertTrue(results.failed()); + } + + @Test + void modifyCloudToDeviceConfig_failure_noBase64() { + final var deviceConfigRequest = new DeviceConfigRequest(); + deviceConfigRequest.setBinaryData("test 2"); + final var deviceConfigEntity = new DeviceConfigEntity(); + deviceConfigEntity.setBinaryData(""); + + when(repositoryMock.createNew(any())).thenReturn(Future.failedFuture(new NoSuchElementException())); + when(mapperMock.configRequestToDeviceConfigEntity(deviceConfigRequest)).thenReturn(deviceConfigEntity); + + final var results = deviceConfigService.modifyCloudToDeviceConfig(deviceConfigRequest, deviceId, tenantId); + + Assertions.assertTrue(results.failed()); + } + + @Test + void listAll_success() { + when(repositoryMock.listAll(deviceId, tenantId, 10)) + .thenReturn(Future.succeededFuture(List.of(new DeviceConfig()))); + + final var results = deviceConfigService.listAll(deviceId, tenantId, 10); + + verify(repositoryMock).listAll(deviceId, tenantId, 10); + Assertions.assertTrue(results.succeeded()); + } + + @Test + void listAll_failed() { + when(repositoryMock.listAll(deviceId, tenantId, 10)).thenReturn(Future.failedFuture(new RuntimeException())); + + final var results = deviceConfigService.listAll(deviceId, tenantId, 10); + + verify(repositoryMock).listAll(deviceId, tenantId, 10); + Assertions.assertTrue(results.failed()); + } + + @Test + void updateDeviceAckTime() { + doReturn(Future.succeededFuture()).when(repositoryMock).updateDeviceAckTime(any(), anyString()); + + deviceConfigService.updateDeviceAckTime(new DeviceConfigEntity(), Instant.now().toString()); + + verify(repositoryMock).updateDeviceAckTime(any(), anyString()); + } + + @Test + void onDeviceConfigResponse_skipIfNoErrorResponse() { + when(pubsubMessageMock.getAttributesMap()) + .thenReturn(Map.of( + "deviceId", "device-123", + "tenantId", "tenant-123", + "configVersion", "12")); + + when(communicationConfigMock.getContentTypeKey()).thenReturn("content-type"); + when(communicationConfigMock.getDeliveryFailureNotificationContentType()).thenReturn("delivery-failure"); + + deviceConfigService.onDeviceConfigResponse(pubsubMessageMock, ackReplyConsumerMock); + verify(ackReplyConsumerMock).ack(); + verify(pubsubMessageMock).getAttributesMap(); + verify(communicationConfigMock).getContentTypeKey(); + verify(communicationConfigMock).getDeliveryFailureNotificationContentType(); + verify(repositoryMock, never()).updateDeviceConfigError(any()); + } + + @Test + void onDeviceConfigResponse() { + when(pubsubMessageMock.getAttributesMap()) + .thenReturn(Map.of( + InternalMessagingConstants.DEVICE_ID, "device-123", + InternalMessagingConstants.TENANT_ID, "tenant-123", + InternalMessagingConstants.STATUS, "500", + InternalMessagingConstants.CORRELATION_ID, "12")); + when(pubsubMessageMock.getData()) + .thenReturn(ByteString.copyFromUtf8("{\"error\": \"test\"}")); + + when(communicationConfigMock.getContentTypeKey()).thenReturn("content-type"); + doReturn(Future.succeededFuture()).when(repositoryMock).updateDeviceConfigError(any()); + + deviceConfigService.onDeviceConfigResponse(pubsubMessageMock, ackReplyConsumerMock); + verify(ackReplyConsumerMock).ack(); + verify(pubsubMessageMock).getAttributesMap(); + verify(pubsubMessageMock).getData(); + verify(communicationConfigMock).getContentTypeKey(); + verify(communicationConfigMock).getDeliveryFailureNotificationContentType(); + verify(repositoryMock).updateDeviceConfigError(any()); + } + + @Test + public void onDeviceConfigRequest_SkipsIfDeviceIdOrTenantIdIsEmpty() { + when(pubsubMessageMock.getAttributesMap()) + .thenReturn(Map.of( + "deviceId", "", + "tenantId", "tenant-123")); + when(pubsubMessageMock.getData()).thenReturn(ByteString.copyFromUtf8("{\"cause\": \"connected\"}")); + + when(communicationConfigMock.getContentTypeKey()).thenReturn("content-type"); + + deviceConfigService.onDeviceConfigRequest(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + verify(ackReplyConsumerMock).ack(); + } + + @Test + public void onDeviceConfigRequest_SkipsEvent() { + when(pubsubMessageMock.getAttributesMap()) + .thenReturn(Map.of( + "deviceId", "device-123", + "tenantId", "tenant-123", + "content-type", "test", + "orig_adapter", "adapter-mqtt")); + + + when(communicationConfigMock.getContentTypeKey()).thenReturn("content-type"); + when(communicationConfigMock.getOrigAdapterKey()).thenReturn("adapter-mqtt"); + when(communicationConfigMock.getEmptyNotificationEventContentType()).thenReturn("skip-content"); + + deviceConfigService.onDeviceConfigRequest(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + verify(communicationConfigMock).getContentTypeKey(); + verify(communicationConfigMock).getOrigAdapterKey(); + verify(communicationConfigMock, times(1)).getEmptyNotificationEventContentType(); + + verify(ackReplyConsumerMock).ack(); + } + + @Test + public void onDeviceConfigRequest_PublishesDeviceConfig() throws Exception { + final String deviceId = "device-123"; + final String tenantId = "tenant-123"; + + final String topic = "tenant-123-config"; + final byte[] message = "{}".getBytes(); + final Map messageAttributes = Map.of( + "deviceId", deviceId, + "tenantId", tenantId, + "content", "event", + "ttd", "-1"); + final var deviceConfigEntity = new DeviceConfigEntity(); + final var deviceConfigEntityResponse = new DeviceConfig(); + + when(pubsubMessageMock.getAttributesMap()).thenReturn(messageAttributes); + when(repositoryMock.getDeviceLatestConfig(tenantId, deviceId)) + .thenReturn(Future.succeededFuture(deviceConfigEntity)); + when(mapperMock.deviceConfigEntityToConfig(deviceConfigEntity)).thenReturn(deviceConfigEntityResponse); + when(communicationConfigMock.getCommandTopicFormat()).thenReturn("%s-config"); + + when(communicationConfigMock.getContentTypeKey()).thenReturn("content"); + when(communicationConfigMock.getEmptyNotificationEventContentType()).thenReturn("event"); + when(communicationConfigMock.getTtdKey()).thenReturn("ttd"); + doNothing().when(internalCommunicationMock).publish(topic, message, messageAttributes); + when(vertxMock.getOrCreateContext()).thenReturn(contextMock); + + deviceConfigService.onDeviceConfigRequest(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + verify(repositoryMock).getDeviceLatestConfig(tenantId, deviceId); + verify(mapperMock).deviceConfigEntityToConfig(deviceConfigEntity); + verify(communicationConfigMock).getCommandTopicFormat(); + verify(communicationConfigMock).getContentTypeKey(); + verify(communicationConfigMock).getEmptyNotificationEventContentType(); + verify(communicationConfigMock).getTtdKey(); + + verify(contextMock).executeBlocking(any()); + + verify(ackReplyConsumerMock).ack(); + } + + @Test + public void onDeviceConfigRequest_PublishesDeviceConfig_failed() { + final String deviceId = "device-123"; + final String tenantId = "tenant-123"; + + final Map messageAttributes = Map.of( + "deviceId", deviceId, + "content", "event", + "tenantId", tenantId, + "ttd", "-1"); + + when(pubsubMessageMock.getAttributesMap()).thenReturn(messageAttributes); + when(repositoryMock.getDeviceLatestConfig(tenantId, deviceId)) + .thenReturn(Future.failedFuture(new RuntimeException())); + when(pubsubMessageMock.getData()).thenReturn(ByteString.copyFromUtf8("{\"cause\": \"connected\"}")); + + when(communicationConfigMock.getContentTypeKey()).thenReturn("content"); + when(communicationConfigMock.getEmptyNotificationEventContentType()).thenReturn("event"); + when(communicationConfigMock.getTtdKey()).thenReturn("ttd"); + when(communicationConfigMock.getCommandAckTopicFormat()).thenReturn("%s.ack"); + + deviceConfigService.onDeviceConfigRequest(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + verify(repositoryMock).getDeviceLatestConfig(tenantId, deviceId); + verify(ackReplyConsumerMock).ack(); + + verify(communicationConfigMock).getContentTypeKey(); + verify(communicationConfigMock).getEmptyNotificationEventContentType(); + verify(communicationConfigMock).getTtdKey(); + } +} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreatorImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreatorImplTest.java similarity index 75% rename from device-communication/src/test/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreatorImplTest.java rename to device-communication/src/test/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreatorImplTest.java index 24c9cb8d..56f036de 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreatorImplTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreatorImplTest.java @@ -14,11 +14,15 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.database; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -29,7 +33,6 @@ import io.vertx.core.file.FileSystem; import io.vertx.pgclient.PgPool; - class DatabaseSchemaCreatorImplTest { private final Vertx vertxMock; @@ -66,10 +69,10 @@ void createDBTables_success() { databaseSchemaCreator.createDBTables(); - verify(vertxMock).fileSystem(); - verify(fileSystemMock).readFile(anyString(), any()); - verify(dbMock).getDbClient(); - verify(pgPoolMock).withTransaction(any()); + verify(vertxMock, times(2)).fileSystem(); + verify(fileSystemMock, times(2)).readFile(anyString(), any()); + verify(dbMock, times(2)).getDbClient(); + verify(pgPoolMock, times(2)).withTransaction(any()); } @@ -83,11 +86,11 @@ void createDBTables_failed() { databaseSchemaCreator.createDBTables(); - verify(vertxMock).fileSystem(); - verify(fileSystemMock).readFile(anyString(), any()); - verify(dbMock).getDbClient(); - verify(dbMock).close(); - verify(pgPoolMock).withTransaction(any()); + verify(vertxMock, times(2)).fileSystem(); + verify(fileSystemMock, times(2)).readFile(anyString(), any()); + verify(dbMock, times(2)).getDbClient(); + verify(dbMock, times(2)).close(); + verify(pgPoolMock, times(2)).withTransaction(any()); } } diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DatabaseServiceImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/database/DatabaseServiceImplTest.java similarity index 90% rename from device-communication/src/test/java/org/eclipse/hono/communication/api/service/DatabaseServiceImplTest.java rename to device-communication/src/test/java/org/eclipse/hono/communication/api/service/database/DatabaseServiceImplTest.java index 886c2259..792ff408 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/DatabaseServiceImplTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/database/DatabaseServiceImplTest.java @@ -14,9 +14,13 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.database; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import org.eclipse.hono.communication.core.app.DatabaseConfig; import org.eclipse.hono.communication.core.utils.DbUtils; @@ -28,7 +32,6 @@ import io.vertx.core.Vertx; import io.vertx.pgclient.PgPool; - class DatabaseServiceImplTest { private final Vertx vertxMock; diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImplTest.java new file mode 100644 index 00000000..169345ea --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/state/DeviceStateServiceImplTest.java @@ -0,0 +1,152 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.state; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; + +import org.eclipse.hono.communication.api.data.DeviceState; +import org.eclipse.hono.communication.api.data.DeviceStateEntity; +import org.eclipse.hono.communication.api.mapper.DeviceStateMapper; +import org.eclipse.hono.communication.api.repository.DeviceStateRepository; +import org.eclipse.hono.communication.api.repository.DeviceStateRepositoryImpl; +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; + +import io.vertx.core.Future; + +class DeviceStateServiceImplTest { + + private final DeviceStateRepository repositoryMock; + private final DeviceStateMapper mapperMock; + private final InternalMessagingConfig communicationConfigMock; + private final InternalMessaging internalCommunicationMock; + private final PubsubMessage pubsubMessageMock; + private final AckReplyConsumer ackReplyConsumerMock; + private final String tenantId = "tenant_ID"; + private final String deviceId = "device_ID"; + private final DeviceStateServiceImpl deviceStateService; + + DeviceStateServiceImplTest() { + this.repositoryMock = mock(DeviceStateRepositoryImpl.class); + this.mapperMock = mock(DeviceStateMapper.class); + this.communicationConfigMock = mock(InternalMessagingConfig.class); + this.internalCommunicationMock = mock(InternalMessaging.class); + this.pubsubMessageMock = mock(PubsubMessage.class); + this.ackReplyConsumerMock = mock(AckReplyConsumer.class); + deviceStateService = new DeviceStateServiceImpl(repositoryMock, mapperMock, communicationConfigMock, + internalCommunicationMock); + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(repositoryMock, + mapperMock, + communicationConfigMock, + internalCommunicationMock, + pubsubMessageMock); + } + + @Test + void testListAll_success() { + when(repositoryMock.listAll(deviceId, tenantId, 10)) + .thenReturn(Future.succeededFuture(List.of(new DeviceState()))); + + final var results = deviceStateService.listAll(deviceId, tenantId, 10); + + verify(repositoryMock).listAll(deviceId, tenantId, 10); + Assertions.assertTrue(results.succeeded()); + + } + + @Test + void testListAll_failed() { + when(repositoryMock.listAll(deviceId, tenantId, 10)).thenReturn(Future.failedFuture(new RuntimeException())); + + final var results = deviceStateService.listAll(deviceId, tenantId, 10); + + verify(repositoryMock).listAll(deviceId, tenantId, 10); + Assertions.assertTrue(results.failed()); + + } + + @Test + public void testOnStateMessage_SkipsIfDeviceIdOrTenantIdIsEmpty() { + when(pubsubMessageMock.getAttributesMap()) + .thenReturn(Map.of( + "deviceId", "", + "tenantId", "tenant-123")); + + + deviceStateService.onStateMessage(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + } + + @Test + public void testOnStateMessage_SkipsPayloadEmpty() { + when(pubsubMessageMock.getAttributesMap()) + .thenReturn(Map.of( + "deviceId", "device-123", + "tenantId", "tenant-123")); + when(pubsubMessageMock.getData()).thenReturn(ByteString.copyFromUtf8("")); + + + deviceStateService.onStateMessage(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + verify(pubsubMessageMock).getData(); + verify(ackReplyConsumerMock).ack(); + } + + @Test + public void testOnStateMessage_CreatesNewStateEntryInDB() { + final String deviceId = "device-123"; + final String tenantId = "tenant-123"; + + final Map messageAttributes = Map.of( + "deviceId", deviceId, + "tenantId", tenantId); + final var deviceStateEntity = new DeviceStateEntity(); + + when(pubsubMessageMock.getAttributesMap()).thenReturn(messageAttributes); + when(pubsubMessageMock.getData()).thenReturn(ByteString.copyFromUtf8("{\"cause\": \"connected\"}")); + when(repositoryMock.createNew(deviceStateEntity)).thenReturn(Future.succeededFuture(deviceStateEntity)); + when(mapperMock.pubSubMessageToDeviceStateEntity(pubsubMessageMock)).thenReturn(deviceStateEntity); + + + deviceStateService.onStateMessage(pubsubMessageMock, ackReplyConsumerMock); + + verify(pubsubMessageMock).getAttributesMap(); + verify(pubsubMessageMock).getData(); + verify(repositoryMock).createNew(deviceStateEntity); + verify(mapperMock).pubSubMessageToDeviceStateEntity(pubsubMessageMock); + verify(ackReplyConsumerMock).ack(); + } +} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/core/utils/StringValidateUtilsTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/core/utils/StringValidateUtilsTest.java new file mode 100644 index 00000000..e15ce066 --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/core/utils/StringValidateUtilsTest.java @@ -0,0 +1,38 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ +package org.eclipse.hono.communication.core.utils; + +import java.util.Base64; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class StringValidateUtilsTest { + + @Test + public void isBase64_valid_true() { + final String string = "This is a test."; + final String base64 = Base64.getEncoder().encodeToString(string.getBytes()); + Assertions.assertTrue(StringValidateUtils.isBase64(base64)); + } + + @Test + public void isBase64_invalid_false() { + final String string = "This is a test."; + Assertions.assertFalse(StringValidateUtils.isBase64(string)); + } + +}