Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread-pool implementation to read byte stream #3

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* under the License.
*/

package io.ballerina.stdlib.data.xmldata.xml;
package io.ballerina.stdlib.data.xmldata.io;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.async.Callback;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.data.xmldata.io;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog;
import io.ballerina.stdlib.data.xmldata.xml.XmlParser;

import java.io.InputStreamReader;
import java.util.function.Consumer;

/**
* This class will read data from a Ballerina Stream of byte blocks, in non-blocking manner.
*
* @since 0.1.0
*/
public class DataReaderTask implements Runnable {

private static final String METHOD_NAME_NEXT = "next";
private static final String METHOD_NAME_CLOSE = "close";

private final Environment env;
private final BObject iteratorObj;
private final Future future;
private final BTypedesc typed;

public DataReaderTask(Environment env, BObject iteratorObj, Future future, BTypedesc typed) {
this.env = env;
this.iteratorObj = iteratorObj;
this.future = future;
this.typed = typed;
}

static MethodType resolveNextMethod(BObject iterator) {
MethodType method = getMethodType(iterator, METHOD_NAME_NEXT);
if (method != null) {
return method;
}
throw new IllegalStateException("next method not found in the iterator object");
}

static MethodType resolveCloseMethod(BObject iterator) {
return getMethodType(iterator, METHOD_NAME_CLOSE);
}

private static MethodType getMethodType(BObject iterator, String methodName) {
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType());
MethodType[] methods = objectType.getMethods();
// Assumes compile-time validation of the iterator object
for (MethodType method : methods) {
if (method.getName().equals(methodName)) {
return method;
}
}
return null;
}

@Override
public void run() {
DataReaderTask.ResultConsumer<Object> resultConsumer = new DataReaderTask.ResultConsumer<>(future);
try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj, resolveNextMethod(iteratorObj),
resolveCloseMethod(iteratorObj), resultConsumer)) {
Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType());
future.complete(result);
} catch (Exception e) {
future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage()));
}
}

/**
* This class will hold module related utility functions.
*
* @param <T> The type of the result
* @param future The future to complete
* @since 0.1.0
*/
public record ResultConsumer<T>(Future future) implements Consumer<T> {

@Override
public void accept(T t) {
future.complete(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.data.xmldata.io;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Thread pool for data reader.
*
* @since 0.1.0
*/
public class DataReaderThreadPool {

// TODO : Make this configurable, in Ballerina Library.
public static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we make this 0, 50, 60L as constants. Because if we need to change these things in future, it will be easier

new SynchronousQueue<>(),
new DataThreadFactory());

/**
* Thread factory for data reader.
*/
static class DataThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable runnable) {
Thread balThread = new Thread(runnable);
balThread.setName("bal-data-xmldata-thread");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is along with the repo name, better to make it a constant as well.

return balThread;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,21 @@

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.api.values.BXml;
import io.ballerina.stdlib.data.xmldata.io.DataReaderTask;
import io.ballerina.stdlib.data.xmldata.io.DataReaderThreadPool;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.function.Consumer;

/**
* Xml conversion.
Expand All @@ -45,9 +43,6 @@
*/
public class Native {

private static final String METHOD_NAME_NEXT = "next";
private static final String METHOD_NAME_CLOSE = "close";

public static Object fromXmlWithType(BXml xml, BMap<BString, Object> options, BTypedesc typed) {
try {
return XmlTraversal.traverse(xml, typed.getDescribingType());
Expand All @@ -68,18 +63,9 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap<BSt
} else if (xml instanceof BStream) {
final BObject iteratorObj = ((BStream) xml).getIteratorObj();
final Future future = env.markAsync();
ResultConsumer<Object> resultConsumer = new ResultConsumer<>(future);
try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj,
resolveNextMethod(iteratorObj),
resolveCloseMethod(iteratorObj),
resultConsumer)) {
Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType());
future.complete(result);
return null;
} catch (Exception e) {
future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage()));
return null;
}
DataReaderTask task = new DataReaderTask(env, iteratorObj, future, typed);
DataReaderThreadPool.EXECUTOR_SERVICE.submit(task);
return null;
} else {
return DiagnosticLog.error(DiagnosticErrorCode.UNSUPPORTED_TYPE);
}
Expand All @@ -88,42 +74,4 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap<BSt
}
}

static MethodType resolveNextMethod(BObject iterator) {
MethodType method = getMethodType(iterator, METHOD_NAME_NEXT);
if (method != null) {
return method;
}
throw new IllegalStateException("next method not found in the iterator object");
}

static MethodType resolveCloseMethod(BObject iterator) {
return getMethodType(iterator, METHOD_NAME_CLOSE);
}

private static MethodType getMethodType(BObject iterator, String methodNameClose) {
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType());
MethodType[] methods = objectType.getMethods();
// Assumes compile-time validation of the iterator object
for (MethodType method : methods) {
if (method.getName().equals(methodNameClose)) {
return method;
}
}
return null;
}

/**
* This class will hold module related utility functions.
*
* @param <T> The type of the result
* @param future The future to complete
* @since 0.1.0
*/
private record ResultConsumer<T>(Future future) implements Consumer<T> {

@Override
public void accept(T t) {
future.complete(t);
}
}
}
Loading