Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-622 Add support for multi-record transactions #79

Merged
merged 8 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Packages documentation can be found [here](https://javadoc.io/doc/com.aerospike/
* TRUNCATE TABLE
* CREATE INDEX
* DROP INDEX
* Transactions

See [examples](docs/examples.md) of SQL.

Expand Down
35 changes: 35 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,38 @@ CREATE INDEX port_idx ON port_list (port);
```sql
DROP INDEX port_idx ON port_list;
```

## Transactions
**Note:** Wrapping multiple commands in a transaction requires Aerospike Database version 8.0+. Requires the namespace to be configured up with [`strong-consistency true`](https://aerospike.com/docs/server/operations/configure/consistency).

[JDBC transactions](https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html) are started by setting auto-commit to false, which acts as an implicit `BEGIN`. Every subsequent command is part of the transaction until a
commit or rollback are issued. A new transaction begins automatically after either is executed. Switching back to auto-commit will rollback an uncommitted transaction.

In a data browser like DBeaver, the UI has buttons to control switching the auto-commit on and off, along with commit and rollback buttons.

```sql
-- Switch to manual (begins the transaction)
SELECT * FROM port_list WHERE __key="ntp";
UPDATE port_list SET port=124 where __key="ntp";
UPDATE port_list SET port=162 where __key="snmp";
--COMMIT

--Switch to auto
SELECT * FROM port_list WHERE __key IN ("ntp", "snmp");

-- Switch to manual (begins the transaction)
UPDATE port_list SET port=123 where __key="ntp";
UPDATE port_list SET port=161 where __key="snmp";
--Rollback

-- Switch to auto
SELECT * FROM port_list WHERE __key IN ("ntp", "snmp");

-- Switch to manual (begins the transaction)
UPDATE port_list SET port=123 where __key="ntp";
UPDATE port_list SET port=161 where __key="snmp";
--Commit

-- Switch to auto
SELECT * FROM port_list WHERE __key IN ("ntp", "snmp");
```
1 change: 1 addition & 0 deletions docs/params.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ Consider setting a custom value if really necessary.
| metadataCacheTtlSeconds | 3600 | Database metadata cache TTL in seconds |
| schemaBuilderMaxRecords | 1000 | The number of records to be used to build the table schema |
| showRecordMetadata | false | Add record metadata columns (__digest, __ttl, __gen) |
| txnTimeoutSeconds | 10 | Multi-record transaction timeout in seconds |
53 changes: 3 additions & 50 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<maven-surefire-plugin.version>3.5.0</maven-surefire-plugin.version>
<nexus-staging-maven-plugin.version>1.6.14</nexus-staging-maven-plugin.version>
<maven-javadoc-plugin.version>2.9.1</maven-javadoc-plugin.version>
<maven-javadoc-plugin.version>3.11.1</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>

<aerospike-client.version>8.1.4</aerospike-client.version>
<aerospike-client.version>9.0.2</aerospike-client.version>
<netty.version>4.1.114.Final</netty.version>
<jackson.version>2.18.1</jackson.version>
<calcite.version>1.38.0</calcite.version>
Expand Down Expand Up @@ -152,52 +152,6 @@
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>${nexus-staging-maven-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-source-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>${maven-gpg-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
</plugin>
</plugins>
</pluginManagement>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -255,8 +209,7 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
<reportOutputDirectory>${basedir}</reportOutputDirectory>
<doclint>all,-missing</doclint>
<doctitle>A JDBC driver for the Aerospike database</doctitle>
<show>public</show>
<splitindex>true</splitindex>
Expand Down
82 changes: 74 additions & 8 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.aerospike.jdbc;

import com.aerospike.client.AbortStatus;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.CommitStatus;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Txn;
import com.aerospike.client.policy.Policy;
import com.aerospike.jdbc.model.DriverConfiguration;
import com.aerospike.jdbc.sql.SimpleWrapper;
Expand Down Expand Up @@ -42,6 +46,8 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private volatile Map<String, Class<?>> typeMap = emptyMap();
private volatile int holdability = HOLD_CURSORS_OVER_COMMIT;
private volatile boolean closed;
private boolean autoCommit = true;
private Txn txn;

public AerospikeConnection(String url, Properties props) {
logger.info("Init AerospikeConnection");
Expand Down Expand Up @@ -77,24 +83,70 @@ public String nativeSQL(String sql) throws SQLException {
@Override
public boolean getAutoCommit() throws SQLException {
checkClosed();
return true;
return autoCommit;
}

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
checkClosed();
// no-op
if (this.autoCommit == autoCommit) {
return;
}
if (!this.autoCommit) {
commit();
}
this.autoCommit = autoCommit;
logger.fine(() -> format("setAutoCommit = %b", autoCommit));
}

/**
* Requires Aerospike Server 8.0+.
*
* @throws SQLException if the transaction commit fails.
*/
@Override
public void commit() throws SQLException {
checkClosed();
// no-op
if (autoCommit) {
throw new SQLException("Connection is in auto-commit mode");
}
if (txn == null) {
logger.info("No active transaction to commit");
return;
}
try {
CommitStatus status = client.commit(txn);
logger.info(() -> format("MRT %d commit status: %s", txn.getId(), status));
} catch (AerospikeException e) {
throw new SQLException(e);
} finally {
txn = null;
}
}

/**
* Requires Aerospike Server 8.0+.
*
* @throws SQLException if the transaction rollback fails.
*/
@Override
public void rollback() throws SQLException {
throw new SQLFeatureNotSupportedException("rollback is not supported");
checkClosed();
if (autoCommit) {
throw new SQLException("Connection is in auto-commit mode");
}
if (txn == null) {
logger.info("No active transaction to rollback");
return;
}
try {
AbortStatus status = client.abort(txn);
logger.info(() -> format("MRT %d rollback status: %s", txn.getId(), status));
} catch (AerospikeException e) {
throw new SQLException(e);
} finally {
txn = null;
}
}

@Override
Expand Down Expand Up @@ -143,15 +195,14 @@ public void setCatalog(String catalog) throws SQLException {
@Override
public int getTransactionIsolation() throws SQLException {
checkClosed();
return TRANSACTION_NONE;
return TRANSACTION_SERIALIZABLE;
}

@Override
public void setTransactionIsolation(int level) throws SQLException {
checkClosed();
if (level != TRANSACTION_NONE) {
throw new SQLFeatureNotSupportedException(format("Aerospike does not support transactions," +
" so the only valid value here is TRANSACTION_NONE=%d", TRANSACTION_NONE));
if (level != TRANSACTION_SERIALIZABLE) {
throw new SQLException(format("Unsupported transaction isolation level: %d", level));
}
}

Expand All @@ -168,11 +219,13 @@ public void clearWarnings() throws SQLException {
}

@Override
@SuppressWarnings("MagicConstant")
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return createStatement(resultSetType, resultSetConcurrency, holdability);
}

@Override
@SuppressWarnings("MagicConstant")
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
return prepareStatement(sql, resultSetType, resultSetConcurrency, holdability);
Expand Down Expand Up @@ -239,6 +292,7 @@ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
checkClosed();
checkTxn();
validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability);
return new AerospikeStatement(client, this);
}
Expand All @@ -247,10 +301,18 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
checkClosed();
checkTxn();
validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability);
return new AerospikePreparedStatement(client, this, sql);
}

private void checkTxn() {
if (!autoCommit && txn == null) {
txn = new Txn();
txn.setTimeout(config.getDriverPolicy().getTxnTimeoutSeconds());
}
}

private void validateResultSetParameters(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
if (resultSetType != TYPE_FORWARD_ONLY) {
Expand Down Expand Up @@ -428,4 +490,8 @@ public AerospikeVersion getAerospikeVersion() {
public IAerospikeClient getClient() {
return client;
}

public Txn getTxn() {
return txn;
}
}
11 changes: 6 additions & 5 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.sql.Connection.TRANSACTION_NONE;
import static java.sql.Connection.TRANSACTION_SERIALIZABLE;
import static java.sql.Types.*;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -340,7 +341,7 @@ public boolean supportsMultipleResultSets() {

@Override
public boolean supportsMultipleTransactions() {
return false;
return true;
}

@Override
Expand Down Expand Up @@ -655,17 +656,17 @@ public int getMaxUserNameLength() {

@Override
public int getDefaultTransactionIsolation() {
return TRANSACTION_NONE;
return TRANSACTION_SERIALIZABLE;
}

@Override
public boolean supportsTransactions() {
return false;
return true;
}

@Override
public boolean supportsTransactionIsolationLevel(int level) {
return TRANSACTION_NONE == level;
return TRANSACTION_NONE == level || TRANSACTION_SERIALIZABLE == level;
}

@Override
Expand All @@ -675,7 +676,7 @@ public boolean supportsDataDefinitionAndDataManipulationTransactions() {

@Override
public boolean supportsDataManipulationTransactionsOnly() {
return false;
return true;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/aerospike/jdbc/AerospikeStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ protected AerospikeQuery parseQuery(String sql, List<Object> sqlParameters) thro
if (query.getCatalog() == null) {
query.setCatalog(catalog);
}
query.setTxn(connection.getTxn());
return query;
}

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.aerospike.jdbc.model;

import com.aerospike.client.Txn;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import com.aerospike.jdbc.predicate.QueryPredicate;
Expand Down Expand Up @@ -43,6 +44,8 @@ public class AerospikeQuery {
private List<Object> values;
private List<String> columns;

private Txn txn;

public AerospikeQuery() {
this.queryType = QueryType.UNKNOWN;
}
Expand Down Expand Up @@ -147,6 +150,14 @@ public void setColumns(List<String> columns) {
this.columns = columns;
}

public Txn getTxn() {
return txn;
}

public void setTxn(Txn txn) {
this.txn = txn;
}

public String[] columnBins() {
String[] binNames = columns.stream()
.filter(c -> !Objects.equals(c, ASTERISK))
Expand Down
Loading
Loading