diff --git a/.github/workflows/daily-build.yml b/.github/workflows/daily-build.yml index 54fb7da5..a9d37777 100644 --- a/.github/workflows/daily-build.yml +++ b/.github/workflows/daily-build.yml @@ -12,11 +12,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: 17.0.7 + java-version: 21.0.3 # Build the project with Gradle - name: Build with Gradle diff --git a/README.md b/README.md index f3ae133c..d7c22075 100644 --- a/README.md +++ b/README.md @@ -143,7 +143,7 @@ This repository only contains the source code for the library. ### Set up the prerequisites -* Download and install Java SE Development Kit (JDK) version 17 (from one of the following locations). +* Download and install Java SE Development Kit (JDK) version 21 (from one of the following locations). * [Oracle](https://www.oracle.com/java/technologies/downloads/) diff --git a/ballerina-tests/Ballerina.toml b/ballerina-tests/Ballerina.toml index c963c804..58ec86fa 100644 --- a/ballerina-tests/Ballerina.toml +++ b/ballerina-tests/Ballerina.toml @@ -1,4 +1,4 @@ [package] org = "ballerinax" name = "nats_tests" -version = "3.1.0" +version = "3.1.1" diff --git a/ballerina-tests/Dependencies.toml b/ballerina-tests/Dependencies.toml index 7c66628d..ec872c75 100644 --- a/ballerina-tests/Dependencies.toml +++ b/ballerina-tests/Dependencies.toml @@ -5,12 +5,12 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0" +distribution-version = "2201.11.0-20241117-133400-a3054b77" [[package]] org = "ballerina" name = "crypto" -version = "2.7.2" +version = "2.7.3" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -20,7 +20,7 @@ dependencies = [ [[package]] org = "ballerina" name = "io" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -114,7 +114,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.10.0" +version = "2.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -129,7 +129,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.3.0" +version = "1.4.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} @@ -152,7 +152,7 @@ modules = [ [[package]] org = "ballerina" name = "time" -version = "2.4.0" +version = "2.6.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} @@ -171,7 +171,7 @@ dependencies = [ [[package]] org = "ballerinax" name = "nats" -version = "3.1.0" +version = "3.1.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "crypto"}, @@ -185,7 +185,7 @@ modules = [ [[package]] org = "ballerinax" name = "nats_tests" -version = "3.1.0" +version = "3.1.1" dependencies = [ {org = "ballerina", name = "lang.runtime"}, {org = "ballerina", name = "lang.string"}, diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 3065968a..f45c979a 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,31 +1,31 @@ [package] org = "ballerinax" name = "nats" -version = "3.1.0" +version = "3.1.1" authors = ["Ballerina"] keywords = ["service", "client", "messaging", "network", "pubsub"] repository = "https://github.com/ballerina-platform/module-ballerinax-nats" icon = "icon.png" license = ["Apache-2.0"] -distribution = "2201.10.0" +distribution = "2201.11.0-20241117-133400-a3054b77" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] -path = "./lib/jnats-2.16.0.jar" +[[platform.java21.dependency]] +path = "./lib/jnats-2.20.4.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "nats-native" -version = "3.1.0" -path = "../native/build/libs/nats-native-3.1.0.jar" +version = "3.1.1" +path = "../native/build/libs/nats-native-3.1.1-SNAPSHOT.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "constraint-native" -version = "1.5.0" -path = "./lib/constraint-native-1.5.0.jar" +version = "1.6.0" +path = "./lib/constraint-native-1.6.0-20241113-090900-d276ad5.jar" [build-options] observabilityIncluded=true diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 0a815cce..2d359a39 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "nats-compiler-plugin" class = "io.ballerina.stdlib.nats.plugin.NatsCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/nats-compiler-plugin-3.1.0.jar" +path = "../compiler-plugin/build/libs/nats-compiler-plugin-3.1.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index f4fab0fa..58fa4706 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,12 +5,12 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0" +distribution-version = "2201.11.0-20241117-133400-a3054b77" [[package]] org = "ballerina" name = "constraint" -version = "1.5.0" +version = "1.6.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} @@ -22,7 +22,7 @@ modules = [ [[package]] org = "ballerina" name = "crypto" -version = "2.7.2" +version = "2.7.3" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} @@ -34,7 +34,7 @@ modules = [ [[package]] org = "ballerina" name = "io" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -130,7 +130,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.10.0" +version = "2.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -145,7 +145,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.3.0" +version = "1.4.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -167,7 +167,7 @@ modules = [ [[package]] org = "ballerina" name = "time" -version = "2.4.0" +version = "2.6.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -187,7 +187,7 @@ modules = [ [[package]] org = "ballerinax" name = "nats" -version = "3.1.0" +version = "3.1.1" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "crypto"}, diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index bdbca044..5aac4962 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -7,21 +7,21 @@ keywords = ["service", "client", "messaging", "network", "pubsub"] repository = "https://github.com/ballerina-platform/module-ballerinax-nats" icon = "icon.png" license = ["Apache-2.0"] -distribution = "2201.10.0" +distribution = "2201.11.0-20241117-133400-a3054b77" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] path = "./lib/jnats-@nats.client.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "nats-native" version = "@toml.version@" path = "../native/build/libs/nats-native-@project.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "constraint-native" version = "@constraint.version@" diff --git a/build.gradle b/build.gradle index 76a74fa4..b2d9a6fa 100644 --- a/build.gradle +++ b/build.gradle @@ -17,10 +17,10 @@ */ plugins { - id "com.github.spotbugs" version "5.0.14" - id "com.github.johnrengelman.shadow" version "8.1.1" - id "de.undercouch.download" version "5.4.0" - id "net.researchgate.release" version "2.8.0" + id "com.github.spotbugs" version "${spotbugsPluginVersion}" + id "com.github.johnrengelman.shadow" version "${shadowJarPluginVersion}" + id "de.undercouch.download" version "${downloadPluginVersion}" + id "net.researchgate.release" version "${releasePluginVersion}" } allprojects { diff --git a/compiler-plugin-tests/build.gradle b/compiler-plugin-tests/build.gradle index 7b4ba2c9..e4dede91 100644 --- a/compiler-plugin-tests/build.gradle +++ b/compiler-plugin-tests/build.gradle @@ -50,8 +50,11 @@ checkstyle { checkstyleTest.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsTest { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/compiler-plugin/build.gradle b/compiler-plugin/build.gradle index fd211e1d..7a397ead 100644 --- a/compiler-plugin/build.gradle +++ b/compiler-plugin/build.gradle @@ -45,8 +45,11 @@ tasks.withType(Checkstyle) { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/gradle.properties b/gradle.properties index 259bf344..94903d13 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,23 +2,27 @@ org.gradle.caching=true group=io.ballerina.stdlib version=3.1.1-SNAPSHOT -ballerinaLangVersion=2201.10.0 +ballerinaLangVersion=2201.11.0-20241117-133400-a3054b77 ballerinaGradlePluginVersion=2.0.1 +spotbugsPluginVersion=6.0.18 +shadowJarPluginVersion=8.1.1 +downloadPluginVersion=5.4.0 +releasePluginVersion=2.8.0 -natsVersion=2.16.0 +natsVersion=2.20.4 puppycrawlCheckstyleVersion=10.12.1 testngVersion=7.6.1 gsonVersion=2.8.8 slf4jVersion=1.7.30 -stdlibIoVersion=1.6.1 -stdlibTimeVersion=2.4.0 +stdlibIoVersion=1.6.2-20241112-233100-995cf5f +stdlibTimeVersion=2.6.0-20241113-073800-201b904 -stdlibConstraintVersion=1.5.0 -stdlibLogVersion=2.10.0 -stdlibCryptoVersion=2.7.2 +stdlibConstraintVersion=1.6.0-20241113-090900-d276ad5 +stdlibLogVersion=2.10.1-20241113-120000-4577868 +stdlibCryptoVersion=2.7.3-20241113-081400-d015a39 -observeVersion=1.3.0 -observeInternalVersion=1.3.0 +observeVersion=1.4.0-20241113-092000-b83ae74 +observeInternalVersion=1.3.1-20241113-101700-265054d jacocoVersion=0.8.10 diff --git a/native/build.gradle b/native/build.gradle index 9c24aa59..5a59ae19 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -49,8 +49,11 @@ tasks.withType(Checkstyle) { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/native/src/main/java/io/ballerina/stdlib/nats/Handler.java b/native/src/main/java/io/ballerina/stdlib/nats/Handler.java new file mode 100644 index 00000000..eb6d94c1 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/nats/Handler.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 WSO2 Inc. (http://www.wso2.org) + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.ballerina.stdlib.nats; + +import io.ballerina.runtime.api.values.BError; + +/** + * This utility interface will support Classes that depends on notifySuccess and notifyFailure methods. + */ +public interface Handler { + void notifySuccess(Object result); + + void notifyFailure(BError bError); +} diff --git a/native/src/main/java/io/ballerina/stdlib/nats/Utils.java b/native/src/main/java/io/ballerina/stdlib/nats/Utils.java index 67c94204..06be116b 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/Utils.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/Utils.java @@ -20,13 +20,13 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; -import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; import io.ballerina.runtime.api.types.IntersectionType; import io.ballerina.runtime.api.types.RecordType; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.types.UnionType; import io.ballerina.runtime.api.utils.JsonUtils; import io.ballerina.runtime.api.utils.StringUtils; @@ -41,15 +41,17 @@ import io.ballerina.stdlib.constraint.Constraints; import java.nio.charset.StandardCharsets; - -import static io.ballerina.runtime.api.TypeTags.ANYDATA_TAG; -import static io.ballerina.runtime.api.TypeTags.ARRAY_TAG; -import static io.ballerina.runtime.api.TypeTags.BYTE_TAG; -import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG; -import static io.ballerina.runtime.api.TypeTags.RECORD_TYPE_TAG; -import static io.ballerina.runtime.api.TypeTags.STRING_TAG; -import static io.ballerina.runtime.api.TypeTags.UNION_TAG; -import static io.ballerina.runtime.api.TypeTags.XML_TAG; +import java.util.HashMap; +import java.util.Map; + +import static io.ballerina.runtime.api.types.TypeTags.ANYDATA_TAG; +import static io.ballerina.runtime.api.types.TypeTags.ARRAY_TAG; +import static io.ballerina.runtime.api.types.TypeTags.BYTE_TAG; +import static io.ballerina.runtime.api.types.TypeTags.INTERSECTION_TAG; +import static io.ballerina.runtime.api.types.TypeTags.RECORD_TYPE_TAG; +import static io.ballerina.runtime.api.types.TypeTags.STRING_TAG; +import static io.ballerina.runtime.api.types.TypeTags.UNION_TAG; +import static io.ballerina.runtime.api.types.TypeTags.XML_TAG; import static io.ballerina.runtime.api.utils.TypeUtils.getReferredType; import static io.ballerina.stdlib.nats.Constants.NATS_ERROR; import static io.ballerina.stdlib.nats.Constants.PAYLOAD_BINDING_ERROR; @@ -190,4 +192,13 @@ public static BTypedesc getElementTypeDescFromArrayTypeDesc(BTypedesc bTypeDesc) } return ValueCreator.createTypedescValue((bTypeDesc.getDescribingType())); } + + public static Map getProperties(String resourceName) { + Map properties = new HashMap<>(); + properties.put("moduleOrg", getModule().getOrg()); + properties.put("moduleName", getModule().getName()); + properties.put("moduleVersion", getModule().getMajorVersion()); + properties.put("parentFunctionName", resourceName); + return properties; + } } diff --git a/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Publish.java b/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Publish.java index 2a22f8d4..4cc38353 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Publish.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Publish.java @@ -19,7 +19,7 @@ package io.ballerina.stdlib.nats.basic.client; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.TypeTags; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BArray; diff --git a/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Request.java b/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Request.java index dc21cd64..778a1165 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Request.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/basic/client/Request.java @@ -19,11 +19,11 @@ package io.ballerina.stdlib.nats.basic.client; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.Field; import io.ballerina.runtime.api.types.RecordType; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BDecimal; @@ -72,16 +72,23 @@ public static Object requestMessage(Environment environment, BObject clientObj, (NatsMetricsReporter) clientObj.getNativeData(Constants.NATS_METRIC_UTIL); byte[] byteContent = convertDataIntoByteArray(data, TypeUtils.getType(data)); try { - Message reply; - Future incoming = natsConnection.request(subject, byteContent); - natsMetricsReporter.reportRequest(subject, byteContent.length); - if (TypeUtils.getType(duration).getTag() == TypeTags.DECIMAL_TAG) { - BigDecimal valueInSeconds = ((BDecimal) duration).decimalValue(); - int valueInMilliSeconds = (valueInSeconds).multiply(MILLISECOND_MULTIPLIER).intValue(); - reply = incoming.get(valueInMilliSeconds, TimeUnit.MILLISECONDS); - } else { - reply = incoming.get(); - } + Message reply = environment.yieldAndRun(() -> { + Message replyVal; + Future incoming = natsConnection.request(subject, byteContent); + natsMetricsReporter.reportRequest(subject, byteContent.length); + try { + if (TypeUtils.getType(duration).getTag() == TypeTags.DECIMAL_TAG) { + BigDecimal valueInSeconds = ((BDecimal) duration).decimalValue(); + int valueInMilliSeconds = (valueInSeconds).multiply(MILLISECOND_MULTIPLIER).intValue(); + replyVal = incoming.get(valueInMilliSeconds, TimeUnit.MILLISECONDS); + } else { + replyVal = incoming.get(); + } + return replyVal; + } catch (InterruptedException | TimeoutException | ExecutionException ex) { + throw Utils.createNatsError("Error while requesting message to subject " + subject + ".", ex); + } + }); RecordType recordType = Utils.getRecordType(bTypedesc); BMap msgRecord = ValueCreator.createRecordValue(recordType); @@ -93,11 +100,11 @@ public static Object requestMessage(Environment environment, BObject clientObj, StringUtils.fromString(reply.getSubject()), StringUtils.fromString(reply.getReplyTo())); boolean constraintValidation = (boolean) clientObj.getNativeData(CONSTRAINT_VALIDATION); - validateConstraints(populatedRecord, getElementTypeDescFromArrayTypeDesc(bTypedesc), constraintValidation); + validateConstraints(populatedRecord, getElementTypeDescFromArrayTypeDesc(bTypedesc), + constraintValidation); natsMetricsReporter.reportResponse(subject); return populatedRecord; - } catch (IllegalArgumentException | IllegalStateException | ExecutionException | InterruptedException | - TimeoutException ex) { + } catch (IllegalArgumentException | IllegalStateException ex) { natsMetricsReporter.reportProducerError(subject, NatsObservabilityConstants.ERROR_TYPE_REQUEST); return Utils.createNatsError("Error while requesting message to subject " + subject + ".", ex); } catch (BError bError) { diff --git a/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/DefaultMessageHandler.java b/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/DefaultMessageHandler.java index fbf8faad..5989b1ee 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/DefaultMessageHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/DefaultMessageHandler.java @@ -18,18 +18,17 @@ package io.ballerina.stdlib.nats.basic.consumer; -import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.TypeTags; -import io.ballerina.runtime.api.async.Callback; -import io.ballerina.runtime.api.async.StrandMetadata; +import io.ballerina.runtime.api.concurrent.StrandMetadata; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.IntersectionType; import io.ballerina.runtime.api.types.MethodType; import io.ballerina.runtime.api.types.ObjectType; import io.ballerina.runtime.api.types.Parameter; +import io.ballerina.runtime.api.types.PredefinedTypes; import io.ballerina.runtime.api.types.RecordType; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BArray; @@ -40,6 +39,7 @@ import io.ballerina.runtime.observability.ObservabilityConstants; import io.ballerina.runtime.observability.ObserveUtils; import io.ballerina.stdlib.nats.Constants; +import io.ballerina.stdlib.nats.Handler; import io.ballerina.stdlib.nats.Utils; import io.ballerina.stdlib.nats.observability.NatsMetricsReporter; import io.ballerina.stdlib.nats.observability.NatsObservabilityConstants; @@ -51,17 +51,13 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG; -import static io.ballerina.runtime.api.TypeTags.RECORD_TYPE_TAG; +import static io.ballerina.runtime.api.types.TypeTags.INTERSECTION_TAG; +import static io.ballerina.runtime.api.types.TypeTags.RECORD_TYPE_TAG; import static io.ballerina.runtime.api.utils.TypeUtils.getReferredType; import static io.ballerina.stdlib.nats.Constants.CONSTRAINT_VALIDATION; import static io.ballerina.stdlib.nats.Constants.IS_ANYDATA_MESSAGE; import static io.ballerina.stdlib.nats.Constants.MESSAGE_CONTENT; -import static io.ballerina.stdlib.nats.Constants.NATS; -import static io.ballerina.stdlib.nats.Constants.ORG_NAME; import static io.ballerina.stdlib.nats.Constants.PARAM_ANNOTATION_PREFIX; import static io.ballerina.stdlib.nats.Constants.PARAM_PAYLOAD_ANNOTATION_NAME; import static io.ballerina.stdlib.nats.Constants.TYPE_CHECKER_OBJECT_NAME; @@ -125,14 +121,12 @@ public void onMessage(Message message) { private void dispatchOnRequest(String subject, String replyTo, byte[] data) throws InterruptedException { MethodType methodType = getAttachedFunctionType(this.serviceObject, Constants.ON_REQUEST_RESOURCE); - CountDownLatch countDownLatch = new CountDownLatch(1); try { Object[] arguments = getResourceArguments(data, replyTo, subject, methodType); - executeOnRequestResource(countDownLatch, subject, replyTo, arguments); - countDownLatch.await(); + executeOnRequestResource(subject, replyTo, arguments); } catch (BError bError) { if (getAttachedFunctionType(serviceObject, Constants.ON_ERROR_RESOURCE) != null) { - executeOnErrorResource(countDownLatch, subject, replyTo, data, bError); + executeOnErrorResource(subject, replyTo, data, bError); } } } @@ -142,14 +136,12 @@ private void dispatchOnRequest(String subject, String replyTo, byte[] data) */ private void dispatchOnMessage(String subject, String replyTo, byte[] data) throws InterruptedException { MethodType methodType = getAttachedFunctionType(this.serviceObject, Constants.ON_MESSAGE_RESOURCE); - CountDownLatch countDownLatch = new CountDownLatch(1); try { Object[] arguments = getResourceArguments(data, replyTo, subject, methodType); - executeOnMessageResource(countDownLatch, subject, replyTo, arguments); - countDownLatch.await(); + executeOnMessageResource(subject, replyTo, arguments); } catch (BError bError) { if (getAttachedFunctionType(serviceObject, Constants.ON_ERROR_RESOURCE) != null) { - executeOnErrorResource(countDownLatch, subject, replyTo, data, bError); + executeOnErrorResource(subject, replyTo, data, bError); } } } @@ -167,16 +159,14 @@ private static MethodType getAttachedFunctionType(BObject serviceObject, String return function; } - private void executeOnRequestResource(CountDownLatch countDownLatch, String subject, - String replyTo, Object... args) { - StrandMetadata metadata = new StrandMetadata(getModule().getOrg(), getModule().getName(), - getModule().getVersion(), Constants.ON_REQUEST_RESOURCE); - executeResource(Constants.ON_REQUEST_RESOURCE, new ResponseCallback(countDownLatch, subject, - natsMetricsReporter, replyTo, this.natsConnection), metadata, PredefinedTypes.TYPE_ANYDATA, + private void executeOnRequestResource(String subject, String replyTo, + Object... args) { + executeResource(Constants.ON_REQUEST_RESOURCE, new ResponseHandler(subject, + natsMetricsReporter, replyTo, this.natsConnection), PredefinedTypes.TYPE_ANYDATA, subject, args); } - private void executeOnErrorResource(CountDownLatch countDownLatch, String subject, String replyTo, byte[] data, + private void executeOnErrorResource(String subject, String replyTo, byte[] data, BError bError) { BMap msgObj; BArray msgData = ValueCreator.createArrayValue(data); @@ -188,48 +178,41 @@ private void executeOnErrorResource(CountDownLatch countDownLatch, String subjec } msgObj = ValueCreator.createReadonlyRecordValue(getModule(), Constants.NATS_MESSAGE_OBJ_NAME, valueMap); - StrandMetadata metadata = new StrandMetadata(getModule().getOrg(), getModule().getName(), - getModule().getVersion(), Constants.ON_ERROR_RESOURCE); - runtime.invokeMethodAsyncSequentially(serviceObject, Constants.ON_ERROR_RESOURCE, null, metadata, - new ResponseCallback(countDownLatch, subject, natsMetricsReporter), null, - PredefinedTypes.TYPE_NULL, msgObj, true, bError, true); + ResponseHandler handler = new ResponseHandler(subject, natsMetricsReporter); + try { + Object result = runtime.callMethod(serviceObject, Constants.ON_ERROR_RESOURCE, null, + msgObj, bError); + handler.notifySuccess(result); + } catch (BError bError1) { + handler.notifyFailure(bError1); + } } - private void executeOnMessageResource(CountDownLatch countDownLatch, String subject, + private void executeOnMessageResource(String subject, String replyTo, Object... args) { - StrandMetadata metadata = new StrandMetadata(getModule().getOrg(), getModule().getName(), - getModule().getVersion(), Constants.ON_MESSAGE_RESOURCE); - executeResource(Constants.ON_MESSAGE_RESOURCE, new ResponseCallback(countDownLatch, subject, - natsMetricsReporter), metadata, PredefinedTypes.TYPE_NULL, + executeResource(Constants.ON_MESSAGE_RESOURCE, new ResponseHandler(subject, + natsMetricsReporter), PredefinedTypes.TYPE_NULL, replyTo, args); } - private void executeResource(String function, Callback callback, - StrandMetadata metadata, Type returnType, String subject, Object... args) { + private void executeResource(String function, Handler callback, Type returnType, String subject, Object... args) { ObjectType objectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(serviceObject)); - if (ObserveUtils.isTracingEnabled()) { - Map properties = new HashMap<>(); - NatsObserverContext observerContext = new NatsObserverContext( - NatsObservabilityConstants.CONTEXT_CONSUMER, connectedUrl, subject); - properties.put(ObservabilityConstants.KEY_OBSERVER_CONTEXT, observerContext); - if (objectType.isIsolated() && - objectType.isIsolated(function)) { - runtime.invokeMethodAsyncConcurrently(serviceObject, function, null, metadata, - callback, properties, returnType, args); - } else { - runtime.invokeMethodAsyncSequentially(serviceObject, function, null, metadata, - callback, properties, returnType, args); + Thread.startVirtualThread(() -> { + Map properties = Utils.getProperties(function); + if (ObserveUtils.isTracingEnabled()) { + NatsObserverContext observerContext = new NatsObserverContext( + NatsObservabilityConstants.CONTEXT_CONSUMER, connectedUrl, subject); + properties.put(ObservabilityConstants.KEY_OBSERVER_CONTEXT, observerContext); } - } else { - if (objectType.isIsolated() && - objectType.isIsolated(function)) { - runtime.invokeMethodAsyncConcurrently(serviceObject, function, null, metadata, - callback, null, returnType, args); - } else { - runtime.invokeMethodAsyncSequentially(serviceObject, function, null, metadata, - callback, null, returnType, args); + boolean isConcurrentSafe = objectType.isIsolated() && objectType.isIsolated(function); + try { + Object result = runtime.callMethod(serviceObject, function, new StrandMetadata(isConcurrentSafe, + properties), args); + callback.notifySuccess(result); + } catch (BError bError) { + callback.notifyFailure(bError); } - } + }); } private Object[] getResourceArguments(byte[] message, String replyTo, String subject, MethodType remoteFunction) { @@ -237,7 +220,7 @@ private Object[] getResourceArguments(byte[] message, String replyTo, String sub boolean messageExists = false; boolean payloadExists = false; boolean constraintValidation = (boolean) listenerObj.getNativeData(CONSTRAINT_VALIDATION); - Object[] arguments = new Object[parameters.length * 2]; + Object[] arguments = new Object[parameters.length]; int index = 0; for (Parameter parameter : parameters) { Type referredType = getReferredType(parameter.type); @@ -267,7 +250,6 @@ private Object[] getResourceArguments(byte[] message, String replyTo, String sub arguments[index++] = value; break; } - arguments[index++] = true; } return arguments; } @@ -285,17 +267,13 @@ private boolean isMessageType(Parameter parameter, BMap annotat private boolean invokeIsAnydataMessageTypeMethod(Type paramType) { BObject client = ValueCreator.createObjectValue(getModule(), TYPE_CHECKER_OBJECT_NAME); - Semaphore sem = new Semaphore(0); - NatsTypeCheckCallback messageTypeCheckCallback = new NatsTypeCheckCallback(sem); - StrandMetadata metadata = new StrandMetadata(ORG_NAME, NATS, - getModule().getVersion(), IS_ANYDATA_MESSAGE); - runtime.invokeMethodAsyncSequentially(client, IS_ANYDATA_MESSAGE, null, metadata, - messageTypeCheckCallback, null, PredefinedTypes.TYPE_BOOLEAN, - ValueCreator.createTypedescValue(paramType), true); + NatsTypeCheckHandler messageTypeCheckCallback = new NatsTypeCheckHandler(); try { - sem.acquire(); - } catch (InterruptedException e) { - throw Utils.createNatsError(e.getMessage()); + Object result = runtime.callMethod(client, IS_ANYDATA_MESSAGE, null, + ValueCreator.createTypedescValue(paramType)); + messageTypeCheckCallback.notifySuccess(result); + } catch (BError bError) { + messageTypeCheckCallback.notifyFailure(bError); } return messageTypeCheckCallback.getIsMessageType(); } @@ -350,22 +328,19 @@ private static Type getPayloadType(Type definedType) { /** * Represents the callback which will be triggered upon submitting to resource. */ - public static class ResponseCallback implements Callback { - private final CountDownLatch countDownLatch; + public static class ResponseHandler implements Handler { private final String subject; private final NatsMetricsReporter natsMetricsReporter; private String replyTo; private Connection natsConnection; - ResponseCallback(CountDownLatch countDownLatch, String subject, NatsMetricsReporter natsMetricsReporter) { - this.countDownLatch = countDownLatch; + ResponseHandler(String subject, NatsMetricsReporter natsMetricsReporter) { this.subject = subject; this.natsMetricsReporter = natsMetricsReporter; } - ResponseCallback(CountDownLatch countDownLatch, String subject, NatsMetricsReporter natsMetricsReporter, + ResponseHandler(String subject, NatsMetricsReporter natsMetricsReporter, String replyTo, Connection natsConnection) { - this.countDownLatch = countDownLatch; this.subject = subject; this.natsMetricsReporter = natsMetricsReporter; this.replyTo = replyTo; @@ -383,7 +358,6 @@ public void notifySuccess(Object obj) { natsConnection.publish(replyTo, Utils.convertDataIntoByteArray(obj, TypeUtils.getType(obj))); } natsMetricsReporter.reportDelivery(subject); - countDownLatch.countDown(); } /** @@ -393,12 +367,10 @@ public void notifySuccess(Object obj) { public void notifyFailure(BError error) { error.printStackTrace(); natsMetricsReporter.reportConsumerError(subject, NatsObservabilityConstants.ERROR_TYPE_MSG_RECEIVED); - countDownLatch.countDown(); // Service level `panic` is captured in this method. // Since, `panic` is due to a critical application bug or resource exhaustion // we need to exit the application. // Please refer: https://github.com/ballerina-platform/ballerina-standard-library/issues/2714 - System.exit(1); } } @@ -406,30 +378,21 @@ public void notifyFailure(BError error) { * {@code NatsTypeCheckCallback} provides ability to check whether a given type is a subtype of * nats:AnydataMessage. */ - public static class NatsTypeCheckCallback implements Callback { - - private final Semaphore semaphore; + public static class NatsTypeCheckHandler implements Handler { private Boolean isMessageType = false; - NatsTypeCheckCallback(Semaphore semaphore) { - this.semaphore = semaphore; - } - @Override public void notifySuccess(Object obj) { isMessageType = (Boolean) obj; - semaphore.release(); } @Override public void notifyFailure(BError error) { - semaphore.release(); error.printStackTrace(); // Service level `panic` is captured in this method. // Since, `panic` is due to a critical application bug or resource exhaustion we need // to exit the application. // Please refer: https://github.com/ballerina-platform/ballerina-standard-library/issues/2714 - System.exit(1); } public boolean getIsMessageType() { diff --git a/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Detach.java b/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Detach.java index c7a1a13d..07a33eda 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Detach.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Detach.java @@ -56,7 +56,7 @@ public static Object detach(BObject listener, BObject service) { .getSubscriptionConfig(serviceType.getAnnotation( StringUtils.fromString(Utils.getModule().getOrg() + ORG_NAME_SEPARATOR + Utils.getModule().getName() + VERSION_SEPARATOR + - Utils.getModule().getVersion() + ":" + + Utils.getModule().getMajorVersion() + ":" + Constants.SUBSCRIPTION_CONFIG))); String serviceName = (String) service.getNativeData(Constants.SERVICE_NAME); String subject; diff --git a/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Register.java b/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Register.java index cac391a1..659c0fe7 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Register.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/basic/consumer/Register.java @@ -20,8 +20,8 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.types.AnnotatableType; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BMap; @@ -61,7 +61,7 @@ public static Object attach(Environment env, BObject listenerObject, BObject ser .getAnnotation(StringUtils.fromString( Utils.getModule().getOrg() + ORG_NAME_SEPARATOR + Utils.getModule().getName() + VERSION_SEPARATOR + - Utils.getModule().getVersion() + ":" + + Utils.getModule().getMajorVersion() + ":" + Constants.SUBSCRIPTION_CONFIG))); String queueName = null; String subject; diff --git a/native/src/main/java/io/ballerina/stdlib/nats/connection/ConnectionUtils.java b/native/src/main/java/io/ballerina/stdlib/nats/connection/ConnectionUtils.java index bef2d146..5304abcb 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/connection/ConnectionUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/connection/ConnectionUtils.java @@ -18,7 +18,7 @@ package io.ballerina.stdlib.nats.connection; -import io.ballerina.runtime.api.TypeTags; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BArray; diff --git a/native/src/main/java/io/ballerina/stdlib/nats/jetstream/client/ManagementUtils.java b/native/src/main/java/io/ballerina/stdlib/nats/jetstream/client/ManagementUtils.java index f0d26939..3e65abe4 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/jetstream/client/ManagementUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/jetstream/client/ManagementUtils.java @@ -18,7 +18,7 @@ package io.ballerina.stdlib.nats.jetstream.client; -import io.ballerina.runtime.api.TypeTags; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BArray; diff --git a/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/ListenerUtils.java b/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/ListenerUtils.java index 51a275c9..230c609b 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/ListenerUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/ListenerUtils.java @@ -20,8 +20,8 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.types.AnnotatableType; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BMap; @@ -100,7 +100,7 @@ public static Object attach(Environment env, BObject listenerObject, BObject ser .getAnnotation(StringUtils.fromString( Utils.getModule().getOrg() + ORG_NAME_SEPARATOR + Utils.getModule().getName() + VERSION_SEPARATOR + - Utils.getModule().getVersion() + ":" + + Utils.getModule().getMajorVersion() + ":" + Constants.STREAM_SUBSCRIPTION_CONFIG))); String queueName = null; String subject; @@ -158,7 +158,7 @@ public static Object detach(BObject listener, BObject service) { BMap subscriptionConfig = Utils.getSubscriptionConfig(((AnnotatableType) TypeUtils.getType(service)).getAnnotation(StringUtils.fromString(Utils.getModule().getOrg() + ORG_NAME_SEPARATOR + Utils.getModule().getName() + VERSION_SEPARATOR + - Utils.getModule().getVersion() + ":" + Constants.STREAM_SUBSCRIPTION_CONFIG))); + Utils.getModule().getMajorVersion() + ":" + Constants.STREAM_SUBSCRIPTION_CONFIG))); String serviceName = (String) service.getNativeData(Constants.SERVICE_NAME); String subject; if (subscriptionConfig == null) { diff --git a/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/StreamMessageHandler.java b/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/StreamMessageHandler.java index 6688496e..e0b964d2 100644 --- a/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/StreamMessageHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/nats/jetstream/listener/StreamMessageHandler.java @@ -19,13 +19,13 @@ package io.ballerina.stdlib.nats.jetstream.listener; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.TypeTags; -import io.ballerina.runtime.api.async.Callback; -import io.ballerina.runtime.api.async.StrandMetadata; +import io.ballerina.runtime.api.concurrent.StrandMetadata; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.MethodType; import io.ballerina.runtime.api.types.ObjectType; +import io.ballerina.runtime.api.types.Parameter; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BError; @@ -43,7 +43,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CountDownLatch; /** * Handles incoming message for a given subscription. @@ -81,26 +80,23 @@ public void onMessage(Message msg) { MethodType onMessageResource = getAttachedFunctionType(service, "onMessage"); Type returnType = onMessageResource.getReturnType(); - Type[] parameterTypes = onMessageResource.getParameterTypes(); - if (parameterTypes.length == 1) { - Object[] args1 = new Object[2]; - if (parameterTypes[0].getTag() == TypeTags.INTERSECTION_TAG) { + Parameter[] parameters = onMessageResource.getParameters(); + if (parameters.length == 1) { + Object[] args1 = new Object[1]; + if (parameters[0].type.getTag() == TypeTags.INTERSECTION_TAG) { args1[0] = getReadonlyMessage(msg); } else { args1[0] = populatedMsgRecord; } - args1[1] = true; dispatch(args1, msg.getSubject(), returnType); - } else if (parameterTypes.length == 2) { - Object[] args2 = new Object[4]; - if (parameterTypes[0].getTag() == TypeTags.INTERSECTION_TAG) { + } else if (parameters.length == 2) { + Object[] args2 = new Object[2]; + if (parameters[0].type.getTag() == TypeTags.INTERSECTION_TAG) { args2[0] = getReadonlyMessage(msg); } else { args2[0] = populatedMsgRecord; } - args2[1] = true; - args2[2] = callerObj; - args2[3] = true; + args2[1] = callerObj; dispatch(args2, msg.getSubject(), returnType); } else { throw Utils.createNatsError("Invalid remote function signature."); @@ -119,56 +115,28 @@ private void dispatch(Object[] args, String subject, Type returnType) { } private void executeResource(String subject, Object[] args, Type returnType) { - CountDownLatch countDownLatch = new CountDownLatch(1); ObjectType objectType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); - StrandMetadata metadata = new StrandMetadata(Utils.getModule().getOrg(), Utils.getModule().getName(), - Utils.getModule().getVersion(), Constants.ON_MESSAGE_RESOURCE); - Map properties; + Map properties = null; if (ObserveUtils.isTracingEnabled()) { properties = new HashMap<>(); NatsObserverContext observerContext = new NatsObserverContext(NatsObservabilityConstants.CONTEXT_CONSUMER, connectedUrl, subject); properties.put(ObservabilityConstants.KEY_OBSERVER_CONTEXT, observerContext); } - if (objectType.isIsolated() && objectType.isIsolated(Constants.ON_MESSAGE_RESOURCE)) { - runtime.invokeMethodAsyncConcurrently(service, Constants.ON_MESSAGE_RESOURCE, null, metadata, - new DispatcherCallback(connectedUrl, subject, countDownLatch), null, returnType, args); - } else { - runtime.invokeMethodAsyncSequentially(service, Constants.ON_MESSAGE_RESOURCE, null, metadata, - new DispatcherCallback(connectedUrl, subject, countDownLatch), null, returnType, args); - } try { - countDownLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Utils.createNatsError("The current thread got interrupted.", e); - } - } - - private static class DispatcherCallback implements Callback { - private final CountDownLatch countDownLatch; - - public DispatcherCallback(String url, String subject, CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; - } - - @Override - public void notifySuccess(Object obj) { - if (obj instanceof BError) { - ((BError) obj).printStackTrace(); + boolean isConcurrentSafe = objectType.isIsolated() && + objectType.isIsolated(Constants.ON_MESSAGE_RESOURCE); + Object result = runtime.callMethod(service, Constants.ON_MESSAGE_RESOURCE, + new StrandMetadata(isConcurrentSafe, properties), args); + if (result instanceof BError) { + ((BError) result).printStackTrace(); } - countDownLatch.countDown(); - } - - @Override - public void notifyFailure(BError error) { - error.printStackTrace(); - countDownLatch.countDown(); + } catch (BError bError) { + bError.printStackTrace(); // Service level `panic` is captured in this method. // Since, `panic` is due to a critical application bug or resource exhaustion // we need to exit the application. // Please refer: https://github.com/ballerina-platform/ballerina-standard-library/issues/2714 - System.exit(1); } } diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index 329e5aeb..56875cd5 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -45,7 +45,7 @@ - + @@ -64,4 +64,8 @@ + + + +