diff --git a/examples/kafka-hub/docker-compose.yaml b/examples/kafka-hub/docker-compose.yaml index 3e28f976..9fba8c11 100644 --- a/examples/kafka-hub/docker-compose.yaml +++ b/examples/kafka-hub/docker-compose.yaml @@ -3,7 +3,7 @@ name: 'kafkahub' services: hub-1: - image: 'ayeshalmeida/kafkahub:9.0.0' + image: 'ayeshalmeida/kafkahub:10.0.0' hostname: hub1 container_name: hub-1 ports: diff --git a/examples/kafka-hub/hub/Cloud.toml b/examples/kafka-hub/hub/Cloud.toml index 90304c58..3e39c08e 100644 --- a/examples/kafka-hub/hub/Cloud.toml +++ b/examples/kafka-hub/hub/Cloud.toml @@ -1,7 +1,7 @@ [container.image] repository="ballerina" name="kafkahub" -tag="9.0.0" +tag="10.0.0" [[container.copy.files]] sourceFile="./resources" diff --git a/examples/kafka-hub/hub/hub_service.bal b/examples/kafka-hub/hub/hub_service.bal index 3981a347..f5e5d303 100644 --- a/examples/kafka-hub/hub/hub_service.bal +++ b/examples/kafka-hub/hub/hub_service.bal @@ -32,14 +32,7 @@ http:Service healthCheckService = service object { } }; -websubhub:Service hubService = @websubhub:ServiceConfig { - webHookConfig: { - secureSocket: { - cert: "./resources/server.crt" - } - } -} -service object { +websubhub:Service hubService = service object { # Registers a `topic` in the hub. # diff --git a/examples/kafka-hub/hub/websub_subscribers.bal b/examples/kafka-hub/hub/websub_subscribers.bal index ff64840e..761b8b46 100644 --- a/examples/kafka-hub/hub/websub_subscribers.bal +++ b/examples/kafka-hub/hub/websub_subscribers.bal @@ -77,10 +77,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc backOffFactor: 2.0, maxWaitInterval: 20 }, - timeout: config:MESSAGE_DELIVERY_TIMEOUT, - secureSocket: { - cert: "./resources/server.crt" - } + timeout: config:MESSAGE_DELIVERY_TIMEOUT }); do { while true { diff --git a/examples/kafka-hub/subscriber/Cloud.toml b/examples/kafka-hub/subscriber/Cloud.toml index 6007113e..e3076be4 100644 --- a/examples/kafka-hub/subscriber/Cloud.toml +++ b/examples/kafka-hub/subscriber/Cloud.toml @@ -1,7 +1,7 @@ [container.image] repository="ballerina" name="wbsbsubscriber" -tag="4.0.0" +tag="5.0.0" [[container.copy.files]] sourceFile="./resources" diff --git a/examples/kafka-hub/subscriber/subscriber_service.bal b/examples/kafka-hub/subscriber/subscriber_service.bal index 3e0f647b..00b36d7a 100644 --- a/examples/kafka-hub/subscriber/subscriber_service.bal +++ b/examples/kafka-hub/subscriber/subscriber_service.bal @@ -14,11 +14,11 @@ // specific language governing permissions and limitations // under the License. -import ballerina/websub; -import ballerina/log; import ballerina/http; -import ballerina/websubhub; +import ballerina/log; import ballerina/os; +import ballerina/websub; +import ballerina/websubhub; final string topicName = os:getEnv("TOPIC_NAME") == "" ? "priceUpdate" : os:getEnv("TOPIC_NAME"); final string hubUrl = os:getEnv("HUB_URL") == "" ? "https://lb:9090/hub" : os:getEnv("HUB_URL"); @@ -31,12 +31,13 @@ type OAuth2Config record {| string trustStore; string trustStorePassword; |}; + configurable OAuth2Config oauth2Config = ?; -listener websub:Listener securedSubscriber = new(9100, host = os:getEnv("HOSTNAME")); +listener websub:Listener securedSubscriber = getListener(); function init() returns error? { - websubhub:PublisherClient websubHubClientEP = check new(hubUrl, + websubhub:PublisherClient websubHubClientEP = check new (hubUrl, auth = { tokenUrl: oauth2Config.tokenUrl, clientId: oauth2Config.clientId, @@ -67,10 +68,10 @@ function init() returns error? { } } -@websub:SubscriberServiceConfig { +@websub:SubscriberServiceConfig { target: [hubUrl, topicName], httpConfig: { - auth : { + auth: { tokenUrl: oauth2Config.tokenUrl, clientId: oauth2Config.clientId, clientSecret: oauth2Config.clientSecret, @@ -84,7 +85,7 @@ function init() returns error? { } } }, - secureSocket : { + secureSocket: { cert: { path: "./resources/subscriber.truststore.jks", password: "password" @@ -93,10 +94,10 @@ function init() returns error? { }, unsubscribeOnShutdown: unsubOnShutdown, customParams: getCustomParams() -} +} service /JuApTOXq19 on securedSubscriber { - - remote function onSubscriptionVerification(websub:SubscriptionVerification msg) + + remote function onSubscriptionVerification(websub:SubscriptionVerification msg) returns websub:SubscriptionVerificationSuccess { log:printInfo(string `Successfully subscribed for notifications on topic [${topicName}]`); return websub:SUBSCRIPTION_VERIFICATION_SUCCESS; @@ -116,3 +117,10 @@ isolated function getCustomParams() returns map { consumerGroup: os:getEnv("CONSUMER_GROUP") }; } + +isolated function getListener() returns websub:Listener|error { + if os:getEnv("SVC_PORT") == "" { + return new (9100, host = os:getEnv("HOSTNAME")); + } + return new (check int:fromString(os:getEnv("SVC_PORT")), host = os:getEnv("HOSTNAME")); +}