Skip to content

Commit

Permalink
Added Multi-Record Transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
DomPeliniAerospike committed Dec 19, 2024
1 parent 5d43d80 commit 7d3e7dd
Show file tree
Hide file tree
Showing 40 changed files with 3,015 additions and 578 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ To run all the test cases:

npm test

To run a specific tests, use:

npm test --testfile=filename.js

Note: make sure your server has TTL enabled for the `test` namespace ([Namespace Retention Configuration](https://docs.aerospike.com/server/operations/configure/namespace/retention)) to allow all tests to run correctly.

To run the tests and also report on test coverage:
Expand Down
2 changes: 1 addition & 1 deletion aerospike-client-c
Submodule aerospike-client-c updated 94 files
+3 −0 Makefile
+2 −2 README.md
+1 −0 examples/async_examples/Makefile
+3 −3 examples/async_examples/async_batch_get/src/main/example.c
+2 −2 examples/async_examples/async_delay_queue/src/main/example.c
+4 −0 examples/async_examples/async_transaction/Makefile
+329 −0 examples/async_examples/async_transaction/src/main/example.c
+1 −0 examples/basic_examples/Makefile
+2 −2 examples/basic_examples/append/src/main/example.c
+2 −2 examples/basic_examples/incr/src/main/example.c
+4 −0 examples/basic_examples/transaction/Makefile
+144 −0 examples/basic_examples/transaction/src/main/example.c
+3 −3 examples/batch_examples/get/src/main/example.c
+ project/aerospike_logo.png
+12 −12 project/doxyfile
+116 −0 project/doxygen-awesome-sidebar-only.css
+2,681 −0 project/doxygen-awesome.css
+84 −0 project/header.html
+0 −0 project/layout.xml
+2 −0 project/test.mk
+0 −14 src/apidocs/footer.html
+0 −47 src/apidocs/header.html
+0 −716 src/apidocs/html/aerospike.css
+0 −563 src/apidocs/html/style.css
+0 −1,204 src/apidocs/old.css
+6 −6 src/include/aerospike/aerospike.h
+70 −30 src/include/aerospike/aerospike_batch.h
+93 −6 src/include/aerospike/aerospike_key.h
+6 −6 src/include/aerospike/aerospike_stats.h
+227 −0 src/include/aerospike/aerospike_txn.h
+3 −3 src/include/aerospike/as_admin.h
+21 −8 src/include/aerospike/as_async.h
+1 −1 src/include/aerospike/as_batch.h
+23 −12 src/include/aerospike/as_cluster.h
+124 −21 src/include/aerospike/as_command.h
+5 −5 src/include/aerospike/as_config.h
+2 −2 src/include/aerospike/as_error.h
+21 −16 src/include/aerospike/as_event.h
+27 −2 src/include/aerospike/as_event_internal.h
+3 −3 src/include/aerospike/as_exp.h
+1 −1 src/include/aerospike/as_latency.h
+11 −11 src/include/aerospike/as_node.h
+1 −1 src/include/aerospike/as_operations.h
+2 −1 src/include/aerospike/as_peers.h
+184 −45 src/include/aerospike/as_policy.h
+3 −3 src/include/aerospike/as_proto.h
+7 −1 src/include/aerospike/as_query.h
+2 −2 src/include/aerospike/as_record.h
+2 −2 src/include/aerospike/as_socket.h
+62 −21 src/include/aerospike/as_status.h
+308 −0 src/include/aerospike/as_txn.h
+95 −0 src/include/aerospike/as_txn_monitor.h
+1 −1 src/include/aerospike/version.h
+1,455 −287 src/main/aerospike/aerospike_batch.c
+1,093 −249 src/main/aerospike/aerospike_key.c
+13 −5 src/main/aerospike/aerospike_query.c
+7 −4 src/main/aerospike/aerospike_scan.c
+740 −0 src/main/aerospike/aerospike_txn.c
+9 −9 src/main/aerospike/as_cluster.c
+200 −31 src/main/aerospike/as_command.c
+12 −4 src/main/aerospike/as_error.c
+176 −16 src/main/aerospike/as_event.c
+1 −1 src/main/aerospike/as_event_uv.c
+27 −19 src/main/aerospike/as_info.c
+2 −2 src/main/aerospike/as_metrics_writer.c
+3 −3 src/main/aerospike/as_node.c
+94 −40 src/main/aerospike/as_peers.c
+8 −4 src/main/aerospike/as_policy.c
+412 −0 src/main/aerospike/as_txn.c
+349 −0 src/main/aerospike/as_txn_monitor.c
+1 −1 src/main/aerospike/version.c
+2 −2 src/test/aerospike_query/query_background.c
+31 −14 src/test/aerospike_test.c
+924 −0 src/test/transaction.c
+1,157 −0 src/test/transaction_async.c
+2 −2 src/test/util/map_rec.c
+2 −2 src/test/util/test_aerospike.c
+1 −1 vs/aerospike-client-c-libevent.nuspec
+1 −1 vs/aerospike-client-c-libuv.nuspec
+1 −1 vs/aerospike-client-c.nuspec
+2 −0 vs/aerospike-test/aerospike-test.vcxproj
+6 −0 vs/aerospike-test/aerospike-test.vcxproj.filters
+54 −0 vs/aerospike.sln
+6 −0 vs/aerospike/aerospike.vcxproj
+18 −0 vs/aerospike/aerospike.vcxproj.filters
+248 −0 vs/examples/async-transaction/async-transaction.vcxproj
+33 −0 vs/examples/async-transaction/async-transaction.vcxproj.filters
+4 −0 vs/examples/async-transaction/packages.config
+4 −0 vs/examples/transaction/packages.config
+248 −0 vs/examples/transaction/transaction.vcxproj
+33 −0 vs/examples/transaction/transaction.vcxproj.filters
+8 −0 xcode/aerospike-test.xcodeproj/project.pbxproj
+24 −0 xcode/aerospike.xcodeproj/project.pbxproj
+24 −0 xcode/examples.xcodeproj/project.pbxproj
7 changes: 7 additions & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
'sources': [
'src/main/aerospike.cc',
'src/main/client.cc',
'src/main/transaction.cc',
'src/main/config.cc',
'src/main/events.cc',
'src/main/cdt_ctx.cc',
Expand Down Expand Up @@ -125,11 +126,17 @@
'src/main/commands/scan_background.cc',
'src/main/commands/scan_pages.cc',
'src/main/commands/select_async.cc',
'src/main/commands/transaction_abort.cc',
'src/main/commands/transaction_commit.cc',
'src/main/commands/truncate.cc',
'src/main/commands/user_create.cc',
'src/main/commands/user_drop.cc',
'src/main/commands/udf_register.cc',
'src/main/commands/udf_remove.cc',
'src/main/enums/abort_status.cc',
'src/main/enums/commit_status.cc',
'src/main/enums/txn_state.cc',
'src/main/enums/txn_capacity.cc',
'src/main/enums/predicates.cc',
'src/main/enums/bitwise_enum.cc',
'src/main/enums/hll_enum.cc',
Expand Down
28 changes: 28 additions & 0 deletions lib/abort_status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// *****************************************************************************
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the 'License')
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an 'AS IS' BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// *****************************************************************************

'use strict'

const as = require('bindings')('aerospike.node')
const abortStatus = as.abortStatus

module.exports = {
OK: abortStatus.OK,
ALREADY_COMMITTED: abortStatus.ALREADY_COMMITTED,
ALREADY_ABORTED: abortStatus.ALREADY_ABORTED,
ROLL_BACK_ABANDONED: abortStatus.MARK_ROLL_FORWARD_ABANDONED,
CLOSE_ABANDONED: abortStatus.CLOSE_ABANDONED
}
53 changes: 53 additions & 0 deletions lib/aerospike.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
const as = require('bindings')('aerospike.node')
const AerospikeError = require('./error')
const EventLoop = require('./event_loop')
const TransactionPool = require('./transaction_pool')

const _transactionPool = new TransactionPool()

exports._transactionPool = _transactionPool
/**
* @module aerospike
*
Expand Down Expand Up @@ -238,6 +242,39 @@ exports.Config = require('./config')
*/
exports.Double = require('./double')

/**
* Multi-record transaction (MRT) class. Each command in the MRT must use the same namespace.
*
* note: By default, open transactions are destroyed when the final client in a process is closed.
* If you need your transaction to persist after the last client has been closed, provide `false` for the
* destroy Transactions argument in {@link Client#close} (see example below).
*
* @example
*
* const Aerospike = require('aerospike')
*
* // INSERT HOSTNAME AND PORT NUMBER OF AEROSPIKE SERVER NODE HERE!
* var config = {
* hosts: '192.168.33.10:3000',
* }
* Aerospike.connect(config)
* .then(client => {
* // client is ready to accept commands
* console.log("Connected. Now Closing Connection.")
* client.close()
* })
* .catch(error => {
* client.close(
* false, // do not release the event loop
* false // do not destroy open transactions
* )
* console.error('Failed to connect to cluster: %s', error.message)
* })
*
* @summary {@link Transaction} class
*/
exports.Transaction = require('./transaction')

/**
* Representation of a GeoJSON value. Since GeoJSON values are JSON objects
* they need to be wrapped in the {@link GeoJSON} class so that the client can
Expand Down Expand Up @@ -701,6 +738,22 @@ exports.setupGlobalCommandQueue = function (policy) {
*/
exports.batchType = require('./batch_type')

/**
* The {@link module:aerospike/commit_status|aerospike/commit_status}
* module contains a list of commit statuses.
*
* @summary {@link module:aerospike/commit_status|aerospike/commit_status} module
*/
exports.commitStatus = require('./commit_status')

/**
* The {@link module:aerospike/abortStatus|aerospike/abort_status}
* module contains a list of abort statuses.
*
* @summary {@link module:aerospike/abortStatus|aerospike/abort_status} module
*/
exports.abortStatus = require('./abort_status')

/**
* The {@link module:aerospike/privilegeCode|aerospike/privilege_code}
* module is comprised of permission codes which define the type of
Expand Down
47 changes: 45 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const EventEmitter = require('events')

const as = require('bindings')('aerospike.node')
const AerospikeError = require('./error')
const abortStatus = require('./abort_status')
const commitStatus = require('./commit_status')
const txnState = require('./txn_state')
const Transaction = require('./transaction')
const Context = require('./cdt_context')
const Commands = require('./commands')
const Config = require('./config')
Expand All @@ -35,6 +39,7 @@ const utils = require('./utils')

// number of client instances currently connected to any Aerospike cluster
let _connectedClients = 0
const { _transactionPool } = require('./aerospike')

// callback function for cluster events (node added/removed, etc.)
function eventsCallback (event) {
Expand Down Expand Up @@ -211,6 +216,40 @@ Client.prototype.getNodes = function () {
return this.as_client.getNodes()
}

Client.prototype.abort = function (transaction, callback) {
_transactionPool.tendTransactions()
if (transaction instanceof Transaction) {
if (transaction.getState() === txnState.COMMITTED) {
return abortStatus.ALREADY_COMMITTED
} else if (transaction.getState() === txnState.ABORTED) {
return abortStatus.ALREADY_ABORTED
} else if (transaction.getDestroyed() === true) {
throw new AerospikeError('The object has been destroyed, please create a new transaction.')
}
} else {
throw new AerospikeError('transaction must be an instance of class Transaction.')
}
const cmd = new Commands.TransactionAbort(this, [transaction.transaction], callback)
return cmd.execute()
}

Client.prototype.commit = function (transaction, callback) {
_transactionPool.tendTransactions()
if (transaction instanceof Transaction) {
if (transaction.getState() === txnState.COMMITTED) {
return commitStatus.ALREADY_COMMITTED
} else if (transaction.getState() === txnState.ABORTED) {
return commitStatus.ALREADY_ABORTED
} else if (transaction.getDestroyed() === true) {
throw new AerospikeError('The object has been destroyed, please create a new transaction.')
}
} else {
throw new AerospikeError('transaction must be an instance of class Transaction.')
}
const cmd = new Commands.TransactionCommit(this, [transaction.transaction], callback)
return cmd.execute()
}

/**
* @function Client#contextToBase64
*
Expand Down Expand Up @@ -1711,7 +1750,8 @@ Client.prototype.batchSelect = function (keys, bins, policy, callback) {
*
* @summary Closes the client connection to the cluster.
*
* @param {boolean} [releaseEventLoop=false] - Whether to release the event loop handle after the client is closed.
* @param {boolean} [releaseEventLoop=false] - Whether to release the event loop handle after the last client is closed.
* @param {boolean} [destroyTransactions=true] - Whether to destroy any open transactions after the last client is closed.
*
* @see module:aerospike.releaseEventLoop
*
Expand All @@ -1734,13 +1774,16 @@ Client.prototype.batchSelect = function (keys, bins, policy, callback) {
* console.error('Failed to connect to cluster: %s', error.message)
* })
*/
Client.prototype.close = function (releaseEventLoop = false) {
Client.prototype.close = function (releaseEventLoop = false, destroyTransactions = true) {
if (this.isConnected(false)) {
this.connected = false
this.as_client.close()
_connectedClients -= 1
}
if (_connectedClients === 0) {
if (destroyTransactions) {
_transactionPool.removeAllTransactions()
}
if (releaseEventLoop) {
EventLoop.releaseEventLoop()
} else {
Expand Down
2 changes: 2 additions & 0 deletions lib/commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ exports.ScanPages = class ScanPagesCommand extends StreamCommand('scanPages') {
exports.ScanBackground = class ScanBackgroundCommand extends QueryBackgroundBaseCommand('scanBackground') { }
exports.ScanOperate = class ScanOperateCommand extends QueryBackgroundBaseCommand('scanBackground') { }
exports.Select = class SelectCommand extends ReadRecordCommand('selectAsync') { }
exports.TransactionAbort = class TransactionAbortCommand extends Command('transactionAbort') { }
exports.TransactionCommit = class TransactionCommitCommand extends Command('transactionCommit') { }
exports.Truncate = class TruncateCommand extends Command('truncate') { }
exports.UdfRegister = class UdfRegisterCommand extends Command('udfRegister') { }
exports.UdfRemove = class UdfRemoveCommand extends Command('udfRemove') { }
Expand Down
30 changes: 30 additions & 0 deletions lib/commit_status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// *****************************************************************************
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the 'License')
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an 'AS IS' BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// *****************************************************************************

'use strict'

const as = require('bindings')('aerospike.node')
const commitStatus = as.commitStatus

module.exports = {
OK: commitStatus.OK,
ALREADY_COMMITTED: commitStatus.ALREADY_COMMITTED,
ALREADY_ABORTED: commitStatus.ALREADY_ABORTED,
VERIFY_FAILED: commitStatus.VERIFY_FAILED,
MARK_ROLL_FORWARD_ABANDONED: commitStatus.MARK_ROLL_FORWARD_ABANDONED,
ROLL_FORWARD_ABANDONED: commitStatus.ROLL_FORWARD_ABANDONED,
CLOSE_ABANDONED: commitStatus.CLOSE_ABANDONED
}
2 changes: 2 additions & 0 deletions lib/policies/base_policy.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class BasePolicy {
* @since v3.14.0
*/
this.compress = props.compress

this.txn = props.txn
}
}

Expand Down
36 changes: 36 additions & 0 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ const as = require('bindings')('aerospike.node')
* @description Database operation error codes.
*/

/**
* Multi-record transaction failed.
* @const {number}
*/
exports.TXN_FAILED = exports.AEROSPIKE_TXN_FAILED = as.status.AEROSPIKE_TXN_FAILED

/**
* One or more keys failed in a batch.
* @const {number}
Expand Down Expand Up @@ -303,6 +309,8 @@ exports.FILTERED_OUT = exports.AEROSPIKE_FILTERED_OUT = as.status.AEROSPIKE_FILT
*/
exports.LOST_CONFLICT = exports.AEROSPIKE_LOST_CONFLICT = as.status.AEROSPIKE_LOST_CONFLICT

exports.XDR_KEY_BUSY = exports.AEROSPIKE_XDR_KEY_BUSY = as.status.AEROSPIKE_XDR_KEY_BUSY

/**
* There are no more records left for query.
* @const {number}
Expand Down Expand Up @@ -441,6 +449,13 @@ exports.ROLE_VIOLATION = exports.AEROSPIKE_ROLE_VIOLATION = as.status.AEROSPIKE_
*/
exports.ERR_UDF = exports.AEROSPIKE_ERR_UDF = as.status.AEROSPIKE_ERR_UDF

exports.MRT_BLOCKED = exports.AEROSPIKE_MRT_BLOCKED = as.status.AEROSPIKE_MRT_BLOCKED
exports.MRT_VERSION_MISMATCH = exports.AEROSPIKE_MRT_VERSION_MISMATCH = as.status.AEROSPIKE_MRT_VERSION_MISMATCH
exports.MRT_EXPIRED = exports.AEROSPIKE_MRT_EXPIRED = as.status.AEROSPIKE_MRT_EXPIRED
exports.MRT_TOO_MANY_WRITES = exports.AEROSPIKE_MRT_TOO_MANY_WRITES = as.status.AEROSPIKE_MRT_TOO_MANY_WRITES
exports.MRT_COMMITTED = exports.AEROSPIKE_MRT_COMMITTED = as.status.AEROSPIKE_MRT_COMMITTED
exports.MRT_ABORTED = exports.AEROSPIKE_MRT_ABORTED = as.status.AEROSPIKE_MRT_ABORTED

/**
* Batch functionality has been disabled.
* @const {number}
Expand Down Expand Up @@ -553,6 +568,9 @@ exports.ERR_LUA_FILE_NOT_FOUND = exports.AEROSPIKE_ERR_LUA_FILE_NOT_FOUND = as.s
exports.getMessage = function (code) {
/* istanbul ignore next */
switch (code) {
case exports.TXN_FAILED:
return 'Multi-record transaction failed.'

case exports.ERR_INVALID_NODE:
return 'Node invalid or could not be found.'

Expand Down Expand Up @@ -727,6 +745,24 @@ exports.getMessage = function (code) {
case exports.ERR_UDF:
return 'Generic UDF error.'

case exports.MRT_BLOCKED:
return 'MRT record blocked by a different transaction.'

case exports.MRT_VERSION_MISMATCH:
return 'MRT read version mismatch identified during commit. Some other command changed the record outside of the transaction.'

case exports.MRT_EXPIRED:
return 'MRT deadline reached without a successful commit or abort.'

case exports.MRT_TOO_MANY_WRITES:
return 'MRT write command limit (4096) exceeded.'

case exports.MRT_COMMITTED:
return 'MRT was already committed.'

case exports.MRT_ABORTED:
return 'MRT was already aborted.'

case exports.ERR_BATCH_DISABLED:
return 'Batch functionality has been disabled.'

Expand Down
Loading

0 comments on commit 7d3e7dd

Please sign in to comment.