From 99b4a5f45b8d5f8f040d2d2fb0c33bd3afd8e095 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 16:44:39 +0530 Subject: [PATCH 1/3] Add support to retry for specific status code for message delivery --- examples/kafka-hub/hub/Config.toml | 3 +++ examples/kafka-hub/hub/modules/config/configurations.bal | 3 +++ examples/kafka-hub/hub/websub_subscribers.bal | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/kafka-hub/hub/Config.toml b/examples/kafka-hub/hub/Config.toml index 915adeff..d0b4b8db 100644 --- a/examples/kafka-hub/hub/Config.toml +++ b/examples/kafka-hub/hub/Config.toml @@ -36,6 +36,9 @@ MESSAGE_DELIVERY_COUNT = 3 # The message delivery timeout MESSAGE_DELIVERY_TIMEOUT = 10.0 +# The HTTP status codes for which the client should retry +RETRYABLE_STATUS_CODES = [500, 502, 503] + # The Oauth2 authorization related configurations [kafkaHub.config.OAUTH2_CONFIG] issuer = "https://localhost:9443/oauth2/token" diff --git a/examples/kafka-hub/hub/modules/config/configurations.bal b/examples/kafka-hub/hub/modules/config/configurations.bal index 5f47678f..61a4d756 100644 --- a/examples/kafka-hub/hub/modules/config/configurations.bal +++ b/examples/kafka-hub/hub/modules/config/configurations.bal @@ -64,6 +64,9 @@ public configurable int MESSAGE_DELIVERY_COUNT = 3; # The message delivery timeout public configurable decimal MESSAGE_DELIVERY_TIMEOUT = 10; +# The HTTP status codes for which the client should retry +public configurable int[] RETRYABLE_STATUS_CODES = [500, 502, 503]; + # The Oauth2 authorization related configurations public configurable types:OAuth2Config OAUTH2_CONFIG = ?; diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index 05042e73..17de27f1 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -75,7 +75,8 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL, count: config:MESSAGE_DELIVERY_COUNT, backOffFactor: 2.0, - maxWaitInterval: 20 + maxWaitInterval: 20, + statusCodes: config:RETRYABLE_STATUS_CODES }, timeout: config:MESSAGE_DELIVERY_TIMEOUT }); From c2db62ce15c06143462197c0f077127942b29e0a Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 16:53:32 +0530 Subject: [PATCH 2/3] Add support to configure retryable HTTP status codes for message delivery --- examples/kafka-hub/docker-compose.yaml | 2 ++ examples/kafka-hub/hub/Config.toml | 2 +- .../kafka-hub/hub/modules/config/configurations.bal | 12 +++++++++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml index 8c5ce879..538ac83a 100644 --- a/examples/kafka-hub/docker-compose.yaml +++ b/examples/kafka-hub/docker-compose.yaml @@ -35,6 +35,8 @@ services: KEYSTORE_PASSWORD: "password" # Maximum number of records returned in a single call to consumer-poll KAFKA_CONSUMER_MAX_POLL_RECORDS: 50 + # The HTTP status codes for which the client should retry + RETRYABLE_STATUS_CODES: "500 502 503" volumes: # Kafka client truststore file - ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks diff --git a/examples/kafka-hub/hub/Config.toml b/examples/kafka-hub/hub/Config.toml index d0b4b8db..90e8241b 100644 --- a/examples/kafka-hub/hub/Config.toml +++ b/examples/kafka-hub/hub/Config.toml @@ -37,7 +37,7 @@ MESSAGE_DELIVERY_COUNT = 3 MESSAGE_DELIVERY_TIMEOUT = 10.0 # The HTTP status codes for which the client should retry -RETRYABLE_STATUS_CODES = [500, 502, 503] +MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES = [500, 502, 503] # The Oauth2 authorization related configurations [kafkaHub.config.OAUTH2_CONFIG] diff --git a/examples/kafka-hub/hub/modules/config/configurations.bal b/examples/kafka-hub/hub/modules/config/configurations.bal index 61a4d756..e20fc8a7 100644 --- a/examples/kafka-hub/hub/modules/config/configurations.bal +++ b/examples/kafka-hub/hub/modules/config/configurations.bal @@ -65,7 +65,9 @@ public configurable int MESSAGE_DELIVERY_COUNT = 3; public configurable decimal MESSAGE_DELIVERY_TIMEOUT = 10; # The HTTP status codes for which the client should retry -public configurable int[] RETRYABLE_STATUS_CODES = [500, 502, 503]; +public configurable int[] MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES = [500, 502, 503]; + +public final readonly & int[] RETRYABLE_STATUS_CODES = check getRetryableStatusCodes(MESSAGE_DELIVERY_RETRYABLE_STATUS_CODES).cloneReadOnly(); # The Oauth2 authorization related configurations public configurable types:OAuth2Config OAUTH2_CONFIG = ?; @@ -79,3 +81,11 @@ public final string WEBSUB_EVENTS_CONSUMER_GROUP = os:getEnv("WEBSUB_EVENTS_CONS isolated function constructSystemConsumerGroup() returns string { return string `websub-events-receiver-${SERVER_IDENTIFIER}-${util:generateRandomString()}`; } + +isolated function getRetryableStatusCodes(int[] configuredCodes) returns int[]|error { + if os:getEnv("RETRYABLE_STATUS_CODES") is "" { + return configuredCodes; + } + string[] statusCodes = re ` `.split(os:getEnv("RETRYABLE_STATUS_CODES")); + return statusCodes.'map(i => check int:fromString(i)); +} From d4dc22dcceacf106c10e808b38ff86ee940ac075 Mon Sep 17 00:00:00 2001 From: Ayesh Almeida Date: Wed, 20 Nov 2024 17:26:01 +0530 Subject: [PATCH 3/3] Incorporate review suggestions --- examples/kafka-hub/docker-compose.yaml | 2 +- examples/kafka-hub/hub/modules/config/configurations.bal | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml index 538ac83a..6bbf5a4a 100644 --- a/examples/kafka-hub/docker-compose.yaml +++ b/examples/kafka-hub/docker-compose.yaml @@ -36,7 +36,7 @@ services: # Maximum number of records returned in a single call to consumer-poll KAFKA_CONSUMER_MAX_POLL_RECORDS: 50 # The HTTP status codes for which the client should retry - RETRYABLE_STATUS_CODES: "500 502 503" + RETRYABLE_STATUS_CODES: "500,502,503" volumes: # Kafka client truststore file - ./_resources/secrets/kafka-client/kafka-client.trustStore.jks:/home/ballerina/resources/brokercerts/client-truststore.jks diff --git a/examples/kafka-hub/hub/modules/config/configurations.bal b/examples/kafka-hub/hub/modules/config/configurations.bal index e20fc8a7..cfc7e110 100644 --- a/examples/kafka-hub/hub/modules/config/configurations.bal +++ b/examples/kafka-hub/hub/modules/config/configurations.bal @@ -86,6 +86,6 @@ isolated function getRetryableStatusCodes(int[] configuredCodes) returns int[]|e if os:getEnv("RETRYABLE_STATUS_CODES") is "" { return configuredCodes; } - string[] statusCodes = re ` `.split(os:getEnv("RETRYABLE_STATUS_CODES")); - return statusCodes.'map(i => check int:fromString(i)); + string[] statusCodes = re `,`.split(os:getEnv("RETRYABLE_STATUS_CODES")); + return statusCodes.'map(i => check int:fromString(i.trim())); }