diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0bb1836 --- /dev/null +++ b/.gitignore @@ -0,0 +1,133 @@ +############################## +## Java +############################## +.mtj.tmp/ +*.class +*.jar +*.war +*.ear +*.nar +hs_err_pid* + +############################## +## Node.js +############################## +node_modules +# we use yarn +package-lock.json +yarn-error.log + +############################## +## Maven +############################## +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +pom.xml.bak +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar +.m2/repository + +############################## +## Gradle +############################## +bin/ +build/ +.gradle +.gradletasknamecache +gradle-app.setting +!gradle-wrapper.jar + +############################## +## IntelliJ +############################## +out/ +.idea/* +#!.idea/codeStyles/ +#!.idea/fileTemplates/ +#!.idea/inspectionProfiles/ +#!.idea/codeStyleSettings.xml +#!.idea/sonarSettings.xml +#!.idea/vcs.xml +#!.idea/externalDependencies.xml +#!.idea/encodings.xml +#!.idea/checkstyle-idea.xml + +## User-specific stuff +#.idea/**/workspace.xml +#.idea/**/tasks.xml +#.idea/**/usage.statistics.xml +#.idea/**/dictionaries +#.idea/**/shelf +# +## Generated files +#.idea/**/contentModel.xml +# +## Sensitive or high-churn files +#.idea/**/dataSources/ +#.idea/**/dataSources.ids +#.idea/**/dataSources.local.xml +#.idea/**/sqlDataSources.xml +#.idea/**/dynamic.xml +#.idea/**/uiDesigner.xml +#.idea/**/dbnavigator.xml +# +## Gradle +#.idea/**/gradle.xml +#.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +.idea_modules/ +*.iml +*.ipr +*.iws + +############################## +## Eclipse +############################## +.settings/ +bin/ +tmp/ +.metadata +.classpath +.project +*.tmp +*.bak +*.swp +*~.nib +local.properties +.loadpath +.factorypath + +############################## +## NetBeans +############################## +nbproject/private/ +build/ +nbbuild/ +dist/ +nbdist/ +nbactions.xml +nb-configuration.xml + +############################## +## Visual Studio Code +############################## +.vscode/ +.code-workspace + +############################## +## OS X +############################## +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..88906e0 --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +## Beaver IoT Integrations +Integrations are the primary means for Beaver IoT to interact with third-party services, devices, platforms, etc., enabling device connectivity, device control, and feature expansion. + +## Start building your integrations + +The following is the directory structure of this project. + +``` + beaver-iot-integrations/ + ├── application-dev/ + │ ├── src/main/java/com/milesight/beaveriot/DevelopApplication.java # Start and debug your integrations from here. + │ ├── integrations/ # integration directory + │ │ └── sample-integrations/ # Sample integrations + │ │ └── ... # All integrations +``` + +If you want to develop your own integration, please create a new integration package under the `beaver-iot-integrations/integrations/` directory. For more information, please refer to [Quick Start](https://www.milesight.com/beaver-iot/docs/dev-guides/backend/build-integration) of Integration Development. diff --git a/application-dev/pom.xml b/application-dev/pom.xml new file mode 100644 index 0000000..dc60428 --- /dev/null +++ b/application-dev/pom.xml @@ -0,0 +1,126 @@ + + + + com.milesight.beaveriot + beaver-iot-integrations + 1.0-SNAPSHOT + + + 4.0.0 + + com.milesight.beaveriot + application-dev + + + 17 + 17 + UTF-8 + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-tomcat + + + + + org.springframework.boot + spring-boot-starter-undertow + + + + + org.projectlombok + lombok + + + + com.milesight.beaveriot + data-jpa + ${project.version} + + + + + + com.h2database + h2 + runtime + + + + + com.milesight.beaveriot + context + + + + com.milesight.beaveriot + rule-engine-component + + + + com.milesight.beaveriot + eventbus-component + + + + + com.milesight.beaveriot + device-service + + + com.milesight.beaveriot + integration + + + com.milesight.beaveriot + dashboard-service + + + + + + + com.milesight.beaveriot + user-service + + + com.milesight.beaveriot + entity-service + + + + + + + + + + + + + + \ No newline at end of file diff --git a/application-dev/src/main/java/com/milesight/beaveriot/DevelopApplication.java b/application-dev/src/main/java/com/milesight/beaveriot/DevelopApplication.java new file mode 100644 index 0000000..86c6a03 --- /dev/null +++ b/application-dev/src/main/java/com/milesight/beaveriot/DevelopApplication.java @@ -0,0 +1,39 @@ +package com.milesight.beaveriot; + +import com.milesight.beaveriot.data.jpa.BaseJpaRepositoryImpl; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.data.jpa.repository.config.EnableJpaAuditing; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; +import org.springframework.scheduling.annotation.EnableAsync; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +/** + * @author leon + */ +@EnableJpaAuditing +@EnableJpaRepositories(repositoryBaseClass = BaseJpaRepositoryImpl.class ) +@SpringBootApplication +@EnableAsync +public class DevelopApplication { + + public static void main(String[] args) { + SpringApplication.run(DevelopApplication.class, args); + } + + @Configuration + public class WebConfig implements WebMvcConfigurer { + @Override + public void addCorsMappings(CorsRegistry registry) { + registry.addMapping("/**") + .allowCredentials(true) + .allowedOriginPatterns("*") + .allowedMethods("GET", "POST", "PUT", "DELETE") + .allowedHeaders("*") + .exposedHeaders("*"); + } + } +} diff --git a/application-dev/src/main/resources/application.yml b/application-dev/src/main/resources/application.yml new file mode 100644 index 0000000..d24a6cb --- /dev/null +++ b/application-dev/src/main/resources/application.yml @@ -0,0 +1,82 @@ + +server: + port: 9200 + +spring: + jackson: + default-property-inclusion: non_null + property-naming-strategy: SNAKE_CASE + jpa: + show-sql: true + properties: + jakarta.persistence.query.timeout: "${JAVAX_PERSISTENCE_QUERY_TIMEOUT:30000}" # General timeout for JDBC queries + hibernate.type.wrapper_array_handling: ALLOW + hibernate.naming.physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy + hibernate.hbm2ddl.auto: update + hibernate.dialect: org.hibernate.dialect.H2Dialect + datasource: + url: "${SPRING_DATASOURCE_URL:jdbc:h2:file:~/beaver-iot/h2/beaver-dev;AUTO_SERVER=TRUE}" + username: "${SPRING_DATASOURCE_USERNAME:sa}" + password: "${SPRING_DATASOURCE_PASSWORD:}" + driverClassName: "${SPRING_DATASOURCE_DRIVER_CLASS_NAME:org.h2.Driver}" + + h2: + console: + enabled: ${SPRING_H2_CONSOLE_ENABLED:true} + path: /public/h2-console + +camel: + springboot: + routes-include-pattern: classpath:routes/*.yaml +websocket: + enabled: true + port: ${WEBSOCKET_PORT:9201} + context-path: ${WEBSOCKET_CONTEXT_PATH:/websocket} +oauth2: + registered-client-id: ${OAUTH2_REGISTERED_CLIENT_ID:default} + client-id: ${OAUTH2_CLIENT_ID:iab} + client-secret: ${OAUTH2_CLIENT_SECRET:milesight*iab} + rsa: + private-key: ${OAUTH2_RSA_PRIVATE_KEY:} + public-key: ${OAUTH2_RSA_PUBLIC_KEY:} + ignore-urls: + - "/oauth2/token" + - "/user/register" + - "/user/status" + - "/public/**" + +logging: + level: + org.springframework.security: debug + com.milesight.beaveriot: debug + +OAUTH2_RSA_PRIVATE_KEY: | + MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCtCpHZwvZf9hUQgYGpQoK3Y7fn + RRWb4ZrsVdpD8bf3GezPQEKmsyYSdKvXhg3vq6oLcYqjlg8SV7XmytNqjjVkOMR8BOaVMkIvujCI + pJemxbR08U7FVW4ShFyxL+mSV5PlLPqHbPbOF5MHtxxHByx4zzwmja+kQ5YokhtQd6+TVkDLUw66 + 0YUO364mEMXrT351n0M4gDHLQzq1xyCyURmU+ELCjh8PUYzFePNkWj0XbjTGeQQZpZRbuqm8XZgY + aoN37TydaBLB0z1KipB7CprDNyFnzaGUntBPvUMX/YBFVL05u7Ny5E3AyalwJpEdtRKK+lFnzCB1 + QXndx0gbuoFxAgMBAAECggEAB4jFx/L+sjfvO9w/roR4V47Oc5Hk+ngMvEySp9Gu/mHRF0cHf+Wd + +0C8OLKcCbryOPRVIoFGn7hU8bsUEHgnp/j2ySWXepiJHDXhphlwhvDlxg+5q8rn84Ny274nz9jj + Vh+qutnKNzf8jWHyJ7+OtqXjSxRxONg+gf62lL7ZVLNi42HA2VI8Qq2sJ+j4eCsQ5+lYZ9WFRd/Y + ZoD5wvua4FG7F6OXp8G65gRcrg4M8Uj5BWX7BAo3KZJMEuVR64TRgTgNkt/1kbWVYyUKNR82x5BR + Y5o5tZ+aRW+pXZv2EAHeHqTvYh4U950IfXpdMo71r/ocEPDZaw+JGN7NKfpj6QKBgQDVRIfXmqwL + XEaWwjnLgo8tmo3GtByTgdIwO0vj6V7gvWjRlq4LMqeOfzlfuDjnCSwt/4oyNKgtddx2I7GzrdDR + qx9e0XceYt2zr/9UyPaV+ZExD6vhwUvQ/gCcgzGb693xZspyLxRo+Qi6yfwxMA8LMKD4nZhJeqU1 + zsihAGUuNQKBgQDPtqTL//npZljH8MULzSNxjwNId3oVCIXJAB9Bohyf7j+arQwlUy/gx/O0pl4p + qW0yTNTa9Cz6LuYsT+wwa0B4sYabXp8M5Ie367JznjaoRGnhGZczy321+WtMK3FD2m9LLCiPtySf + Df/9/T8VP3+y2P/DXbNWKDv89+cCi6adzQKBgQCn+EbJcD/q3q2WFblxaQ4dy+m4Q6lyIECMzlcS + VX7toSDKWlQP68B+ggONOUpP5TCPtgGBU2nWDEssHSbbrp2WVaHqiy23mASnfomqnHFiY+KP8pjc + wOJW44dVvhq3hIkUlXIDhsMbfnD6tRVVJFXk6VMPHQiRbvBqNchczjPitQKBgBVn+z/S14yIMaoZ + VBVNRNPVKz9yoLA+OptmWdJHxK9uvDtSjifilNKTiLgbVSTyAwDmDiGqfGAkKTBz5CFxTCmkEf09 + GSMZAiugLLnhmjxpIIhEPOLnm8L/O2GUMI4PmtyiZmO5OhryNGmWicPE0YI6/tDBfVPpvZTqb3JB + tiAZAoGACp2pp29vghRRA617LhDoawWG1FoRYmD+pfUXi2twPmOpM8wNicEq2KblARQa93JQHyCV + b4KXFLAItUD24a6+0OwPNLQ3AEaShRM0OSOdcqfV4Dsr5SgacBGky6su/0CbMEiNE0UNKuDe+13i + 1xDAeCLhl0C0nRG6OyS8EDKRV2o= +OAUTH2_RSA_PUBLIC_KEY: | + MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArQqR2cL2X/YVEIGBqUKCt2O350UVm+Ga + 7FXaQ/G39xnsz0BCprMmEnSr14YN76uqC3GKo5YPEle15srTao41ZDjEfATmlTJCL7owiKSXpsW0 + dPFOxVVuEoRcsS/pkleT5Sz6h2z2zheTB7ccRwcseM88Jo2vpEOWKJIbUHevk1ZAy1MOutGFDt+u + JhDF609+dZ9DOIAxy0M6tccgslEZlPhCwo4fD1GMxXjzZFo9F240xnkEGaWUW7qpvF2YGGqDd+08 + nWgSwdM9SoqQewqawzchZ82hlJ7QT71DF/2ARVS9ObuzcuRNwMmpcCaRHbUSivpRZ8wgdUF53cdI + G7qBcQIDAQAB \ No newline at end of file diff --git a/integrations/msc-integration/pom.xml b/integrations/msc-integration/pom.xml new file mode 100644 index 0000000..8bc8c78 --- /dev/null +++ b/integrations/msc-integration/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + com.milesight.beaveriot + integrations + 1.0-SNAPSHOT + + + msc-integration + + + 17 + 17 + UTF-8 + + + + + com.milesight.beaveriot + context + ${project.version} + provided + + + com.milesight + msc-java-sdk + 1.0.0-SNAPSHOT + + + + + com.milesight.beaveriot + application-dev + 1.0-SNAPSHOT + test + + + org.springframework.boot + spring-boot-test-autoconfigure + 3.3.4 + test + + + org.springframework.boot + spring-boot-starter-test + 3.3.4 + test + + + org.spockframework + spock-spring + + + org.junit.platform + junit-platform-commons + + + org.junit.platform + junit-platform-engine + + + org.spockframework + spock-core + + + org.codehaus.groovy + groovy + + + org.mockito + mockito-junit-jupiter + + + org.mockito + mockito-core + + + org.mockito + mockito-inline + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.codehaus.gmavenplus + gmavenplus-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + org.jacoco + jacoco-maven-plugin + + + + diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/MscIntegrationBootstrap.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/MscIntegrationBootstrap.java new file mode 100644 index 0000000..bdfdeea --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/MscIntegrationBootstrap.java @@ -0,0 +1,53 @@ +package com.milesight.beaveriot.integration.msc; + +import com.milesight.beaveriot.context.integration.bootstrap.IntegrationBootstrap; +import com.milesight.beaveriot.context.integration.model.Integration; +import com.milesight.beaveriot.integration.msc.service.MscConnectionService; +import com.milesight.beaveriot.integration.msc.service.MscDataSyncService; +import com.milesight.beaveriot.integration.msc.service.MscWebhookService; +import lombok.extern.slf4j.*; +import org.apache.camel.CamelContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.PropertySource; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MscIntegrationBootstrap implements IntegrationBootstrap { + + @Autowired + private MscConnectionService mscConnectionService; + + @Autowired + private MscDataSyncService mscDataFetchingService; + + @Autowired + private MscWebhookService mscWebhookService; + + + @Override + public void onPrepared(Integration integrationConfig) { + + } + + @Override + public void onStarted(Integration integrationConfig) { + log.info("MSC integration starting"); + mscConnectionService.init(); + mscDataFetchingService.init(); + mscWebhookService.init(); + log.info("MSC integration started"); + } + + @Override + public void onDestroy(Integration integrationConfig) { + log.info("MSC integration stopping"); + mscDataFetchingService.stop(); + log.info("MSC integration stopped"); + } + + @Override + public void customizeRoute(CamelContext context) throws Exception { + IntegrationBootstrap.super.customizeRoute(context); + } +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/constant/MscIntegrationConstants.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/constant/MscIntegrationConstants.java new file mode 100644 index 0000000..411553b --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/constant/MscIntegrationConstants.java @@ -0,0 +1,33 @@ +package com.milesight.beaveriot.integration.msc.constant; + +public interface MscIntegrationConstants { + + String INTEGRATION_IDENTIFIER = "msc-integration"; + + interface DeviceAdditionalDataName { + + String DEVICE_ID = "device_id"; + + } + + interface InternalPropertyIdentifier { + + interface Pattern { + String PREFIX = "_#"; + String SUFFIX = "#_"; + String TEMPLATE = "_#%s#_"; + + static boolean match(String key) { + return key.startsWith(PREFIX) && key.endsWith(SUFFIX); + } + } + + String LAST_SYNC_TIME = "_#last_sync_time#_"; + + static String getLastSyncTimeKey(String deviceKey) { + return String.format("%s.%s", deviceKey, LAST_SYNC_TIME); + } + + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/controller/MscIntegrationPublicController.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/controller/MscIntegrationPublicController.java new file mode 100644 index 0000000..cf47fba --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/controller/MscIntegrationPublicController.java @@ -0,0 +1,36 @@ +package com.milesight.beaveriot.integration.msc.controller; + +import com.milesight.beaveriot.integration.msc.model.WebhookPayload; +import com.milesight.beaveriot.integration.msc.service.MscWebhookService; +import lombok.extern.slf4j.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * + */ +@Slf4j +@RestController +@RequestMapping("/public/integration/msc") +public class MscIntegrationPublicController { + + @Autowired + private MscWebhookService mscWebhookService; + + @PostMapping("/webhook") + public String webhook(@RequestHeader(name = "x-msc-request-signature") String signature, + @RequestHeader(name = "x-msc-webhook-uuid") String webhookUuid, + @RequestHeader(name = "x-msc-request-timestamp") String requestTimestamp, + @RequestHeader(name = "x-msc-request-nonce") String requestNonce, + @RequestBody List webhookPayloads) { + mscWebhookService.handleWebhookData(signature, webhookUuid, requestTimestamp, requestNonce, webhookPayloads); + return "success"; + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/entity/MscConnectionPropertiesEntities.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/entity/MscConnectionPropertiesEntities.java new file mode 100644 index 0000000..05270dc --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/entity/MscConnectionPropertiesEntities.java @@ -0,0 +1,104 @@ +package com.milesight.beaveriot.integration.msc.entity; + +import com.milesight.beaveriot.base.utils.StringUtils; +import com.milesight.beaveriot.context.integration.entity.annotation.Attribute; +import com.milesight.beaveriot.context.integration.entity.annotation.Entities; +import com.milesight.beaveriot.context.integration.entity.annotation.Entity; +import com.milesight.beaveriot.context.integration.entity.annotation.IntegrationEntities; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import com.milesight.beaveriot.integration.msc.constant.MscIntegrationConstants; +import com.milesight.beaveriot.integration.msc.model.IntegrationStatus; +import lombok.*; +import lombok.experimental.*; + +@FieldNameConstants +@EqualsAndHashCode(callSuper = true) +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@IntegrationEntities +public class MscConnectionPropertiesEntities extends ExchangePayload { + + public static String getKey(String propertyKey) { + return MscIntegrationConstants.INTEGRATION_IDENTIFIER + ".integration." + StringUtils.toSnakeCase(propertyKey); + } + + /** + * The status of the connection.
+ * Possible values:
+ * READY
+ * NOT_READY
+ * ERROR
+ */ + @Entity(accessMod = AccessMod.R, attributes = {@Attribute(enumClass = IntegrationStatus.class)}) + private String openapiStatus; + + @Entity(accessMod = AccessMod.R, attributes = {@Attribute(enumClass = IntegrationStatus.class)}) + private String webhookStatus; + + @Entity + private Openapi openapi; + + @Entity + private Webhook webhook; + + @Entity + private ScheduledDataFetch scheduledDataFetch; + + @FieldNameConstants + @EqualsAndHashCode(callSuper = true) + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Entities + public static class Openapi extends ExchangePayload { + + @Entity(attributes = {@Attribute(minLength = 1)}) + private String serverUrl; + + @Entity(attributes = {@Attribute(minLength = 1)}) + private String clientId; + + @Entity(attributes = {@Attribute(minLength = 1)}) + private String clientSecret; + + } + + @FieldNameConstants + @EqualsAndHashCode(callSuper = true) + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Entities + public static class Webhook extends ExchangePayload { + + @Entity + private Boolean enabled; + + @Entity(attributes = {@Attribute(minLength = 1)}) + private String secretKey; + + } + + @FieldNameConstants + @EqualsAndHashCode(callSuper = true) + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Entities + public static class ScheduledDataFetch extends ExchangePayload { + + @Entity + private Boolean enabled; + + @Entity(attributes = {@Attribute(min = 30, max = 86400)}) + private Integer period; + + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/entity/MscServiceEntities.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/entity/MscServiceEntities.java new file mode 100644 index 0000000..2aeab4a --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/entity/MscServiceEntities.java @@ -0,0 +1,59 @@ +package com.milesight.beaveriot.integration.msc.entity; + +import com.milesight.beaveriot.context.integration.entity.annotation.Attribute; +import com.milesight.beaveriot.context.integration.entity.annotation.Entities; +import com.milesight.beaveriot.context.integration.entity.annotation.Entity; +import com.milesight.beaveriot.context.integration.entity.annotation.IntegrationEntities; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import lombok.*; + +@EqualsAndHashCode(callSuper = true) +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@IntegrationEntities +public class MscServiceEntities extends ExchangePayload { + + @Entity(type = EntityType.SERVICE) + private AddDevice addDevice; + + @Entity(type = EntityType.SERVICE) + private SyncDevice syncDevice; + + @Entity(type = EntityType.SERVICE) + private DeleteDevice deleteDevice; + + @EqualsAndHashCode(callSuper = true) + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Entities + public static class AddDevice extends ExchangePayload { + + @Entity(attributes = {@Attribute(minLength = 12, maxLength = 16)}) + private String sn; + + } + + @EqualsAndHashCode(callSuper = true) + @Data + @Builder + @NoArgsConstructor + @Entities + public static class DeleteDevice extends ExchangePayload { + + } + + @EqualsAndHashCode(callSuper = true) + @Data + @Builder + @NoArgsConstructor + @Entities + public static class SyncDevice extends ExchangePayload { + + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/IntegrationStatus.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/IntegrationStatus.java new file mode 100644 index 0000000..4ef8042 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/IntegrationStatus.java @@ -0,0 +1,29 @@ +package com.milesight.beaveriot.integration.msc.model; + +import com.milesight.beaveriot.base.enums.EnumCode; +import lombok.*; + + +@Getter +@RequiredArgsConstructor +public enum IntegrationStatus implements EnumCode { + READY, + NOT_READY, + ERROR, + ; + + @Override + public String toString() { + return name(); + } + + @Override + public String getCode() { + return name(); + } + + @Override + public String getValue() { + return name(); + } +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslEventWrapper.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslEventWrapper.java new file mode 100644 index 0000000..1c2e2d2 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslEventWrapper.java @@ -0,0 +1,65 @@ +package com.milesight.beaveriot.integration.msc.model; + +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.cloud.sdk.client.model.TslDataSpec; +import com.milesight.cloud.sdk.client.model.TslEventSpec; +import lombok.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + + +public record TslEventWrapper(TslEventSpec spec) implements TslItemWrapper { + + @Override + public EntityType getEntityType() { + return EntityType.EVENT; + } + + @Override + public EntityValueType getValueType() { + return EntityValueType.OBJECT; + } + + @Override + public String getParentId() { + return null; + } + + @Override + public String getId() { + return spec.getId(); + } + + @Override + public String getName() { + return spec.getName(); + } + + @Override + public TslDataSpec getDataSpec() { + return null; + } + + @Override + public AccessMod getAccessMode() { + return null; + } + + @Override + public List getOutputs() { + if (spec.getOutputs() == null) { + return List.of(); + } + return spec.getOutputs() + .stream() + .filter(spec -> spec.getDataSpec() != null) + .map(spec -> new TslParamWrapper(spec, this)) + .toList(); + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslItemWrapper.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslItemWrapper.java new file mode 100644 index 0000000..e3e565a --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslItemWrapper.java @@ -0,0 +1,30 @@ +package com.milesight.beaveriot.integration.msc.model; + + +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.cloud.sdk.client.model.TslDataSpec; +import lombok.*; + +import java.util.List; + +public interface TslItemWrapper { + + EntityType getEntityType(); + + EntityValueType getValueType(); + + String getParentId(); + + String getId(); + + String getName(); + + TslDataSpec getDataSpec(); + + AccessMod getAccessMode(); + + List getOutputs(); + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslParamWrapper.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslParamWrapper.java new file mode 100644 index 0000000..719ea18 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslParamWrapper.java @@ -0,0 +1,61 @@ +package com.milesight.beaveriot.integration.msc.model; + +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.beaveriot.integration.msc.util.MscTslUtils; +import com.milesight.cloud.sdk.client.model.TslDataSpec; +import com.milesight.cloud.sdk.client.model.TslParamSpec; +import lombok.*; + +import java.util.List; + +public record TslParamWrapper(TslParamSpec spec, TslItemWrapper functionItem) implements TslItemWrapper { + + @Override + public EntityType getEntityType() { + return functionItem.getEntityType(); + } + + @Override + public EntityValueType getValueType() { + if (spec.getDataSpec() == null) { + throw new NullPointerException("Data spec is null"); + } + return MscTslUtils.convertDataTypeToEntityValueType(spec.getDataSpec().getDataType()); + } + + @Override + public String getParentId() { + if (spec.getDataSpec() != null && spec.getDataSpec().getParentId() != null) { + return spec.getDataSpec().getParentId(); + } + return functionItem().getId(); + } + + @Override + public String getId() { + return spec.getId(); + } + + @Override + public String getName() { + return spec.getName(); + } + + @Override + public TslDataSpec getDataSpec() { + return spec.getDataSpec(); + } + + @Override + public AccessMod getAccessMode() { + return null; + } + + @Override + public List getOutputs() { + return null; + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslPropertyWrapper.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslPropertyWrapper.java new file mode 100644 index 0000000..cf4d292 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslPropertyWrapper.java @@ -0,0 +1,55 @@ +package com.milesight.beaveriot.integration.msc.model; + +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.beaveriot.integration.msc.util.MscTslUtils; +import com.milesight.cloud.sdk.client.model.TslDataSpec; +import com.milesight.cloud.sdk.client.model.TslPropertySpec; +import lombok.*; + +import java.util.List; + +public record TslPropertyWrapper(TslPropertySpec spec) implements TslItemWrapper { + + @Override + public EntityType getEntityType() { + return EntityType.PROPERTY; + } + + @Override + public EntityValueType getValueType() { + return MscTslUtils.convertDataTypeToEntityValueType(spec.getDataSpec().getDataType()); + } + + @Override + public String getParentId() { + return spec.getDataSpec().getParentId(); + } + + @Override + public String getId() { + return spec.getId(); + } + + @Override + public String getName() { + return spec.getName(); + } + + @Override + public TslDataSpec getDataSpec() { + return spec.getDataSpec(); + } + + @Override + public AccessMod getAccessMode() { + return AccessMod.valueOf(spec.getAccessMode().name()); + } + + @Override + public List getOutputs() { + return null; + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslServiceWrapper.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslServiceWrapper.java new file mode 100644 index 0000000..024dd57 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/TslServiceWrapper.java @@ -0,0 +1,63 @@ +package com.milesight.beaveriot.integration.msc.model; + +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.cloud.sdk.client.model.TslDataSpec; +import com.milesight.cloud.sdk.client.model.TslServiceSpec; +import lombok.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public record TslServiceWrapper(TslServiceSpec spec) implements TslItemWrapper { + + @Override + public EntityType getEntityType() { + return EntityType.SERVICE; + } + + @Override + public EntityValueType getValueType() { + return EntityValueType.OBJECT; + } + + @Override + public String getParentId() { + return null; + } + + @Override + public String getId() { + return spec.getId(); + } + + @Override + public String getName() { + return spec.getName(); + } + + @Override + public TslDataSpec getDataSpec() { + return null; + } + + @Override + public AccessMod getAccessMode() { + return null; + } + + @Override + public List getOutputs() { + if (spec.getOutputs() == null) { + return List.of(); + } + return spec.getOutputs() + .stream() + .filter(spec -> spec.getDataSpec() != null) + .map(spec -> new TslParamWrapper(spec, this)) + .toList(); + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/WebhookPayload.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/WebhookPayload.java new file mode 100644 index 0000000..6f14df4 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/model/WebhookPayload.java @@ -0,0 +1,71 @@ +package com.milesight.beaveriot.integration.msc.model; + +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.*; + +import javax.annotation.Nullable; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WebhookPayload { + + @JsonAlias("eventId") + private String eventId; + + @JsonAlias("eventCreatedTime") + private Long eventCreatedTime; + + @JsonAlias("eventVersion") + private String eventVersion; + + @JsonAlias("eventType") + private String eventType; + + @Nullable + private JsonNode data; + + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class DeviceData { + + @JsonAlias("deviceProfile") + private Profile deviceProfile; + + private String type; + + @JsonAlias("tslId") + private String tslId; + + @Nullable + private JsonNode payload; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class Profile { + + @JsonAlias("deviceId") + private Long deviceId; + + private String sn; + + @JsonAlias("devEUI") + private String devEUI; + + private String name; + + @JsonAlias("communicationMethod") + private String communicationMethod; + + private String model; + } + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/package-info.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/package-info.java new file mode 100644 index 0000000..7cbde8e --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/package-info.java @@ -0,0 +1,4 @@ +/** + * @author leon + */ +package com.milesight.beaveriot.integration.msc; \ No newline at end of file diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/IMscClientProvider.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/IMscClientProvider.java new file mode 100644 index 0000000..acd3281 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/IMscClientProvider.java @@ -0,0 +1,9 @@ +package com.milesight.beaveriot.integration.msc.service; + +import com.milesight.msc.sdk.MscClient; + +public interface IMscClientProvider { + + MscClient getMscClient(); + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscConnectionService.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscConnectionService.java new file mode 100644 index 0000000..972096a --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscConnectionService.java @@ -0,0 +1,112 @@ +package com.milesight.beaveriot.integration.msc.service; + +import com.milesight.beaveriot.context.api.EntityValueServiceProvider; +import com.milesight.beaveriot.context.api.ExchangeFlowExecutor; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import com.milesight.beaveriot.context.integration.model.event.ExchangeEvent; +import com.milesight.beaveriot.eventbus.annotations.EventSubscribe; +import com.milesight.beaveriot.eventbus.api.Event; +import com.milesight.beaveriot.integration.msc.entity.MscConnectionPropertiesEntities; +import com.milesight.beaveriot.integration.msc.model.IntegrationStatus; +import com.milesight.msc.sdk.MscClient; +import com.milesight.msc.sdk.config.Credentials; +import lombok.*; +import lombok.extern.slf4j.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Objects; + + +@Slf4j +@Component +public class MscConnectionService implements IMscClientProvider { + + private static final String OPENAPI_STATUS_KEY = MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.openapiStatus); + + @Autowired + private EntityValueServiceProvider entityValueServiceProvider; + + @Autowired + private ExchangeFlowExecutor exchangeFlowExecutor; + + @Getter + private MscClient mscClient; + + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.openapi.*", eventType = ExchangeEvent.EventType.DOWN) + public void onOpenapiPropertiesUpdate(Event event) { + if (isConfigChanged(event)) { + val openapiSettings = event.getPayload(); + initConnection(openapiSettings); + exchangeFlowExecutor.syncExchangeDown(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.NOT_READY.name()))); + } + testConnection(); + } + + private void initConnection(MscConnectionPropertiesEntities.Openapi openapiSettings) { + mscClient = MscClient.builder() + .endpoint(openapiSettings.getServerUrl()) + .credentials(Credentials.builder() + .clientId(openapiSettings.getClientId()) + .clientSecret(openapiSettings.getClientSecret()) + .build()) + .build(); + } + + private void testConnection() { + try { + mscClient.test(); + exchangeFlowExecutor.syncExchangeDown(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.READY.name()))); + } catch (Exception e) { + log.error("Error occurs while testing connection", e); + exchangeFlowExecutor.syncExchangeDown(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.ERROR.name()))); + } + } + + private boolean isConfigChanged(Event event) { + // check if required fields are set + if (event.getPayload().getServerUrl() == null) { + return false; + } + if (event.getPayload().getClientId() == null) { + return false; + } + if (event.getPayload().getClientSecret() == null) { + return false; + } + // check if mscClient is initiated + if (mscClient == null) { + return true; + } + if (mscClient.getConfig() == null) { + return true; + } + if (mscClient.getConfig().getCredentials() == null) { + return true; + } + // check if endpoint, clientId or clientSecret changed + if (!Objects.equals(mscClient.getConfig().getEndpoint(), event.getPayload().getServerUrl())) { + return true; + } + if (!Objects.equals(mscClient.getConfig().getCredentials().getClientId(), event.getPayload().getClientId())) { + return true; + } + return !Objects.equals(mscClient.getConfig().getCredentials().getClientSecret(), event.getPayload().getClientSecret()); + } + + public void init() { + try { + val settings = entityValueServiceProvider.findValuesByKey( + MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.openapi), MscConnectionPropertiesEntities.Openapi.class); + if (!settings.isEmpty()) { + initConnection(settings); + testConnection(); + } + } catch (Exception e) { + log.error("Error occurs while initializing connection", e); + exchangeFlowExecutor.syncExchangeDown(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.NOT_READY.name()))); + } + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDataSyncService.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDataSyncService.java new file mode 100644 index 0000000..7a96eb6 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDataSyncService.java @@ -0,0 +1,444 @@ +package com.milesight.beaveriot.integration.msc.service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.milesight.beaveriot.context.api.DeviceServiceProvider; +import com.milesight.beaveriot.context.api.EntityValueServiceProvider; +import com.milesight.beaveriot.context.api.ExchangeFlowExecutor; +import com.milesight.beaveriot.context.constants.ExchangeContextKeys; +import com.milesight.beaveriot.context.integration.model.Device; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import com.milesight.beaveriot.context.integration.model.event.ExchangeEvent; +import com.milesight.beaveriot.eventbus.annotations.EventSubscribe; +import com.milesight.beaveriot.eventbus.api.Event; +import com.milesight.beaveriot.integration.msc.constant.MscIntegrationConstants; +import com.milesight.beaveriot.integration.msc.entity.MscConnectionPropertiesEntities; +import com.milesight.beaveriot.integration.msc.entity.MscServiceEntities; +import com.milesight.beaveriot.integration.msc.model.IntegrationStatus; +import com.milesight.beaveriot.integration.msc.util.MscTslUtils; +import com.milesight.cloud.sdk.client.model.DeviceDetailResponse; +import com.milesight.cloud.sdk.client.model.DeviceSearchRequest; +import com.milesight.msc.sdk.utils.TimeUtils; +import lombok.*; +import lombok.extern.slf4j.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +@Slf4j +@Getter +@Service +public class MscDataSyncService { + + @Lazy + @Autowired + private IMscClientProvider mscClientProvider; + + @Lazy + @Autowired + private MscDeviceService mscDeviceService; + + @Autowired + private DeviceServiceProvider deviceServiceProvider; + + @Autowired + private EntityValueServiceProvider entityValueServiceProvider; + + @Autowired + private ExchangeFlowExecutor exchangeFlowExecutor; + + private Timer timer; + + private int periodSeconds = 0; + + // Only two existing tasks allowed at a time (one running and one waiting) + private static final ExecutorService syncAllDataExecutor = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), + (r, executor) -> { + throw new RejectedExecutionException("Another task is running."); + }); + + private static final ExecutorService concurrentSyncDeviceDataExecutor = new ThreadPoolExecutor(2, 4, + 300L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + + private static final ConcurrentHashMap deviceIdentifierToTaskLock = new ConcurrentHashMap<>(128); + + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.scheduled_data_fetch.*", eventType = ExchangeEvent.EventType.DOWN) + public void onScheduledDataFetchPropertiesUpdate(Event event) { + if (event.getPayload().getPeriod() != null) { + periodSeconds = event.getPayload().getPeriod(); + } + restart(); + } + + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.openapi_status", eventType = ExchangeEvent.EventType.DOWN) + public void onOpenapiStatusUpdate(Event event) { + val status = event.getPayload().getOpenapiStatus(); + if (IntegrationStatus.READY.name().equals(status)) { + try { + syncAllDataExecutor.submit(this::syncDeltaData); + } catch (RejectedExecutionException e) { + log.error("Task rejected: ", e); + } + } + } + + @SneakyThrows + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.sync_device", eventType = ExchangeEvent.EventType.DOWN) + public void onSyncDevice(Event event) { + syncAllDataExecutor.submit(this::syncAllData).get(); + } + + + public void restart() { + stop(); + start(); + } + + public void stop() { + if (timer != null) { + timer.cancel(); + timer = null; + } + log.info("timer stopped"); + } + + public void init() { + start(); + } + + public void start() { + log.info("timer starting"); + if (timer != null) { + return; + } + if (periodSeconds == 0) { + val scheduledDataFetchSettings = entityValueServiceProvider.findValuesByKey( + MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.scheduledDataFetch), + MscConnectionPropertiesEntities.ScheduledDataFetch.class); + if (scheduledDataFetchSettings.isEmpty()) { + periodSeconds = -1; + return; + } + if (!Boolean.TRUE.equals(scheduledDataFetchSettings.getEnabled()) + || scheduledDataFetchSettings.getPeriod() == null + || scheduledDataFetchSettings.getPeriod() == 0) { + // not enabled or invalid period + periodSeconds = -1; + } else if (scheduledDataFetchSettings.getPeriod() > 0) { + periodSeconds = scheduledDataFetchSettings.getPeriod(); + } + } + if (periodSeconds < 0) { + return; + } + timer = new Timer(); + + // setup timer + val periodMills = periodSeconds * 1000L; + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + syncAllDataExecutor.submit(() -> syncDeltaData()); + } catch (RejectedExecutionException e) { + log.error("Task rejected: ", e); + } + } + }, periodMills, periodMills); + + log.info("timer started"); + } + + /** + * Pull data from MSC, all devices and part of history data which created after the last execution will be added to local storage. + */ + void syncDeltaData() { + log.info("Fetching delta data from MSC"); + try { + syncAllDeviceData(true); + } catch (Exception e) { + log.error("Error while fetching delta data from MSC", e); + } + } + + /** + * Pull all devices and all history data. + */ + void syncAllData() { + log.info("Fetching all data from MSC"); + try { + syncAllDeviceData(false); + } catch (Exception e) { + log.error("Error while fetching all data from MSC", e); + } + } + + private Object markDeviceTaskRunning(String identifier, boolean force) { + var lock = deviceIdentifierToTaskLock.get(identifier); + if (force && lock != null) { + return lock; + } else if (lock == null) { + lock = new Object(); + val previous = deviceIdentifierToTaskLock.putIfAbsent(identifier, lock); + if (previous == null) { + return lock; + } else { + // put value failed + if (force) { + return previous; + } + return null; + } + } else { + return null; + } + } + + private void markDeviceTaskFinished(String identifier, Object lock) { + deviceIdentifierToTaskLock.remove(identifier, lock); + } + + @SneakyThrows + void syncAllDeviceData(boolean delta) { + if (mscClientProvider == null || mscClientProvider.getMscClient() == null) { + log.warn("MscClient not initiated."); + return; + } + syncDevicesFromMsc(); + syncDeviceHistoryDataFromMsc(delta); + } + + void syncDevicesFromMsc() throws IOException { + log.info("Sync devices from MSC."); + val mscClient = mscClientProvider.getMscClient(); + val allDevices = deviceServiceProvider.findAll(MscIntegrationConstants.INTEGRATION_IDENTIFIER); + log.info("Found {} devices from local.", allDevices.size()); + val existingDevices = allDevices.stream().map(Device::getIdentifier).collect(Collectors.toSet()); + long pageNumber = 1; + long pageSize = 10; + long total = 0; + long fetched = -1; + while (fetched < total) { + val response = mscClient.device().searchDetails(new DeviceSearchRequest() + .pageSize(pageSize) + .pageNumber(pageNumber)) + .execute() + .body(); + if (response == null || response.getData() == null || response.getData().getTotal() == null) { + log.warn("Response is empty: {}", response); + return; + } + val list = response.getData().getContent(); + if (list == null || list.isEmpty()) { + log.warn("Content is empty."); + return; + } + fetched += pageSize; + total = response.getData().getTotal(); + + val syncDeviceTasks = list.stream().map(details -> { + val identifier = details.getSn(); + if (identifier == null) { + return CompletableFuture.completedFuture(null); + } + var type = Task.Type.ADD_LOCAL_DEVICE; + if (existingDevices.contains(identifier)) { + existingDevices.remove(identifier); + type = Task.Type.UPDATE_LOCAL_DEVICE; + } + return syncDeviceData(new Task(type, identifier, details)); + }).toArray(CompletableFuture[]::new); + CompletableFuture.allOf(syncDeviceTasks).join(); + } + log.info("Pull devices from MSC finished, total devices: {}", total); + + val removeDevicesTasks = existingDevices.stream() + .map(identifier -> syncDeviceData(new Task(Task.Type.REMOVE_LOCAL_DEVICE, identifier, null))) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(removeDevicesTasks).join(); + } + + void syncDeviceHistoryDataFromMsc(boolean delta) { + log.info("Sync device history data from MSC."); + val allDevices = deviceServiceProvider.findAll(MscIntegrationConstants.INTEGRATION_IDENTIFIER); + log.info("Found {} devices from local.", allDevices.size()); + allDevices.forEach(device -> { + try { + int lastSyncTime = 0; + if (delta) { + lastSyncTime = getAndUpdateLastSyncTime(device); + } + syncPropertiesHistory(device, lastSyncTime); + // events and services are not supported yet + } catch (Exception e) { + log.error("Error occurs while syncing device history data from MSC, device key: {}", device.getKey(), e); + } + }); + log.info("Sync device history data from MSC finished, total devices: {}", allDevices.size()); + } + + public CompletableFuture syncDeviceData(Task task) { + // if fetching or removing data, then return + val lock = markDeviceTaskRunning(task.identifier, task.type == Task.Type.REMOVE_LOCAL_DEVICE); + if (lock == null) { + log.info("Skip execution because device task is running: {}", task.identifier); + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.supplyAsync(() -> { + try { + Device device = null; + switch (task.type) { + case REMOVE_LOCAL_DEVICE -> device = removeLocalDevice(task.identifier); + case ADD_LOCAL_DEVICE -> device = addLocalDevice(task); + case UPDATE_LOCAL_DEVICE -> device = updateLocalDevice(task); + } + + if (task.type != Task.Type.REMOVE_LOCAL_DEVICE && device == null) { + log.warn("Add or update local device failed: {}", task.identifier); + return false; + + } + return true; + } catch (Exception e) { + log.error("Error while syncing local device data.", e); + return false; + } finally { + markDeviceTaskFinished(task.identifier, lock); + } + }, concurrentSyncDeviceDataExecutor); + } + + private int getAndUpdateLastSyncTime(Device device) { + // update last sync time + val timestamp = TimeUtils.currentTimeSeconds(); + val lastSyncTimeKey = MscIntegrationConstants.InternalPropertyIdentifier.getLastSyncTimeKey(device.getKey()); + val lastSyncTime = Optional.ofNullable(entityValueServiceProvider.findValueByKey(lastSyncTimeKey)) + .map(JsonNode::intValue) + .orElse(0); + exchangeFlowExecutor.syncExchangeDown(ExchangePayload.create(lastSyncTimeKey, timestamp)); + return lastSyncTime; + } + + @SneakyThrows + private void syncPropertiesHistory(Device device, int lastSyncTime) { + // deviceId should not be null + val deviceId = (String) device.getAdditional().get(MscIntegrationConstants.DeviceAdditionalDataName.DEVICE_ID); + long time24HoursBefore = TimeUtils.currentTimeSeconds() - TimeUnit.DAYS.toSeconds(1); + long startTime = Math.max(lastSyncTime, time24HoursBefore) * 1000; + long endTime = TimeUtils.currentTimeMillis(); + long pageSize = 100; + String pageKey = null; + boolean hasNextPage = true; + val isLatestData = new AtomicBoolean(true); + while (hasNextPage) { + val page = mscClientProvider.getMscClient() + .device() + .getPropertiesHistory(deviceId, startTime, endTime, pageSize, pageKey, null) + .execute() + .body(); + if (page == null || page.getData() == null || page.getData().getList() == null) { + log.warn("Response is empty."); + break; + } + pageKey = page.getData().getNextPageKey(); + hasNextPage = pageKey != null; + page.getData().getList().forEach(item -> { + val objectMapper = mscClientProvider.getMscClient().getObjectMapper(); + val properties = objectMapper.convertValue(item.getProperties(), JsonNode.class); + saveHistoryData(device.getKey(), null, properties, item.getTs() == null ? TimeUtils.currentTimeMillis() : item.getTs(), isLatestData.get()); + if (isLatestData.get()) { + isLatestData.set(false); + } + }); + } + } + + public void saveHistoryData(String deviceKey, String eventId, JsonNode data, long timestampMs, boolean isLatestData) { + val payload = eventId == null + ? MscTslUtils.convertJsonNodeToExchangePayload(deviceKey, data) + : MscTslUtils.convertJsonNodeToExchangePayload(String.format("%s.%s", deviceKey, eventId), data, false); + if (payload == null || payload.isEmpty()) { + return; + } + payload.setTimestamp(timestampMs); + log.debug("Save device history data: {}", payload); + if (!isLatestData) { + entityValueServiceProvider.saveHistoryRecord(payload, payload.getTimestamp()); + } else { + payload.putContext(ExchangeContextKeys.IGNORE_INVALID_KEY, true); + exchangeFlowExecutor.asyncExchangeUp(payload); + } + } + + @SneakyThrows + private Device updateLocalDevice(Task task) { + log.info("Update local device: {}", task.identifier); + val details = getDeviceDetails(task); + val deviceId = details.getDeviceId(); + val thingSpec = mscDeviceService.getThingSpec(String.valueOf(deviceId)); + return mscDeviceService.updateLocalDevice(task.identifier, String.valueOf(deviceId), thingSpec); + } + + @SneakyThrows + private Device addLocalDevice(Task task) { + log.info("Add local device: {}", task.identifier); + val details = getDeviceDetails(task); + val deviceId = details.getDeviceId(); + val thingSpec = mscDeviceService.getThingSpec(String.valueOf(deviceId)); + return mscDeviceService.addLocalDevice(task.identifier, details.getName(), String.valueOf(deviceId), thingSpec); + } + + @SuppressWarnings("ConstantConditions") + private DeviceDetailResponse getDeviceDetails(Task task) + throws IOException, NullPointerException, IndexOutOfBoundsException { + + var details = task.details; + if (details == null) { + details = mscClientProvider.getMscClient().device().searchDetails(DeviceSearchRequest.builder() + .sn(task.identifier) + .pageNumber(1L) + .pageSize(1L) + .build()) + .execute() + .body() + .getData() + .getContent() + .get(0); + } + return details; + } + + private Device removeLocalDevice(String identifier) { + // delete is unsupported currently + return null; + } + + + public record Task(@Nonnull Type type, @Nonnull String identifier, @Nullable DeviceDetailResponse details) { + + public enum Type { + ADD_LOCAL_DEVICE, + UPDATE_LOCAL_DEVICE, + REMOVE_LOCAL_DEVICE, + ; + } + + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDeviceService.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDeviceService.java new file mode 100644 index 0000000..922e19e --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDeviceService.java @@ -0,0 +1,227 @@ +package com.milesight.beaveriot.integration.msc.service; + +import com.milesight.beaveriot.context.constants.IntegrationConstants; +import com.milesight.beaveriot.integration.msc.constant.MscIntegrationConstants; +import com.milesight.cloud.sdk.client.model.DeviceSaveOrUpdateRequest; +import com.milesight.cloud.sdk.client.model.ThingSpec; +import com.milesight.cloud.sdk.client.model.TslPropertyDataUpdateRequest; +import com.milesight.cloud.sdk.client.model.TslServiceCallRequest; +import com.milesight.beaveriot.context.api.DeviceServiceProvider; +import com.milesight.beaveriot.context.integration.model.DeviceBuilder; +import com.milesight.beaveriot.context.integration.model.EntityBuilder; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.beaveriot.context.integration.model.Device; +import com.milesight.beaveriot.context.integration.model.Entity; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import com.milesight.beaveriot.context.integration.model.event.ExchangeEvent; +import com.milesight.beaveriot.eventbus.annotations.EventSubscribe; +import com.milesight.beaveriot.eventbus.api.Event; +import com.milesight.beaveriot.integration.msc.entity.MscServiceEntities; +import com.milesight.beaveriot.integration.msc.util.MscTslUtils; +import com.milesight.msc.sdk.error.MscApiException; +import com.milesight.msc.sdk.error.MscSdkException; +import lombok.*; +import lombok.extern.slf4j.*; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + + +@Slf4j +@Service +public class MscDeviceService { + + @Lazy + @Autowired + private IMscClientProvider mscClientProvider; + + @Autowired + private DeviceServiceProvider deviceServiceProvider; + + @SneakyThrows + @EventSubscribe(payloadKeyExpression = "msc-integration.device.*", eventType = ExchangeEvent.EventType.DOWN) + public void onDeviceExchangeEvent(ExchangeEvent event) { + val exchangePayload = event.getPayload(); + val devices = exchangePayload.getExchangeEntities() + .values() + .stream() + .map(Entity::getDeviceKey) + .distinct() + .map(deviceServiceProvider::findByKey) + .filter(Objects::nonNull) + .toList(); + if (devices.size() != 1) { + log.warn("Invalid device number: {}", devices.size()); + return; + } + val device = devices.get(0); + + handlePropertiesPayload(device, exchangePayload); + handleServicePayload(device, exchangePayload); + } + + private void handleServicePayload(Device device, ExchangePayload exchangePayload) { + val objectMapper = mscClientProvider.getMscClient().getObjectMapper(); + val servicePayload = exchangePayload.getPayloadsByEntityType(EntityType.SERVICE); + if (servicePayload.isEmpty()) { + return; + } + val deviceId = (String) device.getAdditional().get(MscIntegrationConstants.DeviceAdditionalDataName.DEVICE_ID); + val serviceGroups = MscTslUtils.convertExchangePayloadToGroupedJsonNode( + objectMapper, device.getKey(), servicePayload); + serviceGroups.entrySet().removeIf(entry -> MscIntegrationConstants.InternalPropertyIdentifier.Pattern.match(entry.getKey())); + if (serviceGroups.isEmpty()) { + return; + } + serviceGroups.forEach((serviceId, serviceProperties) -> + mscClientProvider.getMscClient().device().callService(deviceId, TslServiceCallRequest.builder() + .serviceId(serviceId) + .inputs(serviceProperties) + .build())); + } + + @SneakyThrows + private void handlePropertiesPayload(Device device, ExchangePayload exchangePayload) { + val objectMapper = mscClientProvider.getMscClient().getObjectMapper(); + val propertiesPayload = exchangePayload.getPayloadsByEntityType(EntityType.PROPERTY); + if (propertiesPayload.isEmpty()) { + return; + } + val properties = MscTslUtils.convertExchangePayloadToGroupedJsonNode( + objectMapper, device.getKey(), propertiesPayload); + properties.entrySet().removeIf(entry -> MscIntegrationConstants.InternalPropertyIdentifier.Pattern.match(entry.getKey())); + if (properties.isEmpty()) { + return; + } + val deviceId = (String) device.getAdditional().get(MscIntegrationConstants.DeviceAdditionalDataName.DEVICE_ID); + mscClientProvider.getMscClient().device().updateProperties(deviceId, TslPropertyDataUpdateRequest.builder() + .properties(properties) + .build()) + .execute(); + } + + @SneakyThrows + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.add_device.*", eventType = ExchangeEvent.EventType.DOWN) + public void onAddDevice(Event event) { + val deviceName = event.getPayload().getContext("device_name", "Device Name"); + if (mscClientProvider == null || mscClientProvider.getMscClient() == null) { + log.warn("MscClient not initiated."); + return; + } + val identifier = event.getPayload().getSn(); + val mscClient = mscClientProvider.getMscClient(); + val addDeviceResponse = mscClient.device().attach(DeviceSaveOrUpdateRequest.builder() + .name(deviceName) + .snDevEUI(identifier) + .autoProvision(false) + .build()) + .execute() + .body(); + if (addDeviceResponse == null || addDeviceResponse.getData() == null + || addDeviceResponse.getData().getDeviceId() == null) { + log.warn("Add device failed: '{}' '{}'", deviceName, identifier); + return; + } + + val deviceId = addDeviceResponse.getData().getDeviceId(); + log.info("Device '{}' added to MSC with id '{}'", deviceName, deviceId); + + final String deviceIdStr = String.valueOf(deviceId); + val thingSpec = getThingSpec(deviceIdStr); + + addLocalDevice(identifier, deviceName, deviceIdStr, thingSpec); + } + + public Device addLocalDevice(String identifier, String deviceName, String deviceId, ThingSpec thingSpec) { + val integrationId = MscIntegrationConstants.INTEGRATION_IDENTIFIER; + val deviceKey = IntegrationConstants.formatIntegrationDeviceKey(integrationId, identifier); + val entities = MscTslUtils.thingSpecificationToEntities(integrationId, deviceKey, thingSpec); + addAdditionalEntities(integrationId, deviceKey, entities); + + val device = new DeviceBuilder(integrationId) + .name(deviceName) + .identifier(identifier) + .additional(Map.of(MscIntegrationConstants.DeviceAdditionalDataName.DEVICE_ID, deviceId)) + .entities(entities) + .build(); + deviceServiceProvider.save(device); + return device; + } + + public Device updateLocalDevice(String identifier, String deviceId, ThingSpec thingSpec) { + val integrationId = MscIntegrationConstants.INTEGRATION_IDENTIFIER; + val deviceKey = IntegrationConstants.formatIntegrationDeviceKey(integrationId, identifier); + val entities = MscTslUtils.thingSpecificationToEntities(integrationId, deviceKey, thingSpec); + addAdditionalEntities(integrationId, deviceKey, entities); + + val device = deviceServiceProvider.findByIdentifier(identifier, integrationId); + // update device attributes except name +// device.setIdentifier(identifier); + device.setAdditional(Map.of(MscIntegrationConstants.DeviceAdditionalDataName.DEVICE_ID, deviceId)); + device.setEntities(entities); + deviceServiceProvider.save(device); + return device; + } + + @Nullable + public ThingSpec getThingSpec(String deviceId) throws IOException, MscSdkException { + val mscClient = mscClientProvider.getMscClient(); + ThingSpec thingSpec = null; + val response = mscClient.device() + .getThingSpecification(deviceId) + .execute() + .body(); + if (response != null && response.getData() != null) { + thingSpec = response.getData(); + } + return thingSpec; + } + + private static void addAdditionalEntities(String integrationId, String deviceKey, List entities) { + entities.add(new EntityBuilder(integrationId, deviceKey) + .identifier(MscIntegrationConstants.InternalPropertyIdentifier.LAST_SYNC_TIME) + .property(MscIntegrationConstants.InternalPropertyIdentifier.LAST_SYNC_TIME, AccessMod.R) + .valueType(EntityValueType.LONG) + .attributes(Map.of("internal", true)) + .build()); + } + + @SneakyThrows + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.delete_device", eventType = ExchangeEvent.EventType.DOWN) + public void onDeleteDevice(Event event) { + if (mscClientProvider == null || mscClientProvider.getMscClient() == null) { + log.warn("MscClient not initiated."); + return; + } + val device = deviceServiceProvider.findByIdentifier( + ((Device) event.getPayload().getContext("device")).getIdentifier(), MscIntegrationConstants.INTEGRATION_IDENTIFIER); + val additionalData = device.getAdditional(); + if (additionalData == null) { + return; + } + val deviceId = additionalData.get(MscIntegrationConstants.DeviceAdditionalDataName.DEVICE_ID); + if (deviceId == null) { + return; + } + try { + mscClientProvider.getMscClient().device().delete(deviceId.toString()) + .execute(); + } catch (MscApiException e) { + if (!"device_not_found".equals(e.getErrorResponse().getErrCode())) { + throw e; + } else { + log.warn("Device '{}' ({}) not found in MSC", device.getIdentifier(), deviceId); + } + } + deviceServiceProvider.deleteById(device.getId()); + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscWebhookService.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscWebhookService.java new file mode 100644 index 0000000..e7ac096 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscWebhookService.java @@ -0,0 +1,194 @@ +package com.milesight.beaveriot.integration.msc.service; + +import com.milesight.beaveriot.context.api.DeviceServiceProvider; +import com.milesight.beaveriot.context.api.EntityValueServiceProvider; +import com.milesight.beaveriot.context.api.ExchangeFlowExecutor; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import com.milesight.beaveriot.context.integration.model.event.ExchangeEvent; +import com.milesight.beaveriot.eventbus.annotations.EventSubscribe; +import com.milesight.beaveriot.eventbus.api.Event; +import com.milesight.beaveriot.integration.msc.constant.MscIntegrationConstants; +import com.milesight.beaveriot.integration.msc.entity.MscConnectionPropertiesEntities; +import com.milesight.beaveriot.integration.msc.model.IntegrationStatus; +import com.milesight.beaveriot.integration.msc.model.WebhookPayload; +import com.milesight.msc.sdk.utils.HMacUtils; +import com.milesight.msc.sdk.utils.TimeUtils; +import lombok.*; +import lombok.extern.slf4j.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; + +import javax.crypto.Mac; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + + +@Slf4j +@Service +public class MscWebhookService { + + private static final String WEBHOOK_STATUS_KEY = MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.webhookStatus); + + private static final int MAX_FAILURES = 10; + + private final AtomicInteger failureCount = new AtomicInteger(0); + + @Getter + private boolean enabled = false; + + private Mac mac; + + @Autowired + private EntityValueServiceProvider entityValueServiceProvider; + + @Autowired + private ExchangeFlowExecutor exchangeFlowExecutor; + + @Autowired + private DeviceServiceProvider deviceServiceProvider; + + @Lazy + @Autowired + private IMscClientProvider mscClientProvider; + + @Autowired + private MscDataSyncService dataSyncService; + + public void init() { + val webhookSettingsKey = MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.webhook); + val webhookSettings = entityValueServiceProvider.findValuesByKey(webhookSettingsKey, MscConnectionPropertiesEntities.Webhook.class); + if (webhookSettings.isEmpty()) { + log.info("Webhook settings not found"); + return; + } + enabled = Boolean.TRUE.equals(webhookSettings.getEnabled()); + if (webhookSettings.getSecretKey() != null) { + mac = HMacUtils.getMac(webhookSettings.getSecretKey()); + } + if (!enabled) { + updateWebhookStatus(IntegrationStatus.NOT_READY); + } + } + + @EventSubscribe(payloadKeyExpression = "msc-integration.integration.webhook.*", eventType = ExchangeEvent.EventType.DOWN) + public void onWebhookPropertiesUpdate(Event event) { + enabled = Boolean.TRUE.equals(event.getPayload().getEnabled()); + if (event.getPayload().getSecretKey() != null && !event.getPayload().getSecretKey().isEmpty()) { + mac = HMacUtils.getMac(event.getPayload().getSecretKey()); + } else { + mac = null; + } + } + + public void handleWebhookData(String signature, + String webhookUuid, + String requestTimestamp, + String requestNonce, + List webhookPayloads) { + + if (log.isDebugEnabled()) { + log.debug("Received webhook data: {} {} {} {} {}", signature, webhookUuid, requestTimestamp, requestNonce, webhookPayloads); + } else { + log.debug("Received webhook data, size: {}", webhookPayloads.size()); + } + if (!enabled) { + log.debug("Webhook is disabled."); + return; + } + + val currentSeconds = TimeUtils.currentTimeSeconds(); + if (Long.parseLong(requestTimestamp) + 60 < currentSeconds) { + log.warn("Webhook request outdated: {}", requestTimestamp); + markWebhookStatusAsError(); + return; + } + + if (!isSignatureValid(signature, requestTimestamp, requestNonce)) { + log.warn("Signature invalid: {}", signature); + markWebhookStatusAsError(); + return; + } + + webhookPayloads.forEach(webhookPayload -> { + log.debug("Receive webhook payload: {}", webhookPayload); + val eventType = webhookPayload.getEventType(); + if (eventType == null) { + log.warn("Event type not found"); + return; + } + + // webhook is ready + updateWebhookStatus(IntegrationStatus.READY); + + if ("device_data".equalsIgnoreCase(eventType)) { + try { + handleDeviceData(webhookPayload); + } catch (Exception e) { + log.error("Handle webhook data failed", e); + } + } else { + log.debug("Ignored event type: {}", eventType); + } + }); + } + + /** + * mark as error when continuously failed to validate signature or timestamp + */ + private void markWebhookStatusAsError() { + val failures = failureCount.incrementAndGet(); + if (failures > MAX_FAILURES) { + updateWebhookStatus(IntegrationStatus.ERROR); + } + } + + private void updateWebhookStatus(@NonNull IntegrationStatus status) { + if (!IntegrationStatus.ERROR.equals(status)) { + // recover from error + failureCount.set(0); + } + exchangeFlowExecutor.asyncExchangeUp(ExchangePayload.create(WEBHOOK_STATUS_KEY, status.name())); + } + + private void handleDeviceData(WebhookPayload webhookPayload) { + if (webhookPayload.getData() == null) { + log.warn("Webhook data is null: {}", webhookPayload); + return; + } + val client = mscClientProvider.getMscClient(); + val deviceData = client.getObjectMapper().convertValue(webhookPayload.getData(), WebhookPayload.DeviceData.class); + if (!"PROPERTY".equalsIgnoreCase(deviceData.getType()) + && !"EVENT".equalsIgnoreCase(deviceData.getType())) { + log.debug("Not tsl property or event: {}", deviceData.getType()); + return; + } + val eventId = deviceData.getTslId(); + val data = deviceData.getPayload(); + val profile = deviceData.getDeviceProfile(); + if (data == null || profile == null) { + log.warn("Invalid data: {}", deviceData); + return; + } + + val sn = deviceData.getDeviceProfile().getSn(); + val device = deviceServiceProvider.findByIdentifier(sn, MscIntegrationConstants.INTEGRATION_IDENTIFIER); + if (device == null) { + log.warn("Device not added, try to sync data: {}", sn); + dataSyncService.syncDeviceData(new MscDataSyncService.Task(MscDataSyncService.Task.Type.ADD_LOCAL_DEVICE, sn, null)); + return; + } + + // save data + dataSyncService.saveHistoryData(device.getKey(), eventId, data, webhookPayload.getEventCreatedTime() * 1000, true); + } + + public boolean isSignatureValid(String signature, String requestTimestamp, String requestNonce) { + if (mac != null) { + val expectedSignature = HMacUtils.digestHex(mac, String.format("%s%s", requestTimestamp, requestNonce)); + return expectedSignature.equals(signature); + } + return true; + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/util/MscTslUtils.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/util/MscTslUtils.java new file mode 100644 index 0000000..6cb8593 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/util/MscTslUtils.java @@ -0,0 +1,427 @@ +package com.milesight.beaveriot.integration.msc.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.beaveriot.context.integration.model.AttributeBuilder; +import com.milesight.beaveriot.context.integration.model.Entity; +import com.milesight.beaveriot.context.integration.model.EntityBuilder; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import com.milesight.beaveriot.integration.msc.model.TslEventWrapper; +import com.milesight.beaveriot.integration.msc.model.TslItemWrapper; +import com.milesight.beaveriot.integration.msc.model.TslParamWrapper; +import com.milesight.beaveriot.integration.msc.model.TslPropertyWrapper; +import com.milesight.beaveriot.integration.msc.model.TslServiceWrapper; +import com.milesight.cloud.sdk.client.model.ThingSpec; +import com.milesight.cloud.sdk.client.model.TslDataSpec; +import com.milesight.cloud.sdk.client.model.TslDataValidatorSpec; +import com.milesight.cloud.sdk.client.model.TslKeyValuePair; +import lombok.*; +import lombok.extern.slf4j.*; +import org.jetbrains.annotations.NotNull; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Slf4j +public class MscTslUtils { + + private MscTslUtils() { + throw new IllegalStateException("Utility class"); + } + + @Nonnull + public static List thingSpecificationToEntities(@NonNull String integrationId, @NonNull String deviceKey, @NonNull ThingSpec thingSpec) { + + val items = getTslItems(thingSpec); + val children = items.stream() + .filter(item -> item.getDataSpec() != null && item.getDataSpec().getParentId() != null) + .collect(Collectors.groupingBy(TslItemWrapper::getParentId)); + + val parents = items.stream() + .filter(item -> item.getDataSpec() == null || item.getDataSpec().getParentId() == null) + .map(item -> new TslNode(item, null, null, null)) + .toList(); + val queue = new ArrayDeque<>(parents); + val identifierToParentEntity = new HashMap(); + val parentIdentifierToChildEntities = new HashMap>(); + + while (!queue.isEmpty()) { + val node = queue.poll(); + val item = node.item; + if (item instanceof TslParamWrapper && item.getId() == null) { + // deprecated spec is not allowed + continue; + } + + String itemPath; + if (node.parentPath != null) { + if (node.arrayIndex != null) { + // Array item path is generated from index + itemPath = String.format("%s[%d]", node.parentPath, node.arrayIndex); + } else { + // Child item path is inherited from parent + itemPath = String.format("%s.%s", node.parentPath, item.getId().substring(item.getId().lastIndexOf('.') + 1)); + } + } else { + itemPath = item.getId(); + } + + String rootIdentifier; + val isArray = isArray(item.getDataSpec()); + val isRoot = node.rootIdentifier == null; + if (isArray) { + // Don't create entity for array node + if (isRoot) { + // Array cannot be a parent + rootIdentifier = null; + } else { + rootIdentifier = node.rootIdentifier; + } + } else { + val entityBuilder = new EntityBuilder(integrationId, deviceKey) + .identifier(standardizeEntityIdentifier(itemPath)) + .valueType(item.getValueType()) + .attributes(convertTslDataSpecToEntityAttributes(item.getDataSpec())); + switch (item.getEntityType()) { + case PROPERTY -> entityBuilder.property(item.getName(), item.getAccessMode()); + case EVENT -> entityBuilder.event(item.getName()); + case SERVICE -> entityBuilder.service(item.getName()); + } + val entity = entityBuilder.build(); + + if (isRoot) { + rootIdentifier = entity.getIdentifier(); + identifierToParentEntity.put(rootIdentifier, entity); + } else { + rootIdentifier = node.rootIdentifier; + parentIdentifierToChildEntities.computeIfAbsent(rootIdentifier, id -> new ArrayList<>()).add(entity); + } + } + + val maxArraySize = Optional.ofNullable(item.getDataSpec()) + .map(TslDataSpec::getValidator) + .map(TslDataValidatorSpec::getMaxSize) + .orElse(4L); + val childNodes = children.getOrDefault(item.getId(), List.of()); + if (childNodes.isEmpty()) { + continue; + } + + // Push children to queue + if (isArray) { + // Array only contains one type of child item + val childNode = childNodes.get(0); + IntStream.range(0, maxArraySize.intValue()).forEach(index -> + queue.add(new TslNode(childNode, itemPath, rootIdentifier, index))); + } else { + childNodes.forEach(childNode -> + queue.add(new TslNode(childNode, itemPath, rootIdentifier, null))); + } + } + + val entities = identifierToParentEntity.values().stream() + .sorted(Comparator.comparing(Entity::getIdentifier)) + .collect(Collectors.toList()); + entities.forEach(entity -> { + val childrenEntities = parentIdentifierToChildEntities.get(entity.getIdentifier()); + if (childrenEntities != null) { + childrenEntities.sort(Comparator.comparing(Entity::getIdentifier)); + entity.setChildren(childrenEntities); + } + entity.initializeProperties(integrationId, deviceKey); + }); + return entities; + } + + record TslNode(TslItemWrapper item, String parentPath, String rootIdentifier, Integer arrayIndex) { + + } + + private static List getTslItems(ThingSpec thingSpec) { + val items = new ArrayList(); + if (thingSpec.getProperties() != null) { + thingSpec.getProperties() + .stream() + .map(TslPropertyWrapper::new) + .forEach(items::add); + } + if (thingSpec.getEvents() != null) { + thingSpec.getEvents() + .stream() + .map(TslEventWrapper::new) + .peek(items::add) + .forEach(item -> items.addAll(getOutputParams(item))); + } + if (thingSpec.getServices() != null) { + thingSpec.getServices() + .stream() + .map(TslServiceWrapper::new) + .peek(items::add) + .forEach(item -> items.addAll(getOutputParams(item))); + } + return items; + } + + private static List getOutputParams(TslItemWrapper item) { + return item.getOutputs().stream() + .peek(v -> { + if (v.getDataSpec().getParentId() == null) { + // Bind output params to its function + v.getDataSpec().setParentId(v.getId()); + } + }).toList(); + } + + private static String standardizeEntityIdentifier(String rawIdentifier) { + if (rawIdentifier == null || rawIdentifier.indexOf('.') < 0) { + return rawIdentifier; + } + val fullIdentifier = rawIdentifier.replace('.', '@'); + return fullIdentifier.substring(fullIdentifier.indexOf('@') + 1); + } + + @Nullable + private static Map convertTslDataSpecToEntityAttributes(TslDataSpec dataSpec) { + if (dataSpec == null) { + return null; + } + val attributeBuilder = new AttributeBuilder(); + if (dataSpec.getUnitName() != null) { + attributeBuilder.unit(dataSpec.getUnitName()); + } + if (dataSpec.getValidator() != null) { + if (dataSpec.getValidator().getMax() != null) { + attributeBuilder.max(dataSpec.getValidator().getMax().doubleValue()); + } + if (dataSpec.getValidator().getMin() != null) { + attributeBuilder.min(dataSpec.getValidator().getMin().doubleValue()); + } + if (dataSpec.getValidator().getMaxSize() != null) { + attributeBuilder.maxLength(dataSpec.getValidator().getMaxSize().intValue()); + } + if (dataSpec.getValidator().getMinSize() != null) { + attributeBuilder.minLength(dataSpec.getValidator().getMinSize().intValue()); + } + } + if (dataSpec.getFractionDigits() != null) { + attributeBuilder.fractionDigits(dataSpec.getFractionDigits().intValue()); + } + if (dataSpec.getMappings() != null && !dataSpec.getMappings().isEmpty()) { + attributeBuilder.enums(dataSpec.getMappings() + .stream() + .collect(Collectors.toMap(v -> { + if (TslDataSpec.DataTypeEnum.BOOL.equals(dataSpec.getDataType())) { + return switch (v.getKey()) { + case "false", "False", "FALSE", "0" -> "false"; + case "true", "True", "TRUE", "1" -> "true"; + default -> v.getKey(); + }; + } + return v.getKey(); + }, TslKeyValuePair::getValue, (a, b) -> a))); + } + return attributeBuilder.build(); + } + + private static boolean isArray(TslDataSpec dataSpec) { + if (dataSpec == null) { + return false; + } + return TslDataSpec.DataTypeEnum.ARRAY.equals(dataSpec.getDataType()); + } + + public static EntityValueType convertDataTypeToEntityValueType(TslDataSpec.DataTypeEnum dataType) { + if (dataType == null) { + return null; + } + switch (dataType) { + case STRING, ENUM, FILE, IMAGE: + return EntityValueType.STRING; + case INT, LONG, DATE, LOCAL_TIME: + return EntityValueType.LONG; + case FLOAT, DOUBLE: + return EntityValueType.DOUBLE; + case BOOL: + return EntityValueType.BOOLEAN; + case STRUCT: + return EntityValueType.OBJECT; + case ARRAY: + return null; + default: + log.warn("Unsupported data type: {}", dataType); + return null; + } + } + + @Nullable + public static ExchangePayload convertJsonNodeToExchangePayload(String deviceKey, JsonNode jsonNode) { + return convertJsonNodeToExchangePayload(deviceKey, jsonNode, true); + } + + /** + * Convert json node to exchange payload + * + * @param previousEntityKey The entity key from parent entity + * @param jsonNode The json data + * @param isRoot If json data is root properties, it should be true, otherwise it should be false + * @return exchange payload + */ + @Nullable + public static ExchangePayload convertJsonNodeToExchangePayload(String previousEntityKey, JsonNode jsonNode, boolean isRoot) { + val result = new HashMap(); + if (jsonNode == null || jsonNode.isEmpty() || !jsonNode.isObject()) { + return null; + } + val entries = new ArrayDeque(); + int initialDepth = isRoot ? 0 : 1; + entries.push(new JsonEntry(previousEntityKey, jsonNode, initialDepth)); + while (!entries.isEmpty()) { + val parent = entries.pop(); + if (parent.value instanceof ObjectNode objectNode) { + objectNode.fields().forEachRemaining(entry -> { + val fieldName = entry.getKey(); + val value = entry.getValue(); + val parentEntityKey = parent.parentEntityKey; + val entityKeyTemplate = parent.depth < 2 ? "%s.%s" : "%s@%s"; + val entityKey = String.format(entityKeyTemplate, parentEntityKey, fieldName); + if (value == null || value.isNull()) { + log.debug("Null value is ignored: {}", entityKey); + } else if (value.isContainerNode()) { + entries.push(new JsonEntry(entityKey, value, parent.depth + 1)); + } else { + result.put(entityKey, value); + } + }); + } else if (parent.value instanceof ArrayNode arrayNode) { + int i = 0; + for (JsonNode value : arrayNode) { + val entityKey = String.format("%s[%d]", parent.parentEntityKey, i); + if (value == null || value.isNull()) { + log.debug("Null value is ignored: {}", entityKey); + } else if (value.isContainerNode()) { + // Only object count level + entries.push(new JsonEntry(entityKey, value, parent.depth)); + } else { + result.put(entityKey, value); + } + i++; + } + } + } + if (result.isEmpty()) { + return null; + } + return ExchangePayload.create(result); + } + + private record JsonEntry(String parentEntityKey, JsonNode value, int depth) { + } + + public static Map convertExchangePayloadToGroupedJsonNode(@NotNull ObjectMapper objectMapper, @NotNull String entityKeyPublicPrefix, @NotNull Map keyValues) { + Objects.requireNonNull(objectMapper); + Objects.requireNonNull(entityKeyPublicPrefix); + Objects.requireNonNull(keyValues); + + val result = new HashMap(); + keyValues.forEach((key, value) -> { + if (!key.startsWith(entityKeyPublicPrefix) || key.equals(entityKeyPublicPrefix)) { + log.debug("Ignored invalid key: {}, prefix is {}", key, entityKeyPublicPrefix); + return; + } + val path = key.substring(entityKeyPublicPrefix.length() + 1); + if (path.length() == 0) { + log.debug("Ignored invalid key: {}", key); + return; + } + if (value == null) { + log.debug("Null value is ignored: {}", key); + return; + } + ensureParentAndSetValue(objectMapper, result, path, value); + }); + return result; + } + + private static void ensureParentAndSetValue(@NotNull ObjectMapper objectMapper, HashMap result, String path, Object value) { + val paths = path.split("[.@\\[]"); + if (paths.length == 0) { + return; + } + val jsonValue = objectMapper.convertValue(value, JsonNode.class); + if (paths.length == 1) { + result.computeIfAbsent(paths[0], k -> jsonValue); + return; + } + + var parent = result.computeIfAbsent(paths[0], k -> createContainerNodeByKeyType(objectMapper, paths[1])); + val lastIndex = paths.length - 1; + for (int i = 1; i < lastIndex; i++) { + var key = paths[i]; + var nextKey = paths[i + 1]; + parent = setValueIfNotExists(parent, key, () -> createContainerNodeByKeyType(objectMapper, nextKey)); + if (parent == null) { + return; + } + } + + val key = paths[lastIndex]; + setValueIfNotExists(parent, key, () -> jsonValue); + } + + private static JsonNode setValueIfNotExists(JsonNode parent, String key, Supplier valueGetter) { + JsonNode existingValue; + if (isArrayIndexKey(key) && parent instanceof ArrayNode arrayParent) { + val arrayIndex = Integer.parseInt(key.substring(0, key.length() - 1)); + if (arrayParent.size() <= arrayIndex) { + fillUpArrayWithNull(arrayParent, arrayIndex + 1); + } + existingValue = parent.get(arrayIndex); + if (existingValue == null || existingValue.isNull()) { + val jsonValue = valueGetter.get(); + arrayParent.set(arrayIndex, jsonValue); + existingValue = jsonValue; + } + } else if (parent instanceof ObjectNode objectParent) { + existingValue = parent.get(key); + if (existingValue == null) { + val jsonValue = valueGetter.get(); + objectParent.set(key, jsonValue); + existingValue = jsonValue; + } + } else { + // Parent is not a container or key is invalid + log.warn("Invalid path: {}", key); + return null; + } + return existingValue; + } + + private static boolean isArrayIndexKey(String key) { + return key.endsWith("]"); + } + + private static JsonNode createContainerNodeByKeyType(@NotNull ObjectMapper objectMapper, String key) { + return isArrayIndexKey(key) ? objectMapper.createArrayNode() : objectMapper.createObjectNode(); + } + + private static void fillUpArrayWithNull(ArrayNode array, Integer targetSize) { + while (targetSize > array.size()) { + array.addNull(); + } + } + +} diff --git a/integrations/msc-integration/src/main/java/com/milesight/beaveriot/package-info.java b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/package-info.java new file mode 100644 index 0000000..cf156c3 --- /dev/null +++ b/integrations/msc-integration/src/main/java/com/milesight/beaveriot/package-info.java @@ -0,0 +1,4 @@ +/** + * @author leon + */ +package com.milesight.beaveriot; \ No newline at end of file diff --git a/integrations/msc-integration/src/main/resources/integration.yaml b/integrations/msc-integration/src/main/resources/integration.yaml new file mode 100644 index 0000000..b203049 --- /dev/null +++ b/integrations/msc-integration/src/main/resources/integration.yaml @@ -0,0 +1,9 @@ + +integration: + msc-integration: + name: "Milesight Development Platform" + description: "Milesight Development Platform" + icon-url: "public/icon.svg" + enabled: true + entity-identifier-add-device: add_device + entity-identifier-delete-device: delete_device diff --git a/integrations/msc-integration/src/main/resources/static/public/icon.svg b/integrations/msc-integration/src/main/resources/static/public/icon.svg new file mode 100644 index 0000000..5bba7c6 --- /dev/null +++ b/integrations/msc-integration/src/main/resources/static/public/icon.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/integrations/msc-integration/src/test/groovy/com/milesight/beaveriot/integration/msc/service/MscDataSyncServiceTest.groovy b/integrations/msc-integration/src/test/groovy/com/milesight/beaveriot/integration/msc/service/MscDataSyncServiceTest.groovy new file mode 100644 index 0000000..2f77d69 --- /dev/null +++ b/integrations/msc-integration/src/test/groovy/com/milesight/beaveriot/integration/msc/service/MscDataSyncServiceTest.groovy @@ -0,0 +1,43 @@ +package com.milesight.beaveriot.integration.msc.service + +import com.milesight.beaveriot.DevelopApplication +import com.milesight.msc.sdk.MscClient +import com.milesight.msc.sdk.config.Credentials +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles +import spock.lang.Ignore +import spock.lang.Specification + +@ActiveProfiles("test") +@SpringBootTest(classes = [DevelopApplication]) +class MscDataSyncServiceTest extends Specification { + + @Autowired + MscDataSyncService mscDataSyncService + + @SpringBean + MscConnectionService mscConnectionService = Mock() + + def setup() { + mscConnectionService.getMscClient() >> MscClient.builder() + .endpoint(System.getenv("MSC_ENDPOINT")) + .credentials(Credentials.builder() + .clientId(System.getenv("MSC_CLIENT_ID")) + .clientSecret(System.getenv("MSC_CLIENT_SECRET")) + .build()) + .build() + } + + + @Ignore + def "when sync all device data then should not throw any exception"() { + given: + mscDataSyncService.syncAllDeviceData(false) + + expect: + true + } + +} diff --git a/integrations/msc-integration/src/test/groovy/com/milesight/beaveriot/integration/msc/util/MscTslUtilsTest.groovy b/integrations/msc-integration/src/test/groovy/com/milesight/beaveriot/integration/msc/util/MscTslUtilsTest.groovy new file mode 100644 index 0000000..ad44843 --- /dev/null +++ b/integrations/msc-integration/src/test/groovy/com/milesight/beaveriot/integration/msc/util/MscTslUtilsTest.groovy @@ -0,0 +1,236 @@ +package com.milesight.beaveriot.integration.msc.util + +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.databind.node.TextNode +import com.milesight.beaveriot.base.utils.JsonUtils +import com.milesight.beaveriot.context.integration.enums.EntityType +import com.milesight.beaveriot.context.integration.enums.EntityValueType +import com.milesight.cloud.sdk.client.model.* +import spock.lang.Specification + +class MscTslUtilsTest extends Specification { + + + def "given thing spec when calling thingSpecificationToEntities then should return property entities"() { + given: + def properties = [ + new TslPropertySpec() + .id("data") + .name("Data") + .accessMode(TslPropertySpec.AccessModeEnum.RW) + .dataSpec(new TslDataSpec() + .dataType(TslDataSpec.DataTypeEnum.STRUCT)), + new TslPropertySpec() + .id("data.long_value") + .name("Long Value") + .accessMode(TslPropertySpec.AccessModeEnum.RW) + .dataSpec(new TslDataSpec() + .parentId("data") + .dataType(TslDataSpec.DataTypeEnum.LONG) + .fractionDigits(2) + .validator(new TslDataValidatorSpec() + .min(BigDecimal.valueOf(9)) + .max(BigDecimal.valueOf(23)))), + new TslPropertySpec() + .id("data.enum_value") + .name("Enum Value") + .accessMode(TslPropertySpec.AccessModeEnum.W) + .dataSpec(new TslDataSpec() + .parentId("data") + .dataType(TslDataSpec.DataTypeEnum.ENUM) + .mappings([ + new TslKeyValuePair().key("a").value("1"), + new TslKeyValuePair().key("b").value("2"), + ])), + new TslPropertySpec() + .id("data.struct_value") + .name("Struct Value") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("data") + .dataType(TslDataSpec.DataTypeEnum.STRUCT)), + new TslPropertySpec() + .id("data.struct_value.string_value") + .name("String Value") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("data.struct_value") + .dataType(TslDataSpec.DataTypeEnum.STRING) + .validator(new TslDataValidatorSpec() + .minSize(5) + .maxSize(15))), + new TslPropertySpec() + .id("data.array_value") + .name("Data Array Value") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("data") + .dataType(TslDataSpec.DataTypeEnum.ARRAY) + .elementDataType(TslDataSpec.ElementDataTypeEnum.STRUCT) + .validator(new TslDataValidatorSpec() + .minSize(1) + .maxSize(2))), + new TslPropertySpec() + .id("data.array_value._item") + .name("Data Array Element") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("data.array_value") + .dataType(TslDataSpec.DataTypeEnum.STRUCT)), + new TslPropertySpec() + .id("data.array_value._item.int_value") + .name("Data Array Element Value") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("data.array_value._item") + .dataType(TslDataSpec.DataTypeEnum.INT)), + new TslPropertySpec() + .id("array_value") + .name("Array Value") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .dataType(TslDataSpec.DataTypeEnum.ARRAY) + .elementDataType(TslDataSpec.ElementDataTypeEnum.STRUCT)), + new TslPropertySpec() + .id("array_value._item") + .name("Array Element") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("array_value") + .dataType(TslDataSpec.DataTypeEnum.STRUCT)), + new TslPropertySpec() + .id("array_value._item.int_value1") + .name("Array Element Value 1") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("array_value._item") + .dataType(TslDataSpec.DataTypeEnum.INT)), + new TslPropertySpec() + .id("array_value._item.int_value2") + .name("Array Element Value 2") + .accessMode(TslPropertySpec.AccessModeEnum.R) + .dataSpec(new TslDataSpec() + .parentId("array_value._item") + .dataType(TslDataSpec.DataTypeEnum.INT)), + ] + def thingSpec = new ThingSpec() + .properties(properties) + + when: + def result = MscTslUtils.thingSpecificationToEntities("", "", thingSpec) + + then: + result.size() == 5 + def dataEntity = result.get(4) + dataEntity.identifier == "data" + dataEntity.name == "Data" + dataEntity.type == EntityType.PROPERTY + dataEntity.valueType == EntityValueType.OBJECT + + def children = dataEntity.children + children.size() == 8 + + def arrayValueFirstElementEntity = children.get(0) + arrayValueFirstElementEntity.identifier == "array_value[0]" + arrayValueFirstElementEntity.type == EntityType.PROPERTY + arrayValueFirstElementEntity.valueType == EntityValueType.OBJECT + arrayValueFirstElementEntity.children.size() == 0 + + def arrayValueSecondElementIntValueEntity = children.get(3) + arrayValueSecondElementIntValueEntity.identifier == "array_value[1]@int_value" + arrayValueSecondElementIntValueEntity.type == EntityType.PROPERTY + arrayValueSecondElementIntValueEntity.valueType == EntityValueType.LONG + + def enumValueEntity = children.get(4) + enumValueEntity.identifier == "enum_value" + enumValueEntity.type == EntityType.PROPERTY + enumValueEntity.valueType == EntityValueType.STRING + enumValueEntity.attributes["enum"]["a"] == "1" + enumValueEntity.attributes["enum"]["b"] == "2" + + def longValueEntity = children.get(5) + longValueEntity.identifier == "long_value" + longValueEntity.type == EntityType.PROPERTY + longValueEntity.valueType == EntityValueType.LONG + longValueEntity.attributes["fraction_digits"] == 2 + longValueEntity.attributes["min"] == 9 + longValueEntity.attributes["max"] == 23 + + def structValueEntity = children.get(6) + structValueEntity.identifier == "struct_value" + structValueEntity.type == EntityType.PROPERTY + structValueEntity.valueType == EntityValueType.OBJECT + structValueEntity.children.size() == 0 + structValueEntity.attributes.isEmpty() + + def stringValueEntity = children.get(7) + stringValueEntity.identifier == "struct_value@string_value" + stringValueEntity.type == EntityType.PROPERTY + stringValueEntity.valueType == EntityValueType.STRING + stringValueEntity.attributes["min_length"] == 5 + stringValueEntity.attributes["max_length"] == 15 + + def arrayEntity = result.get(0) + arrayEntity.identifier == "array_value[0]" + arrayEntity.name == "Array Element" + arrayEntity.type == EntityType.PROPERTY + arrayEntity.valueType == EntityValueType.OBJECT + + def arrayChildren = arrayEntity.children + arrayChildren.size() == 2 + } + + def "given ObjectNode embed ArrayNode when convertJsonNodeToExchangePayload then get valid ExchangePayload"() { + given: + def jsonNode = JsonUtils.toJsonNode([ + "string" : "value", + "struct": ["struct": ["string": "value"]], + "array" : [1, [2], ["int": 3], [null, ["struct": ["string": "value"]]]], + ]) + + when: + def result = MscTslUtils.convertJsonNodeToExchangePayload("prefix", jsonNode) + + then: + result["prefix.string"].textValue() == "value" + result["prefix.struct.struct@string"].textValue() == "value" + result["prefix.array[0]"].intValue() == 1 + result["prefix.array[1][0]"].intValue() == 2 + result["prefix.array[2].int"].intValue() == 3 + result["prefix.array[3][1].struct@string"].textValue() == "value" + } + + def "given ExchangePayload when convertExchangePayloadToGroupedJsonNode then get grouped JsonNode"() { + given: + def exchangePayload = [:] as Map + exchangePayload["prefix.string"] = "value" + exchangePayload["prefix.struct.struct@string"] = "value" + exchangePayload["prefix.array[0]"] = 1 + exchangePayload["prefix.array[1][0]"] = 2 + exchangePayload["prefix.array[2].int"] = 3 + exchangePayload["prefix.array[3][1].struct@string"] = "value" + + when: + def result = MscTslUtils.convertExchangePayloadToGroupedJsonNode(JsonUtils.objectMapper, "prefix", exchangePayload) + + then: + result["string"] instanceof TextNode + result["string"].textValue() == "value" + result["struct"] instanceof ObjectNode + result["struct"]["struct"]["string"].textValue() == "value" + result["array"] instanceof ArrayNode + result["array"].size() == 4 + result["array"][0].intValue() == 1 + result["array"][1] instanceof ArrayNode + result["array"][1].size() == 1 + result["array"][1][0].intValue() == 2 + result["array"][2] instanceof ObjectNode + result["array"][2].get("int").intValue() == 3 + result["array"][3] instanceof ArrayNode + result["array"][3].size() == 2 + result["array"][3][1] instanceof ObjectNode + result["array"][3][1]["struct"]["string"].textValue() == "value" + } + +} diff --git a/integrations/msc-integration/src/test/resources/application-test.yaml b/integrations/msc-integration/src/test/resources/application-test.yaml new file mode 100644 index 0000000..8e4076b --- /dev/null +++ b/integrations/msc-integration/src/test/resources/application-test.yaml @@ -0,0 +1,6 @@ +SPRING_DATASOURCE_URL: jdbc:h2:mem:beaver-dev +SPRING_H2_CONSOLE_ENABLED: false + +spring: + main: + allow-bean-definition-overriding: true diff --git a/integrations/ping/pom.xml b/integrations/ping/pom.xml new file mode 100644 index 0000000..409a577 --- /dev/null +++ b/integrations/ping/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.milesight.beaveriot + integrations + 1.0-SNAPSHOT + + + ping + + + 17 + 17 + UTF-8 + + + + + com.milesight.beaveriot + context + ${project.version} + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingBootstrap.java b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingBootstrap.java new file mode 100644 index 0000000..289da44 --- /dev/null +++ b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingBootstrap.java @@ -0,0 +1,23 @@ +package com.milesight.beaveriot.ping; + +import com.milesight.beaveriot.context.integration.bootstrap.IntegrationBootstrap; +import com.milesight.beaveriot.context.integration.model.Integration; +import org.springframework.stereotype.Component; + +@Component +public class PingBootstrap implements IntegrationBootstrap { + @Override + public void onPrepared(Integration integration) { + // do nothing + } + + @Override + public void onStarted(Integration integrationConfig) { + // do nothing + } + + @Override + public void onDestroy(Integration integration) { + // do nothing + } +} diff --git a/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingConstants.java b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingConstants.java new file mode 100644 index 0000000..6ed3860 --- /dev/null +++ b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingConstants.java @@ -0,0 +1,11 @@ +package com.milesight.beaveriot.ping; + +public class PingConstants { + private PingConstants() {} + + public static final String INTEGRATION_ID = "ping"; + + public enum DeviceStatus { + ONLINE, OFFLINE; + } +} diff --git a/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingIntegrationEntities.java b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingIntegrationEntities.java new file mode 100644 index 0000000..3acdd5d --- /dev/null +++ b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingIntegrationEntities.java @@ -0,0 +1,40 @@ +package com.milesight.beaveriot.ping; + +import com.milesight.beaveriot.context.integration.entity.annotation.Attribute; +import com.milesight.beaveriot.context.integration.entity.annotation.Entities; +import com.milesight.beaveriot.context.integration.entity.annotation.Entity; +import com.milesight.beaveriot.context.integration.entity.annotation.IntegrationEntities; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +@IntegrationEntities +public class PingIntegrationEntities extends ExchangePayload { + @Entity(type = EntityType.SERVICE, name = "Device Connection Benchmark", identifier = "benchmark") + private String benchmark; + + @Entity(type = EntityType.PROPERTY, name = "Detect Status", identifier = "detect_status", attributes = @Attribute(enumClass = DetectStatus.class), accessMod = AccessMod.R) + private Long detectStatus; + + @Entity(type = EntityType.SERVICE, identifier = "add_device") + private AddDevice addDevice; + + @Entity(type = EntityType.SERVICE, identifier = "delete_device") + private String deleteDevice; + + @Data + @EqualsAndHashCode(callSuper = true) + @Entities + public static class AddDevice extends ExchangePayload { + @Entity + private String ip; + } + + public enum DetectStatus { + STANDBY, DETECTING; + } +} diff --git a/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingService.java b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingService.java new file mode 100644 index 0000000..c86463f --- /dev/null +++ b/integrations/ping/src/main/java/com/milesight/beaveriot/ping/PingService.java @@ -0,0 +1,141 @@ +package com.milesight.beaveriot.ping; + +import com.fasterxml.jackson.databind.JsonNode; +import com.milesight.beaveriot.context.api.DeviceServiceProvider; +import com.milesight.beaveriot.context.api.EntityValueServiceProvider; +import com.milesight.beaveriot.context.api.ExchangeFlowExecutor; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.beaveriot.context.integration.model.*; +import com.milesight.beaveriot.context.integration.model.event.ExchangeEvent; +import com.milesight.beaveriot.eventbus.annotations.EventSubscribe; +import com.milesight.beaveriot.eventbus.api.Event; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +@Service +@Slf4j +public class PingService { + @Autowired + private DeviceServiceProvider deviceServiceProvider; + + @Autowired + private ExchangeFlowExecutor exchangeFlowExecutor; + + @Autowired + private EntityValueServiceProvider entityValueServiceProvider; + + @EventSubscribe(payloadKeyExpression = PingConstants.INTEGRATION_ID + ".integration.add_device.*", eventType = ExchangeEvent.EventType.DOWN) + public void onAddDevice(Event event) { + String deviceName = event.getPayload().getContext("device_name", "Device Name"); + String ip = event.getPayload().getIp(); + + + Entity statusEntity = new EntityBuilder(PingConstants.INTEGRATION_ID) + .identifier("status") + .property("Device Status", AccessMod.R) + .valueType(EntityValueType.LONG) + .attributes(new AttributeBuilder().enums(PingConstants.DeviceStatus.class).build()) + .build(); + Entity delayEntity = new EntityBuilder(PingConstants.INTEGRATION_ID) + .identifier("delay") + .property("Network Delay", AccessMod.R) + .valueType(EntityValueType.LONG) + .attributes(new AttributeBuilder().unit("ms").build()) + .build(); + + Device device = new DeviceBuilder(PingConstants.INTEGRATION_ID) + .name(deviceName) + .identifier(ip.replace(".", "_")) + .additional(Map.of("ip", ip)) + .entities(List.of(statusEntity, delayEntity)) + .build(); + + deviceServiceProvider.save(device); + } + + @EventSubscribe(payloadKeyExpression = PingConstants.INTEGRATION_ID + ".integration.delete_device", eventType = ExchangeEvent.EventType.DOWN) + public void onDeleteDevice(Event event) { + Device device = (Device) event.getPayload().getContext("device"); + deviceServiceProvider.deleteById(device.getId()); + } + + @EventSubscribe(payloadKeyExpression = PingConstants.INTEGRATION_ID + ".integration.benchmark", eventType = ExchangeEvent.EventType.DOWN) + @Async + public void benchmark(Event event) { + // mark benchmark starting + String detectStatusKey = PingConstants.INTEGRATION_ID + ".integration.detect_status"; + JsonNode detectStatusNode = entityValueServiceProvider.findValueByKey(detectStatusKey); + if (detectStatusNode.isNull() || detectStatusNode.asLong() == PingIntegrationEntities.DetectStatus.DETECTING.ordinal()) { + log.warn("[WARNING] Benchmark running"); + return; + } + + try { + doBenchmark(detectStatusKey); + } catch (Exception e) { + log.error("[Benchmark Error] " + e); + } finally { + // mark benchmark done + ExchangePayload donePayload = new ExchangePayload(); + donePayload.put(detectStatusKey, PingIntegrationEntities.DetectStatus.STANDBY.ordinal()); + exchangeFlowExecutor.syncExchangeUp(donePayload); + } + } + + public void doBenchmark(String detectStatusKey) { + exchangeFlowExecutor.syncExchangeDown(new ExchangePayload(Map.of(detectStatusKey, PingIntegrationEntities.DetectStatus.DETECTING.ordinal()))); + int timeout = 2000; + + // start pinging + List devices = deviceServiceProvider.findAll(PingConstants.INTEGRATION_ID); + AtomicReference activeCount = new AtomicReference<>(0L); + AtomicReference inactiveCount = new AtomicReference<>(0L); + + devices.forEach(device -> { + Long delay = null; + String ip = (String) device.getAdditional().get("ip"); + try { + long startTimestamp = System.currentTimeMillis(); + InetAddress inet = InetAddress.getByName(ip); + if (inet.isReachable(timeout)) { + delay = System.currentTimeMillis() - startTimestamp; + } + } catch (IOException e) { + log.warn("[Not reachable]: " + ip); + } + + int deviceStatus = PingConstants.DeviceStatus.OFFLINE.ordinal(); + if (delay != null) { + activeCount.updateAndGet(v -> v + 1); + deviceStatus = PingConstants.DeviceStatus.ONLINE.ordinal(); + } else { + inactiveCount.updateAndGet(v -> v + 1); + } + + // Device have only one entity + ExchangePayload exchangePayload = new ExchangePayload(); + final int fDeviceStatus = deviceStatus; + final Long fDelay = delay; + device.getEntities().forEach(entity -> { + if (entity.getIdentifier().equals("status")) { + exchangePayload.put(entity.getKey(), fDeviceStatus); + } else if (entity.getIdentifier().equals("delay")) { + exchangePayload.put(entity.getKey(), fDelay); + } + }); + + Assert.notEmpty(exchangePayload, "Exchange should not be empty!"); + exchangeFlowExecutor.asyncExchangeDown(exchangePayload); + }); + } +} diff --git a/integrations/ping/src/main/resources/integration.yaml b/integrations/ping/src/main/resources/integration.yaml new file mode 100644 index 0000000..3ad88a7 --- /dev/null +++ b/integrations/ping/src/main/resources/integration.yaml @@ -0,0 +1,8 @@ +integration: + ping: + name: Ping + description: "Check whether device is online" + icon-url: "public/ping-logo.svg" + enabled: true + entity-identifier-add-device: add_device + entity-identifier-delete-device: delete_device \ No newline at end of file diff --git a/integrations/ping/src/main/resources/static/public/ping-logo.svg b/integrations/ping/src/main/resources/static/public/ping-logo.svg new file mode 100644 index 0000000..38e40ce --- /dev/null +++ b/integrations/ping/src/main/resources/static/public/ping-logo.svg @@ -0,0 +1,7 @@ + + +IcoFont Icons +network + + + \ No newline at end of file diff --git a/integrations/pom.xml b/integrations/pom.xml new file mode 100644 index 0000000..b30494d --- /dev/null +++ b/integrations/pom.xml @@ -0,0 +1,27 @@ + + + + com.milesight.beaveriot + beaver-iot-integrations + 1.0-SNAPSHOT + + 4.0.0 + + com.milesight.beaveriot + integrations + pom + + sample-integrations + msc-integration + ping + + + + 17 + 17 + UTF-8 + + + \ No newline at end of file diff --git a/integrations/sample-integrations/my-integration/pom.xml b/integrations/sample-integrations/my-integration/pom.xml new file mode 100644 index 0000000..17d4847 --- /dev/null +++ b/integrations/sample-integrations/my-integration/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + com.milesight.beaveriot + sample-integrations + 1.0-SNAPSHOT + + + my-integration + + + 17 + 17 + UTF-8 + + + + + com.milesight.beaveriot + context + ${project.version} + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/MyIntegrationBootstrap.java b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/MyIntegrationBootstrap.java new file mode 100644 index 0000000..8b01a2c --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/MyIntegrationBootstrap.java @@ -0,0 +1,23 @@ +package com.milesight.beaveriot.myintegration; + +import com.milesight.beaveriot.context.integration.bootstrap.IntegrationBootstrap; +import com.milesight.beaveriot.context.integration.model.Integration; +import org.springframework.stereotype.Component; + +@Component +public class MyIntegrationBootstrap implements IntegrationBootstrap { + @Override + public void onPrepared(Integration integration) { + + } + + @Override + public void onStarted(Integration integrationConfig) { + System.out.println("Hello, world!"); + } + + @Override + public void onDestroy(Integration integration) { + + } +} diff --git a/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/controller/MyIntegrationController.java b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/controller/MyIntegrationController.java new file mode 100644 index 0000000..7678fd1 --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/controller/MyIntegrationController.java @@ -0,0 +1,48 @@ +package com.milesight.beaveriot.myintegration.controller; + +import com.fasterxml.jackson.databind.JsonNode; +import com.milesight.beaveriot.base.response.ResponseBody; +import com.milesight.beaveriot.base.response.ResponseBuilder; +import com.milesight.beaveriot.context.api.DeviceServiceProvider; +import com.milesight.beaveriot.context.api.EntityValueServiceProvider; +import com.milesight.beaveriot.myintegration.entity.MyDeviceEntities; +import lombok.Data; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; + +@RestController +@RequestMapping("/my-integration") // Should use integration identifier +public class MyIntegrationController { + @Autowired + private DeviceServiceProvider deviceServiceProvider; + + @Autowired + private EntityValueServiceProvider entityValueServiceProvider; + + @GetMapping("/active-count") + // highlight-next-line + public ResponseBody getActiveDeviceCount() { + List statusEntityKeys = new ArrayList<>(); + deviceServiceProvider.findAll("my-integration").forEach(device -> statusEntityKeys.add(device.getEntities().get(0).getKey())); + Long count = entityValueServiceProvider + .findValuesByKeys(statusEntityKeys) + .values() + .stream() + .map(JsonNode::asInt) + .filter(status -> status == MyDeviceEntities.DeviceStatus.ONLINE.ordinal()) + .count(); + CountResponse resp = new CountResponse(); + resp.setCount(count); + return ResponseBuilder.success(resp); + } + + @Data + public class CountResponse { + private Long count; + } +} diff --git a/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/entity/MyDeviceEntities.java b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/entity/MyDeviceEntities.java new file mode 100644 index 0000000..67806f6 --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/entity/MyDeviceEntities.java @@ -0,0 +1,24 @@ +package com.milesight.beaveriot.myintegration.entity; + +import com.milesight.beaveriot.context.integration.entity.annotation.Attribute; +import com.milesight.beaveriot.context.integration.entity.annotation.DeviceEntities; +import com.milesight.beaveriot.context.integration.entity.annotation.Entity; +import com.milesight.beaveriot.context.integration.entity.annotation.KeyValue; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +@DeviceEntities(name="Default Device", identifier = "localhost", additional = {@KeyValue(key = "ip", value = "localhost")}) +public class MyDeviceEntities extends ExchangePayload { + @Entity(type = EntityType.PROPERTY, name = "Device Connection Status", accessMod = AccessMod.R, attributes = @Attribute(enumClass = DeviceStatus.class)) + // highlight-next-line + private Long status; + + public enum DeviceStatus { + ONLINE, OFFLINE; + } +} diff --git a/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/entity/MyIntegrationEntities.java b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/entity/MyIntegrationEntities.java new file mode 100644 index 0000000..44cc338 --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/entity/MyIntegrationEntities.java @@ -0,0 +1,72 @@ +package com.milesight.beaveriot.myintegration.entity; + +import com.milesight.beaveriot.context.integration.context.AddDeviceAware; +import com.milesight.beaveriot.context.integration.context.DeleteDeviceAware; +import com.milesight.beaveriot.context.integration.entity.annotation.Attribute; +import com.milesight.beaveriot.context.integration.entity.annotation.Entities; +import com.milesight.beaveriot.context.integration.entity.annotation.Entity; +import com.milesight.beaveriot.context.integration.entity.annotation.IntegrationEntities; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityType; +import com.milesight.beaveriot.context.integration.model.ExchangePayload; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +@IntegrationEntities +public class MyIntegrationEntities extends ExchangePayload { + @Entity(type = EntityType.SERVICE, name = "Device Connection Benchmark", identifier = "benchmark") + // highlight-next-line + private String benchmark; + + @Entity(type = EntityType.PROPERTY, name = "Detect Status", identifier = "detect_status", attributes = @Attribute(enumClass = DetectStatus.class), accessMod = AccessMod.R) + // highlight-next-line + private Integer detectStatus; + + @Entity(type = EntityType.EVENT, name = "Detect Report", identifier = "detect_report") + // highlight-next-line + private DetectReport detectReport; + + @Entity(type = EntityType.SERVICE, identifier = "add_device") + // highlight-next-line + private AddDevice addDevice; + + @Entity(type = EntityType.SERVICE, identifier = "delete_device") + // highlight-next-line + private DeleteDevice deleteDevice; + + + @Data + @EqualsAndHashCode(callSuper = true) + @Entities + public static class DetectReport extends ExchangePayload { + // Entity type inherits from parent entity (DetectReport) + @Entity + private Long consumedTime; + + @Entity + private Long onlineCount; + + @Entity + private Long offlineCount; + } + + @Data + @EqualsAndHashCode(callSuper = true) + @Entities + public static class AddDevice extends ExchangePayload implements AddDeviceAware { + @Entity + private String ip; + } + + @Data + @EqualsAndHashCode(callSuper = true) + @Entities + public static class DeleteDevice extends ExchangePayload implements DeleteDeviceAware { + } + + public enum DetectStatus { + STANDBY, DETECTING; + } +} diff --git a/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/service/MyDeviceService.java b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/service/MyDeviceService.java new file mode 100644 index 0000000..fb4d749 --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/java/com/milesight/beaveriot/myintegration/service/MyDeviceService.java @@ -0,0 +1,117 @@ +package com.milesight.beaveriot.myintegration.service; + +import com.milesight.beaveriot.context.api.DeviceServiceProvider; +import com.milesight.beaveriot.context.api.ExchangeFlowExecutor; +import com.milesight.beaveriot.context.integration.enums.AccessMod; +import com.milesight.beaveriot.context.integration.enums.EntityValueType; +import com.milesight.beaveriot.context.integration.model.*; +import com.milesight.beaveriot.context.integration.model.event.ExchangeEvent; +import com.milesight.beaveriot.context.integration.proxy.ExchangePayloadProxy; +import com.milesight.beaveriot.eventbus.annotations.EventSubscribe; +import com.milesight.beaveriot.eventbus.api.Event; +import com.milesight.beaveriot.myintegration.entity.MyDeviceEntities; +import com.milesight.beaveriot.myintegration.entity.MyIntegrationEntities; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +@Service +public class MyDeviceService { + @Autowired + private DeviceServiceProvider deviceServiceProvider; + + @Autowired + private ExchangeFlowExecutor exchangeFlowExecutor; + + @EventSubscribe(payloadKeyExpression = "my-integration.integration.add_device.*", eventType = ExchangeEvent.EventType.DOWN) + // highlight-next-line + public void onAddDevice(Event event) { + MyIntegrationEntities.AddDevice addDevice = event.getPayload(); + String deviceName = addDevice.getAddDeviceName(); + String ip = addDevice.getIp(); + final String integrationId = "my-integration"; + Device device = new DeviceBuilder(integrationId) + .name(deviceName) + .identifier(ip.replace(".", "_")) + .additional(Map.of("ip", ip)) + .entity(()->{ + return new EntityBuilder(integrationId) + .identifier("status") + .property("Device Status", AccessMod.R) + .valueType(EntityValueType.LONG) + .attributes(new AttributeBuilder().enums(MyDeviceEntities.DeviceStatus.class).build()) + .build(); + }) + .build(); + + deviceServiceProvider.save(device); + } + + @EventSubscribe(payloadKeyExpression = "my-integration.integration.delete_device", eventType = ExchangeEvent.EventType.DOWN) + // highlight-next-line + public void onDeleteDevice(Event event) { + Device device = event.getPayload().getDeletedDevice(); + deviceServiceProvider.deleteById(device.getId()); + } + + @EventSubscribe(payloadKeyExpression = "my-integration.integration.benchmark", eventType = ExchangeEvent.EventType.DOWN) + // highlight-next-line + public void doBenchmark(Event event) { + // mark benchmark starting + exchangeFlowExecutor.syncExchangeDown(new ExchangePayload(Map.of("my-integration.integration.detect_status", MyIntegrationEntities.DetectStatus.DETECTING.ordinal()))); + int timeout = 5000; + + // start pinging + List devices = deviceServiceProvider.findAll("my-integration"); + AtomicReference activeCount = new AtomicReference<>(0L); + AtomicReference inactiveCount = new AtomicReference<>(0L); + Long startTimestamp = System.currentTimeMillis(); + devices.forEach(device -> { + boolean isSuccess = false; + try { + String ip = (String) device.getAdditional().get("ip"); + InetAddress inet = InetAddress.getByName(ip); + if (inet.isReachable(timeout)) { + isSuccess = true; + } + } catch (Exception e) { + e.printStackTrace(); + } + + int deviceStatus = MyDeviceEntities.DeviceStatus.OFFLINE.ordinal(); + if (isSuccess) { + activeCount.updateAndGet(v -> v + 1); + deviceStatus = MyDeviceEntities.DeviceStatus.ONLINE.ordinal(); + } else { + inactiveCount.updateAndGet(v -> v + 1); + } + + // Device have only one entity + String deviceStatusKey = device.getEntities().get(0).getKey(); + exchangeFlowExecutor.asyncExchangeDown(new ExchangePayload(Map.of(deviceStatusKey, (long) deviceStatus))); + }); + Long endTimestamp = System.currentTimeMillis(); + + // mark benchmark done + MyIntegrationEntities myIntegrationEntities = ExchangePayload.createProxy(MyIntegrationEntities.class); + myIntegrationEntities.setDetectStatus(MyIntegrationEntities.DetectStatus.STANDBY.ordinal()); + myIntegrationEntities.setDetectReport(null); + MyIntegrationEntities.DetectReport detectReport = myIntegrationEntities.getDetectReport(); + detectReport.setConsumedTime(endTimestamp - startTimestamp); + detectReport.setOnlineCount(activeCount.get()); + detectReport.setOfflineCount(inactiveCount.get()); + + exchangeFlowExecutor.syncExchangeUp(myIntegrationEntities); + } + + @EventSubscribe(payloadKeyExpression = "my-integration.integration.detect_report.*", eventType = ExchangeEvent.EventType.UP) + // highlight-next-line + public void listenDetectReport(Event event) { + System.out.println("[Get-Report] " + event.getPayload()); // do something with this report + } +} diff --git a/integrations/sample-integrations/my-integration/src/main/resources/integration.yaml b/integrations/sample-integrations/my-integration/src/main/resources/integration.yaml new file mode 100644 index 0000000..42343ba --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/resources/integration.yaml @@ -0,0 +1,8 @@ +integration: + my-integration: + name: My Integration Name + description: "My Demo Integration" + icon-url: "public/my-integration-logo.svg" + enabled: true + entity-identifier-add-device: add_device + entity-identifier-delete-device: delete_device \ No newline at end of file diff --git a/integrations/sample-integrations/my-integration/src/main/resources/static/public/my-integration-logo.svg b/integrations/sample-integrations/my-integration/src/main/resources/static/public/my-integration-logo.svg new file mode 100644 index 0000000..2af6cfa --- /dev/null +++ b/integrations/sample-integrations/my-integration/src/main/resources/static/public/my-integration-logo.svg @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/integrations/sample-integrations/pom.xml b/integrations/sample-integrations/pom.xml new file mode 100644 index 0000000..d1d376a --- /dev/null +++ b/integrations/sample-integrations/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + com.milesight.beaveriot + integrations + 1.0-SNAPSHOT + + + sample-integrations + + + my-integration + + + pom + + 17 + 17 + UTF-8 + + + + + com.milesight.beaveriot + context + ${project.version} + provided + + + + \ No newline at end of file diff --git a/integrations/sample-integrations/src/main/java/com/milesight/beaveriot/package-info.java b/integrations/sample-integrations/src/main/java/com/milesight/beaveriot/package-info.java new file mode 100644 index 0000000..cf156c3 --- /dev/null +++ b/integrations/sample-integrations/src/main/java/com/milesight/beaveriot/package-info.java @@ -0,0 +1,4 @@ +/** + * @author leon + */ +package com.milesight.beaveriot; \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..9d65cd3 --- /dev/null +++ b/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + + com.milesight.beaveriot + beaver-iot-integrations + 1.0-SNAPSHOT + pom + + application-dev + integrations + + + + 17 + 17 + UTF-8 + 1.0-SNAPSHOT + + + + + + com.milesight.beaveriot + beaver-iot-parent + ${beaver-iot.version} + pom + import + + + + + + + nexus + https://nexus.milesight.com/repository/maven-open-source/ + + + snapshots + https://nexus.milesight.com/repository/maven-open-source/ + false + + + + + snapshots + nexus-snapshots + https://nexus.milesight.com/repository/maven-open-source/ + + true + always + + + true + always + + + + + + snapshots + nexus-snapshots + https://nexus.milesight.com/repository/maven-open-source/ + + true + always + + + true + always + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + + + + +