Skip to content

Commit

Permalink
Fix issue with yields
Browse files Browse the repository at this point in the history
  • Loading branch information
warunalakshitha committed Nov 16, 2024
1 parent 61df4ee commit fd229a5
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 114 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.persist.Constants;
import io.ballerina.stdlib.persist.sql.Handler;
import io.ballerina.stdlib.persist.sql.Utils;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static io.ballerina.stdlib.persist.Constants.KEY_FIELDS;
import static io.ballerina.stdlib.persist.ErrorGenerator.wrapError;
Expand All @@ -51,10 +49,10 @@
import static io.ballerina.stdlib.persist.Utils.getRecordTypeWithKeyFields;
import static io.ballerina.stdlib.persist.Utils.getTransactionContextProperties;
import static io.ballerina.stdlib.persist.sql.Constants.DB_CLIENT;
import static io.ballerina.stdlib.persist.sql.Constants.PERSIST_EXECUTION_RESULT;
import static io.ballerina.stdlib.persist.sql.Constants.SQL_EXECUTE_METHOD;
import static io.ballerina.stdlib.persist.sql.Constants.SQL_QUERY_METHOD;
import static io.ballerina.stdlib.persist.sql.ModuleUtils.getModule;
import static io.ballerina.stdlib.persist.sql.ModuleUtils.getResult;
import static io.ballerina.stdlib.persist.sql.Utils.createPersistNativeSQLStream;
import static io.ballerina.stdlib.persist.sql.Utils.wrapSQLError;

