From fc72841f10d7df1d4e66314eb9072155eb491747 Mon Sep 17 00:00:00 2001 From: "Ranim NAIMI (EGM)" Date: Fri, 19 Jan 2024 17:11:41 +0100 Subject: [PATCH 01/17] added throttling to subscription --- .../subscription/model/Subscription.kt | 5 +++-- .../service/SubscriptionService.kt | 20 ++++++++++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt index 04ce928d9..878069fbd 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt @@ -20,7 +20,7 @@ import java.net.URI import java.time.Instant import java.time.ZoneOffset import java.time.ZonedDateTime -import java.util.UUID +import java.util.* val defaultNotificationTriggers = listOf( ATTRIBUTE_CREATED.notificationTrigger, @@ -49,7 +49,8 @@ data class Subscription( // - used to compact entities in notifications // - used when needed to serve contexts in JSON notifications @JsonProperty(value = JSONLD_CONTEXT) - val contexts: List + val contexts: List, + val throttling: Int? = null ) { @Transient diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index a2f92c482..793eb0038 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -1,7 +1,10 @@ package com.egm.stellio.subscription.service -import arrow.core.* +import arrow.core.Either +import arrow.core.Option +import arrow.core.left import arrow.core.raise.either +import arrow.core.right import com.egm.stellio.shared.model.* import com.egm.stellio.shared.util.* import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM @@ -10,8 +13,6 @@ import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm import com.egm.stellio.subscription.config.SubscriptionProperties import com.egm.stellio.subscription.model.* -import com.egm.stellio.subscription.model.GeoQ -import com.egm.stellio.subscription.model.Subscription import com.egm.stellio.subscription.utils.* import com.egm.stellio.subscription.utils.ParsingUtils.endpointInfoMapToString import com.egm.stellio.subscription.utils.ParsingUtils.endpointInfoToString @@ -51,10 +52,12 @@ class SubscriptionService( checkIdIsValid(subscription).bind() checkEntitiesOrWatchedAttributes(subscription).bind() checkTimeIntervalGreaterThanZero(subscription).bind() + checkThrottlingGreaterThanZero(subscription).bind() checkSubscriptionValidity(subscription).bind() checkExpiresAtInTheFuture(subscription).bind() checkIdPatternIsValid(subscription).bind() checkNotificationTriggersAreValid(subscription).bind() + } private fun checkTypeIsSubscription(subscription: Subscription): Either = @@ -84,6 +87,14 @@ class SubscriptionService( BadRequestDataException("The value of 'timeInterval' must be greater than zero (int)").left() else Unit.right() + private fun checkThrottlingGreaterThanZero(subscription: Subscription): Either = + if ((subscription.timeInterval != null && subscription.timeInterval > 0 && subscription.throttling != null) || + (subscription.timeInterval != null && subscription.timeInterval < 1 && subscription.throttling != null) || + (subscription.timeInterval == null && subscription.throttling != null && subscription.throttling < 1) + ) + BadRequestDataException("Error").left() + else Unit.right() + private fun checkExpiresAtInTheFuture(subscription: Subscription): Either = if (subscription.expiresAt != null && subscription.expiresAt.isBefore(ngsiLdDateTime())) BadRequestDataException("'expiresAt' must be in the future").left() @@ -416,6 +427,7 @@ class SubscriptionService( } listOf(Pair("notif_attributes", attributes)) } + "format" -> { val format = if (attribute.value == "keyValues") @@ -424,6 +436,7 @@ class SubscriptionService( NotificationParams.FormatType.NORMALIZED.name listOf(Pair("notif_format", format)) } + "endpoint" -> { val endpoint = attribute.value as Map val accept = @@ -441,6 +454,7 @@ class SubscriptionService( Pair("endpoint_notifier_info", Json.of(endpointInfoMapToString(endpointNotifierInfo))) ) } + else -> throw BadRequestDataException("Could not update attribute ${attribute.key}") } } From c3a57fefece8a3f56813a193619d1f6cafacfb8b Mon Sep 17 00:00:00 2001 From: "Ranim NAIMI (EGM)" Date: Fri, 19 Jan 2024 17:27:49 +0100 Subject: [PATCH 02/17] commit after merge with dev --- .../com/egm/stellio/subscription/service/SubscriptionService.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 793eb0038..cded8b8a5 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -58,6 +58,7 @@ class SubscriptionService( checkIdPatternIsValid(subscription).bind() checkNotificationTriggersAreValid(subscription).bind() + } private fun checkTypeIsSubscription(subscription: Subscription): Either = From f911bb7aa798935ae7f591f6d71b92dccee0e192 Mon Sep 17 00:00:00 2001 From: "Ranim NAIMI (EGM)" Date: Tue, 23 Jan 2024 14:48:21 +0100 Subject: [PATCH 03/17] added throttling to subscription create and added unit tests --- .../service/SubscriptionService.kt | 33 ++++++----- .../V0_26__add_throttling_to_subscription.sql | 2 + .../service/SubscriptionServiceTests.kt | 57 +++++++++++++++---- .../resources/ngsild/subscription_full.json | 3 +- 4 files changed, 71 insertions(+), 24 deletions(-) create mode 100644 subscription-service/src/main/resources/db/migration/V0_26__add_throttling_to_subscription.sql diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index cded8b8a5..a21e58c71 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -77,23 +77,29 @@ class SubscriptionService( else Unit.right() private fun checkSubscriptionValidity(subscription: Subscription): Either = - if (subscription.watchedAttributes != null && subscription.timeInterval != null) - BadRequestDataException( - "You can't use 'timeInterval' in conjunction with 'watchedAttributes'" - ).left() - else Unit.right() + when { + subscription.watchedAttributes != null && subscription.timeInterval != null -> { + BadRequestDataException( + "You can't use 'timeInterval' in conjunction with 'watchedAttributes'" + ).left() + } + subscription.timeInterval != null && subscription.throttling != null -> { + BadRequestDataException( + "You can't use 'timeInterval' in conjunction with 'throttling'" + ).left() + } + else -> + Unit.right() + } private fun checkTimeIntervalGreaterThanZero(subscription: Subscription): Either = - if (subscription.timeInterval != null && subscription.timeInterval < 1) + if (subscription.timeInterval != null && subscription.timeInterval < 1 && subscription.throttling == null) BadRequestDataException("The value of 'timeInterval' must be greater than zero (int)").left() else Unit.right() private fun checkThrottlingGreaterThanZero(subscription: Subscription): Either = - if ((subscription.timeInterval != null && subscription.timeInterval > 0 && subscription.throttling != null) || - (subscription.timeInterval != null && subscription.timeInterval < 1 && subscription.throttling != null) || - (subscription.timeInterval == null && subscription.throttling != null && subscription.throttling < 1) - ) - BadRequestDataException("Error").left() + if (subscription.throttling != null && subscription.throttling < 1) + BadRequestDataException("The value of 'throttling' must be greater than zero (int)").left() else Unit.right() private fun checkExpiresAtInTheFuture(subscription: Subscription): Either = @@ -146,11 +152,11 @@ class SubscriptionService( INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes, notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active, - expires_at, sub, contexts) + expires_at, sub, contexts, throttling) VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes, :notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri, :endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active, - :expires_at, :sub, :contexts) + :expires_at, :sub, :contexts, :throttling) """.trimIndent() databaseClient.sql(insertStatement) @@ -175,6 +181,7 @@ class SubscriptionService( .bind("expires_at", subscription.expiresAt) .bind("sub", sub.toStringValue()) .bind("contexts", subscription.contexts.toTypedArray()) + .bind("throttling", subscription.throttling) .execute().bind() geoQuery?.let { diff --git a/subscription-service/src/main/resources/db/migration/V0_26__add_throttling_to_subscription.sql b/subscription-service/src/main/resources/db/migration/V0_26__add_throttling_to_subscription.sql new file mode 100644 index 000000000..90052808f --- /dev/null +++ b/subscription-service/src/main/resources/db/migration/V0_26__add_throttling_to_subscription.sql @@ -0,0 +1,2 @@ +ALTER TABLE subscription + ADD throttling integer; \ No newline at end of file diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 35a44cc74..6cc24721a 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -149,6 +149,43 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { } } + @Test + fun `it should not allow a subscription with timeInterval and throttling`() = runTest { + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "timeInterval" to 10, + "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")), + "throttling" to 30 + ) + + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "You can't use 'timeInterval' in conjunction with 'throttling'" + } + } + + @Test + fun `it should not allow a subscription with a negative throttling`() = runTest { + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "entities" to listOf(mapOf("type" to BEEHIVE_TYPE)), + "notification" to mapOf("endpoint" to mapOf("uri" to "http://my.endpoint/notifiy")), + "throttling" to -30 + ) + + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + subscriptionService.validateNewSubscription(subscription) + .shouldFailWith { + it is BadRequestDataException && + it.message == "The value of 'throttling' must be greater than zero (int)" + } + } + @Test fun `it should not allow a subscription with a negative timeInterval`() = runTest { val payload = mapOf( @@ -263,10 +300,10 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { ) && it.watchedAttributes == listOf(INCOMING_PROPERTY) && it.notificationTrigger == listOf( - ENTITY_CREATED.notificationTrigger, - ATTRIBUTE_UPDATED.notificationTrigger, - ENTITY_DELETED.notificationTrigger - ) && + ENTITY_CREATED.notificationTrigger, + ATTRIBUTE_UPDATED.notificationTrigger, + ENTITY_DELETED.notificationTrigger + ) && it.timeInterval == null && it.q == "foodQuantity<150;foodName=='dietary fibres'" && ( @@ -281,10 +318,10 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.notification.attributes == listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY) && it.notification.format == FormatType.NORMALIZED && it.notification.endpoint == Endpoint( - URI("http://localhost:8084"), - Endpoint.AcceptType.JSON, - listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) - ) && + URI("http://localhost:8084"), + Endpoint.AcceptType.JSON, + listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) + ) && it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") } } @@ -725,8 +762,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.notification.endpoint.accept.name == "JSONLD" && it.notification.endpoint.uri.toString() == "http://localhost:8080" && it.notification.endpoint.receiverInfo == listOf( - EndpointInfo("Authorization-token", "Authorization-token-newValue") - ) + EndpointInfo("Authorization-token", "Authorization-token-newValue") + ) } } diff --git a/subscription-service/src/test/resources/ngsild/subscription_full.json b/subscription-service/src/test/resources/ngsild/subscription_full.json index 584f81164..08d3ba852 100644 --- a/subscription-service/src/test/resources/ngsild/subscription_full.json +++ b/subscription-service/src/test/resources/ngsild/subscription_full.json @@ -35,5 +35,6 @@ ] } }, - "expiresAt": "2100-01-01T00:00:00Z" + "expiresAt": "2100-01-01T00:00:00Z", + "throttling": 60 } From efe50baf80a89c6759d4fc9bee86e0acd1f1ace6 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Wed, 24 Jan 2024 11:38:43 +0100 Subject: [PATCH 04/17] fixed unit test for a subscription with all attributes --- .../egm/stellio/subscription/service/SubscriptionService.kt | 5 +++-- .../stellio/subscription/service/SubscriptionServiceTests.kt | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index a21e58c71..49da8669b 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -252,7 +252,7 @@ class SubscriptionService( notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status, times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates, - pgis_geometry, geoproperty, scope_q, expires_at, contexts + pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling FROM subscription LEFT JOIN entity_selector ON entity_selector.subscription_id = :id LEFT JOIN geometry_query ON geometry_query.subscription_id = :id @@ -691,7 +691,8 @@ class SubscriptionService( lastSuccess = toNullableZonedDateTime(row["last_success"]) ), isActive = toBoolean(row["is_active"]), - contexts = toList(row["contexts"]) + contexts = toList(row["contexts"]), + throttling = toNullableInt(row["throttling"]) ) } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 6cc24721a..201c853f3 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -322,7 +322,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { Endpoint.AcceptType.JSON, listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) ) && - it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") + it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") && + it.throttling == 60 } } From a5c5b36c77354c51e382b8ab91bdecbdfcf1e61a Mon Sep 17 00:00:00 2001 From: ranim-n Date: Wed, 24 Jan 2024 15:22:11 +0100 Subject: [PATCH 05/17] fixed indentation --- .../subscription/model/Subscription.kt | 2 +- .../service/SubscriptionService.kt | 2 -- .../service/SubscriptionServiceTests.kt | 20 +++++++++---------- .../support/WithTimescaleContainer.kt | 1 + 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt index 878069fbd..2fab55846 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/model/Subscription.kt @@ -20,7 +20,7 @@ import java.net.URI import java.time.Instant import java.time.ZoneOffset import java.time.ZonedDateTime -import java.util.* +import java.util.UUID val defaultNotificationTriggers = listOf( ATTRIBUTE_CREATED.notificationTrigger, diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 49da8669b..b197985c4 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -57,8 +57,6 @@ class SubscriptionService( checkExpiresAtInTheFuture(subscription).bind() checkIdPatternIsValid(subscription).bind() checkNotificationTriggersAreValid(subscription).bind() - - } private fun checkTypeIsSubscription(subscription: Subscription): Either = diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 201c853f3..39f73cf63 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -300,10 +300,10 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { ) && it.watchedAttributes == listOf(INCOMING_PROPERTY) && it.notificationTrigger == listOf( - ENTITY_CREATED.notificationTrigger, - ATTRIBUTE_UPDATED.notificationTrigger, - ENTITY_DELETED.notificationTrigger - ) && + ENTITY_CREATED.notificationTrigger, + ATTRIBUTE_UPDATED.notificationTrigger, + ENTITY_DELETED.notificationTrigger + ) && it.timeInterval == null && it.q == "foodQuantity<150;foodName=='dietary fibres'" && ( @@ -318,10 +318,10 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.notification.attributes == listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY) && it.notification.format == FormatType.NORMALIZED && it.notification.endpoint == Endpoint( - URI("http://localhost:8084"), - Endpoint.AcceptType.JSON, - listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) - ) && + URI("http://localhost:8084"), + Endpoint.AcceptType.JSON, + listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) + ) && it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") && it.throttling == 60 } @@ -763,8 +763,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.notification.endpoint.accept.name == "JSONLD" && it.notification.endpoint.uri.toString() == "http://localhost:8080" && it.notification.endpoint.receiverInfo == listOf( - EndpointInfo("Authorization-token", "Authorization-token-newValue") - ) + EndpointInfo("Authorization-token", "Authorization-token-newValue") + ) } } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt index 6c5e4f471..3db69acca 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt @@ -25,6 +25,7 @@ interface WithTimescaleContainer { withEnv("POSTGRES_MULTIPLE_EXTENSIONS", "postgis,timescaledb,pgcrypto") withExposedPorts(5432) setWaitStrategy(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) + withReuse(true) } @JvmStatic From 7f59493f48b744a8a0bad509d609f69866d8822b Mon Sep 17 00:00:00 2001 From: ranim-n Date: Thu, 25 Jan 2024 09:09:59 +0100 Subject: [PATCH 06/17] added throttling to the update of a subscription --- .../egm/stellio/subscription/service/SubscriptionService.kt | 5 +++-- .../subscription/service/SubscriptionServiceTests.kt | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index b197985c4..03c3c36ff 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -354,14 +354,15 @@ class SubscriptionService( "q", "scopeQ", "isActive", - "modifiedAt" + "modifiedAt", + "throttling" ).contains(it.key) -> { val columnName = it.key.toSqlColumnName() val value = it.value.toSqlValue(it.key) updateSubscriptionAttribute(subscriptionId, columnName, value).bind() } - listOf("csf", "throttling", "temporalQ").contains(it.key) -> { + listOf("csf", "temporalQ").contains(it.key) -> { logger.warn("Subscription $subscriptionId has unsupported attribute: ${it.key}") NotImplementedException("Subscription $subscriptionId has unsupported attribute: ${it.key}") .left().bind() diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 39f73cf63..467e1edcb 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -716,7 +716,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { "geometry" to "Point", "coordinates" to "[100.0, 0.0]", "geoproperty" to "https://uri.etsi.org/ngsi-ld/observationSpace" - ) + ), + "throttling" to 50 ) subscriptionService.update(subscription.id, parsedInput, APIC_COMPOUND_CONTEXTS) @@ -732,7 +733,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.geoQ!!.georel == "equals" && it.geoQ!!.geometry == "Point" && it.geoQ!!.coordinates == "[100.0, 0.0]" && - it.geoQ!!.geoproperty == "https://uri.etsi.org/ngsi-ld/observationSpace" + it.geoQ!!.geoproperty == "https://uri.etsi.org/ngsi-ld/observationSpace" && + it.throttling == 50 } } From ac455f16f768b44eba20107ad432d61a840793c1 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Thu, 25 Jan 2024 16:19:20 +0100 Subject: [PATCH 07/17] fixed update method's length after adding throttling --- .../stellio/subscription/service/SubscriptionService.kt | 8 ++++---- .../com/egm/stellio/subscription/web/APIResponses.kt | 5 +++++ .../subscription/service/SubscriptionServiceTests.kt | 4 ++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 03c3c36ff..eb70e23f4 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -20,6 +20,8 @@ import com.egm.stellio.subscription.utils.ParsingUtils.parseEndpointInfo import com.egm.stellio.subscription.utils.ParsingUtils.parseEntitySelector import com.egm.stellio.subscription.utils.ParsingUtils.toSqlColumnName import com.egm.stellio.subscription.utils.ParsingUtils.toSqlValue +import com.egm.stellio.subscription.web.invalidSubscriptiondAttributeMessage +import com.egm.stellio.subscription.web.unsupportedSubscriptiondAttributeMessage import io.r2dbc.postgresql.codec.Json import kotlinx.coroutines.reactive.awaitFirst import org.locationtech.jts.geom.Geometry @@ -363,14 +365,12 @@ class SubscriptionService( } listOf("csf", "temporalQ").contains(it.key) -> { - logger.warn("Subscription $subscriptionId has unsupported attribute: ${it.key}") - NotImplementedException("Subscription $subscriptionId has unsupported attribute: ${it.key}") + NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId,it)) .left().bind() } else -> { - logger.warn("Subscription $subscriptionId has invalid attribute: ${it.key}") - BadRequestDataException("Subscription $subscriptionId has invalid attribute: ${it.key}") + BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId,it)) .left().bind() } } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt index 26ec9a88b..a21b75107 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt @@ -6,3 +6,8 @@ fun subscriptionNotFoundMessage(subscriptionId: URI) = "Could not find a subscri fun subscriptionAlreadyExistsMessage(subscriptionId: URI) = "A subscription with id $subscriptionId already exists" fun subscriptionUnauthorizedMessage(subscriptionId: URI) = "User is not authorized to access subscription $subscriptionId" +fun unsupportedSubscriptiondAttributeMessage(subscriptionId: URI, attribute: Map.Entry) = + "Subscription $subscriptionId has unsupported attribute: ${attribute.key}" + +fun invalidSubscriptiondAttributeMessage(subscriptionId: URI, attribute: Map.Entry) = + "Subscription $subscriptionId has invalid attribute: ${attribute.key}" diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 467e1edcb..83dbf22f6 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -927,12 +927,12 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { val subscription = loadAndDeserializeSubscription("subscription_minimal_entities.json") subscriptionService.create(subscription, mockUserSub).shouldSucceed() - val parsedInput = mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "throttling" to "someValue") + val parsedInput = mapOf("type" to NGSILD_SUBSCRIPTION_TERM, "csf" to "someValue") subscriptionService.update(subscription.id, parsedInput, APIC_COMPOUND_CONTEXTS) .shouldFailWith { it is NotImplementedException && - it.message == "Subscription urn:ngsi-ld:Subscription:1 has unsupported attribute: throttling" + it.message == "Subscription urn:ngsi-ld:Subscription:1 has unsupported attribute: csf" } } From b115fb23581bc78b667b433e7fda8dcf6ff7764d Mon Sep 17 00:00:00 2001 From: ranim-n Date: Thu, 25 Jan 2024 16:25:29 +0100 Subject: [PATCH 08/17] fixed issues from detekt --- .../egm/stellio/subscription/service/SubscriptionService.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index eb70e23f4..d9ccd309b 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -47,8 +47,6 @@ class SubscriptionService( private val r2dbcEntityTemplate: R2dbcEntityTemplate ) { - private val logger = LoggerFactory.getLogger(javaClass) - suspend fun validateNewSubscription(subscription: Subscription): Either = either { checkTypeIsSubscription(subscription).bind() checkIdIsValid(subscription).bind() @@ -365,12 +363,12 @@ class SubscriptionService( } listOf("csf", "temporalQ").contains(it.key) -> { - NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId,it)) + NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId, it)) .left().bind() } else -> { - BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId,it)) + BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId, it)) .left().bind() } } From e2e3a93b10e27d16a2e50400d9f6ed43cc1e39db Mon Sep 17 00:00:00 2001 From: ranim-n Date: Thu, 25 Jan 2024 16:54:24 +0100 Subject: [PATCH 09/17] fixed issues from detekt part 2 --- .../com/egm/stellio/subscription/service/SubscriptionService.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index d9ccd309b..3f3a62718 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -25,7 +25,6 @@ import com.egm.stellio.subscription.web.unsupportedSubscriptiondAttributeMessage import io.r2dbc.postgresql.codec.Json import kotlinx.coroutines.reactive.awaitFirst import org.locationtech.jts.geom.Geometry -import org.slf4j.LoggerFactory import org.springframework.data.r2dbc.core.R2dbcEntityTemplate import org.springframework.data.relational.core.query.Criteria.where import org.springframework.data.relational.core.query.Query.query From fcbc190b6e9585334bc3d892d6e0fcef421d75e3 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Mon, 29 Jan 2024 16:34:03 +0100 Subject: [PATCH 10/17] added throttling to getting matching subscriptions --- .../service/SubscriptionService.kt | 12 ++++++---- .../service/SubscriptionServiceTests.kt | 23 +++++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 3f3a62718..2a4301385 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -497,7 +497,7 @@ class SubscriptionService( notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status, times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates, - pgis_geometry, geoproperty, scope_q, expires_at, contexts + pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling FROM subscription LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id @@ -539,13 +539,15 @@ class SubscriptionService( entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry, geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent, - endpoint_receiver_info, endpoint_notifier_info, contexts + endpoint_receiver_info, endpoint_notifier_info, contexts, throttling FROM subscription LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id WHERE is_active AND ( expires_at is null OR expires_at >= :date ) AND time_interval IS NULL + AND ( throttling IS NULL + OR (last_notification + throttling * INTERVAL '1 second') > CURRENT_TIMESTAMP ) AND ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',') OR watched_attributes IS NULL) AND CASE @@ -717,7 +719,8 @@ class SubscriptionService( lastFailure = null, lastSuccess = null ), - contexts = toList(row["contexts"]) + contexts = toList(row["contexts"]), + throttling = toNullableInt(row["throttling"]) ) } @@ -753,11 +756,12 @@ class SubscriptionService( scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active, entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel, - geometry, coordinates, pgis_geometry, geoproperty, contexts + geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling FROM subscription LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id WHERE time_interval IS NOT NULL + AND throttling IS NULL AND (last_notification IS NULL OR ((EXTRACT(EPOCH FROM last_notification) + time_interval) < EXTRACT(EPOCH FROM :currentDate)) ) diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 83dbf22f6..f18cf2081 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -1102,6 +1102,29 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { .shouldSucceedWith { assertEquals(0, it.size) } } + @Test + fun `it should not return a subscription if throttling has not elapsed yet`() = runTest { + val expandedEntity = expandJsonLdEntity(entity, APIC_COMPOUND_CONTEXTS) + + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "notification" to mapOf( + "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy"), + "lastNotification" to ngsiLdDateTime() + ), + "throttling" to 300 + ) + + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + subscriptionService.getMatchingSubscriptions(expandedEntity, setOf(NGSILD_LOCATION_PROPERTY), ATTRIBUTE_UPDATED) + .shouldSucceedWith { assertEquals(0, it.size) } + } + @ParameterizedTest @CsvSource( "near;minDistance==1000, Polygon, '[[[100.0, 0.0], [101.0, 0.0], [101.0, -1.0], [100.0, 0.0]]]', 0", From 90d20a1e6ae796eef5d0e765f9ef95177a64cf65 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Wed, 31 Jan 2024 10:20:48 +0100 Subject: [PATCH 11/17] fixed previous commit related to throttling while getting matching subscriptions --- .../stellio/subscription/service/SubscriptionService.kt | 7 +++---- .../com/egm/stellio/subscription/web/APIResponses.kt | 8 ++++---- .../subscription/service/SubscriptionServiceTests.kt | 6 +++++- .../subscription/support/WithTimescaleContainer.kt | 1 - 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 2a4301385..388831d51 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -362,12 +362,12 @@ class SubscriptionService( } listOf("csf", "temporalQ").contains(it.key) -> { - NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId, it)) + NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId, it.key)) .left().bind() } else -> { - BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId, it)) + BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId, it.key)) .left().bind() } } @@ -547,7 +547,7 @@ class SubscriptionService( AND ( expires_at is null OR expires_at >= :date ) AND time_interval IS NULL AND ( throttling IS NULL - OR (last_notification + throttling * INTERVAL '1 second') > CURRENT_TIMESTAMP ) + OR (last_notification + throttling * INTERVAL '1 second') > CURRENT_TIMESTAMP AT TIME ZONE 'UTC') AND ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',') OR watched_attributes IS NULL) AND CASE @@ -761,7 +761,6 @@ class SubscriptionService( LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id WHERE time_interval IS NOT NULL - AND throttling IS NULL AND (last_notification IS NULL OR ((EXTRACT(EPOCH FROM last_notification) + time_interval) < EXTRACT(EPOCH FROM :currentDate)) ) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt index a21b75107..eca9bffb9 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt @@ -6,8 +6,8 @@ fun subscriptionNotFoundMessage(subscriptionId: URI) = "Could not find a subscri fun subscriptionAlreadyExistsMessage(subscriptionId: URI) = "A subscription with id $subscriptionId already exists" fun subscriptionUnauthorizedMessage(subscriptionId: URI) = "User is not authorized to access subscription $subscriptionId" -fun unsupportedSubscriptiondAttributeMessage(subscriptionId: URI, attribute: Map.Entry) = - "Subscription $subscriptionId has unsupported attribute: ${attribute.key}" +fun unsupportedSubscriptiondAttributeMessage(subscriptionId: URI, key: String) = + "Subscription $subscriptionId has unsupported attribute: $key" -fun invalidSubscriptiondAttributeMessage(subscriptionId: URI, attribute: Map.Entry) = - "Subscription $subscriptionId has invalid attribute: ${attribute.key}" +fun invalidSubscriptiondAttributeMessage(subscriptionId: URI, key: String) = + "Subscription $subscriptionId has invalid attribute: $key" diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index f18cf2081..4453b5a8c 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -1121,7 +1121,11 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { subscriptionService.create(subscription, mockUserSub).shouldSucceed() - subscriptionService.getMatchingSubscriptions(expandedEntity, setOf(NGSILD_LOCATION_PROPERTY), ATTRIBUTE_UPDATED) + subscriptionService.getMatchingSubscriptions( + expandedEntity, + setOf(INCOMING_COMPACT_PROPERTY), + ATTRIBUTE_UPDATED + ) .shouldSucceedWith { assertEquals(0, it.size) } } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt index 3db69acca..6c5e4f471 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithTimescaleContainer.kt @@ -25,7 +25,6 @@ interface WithTimescaleContainer { withEnv("POSTGRES_MULTIPLE_EXTENSIONS", "postgis,timescaledb,pgcrypto") withExposedPorts(5432) setWaitStrategy(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) - withReuse(true) } @JvmStatic From 3adb87f225f3495cdde020ee9971fb6994e56e1d Mon Sep 17 00:00:00 2001 From: ranim-n Date: Wed, 31 Jan 2024 17:33:50 +0100 Subject: [PATCH 12/17] minor fixes : added final unit test --- .../service/SubscriptionServiceTests.kt | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 4453b5a8c..50a3af39e 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -1129,6 +1129,33 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { .shouldSucceedWith { assertEquals(0, it.size) } } + @Test + fun `it should return a subscription if throttling has elapsed`() = runTest { + val expandedEntity = expandJsonLdEntity(entity, APIC_COMPOUND_CONTEXTS) + + val payload = mapOf( + "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "type" to NGSILD_SUBSCRIPTION_TERM, + "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "notification" to mapOf( + "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy"), + "lastNotification" to ngsiLdDateTime().minusMinutes(5) + ), + "throttling" to 200 + ) + + val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + + subscriptionService.create(subscription, mockUserSub).shouldSucceed() + + subscriptionService.getMatchingSubscriptions( + expandedEntity, + setOf(INCOMING_COMPACT_PROPERTY), + ATTRIBUTE_UPDATED + ) + .shouldSucceedWith { assertEquals(0, it.size) } + } + @ParameterizedTest @CsvSource( "near;minDistance==1000, Polygon, '[[[100.0, 0.0], [101.0, 0.0], [101.0, -1.0], [100.0, 0.0]]]', 0", From 7fa1398912a7ace4aed9381ea14a4a254095f984 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Thu, 1 Feb 2024 10:05:35 +0100 Subject: [PATCH 13/17] added delay after creating a subscription --- .../subscription/service/SubscriptionServiceTests.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 50a3af39e..a48759755 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -38,7 +38,7 @@ import org.springframework.test.context.ActiveProfiles import org.springframework.test.context.TestPropertySource import java.net.URI import java.time.ZonedDateTime -import java.util.UUID +import java.util.* import kotlin.time.Duration @SpringBootTest @@ -1139,15 +1139,18 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), "notification" to mapOf( "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy"), - "lastNotification" to ngsiLdDateTime().minusMinutes(5) + "lastNotification" to ngsiLdDateTime() ), - "throttling" to 200 + "throttling" to 5 ) val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() subscriptionService.create(subscription, mockUserSub).shouldSucceed() + runBlocking { + delay(5000) + } subscriptionService.getMatchingSubscriptions( expandedEntity, setOf(INCOMING_COMPACT_PROPERTY), From 3d4771f340a02ba347aacc56dc6e63567c29d5b0 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Thu, 1 Feb 2024 10:16:12 +0100 Subject: [PATCH 14/17] fixed import --- .../stellio/subscription/service/SubscriptionServiceTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index a48759755..0ce212272 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -38,7 +38,7 @@ import org.springframework.test.context.ActiveProfiles import org.springframework.test.context.TestPropertySource import java.net.URI import java.time.ZonedDateTime -import java.util.* +import java.util.UUID import kotlin.time.Duration @SpringBootTest From cfa03162ab7972e088be86de27adfa7ac79ffe9f Mon Sep 17 00:00:00 2001 From: Benoit Orihuela Date: Thu, 1 Feb 2024 22:13:30 +0100 Subject: [PATCH 15/17] fix: incorrect condition when checking if throttling has elapsed - also fix unit tests so that watchedAttributes always match --- .../service/SubscriptionService.kt | 4 +- .../service/SubscriptionServiceTests.kt | 48 ++++++++++++------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 388831d51..c9440a5f6 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -547,7 +547,7 @@ class SubscriptionService( AND ( expires_at is null OR expires_at >= :date ) AND time_interval IS NULL AND ( throttling IS NULL - OR (last_notification + throttling * INTERVAL '1 second') > CURRENT_TIMESTAMP AT TIME ZONE 'UTC') + OR (last_notification + throttling * INTERVAL '1 second') < :date) AND ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',') OR watched_attributes IS NULL) AND CASE @@ -558,7 +558,7 @@ class SubscriptionService( """.trimIndent() return databaseClient.sql(selectStatement) .bind("updatedAttributes", updatedAttributes.joinToString(separator = ",")) - .bind("date", Instant.now().atZone(ZoneOffset.UTC)) + .bind("date", ngsiLdDateTime()) .allToMappedList { rowToMinimalMatchSubscription(it) } .mergeEntitySelectorsOnSubscriptions() } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 0ce212272..8c91aed65 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -1107,26 +1107,31 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { val expandedEntity = expandJsonLdEntity(entity, APIC_COMPOUND_CONTEXTS) val payload = mapOf( - "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "id" to "urn:ngsi-ld:Subscription:01".toUri(), "type" to NGSILD_SUBSCRIPTION_TERM, - "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "watchedAttributes" to listOf(NGSILD_LOCATION_TERM), "notification" to mapOf( - "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy"), - "lastNotification" to ngsiLdDateTime() + "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy") ), "throttling" to 300 ) - val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + val subscription = ParsingUtils.parseSubscription(payload, APIC_COMPOUND_CONTEXTS).shouldSucceedAndResult() subscriptionService.create(subscription, mockUserSub).shouldSucceed() + subscriptionService.updateSubscriptionNotification( + subscription, + Notification(subscriptionId = subscription.id, data = emptyList()), + true + ) subscriptionService.getMatchingSubscriptions( expandedEntity, - setOf(INCOMING_COMPACT_PROPERTY), + setOf(NGSILD_LOCATION_PROPERTY), ATTRIBUTE_UPDATED - ) - .shouldSucceedWith { assertEquals(0, it.size) } + ).shouldSucceedWith { + assertEquals(0, it.size) + } } @Test @@ -1134,29 +1139,36 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { val expandedEntity = expandJsonLdEntity(entity, APIC_COMPOUND_CONTEXTS) val payload = mapOf( - "id" to "urn:ngsi-ld:Beehive:1234567890".toUri(), + "id" to "urn:ngsi-ld:Subscription:01".toUri(), "type" to NGSILD_SUBSCRIPTION_TERM, - "watchedAttributes" to listOf(INCOMING_COMPACT_PROPERTY), + "watchedAttributes" to listOf(NGSILD_LOCATION_TERM), "notification" to mapOf( - "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy"), - "lastNotification" to ngsiLdDateTime() + "endpoint" to mapOf("uri" to "http://my.endpoint/notifiy") ), - "throttling" to 5 + "throttling" to 1 ) - val subscription = ParsingUtils.parseSubscription(payload, emptyList()).shouldSucceedAndResult() + val subscription = ParsingUtils.parseSubscription(payload, APIC_COMPOUND_CONTEXTS).shouldSucceedAndResult() subscriptionService.create(subscription, mockUserSub).shouldSucceed() + subscriptionService.updateSubscriptionNotification( + subscription, + Notification(subscriptionId = subscription.id, data = emptyList()), + true + ) + // add a delay for throttling period to be elapsed runBlocking { - delay(5000) + delay(2000) } + subscriptionService.getMatchingSubscriptions( expandedEntity, - setOf(INCOMING_COMPACT_PROPERTY), + setOf(NGSILD_LOCATION_PROPERTY), ATTRIBUTE_UPDATED - ) - .shouldSucceedWith { assertEquals(0, it.size) } + ).shouldSucceedWith { + assertEquals(1, it.size) + } } @ParameterizedTest From 5692d999760dde5c7c5cb1ed62d1df41eb9bd8e3 Mon Sep 17 00:00:00 2001 From: ranim-n Date: Fri, 2 Feb 2024 18:05:01 +0100 Subject: [PATCH 16/17] fixed condition while checking timeInterval --- .../com/egm/stellio/subscription/service/SubscriptionService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index c9440a5f6..5bb54c841 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -90,7 +90,7 @@ class SubscriptionService( } private fun checkTimeIntervalGreaterThanZero(subscription: Subscription): Either = - if (subscription.timeInterval != null && subscription.timeInterval < 1 && subscription.throttling == null) + if (subscription.timeInterval != null && subscription.timeInterval < 1) BadRequestDataException("The value of 'timeInterval' must be greater than zero (int)").left() else Unit.right() From 4afeb58f12457e0965fe85fa13d751c56fffe56f Mon Sep 17 00:00:00 2001 From: ranim-n Date: Sun, 4 Feb 2024 10:46:56 +0100 Subject: [PATCH 17/17] fixed functions name --- .../stellio/subscription/service/SubscriptionService.kt | 8 ++++---- .../com/egm/stellio/subscription/web/APIResponses.kt | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index 5bb54c841..bd4c779f7 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -20,8 +20,8 @@ import com.egm.stellio.subscription.utils.ParsingUtils.parseEndpointInfo import com.egm.stellio.subscription.utils.ParsingUtils.parseEntitySelector import com.egm.stellio.subscription.utils.ParsingUtils.toSqlColumnName import com.egm.stellio.subscription.utils.ParsingUtils.toSqlValue -import com.egm.stellio.subscription.web.invalidSubscriptiondAttributeMessage -import com.egm.stellio.subscription.web.unsupportedSubscriptiondAttributeMessage +import com.egm.stellio.subscription.web.invalidSubscriptionAttributeMessage +import com.egm.stellio.subscription.web.unsupportedSubscriptionAttributeMessage import io.r2dbc.postgresql.codec.Json import kotlinx.coroutines.reactive.awaitFirst import org.locationtech.jts.geom.Geometry @@ -362,12 +362,12 @@ class SubscriptionService( } listOf("csf", "temporalQ").contains(it.key) -> { - NotImplementedException(unsupportedSubscriptiondAttributeMessage(subscriptionId, it.key)) + NotImplementedException(unsupportedSubscriptionAttributeMessage(subscriptionId, it.key)) .left().bind() } else -> { - BadRequestDataException(invalidSubscriptiondAttributeMessage(subscriptionId, it.key)) + BadRequestDataException(invalidSubscriptionAttributeMessage(subscriptionId, it.key)) .left().bind() } } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt index eca9bffb9..bb593b6c1 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/web/APIResponses.kt @@ -6,8 +6,8 @@ fun subscriptionNotFoundMessage(subscriptionId: URI) = "Could not find a subscri fun subscriptionAlreadyExistsMessage(subscriptionId: URI) = "A subscription with id $subscriptionId already exists" fun subscriptionUnauthorizedMessage(subscriptionId: URI) = "User is not authorized to access subscription $subscriptionId" -fun unsupportedSubscriptiondAttributeMessage(subscriptionId: URI, key: String) = +fun unsupportedSubscriptionAttributeMessage(subscriptionId: URI, key: String) = "Subscription $subscriptionId has unsupported attribute: $key" -fun invalidSubscriptiondAttributeMessage(subscriptionId: URI, key: String) = +fun invalidSubscriptionAttributeMessage(subscriptionId: URI, key: String) = "Subscription $subscriptionId has invalid attribute: $key"