Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread-pool implementation to read byte stream #3

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* under the License.
*/

package io.ballerina.stdlib.data.xmldata.xml;
package io.ballerina.stdlib.data.xmldata.io;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.async.Callback;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.data.xmldata.io;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog;
import io.ballerina.stdlib.data.xmldata.xml.XmlParser;

import java.io.InputStreamReader;
import java.util.function.Consumer;

/**
* This class will read data from a Ballerina Stream of byte blocks, in non-blocking manner.
*
* @since 0.1.0
*/
public class DataReaderTask implements Runnable {

private static final String METHOD_NAME_NEXT = "next";
private static final String METHOD_NAME_CLOSE = "close";

private final Environment env;
private final BObject iteratorObj;
private final Future future;
private final BTypedesc typed;

public DataReaderTask(Environment env, BObject iteratorObj, Future future, BTypedesc typed) {
this.env = env;
this.iteratorObj = iteratorObj;
this.future = future;
this.typed = typed;
}

static MethodType resolveNextMethod(BObject iterator) {
MethodType method = getMethodType(iterator, METHOD_NAME_NEXT);
if (method != null) {
return method;
}
throw new IllegalStateException("next method not found in the iterator object");

Check warning on line 61 in native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java#L61

Added line #L61 was not covered by tests
}

static MethodType resolveCloseMethod(BObject iterator) {
return getMethodType(iterator, METHOD_NAME_CLOSE);
}

private static MethodType getMethodType(BObject iterator, String methodName) {
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType());
MethodType[] methods = objectType.getMethods();
// Assumes compile-time validation of the iterator object
for (MethodType method : methods) {
if (method.getName().equals(methodName)) {
return method;
}
}
return null;

Check warning on line 77 in native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java#L77

Added line #L77 was not covered by tests
}

@Override
public void run() {
DataReaderTask.ResultConsumer<Object> resultConsumer = new DataReaderTask.ResultConsumer<>(future);
try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj, resolveNextMethod(iteratorObj),
resolveCloseMethod(iteratorObj), resultConsumer)) {
Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType());
future.complete(result);
} catch (Exception e) {
future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage()));

Check warning on line 88 in native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java#L87-L88

Added lines #L87 - L88 were not covered by tests
}
}

/**
* This class will hold module related utility functions.
*
* @param <T> The type of the result
* @param future The future to complete
* @since 0.1.0
*/
public record ResultConsumer<T>(Future future) implements Consumer<T> {

@Override
public void accept(T t) {
future.complete(t);
}

Check warning on line 104 in native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderTask.java#L103-L104

Added lines #L103 - L104 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.data.xmldata.io;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Thread pool for data reader.
*
* @since 0.1.0
*/
public class DataReaderThreadPool {

Check warning on line 31 in native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderThreadPool.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/stdlib/data/xmldata/io/DataReaderThreadPool.java#L31

Added line #L31 was not covered by tests

// TODO : Make this configurable, in Ballerina Library.
public static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DataThreadFactory());

/**
* Thread factory for data reader.
*/
static class DataThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable runnable) {
Thread balThread = new Thread(runnable);
balThread.setName("bal-data-xmldata-thread");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is along with the repo name, better to make it a constant as well.

return balThread;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,21 @@

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.api.values.BXml;
import io.ballerina.stdlib.data.xmldata.io.DataReaderTask;
import io.ballerina.stdlib.data.xmldata.io.DataReaderThreadPool;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.function.Consumer;

/**
* Xml conversion.
Expand All @@ -45,9 +43,6 @@
*/
public class Native {

private static final String METHOD_NAME_NEXT = "next";
private static final String METHOD_NAME_CLOSE = "close";

public static Object fromXmlWithType(BXml xml, BMap<BString, Object> options, BTypedesc typed) {
try {
return XmlTraversal.traverse(xml, typed.getDescribingType());
Expand All @@ -68,18 +63,9 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap<BSt
} else if (xml instanceof BStream) {
final BObject iteratorObj = ((BStream) xml).getIteratorObj();
final Future future = env.markAsync();
ResultConsumer<Object> resultConsumer = new ResultConsumer<>(future);
try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj,
resolveNextMethod(iteratorObj),
resolveCloseMethod(iteratorObj),
resultConsumer)) {
Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType());
future.complete(result);
return null;
} catch (Exception e) {
future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage()));
return null;
}
DataReaderTask task = new DataReaderTask(env, iteratorObj, future, typed);
DataReaderThreadPool.EXECUTOR_SERVICE.submit(task);
return null;
} else {
return DiagnosticLog.error(DiagnosticErrorCode.UNSUPPORTED_TYPE);
}
Expand All @@ -88,42 +74,4 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap<BSt
}
}

static MethodType resolveNextMethod(BObject iterator) {
MethodType method = getMethodType(iterator, METHOD_NAME_NEXT);
if (method != null) {
return method;
}
throw new IllegalStateException("next method not found in the iterator object");
}

static MethodType resolveCloseMethod(BObject iterator) {
return getMethodType(iterator, METHOD_NAME_CLOSE);
}

private static MethodType getMethodType(BObject iterator, String methodNameClose) {
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType());
MethodType[] methods = objectType.getMethods();
// Assumes compile-time validation of the iterator object
for (MethodType method : methods) {
if (method.getName().equals(methodNameClose)) {
return method;
}
}
return null;
}

/**
* This class will hold module related utility functions.
*
* @param <T> The type of the result
* @param future The future to complete
* @since 0.1.0
*/
private record ResultConsumer<T>(Future future) implements Consumer<T> {

@Override
public void accept(T t) {
future.complete(t);
}
}
}
Loading