diff --git a/native/src/main/java/io/ballerina/stdlib/persist/sql/Handler.java b/native/src/main/java/io/ballerina/stdlib/persist/sql/Handler.java deleted file mode 100644 index cd1357c..0000000 --- a/native/src/main/java/io/ballerina/stdlib/persist/sql/Handler.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.ballerina.stdlib.persist.sql; - -import io.ballerina.runtime.api.values.BError; - -public interface Handler { - void notifySuccess(Object result); - - void notifyFailure(BError bError); -} diff --git a/native/src/main/java/io/ballerina/stdlib/persist/sql/datastore/SQLProcessor.java b/native/src/main/java/io/ballerina/stdlib/persist/sql/datastore/SQLProcessor.java index a458351..9662b99 100644 --- a/native/src/main/java/io/ballerina/stdlib/persist/sql/datastore/SQLProcessor.java +++ b/native/src/main/java/io/ballerina/stdlib/persist/sql/datastore/SQLProcessor.java @@ -36,11 +36,9 @@ import io.ballerina.runtime.transactions.TransactionLocalContext; import io.ballerina.runtime.transactions.TransactionResourceManager; import io.ballerina.stdlib.persist.Constants; -import io.ballerina.stdlib.persist.sql.Handler; import io.ballerina.stdlib.persist.sql.Utils; import java.util.Map; -import java.util.concurrent.CompletableFuture; import static io.ballerina.stdlib.persist.Constants.KEY_FIELDS; import static io.ballerina.stdlib.persist.ErrorGenerator.wrapError; @@ -51,10 +49,10 @@ import static io.ballerina.stdlib.persist.Utils.getRecordTypeWithKeyFields; import static io.ballerina.stdlib.persist.Utils.getTransactionContextProperties; import static io.ballerina.stdlib.persist.sql.Constants.DB_CLIENT; +import static io.ballerina.stdlib.persist.sql.Constants.PERSIST_EXECUTION_RESULT; import static io.ballerina.stdlib.persist.sql.Constants.SQL_EXECUTE_METHOD; import static io.ballerina.stdlib.persist.sql.Constants.SQL_QUERY_METHOD; import static io.ballerina.stdlib.persist.sql.ModuleUtils.getModule; -import static io.ballerina.stdlib.persist.sql.ModuleUtils.getResult; import static io.ballerina.stdlib.persist.sql.Utils.createPersistNativeSQLStream; import static io.ballerina.stdlib.persist.sql.Utils.wrapSQLError; @@ -85,28 +83,7 @@ static BStream query(Environment env, BObject client, BTypedesc targetType, BObj BArray fields = metadata[0]; BArray includes = metadata[1]; BArray typeDescriptions = metadata[2]; - - CompletableFuture balFuture = new CompletableFuture<>(); - Thread.startVirtualThread(() -> { - Handler handler = new Handler() { - @Override - public void notifySuccess(Object o) { - if (o instanceof BStream) { // stream - BStream sqlStream = (BStream) o; - balFuture.complete(Utils.createPersistSQLStreamValue(sqlStream, targetType, fields, - includes, typeDescriptions, persistClient, null)); - } else { // persist:Error - balFuture.complete(Utils.createPersistSQLStreamValue(null, targetType, fields, includes, - typeDescriptions, persistClient, (BError) o)); - } - } - - @Override - public void notifyFailure(BError bError) { - balFuture.complete(Utils.createPersistSQLStreamValue(null, targetType, fields, includes, - typeDescriptions, persistClient, wrapError(bError))); - } - }; + return env.yieldAndRun(() -> { try { Object result = env.getRuntime().callMethod( // Call `SQLClient.runReadQuery( @@ -116,12 +93,18 @@ public void notifyFailure(BError bError) { persistClient, Constants.RUN_READ_QUERY_METHOD, new StrandMetadata(false, trxContextProperties), targetTypeWithIdFields, fields, includes, whereClause, orderByClause, limitClause, groupByClause); - handler.notifySuccess(result); + if (result instanceof BStream bStream) { // stream + return Utils.createPersistSQLStreamValue(bStream, targetType, fields, includes, typeDescriptions, + persistClient, null); + } + // persist:Error + return Utils.createPersistSQLStreamValue(null, targetType, fields, includes, typeDescriptions, + persistClient, (BError) result); } catch (BError bError) { - handler.notifyFailure(bError); + return Utils.createPersistSQLStreamValue(null, targetType, fields, includes, typeDescriptions, + persistClient, wrapError(bError)); } }); - return (BStream) getResult(balFuture); } static Object queryOne(Environment env, BObject client, BArray path, BTypedesc targetType) { @@ -143,22 +126,9 @@ static Object queryOne(Environment env, BObject client, BArray path, BTypedesc t BArray typeDescriptions = metadata[2]; Object key = getKey(env, path); - - CompletableFuture balFuture = new CompletableFuture<>(); - Thread.startVirtualThread(() -> { - Handler handler = new Handler() { - @Override - public void notifySuccess(Object o) { - balFuture.complete(o); - } - - @Override - public void notifyFailure(BError bError) { - balFuture.complete(wrapError(bError)); - } - }; + return env.yieldAndRun(() -> { try { - Object result = env.getRuntime().callMethod( + return env.getRuntime().callMethod( // Call `SQLClient.runReadByKeyQuery( // typedesc rowType, typedesc rowTypeWithIdFields, anydata key, // string[] fields = [], string[] include = [], typedesc[] typeDescriptions = [] @@ -167,12 +137,10 @@ public void notifyFailure(BError bError) { getPersistClient(client, entity), Constants.RUN_READ_BY_KEY_QUERY_METHOD, new StrandMetadata(false, trxContextProperties), targetType, targetTypeWithIdFields, key, fields, includes, typeDescriptions); - handler.notifySuccess(result); } catch (BError bError) { - handler.notifyFailure(bError); + return wrapError(bError); } }); - return getResult(balFuture); } static BStream queryNativeSQL(Environment env, BObject client, BObject paramSQLString, @@ -194,45 +162,28 @@ private static BStream queryNativeSQLBal(Environment env, BObject client, BObjec TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); - CompletableFuture balFuture = new CompletableFuture<>(); - Thread.startVirtualThread(() -> { - Map properties = null; - if (currentTrxContext != null) { - properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext); - } - Handler handler = new Handler() { - @Override - public void notifySuccess(Object o) { - // returned type is `stream` - BStream sqlStream = (BStream) o; - BObject persistNativeStream = createPersistNativeSQLStream(sqlStream, null); - RecordType streamConstraint = - (RecordType) TypeUtils.getReferredType(targetType.getDescribingType()); - balFuture.complete( - ValueCreator.createStreamValue(TypeCreator.createStreamType(streamConstraint, - PredefinedTypes.TYPE_NULL), persistNativeStream) - ); - } - - @Override - public void notifyFailure(BError bError) { // can only be hit on a panic - BObject errorStream = Utils.createPersistNativeSQLStream(null, bError); - balFuture.complete(errorStream); - } - }; + return (BStream) env.yieldAndRun(() -> { try { + Map properties = null; + if (currentTrxContext != null) { + properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext); + } Object result = env.getRuntime().callMethod( // Call `sqlClient.query(paramSQLString, targetType)` which returns // `stream` - - dbClient, SQL_QUERY_METHOD, null, new StrandMetadata(false, properties), paramSQLString, + dbClient, SQL_QUERY_METHOD, new StrandMetadata(false, properties), paramSQLString, targetType); - handler.notifySuccess(result); + // returned type is `stream` + BStream sqlStream = (BStream) result; + BObject persistNativeStream = createPersistNativeSQLStream(sqlStream, null); + RecordType streamConstraint = + (RecordType) TypeUtils.getReferredType(targetType.getDescribingType()); + return ValueCreator.createStreamValue(TypeCreator.createStreamType(streamConstraint, + PredefinedTypes.TYPE_NULL), persistNativeStream); } catch (BError bError) { - handler.notifyFailure(bError); + return Utils.createPersistNativeSQLStream(null, bError); } }); - return (BStream) getResult(balFuture); } private static Object executeNativeSQLBal(Environment env, BObject client, BObject paramSQLString) { @@ -240,42 +191,23 @@ private static Object executeNativeSQLBal(Environment env, BObject client, BObje TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance(); TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext(); - CompletableFuture balFuture = new CompletableFuture<>(); - Thread.startVirtualThread(() -> { + return env.yieldAndRun(() -> { Map properties = null; if (currentTrxContext != null) { properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext); } - Handler handler = new Handler() { - @Override - public void notifySuccess(Object o) { - if (o instanceof BMap) { // returned type is `sql:ExecutionResult` - BMap persistExecutionResult = - ValueCreator.createRecordValue(getModule(), - io.ballerina.stdlib.persist.sql.Constants.PERSIST_EXECUTION_RESULT, - (BMap) o); - balFuture.complete(persistExecutionResult); - } else if (o instanceof BError) { // returned type is `sql:Error` - BError persistError = wrapSQLError((BError) o); - balFuture.complete(persistError); - } - } - - @Override - public void notifyFailure(BError bError) { // can only be hit on a panic - BError persistError = wrapError(bError); - balFuture.complete(persistError); - } - }; try { - env.getRuntime().callMethod( + Object result = env.getRuntime().callMethod( // Call `sqlClient.execute(paramSQLString)` which returns `sql:ExecutionResult|sql:Error` - dbClient, SQL_EXECUTE_METHOD, null, new StrandMetadata(false, properties), paramSQLString); - handler.notifySuccess(balFuture); + dbClient, SQL_EXECUTE_METHOD, new StrandMetadata(false, properties), paramSQLString); + if (result instanceof BMap map) { // returned type is `sql:ExecutionResult` + return ValueCreator.createRecordValue(getModule(), PERSIST_EXECUTION_RESULT, (BMap) map); + } + return wrapSQLError((BError) result); } catch (BError bError) { - handler.notifyFailure(bError); + return bError; } }); - return getResult(balFuture); } }