Skip to content

Commit

Permalink
Refactor sql calls inside transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
warunalakshitha committed Jun 14, 2024
1 parent 87b9629 commit be0fc13
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,21 @@ public static SQLDatasource retrieveDatasource(SQLDatasource.SQLDatasourceParams
return sqlDatasourceToBeReturned;
}

public static Connection getConnection(boolean isInTrx, TransactionResourceManager trxResourceManager,
BObject client, SQLDatasource datasource) throws SQLException {
public static Connection getConnection(boolean isInTrx, BObject client, SQLDatasource datasource,
TransactionLocalContext transactionLocalContext,
boolean trxManagerEnabled) throws SQLException {
Connection conn;
try {
if (!isInTrx) {
return datasource.getConnection();
}
String connectorId = (String) client.getNativeData(Constants.SQL_CONNECTOR_TRANSACTION_ID);
boolean isXAConnection = datasource.isXADataSource();
TransactionLocalContext transactionLocalContext = trxResourceManager.getCurrentTransactionContext();
String globalTxId = transactionLocalContext.getGlobalTransactionId();
String currentTxBlockId = transactionLocalContext.getCurrentTransactionBlockId();
BallerinaTransactionContext txContext = transactionLocalContext.getTransactionContext(connectorId);
if (txContext == null) {
if (isXAConnection && !trxResourceManager.getTransactionManagerEnabled()) {
if (isXAConnection && trxManagerEnabled) {
XAConnection xaConn = datasource.getXAConnection();
XAResource xaResource = xaConn.getXAResource();
TransactionResourceManager.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -89,27 +90,24 @@ public static Object nativeCall(Environment env, BObject client, BObject paramSQ
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeCallExecutable(client, paramSQLString, recordTypes, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});
return null;

}

private static Object nativeCallExecutable(BObject client, BObject paramSQLString, BArray recordTypes,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor,
boolean isWithinTrxBlock,
TransactionResourceManager trxResourceManager) {
boolean isWithinTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -123,7 +121,8 @@ private static Object nativeCallExecutable(BObject client, BObject paramSQLStrin
String sqlQuery = null;
try {
sqlQuery = getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource, currentTrxContext,
trxManagerEnabled);
statement = connection.prepareCall(sqlQuery);

HashMap<Integer, Integer> outputParamTypes = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -74,24 +75,24 @@ private ExecuteProcessor() {
public static Object nativeExecute(Environment env, BObject client, BObject paramSQLString,
AbstractStatementParameterProcessor statementParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, true,
trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeExecuteExecutable(client, paramSQLString, statementParameterProcessor, withinTrxBlock,
currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeExecuteExecutable(BObject client, BObject paramSQLString,
AbstractStatementParameterProcessor statementParameterProcessor,
boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -105,7 +106,8 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt
String sqlQuery = null;
try {
sqlQuery = getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);

if (sqlDatasource.getExecuteGKFlag()) {
statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS);
Expand Down Expand Up @@ -154,25 +156,25 @@ private static Object nativeExecuteExecutable(BObject client, BObject paramSQLSt
public static Object nativeBatchExecute(Environment env, BObject client, BArray paramSQLStrings,
AbstractStatementParameterProcessor statementParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
false, null);
balFuture.complete(resultStream);
});
} else {
return nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeBatchExecuteExecutable(client, paramSQLStrings, statementParameterProcessor,
withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeBatchExecuteExecutable(BObject client, BArray paramSQLStrings,
AbstractStatementParameterProcessor statementParameterProcessor,
boolean isWithinTrxBlock,
TransactionResourceManager trxResourceManager) {
TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand Down Expand Up @@ -202,7 +204,8 @@ private static Object nativeBatchExecuteExecutable(BObject client, BArray paramS
"commands. These has to be executed in different function calls");
}
}
connection = SQLDatasource.getConnection(isWithinTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithinTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);

if (sqlDatasource.getBatchExecuteGKFlag()) {
statement = connection.prepareStatement(sqlQuery, Statement.RETURN_GENERATED_KEYS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.ballerina.runtime.api.values.BStream;
import io.ballerina.runtime.api.values.BString;
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.runtime.transactions.TransactionLocalContext;
import io.ballerina.runtime.transactions.TransactionResourceManager;
import io.ballerina.stdlib.sql.Constants;
import io.ballerina.stdlib.sql.datasource.SQLDatasource;
Expand Down Expand Up @@ -81,26 +82,24 @@ public static BStream nativeQuery(
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
BStream resultStream =
nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
BStream resultStream =
nativeQueryExecutable(client, paramSQLString, recordType, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext, trxManagerEnabled);
balFuture.complete(resultStream);
});
return null;
}

private static BStream nativeQueryExecutable(
BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
private static BStream nativeQueryExecutable(BObject client, BObject paramSQLString, Object recordType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor,
boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext,
boolean trxManagerEnabled) {
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
SQLDatasource sqlDatasource = (SQLDatasource) dbClient;
Expand All @@ -115,7 +114,8 @@ private static BStream nativeQueryExecutable(
String sqlQuery = null;
try {
sqlQuery = Utils.getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);
statement = connection.prepareStatement(sqlQuery);
statementParameterProcessor.setParams(connection, statement, paramSQLString);
resultSet = statement.executeQuery();
Expand Down Expand Up @@ -154,27 +154,26 @@ public static Object nativeQueryRow(Environment env, BObject client, BObject par
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor) {
TransactionResourceManager trxResourceManager = TransactionResourceManager.getInstance();
if (!Utils.isWithinTrxBlock(trxResourceManager)) {
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, false, null);
balFuture.complete(resultStream);
});
} else {
return nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, true, trxResourceManager);
}
boolean withinTrxBlock = Utils.isWithinTrxBlock(trxResourceManager);
boolean trxManagerEnabled = trxResourceManager.getTransactionManagerEnabled();
TransactionLocalContext currentTrxContext = trxResourceManager.getCurrentTransactionContext();
Future balFuture = env.markAsync();
SQL_EXECUTOR_SERVICE.execute(() -> {
Object resultStream =
nativeQueryRowExecutable(client, paramSQLString, bTypedesc, statementParameterProcessor,
resultParameterProcessor, withinTrxBlock, currentTrxContext,
trxManagerEnabled);
balFuture.complete(resultStream);
});

return null;
}

private static Object nativeQueryRowExecutable(
BObject client, BObject paramSQLString,
BTypedesc ballerinaType,
AbstractStatementParameterProcessor statementParameterProcessor,
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock,
TransactionResourceManager trxResourceManager) {
AbstractResultParameterProcessor resultParameterProcessor, boolean isWithInTrxBlock, TransactionLocalContext currentTrxContext, boolean trxManagerEnabled) {
Type describingType = TypeUtils.getReferredType(ballerinaType.getDescribingType());
Object dbClient = client.getNativeData(Constants.DATABASE_CLIENT);
if (dbClient != null) {
Expand All @@ -189,7 +188,8 @@ private static Object nativeQueryRowExecutable(
String sqlQuery = null;
try {
sqlQuery = Utils.getSqlQuery(paramSQLString);
connection = SQLDatasource.getConnection(isWithInTrxBlock, trxResourceManager, client, sqlDatasource);
connection = SQLDatasource.getConnection(isWithInTrxBlock, client, sqlDatasource,
currentTrxContext, trxManagerEnabled);
statement = connection.prepareStatement(sqlQuery);
statementParameterProcessor.setParams(connection, statement, paramSQLString);
resultSet = statement.executeQuery();
Expand Down

0 comments on commit be0fc13

Please sign in to comment.