diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml index 8c5ce879..6bbf5a4a 100644 --- a/examples/kafka-hub/docker-compose.yaml +++ b/examples/kafka-hub/docker-compose.yaml @@ -35,6 +35,8 @@ services: KEYSTORE_PASSWORD: "password" # Maximum number of records returned in a single call to consumer-poll KAFKA_CONSUMER_MAX_POLL_RECORDS: 50 + # The HTTP status codes for which the client should retry + RETRYABLE_STATUS_CODES: "500,502,503" volumes: # Kafka client truststore file - ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks diff --git a/examples/kafka-hub/hub/Config.toml b/examples/kafka-hub/hub/Config.toml index 915adeff..90e8241b 100644 --- a/examples/kafka-hub/hub/Config.toml +++ b/examples/kafka-hub/hub/Config.toml @@ -36,6 +36,9 @@ MESSAGE_DELIVERY_COUNT = 3 # The message delivery timeout MESSAGE_DELIVERY_TIMEOUT = 10.0 +# The HTTP status codes for which the client should retry +MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES = [500, 502, 503] + # The Oauth2 authorization related configurations [kafkaHub.config.OAUTH2_CONFIG] issuer = "https://localhost:9443/oauth2/token" diff --git a/examples/kafka-hub/hub/modules/config/configurations.bal b/examples/kafka-hub/hub/modules/config/configurations.bal index 5f47678f..cfc7e110 100644 --- a/examples/kafka-hub/hub/modules/config/configurations.bal +++ b/examples/kafka-hub/hub/modules/config/configurations.bal @@ -64,6 +64,11 @@ public configurable int MESSAGE_DELIVERY_COUNT = 3; # The message delivery timeout public configurable decimal MESSAGE_DELIVERY_TIMEOUT = 10; +# The HTTP status codes for which the client should retry +public configurable int[] MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES = [500, 502, 503]; + +public final readonly & int[] RETRYABLE_STATUS_CODES = check getRetryableStatusCodes(MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES).cloneReadOnly(); + # The Oauth2 authorization related configurations public configurable types:OAuth2Config OAUTH2_CONFIG = ?; @@ -76,3 +81,11 @@ public final string WEBSUB_EVENTS_CONSUMER_GROUP = os:getEnv("WEBSUB_EVENTS_CONS isolated function constructSystemConsumerGroup() returns string { return string `websub-events-receiver-${SERVER_IDENTIFIER}-${util:generateRandomString()}`; } + +isolated function getRetryableStatusCodes(int[] configuredCodes) returns int[]|error { + if os:getEnv("RETRYABLE_STATUS_CODES") is "" { + return configuredCodes; + } + string[] statusCodes = re `,`.split(os:getEnv("RETRYABLE_STATUS_CODES")); + return statusCodes.'map(i => check int:fromString(i.trim())); +} diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 05042e73..17de27f1 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -75,7 +75,8 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL, count: config:MESSAGE_DELIVERY_COUNT, backOffFactor: 2.0, - maxWaitInterval: 20 + maxWaitInterval: 20, + statusCodes: config:RETRYABLE_STATUS_CODES }, timeout: config:MESSAGE_DELIVERY_TIMEOUT });