From 7185df3ac8cad13fd49955aef80e6948ea128a88 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 5 Dec 2024 11:26:34 +0200 Subject: [PATCH 1/8] FMWK-622 Add support for multi-record transactions --- docs/params.md | 1 + pom.xml | 53 +----- .../aerospike/jdbc/AerospikeConnection.java | 69 +++++++- .../jdbc/AerospikeDatabaseMetadata.java | 11 +- .../aerospike/jdbc/AerospikeStatement.java | 1 + .../aerospike/jdbc/model/AerospikeQuery.java | 11 ++ .../aerospike/jdbc/model/DriverPolicy.java | 6 + .../jdbc/query/DeleteQueryHandler.java | 2 +- .../jdbc/query/InsertQueryHandler.java | 4 +- .../aerospike/jdbc/query/PolicyBuilder.java | 17 +- .../jdbc/query/UpdateQueryHandler.java | 2 +- .../com/aerospike/jdbc/TransactionTest.java | 158 ++++++++++++++++++ 12 files changed, 269 insertions(+), 66 deletions(-) create mode 100644 src/test/java/com/aerospike/jdbc/TransactionTest.java diff --git a/docs/params.md b/docs/params.md index ad88701..c941462 100644 --- a/docs/params.md +++ b/docs/params.md @@ -40,3 +40,4 @@ Consider setting a custom value if really necessary. | metadataCacheTtlSeconds | 3600 | Database metadata cache TTL in seconds | | schemaBuilderMaxRecords | 1000 | The number of records to be used to build the table schema | | showRecordMetadata | false | Add record metadata columns (__digest, __ttl, __gen) | +| txnTimeoutSeconds | 10 | Multi-record transaction timeout in seconds | diff --git a/pom.xml b/pom.xml index 76b2e7c..e901850 100644 --- a/pom.xml +++ b/pom.xml @@ -21,12 +21,12 @@ 3.13.0 3.5.0 1.6.14 - 2.9.1 + 3.11.1 3.3.1 1.6 3.6.0 - 8.1.4 + 9.0.1 4.1.114.Final 2.18.1 1.38.0 @@ -152,52 +152,6 @@ - - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven-compiler-plugin.version} - - - - org.sonatype.plugins - nexus-staging-maven-plugin - ${nexus-staging-maven-plugin.version} - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven-shade-plugin.version} - - - - org.apache.maven.plugins - maven-javadoc-plugin - ${maven-javadoc-plugin.version} - - - - org.apache.maven.plugins - maven-source-plugin - ${maven-source-plugin.version} - - - - org.apache.maven.plugins - maven-gpg-plugin - ${maven-gpg-plugin.version} - - - - org.apache.maven.plugins - maven-surefire-plugin - ${maven-surefire-plugin.version} - - - - org.apache.maven.plugins @@ -255,8 +209,7 @@ maven-javadoc-plugin ${maven-javadoc-plugin.version} - -Xdoclint:none - ${basedir} + all,-missing A JDBC driver for the Aerospike database public true diff --git a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java index 16c29e0..017f2fb 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java @@ -1,6 +1,10 @@ package com.aerospike.jdbc; +import com.aerospike.client.AbortStatus; +import com.aerospike.client.AerospikeException; +import com.aerospike.client.CommitStatus; import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.Txn; import com.aerospike.client.policy.Policy; import com.aerospike.jdbc.model.DriverConfiguration; import com.aerospike.jdbc.sql.SimpleWrapper; @@ -42,6 +46,8 @@ public class AerospikeConnection implements Connection, SimpleWrapper { private volatile Map> typeMap = emptyMap(); private volatile int holdability = HOLD_CURSORS_OVER_COMMIT; private volatile boolean closed; + private boolean autoCommit = true; + private Txn txn; public AerospikeConnection(String url, Properties props) { logger.info("Init AerospikeConnection"); @@ -77,24 +83,64 @@ public String nativeSQL(String sql) throws SQLException { @Override public boolean getAutoCommit() throws SQLException { checkClosed(); - return true; + return autoCommit; } @Override public void setAutoCommit(boolean autoCommit) throws SQLException { checkClosed(); - // no-op + if (autoCommit) { + txn = null; + } + this.autoCommit = autoCommit; } + /** + * Requires Aerospike Server 8.0+. + * + * @throws SQLException if the transaction commit fails. + */ @Override public void commit() throws SQLException { checkClosed(); - // no-op + if (autoCommit) { + throw new SQLException("Connection is in auto-commit mode"); + } + if (txn == null) { + throw new SQLException("txn is null"); + } + try { + CommitStatus status = client.commit(txn); + logger.info(() -> format("MRT %d commit status: %s", txn.getId(), status)); + } catch (AerospikeException e) { + throw new SQLException(e); + } finally { + txn = null; + } } + /** + * Requires Aerospike Server 8.0+. + * + * @throws SQLException if the transaction rollback fails. + */ @Override public void rollback() throws SQLException { - throw new SQLFeatureNotSupportedException("rollback is not supported"); + checkClosed(); + if (autoCommit) { + throw new SQLException("Connection is in auto-commit mode"); + } + if (txn == null) { + throw new SQLException("txn is null"); + } + try { + AbortStatus status = client.abort(txn); + logger.info(() -> format("MRT %d rollback status: %s", txn.getId(), status)); + } catch (AerospikeException e) { + throw new SQLException(e); + } finally { + txn = null; + } } @Override @@ -168,11 +214,13 @@ public void clearWarnings() throws SQLException { } @Override + @SuppressWarnings("MagicConstant") public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { return createStatement(resultSetType, resultSetConcurrency, holdability); } @Override + @SuppressWarnings("MagicConstant") public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return prepareStatement(sql, resultSetType, resultSetConcurrency, holdability); @@ -239,6 +287,7 @@ public void releaseSavepoint(Savepoint savepoint) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { checkClosed(); + checkTxn(); validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability); return new AerospikeStatement(client, this); } @@ -247,10 +296,18 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { checkClosed(); + checkTxn(); validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability); return new AerospikePreparedStatement(client, this, sql); } + private void checkTxn() { + if (!autoCommit && txn == null) { + txn = new Txn(); + txn.setTimeout(config.getDriverPolicy().getTxnTimeoutSeconds()); + } + } + private void validateResultSetParameters(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { if (resultSetType != TYPE_FORWARD_ONLY) { @@ -428,4 +485,8 @@ public AerospikeVersion getAerospikeVersion() { public IAerospikeClient getClient() { return client; } + + public Txn getTxn() { + return txn; + } } diff --git a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java index 9416b60..fd5bed2 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java @@ -39,6 +39,7 @@ import static com.google.common.base.Strings.isNullOrEmpty; import static java.lang.String.format; import static java.sql.Connection.TRANSACTION_NONE; +import static java.sql.Connection.TRANSACTION_SERIALIZABLE; import static java.sql.Types.*; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -340,7 +341,7 @@ public boolean supportsMultipleResultSets() { @Override public boolean supportsMultipleTransactions() { - return false; + return true; } @Override @@ -655,17 +656,17 @@ public int getMaxUserNameLength() { @Override public int getDefaultTransactionIsolation() { - return TRANSACTION_NONE; + return TRANSACTION_SERIALIZABLE; } @Override public boolean supportsTransactions() { - return false; + return true; } @Override public boolean supportsTransactionIsolationLevel(int level) { - return TRANSACTION_NONE == level; + return TRANSACTION_NONE == level || TRANSACTION_SERIALIZABLE == level; } @Override @@ -675,7 +676,7 @@ public boolean supportsDataDefinitionAndDataManipulationTransactions() { @Override public boolean supportsDataManipulationTransactionsOnly() { - return false; + return true; } @Override diff --git a/src/main/java/com/aerospike/jdbc/AerospikeStatement.java b/src/main/java/com/aerospike/jdbc/AerospikeStatement.java index 4b50e99..f28247f 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeStatement.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeStatement.java @@ -76,6 +76,7 @@ protected AerospikeQuery parseQuery(String sql, List sqlParameters) thro if (query.getCatalog() == null) { query.setCatalog(catalog); } + query.setTxn(connection.getTxn()); return query; } diff --git a/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java b/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java index 4304cc2..af8e739 100644 --- a/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java +++ b/src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java @@ -1,5 +1,6 @@ package com.aerospike.jdbc.model; +import com.aerospike.client.Txn; import com.aerospike.client.exp.Exp; import com.aerospike.client.exp.Expression; import com.aerospike.jdbc.predicate.QueryPredicate; @@ -43,6 +44,8 @@ public class AerospikeQuery { private List values; private List columns; + private Txn txn; + public AerospikeQuery() { this.queryType = QueryType.UNKNOWN; } @@ -147,6 +150,14 @@ public void setColumns(List columns) { this.columns = columns; } + public Txn getTxn() { + return txn; + } + + public void setTxn(Txn txn) { + this.txn = txn; + } + public String[] columnBins() { String[] binNames = columns.stream() .filter(c -> !Objects.equals(c, ASTERISK)) diff --git a/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java b/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java index 3b9eccd..9f229d7 100644 --- a/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java +++ b/src/main/java/com/aerospike/jdbc/model/DriverPolicy.java @@ -13,6 +13,7 @@ public class DriverPolicy { private final int recordSetTimeoutMs; private final int metadataCacheTtlSeconds; private final int schemaBuilderMaxRecords; + private final int txnTimeoutSeconds; private final boolean showRecordMetadata; public DriverPolicy(Properties properties) { @@ -24,6 +25,7 @@ public DriverPolicy(Properties properties) { DEFAULT_METADATA_CACHE_TTL_SECONDS); schemaBuilderMaxRecords = parseInt(properties.getProperty("schemaBuilderMaxRecords"), DEFAULT_SCHEMA_BUILDER_MAX_RECORDS); + txnTimeoutSeconds = parseInt(properties.getProperty("txnTimeoutSeconds"), 0); showRecordMetadata = Boolean.parseBoolean(properties.getProperty("showRecordMetadata")); } @@ -43,6 +45,10 @@ public int getSchemaBuilderMaxRecords() { return schemaBuilderMaxRecords; } + public int getTxnTimeoutSeconds() { + return txnTimeoutSeconds; + } + public boolean getShowRecordMetadata() { return showRecordMetadata; } diff --git a/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java index 2e6102e..3e6749b 100644 --- a/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/DeleteQueryHandler.java @@ -52,7 +52,7 @@ public Pair execute(AerospikeQuery query) { client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getCatalog(), query.getSetName()); - final WritePolicy deletePolicy = policyBuilder.buildDeleteWritePolicy(); + final WritePolicy deletePolicy = policyBuilder.buildDeleteWritePolicy(query); final AtomicInteger count = new AtomicInteger(); listener.getRecordSet().forEach(r -> { try { diff --git a/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java index af52ccd..e376aa2 100644 --- a/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/InsertQueryHandler.java @@ -43,7 +43,7 @@ public Pair putConsecutively(AerospikeQuery query) { List binNames = getBinNames(query); FutureWriteListener listener = new FutureWriteListener(query.getValues().size()); - WritePolicy writePolicy = policyBuilder.buildCreateOnlyPolicy(); + WritePolicy writePolicy = policyBuilder.buildCreateOnlyPolicy(query); for (Object aerospikeRecord : query.getValues()) { @SuppressWarnings("unchecked") @@ -84,7 +84,7 @@ public Pair putBatch(AerospikeQuery query) { ) ); } - BatchPolicy batchPolicy = client.getBatchPolicyDefault(); + BatchPolicy batchPolicy = policyBuilder.buildBatchPolicyDefault(query); try { client.operate(EventLoopProvider.getEventLoop(), listener, batchPolicy, batchRecords); } catch (AerospikeException e) { diff --git a/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java b/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java index c76c5df..41bee42 100644 --- a/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java +++ b/src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java @@ -1,6 +1,7 @@ package com.aerospike.jdbc.query; import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.policy.BatchPolicy; import com.aerospike.client.policy.BatchReadPolicy; import com.aerospike.client.policy.BatchWritePolicy; import com.aerospike.client.policy.QueryPolicy; @@ -41,15 +42,23 @@ public ScanPolicy buildScanNoBinDataPolicy(AerospikeQuery query) { public WritePolicy buildWritePolicy(AerospikeQuery query) { WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault()); writePolicy.filterExp = query.toFilterExpression(true); + writePolicy.txn = query.getTxn(); return writePolicy; } - public WritePolicy buildDeleteWritePolicy() { + public WritePolicy buildDeleteWritePolicy(AerospikeQuery query) { WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault()); writePolicy.sendKey = false; + writePolicy.txn = query.getTxn(); return writePolicy; } + public BatchPolicy buildBatchPolicyDefault(AerospikeQuery query) { + BatchPolicy batchPolicy = new BatchPolicy(client.getBatchPolicyDefault()); + batchPolicy.txn = query.getTxn(); + return batchPolicy; + } + public BatchReadPolicy buildBatchReadPolicy(AerospikeQuery query) { BatchReadPolicy batchReadPolicy = new BatchReadPolicy(); batchReadPolicy.filterExp = query.toFilterExpression(false); @@ -64,15 +73,17 @@ public BatchWritePolicy buildBatchCreateOnlyPolicy() { return batchWritePolicy; } - public WritePolicy buildCreateOnlyPolicy() { + public WritePolicy buildCreateOnlyPolicy(AerospikeQuery query) { WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault()); writePolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; + writePolicy.txn = query.getTxn(); return writePolicy; } - public WritePolicy buildUpdateOnlyPolicy() { + public WritePolicy buildUpdateOnlyPolicy(AerospikeQuery query) { WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault()); writePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY; + writePolicy.txn = query.getTxn(); return writePolicy; } } diff --git a/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java b/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java index 8c59afb..589b8e0 100644 --- a/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java +++ b/src/main/java/com/aerospike/jdbc/query/UpdateQueryHandler.java @@ -31,7 +31,7 @@ public UpdateQueryHandler(IAerospikeClient client, Statement statement) { public Pair execute(AerospikeQuery query) { Collection keyObjects = query.getPrimaryKeys(); final Bin[] bins = getBins(query); - final WritePolicy writePolicy = policyBuilder.buildUpdateOnlyPolicy(); + final WritePolicy writePolicy = policyBuilder.buildUpdateOnlyPolicy(query); if (!keyObjects.isEmpty()) { logger.info("UPDATE primary key"); FutureWriteListener listener = new FutureWriteListener(keyObjects.size()); diff --git a/src/test/java/com/aerospike/jdbc/TransactionTest.java b/src/test/java/com/aerospike/jdbc/TransactionTest.java new file mode 100644 index 0000000..f24c3ca --- /dev/null +++ b/src/test/java/com/aerospike/jdbc/TransactionTest.java @@ -0,0 +1,158 @@ +package com.aerospike.jdbc; + +import com.aerospike.jdbc.util.TestRecord; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Ignore; +import org.testng.annotations.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME; +import static com.aerospike.jdbc.util.TestConfig.HOSTNAME; +import static com.aerospike.jdbc.util.TestConfig.NAMESPACE; +import static com.aerospike.jdbc.util.TestConfig.PORT; +import static com.aerospike.jdbc.util.TestConfig.TABLE_NAME; +import static com.aerospike.jdbc.util.TestUtil.closeQuietly; +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +@Ignore +public class TransactionTest { + + private static final Logger logger = Logger.getLogger(TransactionTest.class.getName()); + private static Connection connection; + + private final TestRecord testRecord; + private final TestRecord testRecord2; + + TransactionTest() { + testRecord = new TestRecord("key1", true, 11100, 1, "bar"); + testRecord2 = new TestRecord("key2", false, 11101, 2, "foo"); + } + + @BeforeClass + public static void connectionInit() throws Exception { + logger.info("connectionInit"); + Class.forName("com.aerospike.jdbc.AerospikeDriver").newInstance(); + String url = String.format("jdbc:aerospike:%s:%d/%s?sendKey=true&user=%s&password=%s&durableDelete=true", + HOSTNAME, PORT, NAMESPACE, "", ""); + connection = DriverManager.getConnection(url); + connection.setNetworkTimeout(Executors.newSingleThreadExecutor(), 50000); + } + + @AfterClass + public static void connectionClose() throws SQLException { + logger.info("connectionClose"); + connection.close(); + } + + @BeforeMethod + public void setUp() throws SQLException { + Objects.requireNonNull(connection, "connection is null"); + Statement statement = null; + int count; + String query = testRecord.toInsertQuery(); + try { + statement = connection.createStatement(); + count = statement.executeUpdate(query); + } finally { + closeQuietly(statement); + } + assertEquals(count, 1); + } + + @AfterMethod + public void tearDown() throws SQLException { + Objects.requireNonNull(connection, "connection is null"); + Statement statement = null; + String query = format("DELETE FROM %s", TABLE_NAME); + try { + statement = connection.createStatement(); + boolean result = statement.execute(query); + assertFalse(result); + } finally { + closeQuietly(statement); + } + assertTrue(statement.getUpdateCount() > 0); + } + + @Test + public void testTransactionCommit() throws SQLException { + connection.setAutoCommit(false); + Statement statement = null; + String query = testRecord2.toInsertQuery(); + try { + statement = connection.createStatement(); + statement.executeUpdate(query); + } finally { + closeQuietly(statement); + } + try { + statement = connection.createStatement(); + statement.executeUpdate(format("DELETE FROM %s WHERE __key='key1'", TABLE_NAME)); + } finally { + closeQuietly(statement); + } + connection.commit(); + + ResultSet resultSet = null; + query = format("SELECT * FROM %s LIMIT 10", TABLE_NAME); + try { + statement = connection.createStatement(); + resultSet = statement.executeQuery(query); + assertTrue(resultSet.next()); + assertEquals(resultSet.getString(PRIMARY_KEY_COLUMN_NAME), "key2"); + assertFalse(resultSet.next()); + } finally { + closeQuietly(statement); + closeQuietly(resultSet); + } + connection.setAutoCommit(true); + } + + @Test + public void testTransactionRollback() throws SQLException { + connection.setAutoCommit(false); + Statement statement = null; + String query = testRecord2.toInsertQuery(); + try { + statement = connection.createStatement(); + statement.executeUpdate(query); + } finally { + closeQuietly(statement); + } + try { + statement = connection.createStatement(); + statement.executeUpdate(format("DELETE FROM %s WHERE __key='key1'", TABLE_NAME)); + } finally { + closeQuietly(statement); + } + connection.rollback(); + + ResultSet resultSet = null; + query = format("SELECT * FROM %s LIMIT 10", TABLE_NAME); + try { + statement = connection.createStatement(); + resultSet = statement.executeQuery(query); + assertTrue(resultSet.next()); + assertEquals(resultSet.getString(PRIMARY_KEY_COLUMN_NAME), "key1"); + assertFalse(resultSet.next()); + } finally { + closeQuietly(statement); + closeQuietly(resultSet); + } + connection.setAutoCommit(true); + } +} From 07d8c85b951c5890a5c17e83245b1dcb0123f318 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 5 Dec 2024 11:40:56 +0200 Subject: [PATCH 2/8] reduce network timeout in transaction test --- src/test/java/com/aerospike/jdbc/TransactionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/aerospike/jdbc/TransactionTest.java b/src/test/java/com/aerospike/jdbc/TransactionTest.java index f24c3ca..1ac2b7f 100644 --- a/src/test/java/com/aerospike/jdbc/TransactionTest.java +++ b/src/test/java/com/aerospike/jdbc/TransactionTest.java @@ -49,7 +49,7 @@ public static void connectionInit() throws Exception { String url = String.format("jdbc:aerospike:%s:%d/%s?sendKey=true&user=%s&password=%s&durableDelete=true", HOSTNAME, PORT, NAMESPACE, "", ""); connection = DriverManager.getConnection(url); - connection.setNetworkTimeout(Executors.newSingleThreadExecutor(), 50000); + connection.setNetworkTimeout(Executors.newSingleThreadExecutor(), 5000); } @AfterClass From a639590d4bbb21f5c860821fa4fc9ab7afa1c891 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 5 Dec 2024 18:03:24 +0200 Subject: [PATCH 3/8] update transaction isolation methods in connection --- src/main/java/com/aerospike/jdbc/AerospikeConnection.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java index 017f2fb..87e9ae1 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java @@ -189,15 +189,14 @@ public void setCatalog(String catalog) throws SQLException { @Override public int getTransactionIsolation() throws SQLException { checkClosed(); - return TRANSACTION_NONE; + return TRANSACTION_SERIALIZABLE; } @Override public void setTransactionIsolation(int level) throws SQLException { checkClosed(); - if (level != TRANSACTION_NONE) { - throw new SQLFeatureNotSupportedException(format("Aerospike does not support transactions," + - " so the only valid value here is TRANSACTION_NONE=%d", TRANSACTION_NONE)); + if (level != TRANSACTION_SERIALIZABLE) { + throw new SQLException(format("Unsupported transaction isolation level: %d", level)); } } From ed417032814c009a9e00462f76d20e73ae794b55 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 5 Dec 2024 19:00:59 +0200 Subject: [PATCH 4/8] commit any active transaction when switching to auto-commit mode --- .../com/aerospike/jdbc/AerospikeConnection.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java index 87e9ae1..d58e3fe 100644 --- a/src/main/java/com/aerospike/jdbc/AerospikeConnection.java +++ b/src/main/java/com/aerospike/jdbc/AerospikeConnection.java @@ -89,10 +89,14 @@ public boolean getAutoCommit() throws SQLException { @Override public void setAutoCommit(boolean autoCommit) throws SQLException { checkClosed(); - if (autoCommit) { - txn = null; + if (this.autoCommit == autoCommit) { + return; + } + if (!this.autoCommit) { + commit(); } this.autoCommit = autoCommit; + logger.fine(() -> format("setAutoCommit = %b", autoCommit)); } /** @@ -107,7 +111,8 @@ public void commit() throws SQLException { throw new SQLException("Connection is in auto-commit mode"); } if (txn == null) { - throw new SQLException("txn is null"); + logger.info("No active transaction to commit"); + return; } try { CommitStatus status = client.commit(txn); @@ -131,7 +136,8 @@ public void rollback() throws SQLException { throw new SQLException("Connection is in auto-commit mode"); } if (txn == null) { - throw new SQLException("txn is null"); + logger.info("No active transaction to rollback"); + return; } try { AbortStatus status = client.abort(txn); From 77521bd33930272c01b683705f702012a65caa95 Mon Sep 17 00:00:00 2001 From: Ronen Botzer Date: Fri, 6 Dec 2024 19:30:13 -0800 Subject: [PATCH 5/8] Update examples.md with information about transactions --- docs/examples.md | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/examples.md b/docs/examples.md index 4eda708..52ef70c 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -325,3 +325,38 @@ CREATE INDEX port_idx ON port_list (port); ```sql DROP INDEX port_idx ON port_list; ``` + +## Transactions +**Note:** Wrapping multiple commands in a transaction requires Aerospike Database version 8.0+. + +[JDBC transactions](https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html) are started by setting auto-commit to false, which acts as an implicit `BEGIN`. Every subsequent command is part of the transaction until a +commit or rollback are issued. A new transaction begins automatically after either is executed. Switching back to auto-commit will rollback an uncommitted transaction. + +In a data browser like DBeaver, the UI has buttons to control switching the auto-commit on and off, along with commit and rollback buttons. + +```sql +-- Switch to manual (begins the transaction) +SELECT * FROM port_list WHERE __key="ntp"; +UPDATE port_list SET port=124 where __key="ntp"; +UPDATE port_list SET port=162 where __key="snmp"; +--COMMIT + +--Switch to auto +SELECT * FROM port_list WHERE __key IN ("ntp", "snmp"); + +-- Switch to manual (begins the transaction) +UPDATE port_list SET port=123 where __key="ntp"; +UPDATE port_list SET port=161 where __key="snmp"; +--Rollback + +-- Switch to auto +SELECT * FROM port_list WHERE __key IN ("ntp", "snmp"); + +-- Switch to manual (begins the transaction) +UPDATE port_list SET port=123 where __key="ntp"; +UPDATE port_list SET port=161 where __key="snmp"; +--Commit + +-- Switch to auto +SELECT * FROM port_list WHERE __key IN ("ntp", "snmp"); +``` From 1cd12073417f4f88ad207910abee2f51b69b64a0 Mon Sep 17 00:00:00 2001 From: Ronen Botzer Date: Fri, 6 Dec 2024 21:04:15 -0800 Subject: [PATCH 6/8] Update README.md with transactions --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index b46e758..47a985e 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ Packages documentation can be found [here](https://javadoc.io/doc/com.aerospike/ * TRUNCATE TABLE * CREATE INDEX * DROP INDEX +* Transactions See [examples](docs/examples.md) of SQL. From 8f2f1acf879d5a63349667ac7230d0c800aaba9f Mon Sep 17 00:00:00 2001 From: Ronen Botzer Date: Fri, 6 Dec 2024 21:19:01 -0800 Subject: [PATCH 7/8] Link to the SC documentation. --- docs/examples.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples.md b/docs/examples.md index 52ef70c..854eca6 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -327,7 +327,7 @@ DROP INDEX port_idx ON port_list; ``` ## Transactions -**Note:** Wrapping multiple commands in a transaction requires Aerospike Database version 8.0+. +**Note:** Wrapping multiple commands in a transaction requires Aerospike Database version 8.0+. Requires the namespace to be configured up with [`strong-consistency true`](https://aerospike.com/docs/server/operations/configure/consistency). [JDBC transactions](https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html) are started by setting auto-commit to false, which acts as an implicit `BEGIN`. Every subsequent command is part of the transaction until a commit or rollback are issued. A new transaction begins automatically after either is executed. Switching back to auto-commit will rollback an uncommitted transaction. From d215f097fc63f886158d636ade155236d8b27f9d Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Sun, 8 Dec 2024 10:17:46 +0200 Subject: [PATCH 8/8] update the Java client to version 9.0.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e901850..156fd36 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ 1.6 3.6.0 - 9.0.1 + 9.0.2 4.1.114.Final 2.18.1 1.38.0