Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Transaction issue and enable testcases #76

Merged
merged 17 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions ballerina/tests/h2-native-tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ function h2NativeQueryComplexTest() returns error? {
@test:Config {
groups: ["transactions", "h2", "native"],
dependsOn: [h2NativeExecuteTestNegative1, h2NativeQueryTest, h2NativeQueryTestNegative, h2NativeQueryComplexTest],
enable: false
enable: true
}
function h2NativeTransactionTest() returns error? {
H2RainierClient rainierClient = check new ();
Expand Down Expand Up @@ -254,17 +254,14 @@ function h2NativeTransactionTest2() returns error? {
check buildingStream.close();
test:assertEquals(buildings, [building33]);

transaction {
_ = check rainierClient->executeNativeSQL(`
UPDATE "Building"
SET
"city" = ${building33Updated.city},
"state" = ${building33Updated.state},
"country" = ${building33Updated.country}
WHERE "buildingCode" = ${building33.buildingCode}
`);
check commit;
}
_ = check rainierClient->executeNativeSQL(`
UPDATE "Building"
SET
"city" = ${building33Updated.city},
"state" = ${building33Updated.state},
"country" = ${building33Updated.country}
WHERE "buildingCode" = ${building33.buildingCode}
`);

stream<Building, persist:Error?> buildingStream3 = rainierClient->queryNativeSQL(`SELECT * FROM "Building" WHERE "buildingCode" = ${building33.buildingCode}`);
Building[] buildings3 = check from Building building in buildingStream3
Expand Down
21 changes: 9 additions & 12 deletions ballerina/tests/mssql-native-tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ function mssqlNativeQueryComplexTest() returns error? {
@test:Config {
groups: ["transactions", "mssql", "native"],
dependsOn: [mssqlNativeExecuteTestNegative1, mssqlNativeQueryTest, mssqlNativeQueryTestNegative, mssqlNativeQueryComplexTest],
enable: false
enable: true
}
function mssqlNativeTransactionTest() returns error? {
MSSQLRainierClient rainierClient = check new ();
Expand Down Expand Up @@ -254,17 +254,14 @@ function mssqlNativeTransactionTest2() returns error? {
check buildingStream.close();
test:assertEquals(buildings, [building33]);

transaction {
_ = check rainierClient->executeNativeSQL(`
UPDATE Building
SET
city = ${building33Updated.city},
state = ${building33Updated.state},
country = ${building33Updated.country}
WHERE buildingCode = ${building33.buildingCode}
`);
check commit;
}
_ = check rainierClient->executeNativeSQL(`
UPDATE Building
SET
city = ${building33Updated.city},
state = ${building33Updated.state},
country = ${building33Updated.country}
WHERE buildingCode = ${building33.buildingCode}
`);

stream<Building, persist:Error?> buildingStream3 = rainierClient->queryNativeSQL(`SELECT * FROM Building WHERE buildingCode = ${building33.buildingCode}`);
Building[] buildings3 = check from Building building in buildingStream3
Expand Down
21 changes: 9 additions & 12 deletions ballerina/tests/mysql-native-tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ function mysqlNativeQueryComplexTest() returns error? {
@test:Config {
groups: ["transactions", "mysql", "native"],
dependsOn: [mysqlNativeExecuteTestNegative1, mysqlNativeQueryTest, mysqlNativeQueryTestNegative, mysqlNativeQueryComplexTest],
enable: false
enable: true
}
function mysqlNativeTransactionTest() returns error? {
MySQLRainierClient rainierClient = check new ();
Expand Down Expand Up @@ -254,17 +254,14 @@ function mysqlNativeTransactionTest2() returns error? {
check buildingStream.close();
test:assertEquals(buildings, [building33]);

transaction {
_ = check rainierClient->executeNativeSQL(`
UPDATE Building
SET
city = ${building33Updated.city},
state = ${building33Updated.state},
country = ${building33Updated.country}
WHERE buildingCode = ${building33.buildingCode}
`);
check commit;
}
_ = check rainierClient->executeNativeSQL(`
UPDATE Building
SET
city = ${building33Updated.city},
state = ${building33Updated.state},
country = ${building33Updated.country}
WHERE buildingCode = ${building33.buildingCode}
`);

stream<Building, persist:Error?> buildingStream3 = rainierClient->queryNativeSQL(`SELECT * FROM Building WHERE buildingCode = ${building33.buildingCode}`);
Building[] buildings3 = check from Building building in buildingStream3
Expand Down
21 changes: 9 additions & 12 deletions ballerina/tests/postgresql-native-tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ function postgresqlNativeQueryComplexTest() returns error? {
@test:Config {
groups: ["transactions", "postgresql", "native"],
dependsOn: [postgresqlNativeExecuteTestNegative1, postgresqlNativeQueryTest, postgresqlNativeQueryTestNegative, postgresqlNativeQueryComplexTest],
enable: false
enable: true
}
function postgresqlNativeTransactionTest() returns error? {
PostgreSQLRainierClient rainierClient = check new ();
Expand Down Expand Up @@ -254,17 +254,14 @@ function postgresqlNativeTransactionTest2() returns error? {
check buildingStream.close();
test:assertEquals(buildings, [building33]);

transaction {
_ = check rainierClient->executeNativeSQL(`
UPDATE "Building"
SET
"city" = ${building33Updated.city},
"state" = ${building33Updated.state},
"country" = ${building33Updated.country}
WHERE "buildingCode" = ${building33.buildingCode}
`);
check commit;
}
_ = check rainierClient->executeNativeSQL(`
UPDATE "Building"
SET
"city" = ${building33Updated.city},
"state" = ${building33Updated.state},
"country" = ${building33Updated.country}
WHERE "buildingCode" = ${building33.buildingCode}
`);

stream<Building, persist:Error?> buildingStream3 = rainierClient->queryNativeSQL(`SELECT * FROM "Building" WHERE "buildingCode" = ${building33.buildingCode}`);
Building[] buildings3 = check from Building building in buildingStream3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.async.Callback;
import io.ballerina.runtime.api.constants.RuntimeConstants;
import io.ballerina.runtime.api.creators.TypeCreator;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.types.ErrorType;
Expand All @@ -36,12 +37,11 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.persist.Constants;
import io.ballerina.stdlib.persist.ModuleUtils;
import io.ballerina.stdlib.persist.sql.Utils;
import io.ballerina.stdlib.sql.parameterprocessor.DefaultResultParameterProcessor;
import io.ballerina.stdlib.sql.parameterprocessor.DefaultStatementParameterProcessor;

import java.util.Map;

Expand Down Expand Up @@ -180,50 +180,12 @@ public void notifyFailure(BError bError) {
static BStream queryNativeSQL(Environment env, BObject client, BObject paramSQLString,
BTypedesc targetType) {
// This method will return `stream<targetType, persist:Error?>`

TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!io.ballerina.stdlib.sql.utils.Utils.isWithinTrxBlock(trxResourceManager)) {
return queryNativeSQLBal(env, client, paramSQLString, targetType);
}

BObject dbClient = (BObject) client.get(DB_CLIENT);
BStream sqlStream = io.ballerina.stdlib.sql.nativeimpl.QueryProcessor.nativeQuery(env, dbClient,
paramSQLString, targetType, DefaultStatementParameterProcessor.getInstance(),
DefaultResultParameterProcessor.getInstance());

if (sqlStream != null) {
BObject persistNativeStream = createPersistNativeSQLStream(sqlStream, null);
RecordType streamConstraint =
(RecordType) TypeUtils.getReferredType(targetType.getDescribingType());
return (ValueCreator.createStreamValue(TypeCreator.createStreamType(streamConstraint,
PredefinedTypes.TYPE_NULL), persistNativeStream)
);
}

return null;
return queryNativeSQLBal(env, client, paramSQLString, targetType);
}

static Object executeNativeSQL(Environment env, BObject client, BObject paramSQLString) {
// This method will return `persist:ExecutionResult|persist:Error`

TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!io.ballerina.stdlib.sql.utils.Utils.isWithinTrxBlock(trxResourceManager)) {
return executeNativeSQLBal(env, client, paramSQLString);
}

BObject dbClient = (BObject) client.get(DB_CLIENT);
Object sqlExecutionResult = io.ballerina.stdlib.sql.nativeimpl.ExecuteProcessor.nativeExecute(env, dbClient,
paramSQLString, DefaultStatementParameterProcessor.getInstance());

if (sqlExecutionResult instanceof BMap) { // returned type is `sql:ExecutionResult`
return ValueCreator.createRecordValue(getModule(),
io.ballerina.stdlib.persist.sql.Constants.PERSIST_EXECUTION_RESULT,
(BMap<BString, Object>) sqlExecutionResult);
} else if (sqlExecutionResult instanceof BError) { // returned type is `sql:Error`
return wrapSQLError((BError) sqlExecutionResult);
}

return null;
return executeNativeSQLBal(env, client, paramSQLString);
}

private static BStream queryNativeSQLBal(Environment env, BObject client, BObject paramSQLString,
Expand All @@ -233,6 +195,12 @@ private static BStream queryNativeSQLBal(Environment env, BObject client, BObjec
BObject dbClient = (BObject) client.get(DB_CLIENT);
RecordType recordType = (RecordType) targetType.getDescribingType();
StreamType streamType = TypeCreator.createStreamType(recordType, PredefinedTypes.TYPE_NULL);
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Map<String, Object> properties = null;
if (currentTrxContext != null) {
properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext);
}

Future balFuture = env.markAsync();
env.getRuntime().invokeMethodAsyncSequentially(
Expand All @@ -257,7 +225,7 @@ public void notifyFailure(BError bError) { // can only be hit on a panic
BObject errorStream = Utils.createPersistNativeSQLStream(null, bError);
balFuture.complete(errorStream);
}
}, null, streamType, paramSQLString, true, targetType, true
}, properties, streamType, paramSQLString, true, targetType, true
);

return null;
Expand All @@ -267,6 +235,12 @@ private static Object executeNativeSQLBal(Environment env, BObject client, BObje
BObject dbClient = (BObject) client.get(DB_CLIENT);
RecordType persistExecutionResultType = TypeCreator.createRecordType(
io.ballerina.stdlib.persist.sql.Constants.PERSIST_EXECUTION_RESULT, getModule(), 0, true, 0);
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Map<String, Object> properties = null;
if (currentTrxContext != null) {
properties = Map.of(RuntimeConstants.CURRENT_TRANSACTION_CONTEXT_PROPERTY, currentTrxContext);
}

Future balFuture = env.markAsync();
env.getRuntime().invokeMethodAsyncSequentially(
Expand All @@ -292,8 +266,7 @@ public void notifyFailure(BError bError) { // can only be hit on a panic
BError persistError = wrapError(bError);
balFuture.complete(persistError);
}
}, null, persistExecutionResultType, paramSQLString, true
);
}, properties, persistExecutionResultType, paramSQLString, true);

return null;
}
Expand Down
Loading