Expand Down Expand Up @@ -85,28 +83,7 @@ static BStream query(Environment env, BObject client, BTypedesc targetType, BObj
BArray fields = metadata[0];
BArray includes = metadata[1];
BArray typeDescriptions = metadata[2];

CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
Handler handler = new Handler() {
@Override
public void notifySuccess(Object o) {
if (o instanceof BStream) { // stream<record {}, sql:Error?>
BStream sqlStream = (BStream) o;
balFuture.complete(Utils.createPersistSQLStreamValue(sqlStream, targetType, fields,
includes, typeDescriptions, persistClient, null));
} else { // persist:Error
balFuture.complete(Utils.createPersistSQLStreamValue(null, targetType, fields, includes,
typeDescriptions, persistClient, (BError) o));
}
}

@Override
public void notifyFailure(BError bError) {
balFuture.complete(Utils.createPersistSQLStreamValue(null, targetType, fields, includes,
typeDescriptions, persistClient, wrapError(bError)));
}
};
return env.yieldAndRun(() -> {
try {
Object result = env.getRuntime().callMethod(
// Call `SQLClient.runReadQuery(
Expand All @@ -116,12 +93,18 @@ public void notifyFailure(BError bError) {
persistClient, Constants.RUN_READ_QUERY_METHOD, new StrandMetadata(false, trxContextProperties),
targetTypeWithIdFields, fields, includes, whereClause,
orderByClause, limitClause, groupByClause);
handler.notifySuccess(result);
if (result instanceof BStream bStream) { // stream<record {}, sql:Error?>
return Utils.createPersistSQLStreamValue(bStream, targetType, fields, includes, typeDescriptions,
persistClient, null);
}
// persist:Error
return Utils.createPersistSQLStreamValue(null, targetType, fields, includes, typeDescriptions,
persistClient, (BError) result);
} catch (BError bError) {
handler.notifyFailure(bError);
return Utils.createPersistSQLStreamValue(null, targetType, fields, includes, typeDescriptions,
persistClient, wrapError(bError));
}
});
return (BStream) getResult(balFuture);
}

static Object queryOne(Environment env, BObject client, BArray path, BTypedesc targetType) {
Expand All @@ -143,22 +126,9 @@ static Object queryOne(Environment env, BObject client, BArray path, BTypedesc t
BArray typeDescriptions = metadata[2];

Object key = getKey(env, path);

CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
Handler handler = new Handler() {
@Override
public void notifySuccess(Object o) {
balFuture.complete(o);
}

@Override
public void notifyFailure(BError bError) {
balFuture.complete(wrapError(bError));
}
};
return env.yieldAndRun(() -> {
try {
Object result = env.getRuntime().callMethod(
return env.getRuntime().callMethod(
// Call `SQLClient.runReadByKeyQuery(
// typedesc<record {}> rowType, typedesc<record {}> rowTypeWithIdFields, anydata key,
// string[] fields = [], string[] include = [], typedesc<record {}>[] typeDescriptions = []
Expand All @@ -167,12 +137,10 @@ public void notifyFailure(BError bError) {
getPersistClient(client, entity), Constants.RUN_READ_BY_KEY_QUERY_METHOD,
new StrandMetadata(false, trxContextProperties), targetType, targetTypeWithIdFields, key,
fields, includes, typeDescriptions);
handler.notifySuccess(result);
} catch (BError bError) {
handler.notifyFailure(bError);
return wrapError(bError);
}
});
return getResult(balFuture);
}

static BStream queryNativeSQL(Environment env, BObject client, BObject paramSQLString,
Expand All @@ -194,88 +162,52 @@ private static BStream queryNativeSQLBal(Environment env, BObject client, BObjec
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();

CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
Map<String, Object> properties = null;
if (currentTrxContext != null) {
properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext);
}
Handler handler = new Handler() {
@Override
public void notifySuccess(Object o) {
// returned type is `stream<record {}, sql:Error?>`
BStream sqlStream = (BStream) o;
BObject persistNativeStream = createPersistNativeSQLStream(sqlStream, null);
RecordType streamConstraint =
(RecordType) TypeUtils.getReferredType(targetType.getDescribingType());
balFuture.complete(
ValueCreator.createStreamValue(TypeCreator.createStreamType(streamConstraint,
PredefinedTypes.TYPE_NULL), persistNativeStream)
);
}

@Override
public void notifyFailure(BError bError) { // can only be hit on a panic
BObject errorStream = Utils.createPersistNativeSQLStream(null, bError);
balFuture.complete(errorStream);
}
};
return (BStream) env.yieldAndRun(() -> {
try {
Map<String, Object> properties = null;
if (currentTrxContext != null) {
properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext);
}
Object result = env.getRuntime().callMethod(
// Call `sqlClient.query(paramSQLString, targetType)` which returns
// `stream<targetType, sql:Error?>`

dbClient, SQL_QUERY_METHOD, null, new StrandMetadata(false, properties), paramSQLString,
dbClient, SQL_QUERY_METHOD, new StrandMetadata(false, properties), paramSQLString,
targetType);
handler.notifySuccess(result);
// returned type is `stream<record {}, sql:Error?>`
BStream sqlStream = (BStream) result;
BObject persistNativeStream = createPersistNativeSQLStream(sqlStream, null);
RecordType streamConstraint =
(RecordType) TypeUtils.getReferredType(targetType.getDescribingType());
return ValueCreator.createStreamValue(TypeCreator.createStreamType(streamConstraint,
PredefinedTypes.TYPE_NULL), persistNativeStream);
} catch (BError bError) {
handler.notifyFailure(bError);
return Utils.createPersistNativeSQLStream(null, bError);
}
});
return (BStream) getResult(balFuture);
}

private static Object executeNativeSQLBal(Environment env, BObject client, BObject paramSQLString) {
BObject dbClient = (BObject) client.get(DB_CLIENT);
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();

CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
return env.yieldAndRun(() -> {
Map<String, Object> properties = null;
if (currentTrxContext != null) {
properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext);
}
Handler handler = new Handler() {
@Override
public void notifySuccess(Object o) {
if (o instanceof BMap) { // returned type is `sql:ExecutionResult`
BMap<BString, Object> persistExecutionResult =
ValueCreator.createRecordValue(getModule(),
io.ballerina.stdlib.persist.sql.Constants.PERSIST_EXECUTION_RESULT,
(BMap<BString, Object>) o);
balFuture.complete(persistExecutionResult);
} else if (o instanceof BError) { // returned type is `sql:Error`
BError persistError = wrapSQLError((BError) o);
balFuture.complete(persistError);
}
}

@Override
public void notifyFailure(BError bError) { // can only be hit on a panic
BError persistError = wrapError(bError);
balFuture.complete(persistError);
}
};
try {
env.getRuntime().callMethod(
Object result = env.getRuntime().callMethod(
// Call `sqlClient.execute(paramSQLString)` which returns `sql:ExecutionResult|sql:Error`
dbClient, SQL_EXECUTE_METHOD, null, new StrandMetadata(false, properties), paramSQLString);
handler.notifySuccess(balFuture);
dbClient, SQL_EXECUTE_METHOD, new StrandMetadata(false, properties), paramSQLString);
if (result instanceof BMap map) { // returned type is `sql:ExecutionResult`
return ValueCreator.createRecordValue(getModule(), PERSIST_EXECUTION_RESULT, (BMap<BString,
Object>) map);
}
return wrapSQLError((BError) result);
} catch (BError bError) {
handler.notifyFailure(bError);
return bError;
}
});
return getResult(balFuture);
}
}

0 comments on commit fd229a5

Please sign in to comment.