Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync the upstream with runtime fixes #1064

Merged
merged 9 commits into from
Dec 12, 2024
Next Next commit
Add support to convert http-headers to kafka-headers
  • Loading branch information
ayeshLK committed Nov 23, 2024
commit 759205bab3fb8d6745bce14ccf094a9b143d07d2
20 changes: 17 additions & 3 deletions examples/kafka-hub/hub/hub_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,18 @@ websubhub:Service hubService = service object {
if config:SECURITY_ON {
check security:authorize(headers, ["update_content"]);
}
check self.updateMessage(message);
check self.updateMessage(message, headers);
return websubhub:ACKNOWLEDGEMENT;
}

isolated function updateMessage(websubhub:UpdateMessage msg) returns websubhub:UpdateMessageError? {
isolated function updateMessage(websubhub:UpdateMessage msg, http:Headers headers) returns websubhub:UpdateMessageError? {
boolean topicAvailable = false;
lock {
topicAvailable = registeredTopicsCache.hasKey(msg.hubTopic);
}
if topicAvailable {
error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg);
map<string[]> messageHeaders = getHeadersMap(headers);
error? errorResponse = persist:addUpdateMessage(msg.hubTopic, msg, messageHeaders);
if errorResponse is websubhub:UpdateMessageError {
return errorResponse;
} else if errorResponse is error {
Expand All @@ -228,3 +229,16 @@ websubhub:Service hubService = service object {
}
}
};

isolated function getHeadersMap(http:Headers httpHeaders) returns map<string[]> {
map<string[]> headers = {};
foreach string headerName in httpHeaders.getHeaderNames() {
var headerValues = httpHeaders.getHeaders(headerName);
// safe to ingore the error as here we are retrieving only the available headers
if headerValues is error {
continue;
}
headers[headerName] = headerValues;
}
return headers;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ isolated function updateHubState(websubhub:TopicRegistration|websubhub:TopicDere
}
}

public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message) returns error? {
public isolated function addUpdateMessage(string topicName, websubhub:UpdateMessage message,
map<string|string[]> headers = {}) returns error? {
json payload = <json>message.content;
check produceKafkaMessage(topicName, payload);
}
Expand Down