diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 61ab1c2..a3a28ab 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,34 +1,34 @@ [package] org = "ballerina" name = "mqtt" -version = "1.2.0" +version = "1.2.1" authors = ["ballerina"] keywords = ["mqtt", "client", "messaging", "network", "pubsub", "iot"] repository = "https://github.com/ballerina-platform/module-ballerina-mqtt" -distribution = "2201.10.0" +distribution = "2201.11.0-20241112-214900-6b80ab87" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "mqtt-native" -version = "1.2.0" -path = "../native/build/libs/mqtt-native-1.2.0.jar" +version = "1.2.1" +path = "../native/build/libs/mqtt-native-1.2.1-SNAPSHOT.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.eclipse.paho" artifactId = "org.eclipse.paho.mqttv5.client" version = "1.2.5" path = "./lib/org.eclipse.paho.mqttv5.client-1.2.5.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.bouncycastle" artifactId = "bcpkix-jdk18on" version = "1.78" path = "./lib/bcpkix-jdk18on-1.78.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.bouncycastle" artifactId = "bcutil-jdk18on" version = "1.78" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 81322c7..8e930b8 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "mqtt-compiler-plugin" class = "io.ballerina.stdlib.mqtt.compiler.MqttCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/mqtt-compiler-plugin-1.2.0.jar" +path = "../compiler-plugin/build/libs/mqtt-compiler-plugin-1.2.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 13b752f..d0e6369 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,12 +5,12 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0" +distribution-version = "2201.11.0-20241112-214900-6b80ab87" [[package]] org = "ballerina" name = "crypto" -version = "2.7.2" +version = "2.7.3" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} @@ -22,7 +22,7 @@ modules = [ [[package]] org = "ballerina" name = "io" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -107,7 +107,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.10.0" +version = "2.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -122,7 +122,7 @@ modules = [ [[package]] org = "ballerina" name = "mqtt" -version = "1.2.0" +version = "1.2.1" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "jballerina.java"}, @@ -138,7 +138,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.3.0" +version = "1.4.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} @@ -161,7 +161,7 @@ modules = [ [[package]] org = "ballerina" name = "time" -version = "2.4.0" +version = "2.6.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -169,7 +169,7 @@ dependencies = [ [[package]] org = "ballerina" name = "uuid" -version = "1.8.0" +version = "1.8.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "crypto"}, diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index e4d38d8..28b79b4 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -5,30 +5,30 @@ version = "@toml.version@" authors = ["ballerina"] keywords = ["mqtt", "client", "messaging", "network", "pubsub", "iot"] repository = "https://github.com/ballerina-platform/module-ballerina-mqtt" -distribution = "2201.10.0" +distribution = "2201.11.0-20241112-214900-6b80ab87" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "mqtt-native" version = "@toml.version@" path = "../native/build/libs/mqtt-native-@project.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.eclipse.paho" artifactId = "org.eclipse.paho.mqttv5.client" version = "@paho.mqtt.version@" path = "./lib/org.eclipse.paho.mqttv5.client-@paho.mqtt.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.bouncycastle" artifactId = "bcpkix-jdk18on" version = "@bouncy.castle.version@" path = "./lib/bcpkix-jdk18on-@bouncy.castle.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.bouncycastle" artifactId = "bcutil-jdk18on" version = "@bouncy.castle.version@" diff --git a/compiler-plugin-tests/build.gradle b/compiler-plugin-tests/build.gradle index 38e76e9..fa00f33 100644 --- a/compiler-plugin-tests/build.gradle +++ b/compiler-plugin-tests/build.gradle @@ -50,8 +50,11 @@ checkstyle { checkstyleTest.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsTest { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/compiler-plugin/build.gradle b/compiler-plugin/build.gradle index 546bcd9..6c1f1d1 100644 --- a/compiler-plugin/build.gradle +++ b/compiler-plugin/build.gradle @@ -47,8 +47,11 @@ checkstyle { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/gradle.properties b/gradle.properties index 4b62f63..29466f6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,10 +1,10 @@ org.gradle.caching=true group=io.ballerina.stdlib version=1.2.1-SNAPSHOT -ballerinaLangVersion=2201.10.0 +ballerinaLangVersion=2201.11.0-20241112-214900-6b80ab87 checkstylePluginVersion=10.12.1 -spotbugsPluginVersion=5.0.14 +spotbugsPluginVersion=6.0.18 shadowJarPluginVersion=8.1.1 downloadPluginVersion=5.4.0 releasePluginVersion=2.8.0 @@ -20,16 +20,16 @@ pahoMqtt5Version=1.2.5 #stdlib dependencies # Level 01 -stdlibTimeVersion=2.4.0 -stdlibIoVersion=1.6.0 +stdlibTimeVersion=2.6.0-20241113-073800-201b904 +stdlibIoVersion=1.6.2-20241112-233100-995cf5f # Level 02 -stdlibLogVersion=2.10.0 -stdlibCryptoVersion=2.7.2 -stdlibRandomVersion=1.5.0 +stdlibLogVersion=2.10.1-20241113-120000-4577868 +stdlibCryptoVersion=2.7.3-20241113-081400-d015a39 +stdlibRandomVersion=1.5.1-20241113-122300-1bc770e # Level 03 -stdlibUuidVersion=1.8.0 +stdlibUuidVersion=1.8.1-20241113-154400-443c67b -observeVersion=1.3.0 -observeInternalVersion=1.3.0 +observeVersion=1.4.0-20241113-092000-b83ae74 +observeInternalVersion=1.3.1-20241113-101700-265054d diff --git a/native/build.gradle b/native/build.gradle index e8523db..6c6d532 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -30,8 +30,11 @@ checkstyle { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/native/src/main/java/io/ballerina/stdlib/mqtt/caller/CallerActions.java b/native/src/main/java/io/ballerina/stdlib/mqtt/caller/CallerActions.java index 45010bf..2b69c4a 100644 --- a/native/src/main/java/io/ballerina/stdlib/mqtt/caller/CallerActions.java +++ b/native/src/main/java/io/ballerina/stdlib/mqtt/caller/CallerActions.java @@ -19,7 +19,6 @@ package io.ballerina.stdlib.mqtt.caller; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.stdlib.mqtt.utils.MqttConstants; @@ -47,16 +46,16 @@ public static Object complete(Environment env, BObject callerObject) { MqttClient subscriber = (MqttClient) callerObject.getNativeData(MqttConstants.SUBSCRIBER); int messageId = (int) callerObject.getNativeData(MqttConstants.MESSAGE_ID); int qos = (int) callerObject.getNativeData(MqttConstants.QOS); - Future future = env.markAsync(); - executorService.execute(() -> { - try { - subscriber.messageArrivedComplete(messageId, qos); - future.complete(null); - } catch (MqttException e) { - future.complete(MqttUtils.createMqttError(e)); - } + return env.yieldAndRun(() -> { + executorService.execute(() -> { + try { + subscriber.messageArrivedComplete(messageId, qos); + } catch (MqttException e) { + throw MqttUtils.createMqttError(e); + } + }); + return null; }); - return null; } public static Object respond(Environment env, BObject callerObject, BMap message) { @@ -70,16 +69,15 @@ public static Object respond(Environment env, BObject callerObject, BMap message if (Objects.nonNull(correlationData)) { mqttMessage.getProperties().setCorrelationData(correlationData); } - Future future = env.markAsync(); - executorService.execute(() -> { - try { - subscriber.publish(responseTopic, mqttMessage); - future.complete(null); - } catch (MqttException e) { - future.complete(MqttUtils.createMqttError(e)); - } + return env.yieldAndRun(() -> { + executorService.execute(() -> { + try { + subscriber.publish(responseTopic, mqttMessage); + } catch (MqttException e) { + throw MqttUtils.createMqttError(e); + } + }); + return null; }); - return null; } - } diff --git a/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java b/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java index be09654..4057a50 100644 --- a/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java +++ b/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java @@ -19,7 +19,6 @@ package io.ballerina.stdlib.mqtt.client; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.StreamType; @@ -107,24 +106,20 @@ public static Object externSubscribe(BObject clientObject, BArray subscriptions) public static Object externPublish(Environment env, BObject clientObject, BString topic, BMap message) { MqttClient publisher = (MqttClient) clientObject.getNativeData(MqttConstants.MQTT_CLIENT); MqttMessage mqttMessage = generateMqttMessage(message); - try { - Future future = env.markAsync(); - publisher.publish(topic.getValue(), mqttMessage); - LinkedBlockingQueue deliveryTokenQueue = (LinkedBlockingQueue) clientObject - .getNativeData(DELIVERY_TOKEN_QUEUE); - publishExecutorService.execute(() -> { - try { - future.complete(deliveryTokenQueue.take()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - future.complete(MqttUtils.createMqttError(e)); - } - }); - } catch (MqttException e) { + return env.yieldAndRun(() -> { + try { + publisher.publish(topic.getValue(), mqttMessage); + LinkedBlockingQueue deliveryTokenQueue = (LinkedBlockingQueue) clientObject + .getNativeData(DELIVERY_TOKEN_QUEUE); + return deliveryTokenQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return MqttUtils.createMqttError(e); + } catch (MqttException e) { return MqttUtils.createMqttError(e); } - return null; - } + }); +} public static Object externReceive(BObject clientObject, BTypedesc bTypedesc) { LinkedBlockingQueue blockingQueue = (LinkedBlockingQueue) clientObject.getNativeData(RESPONSE_QUEUE); @@ -179,18 +174,15 @@ public static Object externReconnect(BObject clientObject) { public static Object nextResult(Environment env, BObject streamIterator) { BlockingQueue messageQueue = (BlockingQueue) streamIterator.getNativeData(RESPONSE_QUEUE); - ExecutorService executor = (ExecutorService) streamIterator.getNativeData(RESPONSE_EXECUTOR_SERVICE); - Future future = env.markAsync(); - executor.execute(() -> { + return env.yieldAndRun(() -> { try { BMap message = (BMap) messageQueue.take(); - future.complete(message); + return message; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - future.complete(MqttUtils.createMqttError(e)); + return MqttUtils.createMqttError(e); } }); - return null; } public static void closeStream(BObject streamIterator) { diff --git a/native/src/main/java/io/ballerina/stdlib/mqtt/listener/BServiceInvokeCallbackImpl.java b/native/src/main/java/io/ballerina/stdlib/mqtt/listener/BServiceInvokeCallbackImpl.java deleted file mode 100644 index 6c58273..0000000 --- a/native/src/main/java/io/ballerina/stdlib/mqtt/listener/BServiceInvokeCallbackImpl.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (https://www.wso2.com) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.mqtt.listener; - -import io.ballerina.runtime.api.async.Callback; -import io.ballerina.runtime.api.values.BError; - -import java.util.concurrent.CountDownLatch; - -/** - * Class containing the callback of onMessage service function. - */ -public class BServiceInvokeCallbackImpl implements Callback { - - private final CountDownLatch countDownLatch; - - public BServiceInvokeCallbackImpl(CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; - } - - @Override - public void notifySuccess(Object obj) { - if (obj instanceof BError) { - ((BError) obj).printStackTrace(); - } - countDownLatch.countDown(); - } - - @Override - public void notifyFailure(BError bError) { - bError.printStackTrace(); - countDownLatch.countDown(); - System.exit(1); - } -} diff --git a/native/src/main/java/io/ballerina/stdlib/mqtt/listener/MqttListenerCallbackImpl.java b/native/src/main/java/io/ballerina/stdlib/mqtt/listener/MqttListenerCallbackImpl.java index 59951f9..b2aa4f7 100644 --- a/native/src/main/java/io/ballerina/stdlib/mqtt/listener/MqttListenerCallbackImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/mqtt/listener/MqttListenerCallbackImpl.java @@ -19,10 +19,7 @@ package io.ballerina.stdlib.mqtt.listener; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Module; -import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.async.StrandMetadata; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.RemoteMethodType; import io.ballerina.runtime.api.types.ServiceType; @@ -42,8 +39,6 @@ import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static io.ballerina.stdlib.mqtt.utils.ModuleUtils.getModule; import static io.ballerina.stdlib.mqtt.utils.MqttConstants.MESSAGE_ID; @@ -95,8 +90,6 @@ public void deliveryComplete(IMqttToken token) { private void invokeOnMessage(MqttMessage message, String topic) { BMap bMqttMessage = getBMqttMessage(message, topic); - StrandMetadata metadata = getStrandMetadata(MqttConstants.ONMESSAGE); - CountDownLatch latch = new CountDownLatch(1); boolean callerExists = isCallerAvailable(); if (!isMethodImplemented(MqttConstants.ONMESSAGE)) { invokeOnError(MqttUtils.createMqttError(new NoSuchMethodException("method onMessage not found"))); @@ -115,17 +108,23 @@ private void invokeOnMessage(MqttMessage message, String topic) { callerObject.addNativeData(MqttConstants.CORRELATION_DATA, message.getProperties().getCorrelationData()); } - runtime.invokeMethodAsyncSequentially(service, MqttConstants.ONMESSAGE, null, metadata, - new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, - bMqttMessage, true, callerObject, true); + try { + Object result = runtime.callMethod(service, MqttConstants.ONMESSAGE, null, bMqttMessage, callerObject); + if (result instanceof BError error) { + error.printStackTrace(); + } + } catch (BError bError) { + bError.printStackTrace(); + } } else { - runtime.invokeMethodAsyncSequentially(service, MqttConstants.ONMESSAGE, null, metadata, - new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bMqttMessage, true); - } - try { - latch.await(100, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - exception.printStackTrace(); + try { + Object result = runtime.callMethod(service, MqttConstants.ONMESSAGE, null, bMqttMessage); + if (result instanceof BError error) { + error.printStackTrace(); + } + } catch (BError bError) { + bError.printStackTrace(); + } } } @@ -134,14 +133,13 @@ private void invokeOnError(BError bError) { bError.printStackTrace(); return; } - StrandMetadata metadata = getStrandMetadata(MqttConstants.ONERROR); - CountDownLatch latch = new CountDownLatch(1); - runtime.invokeMethodAsyncSequentially(service, MqttConstants.ONERROR, null, metadata, - new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bError, true); try { - latch.await(100, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - exception.printStackTrace(); + Object result = runtime.callMethod(service, MqttConstants.ONERROR, null, bError); + if (result instanceof BError error) { + error.printStackTrace(); + } + } catch (BError error) { + bError.printStackTrace(); } } @@ -151,14 +149,13 @@ private void invokeOnComplete(IMqttToken token) { } BMap bMqttToken; bMqttToken = getMqttDeliveryToken(token); - StrandMetadata metadata = getStrandMetadata(MqttConstants.ONCOMPLETE); - CountDownLatch latch = new CountDownLatch(1); - runtime.invokeMethodAsyncSequentially(service, MqttConstants.ONCOMPLETE, null, metadata, - new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bMqttToken, true); try { - latch.await(100, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - exception.printStackTrace(); + Object result = runtime.callMethod(service, MqttConstants.ONCOMPLETE, null, bMqttToken); + if (result instanceof BError error) { + error.printStackTrace(); + } + } catch (BError bError) { + bError.printStackTrace(); } } @@ -181,9 +178,4 @@ private Optional getRemoteMethodType(String methodName) { } return Optional.empty(); } - - private StrandMetadata getStrandMetadata(String parentFunctionName) { - Module module = getModule(); - return new StrandMetadata(module.getOrg(), module.getName(), module.getMajorVersion(), parentFunctionName); - } }