Skip to content

Commit

Permalink
Merge pull request #1085 from stellio-hub/feature/add-throttling
Browse files Browse the repository at this point in the history
Feature/add throttling
  • Loading branch information
ranim-n authored Feb 5, 2024
2 parents d0b294d + 4afeb58 commit bef2f83
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ data class Subscription(
// - used to compact entities in notifications
// - used when needed to serve contexts in JSON notifications
@JsonProperty(value = JSONLD_CONTEXT)
val contexts: List<ExpandedTerm>
val contexts: List<ExpandedTerm>,
val throttling: Int? = null
) {

@Transient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.egm.stellio.subscription.service

import arrow.core.*
import arrow.core.Either
import arrow.core.Option
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
Expand All @@ -10,19 +13,18 @@ import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.subscription.config.SubscriptionProperties
import com.egm.stellio.subscription.model.*
import com.egm.stellio.subscription.model.GeoQ
import com.egm.stellio.subscription.model.Subscription
import com.egm.stellio.subscription.utils.*
import com.egm.stellio.subscription.utils.ParsingUtils.endpointInfoMapToString
import com.egm.stellio.subscription.utils.ParsingUtils.endpointInfoToString
import com.egm.stellio.subscription.utils.ParsingUtils.parseEndpointInfo
import com.egm.stellio.subscription.utils.ParsingUtils.parseEntitySelector
import com.egm.stellio.subscription.utils.ParsingUtils.toSqlColumnName
import com.egm.stellio.subscription.utils.ParsingUtils.toSqlValue
import com.egm.stellio.subscription.web.invalidSubscriptionAttributeMessage
import com.egm.stellio.subscription.web.unsupportedSubscriptionAttributeMessage
import io.r2dbc.postgresql.codec.Json
import kotlinx.coroutines.reactive.awaitFirst
import org.locationtech.jts.geom.Geometry
import org.slf4j.LoggerFactory
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import org.springframework.data.relational.core.query.Criteria.where
import org.springframework.data.relational.core.query.Query.query
Expand All @@ -44,13 +46,12 @@ class SubscriptionService(
private val r2dbcEntityTemplate: R2dbcEntityTemplate
) {

private val logger = LoggerFactory.getLogger(javaClass)

suspend fun validateNewSubscription(subscription: Subscription): Either<APIException, Unit> = either {
checkTypeIsSubscription(subscription).bind()
checkIdIsValid(subscription).bind()
checkEntitiesOrWatchedAttributes(subscription).bind()
checkTimeIntervalGreaterThanZero(subscription).bind()
checkThrottlingGreaterThanZero(subscription).bind()
checkSubscriptionValidity(subscription).bind()
checkExpiresAtInTheFuture(subscription).bind()
checkIdPatternIsValid(subscription).bind()
Expand All @@ -73,17 +74,31 @@ class SubscriptionService(
else Unit.right()

private fun checkSubscriptionValidity(subscription: Subscription): Either<APIException, Unit> =
if (subscription.watchedAttributes != null && subscription.timeInterval != null)
BadRequestDataException(
"You can't use 'timeInterval' in conjunction with 'watchedAttributes'"
).left()
else Unit.right()
when {
subscription.watchedAttributes != null && subscription.timeInterval != null -> {
BadRequestDataException(
"You can't use 'timeInterval' in conjunction with 'watchedAttributes'"
).left()
}
subscription.timeInterval != null && subscription.throttling != null -> {
BadRequestDataException(
"You can't use 'timeInterval' in conjunction with 'throttling'"
).left()
}
else ->
Unit.right()
}

private fun checkTimeIntervalGreaterThanZero(subscription: Subscription): Either<APIException, Unit> =
if (subscription.timeInterval != null && subscription.timeInterval < 1)
BadRequestDataException("The value of 'timeInterval' must be greater than zero (int)").left()
else Unit.right()

private fun checkThrottlingGreaterThanZero(subscription: Subscription): Either<APIException, Unit> =
if (subscription.throttling != null && subscription.throttling < 1)
BadRequestDataException("The value of 'throttling' must be greater than zero (int)").left()
else Unit.right()

private fun checkExpiresAtInTheFuture(subscription: Subscription): Either<BadRequestDataException, Unit> =
if (subscription.expiresAt != null && subscription.expiresAt.isBefore(ngsiLdDateTime()))
BadRequestDataException("'expiresAt' must be in the future").left()
Expand Down Expand Up @@ -134,11 +149,11 @@ class SubscriptionService(
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts)
expires_at, sub, contexts, throttling)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts)
:expires_at, :sub, :contexts, :throttling)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand All @@ -163,6 +178,7 @@ class SubscriptionService(
.bind("expires_at", subscription.expiresAt)
.bind("sub", sub.toStringValue())
.bind("contexts", subscription.contexts.toTypedArray())
.bind("throttling", subscription.throttling)
.execute().bind()

geoQuery?.let {
Expand Down Expand Up @@ -233,7 +249,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand Down Expand Up @@ -337,22 +353,21 @@ class SubscriptionService(
"q",
"scopeQ",
"isActive",
"modifiedAt"
"modifiedAt",
"throttling"
).contains(it.key) -> {
val columnName = it.key.toSqlColumnName()
val value = it.value.toSqlValue(it.key)
updateSubscriptionAttribute(subscriptionId, columnName, value).bind()
}

listOf("csf", "throttling", "temporalQ").contains(it.key) -> {
logger.warn("Subscription $subscriptionId has unsupported attribute: ${it.key}")
NotImplementedException("Subscription $subscriptionId has unsupported attribute: ${it.key}")
listOf("csf", "temporalQ").contains(it.key) -> {
NotImplementedException(unsupportedSubscriptionAttributeMessage(subscriptionId, it.key))
.left().bind<Unit>()
}

else -> {
logger.warn("Subscription $subscriptionId has invalid attribute: ${it.key}")
BadRequestDataException("Subscription $subscriptionId has invalid attribute: ${it.key}")
BadRequestDataException(invalidSubscriptionAttributeMessage(subscriptionId, it.key))
.left().bind<Unit>()
}
}
Expand Down Expand Up @@ -416,6 +431,7 @@ class SubscriptionService(
}
listOf(Pair("notif_attributes", attributes))
}

"format" -> {
val format =
if (attribute.value == "keyValues")
Expand All @@ -424,6 +440,7 @@ class SubscriptionService(
NotificationParams.FormatType.NORMALIZED.name
listOf(Pair("notif_format", format))
}

"endpoint" -> {
val endpoint = attribute.value as Map<String, Any>
val accept =
Expand All @@ -441,6 +458,7 @@ class SubscriptionService(
Pair("endpoint_notifier_info", Json.of(endpointInfoMapToString(endpointNotifierInfo)))
)
}

else -> throw BadRequestDataException("Could not update attribute ${attribute.key}")
}
}
Expand Down Expand Up @@ -479,7 +497,7 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down Expand Up @@ -521,13 +539,15 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_receiver_info, endpoint_notifier_info, contexts
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
WHERE is_active
AND ( expires_at is null OR expires_at >= :date )
AND time_interval IS NULL
AND ( throttling IS NULL
OR (last_notification + throttling * INTERVAL '1 second') < :date)
AND ( string_to_array(watched_attributes, ',') && string_to_array(:updatedAttributes, ',')
OR watched_attributes IS NULL)
AND CASE
Expand All @@ -538,7 +558,7 @@ class SubscriptionService(
""".trimIndent()
return databaseClient.sql(selectStatement)
.bind("updatedAttributes", updatedAttributes.joinToString(separator = ","))
.bind("date", Instant.now().atZone(ZoneOffset.UTC))
.bind("date", ngsiLdDateTime())
.allToMappedList { rowToMinimalMatchSubscription(it) }
.mergeEntitySelectorsOnSubscriptions()
}
Expand Down Expand Up @@ -669,7 +689,8 @@ class SubscriptionService(
lastSuccess = toNullableZonedDateTime(row["last_success"])
),
isActive = toBoolean(row["is_active"]),
contexts = toList(row["contexts"])
contexts = toList(row["contexts"]),
throttling = toNullableInt(row["throttling"])
)
}

Expand Down Expand Up @@ -698,7 +719,8 @@ class SubscriptionService(
lastFailure = null,
lastSuccess = null
),
contexts = toList(row["contexts"])
contexts = toList(row["contexts"]),
throttling = toNullableInt(row["throttling"])
)
}

Expand Down Expand Up @@ -734,7 +756,7 @@ class SubscriptionService(
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ fun subscriptionNotFoundMessage(subscriptionId: URI) = "Could not find a subscri
fun subscriptionAlreadyExistsMessage(subscriptionId: URI) = "A subscription with id $subscriptionId already exists"
fun subscriptionUnauthorizedMessage(subscriptionId: URI) =
"User is not authorized to access subscription $subscriptionId"
fun unsupportedSubscriptionAttributeMessage(subscriptionId: URI, key: String) =
"Subscription $subscriptionId has unsupported attribute: $key"

fun invalidSubscriptionAttributeMessage(subscriptionId: URI, key: String) =
"Subscription $subscriptionId has invalid attribute: $key"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscription
ADD throttling integer;
Loading

0 comments on commit bef2f83

Please sign in to comment.