diff --git a/ballerina-tests/http-advanced-tests/Ballerina.toml b/ballerina-tests/http-advanced-tests/Ballerina.toml index 3666d83c68..2e2f5ab74b 100644 --- a/ballerina-tests/http-advanced-tests/Ballerina.toml +++ b/ballerina-tests/http-advanced-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_advanced_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-advanced-tests/Dependencies.toml b/ballerina-tests/http-advanced-tests/Dependencies.toml index 30edf040ce..cc8f694fec 100644 --- a/ballerina-tests/http-advanced-tests/Dependencies.toml +++ b/ballerina-tests/http-advanced-tests/Dependencies.toml @@ -72,7 +72,7 @@ modules = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -105,7 +105,7 @@ modules = [ [[package]] org = "ballerina" name = "http_advanced_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "file"}, @@ -125,7 +125,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -160,7 +160,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -285,7 +285,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-advanced-tests/tests/server_sent_event_tests.bal b/ballerina-tests/http-advanced-tests/tests/server_sent_event_tests.bal new file mode 100644 index 0000000000..320722a0d5 --- /dev/null +++ b/ballerina-tests/http-advanced-tests/tests/server_sent_event_tests.bal @@ -0,0 +1,188 @@ +// Copyright (c) 2024 WSO2 LLC. (https://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/http; +import ballerina/lang.runtime; +import ballerina/test; + +listener http:Listener http1SseListener = new http:Listener(http1SsePort, httpVersion = http:HTTP_1_1); +listener http:Listener http2SseListener = new http:Listener(http2SsePort); +final http:Client http1SseClient = check new (string `http://localhost:${http1SsePort}`, httpVersion = http:HTTP_1_1); +final http:Client http2SseClient = check new (string `http://localhost:${http2SsePort}`); + +class SseEventGenerator { + private final int eventCount; + private boolean completeWithError; + private int currentEventCount = 0; + + function init(int eventCount = 10, boolean completeWithError = false) { + self.eventCount = eventCount; + self.completeWithError = completeWithError; + } + + public isolated function next() returns record {|http:SseEvent value;|}|error? { + runtime:sleep(0.1); + http:SseEvent sseEvent = {data: string `count: ${self.currentEventCount}`, id: self.currentEventCount.toString()}; + if self.currentEventCount == 0 { + sseEvent.event = "start"; + } else if self.currentEventCount == self.eventCount { + sseEvent.event = "end"; + } else { + sseEvent.event = "continue"; + sseEvent.'retry = 10; + } + if self.currentEventCount > self.eventCount { + return self.completeWithError ? error("ending with error") : (); + } + self.currentEventCount += 1; + return {value: sseEvent}; + } +} + +class SseEventGenerator2 { + private final int eventCount; + private int currentEventCount = 0; + + public isolated function next() returns record {|http:SseEvent value;|}|error { + http:SseEvent sseEvent = {data: string `count: ${self.currentEventCount}`, id: self.currentEventCount.toString()}; + if self.currentEventCount == 0 { + sseEvent.event = "start"; + } else { + sseEvent.event = "continue"; + sseEvent.'retry = 10; + } + if self.currentEventCount > 3 { + return error("ending with error"); + } + self.currentEventCount += 1; + return {value: sseEvent}; + } +} + +service /sse on http1SseListener { + resource function 'default [string... paths](http:Request req) returns stream { + return new (new SseEventGenerator()); + } +} + +service /sse on http2SseListener { + resource function post .(http:Request req) returns http:Response { + http:Response response = new; + stream sseEventStream = new (new SseEventGenerator()); + response.setSseEventStream(sseEventStream); + return response; + } + + resource function get completeWithError() returns stream { + return new (new SseEventGenerator(3, true)); + } + + resource function get completeWithError2() returns stream { + return new (new SseEventGenerator2()); + } +} + +@test:Config {} +function testHttp1ResponseHeadersForSseEventStream() returns error? { + http:Response response = check http1SseClient->/sse; + test:assertEquals(response.getHeader("Connection"), "keep-alive"); + test:assertEquals(response.getHeader("Content-Type"), "text/event-stream"); + test:assertEquals(response.getHeader("Transfer-Encoding"), "chunked"); + test:assertTrue((check response.getHeader("Cache-Control")).startsWith("no-cache")); + stream actualSseEvents = check response.getSseEventStream(); + stream expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); +} + +@test:Config {} +function testHttp2ResponseHeadersForSseEventStream() returns error? { + http:Response response = check http2SseClient->/sse.post({}); + test:assertTrue(response.getHeader("Connection") is http:HeaderNotFoundError); + test:assertEquals(response.getHeader("Content-Type"), "text/event-stream"); + test:assertTrue((check response.getHeader("Cache-Control")).startsWith("no-cache")); + stream actualSseEvents = check response.getSseEventStream(); + stream expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); +} + +@test:Config {} +function testClientDataBindingForSseEventStream() returns error? { + stream actualSseEvents = check http1SseClient->/sse; + stream expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); +} + +@test:Config {} +function testClientDataBindingFailure() returns error? { + stream|json|error actualSseEvents = http1SseClient->/sse; + if actualSseEvents is error { + test:assertEquals(actualSseEvents.message(), "payload binding failed: " + + "Target return type must not be a union of stream and anydata"); + } else { + test:assertFail("An error expected"); + } +} + +@test:Config {} +function testClientRequestMethodsWithStreamType() returns error? { + stream actualSseEvents = check http1SseClient->/sse; + stream expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); + + actualSseEvents = check http1SseClient->/sse.delete(); + expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); + + actualSseEvents = check http1SseClient->/sse.options(); + expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); + + actualSseEvents = check http1SseClient->/sse.post({}); + expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); + + actualSseEvents = check http1SseClient->/sse.put({}); + expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); + + actualSseEvents = check http1SseClient->/sse.patch({}); + expectedSseEvents = new (new SseEventGenerator()); + check assertEventStream(actualSseEvents, expectedSseEvents); +} + +@test:Config {} +function testServiceCompletesStreamWithErrorEvent() returns error? { + stream actualSseEvents = check http2SseClient->/sse/completeWithError; + stream expectedSseEvents = new (new SseEventGenerator(3, true)); + check assertEventStream(actualSseEvents, expectedSseEvents); + + actualSseEvents = check http2SseClient->/sse/completeWithError2; + expectedSseEvents = new (new SseEventGenerator2()); + check assertEventStream(actualSseEvents, expectedSseEvents); +} + +isolated function assertEventStream(stream actualSseEvents, stream expectedSseEvents) returns error? { + error? err = from http:SseEvent expectedEvent in expectedSseEvents + do { + record {|http:SseEvent value;|}? valueRecord = check actualSseEvents.next(); + test:assertEquals(valueRecord?.value, expectedEvent); + }; + if err is error { + http:SseEvent expectedEvent = {event: "error", data: err.message()}; + record {|http:SseEvent value;|}? valueRecord = check actualSseEvents.next(); + test:assertEquals(valueRecord?.value, expectedEvent); + } +} diff --git a/ballerina-tests/http-advanced-tests/tests/test_service_ports.bal b/ballerina-tests/http-advanced-tests/tests/test_service_ports.bal index 7176d4045f..8757e53819 100644 --- a/ballerina-tests/http-advanced-tests/tests/test_service_ports.bal +++ b/ballerina-tests/http-advanced-tests/tests/test_service_ports.bal @@ -42,3 +42,5 @@ const int statusCodeErrorUseCasePort = 9090; const int statusCodeErrorPort = 9092; const int identicalCookiePort = 9093; +const int http1SsePort = 9094; +const int http2SsePort = 9095; diff --git a/ballerina-tests/http-client-tests/Ballerina.toml b/ballerina-tests/http-client-tests/Ballerina.toml index cfbf0e883a..f7e2c8b27f 100644 --- a/ballerina-tests/http-client-tests/Ballerina.toml +++ b/ballerina-tests/http-client-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_client_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-client-tests/Dependencies.toml b/ballerina-tests/http-client-tests/Dependencies.toml index 12263323db..17ebb0a219 100644 --- a/ballerina-tests/http-client-tests/Dependencies.toml +++ b/ballerina-tests/http-client-tests/Dependencies.toml @@ -69,7 +69,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -102,7 +102,7 @@ modules = [ [[package]] org = "ballerina" name = "http_client_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "http"}, @@ -121,7 +121,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -156,7 +156,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -281,7 +281,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-dispatching-tests/Ballerina.toml b/ballerina-tests/http-dispatching-tests/Ballerina.toml index 4f47b5b8dc..67e3cf723a 100644 --- a/ballerina-tests/http-dispatching-tests/Ballerina.toml +++ b/ballerina-tests/http-dispatching-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_dispatching_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-dispatching-tests/Dependencies.toml b/ballerina-tests/http-dispatching-tests/Dependencies.toml index 22ca844370..c961d36156 100644 --- a/ballerina-tests/http-dispatching-tests/Dependencies.toml +++ b/ballerina-tests/http-dispatching-tests/Dependencies.toml @@ -69,7 +69,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -102,7 +102,7 @@ modules = [ [[package]] org = "ballerina" name = "http_dispatching_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "http"}, @@ -124,7 +124,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -159,7 +159,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -320,7 +320,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-interceptor-tests/Ballerina.toml b/ballerina-tests/http-interceptor-tests/Ballerina.toml index e5180c2199..d698448cf6 100644 --- a/ballerina-tests/http-interceptor-tests/Ballerina.toml +++ b/ballerina-tests/http-interceptor-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_interceptor_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-interceptor-tests/Dependencies.toml b/ballerina-tests/http-interceptor-tests/Dependencies.toml index d66abdc4b0..f683df447e 100644 --- a/ballerina-tests/http-interceptor-tests/Dependencies.toml +++ b/ballerina-tests/http-interceptor-tests/Dependencies.toml @@ -66,7 +66,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -99,7 +99,7 @@ modules = [ [[package]] org = "ballerina" name = "http_interceptor_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "http"}, {org = "ballerina", name = "http_test_common"}, @@ -115,7 +115,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -147,7 +147,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -275,7 +275,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-misc-tests/Ballerina.toml b/ballerina-tests/http-misc-tests/Ballerina.toml index 9f020b8cd2..80705565f0 100644 --- a/ballerina-tests/http-misc-tests/Ballerina.toml +++ b/ballerina-tests/http-misc-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_misc_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-misc-tests/Dependencies.toml b/ballerina-tests/http-misc-tests/Dependencies.toml index bc265a579b..e04fbba65e 100644 --- a/ballerina-tests/http-misc-tests/Dependencies.toml +++ b/ballerina-tests/http-misc-tests/Dependencies.toml @@ -66,7 +66,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -99,7 +99,7 @@ modules = [ [[package]] org = "ballerina" name = "http_misc_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "http"}, {org = "ballerina", name = "http_test_common"}, @@ -118,7 +118,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -156,7 +156,7 @@ modules = [ [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -281,7 +281,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-resiliency-tests/Ballerina.toml b/ballerina-tests/http-resiliency-tests/Ballerina.toml index 97168d5339..b4c4c0c477 100644 --- a/ballerina-tests/http-resiliency-tests/Ballerina.toml +++ b/ballerina-tests/http-resiliency-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_resiliency_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-resiliency-tests/Dependencies.toml b/ballerina-tests/http-resiliency-tests/Dependencies.toml index 2e983f45c0..7a3fb8745f 100644 --- a/ballerina-tests/http-resiliency-tests/Dependencies.toml +++ b/ballerina-tests/http-resiliency-tests/Dependencies.toml @@ -66,7 +66,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -99,7 +99,7 @@ modules = [ [[package]] org = "ballerina" name = "http_resiliency_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "http"}, {org = "ballerina", name = "http_test_common"}, @@ -116,7 +116,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -154,7 +154,7 @@ modules = [ [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -276,7 +276,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-security-tests/Ballerina.toml b/ballerina-tests/http-security-tests/Ballerina.toml index f10c39de40..f212262908 100644 --- a/ballerina-tests/http-security-tests/Ballerina.toml +++ b/ballerina-tests/http-security-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_security_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-security-tests/Dependencies.toml b/ballerina-tests/http-security-tests/Dependencies.toml index a4975c1b5c..83fb155b6e 100644 --- a/ballerina-tests/http-security-tests/Dependencies.toml +++ b/ballerina-tests/http-security-tests/Dependencies.toml @@ -69,7 +69,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -102,7 +102,7 @@ modules = [ [[package]] org = "ballerina" name = "http_security_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "http"}, @@ -120,7 +120,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -152,7 +152,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -277,7 +277,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-service-tests/Ballerina.toml b/ballerina-tests/http-service-tests/Ballerina.toml index b7b2b8b521..59eb1cf495 100644 --- a/ballerina-tests/http-service-tests/Ballerina.toml +++ b/ballerina-tests/http-service-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http_service_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http-service-tests/Dependencies.toml b/ballerina-tests/http-service-tests/Dependencies.toml index 13fe2395d1..60e494b42e 100644 --- a/ballerina-tests/http-service-tests/Dependencies.toml +++ b/ballerina-tests/http-service-tests/Dependencies.toml @@ -69,7 +69,7 @@ modules = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -102,7 +102,7 @@ modules = [ [[package]] org = "ballerina" name = "http_service_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "file"}, {org = "ballerina", name = "http"}, @@ -121,7 +121,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -156,7 +156,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -281,7 +281,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina-tests/http-test-common/Ballerina.toml b/ballerina-tests/http-test-common/Ballerina.toml index ef9703d828..e165475d52 100644 --- a/ballerina-tests/http-test-common/Ballerina.toml +++ b/ballerina-tests/http-test-common/Ballerina.toml @@ -1,4 +1,4 @@ [package] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" diff --git a/ballerina-tests/http-test-common/Dependencies.toml b/ballerina-tests/http-test-common/Dependencies.toml index efd39389bc..ebbef3ddd6 100644 --- a/ballerina-tests/http-test-common/Dependencies.toml +++ b/ballerina-tests/http-test-common/Dependencies.toml @@ -10,7 +10,7 @@ distribution-version = "2201.9.0" [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "lang.string"}, {org = "ballerina", name = "mime"}, @@ -108,7 +108,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" dependencies = [ {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, diff --git a/ballerina-tests/http2-tests/Ballerina.toml b/ballerina-tests/http2-tests/Ballerina.toml index de3119c376..8829d304b3 100644 --- a/ballerina-tests/http2-tests/Ballerina.toml +++ b/ballerina-tests/http2-tests/Ballerina.toml @@ -1,17 +1,17 @@ [package] org = "ballerina" name = "http2_tests" -version = "2.11.3" +version = "2.12.0" [[dependency]] org = "ballerina" name = "http_test_common" repository = "local" -version = "2.11.3" +version = "2.12.0" [platform.java17] graalvmCompatible = true [[platform.java17.dependency]] scope = "testOnly" -path = "../../test-utils/build/libs/http-test-utils-2.11.3-SNAPSHOT.jar" +path = "../../test-utils/build/libs/http-test-utils-2.12.0-SNAPSHOT.jar" diff --git a/ballerina-tests/http2-tests/Dependencies.toml b/ballerina-tests/http2-tests/Dependencies.toml index 3d80896be4..dcd3efb0ed 100644 --- a/ballerina-tests/http2-tests/Dependencies.toml +++ b/ballerina-tests/http2-tests/Dependencies.toml @@ -69,7 +69,7 @@ modules = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -102,7 +102,7 @@ modules = [ [[package]] org = "ballerina" name = "http2_tests" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "file"}, {org = "ballerina", name = "http"}, @@ -121,7 +121,7 @@ modules = [ [[package]] org = "ballerina" name = "http_test_common" -version = "2.11.3" +version = "2.12.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "lang.string"}, @@ -156,7 +156,7 @@ scope = "testOnly" [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -281,7 +281,7 @@ dependencies = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 7570a49bbf..462ed1fc55 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" authors = ["Ballerina"] keywords = ["http", "network", "service", "listener", "client"] repository = "https://github.com/ballerina-platform/module-ballerina-http" @@ -16,14 +16,14 @@ graalvmCompatible = true [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" artifactId = "http-native" -version = "2.11.3" -path = "../native/build/libs/http-native-2.11.3-SNAPSHOT.jar" +version = "2.12.0" +path = "../native/build/libs/http-native-2.12.0-SNAPSHOT.jar" [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" artifactId = "mime-native" -version = "2.9.0" -path = "./lib/mime-native-2.9.0.jar" +version = "2.10.0" +path = "./lib/mime-native-2.10.0-20240724-124600-f193322.jar" [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 1f6001a559..fc0dcc7e77 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "http-compiler-plugin" class = "io.ballerina.stdlib.http.compiler.HttpCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/http-compiler-plugin-2.11.3-SNAPSHOT.jar" +path = "../compiler-plugin/build/libs/http-compiler-plugin-2.12.0-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index ee2ab8a1f1..90f947b41a 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -76,7 +76,7 @@ modules = [ [[package]] org = "ballerina" name = "http" -version = "2.11.3" +version = "2.12.0" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, @@ -128,7 +128,7 @@ modules = [ [[package]] org = "ballerina" name = "jwt" -version = "2.11.0" +version = "2.12.1" dependencies = [ {org = "ballerina", name = "cache"}, {org = "ballerina", name = "crypto"}, @@ -254,7 +254,7 @@ modules = [ [[package]] org = "ballerina" name = "mime" -version = "2.9.0" +version = "2.10.0" dependencies = [ {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, diff --git a/ballerina/bytes_to_sse_stream_generator.bal b/ballerina/bytes_to_sse_stream_generator.bal new file mode 100644 index 0000000000..a60801ab9b --- /dev/null +++ b/ballerina/bytes_to_sse_stream_generator.bal @@ -0,0 +1,162 @@ +// Copyright (c) 2024 WSO2 LLC. (https://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/io; +import ballerina/lang.regexp; +import ballerina/log; + +const byte LINE_FEED = 10; +const byte CARRIAGE_RETURN = 13; + +enum SseFieldName { + COMMENT = "", + ID = "id", + RETRY = "retry", + EVENT = "event", + DATA = "data" +}; + +# This class is designed to read a stream of data one byte at a time. +# It specifically handles the scenario where the streaming party sends data +# in small increments, potentially one byte at a time. +# +# The main functionality of this class is to parse the incoming byte stream +# and detect consecutive line feed characters. When two consecutive line feeds +# ('\n\n' | '\r\r' | '\r\n\r\n') are detected, it signifies the end of an SSE (Server-Sent Event) message, +# and a new `SseEvent` record is created to represent this message. +class BytesToEventStreamGenerator { + private final stream byteStream; + private boolean isClosed = false; + private byte[] lookaheadBuffer = []; + + isolated function init(stream byteStream) { + self.byteStream = byteStream; + } + + public isolated function next() returns record {|SseEvent value;|}|error? { + do { + string? sseEvent = check self.readUntilDoubleLineBreaks(); + if sseEvent is () { + return; + } + return {value: check parseSseEvent(sseEvent)}; + } on fail error e { + log:printError("failed to construct SseEvent", e); + return e; + } + } + + public isolated function close() returns error? { + check self.byteStream.close(); + self.isClosed = true; + } + + private isolated function readUntilDoubleLineBreaks() returns string|error? { + byte[] buffer = []; + byte prevByte = 0; + byte? currentByte = (); + boolean foundCariageReturnWithNewLine = false; + while !self.isClosed { + currentByte = check self.getNextByte(); + if currentByte is () { + return; + } + if foundCariageReturnWithNewLine && currentByte == CARRIAGE_RETURN { + // Lookahead for newline + byte? nextByte = check self.getNextByte(); + if nextByte is () { + return; + } + if nextByte == LINE_FEED { + buffer.push(currentByte); + buffer.push(nextByte); + return string:fromBytes(buffer); + } + // Store char in lookahead buffer if not newline + self.lookaheadBuffer.push(nextByte); + } + foundCariageReturnWithNewLine = false; + if ((currentByte == LINE_FEED || currentByte == CARRIAGE_RETURN) && prevByte == currentByte) { + buffer.push(currentByte); + return string:fromBytes(buffer); + } + if currentByte == LINE_FEED && prevByte == CARRIAGE_RETURN { + foundCariageReturnWithNewLine = true; + } + buffer.push(currentByte); + prevByte = currentByte; + } + return; + } + + # Reads next byte from the lookahead buffer if data is available, otherwise read from the byte stream + # + return - A `byte?` on success, `error` on failure + private isolated function getNextByte() returns byte|error? { + if self.lookaheadBuffer.length() > 0 { + return self.lookaheadBuffer.shift(); + } + record {byte[] value;}? nextValue = check self.byteStream.next(); + return nextValue is () ? () : nextValue.value[0]; + } +} + +isolated function parseSseEvent(string event) returns SseEvent|error { + string[] lines = re `\r\n|\n|\r`.split(event); + string? id = (); + string? comment = (); + string? data = (); + int? 'retry = (); + string? eventName = (); + + foreach string line in lines { + if line == "" { + continue; + } + regexp:Groups? groups = re `(.*?):(.*)`.findGroups(line); + string filedName = line; + string fieldValue = ""; + if groups is regexp:Groups && groups.length() == 3 { + regexp:Span? filedNameSpan = groups[1]; + regexp:Span? filedValueSpan = groups[2]; + if filedNameSpan is () || filedValueSpan is () { + continue; + } + filedName = filedNameSpan.substring().trim(); + fieldValue = removeLeadingSpace(filedValueSpan.substring()); + } + if filedName == ID { + id = fieldValue; + } else if filedName == COMMENT { + comment = fieldValue; + } else if filedName == RETRY { + int|error retryValue = int:fromString(fieldValue); + 'retry = retryValue is error ? () : retryValue; + } else if filedName == EVENT { + eventName = fieldValue; + } else if filedName == DATA { + if data is () { + data = fieldValue; + } else { + data += fieldValue; + } + } + } + return {data, id, comment, 'retry, event: eventName}; +} + +isolated function removeLeadingSpace(string line) returns string { + return line.startsWith(" ") ? line.substring(1) : line; +} diff --git a/ballerina/http_client_endpoint.bal b/ballerina/http_client_endpoint.bal index 7d5d5954b7..2923b62d60 100644 --- a/ballerina/http_client_endpoint.bal +++ b/ballerina/http_client_endpoint.bal @@ -62,7 +62,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -78,7 +78,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function post(string path, RequestMessage message, map? headers = (), @@ -88,7 +88,7 @@ public client isolated class Client { } external; private isolated function processPost(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); Response|ClientError response = self.httpClient->post(path, req); @@ -104,7 +104,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -120,7 +120,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + mediaType - The MIME type header of the request entity # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function put(string path, RequestMessage message, map? headers = (), @@ -130,7 +130,7 @@ public client isolated class Client { } external; private isolated function processPut(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); Response|ClientError response = self.httpClient->put(path, req); @@ -146,7 +146,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -162,7 +162,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + mediaType - The MIME type header of the request entity # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function patch(string path, RequestMessage message, map? headers = (), @@ -172,7 +172,7 @@ public client isolated class Client { } external; private isolated function processPatch(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); Response|ClientError response = self.httpClient->patch(path, req); @@ -188,7 +188,7 @@ public client isolated class Client { # + message - An optional HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -204,7 +204,7 @@ public client isolated class Client { # + message - An optional HTTP outbound request message or any allowed payload # + mediaType - The MIME type header of the request entity # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function delete(string path, RequestMessage message = (), @@ -214,7 +214,7 @@ public client isolated class Client { } external; private isolated function processDelete(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); Response|ClientError response = self.httpClient->delete(path, req); @@ -254,7 +254,7 @@ public client isolated class Client { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -268,7 +268,7 @@ public client isolated class Client { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function get(string path, map? headers = (), TargetType targetType = <>) @@ -277,7 +277,7 @@ public client isolated class Client { } external; private isolated function processGet(string path, map? headers, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = buildRequestWithHeaders(headers); Response|ClientError response = self.httpClient->get(path, message = req); if observabilityEnabled && response is Response { @@ -290,7 +290,7 @@ public client isolated class Client { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -304,7 +304,7 @@ public client isolated class Client { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function options(string path, map? headers = (), TargetType targetType = <>) @@ -313,7 +313,7 @@ public client isolated class Client { } external; private isolated function processOptions(string path, map? headers, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = buildRequestWithHeaders(headers); Response|ClientError response = self.httpClient->options(path, message = req); if observabilityEnabled && response is Response { @@ -329,7 +329,7 @@ public client isolated class Client { # + message - An HTTP outbound request or any allowed payload # + mediaType - The MIME type header of the request entity # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function execute(string httpVerb, string path, RequestMessage message, @@ -340,7 +340,7 @@ public client isolated class Client { private isolated function processExecute(string httpVerb, string path, RequestMessage message, TargetType targetType, string? mediaType, map? headers) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); Response|ClientError response = self.httpClient->execute(httpVerb, path, req); @@ -354,7 +354,7 @@ public client isolated class Client { # # + path - Request path # + request - An HTTP inbound request message - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function forward(string path, Request request, TargetType targetType = <>) @@ -363,7 +363,7 @@ public client isolated class Client { } external; private isolated function processForward(string path, Request request, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Response|ClientError response = self.httpClient->forward(path, request); if observabilityEnabled && response is Response { addObservabilityInformation(path, request.method, response.statusCode, self.url); @@ -720,7 +720,7 @@ isolated function createStatusCodeResponseDataBindingError(DataBindingErrorType } isolated function processResponse(Response|ClientError response, TargetType targetType, boolean requireValidation) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { if response is ClientError || hasHttpResponseType(targetType) { return response; } @@ -745,8 +745,30 @@ isolated function processResponse(Response|ClientError response, TargetType targ return performDataValidation(payload, targetType); } return payload; - } else { - panic error GenericClientError("invalid payload target type"); + } + if targetType is typedesc> { + return getSseEventStream(response); + } + if targetType is typedesc> { + return error PayloadBindingClientError("payload binding failed: " + + "Target return type must not be a union of stream and anydata"); + } + panic error GenericClientError("invalid payload target type"); +} + +isolated function getSseEventStream(Response response) returns stream|ClientError { + check validateEventStreamContentType(response); + // The streaming party can decide to send one byte at a time, hence the getByteStream method is called + // with an array size of 1. + BytesToEventStreamGenerator bytesToEventStreamGenerator = new (check response.getByteStream(1)); + stream eventStream = new (bytesToEventStreamGenerator); + return eventStream; +} + +isolated function validateEventStreamContentType(Response response) returns ClientError? { + string|HeaderNotFoundError contentType = response.getHeader(CONTENT_TYPE); + if contentType is HeaderNotFoundError || !contentType.startsWith(mime:TEXT_EVENT_STREAM) { + return error PayloadBindingClientError(string `invalid payload target type. The response is not of ${mime:TEXT_EVENT_STREAM} content type.`); } } diff --git a/ballerina/http_client_object.bal b/ballerina/http_client_object.bal index c8e6362d10..6a469ed299 100644 --- a/ballerina/http_client_object.bal +++ b/ballerina/http_client_object.bal @@ -23,7 +23,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -36,7 +36,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -49,7 +49,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -62,7 +62,7 @@ public type ClientObject client object { # + message - An optional HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -82,7 +82,7 @@ public type ClientObject client object { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -93,7 +93,7 @@ public type ClientObject client object { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -106,7 +106,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function post(string path, RequestMessage message, map? headers = (), @@ -119,7 +119,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function put(string path, RequestMessage message, map? headers = (), @@ -132,7 +132,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function patch(string path, RequestMessage message, map? headers = (), @@ -145,7 +145,7 @@ public type ClientObject client object { # + message - An optional HTTP outbound request message or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function delete(string path, RequestMessage message = (), map? headers = (), @@ -163,7 +163,7 @@ public type ClientObject client object { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function get( string path, map? headers = (), TargetType targetType = <>) @@ -173,7 +173,7 @@ public type ClientObject client object { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function options( string path, map? headers = (), TargetType targetType = <>) @@ -186,7 +186,7 @@ public type ClientObject client object { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function execute(string httpVerb, string path, RequestMessage message, @@ -197,7 +197,7 @@ public type ClientObject client object { # # + path - Request path # + request - An HTTP inbound request message - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function forward(string path, Request request, TargetType targetType = <>) diff --git a/ballerina/http_commons.bal b/ballerina/http_commons.bal index 7188f49af3..01a6f469f7 100644 --- a/ballerina/http_commons.bal +++ b/ballerina/http_commons.bal @@ -117,6 +117,8 @@ isolated function buildResponse(ResponseMessage message, string? resourceAccesso response.setBinaryPayload(message); } else if message is stream { response.setByteStream(message); + } else if message is stream { + response.setSseEventStream(message); } else if message is mime:Entity[] { response.setBodyParts(message); } else if message is anydata { diff --git a/ballerina/http_connection.bal b/ballerina/http_connection.bal index 16c150b300..af5060aa88 100644 --- a/ballerina/http_connection.bal +++ b/ballerina/http_connection.bal @@ -137,8 +137,9 @@ public isolated client class Caller { return nativeGetRemoteHostName(self); } - private isolated function returnResponse(anydata|StatusCodeResponse|Response message, string? returnMediaType, - HttpCacheConfig? cacheConfig, map? links) returns ListenerError? { + private isolated function returnResponse( + anydata|StatusCodeResponse|Response|stream|stream message, + string? returnMediaType, HttpCacheConfig? cacheConfig, map? links) returns ListenerError? { Response response = new; boolean setETag = cacheConfig is () ? false: cacheConfig.setETag; boolean cacheCompatibleType = false; @@ -168,6 +169,8 @@ public isolated client class Caller { if returnMediaType is string && !response.hasHeader(CONTENT_TYPE) { response.setHeader(CONTENT_TYPE, returnMediaType); } + } else if message is stream|stream { + response = createSseResponse(message); } else if message is anydata { setPayload(message, response, returnMediaType, setETag, links); if returnMediaType is string { @@ -252,6 +255,12 @@ isolated function createStatusCodeResponse(StatusCodeResponse message, string? r return response; } +isolated function createSseResponse(stream|stream eventStream) returns Response { + Response response = new; + response.setSseEventStream(eventStream); + return response; +} + isolated function retrieveMediaType(StatusCodeResponse resp, string? retrievedMediaType) returns string? { string? mediaType = resp?.mediaType; if mediaType is string { diff --git a/ballerina/http_response.bal b/ballerina/http_response.bal index fbc0d64fd0..df3273424b 100644 --- a/ballerina/http_response.bal +++ b/ballerina/http_response.bal @@ -42,7 +42,7 @@ public class Response { time:Utc receivedTime = [0, 0.0]; time:Utc requestTime = [0, 0.0]; private mime:Entity? entity = (); - + public isolated function init() { self.entity = self.createNewEntity(); } @@ -324,6 +324,13 @@ public class Response { } } + # Gets the response payload as a `stream` of SseEvent. + # + # + return - A SseEvent stream from which the `http:SseEvent` can be read or `http:ClientError` in case of errors + public isolated function getSseEventStream() returns stream|ClientError { + return getSseEventStream(self); + } + # Extracts body parts from the response. If the content type is not a composite media type, an error is returned. # # + return - The body parts as an array of entities or else an `http:ClientError` if there were any errors in @@ -468,6 +475,18 @@ public class Response { self.setEntityAndUpdateContentTypeHeader(entity); } + # Sets an `http:SseEvent` stream as the payload, along with the Content-Type and Cache-Control + # headers set to 'text/event-stream' and 'no-cache', respectively. + # + # + eventStream - SseEvent stream, which needs to be set to the response + public isolated function setSseEventStream(stream|stream eventStream) { + ResponseCacheControl cacheControl = new; + cacheControl.noCache = true; + self.cacheControl = cacheControl; + SseEventToByteStreamGenerator byteStreamGen = new(eventStream); + self.setByteStream(new (byteStreamGen), mime:TEXT_EVENT_STREAM); + } + # Sets the response payload. This method overrides any existing content-type by passing the content-type # as an optional parameter. If the content type parameter is not provided then the default value derived # from the payload will be used as content-type only when there are no existing content-type header. @@ -475,7 +494,7 @@ public class Response { # + payload - Payload can be of type `string`, `xml`, `json`, `byte[]`, `stream` # or `Entity[]` (i.e., a set of body parts). # + contentType - Content-type to be used with the payload. This is an optional parameter - public isolated function setPayload(string|xml|json|byte[]|mime:Entity[]|stream payload, + public isolated function setPayload(string|xml|json|byte[]|mime:Entity[]|stream|stream payload, string? contentType = ()) { if contentType is string { error? err = self.setContentType(contentType); @@ -496,6 +515,8 @@ public class Response { self.setByteStream(payload); } else if payload is mime:Entity[] { self.setBodyParts(payload); + } else if payload is stream { + self.setSseEventStream(payload); } else { panic error Error("invalid entity body type." + "expected one of the types: string|xml|json|byte[]|mime:Entity[]|stream"); diff --git a/ballerina/http_types.bal b/ballerina/http_types.bal index f4d95bab04..db54dc70cd 100644 --- a/ballerina/http_types.bal +++ b/ballerina/http_types.bal @@ -21,7 +21,8 @@ import ballerina/mime; public type RequestMessage anydata|Request|mime:Entity[]|stream; # The types of messages that are accepted by HTTP `listener` when sending out the outbound response. -public type ResponseMessage anydata|Response|mime:Entity[]|stream; +public type ResponseMessage anydata|Response|mime:Entity[]|stream|stream + |stream; # The HTTP service type. public type Service distinct service object { @@ -29,7 +30,7 @@ public type Service distinct service object { }; # The types of data values that are expected by the HTTP `client` to return after the data binding operation. -public type TargetType typedesc; +public type TargetType typedesc>; # Defines the HTTP operations related to circuit breaker, failover and load balancer. # @@ -137,6 +138,20 @@ public type HeaderValue record {| map params; |}; +# Represents a Server Sent Event emitted from a service. +public type SseEvent record {| + # Name of the event + string event?; + # Id of the event + string id?; + # Data part of the event + string data?; + # The reconnect time on failure in milliseconds. + int 'retry?; + # Comment added to the event + string comment?; +|}; + # Dummy types used in the compiler plugin -type ResourceReturnType Response|StatusCodeResponse|anydata|error; +type ResourceReturnType Response|StatusCodeResponse|anydata|stream|stream|error; type InterceptorResourceReturnType ResourceReturnType|NextService; diff --git a/ballerina/resiliency_failover_client.bal b/ballerina/resiliency_failover_client.bal index fdab8636cd..e1be447527 100644 --- a/ballerina/resiliency_failover_client.bal +++ b/ballerina/resiliency_failover_client.bal @@ -73,7 +73,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -89,7 +89,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function post(string path, RequestMessage message, map? headers = (), @@ -99,7 +99,7 @@ public client isolated class FailoverClient { } external; private isolated function processPost(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performFailoverAction(path, req, HTTP_POST); @@ -118,7 +118,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -134,7 +134,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function put(string path, RequestMessage message, map? headers = (), @@ -144,7 +144,7 @@ public client isolated class FailoverClient { } external; private isolated function processPut(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performFailoverAction(path, req, HTTP_PUT); @@ -163,7 +163,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -179,7 +179,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function patch(string path, RequestMessage message, map? headers = (), @@ -189,7 +189,7 @@ public client isolated class FailoverClient { } external; private isolated function processPatch(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performFailoverAction(path, req, HTTP_PATCH); @@ -208,7 +208,7 @@ public client isolated class FailoverClient { # + message - An optional HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -224,7 +224,7 @@ public client isolated class FailoverClient { # + message - An optional HTTP outbound request message or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function delete(string path, RequestMessage message = (), map? headers = (), @@ -234,7 +234,7 @@ public client isolated class FailoverClient { } external; private isolated function processDelete(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performFailoverAction(path, req, HTTP_DELETE); @@ -280,7 +280,7 @@ public client isolated class FailoverClient { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -294,7 +294,7 @@ public client isolated class FailoverClient { # # + path - Resource path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function get(string path, map? headers = (), TargetType targetType = <>) @@ -303,7 +303,7 @@ public client isolated class FailoverClient { } external; private isolated function processGet(string path, map? headers, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = buildRequestWithHeaders(headers); var result = self.performFailoverAction(path, req, HTTP_GET); if result is HttpFuture { @@ -319,7 +319,7 @@ public client isolated class FailoverClient { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -333,7 +333,7 @@ public client isolated class FailoverClient { # # + path - Resource path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function options(string path, map? headers = (), TargetType targetType = <>) @@ -342,7 +342,7 @@ public client isolated class FailoverClient { } external; private isolated function processOptions(string path, map? headers, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = buildRequestWithHeaders(headers); var result = self.performFailoverAction(path, req, HTTP_OPTIONS); if result is HttpFuture { @@ -361,7 +361,7 @@ public client isolated class FailoverClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function execute(string httpVerb, string path, RequestMessage message, @@ -372,7 +372,7 @@ public client isolated class FailoverClient { private isolated function processExecute(string httpVerb, string path, RequestMessage message, TargetType targetType, string? mediaType, map? headers) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performExecuteAction(path, req, httpVerb); @@ -389,7 +389,7 @@ public client isolated class FailoverClient { # # + path - Resource path # + request - An HTTP request - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function forward(string path, Request request, TargetType targetType = <>) @@ -398,7 +398,7 @@ public client isolated class FailoverClient { } external; private isolated function processForward(string path, Request request, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { var result = self.performFailoverAction(path, request, HTTP_FORWARD); if result is HttpFuture { return getInvalidTypeError(); diff --git a/ballerina/resiliency_load_balance_client.bal b/ballerina/resiliency_load_balance_client.bal index 1ced9bbb27..535a3cd618 100644 --- a/ballerina/resiliency_load_balance_client.bal +++ b/ballerina/resiliency_load_balance_client.bal @@ -66,7 +66,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -82,7 +82,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function post(string path, RequestMessage message, map? headers = (), @@ -92,7 +92,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processPost(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performLoadBalanceAction(path, req, HTTP_POST); @@ -105,7 +105,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -121,7 +121,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function put(string path, RequestMessage message, map? headers = (), @@ -131,7 +131,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processPut(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performLoadBalanceAction(path, req, HTTP_PUT); @@ -144,7 +144,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -160,7 +160,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function patch(string path, RequestMessage message, map? headers = (), @@ -170,7 +170,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processPatch(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performLoadBalanceAction(path, req, HTTP_PATCH); @@ -183,7 +183,7 @@ public client isolated class LoadBalanceClient { # + message - An optional HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -199,7 +199,7 @@ public client isolated class LoadBalanceClient { # + message - An optional HTTP outbound request message or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function delete(string path, RequestMessage message = (), map? headers = (), @@ -209,7 +209,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processDelete(string path, RequestMessage message, TargetType targetType, - string? mediaType, map? headers) returns Response|anydata|ClientError { + string? mediaType, map? headers) returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performLoadBalanceAction(path, req, HTTP_DELETE); @@ -242,7 +242,7 @@ public client isolated class LoadBalanceClient { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -256,7 +256,7 @@ public client isolated class LoadBalanceClient { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function get(string path, map? headers = (), TargetType targetType = <>) @@ -265,7 +265,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processGet(string path, map? headers, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = buildRequestWithHeaders(headers); var result = self.performLoadBalanceAction(path, req, HTTP_GET); return processResponse(result, targetType, self.requireValidation); @@ -275,7 +275,7 @@ public client isolated class LoadBalanceClient { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + params - The query parameters # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure @@ -289,7 +289,7 @@ public client isolated class LoadBalanceClient { # # + path - Request path # + headers - The entity headers - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function options(string path, map? headers = (), TargetType targetType = <>) @@ -298,7 +298,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processOptions(string path, map? headers, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = buildRequestWithHeaders(headers); var result = self.performLoadBalanceAction(path, req, HTTP_OPTIONS); return processResponse(result, targetType, self.requireValidation); @@ -311,7 +311,7 @@ public client isolated class LoadBalanceClient { # + message - An HTTP outbound request or any allowed payload # + headers - The entity headers # + mediaType - The MIME type header of the request entity - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function execute(string httpVerb, string path, RequestMessage message, @@ -322,7 +322,7 @@ public client isolated class LoadBalanceClient { private isolated function processExecute(string httpVerb, string path, RequestMessage message, TargetType targetType, string? mediaType, map? headers) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { Request req = check buildRequest(message, mediaType); populateOptions(req, mediaType, headers); var result = self.performLoadBalanceExecuteAction(path, req, httpVerb); @@ -333,7 +333,7 @@ public client isolated class LoadBalanceClient { # # + path - Resource path # + request - An HTTP request - # + targetType - HTTP response or `anydata`, which is expected to be returned after data binding + # + targetType - HTTP response, `anydata` or stream of HTTP SSE, which is expected to be returned after data binding # + return - The response or the payload (if the `targetType` is configured) or an `http:ClientError` if failed to # establish the communication with the upstream server or a data binding failure remote isolated function forward(string path, Request request, TargetType targetType = <>) @@ -342,7 +342,7 @@ public client isolated class LoadBalanceClient { } external; private isolated function processForward(string path, Request request, TargetType targetType) - returns Response|anydata|ClientError { + returns Response|stream|anydata|ClientError { var result = self.performLoadBalanceAction(path, request, HTTP_FORWARD); return processResponse(result, targetType, self.requireValidation); } diff --git a/ballerina/sse_to_bytes_stream_generator.bal b/ballerina/sse_to_bytes_stream_generator.bal new file mode 100644 index 0000000000..e9915bb4e5 --- /dev/null +++ b/ballerina/sse_to_bytes_stream_generator.bal @@ -0,0 +1,98 @@ +// Copyright (c) 2024 WSO2 LLC. (https://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/io; +import ballerina/log; + +const LINE_BREAK = "\n"; + +class SseEventToByteStreamGenerator { + private final stream|stream eventStream; + private boolean isClosed = false; + private boolean isErrorOccurred = false; + + isolated function init(stream eventStream) { + self.eventStream = eventStream; + } + + public isolated function next() returns record {|byte[] value;|}|io:Error? { + if self.isClosed || self. isErrorOccurred { + return; + } + do { + record {SseEvent value;}? event = check self.eventStream.next(); + if event is () { + return; + } + check validateSseEvent(event.value); + string eventText = getEventText(event.value); + return {value: eventText.toBytes()}; + } on fail error e { + self.isErrorOccurred = true; + log:printError("unable to obtain byte array", e); + SseEvent errorEvent = getErrorEvent(e); + string eventText = getEventText(errorEvent); + return {value: eventText.toBytes()}; + } + } + + public isolated function close() returns error? { + check self.eventStream.close(); + self.isClosed = true; + } +} + +isolated function validateSseEvent(SseEvent event) returns error? { + if !event.hasKey(EVENT) && !event.hasKey(ID) + && !event.hasKey(RETRY) && !event.hasKey("comment") + && !event.hasKey(DATA) { + return error("Invalid value provided as event: " + + "at least one field is expected to be present in the SseEvent record."); + } +} + +isolated function getEventText(SseEvent event) returns string { + string eventText = ""; + string? comment = event.comment; + if comment is string { + eventText += string `: ${comment}` + LINE_BREAK; + } + string? id = event.id; + if id is string { + eventText += string `${ID}: ${id}` + LINE_BREAK; + } + string? eventName = event.event; + if eventName is string { + eventText += string `${EVENT}: ${eventName}` + LINE_BREAK; + } + int? 'retry = event.'retry; + if 'retry is int { + eventText += string `${RETRY}: ${'retry.toString()}` + LINE_BREAK; + } + string? data = event.data; + if data is string { + string[] lines = re `\r\n|\n|\r`.split(data); + foreach string line in lines { + eventText += string `${DATA}: ${line}` + LINE_BREAK; + } + } + eventText += LINE_BREAK; + return eventText; +} + +isolated function getErrorEvent(error err) returns SseEvent { + return {event: "error", data: err.message()}; +} diff --git a/changelog.md b/changelog.md index fb7241eee4..fb690a10d9 100644 --- a/changelog.md +++ b/changelog.md @@ -8,7 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased] ### Added - +- [Add support for Server-Sent Events](https://github.com/ballerina-platform/ballerina-library/issues/6687) - [Introduce default status code response record](https://github.com/ballerina-platform/ballerina-library/issues/6491) - [Add connection eviction feature to handle connections that receive GO_AWAY from the client](https://github.com/ballerina-platform/ballerina-library/issues/6734) - [Enhanced the configurability of Ballerina access logging by introducing multiple configuration options.](https://github.com/ballerina-platform/ballerina-library/issues/6111) diff --git a/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/http/compiler/CompilerPluginTest.java b/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/http/compiler/CompilerPluginTest.java index 4aec08b487..8fe978f0eb 100644 --- a/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/http/compiler/CompilerPluginTest.java +++ b/compiler-plugin-tests/src/test/java/io/ballerina/stdlib/http/compiler/CompilerPluginTest.java @@ -126,11 +126,14 @@ public void testInValidReturnTypes() { DiagnosticResult diagnosticResult = compilation.diagnosticResult(); Assert.assertEquals(diagnosticResult.errorCount(), 3); assertError(diagnosticResult, 0, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'error[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'error[]'", HTTP_102); assertError(diagnosticResult, 1, "invalid resource method return type: expected 'anydata|http:Response" + - "|http:StatusCodeResponse|error', but found 'map'", HTTP_102); + "|http:StatusCodeResponse|stream|stream|" + + "error', but found 'map'", HTTP_102); assertError(diagnosticResult, 2, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'readonly & error[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'readonly & error[]'", HTTP_102); } @Test @@ -629,23 +632,32 @@ public void testInvalidUnionTypesAsReturnType() { DiagnosticResult diagnosticResult = compilation.diagnosticResult(); Assert.assertEquals(diagnosticResult.errorCount(), 9); assertTrue(diagnosticResult, 0, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord1[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'TestRecord1[]'", HTTP_102); assertTrue(diagnosticResult, 1, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord2[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'TestRecord2[]'", HTTP_102); assertTrue(diagnosticResult, 2, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord3[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'TestRecord3[]'", HTTP_102); assertTrue(diagnosticResult, 3, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord4[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'TestRecord4[]'", HTTP_102); assertTrue(diagnosticResult, 4, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord5[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream" + + "|stream|error', but found 'TestRecord5[]'", HTTP_102); assertTrue(diagnosticResult, 5, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord6[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'TestRecord6[]'", HTTP_102); assertTrue(diagnosticResult, 6, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'TestRecord7[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'TestRecord7[]'", HTTP_102); assertTrue(diagnosticResult, 7, "invalid resource method return type: expected 'anydata|" + - "http:Response|http:StatusCodeResponse|error', but found 'http:StatusCodeResponse[]'", HTTP_102); + "http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'http:StatusCodeResponse[]'", HTTP_102); assertTrue(diagnosticResult, 8, "invalid resource method return type: expected " + - "'anydata|http:Response|http:StatusCodeResponse|error', but found 'error[]'", HTTP_102); + "'anydata|http:Response|http:StatusCodeResponse|stream|" + + "stream|error', but found 'error[]'", HTTP_102); } @Test diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/http/compiler/Constants.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/http/compiler/Constants.java index 17f43a34ee..52f607f7e7 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/http/compiler/Constants.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/http/compiler/Constants.java @@ -84,7 +84,8 @@ private Constants() {} public static final String CALLER_ANNOTATION_NAME = "CallerInfo"; public static final String FIELD_RESPONSE_TYPE = "respondType"; public static final String RESPOND_METHOD_NAME = "respond"; - public static final String ALLOWED_RETURN_UNION = "anydata|http:Response|http:StatusCodeResponse|error"; + public static final String ALLOWED_RETURN_UNION = "anydata|http:Response|http:StatusCodeResponse|" + + "stream|stream|error"; public static final String REQUEST_INTERCEPTOR = "RequestInterceptor"; public static final String RESPONSE_INTERCEPTOR = "ResponseInterceptor"; public static final String REQUEST_ERROR_INTERCEPTOR = "RequestErrorInterceptor"; diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 33503cb356..8a4ad81bf6 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -827,13 +827,13 @@ service /headerparamservice on HeaderBindingIdealEP { #### 2.3.5. Return types -The resource method supports anydata, error?, http:Response and http:StatusCodeResponse as return types. +The resource method supports `anydata`, `error?`, `http:Response`, `http:StatusCodeResponse` and `stream` as return types. Whenever user returns a particular output, that will result in an HTTP response to the caller who initiated the call. Therefore, user does not necessarily depend on the `http:Caller` and its remote methods to proceed with the response. ```ballerina -resource function XXX NAME_TEMPLATE () returns @http:Payload anydata|http:Response|http:StatusCodeResponse|http:Error? { +resource function XXX NAME_TEMPLATE () returns @http:Payload anydata|http:Response|http:StatusCodeResponse|stream|http:Error? { } ``` @@ -857,6 +857,7 @@ Based on the return types respective header value is added as the `Content-type` | int, float, decimal, boolean | application/json | | map\, table>, map\[], table>)[] | application/json | | http:StatusCodeResponse | derived from the body field | +| stream | text/event-stream | ##### 2.3.5.1. Status Code Response @@ -933,7 +934,19 @@ Return nil from the resource has few meanings. } ``` -##### 2.3.5.3. Default response status codes +##### 2.3.5.3. Return stream + +When an `http:SseEvent` stream is returned from the service, it's considered a server-sent event. By default, the service will add the following headers: + +- For HTTP 2.0: + - `Content-Type: text/event-stream` + - `Cache-Control: no-cache` + +- For HTTP 1.1, in addition to the previously mentioned headers, the following headers will also be included in the response: + - `Transfer-Encoding: chunked` + - `Connection: keep-alive` + +##### 2.3.5.4. Default response status codes To improve the developer experience for RESTful API development, following default status codes will be used in outbound response when returning `anydata` directly from a resource method. @@ -1494,50 +1507,52 @@ infer the expected payload type from the LHS variable type. This is called as cl inbound response payload is accessed and parse to the expected type in the method signature. It is easy to access the payload directly rather manipulation `http:Response` using its support methods such as `getTextPayload()`, ..etc. -Client data binding supports `anydata` where the payload is deserialized based on the media type before binding it +Client data binding supports `anydata` and `stream` where the payload is deserialized based on the media type before binding it to the required type. Similar to the service data binding following table explains the compatible `anydata` types with each common media type. In the absence of a standard media type, the binding type is inferred by the payload parameter type itself. If the type is not compatible with the media type, error is returned. -| Ballerina Type | Structure | "text" | "xml" | "json" | "x-www-form-urlencoded" | "octet-stream" | -|----------------|-------------------------|:------:|:-----:|:------:|:-----------------------:|:--------------:| -| boolean | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | boolean[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| int | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | int[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| float | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | float[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| decimal | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | decimal[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| byte[] | | ✅ | ❌ | ✅ | ❌ | ✅ | -| | byte[][] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| string | | ✅ | ❌ | ✅ | ✅ | ❌ | -| | string[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ✅ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| xml | | ❌ | ✅ | ❌ | ❌ | ❌ | -| json | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | json[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| map | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | -| record | | ❌ | ❌ | ✅ | ❌ | ❌ | -| | record[] | ❌ | ❌ | ✅ | ❌ | ❌ | -| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | -| | table\ | ❌ | ❌ | ✅ | ❌ | ❌ | +| Ballerina Type | Structure | "text" | "xml" | "json" | "x-www-form-urlencoded" | "octet-stream" | "event-stream" | +|----------------|-------------------------|:------:|:-----:|:------:|:-----------------------:|:--------------:|:--------------: +| boolean | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | boolean[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| int | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | int[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| float | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | float[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| decimal | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | decimal[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| byte[] | | ✅ | ❌ | ✅ | ❌ | ✅ | ❌ +| | byte[][] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| string | | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | +| | string[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ✅ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| xml | | ❌ | ✅ | ❌ | ❌ | ❌ | ❌ | +| json | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | json[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| map | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\\> | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| record | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | record[] | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | map\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| | table\ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| stream | | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ | +| |stream| ❌ | ❌ | ❌ | ❌ | ❌ | ✅ | ```ballerina http:Client httpClient = check new ("https://person.free.beeceptor.com"); diff --git a/gradle.properties b/gradle.properties index fc66f4cb0b..13e4e2d79c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ org.gradle.caching=true group=io.ballerina.stdlib -version=2.11.3-SNAPSHOT +version=2.12.0-SNAPSHOT ballerinaLangVersion=2201.9.0 ballerinaTomlParserVersion=1.2.2 commonsLang3Version=3.12.0 @@ -32,14 +32,14 @@ stdlibConstraintVersion=1.5.0 stdlibOsVersion=1.8.0 stdlibTaskVersion=2.5.0 stdlibLogVersion=2.9.0 -stdlibCryptoVersion=2.7.0 +stdlibCryptoVersion=2.7.2 stdlibFileVersion=1.9.0 -stdlibMimeVersion=2.9.0 +stdlibMimeVersion=2.10.0-20240724-124600-f193322 stdlibCacheVersion=3.8.0 stdlibAuthVersion=2.11.0 -stdlibJwtVersion=2.11.0 +stdlibJwtVersion=2.12.1 stdlibOAuth2Version=2.11.0 observeVersion=1.2.3 diff --git a/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java b/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java index 27c39a8fb4..682fb3c47a 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/http/api/HttpUtil.java @@ -85,6 +85,7 @@ import io.ballerina.stdlib.mime.util.EntityHeaderHandler; import io.ballerina.stdlib.mime.util.EntityWrapper; import io.ballerina.stdlib.mime.util.HeaderUtil; +import io.ballerina.stdlib.mime.util.MimeConstants; import io.ballerina.stdlib.mime.util.MimeUtil; import io.ballerina.stdlib.mime.util.MultipartDataSource; import io.ballerina.stdlib.mime.util.MultipartDecoder; @@ -401,7 +402,11 @@ public static void prepareOutboundResponse(BObject connectionObj, HttpCarbonMess Service httpService = (Service) connectionObj.getNativeData(HttpConstants.HTTP_SERVICE); if (httpService != null) { HttpUtil.setCompressionHeaders(httpService.getCompressionConfig(), inboundRequestMsg, outboundResponseMsg); - HttpUtil.setChunkingHeader(httpService.getChunkingConfig(), outboundResponseMsg); + if (HttpUtil.hasEventStreamContentType(outboundResponseMsg)) { + HttpUtil.setChunkingHeader(HttpConstants.ALWAYS, outboundResponseMsg); + } else { + HttpUtil.setChunkingHeader(httpService.getChunkingConfig(), outboundResponseMsg); + } if (httpService.getMediaTypeSubtypePrefix() != null) { HttpUtil.setMediaTypeSubtypePrefix(httpService.getMediaTypeSubtypePrefix(), outboundResponseMsg); } @@ -2017,6 +2022,11 @@ public static boolean isHttpStatusCodeResponseTypeWithBody(Type type) { return false; } + public static boolean hasEventStreamContentType(HttpCarbonMessage message) { + String contentType = HttpUtil.getContentTypeFromTransportMessage(message); + return contentType != null && contentType.startsWith(MimeConstants.TEXT_EVENT_STREAM); + } + private HttpUtil() { } } diff --git a/native/src/main/java/io/ballerina/stdlib/http/api/nativeimpl/connection/ResponseWriter.java b/native/src/main/java/io/ballerina/stdlib/http/api/nativeimpl/connection/ResponseWriter.java index 081420c24f..afd77c605d 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/api/nativeimpl/connection/ResponseWriter.java +++ b/native/src/main/java/io/ballerina/stdlib/http/api/nativeimpl/connection/ResponseWriter.java @@ -126,6 +126,9 @@ static void serializeDataSource(Environment env, Object outboundMessageSource, B if (outboundMessageSource != null) { HttpUtil.serializeDataSource(outboundMessageSource, entity, messageOutputStream); HttpUtil.closeMessageOutputStream(messageOutputStream); + } else if (EntityBodyHandler.getEventStream(entity) != null) { + //When the entity body is a byte stream of server sent events and it is not null + EntityBodyHandler.writeEventStreamToOutputStream(env, entity, messageOutputStream); } else if (EntityBodyHandler.getByteStream(entity) != null) { //When the entity body is a byte stream and when it is not null EntityBodyHandler.writeByteStreamToOutputStream(env, entity, messageOutputStream); diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/Util.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/Util.java index f85bf648f2..8df9d9ada1 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/Util.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/common/Util.java @@ -15,6 +15,7 @@ package io.ballerina.stdlib.http.transport.contractimpl.common; +import io.ballerina.stdlib.http.api.HttpUtil; import io.ballerina.stdlib.http.transport.contract.Constants; import io.ballerina.stdlib.http.transport.contract.HttpResponseFuture; import io.ballerina.stdlib.http.transport.contract.config.ChunkConfig; @@ -197,6 +198,9 @@ private static void setOutboundRespHeaders(HttpCarbonMessage outboundResponseMsg outboundResponseMsg.setHeader(HttpHeaderNames.CONNECTION.toString(), Constants.CONNECTION_CLOSE); } else if (keepAlive && (Float.valueOf(inboundReqHttpVersion) < Constants.HTTP_1_1)) { outboundResponseMsg.setHeader(HttpHeaderNames.CONNECTION.toString(), Constants.CONNECTION_KEEP_ALIVE); + } else if (Float.valueOf(inboundReqHttpVersion) == Constants.HTTP_1_1 + && HttpUtil.hasEventStreamContentType(outboundResponseMsg)) { + outboundResponseMsg.setHeader(HttpHeaderNames.CONNECTION.toString(), Constants.CONNECTION_KEEP_ALIVE); } else { outboundResponseMsg.removeHeader(HttpHeaderNames.CONNECTION.toString()); } diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/listener/states/SendingHeaders.java b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/listener/states/SendingHeaders.java index 047cca9212..fd173dd92d 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/listener/states/SendingHeaders.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/listener/states/SendingHeaders.java @@ -18,6 +18,7 @@ package io.ballerina.stdlib.http.transport.contractimpl.listener.states; +import io.ballerina.stdlib.http.api.HttpUtil; import io.ballerina.stdlib.http.transport.contract.HttpResponseFuture; import io.ballerina.stdlib.http.transport.contract.ServerConnectorFuture; import io.ballerina.stdlib.http.transport.contract.config.ChunkConfig; @@ -104,7 +105,7 @@ public void writeOutboundResponseHeaders(HttpCarbonMessage outboundResponseMsg, } outboundRespStatusFuture = outboundResponseListener.getInboundRequestMsg().getHttpOutboundRespStatusFuture(); String httpVersion = outboundResponseListener.getRequestDataHolder().getHttpVersion(); - + keepAlive = HttpUtil.hasEventStreamContentType(outboundResponseMsg) || keepAlive; if (isLastHttpContent(httpContent)) { if (chunkConfig == ChunkConfig.ALWAYS && checkChunkingCompatibility(httpVersion, chunkConfig)) { writeHeaders(outboundResponseMsg, keepAlive, outboundRespStatusFuture); diff --git a/native/src/main/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamer.java b/native/src/main/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamer.java index 6ea0b3e0e3..5420e63a6d 100644 --- a/native/src/main/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamer.java +++ b/native/src/main/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamer.java @@ -18,6 +18,7 @@ package io.ballerina.stdlib.http.transport.message; +import io.ballerina.stdlib.http.api.HttpUtil; import io.ballerina.stdlib.http.transport.contract.Constants; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -39,6 +40,8 @@ import java.util.zip.GZIPInputStream; import java.util.zip.InflaterInputStream; +import static io.netty.util.internal.StringUtil.LINE_FEED; + /** * Provides input and output stream by taking the HttpCarbonMessage. */ @@ -174,19 +177,78 @@ public void close() { byteBufferOutputStream = null; } } + } - private ByteBuf getBuffer() { - if (pooledByteBufAllocator == null) { - return Unpooled.buffer(CONTENT_BUFFER_SIZE); + /** + * A class which write event-stream into ByteBuffers and add those + * ByteBuffers to Content Queue. + * No need to worry about thread safety of this class this is called only once by + * one thread at particular time. + */ + protected class EventBufferOutputStream extends ByteBufferOutputStream { + private ByteBuf dataHolder = getBuffer(); + private int lastByte = 0; + + @Override + public void write(int b) { + // Buffer the data until a double line feed (indicating the end of an event) + // or until CONTENT_BUFFER_SIZE is reached. + if (hasDoubleLineFeed(b)) { + try { + writeByte(b); + httpCarbonMessage.addHttpContent(new DefaultHttpContent(this.dataHolder)); + this.dataHolder = getBuffer(); + } catch (RuntimeException ex) { + throw new EncoderException(httpCarbonMessage.getIoException()); + } + } else if (this.dataHolder.writableBytes() != 0) { + writeByte(b); } else { - return pooledByteBufAllocator.directBuffer(CONTENT_BUFFER_SIZE); + try { + httpCarbonMessage.addHttpContent(new DefaultHttpContent(this.dataHolder)); + this.dataHolder = getBuffer(); + writeByte(b); + } catch (RuntimeException ex) { + throw new EncoderException(httpCarbonMessage.getIoException()); + } } } + + private void writeByte(int b) { + this.dataHolder.writeByte((byte) b); + this.lastByte = b; + } + + private boolean hasDoubleLineFeed(int currentByte) { + return currentByte == this.lastByte && this.lastByte == (int) LINE_FEED; + } + } + + private ByteBuf getBuffer() { + if (pooledByteBufAllocator == null) { + return Unpooled.buffer(CONTENT_BUFFER_SIZE); + } else { + return pooledByteBufAllocator.directBuffer(CONTENT_BUFFER_SIZE); + } } public OutputStream getOutputStream() { + if (HttpUtil.hasEventStreamContentType(httpCarbonMessage)) { + return getEventBufferOutputStream(); + } + return getByteBufferOutputStream(); + } + + private OutputStream getByteBufferOutputStream() { + if (byteBufferOutputStream == null) { + byteBufferOutputStream = new ByteBufferOutputStream(); + } + return byteBufferOutputStream; + } + + private OutputStream getEventBufferOutputStream() { if (byteBufferOutputStream == null) { - byteBufferOutputStream = new HttpMessageDataStreamer.ByteBufferOutputStream(); + byteBufferOutputStream = new EventBufferOutputStream(); } return byteBufferOutputStream; } diff --git a/native/src/test/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamerTest.java b/native/src/test/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamerTest.java index 0ba27b21b9..8ca696b010 100644 --- a/native/src/test/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamerTest.java +++ b/native/src/test/java/io/ballerina/stdlib/http/transport/message/HttpMessageDataStreamerTest.java @@ -19,13 +19,21 @@ package io.ballerina.stdlib.http.transport.message; import io.ballerina.stdlib.http.transport.util.client.http2.MessageGenerator; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; import org.junit.Assert; import org.testng.annotations.Test; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.zip.InflaterInputStream; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; + /** * A unit test class for Transport module HttpMessageDataStreamer class functions. */ @@ -54,7 +62,44 @@ public void testGetInputStreamWithContentEncoding() { httpMessageDataStreamer = new HttpMessageDataStreamer(httpCarbonMessage); Assert.assertNotNull(httpMessageDataStreamer.getInputStream()); Assert.assertNull(httpCarbonMessage.getHeader(HttpHeaderNames.CONTENT_ENCODING.toString())); + } + @Test + public void testEventStreamChunking() throws IOException { + HttpCarbonMessage httpResponse = new HttpCarbonResponse(new DefaultHttpResponse(HttpVersion.HTTP_1_1, OK)); + httpResponse.setHeader("Content-Type", "text/event-stream"); + HttpMessageDataStreamer httpMessageDataStreamer = new HttpMessageDataStreamer(httpResponse); + OutputStream outputStream = httpMessageDataStreamer.getOutputStream(); + writeDummyEvent(outputStream); + EntityCollector entityCollector = httpResponse.getBlockingEntityCollector(); + HttpContent content = entityCollector.getHttpContent(); + int currentChunkCount = 0; + while (!(content instanceof LastHttpContent)) { + currentChunkCount++; + content = entityCollector.getHttpContent(); + } + Assert.assertEquals(currentChunkCount, 4); } + // This method writes a server-sent event payload to the output stream + private static void writeDummyEvent(OutputStream outputStream) throws IOException { + final int maxChunkSize = 8192; + final int payloadSize = maxChunkSize * 4 - 10; // Reduced by few bytes to ensure chunking + // happens if two newlines are found + final String dataPrefix = "data: "; + final byte[] dataBytes = dataPrefix.getBytes(); + + // Write the data prefix to the output stream + outputStream.write(dataBytes); + for (int i = dataBytes.length; i < payloadSize; i++) { + outputStream.write('A'); + } + + // Write two newline characters to indicate the end of the event + outputStream.write('\n'); + outputStream.write('\n'); + + // Close the output stream + outputStream.close(); + } }