Skip to content

Commit

Permalink
Merge pull request #7 from prakanth97/threadpool
Browse files Browse the repository at this point in the history
Add thread-pool implementation to read byte stream
  • Loading branch information
hasithaa authored Mar 11, 2024
2 parents 59103a2 + c5c9986 commit 9b8776c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* under the License.
*/

package io.ballerina.stdlib.data.xmldata.xml;
package io.ballerina.stdlib.data.xmldata.io;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.async.Callback;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.data.xmldata.io;

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog;
import io.ballerina.stdlib.data.xmldata.xml.XmlParser;

import java.io.InputStreamReader;
import java.util.function.Consumer;

/**
* This class will read data from a Ballerina Stream of byte blocks, in non-blocking manner.
*
* @since 0.1.0
*/
public class DataReaderTask implements Runnable {

private static final String METHOD_NAME_NEXT = "next";
private static final String METHOD_NAME_CLOSE = "close";

private final Environment env;
private final BObject iteratorObj;
private final Future future;
private final BTypedesc typed;

public DataReaderTask(Environment env, BObject iteratorObj, Future future, BTypedesc typed) {
this.env = env;
this.iteratorObj = iteratorObj;
this.future = future;
this.typed = typed;
}

static MethodType resolveNextMethod(BObject iterator) {
MethodType method = getMethodType(iterator, METHOD_NAME_NEXT);
if (method != null) {
return method;
}
throw new IllegalStateException("next method not found in the iterator object");
}

static MethodType resolveCloseMethod(BObject iterator) {
return getMethodType(iterator, METHOD_NAME_CLOSE);
}

private static MethodType getMethodType(BObject iterator, String methodName) {
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType());
MethodType[] methods = objectType.getMethods();
// Assumes compile-time validation of the iterator object
for (MethodType method : methods) {
if (method.getName().equals(methodName)) {
return method;
}
}
return null;
}

@Override
public void run() {
DataReaderTask.ResultConsumer<Object> resultConsumer = new DataReaderTask.ResultConsumer<>(future);
try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj, resolveNextMethod(iteratorObj),
resolveCloseMethod(iteratorObj), resultConsumer)) {
Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType());
future.complete(result);
} catch (Exception e) {
future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage()));
}
}

/**
* This class will hold module related utility functions.
*
* @param <T> The type of the result
* @param future The future to complete
* @since 0.1.0
*/
public record ResultConsumer<T>(Future future) implements Consumer<T> {

@Override
public void accept(T t) {
future.complete(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.data.xmldata.io;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Thread pool for data reader.
*
* @since 0.1.0
*/
public class DataReaderThreadPool {

// TODO : Make this configurable, in Ballerina Library.
private static final int CORE_POOL_SIZE = 0;
private static final int MAX_POOL_SIZE = 50;
private static final long KEEP_ALIVE_TIME = 60L;
private static final String THREAD_NAME = "bal-data-xmldata-thread";
public static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue<>(), new DataThreadFactory());

/**
* Thread factory for data reader.
*/
static class DataThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable runnable) {
Thread balThread = new Thread(runnable);
balThread.setName(THREAD_NAME);
return balThread;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,21 @@

import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.types.ObjectType;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.api.values.BXml;
import io.ballerina.stdlib.data.xmldata.io.DataReaderTask;
import io.ballerina.stdlib.data.xmldata.io.DataReaderThreadPool;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.stdlib.data.xmldata.utils.DiagnosticLog;

import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.function.Consumer;

/**
* Xml conversion.
Expand All @@ -45,9 +43,6 @@
*/
public class Native {

private static final String METHOD_NAME_NEXT = "next";
private static final String METHOD_NAME_CLOSE = "close";

public static Object fromXmlWithType(BXml xml, BMap<BString, Object> options, BTypedesc typed) {
try {
return XmlTraversal.traverse(xml, typed.getDescribingType());
Expand All @@ -68,18 +63,9 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap<BSt
} else if (xml instanceof BStream) {
final BObject iteratorObj = ((BStream) xml).getIteratorObj();
final Future future = env.markAsync();
ResultConsumer<Object> resultConsumer = new ResultConsumer<>(future);
try (var byteBlockSteam = new BallerinaByteBlockInputStream(env, iteratorObj,
resolveNextMethod(iteratorObj),
resolveCloseMethod(iteratorObj),
resultConsumer)) {
Object result = XmlParser.parse(new InputStreamReader(byteBlockSteam), typed.getDescribingType());
future.complete(result);
return null;
} catch (Exception e) {
future.complete(DiagnosticLog.error(DiagnosticErrorCode.STREAM_BROKEN, e.getMessage()));
return null;
}
DataReaderTask task = new DataReaderTask(env, iteratorObj, future, typed);
DataReaderThreadPool.EXECUTOR_SERVICE.submit(task);
return null;
} else {
return DiagnosticLog.error(DiagnosticErrorCode.UNSUPPORTED_TYPE);
}
Expand All @@ -88,42 +74,4 @@ public static Object fromXmlStringWithType(Environment env, Object xml, BMap<BSt
}
}

static MethodType resolveNextMethod(BObject iterator) {
MethodType method = getMethodType(iterator, METHOD_NAME_NEXT);
if (method != null) {
return method;
}
throw new IllegalStateException("next method not found in the iterator object");
}

static MethodType resolveCloseMethod(BObject iterator) {
return getMethodType(iterator, METHOD_NAME_CLOSE);
}

private static MethodType getMethodType(BObject iterator, String methodNameClose) {
ObjectType objectType = (ObjectType) TypeUtils.getReferredType(iterator.getOriginalType());
MethodType[] methods = objectType.getMethods();
// Assumes compile-time validation of the iterator object
for (MethodType method : methods) {
if (method.getName().equals(methodNameClose)) {
return method;
}
}
return null;
}

/**
* This class will hold module related utility functions.
*
* @param <T> The type of the result
* @param future The future to complete
* @since 0.1.0
*/
private record ResultConsumer<T>(Future future) implements Consumer<T> {

@Override
public void accept(T t) {
future.complete(t);
}
}
}

0 comments on commit 9b8776c

Please sign in to comment.