diff --git a/README.md b/README.md index 24d5ec04..63e1fd10 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ This repository only contains the source code for the package. ### Setting up the prerequisites -1. Download and install Java SE Development Kit (JDK) version 17 (from one of the following locations). +1. Download and install Java SE Development Kit (JDK) version 21 (from one of the following locations). * [Oracle](https://www.oracle.com/java/technologies/downloads/) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index b9ee8005..94f332a6 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -9,44 +9,44 @@ icon = "icon.png" license = ["Apache-2.0"] distribution = "2201.10.0" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "udp-native" version = "1.11.1" path = "../native/build/libs/udp-native-1.11.1-SNAPSHOT.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] path = "../test-utils/build/libs/udp-test-utils-1.11.1-SNAPSHOT.jar" scope = "testOnly" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-handler" version = "4.1.115.Final" path = "./lib/netty-handler-4.1.115.Final.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-buffer" version = "4.1.115.Final" path = "./lib/netty-buffer-4.1.115.Final.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-transport" version = "4.1.115.Final" path = "./lib/netty-transport-4.1.115.Final.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-common" version = "4.1.115.Final" path = "./lib/netty-common-4.1.115.Final.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-resolver" version = "4.1.115.Final" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 4d13fef4..de7bf99a 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,12 +5,12 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0" +distribution-version = "2201.11.0-20241112-214900-6b80ab87" [[package]] org = "ballerina" name = "io" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -75,7 +75,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.10.0" +version = "2.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -90,7 +90,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.3.0" +version = "1.4.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index b2bcadcb..13e7bb98 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -9,44 +9,44 @@ icon = "icon.png" license = ["Apache-2.0"] distribution = "2201.10.0" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "udp-native" version = "@toml.version@" path = "../native/build/libs/udp-native-@project.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] path = "../test-utils/build/libs/udp-test-utils-@project.version@.jar" scope = "testOnly" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-handler" version = "@netty.version@" path = "./lib/netty-handler-@netty.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-buffer" version = "@netty.version@" path = "./lib/netty-buffer-@netty.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-transport" version = "@netty.version@" path = "./lib/netty-transport-@netty.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-common" version = "@netty.version@" path = "./lib/netty-common-@netty.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.netty" artifactId = "netty-resolver" version = "@netty.version@" diff --git a/build-config/spotbugs-exclude.xml b/build-config/spotbugs-exclude.xml index 1603f89a..06f8da75 100644 --- a/build-config/spotbugs-exclude.xml +++ b/build-config/spotbugs-exclude.xml @@ -24,13 +24,12 @@ - - - - - + + + + diff --git a/compiler-plugin-tests/build.gradle b/compiler-plugin-tests/build.gradle index 4c687424..e52c92a1 100644 --- a/compiler-plugin-tests/build.gradle +++ b/compiler-plugin-tests/build.gradle @@ -52,8 +52,11 @@ checkstyle { checkstyleTest.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsTest { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/compiler-plugin/build.gradle b/compiler-plugin/build.gradle index d85207b5..bcd2a22d 100644 --- a/compiler-plugin/build.gradle +++ b/compiler-plugin/build.gradle @@ -48,8 +48,11 @@ checkstyle { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/gradle.properties b/gradle.properties index de57f474..eea1a1ef 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,12 +1,12 @@ org.gradle.caching=true group=io.ballerina.stdlib version=1.11.1-SNAPSHOT -ballerinaLangVersion=2201.10.0 +ballerinaLangVersion=2201.11.0-20241112-214900-6b80ab87 puppycrawlCheckstyleVersion=10.12.0 githubJohnrengelmanShadowVersion=8.1.1 checkstyleToolVersion=7.8.2 -githubSpotbugsVersion=5.0.14 +githubSpotbugsVersion=6.0.18 testngVersion=7.6.1 nettyVersion=4.1.115.Final underCouchDownloadVersion=5.4.0 @@ -16,7 +16,7 @@ ballerinaGradlePluginVersion=2.0.1 gsonVersion=2.8.8 # Dependencies -stdlibIoVersion=1.6.0 -stdlibLogVersion=2.10.0 -observeVersion=1.3.0 -observeInternalVersion=1.3.0 +stdlibIoVersion=1.6.2-20241112-233100-995cf5f +stdlibLogVersion=2.10.1-20241113-120000-4577868 +observeVersion=1.4.0-20241113-092000-b83ae74 +observeInternalVersion=1.3.1-20241113-101700-265054d diff --git a/native/build.gradle b/native/build.gradle index c2235fee..c7cb303e 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -61,8 +61,11 @@ compileJava { } spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/native/src/main/java/io/ballerina/stdlib/udp/Dispatcher.java b/native/src/main/java/io/ballerina/stdlib/udp/Dispatcher.java index d1efe5ae..6a96dab2 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/Dispatcher.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/Dispatcher.java @@ -19,21 +19,28 @@ package io.ballerina.stdlib.udp; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.TypeTags; -import io.ballerina.runtime.api.async.StrandMetadata; +import io.ballerina.runtime.api.concurrent.StrandMetadata; +import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.MethodType; import io.ballerina.runtime.api.types.ObjectType; +import io.ballerina.runtime.api.types.Parameter; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; +import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.socket.DatagramPacket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; import java.util.Arrays; /** @@ -49,8 +56,7 @@ private static void invokeOnBytes(UdpService udpService, DatagramPacket datagram Type[] parameterTypes) { try { Object[] params = getOnBytesSignature(datagramPacket, channel, parameterTypes); - invokeAsyncCall(udpService.getService(), Constants.ON_BYTES, udpService.getRuntime(), - new UdpCallback(udpService, channel, datagramPacket), params); + invokeAsyncCall(udpService, datagramPacket, channel, Constants.ON_BYTES, params); } catch (BError e) { Dispatcher.invokeOnError(udpService, e.getMessage()); } @@ -60,8 +66,7 @@ private static void invokeOnDatagram(UdpService udpService, DatagramPacket datag Type[] parameterTypes) { try { Object[] params = getOnDatagramSignature(datagramPacket, channel, parameterTypes); - invokeAsyncCall(udpService.getService(), Constants.ON_DATAGRAM, udpService.getRuntime(), - new UdpCallback(udpService, channel, datagramPacket), params); + invokeAsyncCall(udpService, datagramPacket, channel, Constants.ON_DATAGRAM, params); } catch (BError e) { Dispatcher.invokeOnError(udpService, e.getMessage()); } @@ -75,44 +80,47 @@ public static void invokeOnError(UdpService udpService, String message) { filter(m -> m.getName().equals(Constants.ON_ERROR)).findFirst().orElse(null); if (methodType != null) { Object[] params = getOnErrorSignature(message); - invokeAsyncCall(udpService.getService(), Constants.ON_ERROR, udpService.getRuntime(), - new UdpCallback(udpService), params); + invokeAsyncCall(udpService, null, null, Constants.ON_ERROR, params); } } catch (Throwable t) { log.error("Error while executing onError function", t); } } - private static void invokeAsyncCall(BObject service, String methodName, Runtime runtime, UdpCallback callback, - Object[] params) { - StrandMetadata metadata = new StrandMetadata(Utils.getModule().getOrg(), Utils.getModule().getName(), - Utils.getModule().getVersion(), methodName); - ObjectType objectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); - if (objectType.isIsolated() && objectType.isIsolated(methodName)) { - runtime.invokeMethodAsyncConcurrently(service, methodName, - null, metadata, callback, null, null, params); - } else { - runtime.invokeMethodAsyncSequentially(service, methodName, - null, metadata, callback, null, null, params); - } + private static void invokeAsyncCall(UdpService udpService, DatagramPacket datagramPacket, Channel channel, + String methodName, Object[] params) { + Thread.startVirtualThread(() -> { + BObject service = udpService.getService(); + Runtime runtime = udpService.getRuntime(); + ObjectType objectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); + StrandMetadata metadata = new StrandMetadata( + objectType.isIsolated() && objectType.isIsolated(methodName), null); + Object result; + try { + result = runtime.callMethod(service, methodName, metadata, params); + handleResult(udpService, datagramPacket, channel, result); + } catch (BError error) { + handleError(error); + } catch (Throwable throwable) { + handleError(ErrorCreator.createError(throwable)); + } + }); } private static Object[] getOnBytesSignature(DatagramPacket datagramPacket, Channel channel, Type[] parameterTypes) { byte[] byteContent = new byte[datagramPacket.content().readableBytes()]; datagramPacket.content().readBytes(byteContent); - Object[] bValues = new Object[parameterTypes.length * 2]; + Object[] bValues = new Object[parameterTypes.length]; int index = 0; for (Type param : parameterTypes) { int paramTag = param.getTag(); switch (paramTag) { case TypeTags.INTERSECTION_TAG: bValues[index++] = ValueCreator.createReadonlyArrayValue(byteContent); - bValues[index++] = true; break; case TypeTags.OBJECT_TYPE_TAG: bValues[index++] = createClient(datagramPacket, channel); - bValues[index++] = true; break; default: break; @@ -123,18 +131,16 @@ private static Object[] getOnBytesSignature(DatagramPacket datagramPacket, Chann private static Object[] getOnDatagramSignature(DatagramPacket datagramPacket, Channel channel, Type[] parameterTypes) { - Object[] bValues = new Object[parameterTypes.length * 2]; + Object[] bValues = new Object[parameterTypes.length]; int index = 0; for (Type param : parameterTypes) { int paramTag = param.getTag(); switch (paramTag) { case TypeTags.INTERSECTION_TAG: bValues[index++] = Utils.createReadOnlyDatagramWithSenderAddress(datagramPacket); - bValues[index++] = true; break; case TypeTags.OBJECT_TYPE_TAG: bValues[index++] = createClient(datagramPacket, channel); - bValues[index++] = true; break; default: break; @@ -144,7 +150,7 @@ private static Object[] getOnDatagramSignature(DatagramPacket datagramPacket, Ch } private static Object[] getOnErrorSignature(String message) { - return new Object[]{Utils.createUdpError(message), true}; + return new Object[]{Utils.createUdpError(message)}; } private static BObject createClient(DatagramPacket datagramPacket, Channel channel) { @@ -159,19 +165,55 @@ private static BObject createClient(DatagramPacket datagramPacket, Channel chann public static void invokeRead(UdpService udpService, DatagramPacket datagramPacket, Channel channel) { ObjectType objectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(udpService.getService())); + for (MethodType method : objectType.getMethods()) { switch (method.getName()) { case Constants.ON_BYTES: Dispatcher.invokeOnBytes(udpService, datagramPacket, channel, - method.getType().getParameterTypes()); + getParameterTypes(method.getType().getParameters())); break; case Constants.ON_DATAGRAM: Dispatcher.invokeOnDatagram(udpService, datagramPacket, channel, - method.getType().getParameterTypes()); + getParameterTypes(method.getType().getParameters())); break; default: break; } } } + + private static Type[] getParameterTypes(Parameter[] parameters) { + Type[] parameterTypes = new Type[parameters.length]; + for (int i = 0; i < parameters.length; i++) { + parameterTypes[i] = parameters[i].type; + } + return parameterTypes; + } + + private static void handleResult(UdpService udpService, DatagramPacket datagramP, Channel channel, Object object) { + if (object instanceof BArray) { + // call writeBytes if the service returns byte[] + byte[] byteContent = ((BArray) object).getBytes(); + UdpListener.send(udpService, new DatagramPacket(Unpooled.wrappedBuffer(byteContent), + datagramP.sender()), channel); + } else if (object instanceof BMap) { + // call sendDatagram if the service returns Datagram + BMap datagram = (BMap) object; + String host = datagram.getStringValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_HOST)).getValue(); + int port = datagram.getIntValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_PORT)).intValue(); + BArray data = datagram.getArrayValue(StringUtils.fromString(Constants.DATAGRAM_DATA)); + byte[] byteContent = data.getBytes(); + DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), + new InetSocketAddress(host, port)); + UdpListener.send(udpService, datagramPacket, channel); + } else if (object instanceof BError) { + ((BError) object).printStackTrace(); + } + log.debug("Method successfully dispatched."); + } + + + public static void handleError(BError bError) { + bError.printStackTrace(); + } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/UdpCallback.java b/native/src/main/java/io/ballerina/stdlib/udp/UdpCallback.java deleted file mode 100644 index dc4c0601..00000000 --- a/native/src/main/java/io/ballerina/stdlib/udp/UdpCallback.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.udp; - -import io.ballerina.runtime.api.async.Callback; -import io.ballerina.runtime.api.utils.StringUtils; -import io.ballerina.runtime.api.values.BArray; -import io.ballerina.runtime.api.values.BError; -import io.ballerina.runtime.api.values.BMap; -import io.ballerina.runtime.api.values.BString; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.socket.DatagramPacket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; - -/** - * callback implementation. - */ -public class UdpCallback implements Callback { - - private static final Logger log = LoggerFactory.getLogger(UdpCallback.class); - - private UdpService udpService; - private Channel channel; - private DatagramPacket datagram; - - public UdpCallback(UdpService udpService, Channel channel, DatagramPacket datagram) { - this.udpService = udpService; - this.channel = channel; - this.datagram = datagram; - } - - public UdpCallback(UdpService udpService) { - this.udpService = udpService; - } - - public UdpCallback() {} - - @Override - public void notifySuccess(Object object) { - if (object instanceof BArray) { - // call writeBytes if the service returns byte[] - byte[] byteContent = ((BArray) object).getBytes(); - UdpListener.send(udpService, new DatagramPacket(Unpooled.wrappedBuffer(byteContent), - datagram.sender()), channel); - } else if (object instanceof BMap) { - // call sendDatagram if the service returns Datagram - BMap datagram = (BMap) object; - String host = datagram.getStringValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_HOST)).getValue(); - int port = datagram.getIntValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_PORT)).intValue(); - BArray data = datagram.getArrayValue(StringUtils.fromString(Constants.DATAGRAM_DATA)); - byte[] byteContent = data.getBytes(); - DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), - new InetSocketAddress(host, port)); - UdpListener.send(udpService, datagramPacket, channel); - } else if (object instanceof BError) { - ((BError) object).printStackTrace(); - } - log.debug("Method successfully dispatched."); - } - - @Override - public void notifyFailure(BError bError) { - bError.printStackTrace(); - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/udp/UdpClient.java b/native/src/main/java/io/ballerina/stdlib/udp/UdpClient.java index fff47def..774039cd 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/UdpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/UdpClient.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.udp; -import io.ballerina.runtime.api.Future; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; @@ -33,6 +32,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -45,7 +45,7 @@ public class UdpClient { // create connection oriented client public UdpClient(InetSocketAddress localAddress, InetSocketAddress remoteAddress, - EventLoopGroup group, Future callback) { + EventLoopGroup group, CompletableFuture balFuture) { clientBootstrap = new Bootstrap(); clientBootstrap.group(group) .channel(NioDatagramChannel.class) @@ -56,13 +56,13 @@ protected void initChannel(Channel ch) throws Exception { } }); if (remoteAddress != null) { - this.connect(remoteAddress, localAddress, callback); + this.connect(remoteAddress, localAddress, balFuture); } } // create connection less client public UdpClient(InetSocketAddress localAddress, EventLoopGroup group, - Future callback) { + CompletableFuture balFuture) { clientBootstrap = new Bootstrap(); clientBootstrap.group(group) .channel(NioDatagramChannel.class) @@ -75,16 +75,16 @@ protected void initChannel(Channel ch) throws Exception { if (future.isSuccess()) { channel = future.channel(); channel.config().setAutoRead(false); - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils.createUdpError("Error initializing UDP Client")); + balFuture.complete(Utils.createUdpError("Error initializing UDP Client")); } }); } // needed for connection oriented client private void connect(SocketAddress remoteAddress, SocketAddress localAddress, - Future callback) { + CompletableFuture balFuture) { clientBootstrap.connect(remoteAddress, localAddress) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { @@ -92,23 +92,23 @@ private void connect(SocketAddress remoteAddress, SocketAddress localAddress, channel.pipeline().replace(Constants.CONNECTIONLESS_CLIENT_HANDLER, Constants.CONNECT_CLIENT_HANDLER, new UdpConnectClientHandler()); channel.config().setAutoRead(false); - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils.createUdpError("Can't connect to remote host: " + balFuture.complete(Utils.createUdpError("Can't connect to remote host: " + future.cause().getMessage())); } }); } - public void sendData(DatagramPacket datagram, Future callback) { + public void sendData(DatagramPacket datagram, CompletableFuture balFuture) { LinkedList fragments = Utils.fragmentDatagram(datagram); PromiseCombiner promiseCombiner = getPromiseCombiner(fragments); promiseCombiner.finish(channel.newPromise().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils + balFuture.complete(Utils .createUdpError("Failed to send data: " + future.cause().getMessage())); } })); @@ -116,7 +116,7 @@ public void sendData(DatagramPacket datagram, Future callback) { private PromiseCombiner getPromiseCombiner(LinkedList fragments) { PromiseCombiner promiseCombiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE); - while (fragments.size() > 0) { + while (!fragments.isEmpty()) { if (channel.isWritable()) { promiseCombiner.add(channel.writeAndFlush(fragments.poll())); } @@ -124,7 +124,8 @@ private PromiseCombiner getPromiseCombiner(LinkedList fragments) return promiseCombiner; } - public void receiveData(double readTimeoutInSec, Future callback) { + public void receiveData(double readTimeoutInSec, CompletableFuture balFuture) { + long readTimeoutInNano = (long) (readTimeoutInSec * 1_000_000_000); channel.pipeline().addFirst(Constants.READ_TIMEOUT_HANDLER, new IdleStateHandler(readTimeoutInNano, 0, 0, TimeUnit.NANOSECONDS)); @@ -132,22 +133,22 @@ public void receiveData(double readTimeoutInSec, Future callback) { if (channel.pipeline().get(Constants.CONNECTIONLESS_CLIENT_HANDLER) != null) { UdpClientHandler handler = (UdpClientHandler) channel.pipeline(). get(Constants.CONNECTIONLESS_CLIENT_HANDLER); - handler.setCallback(callback); + handler.setBalFuture(balFuture); } else { UdpConnectClientHandler handler = (UdpConnectClientHandler) channel.pipeline(). get(Constants.CONNECT_CLIENT_HANDLER); - handler.setCallback(callback); + handler.setBalFuture(balFuture); } channel.read(); } - public void close(Future callback) { + public void close(CompletableFuture balFuture) { channel.close().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils.createUdpError("Unable to close the UDP client. " + balFuture.complete(Utils.createUdpError("Unable to close the UDP client. " + future.cause().getMessage())); } }); diff --git a/native/src/main/java/io/ballerina/stdlib/udp/UdpClientHandler.java b/native/src/main/java/io/ballerina/stdlib/udp/UdpClientHandler.java index 9730539a..7b5d4589 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/UdpClientHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/UdpClientHandler.java @@ -18,55 +18,48 @@ package io.ballerina.stdlib.udp; -import io.ballerina.runtime.api.Future; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.timeout.IdleStateEvent; import java.net.PortUnreachableException; +import java.util.concurrent.CompletableFuture; /** * {@link UdpClientHandler} ia a ChannelInboundHandler implementation for udp client. */ public class UdpClientHandler extends SimpleChannelInboundHandler { - protected Future callback; + protected CompletableFuture balFuture; @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception { ctx.channel().pipeline().remove(Constants.READ_TIMEOUT_HANDLER); - if (callback != null) { - callback.complete(Utils.createReadonlyDatagramWithRecipientAddress(datagramPacket)); - } + balFuture.complete(Utils.createReadonlyDatagramWithRecipientAddress(datagramPacket)); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.channel().pipeline().remove(Constants.READ_TIMEOUT_HANDLER); - // return timeout error - if (callback != null) { - callback.complete(Utils.createUdpError("Read timed out")); - } + balFuture.complete(Utils.createUdpError("Read timed out")); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().pipeline().remove(Constants.READ_TIMEOUT_HANDLER); - if (callback != null) { String errorMsg = cause.getMessage(); if (cause instanceof PortUnreachableException) { errorMsg = "Port unreachable (" + ctx.channel().remoteAddress() + ")"; } - callback.complete(Utils.createUdpError(errorMsg)); - } + balFuture.complete(Utils.createUdpError(errorMsg)); } - public void setCallback(Future callback) { - this.callback = callback; + public void setBalFuture(CompletableFuture balFuture) { + this.balFuture = balFuture; } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/UdpConnectClientHandler.java b/native/src/main/java/io/ballerina/stdlib/udp/UdpConnectClientHandler.java index 131960c0..fe759637 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/UdpConnectClientHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/UdpConnectClientHandler.java @@ -29,9 +29,7 @@ public class UdpConnectClientHandler extends UdpClientHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception { - if (callback != null) { - callback.complete(Utils.getReadonlyBytesFromDatagram(datagramPacket)); - } + balFuture.complete(Utils.getReadonlyBytesFromDatagram(datagramPacket)); ctx.channel().pipeline().remove(Constants.READ_TIMEOUT_HANDLER); } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/UdpFactory.java b/native/src/main/java/io/ballerina/stdlib/udp/UdpFactory.java index e093bcfa..46493c87 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/UdpFactory.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/UdpFactory.java @@ -18,18 +18,18 @@ package io.ballerina.stdlib.udp; -import io.ballerina.runtime.api.Future; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; /** * {@link UdpFactory} creates {@link UdpClient} and UdpListener. */ public class UdpFactory { - private static volatile UdpFactory udpFactory; + private static volatile UdpFactory udpFactory = new UdpFactory();; private EventLoopGroup group; private UdpFactory() { @@ -37,22 +37,20 @@ private UdpFactory() { } public static UdpFactory getInstance() { - if (udpFactory == null) { - udpFactory = new UdpFactory(); - } return udpFactory; } - public UdpClient createUdpClient(InetSocketAddress localAddress, InetSocketAddress remoteAddress, Future callback) { - return new UdpClient(localAddress, remoteAddress, getInstance().group, callback); + public UdpClient createUdpClient(InetSocketAddress localAddress, InetSocketAddress remoteAddress, + CompletableFuture balFuture) { + return new UdpClient(localAddress, remoteAddress, getInstance().group, balFuture); } - public UdpClient createUdpClient(InetSocketAddress localAddress, Future callback) { - return new UdpClient(localAddress, getInstance().group, callback); + public UdpClient createUdpClient(InetSocketAddress localAddress, CompletableFuture balFuture) { + return new UdpClient(localAddress, getInstance().group, balFuture); } public UdpListener createUdpListener(InetSocketAddress localAddress, InetSocketAddress remoteAddress, - Future callback, UdpService udpService) { - return new UdpListener(localAddress, remoteAddress, getInstance().group, callback, udpService); + CompletableFuture balFuture, UdpService udpService) { + return new UdpListener(localAddress, remoteAddress, getInstance().group, balFuture, udpService); } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/UdpListener.java b/native/src/main/java/io/ballerina/stdlib/udp/UdpListener.java index 14cb054e..96a27207 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/UdpListener.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/UdpListener.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.udp; -import io.ballerina.runtime.api.Future; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; @@ -32,6 +31,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; /** * {@link UdpListener} creates the udp client and handles all the network operations. @@ -42,25 +42,25 @@ public class UdpListener { private final Bootstrap listenerBootstrap; public UdpListener(InetSocketAddress localAddress, InetSocketAddress remoteAddress, - EventLoopGroup group, Future callback, UdpService udpService) { + EventLoopGroup group, CompletableFuture balFuture, UdpService udpService) { listenerBootstrap = new Bootstrap(); listenerBootstrap.group(group) .channel(NioDatagramChannel.class) .handler(new ChannelInitializer<>() { @Override - protected void initChannel(Channel ch) throws Exception { + protected void initChannel(Channel ch) { ch.pipeline().addLast(Constants.LISTENER_HANDLER, new UdpListenerHandler(udpService)); } }); if (remoteAddress != null) { - connect(remoteAddress, localAddress, callback); + connect(remoteAddress, localAddress, balFuture); } else { listenerBootstrap.bind(localAddress).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { channel = future.channel(); - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils.createUdpError("Unable to initialize UDP Listener: " + + balFuture.complete(Utils.createUdpError("Unable to initialize UDP Listener: " + future.cause().getMessage())); } }); @@ -68,15 +68,15 @@ protected void initChannel(Channel ch) throws Exception { } // invoke when caller call writeBytes() or sendDatagram() - public static void send(DatagramPacket datagram, Channel channel, Future callback) { + public static void send(DatagramPacket datagram, Channel channel, CompletableFuture balFuture) { LinkedList fragments = Utils.fragmentDatagram(datagram); PromiseCombiner promiseCombiner = getPromiseCombiner(fragments, channel); promiseCombiner.finish(channel.newPromise().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils + balFuture.complete(Utils .createUdpError("Failed to send data: " + future.cause().getMessage())); } })); @@ -96,7 +96,7 @@ public static void send(UdpService udpService, DatagramPacket datagram, Channel private static PromiseCombiner getPromiseCombiner(LinkedList fragments, Channel channel) { PromiseCombiner promiseCombiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE); - while (fragments.size() > 0) { + while (!fragments.isEmpty()) { if (channel.isWritable()) { promiseCombiner.add(channel.writeAndFlush(fragments.poll())); } @@ -105,24 +105,24 @@ private static PromiseCombiner getPromiseCombiner(LinkedList fra } // only invoke if the listener is a connected listener - private void connect(SocketAddress remoteAddress, SocketAddress localAddress, Future callback) { + private void connect(SocketAddress remoteAddress, SocketAddress localAddress, CompletableFuture balFuture) { listenerBootstrap.connect(remoteAddress, localAddress).addListener((ChannelFutureListener) future -> { channel = future.channel(); if (future.isSuccess()) { - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils.createUdpError("Can't connect to remote host.")); + balFuture.complete(Utils.createUdpError("Can't connect to remote host.")); } }); } - public void close(Future callback) throws InterruptedException { + public void close(CompletableFuture balFuture) throws InterruptedException { if (channel != null) { channel.close().sync().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - callback.complete(null); + balFuture.complete(null); } else { - callback.complete(Utils.createUdpError("Failed to gracefully shutdown the Listener.")); + balFuture.complete(Utils.createUdpError("Failed to gracefully shutdown the Listener.")); } }); } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/Utils.java b/native/src/main/java/io/ballerina/stdlib/udp/Utils.java index f82d04ec..5d0329c9 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/Utils.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/Utils.java @@ -24,15 +24,19 @@ import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BDecimal; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; import io.netty.buffer.ByteBuf; import io.netty.channel.socket.DatagramPacket; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * Represents the util functions of Socket operations. @@ -118,4 +122,32 @@ public static void setModule(Environment env) { public static Module getModule() { return udpModule; } + + public static Object getResult(CompletableFuture balFuture) { + try { + return balFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ErrorCreator.createError(e); + } catch (Throwable throwable) { + throw ErrorCreator.createError(throwable); + } + } + + public static InetSocketAddress getLocalInetSocketAddress(BObject client, BMap config) { + BString host = config.getStringValue(StringUtils.fromString(Constants.CONFIG_LOCALHOST)); + InetSocketAddress localAddress; + if (host == null) { + // A port number of zero will let the system pick up an ephemeral port in a bind operation. + localAddress = new InetSocketAddress(0); + } else { + localAddress = new InetSocketAddress(host.getValue(), 0); + } + + double timeout = + ((BDecimal) config.get(StringUtils.fromString(Constants.CONFIG_READ_TIMEOUT))).floatValue(); + client.addNativeData(Constants.CONFIG_READ_TIMEOUT, timeout); + return localAddress; + } + } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/Client.java b/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/Client.java index 495ad41a..2b682a25 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/Client.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/Client.java @@ -19,10 +19,8 @@ package io.ballerina.stdlib.udp.nativeclient; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; -import io.ballerina.runtime.api.values.BDecimal; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; @@ -33,6 +31,10 @@ import io.netty.channel.socket.DatagramPacket; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +import static io.ballerina.stdlib.udp.Utils.getLocalInetSocketAddress; +import static io.ballerina.stdlib.udp.Utils.getResult; /** * Native function implementations of the UDP ConnectionlessClient. @@ -44,57 +46,47 @@ public final class Client { private Client() {} public static Object init(Environment env, BObject client, BMap config) { - final Future balFuture = env.markAsync(); - - BString host = config.getStringValue(StringUtils.fromString(Constants.CONFIG_LOCALHOST)); - InetSocketAddress localAddress; - if (host == null) { - // A port number of zero will let the system pick up an ephemeral port in a bind operation. - localAddress = new InetSocketAddress(0); - } else { - localAddress = new InetSocketAddress(host.getValue(), 0); - } - - double timeout = ((BDecimal) config.get(StringUtils.fromString(Constants.CONFIG_READ_TIMEOUT))).floatValue(); - client.addNativeData(Constants.CONFIG_READ_TIMEOUT, timeout); - - UdpClient udpClient = UdpFactory.getInstance().createUdpClient(localAddress, balFuture); - client.addNativeData(Constants.CONNECTIONLESS_CLIENT, udpClient); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + BString host = config.getStringValue(StringUtils.fromString(Constants.CONFIG_LOCALHOST)); + InetSocketAddress localAddress = getLocalInetSocketAddress(client, config); + UdpClient udpClient = UdpFactory.getInstance().createUdpClient(localAddress, balFuture); + client.addNativeData(Constants.CONNECTIONLESS_CLIENT, udpClient); + return getResult(balFuture); + }); } public static Object receive(Environment env, BObject client) { - final Future callback = env.markAsync(); - - double readTimeOut = (double) client.getNativeData(Constants.CONFIG_READ_TIMEOUT); - UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECTIONLESS_CLIENT); - udpClient.receiveData(readTimeOut, callback); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + double readTimeOut = (double) client.getNativeData(Constants.CONFIG_READ_TIMEOUT); + UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECTIONLESS_CLIENT); + udpClient.receiveData(readTimeOut, balFuture); + return getResult(balFuture); + }); } public static Object send(Environment env, BObject client, BMap datagram) { - final Future callback = env.markAsync(); - - String host = datagram.getStringValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_HOST)).getValue(); - int port = datagram.getIntValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_PORT)).intValue(); - BArray data = datagram.getArrayValue(StringUtils.fromString(Constants.DATAGRAM_DATA)); - byte[] byteContent = data.getBytes(); - DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), - new InetSocketAddress(host, port)); - UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECTIONLESS_CLIENT); - udpClient.sendData(datagramPacket, callback); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + String host = datagram.getStringValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_HOST)).getValue(); + int port = datagram.getIntValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_PORT)).intValue(); + BArray data = datagram.getArrayValue(StringUtils.fromString(Constants.DATAGRAM_DATA)); + byte[] byteContent = data.getBytes(); + DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), + new InetSocketAddress(host, port)); + UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECTIONLESS_CLIENT); + udpClient.sendData(datagramPacket, balFuture); + return getResult(balFuture); + }); } public static Object close(Environment env, BObject client) { - final Future callback = env.markAsync(); - - UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECTIONLESS_CLIENT); - udpClient.close(callback); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECTIONLESS_CLIENT); + udpClient.close(balFuture); + return getResult(balFuture); + }); } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/ConnectClient.java b/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/ConnectClient.java index db69d392..dc939a0c 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/ConnectClient.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/nativeclient/ConnectClient.java @@ -19,10 +19,7 @@ package io.ballerina.stdlib.udp.nativeclient; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; -import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; -import io.ballerina.runtime.api.values.BDecimal; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; @@ -33,6 +30,10 @@ import io.netty.channel.socket.DatagramPacket; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +import static io.ballerina.stdlib.udp.Utils.getLocalInetSocketAddress; +import static io.ballerina.stdlib.udp.Utils.getResult; /** * Native function implementations of the UDP ConnectionlessClient. @@ -45,58 +46,45 @@ private ConnectClient() {} public static Object init(Environment env, BObject client, BString remoteHost, int remotePort, BMap config) { - final Future balFuture = env.markAsync(); - - BString host = config.getStringValue(StringUtils.fromString(Constants.CONFIG_LOCALHOST)); - InetSocketAddress localAddress; - if (host == null) { - // A port number of zero will let the system pick up an ephemeral port in a bind operation. - localAddress = new InetSocketAddress(0); - } else { - localAddress = new InetSocketAddress(host.getValue(), 0); - } - - double timeout = ((BDecimal) config.get(StringUtils.fromString(Constants.CONFIG_READ_TIMEOUT))).floatValue(); - client.addNativeData(Constants.CONFIG_READ_TIMEOUT, timeout); - - InetSocketAddress remoteAddress = new InetSocketAddress(remoteHost.getValue(), remotePort); - client.addNativeData(Constants.REMOTE_ADDRESS, remoteAddress); - - UdpClient udpClient = UdpFactory.getInstance().createUdpClient(localAddress, remoteAddress, balFuture); - client.addNativeData(Constants.CONNECT_CLIENT, udpClient); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + InetSocketAddress localAddress = getLocalInetSocketAddress(client, config); + InetSocketAddress remoteAddress = new InetSocketAddress(remoteHost.getValue(), remotePort); + client.addNativeData(Constants.REMOTE_ADDRESS, remoteAddress); + UdpClient udpClient = UdpFactory.getInstance().createUdpClient(localAddress, remoteAddress, balFuture); + client.addNativeData(Constants.CONNECT_CLIENT, udpClient); + return getResult(balFuture); + }); } public static Object read(Environment env, BObject client) { - final Future balFuture = env.markAsync(); - - double readTimeOut = (double) client.getNativeData(Constants.CONFIG_READ_TIMEOUT); - UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECT_CLIENT); - udpClient.receiveData(readTimeOut, balFuture); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + double readTimeOut = (double) client.getNativeData(Constants.CONFIG_READ_TIMEOUT); + UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECT_CLIENT); + udpClient.receiveData(readTimeOut, balFuture); + return getResult(balFuture); + }); } public static Object write(Environment env, BObject client, BArray data) { - final Future balFuture = env.markAsync(); - - byte[] byteContent = data.getBytes(); - InetSocketAddress remoteAddress = (InetSocketAddress) client.getNativeData(Constants.REMOTE_ADDRESS); - DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), remoteAddress); - - UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECT_CLIENT); - udpClient.sendData(datagramPacket, balFuture); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + byte[] byteContent = data.getBytes(); + InetSocketAddress remoteAddress = (InetSocketAddress) client.getNativeData(Constants.REMOTE_ADDRESS); + DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), remoteAddress); + UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECT_CLIENT); + udpClient.sendData(datagramPacket, balFuture); + return null; + }); } public static Object close(Environment env, BObject client) { - final Future callback = env.markAsync(); - - UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECT_CLIENT); - udpClient.close(callback); - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + UdpClient udpClient = (UdpClient) client.getNativeData(Constants.CONNECT_CLIENT); + udpClient.close(balFuture); + return getResult(balFuture); + }); } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Caller.java b/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Caller.java index 83f475de..60195041 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Caller.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Caller.java @@ -19,7 +19,6 @@ package io.ballerina.stdlib.udp.nativelistener; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BMap; @@ -32,6 +31,9 @@ import io.netty.channel.socket.DatagramPacket; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +import static io.ballerina.stdlib.udp.Utils.getResult; /** * Native function implementations of the UDP Caller. @@ -41,32 +43,34 @@ public final class Caller { private Caller() {} public static Object sendBytes(Environment env, BObject caller, BArray data) { - final Future callback = env.markAsync(); - - byte[] byteContent = data.getBytes(); - String remoteHost = ((BString) caller.getStringValue(StringUtils.fromString(Constants.CALLER_REMOTE_HOST))) - .getValue(); - int remotePort = ((Integer) caller.get(StringUtils.fromString(Constants.CALLER_REMOTE_PORT))); - InetSocketAddress remoteAddress = new InetSocketAddress(remoteHost, remotePort); - DatagramPacket datagram = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), remoteAddress); - Channel channel = (Channel) caller.getNativeData(Constants.CHANNEL); + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + byte[] byteContent = data.getBytes(); + String remoteHost = caller.getStringValue(StringUtils.fromString(Constants.CALLER_REMOTE_HOST)) + .getValue(); + int remotePort = ((Integer) caller.get(StringUtils.fromString(Constants.CALLER_REMOTE_PORT))); + InetSocketAddress remoteAddress = new InetSocketAddress(remoteHost, remotePort); + DatagramPacket datagram = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), remoteAddress); + Channel channel = (Channel) caller.getNativeData(Constants.CHANNEL); - UdpListener.send(datagram, channel, callback); - return null; + UdpListener.send(datagram, channel, balFuture); + return getResult(balFuture); + }); } public static Object sendDatagram(Environment env, BObject caller, BMap datagram) { - final Future callback = env.markAsync(); - - String host = datagram.getStringValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_HOST)).getValue(); - int port = datagram.getIntValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_PORT)).intValue(); - BArray data = datagram.getArrayValue(StringUtils.fromString(Constants.DATAGRAM_DATA)); - byte[] byteContent = data.getBytes(); - DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), - new InetSocketAddress(host, port)); + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + String host = datagram.getStringValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_HOST)).getValue(); + int port = datagram.getIntValue(StringUtils.fromString(Constants.DATAGRAM_REMOTE_PORT)).intValue(); + BArray data = datagram.getArrayValue(StringUtils.fromString(Constants.DATAGRAM_DATA)); + byte[] byteContent = data.getBytes(); + DatagramPacket datagramPacket = new DatagramPacket(Unpooled.wrappedBuffer(byteContent), + new InetSocketAddress(host, port)); - Channel channel = (Channel) caller.getNativeData(Constants.CHANNEL); - UdpListener.send(datagramPacket, channel, callback); - return null; + Channel channel = (Channel) caller.getNativeData(Constants.CHANNEL); + UdpListener.send(datagramPacket, channel, balFuture); + return getResult(balFuture); + }); } } diff --git a/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Listener.java b/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Listener.java index 2fa0dcbe..d51ed9d8 100644 --- a/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Listener.java +++ b/native/src/main/java/io/ballerina/stdlib/udp/nativelistener/Listener.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.udp.nativelistener; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; @@ -28,12 +27,12 @@ import io.ballerina.stdlib.udp.UdpListener; import io.ballerina.stdlib.udp.UdpService; import io.ballerina.stdlib.udp.Utils; -import io.ballerina.stdlib.udp.nativeclient.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; + +import static io.ballerina.stdlib.udp.Utils.getResult; /** * Native function implementations of the UDP Listener. @@ -42,8 +41,6 @@ public final class Listener { private Listener() {} - private static final Logger log = LoggerFactory.getLogger(Client.class); - public static Object init(BObject listener, int localPort, BMap config) { listener.addNativeData(Constants.LISTENER_CONFIG, config); listener.addNativeData(Constants.LOCAL_PORT, localPort); @@ -56,35 +53,33 @@ public static Object register(Environment env, BObject listener, BObject service } public static Object start(Environment env, BObject listener) { - Future balFuture = env.markAsync(); - - BMap config = (BMap) listener.getNativeData(Constants.LISTENER_CONFIG); - - BString localHost = config.getStringValue(StringUtils.fromString(Constants.CONFIG_LOCALHOST)); - int localPort = (int) listener.getNativeData(Constants.LOCAL_PORT); - InetSocketAddress localAddress; - if (localHost == null) { - localAddress = new InetSocketAddress(localPort); - } else { - String hostname = localHost.getValue(); - localAddress = new InetSocketAddress(hostname, localPort); - } - - InetSocketAddress remoteAddress; - BString remoteHost = config.getStringValue(StringUtils.fromString(Constants.CONFIG_REMOTE_HOST)); - Long remotePort = config.getIntValue(StringUtils.fromString(Constants.CONFIG_REMOTE_PORT)); - - try { - UdpService udpService = (UdpService) listener.getNativeData(Constants.SERVICE); - remoteAddress = getRemoteAddress(remoteHost, remotePort); - UdpListener udpListener = UdpFactory.getInstance() - .createUdpListener(localAddress, remoteAddress, balFuture, udpService); - listener.addNativeData(Constants.LISTENER, udpListener); - } catch (Exception e) { - balFuture.complete(Utils.createUdpError(e.getMessage())); - } - - return null; + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + BMap config = (BMap) listener.getNativeData(Constants.LISTENER_CONFIG); + + BString localHost = config.getStringValue(StringUtils.fromString(Constants.CONFIG_LOCALHOST)); + int localPort = (int) listener.getNativeData(Constants.LOCAL_PORT); + InetSocketAddress localAddress; + if (localHost == null) { + localAddress = new InetSocketAddress(localPort); + } else { + String hostname = localHost.getValue(); + localAddress = new InetSocketAddress(hostname, localPort); + } + InetSocketAddress remoteAddress; + BString remoteHost = config.getStringValue(StringUtils.fromString(Constants.CONFIG_REMOTE_HOST)); + Long remotePort = config.getIntValue(StringUtils.fromString(Constants.CONFIG_REMOTE_PORT)); + try { + UdpService udpService = (UdpService) listener.getNativeData(Constants.SERVICE); + remoteAddress = getRemoteAddress(remoteHost, remotePort); + UdpListener udpListener = UdpFactory.getInstance() + .createUdpListener(localAddress, remoteAddress, balFuture, udpService); + listener.addNativeData(Constants.LISTENER, udpListener); + } catch (Exception e) { + balFuture.complete(Utils.createUdpError(e.getMessage())); + } + return getResult(balFuture); + }); } public static Object detach(BObject listener) { @@ -97,19 +92,20 @@ public static Object detach(BObject listener) { } public static Object gracefulStop(Environment env, BObject listener) { - Future balFuture = env.markAsync(); - try { - UdpListener udpListener = (UdpListener) listener.getNativeData(Constants.LISTENER); - if (udpListener != null) { - udpListener.close(balFuture); - } else { - balFuture.complete(Utils.createUdpError("Unable to initialize the udp listener.")); + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + try { + UdpListener udpListener = (UdpListener) listener.getNativeData(Constants.LISTENER); + if (udpListener != null) { + udpListener.close(balFuture); + } else { + balFuture.complete(Utils.createUdpError("Unable to initialize the udp listener.")); + } + } catch (InterruptedException e) { + balFuture.complete(Utils.createUdpError("Failed to gracefully shutdown the Listener.")); } - } catch (InterruptedException e) { - balFuture.complete(Utils.createUdpError("Failed to gracefully shutdown the Listener.")); - } - - return null; + return getResult(balFuture); + }); } private static InetSocketAddress getRemoteAddress(BString remoteHost, Long remotePort) throws Exception { @@ -119,7 +115,6 @@ private static InetSocketAddress getRemoteAddress(BString remoteHost, Long remot return new InetSocketAddress(InetAddress.getByName(remoteHost.getValue()).getHostAddress(), (int) remotePort.longValue()); } - return null; } } diff --git a/test-utils/src/main/java/io/ballerina/stdlib/udp/testutils/MockServerUtils.java b/test-utils/src/main/java/io/ballerina/stdlib/udp/testutils/MockServerUtils.java index f1877649..f5d8eed1 100644 --- a/test-utils/src/main/java/io/ballerina/stdlib/udp/testutils/MockServerUtils.java +++ b/test-utils/src/main/java/io/ballerina/stdlib/udp/testutils/MockServerUtils.java @@ -39,9 +39,8 @@ private MockServerUtils() {} private static MockUdpServer mockUdpServer; public static Object startUdpServer() throws InterruptedException { - ExecutorService executor = Executors.newSingleThreadExecutor(); mockUdpServer = new MockUdpServer(); - executor.execute(mockUdpServer); + Thread.startVirtualThread(mockUdpServer); // Give some time to start the UDP server before starting the tests. Thread.sleep(3000); return null; @@ -53,8 +52,7 @@ public static Object stopUdpServer() { } public static Object passUdpContent(BString serverContent, int port) { - ExecutorService client = Executors.newSingleThreadExecutor(); - client.execute(() -> sendUdpContent(serverContent.getValue(), port)); + Thread.startVirtualThread(() -> sendUdpContent(serverContent.getValue(), port)); return null; }