Sync the upstream with runtime fixes #1064

Merged: 9 commits, Dec 12, 2024
@@ -184,6 +184,12 @@ Then execute the below command to run the program.
BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbsubscriber.jar
```

Run the subscriber on Docker using the following command.

```sh
docker run --rm --name <name-of-container> -e TOPIC_NAME="<topic-name>" -e CONSUMER_GROUP="<consumer-group>" -e HUB_URL="<hub-URL>" -e UNSUB_ON_SHUTDOWN="true" -e SVC_PORT="<subscriber-port>" --network="<docker-network-name>" --hostname=<hostname> ayeshalmeida/wbsbsubscriber:5.0.0
```
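For example, a subscriber that joins consumer group `group-1`, serves on port 9091, and reaches the hub over the compose network could be started as follows. All values here are illustrative, including the assumed hub endpoint and network name:

```sh
docker run --rm --name websub-subscriber-1 \
  -e TOPIC_NAME="priceUpdate" \
  -e CONSUMER_GROUP="group-1" \
  -e HUB_URL="https://hub1:9090/hub" \
  -e UNSUB_ON_SHUTDOWN="true" \
  -e SVC_PORT="9091" \
  --network="kafkahub_default" \
  --hostname=subscriber1 \
  ayeshalmeida/wbsbsubscriber:5.0.0
```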

## Publishing to the Hub

Go into the `publisher` directory and execute the following command.
@@ -198,6 +204,12 @@ Then execute the below command to run the program.
BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbpublisher.jar
```

Run the publisher on Docker using the following command.

```sh
docker run --rm --network host --name websub-publisher -e TOPIC_NAME="<topic-name>" ayeshalmeida/wbsbpublisher:4.0.0
```
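For instance, with host networking the publisher can reach a hub exposed on localhost; the topic name here is illustrative:

```sh
docker run --rm --network host --name websub-publisher -e TOPIC_NAME="priceUpdate" ayeshalmeida/wbsbpublisher:4.0.0
```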

# Scaling the Hub
The hub can be scaled vertically or horizontally. Since the hub itself does not maintain any state, instances can be scaled up and down as needed. The entire cluster of hubs can be treated as a single unit: you can publish to one hub and consume the update message from another.
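For example, horizontal scaling can be sketched by adding a second hub instance next to `hub-1` in `docker-compose.yaml`. The host port mapping below is illustrative, and both instances must point at the same Kafka cluster:

```yaml
  hub-2:
    image: 'ayeshalmeida/kafkahub:11.0.0'
    hostname: hub2
    container_name: hub-2
    ports:
      - '9091:9090'
```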

2 changes: 1 addition & 1 deletion examples/kafka-hub/docker-compose.yaml
@@ -3,7 +3,7 @@ name: 'kafkahub'

services:
hub-1:
-    image: 'ayeshalmeida/kafkahub:10.0.0'
+    image: 'ayeshalmeida/kafkahub:11.0.0'
hostname: hub1
container_name: hub-1
ports:
2 changes: 1 addition & 1 deletion examples/kafka-hub/hub/Cloud.toml
@@ -1,7 +1,7 @@
[container.image]
repository="ballerina"
name="kafkahub"
tag="10.0.0"
tag="11.0.0"

[[container.copy.files]]
sourceFile="./resources"
20 changes: 17 additions & 3 deletions examples/kafka-hub/hub/hub_service.bal
@@ -204,17 +204,18 @@ websubhub:Service hubService = service object {
if config:SECURITY_ON {
check security:authorize(headers, ["update_content"]);
}
-        check self.updateMessage(message);
+        check self.updateMessage(message, headers);
return websubhub:ACKNOWLEDGEMENT;
}

-    isolated function updateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError? {
+    isolated function updateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:UpdateMessageError? {
boolean topicAvailable = false;
lock {
topicAvailable = registeredTopicsCache.hasKey(msg.hubTopic);
}
if topicAvailable {
-            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg);
+            map<string[]> messageHeaders = getHeadersMap(headers);
+            error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg, messageHeaders);
if errorResponse is websubhub:UpdateMessageError {
return errorResponse;
} else if errorResponse is error {
@@ -228,3 +229,16 @@ websubhub:Service hubService = service object {
}
}
};

isolated function getHeadersMap(http:Headers httpHeaders) returns map<string[]> {
map<string[]> headers = {};
foreach string headerName in httpHeaders.getHeaderNames() {
var headerValues = httpHeaders.getHeaders(headerName);
        // Safe to ignore the error, as only the available headers are retrieved here
if headerValues is error {
continue;
}
headers[headerName] = headerValues;
}
return headers;
}
4 changes: 2 additions & 2 deletions examples/kafka-hub/hub/hub_state_update.bal
@@ -39,11 +39,11 @@ function initializeHubState() returns error? {

function updateHubState() returns error? {
while true {
-        kafka:ConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
+        kafka:BytesConsumerRecord[] records = check conn:websubEventsConsumer->poll(config:POLLING_INTERVAL);
if records.length() <= 0 {
continue;
}
-        foreach kafka:ConsumerRecord currentRecord in records {
+        foreach kafka:BytesConsumerRecord currentRecord in records {
string lastPersistedData = check string:fromBytes(currentRecord.value);
error? result = processStateUpdateEvent(lastPersistedData);
if result is error {
38 changes: 24 additions & 14 deletions examples/kafka-hub/hub/modules/connections/connections.bal
@@ -94,36 +94,46 @@ public final kafka:Consumer websubEventsConsumer = check new (config:KAFKA_URL,
#
# + topicName - The kafka-topic to which the consumer should subscribe
# + groupName - The consumer group name
-# + partition - The kafka topic-partitions
+# + partitions - The kafka topic-partitions
# + return - `kafka:Consumer` if successful or else `error`
-public isolated function createMessageConsumer(string topicName, string groupName, int|int[]? partition = ()) returns kafka:Consumer|error {
+public isolated function createMessageConsumer(string topicName, string groupName, int[]? partitions = ()) returns kafka:Consumer|error {
// Messages are distributed to subscribers in parallel.
// In this scenario, manually committing offsets is unnecessary because
// the next message polling starts as soon as the worker begins delivering messages to the subscribers.
// Therefore, auto-commit is enabled to handle offset management automatically.
// Related issue: https://github.com/ballerina-platform/ballerina-library/issues/7376
kafka:ConsumerConfiguration consumerConfiguration = {
groupId: groupName,
-        topics: [topicName],
+        autoCommit: true,
secureSocket: secureSocketConfig,
securityProtocol: kafka:PROTOCOL_SSL,
maxPollRecords: config:CONSUMER_MAX_POLL_RECORDS
};
-    if partition is () {
+    if partitions is () {
// Kafka consumer topic subscription should only be used when manual partition assignment is not used
consumerConfiguration.topics = [topicName];
return new (config:KAFKA_URL, consumerConfiguration);
}

-    kafka:Consumer consumerEp = check new (config:KAFKA_URL, consumerConfiguration);
-    check consumerEp->assign(getTopicPartitions(topicName, partition));
-    return consumerEp;
-}

    log:printInfo("Assigning kafka-topic partitions manually", details = partitions);
    kafka:Consumer|kafka:Error consumerEp = new (config:KAFKA_URL, consumerConfiguration);
    if consumerEp is kafka:Error {
        log:printError("Error occurred while creating the consumer", consumerEp);
        return consumerEp;
    }

    kafka:TopicPartition[] kafkaTopicPartitions = partitions.'map(p => {topic: topicName, partition: p});
    kafka:Error? partitionAssignmentErr = consumerEp->assign(kafkaTopicPartitions);
    if partitionAssignmentErr is kafka:Error {
        log:printError("Error occurred while assigning partitions to the consumer", partitionAssignmentErr);
        return partitionAssignmentErr;
    }

    kafka:Error? kafkaSeekErr = consumerEp->seekToBeginning(kafkaTopicPartitions);
    if kafkaSeekErr is kafka:Error {
        log:printError("Error occurred while seeking partitions for the consumer", kafkaSeekErr);
        return kafkaSeekErr;
    }

    return consumerEp;
}

-isolated function getTopicPartitions(string topic, int|int[] partition) returns kafka:TopicPartition[] {
-    if partition is int {
-        return [
-            {topic, partition}
-        ];
-    }
-    return partition.'map(p => {topic, partition: p});
-}
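A minimal usage sketch of the two consumer modes, assuming it lives in this module; the `demonstrateConsumerModes` name and the topic, group, and partition values are hypothetical:

```ballerina
import ballerinax/kafka;

function demonstrateConsumerModes() returns error? {
    // Group management: no partitions are passed, so the consumer subscribes
    // to the topic and the broker assigns partitions automatically.
    kafka:Consumer managed = check createMessageConsumer("priceUpdate", "group-1");

    // Manual assignment: the consumer is pinned to partitions 0 and 1 and
    // starts reading each assigned partition from the beginning.
    kafka:Consumer pinned = check createMessageConsumer("priceUpdate", "group-2", [0, 1]);
}
```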
@@ -45,7 +45,8 @@ isolated function updateHubState(websubhub:TopicRegistration|websubhub:TopicDere
}
}

-public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message) returns error? {
+public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message,
+        map<string|string[]> headers = {}) returns error? {
json payload = <json>message.content;
check produceKafkaMessage(topicName, payload);
}
35 changes: 21 additions & 14 deletions examples/kafka-hub/hub/websub_subscribers.bal
@@ -26,7 +26,7 @@ import ballerina/mime;
isolated map<websubhub:VerifiedSubscription> subscribersCache = {};

const string CONSUMER_GROUP = "consumerGroup";
-const string CONSUMER_TOPIC_PARTITION = "topicPartition";
+const string CONSUMER_TOPIC_PARTITIONS = "topicPartitions";
const string SERVER_ID = "SERVER_ID";
const string STATUS = "status";
const string STALE_STATE = "stale";
@@ -70,12 +70,9 @@ isolated function processUnsubscription(websubhub:VerifiedUnsubscription unsubsc

isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubscription subscription) returns error? {
string consumerGroup = check value:ensureType(subscription[CONSUMER_GROUP]);
-    int? topicPartition = ();
-    if subscription.hasKey(CONSUMER_TOPIC_PARTITION) {
-        string partitionDetails = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITION]);
-        topicPartition = check int:fromString(partitionDetails);
-    }
-    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartition);
    int[]? topicPartitions = check getTopicPartitions(subscription);
    log:printInfo("Manual partition assignment", isRequired = topicPartitions !is ());
    kafka:Consumer consumerEp = check conn:createMessageConsumer(subscription.hubTopic, consumerGroup, topicPartitions);
websubhub:HubClient clientEp = check new (subscription, {
retryConfig: {
interval: config:MESSAGE_DELIVERY_RETRY_INTERVAL,
@@ -88,7 +85,7 @@ isolated function pollForNewUpdates(string subscriberId, websubhub:VerifiedSubsc
});
do {
while true {
readonly & kafka:ConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
readonly & kafka:BytesConsumerRecord[] records = check consumerEp->poll(config:POLLING_INTERVAL);
if !isValidConsumer(subscription.hubTopic, subscriberId) {
fail error(string `Subscriber with Id ${subscriberId} or topic ${subscription.hubTopic} is invalid`);
}
@@ -116,7 +113,7 @@ isolated function isValidSubscription(string subscriberId) returns boolean {
}
}

-isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
+isolated function notifySubscribers(kafka:BytesConsumerRecord[] records, websubhub:HubClient clientEp) returns error? {
do {
foreach var kafkaRecord in records {
websubhub:ContentDistributionMessage message = check deSerializeKafkaRecord(kafkaRecord);
@@ -127,7 +124,7 @@ isolated function notifySubscribers(kafka:ConsumerRecord[] records, websubhub:Hu
}
}

-isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
+isolated function deSerializeKafkaRecord(kafka:BytesConsumerRecord kafkaRecord) returns websubhub:ContentDistributionMessage|error {
byte[] content = kafkaRecord.value;
string message = check string:fromBytes(content);
json payload = check value:fromJsonString(message);
@@ -139,14 +136,24 @@ isolated function deSerializeKafkaRecord(kafka:ConsumerRecord kafkaRecord) retur
return distributionMsg;
}

-isolated function getHeaders(kafka:ConsumerRecord kafkaRecord) returns map<string|string[]>|error {
+isolated function getHeaders(kafka:BytesConsumerRecord kafkaRecord) returns map<string|string[]>|error {
map<string|string[]> headers = {};
foreach var ['key, value] in kafkaRecord.headers.entries().toArray() {
-        if value is string || value is string[] {
-            headers['key] = value;
-        } else if value is byte[] {
        if value is byte[] {
headers['key] = check string:fromBytes(value);
} else if value is byte[][] {
string[] headerValue = value.'map(v => check string:fromBytes(v));
headers['key] = headerValue;
}
}
return headers;
}

isolated function getTopicPartitions(websubhub:VerifiedSubscription subscription) returns int[]|error? {
if !subscription.hasKey(CONSUMER_TOPIC_PARTITIONS) {
return;
}
    // Kafka topic partitions are given as a string of comma-separated integers, e.g. "1,2,3,4"
string partitionInfo = check value:ensureType(subscription[CONSUMER_TOPIC_PARTITIONS]);
return re `,`.split(partitionInfo).'map(p => p.trim()).'map(p => check int:fromString(p));
}
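A minimal sketch of this parsing, with the one-liner unrolled into an explicit loop and an assumed input value:

```ballerina
import ballerina/io;

public function main() returns error? {
    // A subscription carrying topicPartitions = "0, 1, 2" resolves to [0,1,2].
    string partitionInfo = "0, 1, 2";
    int[] partitions = [];
    foreach string segment in re `,`.split(partitionInfo) {
        partitions.push(check int:fromString(segment.trim()));
    }
    io:println(partitions); // prints [0,1,2]
}
```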
2 changes: 1 addition & 1 deletion examples/kafka-hub/subscriber/Cloud.toml
@@ -1,7 +1,7 @@
[container.image]
repository="ballerina"
name="wbsbsubscriber"
tag="5.0.0"
tag="6.0.0"

[[container.copy.files]]
sourceFile="./resources"
17 changes: 12 additions & 5 deletions examples/kafka-hub/subscriber/subscriber_service.bal
@@ -110,12 +110,19 @@ service /JuApTOXq19 on securedSubscriber {
}

isolated function getCustomParams() returns map<string> {
if os:getEnv("CONSUMER_GROUP") == "" {
return {};
if os:getEnv("CONSUMER_GROUP") !is "" {
return {
consumerGroup: os:getEnv("CONSUMER_GROUP")
};
}
return {
consumerGroup: os:getEnv("CONSUMER_GROUP")
};

if os:getEnv("TOPIC_PARTITIONS") !is "" {
return {
topicPartitions: os:getEnv("TOPIC_PARTITIONS")
};
}

return {};
}
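With this change, a subscriber opts into manual partition assignment through the `TOPIC_PARTITIONS` environment variable; note that `CONSUMER_GROUP` takes precedence when both are set. For example, with illustrative values:

```sh
TOPIC_PARTITIONS="0,1" BAL_CONFIG_FILES=/path/to/Config.toml bal run target/bin/wbsbsubscriber.jar
```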

isolated function getListener() returns websub:Listener|error {