Skip to content

Commit

Permalink
flux/kv::isEmpty + doc improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
afri committed Sep 1, 2023
1 parent 5356c0d commit 9f41c46
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ This changelog lists the most prominent, developer-visible changes in each relea


* New functionality

* Added `mho:flux/kv::isEmpty`.

* `mho:flux/kv::Range` now supports 'branch' ranges.

* Added 'sjs:big' module, tracking 'big.js' arbitrary precision math library.
Expand Down
85 changes: 80 additions & 5 deletions modules/flux/kv.sjs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ __js function query(store, range, options) {
}
exports.query = query;

/**
@function isEmpty
@param {::KVStore} [kvstore]
@summary Returns `true` if `kvstore` doesn't contain any key-value pairs, `false` otherwise
*/
function isEmpty(store) {
return store .. query(RANGE_ALL, {keys:false, values:false, limit:1}) .. @first(undefined) === undefined;
}
exports.isEmpty = isEmpty;

/**
@function clearRange
@param {::KVStore} [kvstore]
Expand Down Expand Up @@ -306,7 +316,7 @@ exports.observeQuery = observeQuery;

/**
@function withTransaction
@altsyntax kvstore .. withTransaction { |transaction| ... }
@altsyntax kvstore .. withTransaction(function(transaction) {... })
@param {::KVStore} [kvstore]
@param {Function} [block] Function that will be passed a transaction object (a [::KVStore]).
@summary Run code in `block` in a transaction.
Expand All @@ -318,20 +328,20 @@ exports.observeQuery = observeQuery;
During the execution of `block`, withTransaction will check if there are
any conflicts: If any of the keys read or written to within the
transaction are being concurrently modified from outside of the
transaction, `block` will be aborted and called again; indefinitely until no
conflicts are detected.
transaction, `block` will be immediately aborted and called again; indefinitely
until no conflicts are detected.
Transactions will be aborted, and `withTransaction` will return immediately,
if `block` throws an exception or (if block is
a blocklambda) exits via a blocklambda return or blocklambda break.
When `block` exits normally (i.e. not
by throwing an exception, via a blocklambda return or blocklambda break),
by throwing an exception, and not via a blocklambda return or blocklambda break),
`withTransaction` will again check for conflicts while obtaining a global
lock on the db. If a conflict is detected, `block` will be rerun. Otherwise
all mutations performed in `block`
will be applied to the underlying database and `withTransaction`
returns.
returns the return value of `block`
After a completed successful `withTransaction` call
returns, all reads and writes will have been performed atomically,
Expand All @@ -348,6 +358,71 @@ exports.observeQuery = observeQuery;
performed in the transactional context will reflect any prior and
future mutations applied in the same transaction (before they are
committed to the database).
### Caveats of blocklambda controlflow
In general it is a bad idea to use blocklambdas as `block`. Normal functions
are preferred.
This is because controlflow such as blocklambda breaks or
returns will prevent any mutations in a transactions from becoming materialized in
the db. E.g. in the following transaction, the new account balances will **NOT** be
written to the db, (even though the updated value is returned from the blocklambda
return):
// INCORRECT BLOCKLAMBDA USAGE
// transfer $x from a to b and return balance b:
function transfer(x) {
db .. withTransaction {
|tx|
var a = tx .. read('accountA');
var b = tx .. read('accountB');
// transfer $x from a->b:
tx .. write('accountA', a-x);
tx .. write('accountB', b+x);
return tx .. read('accountB'); // blocklambda return ABORTS THE TRANSACTION!!!
}
}
A version that uses a normal function works fine:
// CORRECT VERSION
// transfer $x from a to b and return balance b:
function transfer(x) {
return db .. withTransaction(function(tx) {
var a = tx .. read('accountA');
var b = tx .. read('accountB');
// transfer $x from a->b:
tx .. write('accountA', a-x);
tx .. write('accountB', b+x);
return tx .. read('accountB');
});
}
### Using transactions solely for consistent reads
If a transaction is **solely being used to read data**, any relevant concurrent mutations
will cause the block to be aborted immediately and rerun.
While it is arguably bad form, for these types of transactions it is technically ok to
use blocklambdas & bail early from `block` using blocklambda controlflow.
E.g.:
db .. withTransaction {
|tx|
var a = tx .. read('accountA');
var b = tx .. read('accountB');
return a+b; // it's ok to bail here, as we're not writing any data
}
If there are concurrent mutations to the db, the block might abort during
either of the reads and re-run.
The 'return' line will only be reached once the values of 'a' and 'b' are
consistent with each other (i.e. there aren't any mutations of 'a' between
reading 'a' and reading 'b').
*/
__js function withTransaction(store, options, block) {
if (arguments.length === 2) {
Expand Down
11 changes: 10 additions & 1 deletion modules/flux/kv/wrap.sjs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
// TODO move all the @encoding stuff into itf
function wrapDB(base) {

/*
Even though leveldb allows us to write in atomic batches, we need
to sequentialize writing those batches with a mutex to ensure
transaction isolation.
*/
var MutationMutex = @Semaphore(1, true);

__js function kv_query(range, options) {
Expand Down Expand Up @@ -172,7 +177,10 @@ function wrapDB(base) {
var reverse = options.reverse;

// query and patches are streams of [k, v] pairs:
var query = kv_query(range, options);

// we have to make sure that the query return values includes keys,
// so that we can correlate with patches.
var query = kv_query(range, options .. @override({keys: true}));
var patches = pendingPuts .. @ownValues .. @sort((a,b) ->
reverse ?
@encoding.encodedKeyCompare(b[0],a[0]) :
Expand Down Expand Up @@ -277,6 +285,7 @@ function wrapDB(base) {
or {
var rv = block(T);
__js var ops = pendingPuts .. @ownValues .. @map([key,value] -> { type: value === undefined ? 'del' : 'put', key: key, value: value});
if (ops.length === 0) return rv; // nothing to do.
MutationMutex.acquire();
collapse;
try {
Expand Down
3 changes: 3 additions & 0 deletions modules/sjs-lib-index.json
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@
"get": {
"type": "function"
},
"isEmpty": {
"type": "function"
},
"isNotFound": {
"type": "function"
},
Expand Down
4 changes: 4 additions & 0 deletions test/unit/flux/kv-tests.sjs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ function test_large_key(db) {
}

function test_clear(db) {
db .. @kv.clearRange(@kv.RANGE_ALL);
db .. @kv.isEmpty() .. @assert.ok();
db .. @kv.set('foo', 'bar');
db .. @kv.get('foo') .. @assert.eq('bar');
db .. @kv.isEmpty() .. @assert.notOk();
db .. @kv.clear('foo');
db .. @kv.get('foo', undefined) .. @assert.eq(undefined);
db .. @kv.isEmpty() .. @assert.ok();
}

function test_get(db) {
Expand Down

0 comments on commit 9f41c46

Please sign in to comment.