Skip to content

Commit

Permalink
Merge pull request #720 from ballerina-platform/trx_fix
Browse files Browse the repository at this point in the history
Run database queries inside transaction block as non blocking query
  • Loading branch information
niveathika authored Jun 19, 2024
2 parents bfd5c7c + f1bc2e7 commit 7f17aa5
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 84 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

### Changed
- [Run database queries inside transaction block as non blocking query](https://github.com/ballerina-platform/ballerina-library/issues/6641)

## [1.13.1] - 2024-06-12

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,21 @@ public static SQLDatasource retrieveDatasource(SQLDatasource.SQLDatasourceParams
return sqlDatasourceToBeReturned;
}

public static Connection getConnection(boolean isInTrx, TransactionResourceManager trxResourceManager,
BObject client, SQLDatasource datasource) throws SQLException {
public static Connection getConnection(boolean isInTrx, BObject client, SQLDatasource datasource,
TransactionLocalContext transactionLocalContext,
boolean trxManagerEnabled) throws SQLException {
Connection conn;
try {
if (!isInTrx) {
return datasource.getConnection();
}
String connectorId = (String) client.getNativeData(Constants.SQL_CONNECTOR_TRANSACTION_ID);
boolean isXAConnection = datasource.isXADataSource();
TransactionLocalContext transactionLocalContext = trxResourceManager.getCurrentTransactionContext();
String globalTxId = transactionLocalContext.getGlobalTransactionId();
String currentTxBlockId = transactionLocalContext.getCurrentTransactionBlockId();
BallerinaTransactionContext txContext = transactionLocalContext.getTransactionContext(connectorId);
if (txContext == null) {
if (isXAConnection && !trxResourceManager.getTransactionManagerEnabled()) {
if (isXAConnection && !trxManagerEnabled) {
XAConnection xaConn = datasource.getXAConnection();
XAResource xaResource = xaConn.getXAResource();
TransactionResourceManager.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -89,27 +90,24 @@ public static Object nativeCall(Environment env, BObject client, BObject paramSQ
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});
return null;

}

private static Object nativeCallExecutable(BObject client, BObject paramSQLString, BArray recordTypes,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor,
boolean isWithinTrxBlock,
TransactionResourceManager trxResourceManager) {
boolean isWithinTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -123,7 +121,8 @@ private static Object nativeCallExecutable(BObject client, BObject paramSQLStrin
String sqlQuery = null;
try {
sqlQuery = getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource, currentTrxContext,
trxManagerEnabled);
statement = connection.prepareCall(sqlQuery);

HashMap<Integer, Integer> outputParamTypes = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -74,24 +75,24 @@ private ExecuteProcessor() {
public static Object nativeExecute(Environment env, BObject client, BObject paramSQLString,
AbstractStatementParameterProcessor statementParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, true,
trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, withinTrxBlock,
currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeExecuteExecutable(BObject client, BObject paramSQLString,
AbstractStatementParameterProcessor statementParameterProcessor,
boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -105,7 +106,8 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt
String sqlQuery = null;
try {
sqlQuery = getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);

if (sqlDatasource.getExecuteGKFlag()) {
statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS);
Expand Down Expand Up @@ -154,25 +156,25 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt
public static Object nativeBatchExecute(Environment env, BObject client, BArray paramSQLStrings,
AbstractStatementParameterProcessor statementParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
false, null);
balFuture.complete(resultStream);
});
} else {
return nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeBatchExecuteExecutable(BObject client, BArray paramSQLStrings,
AbstractStatementParameterProcessor statementParameterProcessor,
boolean isWithinTrxBlock,
TransactionResourceManager trxResourceManager) {
TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand Down Expand Up @@ -202,7 +204,8 @@ private static Object nativeBatchExecuteExecutable(BObject client, BArray paramS
"commands. These has to be executed in different function calls");
}
}
connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);

if (sqlDatasource.getBatchExecuteGKFlag()) {
statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -76,31 +77,28 @@ private QueryProcessor() {
* @param resultParameterProcessor post-processor of the result
* @return result stream or error
*/
public static BStream nativeQuery(
Environment env, BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
public static BStream nativeQuery(Environment env, BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
BStream resultStream =
nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
BStream resultStream =
nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});
return null;
}

private static BStream nativeQueryExecutable(
BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
private static BStream nativeQueryExecutable(BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor,
boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -115,7 +113,8 @@ private static BStream nativeQueryExecutable(
String sqlQuery = null;
try {
sqlQuery = Utils.getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);
statement = connection.prepareStatement(sqlQuery);
statementParameterProcessor.setParams(connection, statement, paramSQLString);
resultSet = statement.executeQuery();
Expand Down Expand Up @@ -154,18 +153,18 @@ public static Object nativeQueryRow(Environment env, BObject client, BObject par
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext,
trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

Expand All @@ -174,7 +173,7 @@ private static Object nativeQueryRowExecutable(
BTypedesc ballerinaType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
TransactionLocalContext currentTrxContext, boolean trxManagerEnabled) {
Type describingType = TypeUtils.getReferredType(ballerinaType.getDescribingType());
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
Expand All @@ -189,7 +188,8 @@ private static Object nativeQueryRowExecutable(
String sqlQuery = null;
try {
sqlQuery = Utils.getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);
statement = connection.prepareStatement(sqlQuery);
statementParameterProcessor.setParams(connection, statement, paramSQLString);
resultSet = statement.executeQuery();
Expand Down

0 comments on commit 7f17aa5

Please sign in to comment.