diff --git a/native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/BallerinaByteBlockInputStream.java b/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/BallerinaByteBlockInputStream.java similarity index 99% rename from native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/BallerinaByteBlockInputStream.java rename to native/src/main/java/io/ballerina/stdlib/data/xmldata/io/BallerinaByteBlockInputStream.java index 638f238..aee560e 100644 --- a/native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/BallerinaByteBlockInputStream.java +++ b/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/BallerinaByteBlockInputStream.java @@ -16,7 +16,7 @@ * under the License. */ -package io.ballerina.stdlib.data.xmldata.xml; +package io.ballerina.stdlib.data.xmldata.io; import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.async.Callback; diff --git a/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java b/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java new file mode 100644 index 0000000..81b7118 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.ballerina.stdlib.data.xmldata.io; + +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Future; +import io.ballerina.runtime.api.types.MethodType; +import io.ballerina.runtime.api.types.ObjectType; +import io.ballerina.runtime.api.utils.TypeUtils; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BTypedesc; +import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode; +import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog; +import io.ballerina.stdlib.data.xmldata.xml.XmlParser; + +import java.io.InputStreamReader; +import java.util.function.Consumer; + +/** + * This class will read data from a Ballerina Stream of byte blocks, in non-blocking manner. + * + * @since 0.1.0 + */ +public class DataReaderTask implements Runnable { + + private static final String METHOD_NAME_NEXT = "next"; + private static final String METHOD_NAME_CLOSE = "close"; + + private final Environment env; + private final BObject iteratorObj; + private final Future future; + private final BTypedesc typed; + + public DataReaderTask(Environment env, BObject iteratorObj, Future future, BTypedesc typed) { + this.env = env; + this.iteratorObj = iteratorObj; + this.future = future; + this.typed = typed; + } + + static MethodType resolveNextMethod(BObject iterator) { + MethodType method = getMethodType(iterator, METHOD_NAME_NEXT); + if (method != null) { + return method; + } + throw new IllegalStateException("next method not found in the iterator object"); + } + + static MethodType resolveCloseMethod(BObject iterator) { + return getMethodType(iterator, METHOD_NAME_CLOSE); + } + + private static MethodType getMethodType(BObject iterator, String methodName) { + ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType()); + MethodType[] methods = objectType.getMethods(); + // Assumes compile-time validation of the iterator object + for (MethodType method : methods) { + if (method.getName().equals(methodName)) { + return method; + } + } + return null; + } + + @Override + public void run() { + DataReaderTask.ResultConsumer resultConsumer = new DataReaderTask.ResultConsumer<>(future); + try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj, resolveNextMethod(iteratorObj), + resolveCloseMethod(iteratorObj), resultConsumer)) { + Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType()); + future.complete(result); + } catch (Exception e) { + future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage())); + } + } + + /** + * This class will hold module related utility functions. + * + * @param The type of the result + * @param future The future to complete + * @since 0.1.0 + */ + public record ResultConsumer(Future future) implements Consumer { + + @Override + public void accept(T t) { + future.complete(t); + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderThreadPool.java b/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderThreadPool.java new file mode 100644 index 0000000..9a12096 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderThreadPool.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.ballerina.stdlib.data.xmldata.io; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Thread pool for data reader. + * + * @since 0.1.0 + */ +public class DataReaderThreadPool { + + // TODO : Make this configurable, in Ballerina Library. + public static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new DataThreadFactory()); + + /** + * Thread factory for data reader. + */ + static class DataThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(Runnable runnable) { + Thread balThread = new Thread(runnable); + balThread.setName("bal-data-xmldata-thread"); + return balThread; + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/Native.java b/native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/Native.java index 2d2b101..3616447 100644 --- a/native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/Native.java +++ b/native/src/main/java/io/ballerina/stdlib/data/xmldata/xml/Native.java @@ -20,9 +20,6 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Future; -import io.ballerina.runtime.api.types.MethodType; -import io.ballerina.runtime.api.types.ObjectType; -import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; @@ -30,13 +27,14 @@ import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; import io.ballerina.runtime.api.values.BXml; +import io.ballerina.stdlib.data.xmldata.io.DataReaderTask; +import io.ballerina.stdlib.data.xmldata.io.DataReaderThreadPool; import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode; import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog; import java.io.ByteArrayInputStream; import java.io.InputStreamReader; import java.io.StringReader; -import java.util.function.Consumer; /** * Xml conversion. @@ -45,9 +43,6 @@ */ public class Native { - private static final String METHOD_NAME_NEXT = "next"; - private static final String METHOD_NAME_CLOSE = "close"; - public static Object fromXmlWithType(BXml xml, BMap options, BTypedesc typed) { try { return XmlTraversal.traverse(xml, typed.getDescribingType()); @@ -68,18 +63,9 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap resultConsumer = new ResultConsumer<>(future); - try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj, - resolveNextMethod(iteratorObj), - resolveCloseMethod(iteratorObj), - resultConsumer)) { - Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType()); - future.complete(result); - return null; - } catch (Exception e) { - future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage())); - return null; - } + DataReaderTask task = new DataReaderTask(env, iteratorObj, future, typed); + DataReaderThreadPool.EXECUTOR_SERVICE.submit(task); + return null; } else { return DiagnosticLog.error(DiagnosticErrorCode.UNSUPPORTED_TYPE); } @@ -88,42 +74,4 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap The type of the result - * @param future The future to complete - * @since 0.1.0 - */ - private record ResultConsumer(Future future) implements Consumer { - - @Override - public void accept(T t) { - future.complete(t); - } - } }