Skip to content

Commit

Permalink
Merge pull request #925 from ayeshLK/kafkahub-mtls-dev
Browse files Browse the repository at this point in the history
Update KafkaHub deployment related configurations
  • Loading branch information
ayeshLK authored May 2, 2024
2 parents 209d765 + a8fb3d8 commit 27aef30
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 26 deletions.
2 changes: 1 addition & 1 deletion examples/kafka-hub/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: 'kafkahub'

services:
hub-1:
image: 'ayeshalmeida/kafkahub:9.0.0'
image: 'ayeshalmeida/kafkahub:10.0.0'
hostname: hub1
container_name: hub-1
ports:
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-hub/hub/Cloud.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[container.image]
repository="ballerina"
name="kafkahub"
tag="9.0.0"
tag="10.0.0"

[[container.copy.files]]
sourceFile="./resources"
Expand Down
9 changes: 1 addition & 8 deletions examples/kafka-hub/hub/hub_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@ http:Service healthCheckService = service object {
}
};

websubhub:Service hubService = @websubhub:ServiceConfig {
webHookConfig: {
secureSocket: {
cert: "./resources/server.crt"
}
}
}
service object {
websubhub:Service hubService = service object {

# Registers a `topic` in the hub.
#
Expand Down
5 changes: 1 addition & 4 deletions examples/kafka-hub/hub/websub_subscribers.bal
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
backOffFactor: 2.0,
maxWaitInterval: 20
},
timeout: config:MESSAGE_DELIVERY_TIMEOUT,
secureSocket: {
cert: "./resources/server.crt"
}
timeout: config:MESSAGE_DELIVERY_TIMEOUT
});
do {
while true {
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-hub/subscriber/Cloud.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[container.image]
repository="ballerina"
name="wbsbsubscriber"
tag="4.0.0"
tag="5.0.0"

[[container.copy.files]]
sourceFile="./resources"
Expand Down
30 changes: 19 additions & 11 deletions examples/kafka-hub/subscriber/subscriber_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
// specific language governing permissions and limitations
// under the License.

import ballerina/websub;
import ballerina/log;
import ballerina/http;
import ballerina/websubhub;
import ballerina/log;
import ballerina/os;
import ballerina/websub;
import ballerina/websubhub;

final string topicName = os:getEnv("TOPIC_NAME") == "" ? "priceUpdate" : os:getEnv("TOPIC_NAME");
final string hubUrl = os:getEnv("HUB_URL") == "" ? "https://lb:9090/hub" : os:getEnv("HUB_URL");
Expand All @@ -31,12 +31,13 @@ type OAuth2Config record {|
string trustStore;
string trustStorePassword;
|};

configurable OAuth2Config oauth2Config = ?;

listener websub:Listener securedSubscriber = new(9100, host = os:getEnv("HOSTNAME"));
listener websub:Listener securedSubscriber = getListener();

function init() returns error? {
websubhub:PublisherClient websubHubClientEP = check new(hubUrl,
websubhub:PublisherClient websubHubClientEP = check new (hubUrl,
auth = {
tokenUrl: oauth2Config.tokenUrl,
clientId: oauth2Config.clientId,
Expand Down Expand Up @@ -67,10 +68,10 @@ function init() returns error? {
}
}

@websub:SubscriberServiceConfig {
@websub:SubscriberServiceConfig {
target: [hubUrl, topicName],
httpConfig: {
auth : {
auth: {
tokenUrl: oauth2Config.tokenUrl,
clientId: oauth2Config.clientId,
clientSecret: oauth2Config.clientSecret,
Expand All @@ -84,7 +85,7 @@ function init() returns error? {
}
}
},
secureSocket : {
secureSocket: {
cert: {
path: "./resources/subscriber.truststore.jks",
password: "password"
Expand All @@ -93,10 +94,10 @@ function init() returns error? {
},
unsubscribeOnShutdown: unsubOnShutdown,
customParams: getCustomParams()
}
}
service /JuApTOXq19 on securedSubscriber {
remote function onSubscriptionVerification(websub:SubscriptionVerification msg)

remote function onSubscriptionVerification(websub:SubscriptionVerification msg)
returns websub:SubscriptionVerificationSuccess {
log:printInfo(string `Successfully subscribed for notifications on topic [${topicName}]`);
return websub:SUBSCRIPTION_VERIFICATION_SUCCESS;
Expand All @@ -116,3 +117,10 @@ isolated function getCustomParams() returns map<string> {
consumerGroup: os:getEnv("CONSUMER_GROUP")
};
}

isolated function getListener() returns websub:Listener|error {
if os:getEnv("SVC_PORT") == "" {
return new (9100, host = os:getEnv("HOSTNAME"));
}
return new (check int:fromString(os:getEnv("SVC_PORT")), host = os:getEnv("HOSTNAME"));
}

0 comments on commit 27aef30

Please sign in to comment.