From 4a6a051784685de16b9deccb36c2abc1eef266ac Mon Sep 17 00:00:00 2001 From: poorna Date: Sat, 19 Dec 2015 00:20:56 -0800 Subject: [PATCH] TEPHRA-157 Visibility guarentees using transactions --- .../tephra/visibility/DefaultFenceWait.java | 61 +++ .../co/cask/tephra/visibility/FenceWait.java | 44 +++ .../co/cask/tephra/visibility/ReadFence.java | 77 ++++ .../tephra/visibility/VisibilityFence.java | 138 +++++++ .../co/cask/tephra/visibility/WriteFence.java | 85 +++++ .../visibility/VisibilityFenceTest.java | 349 ++++++++++++++++++ 6 files changed, 754 insertions(+) create mode 100644 tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java create mode 100644 tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java create mode 100644 tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java create mode 100644 tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java create mode 100644 tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java create mode 100644 tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java new file mode 100644 index 00000000..6c45f969 --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.tephra.visibility; + +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import com.google.common.base.Stopwatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Default implementation of {@link FenceWait}. + */ +public class DefaultFenceWait implements FenceWait { + private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class); + + private final TransactionContext txContext; + + DefaultFenceWait(TransactionContext txContext) { + this.txContext = txContext; + } + + @Override + public void await(long timeout, TimeUnit timeUnit) + throws TransactionFailureException, InterruptedException, TimeoutException { + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + long sleepTimeMicros = timeUnit.toMicros(timeout) / 10; + // Have sleep time to be within 1 microsecond and 500 milliseconds + sleepTimeMicros = Math.max(Math.min(sleepTimeMicros, 500 * 1000), 1); + while (stopwatch.elapsedTime(timeUnit) < timeout) { + txContext.start(); + try { + txContext.finish(); + return; + } catch (TransactionFailureException e) { + LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", sleepTimeMicros, e); + txContext.abort(); + TimeUnit.MICROSECONDS.sleep(sleepTimeMicros); + } + } + throw new TimeoutException("Timeout waiting for fence"); + } +} diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java new file mode 100644 index 00000000..2b77b184 --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java @@ -0,0 +1,44 @@ +/* + * Copyright © 2016 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.tephra.visibility; + +import co.cask.tephra.TransactionFailureException; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions. + */ +public interface FenceWait { + /** + * Waits until the fence is complete, or till the timeout specified. The fence wait transaction will get re-tried + * several times until the timeout. + *

+ * + * If a fence wait times out then it means there are still some readers with in-progress transactions that have not + * seen the change. In this case the wait will have to be retried using the same FenceWait object. + * + * @param timeout Maximum time to wait + * @param timeUnit {@link TimeUnit} for timeout and sleepTime + * @throws TransactionFailureException when not able to start fence wait transaction + * @throws InterruptedException on any interrupt + * @throws TimeoutException when timeout is reached + */ + void await(long timeout, TimeUnit timeUnit) + throws TransactionFailureException, InterruptedException, TimeoutException; +} diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java new file mode 100644 index 00000000..38abe4a8 --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2015 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.tephra.visibility; + +import co.cask.tephra.Transaction; +import co.cask.tephra.TransactionAware; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; + +import java.util.Collection; +import java.util.Collections; + +/** + * Implementation of {@link VisibilityFence} used by reader. + */ +class ReadFence implements TransactionAware { + private final byte[] fenceId; + private Transaction tx; + + public ReadFence(byte[] fenceId) { + this.fenceId = fenceId; + } + + @Override + public void startTx(Transaction tx) { + this.tx = tx; + } + + @Override + public void updateTx(Transaction tx) { + // Fences only need original transaction + } + + @Override + public Collection getTxChanges() { + if (tx == null) { + throw new IllegalStateException("Transaction has not started yet"); + } + return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId()))); + } + + @Override + public boolean commitTx() throws Exception { + // Nothing to persist + return true; + } + + @Override + public void postTxCommit() { + tx = null; + } + + @Override + public boolean rollbackTx() throws Exception { + // Nothing to rollback + return true; + } + + @Override + public String getTransactionAwareName() { + return getClass().getSimpleName(); + } +} diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java new file mode 100644 index 00000000..7b8a1629 --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java @@ -0,0 +1,138 @@ +/* + * Copyright © 2015 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.tephra.visibility; + +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TransactionSystemClient; + +import java.util.concurrent.TimeoutException; + +/** + * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change + * that got committed. + *

+ * + * Typically a reader will never conflict with a writer, since a reader only sees committed changes when its + * transaction started. However to ensure that after a given point all readers are aware of a change, + * we have to introduce a conflict between a reader and a writer that act on the same data concurrently. + *

+ * + * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence + * in its transaction. If there are no changes to the data when reader tries to commit the transaction + * containing the fence, the commit succeeds. + *

+ * + * On the other hand, a writer updates the same data in a transaction. After the write transaction is committed, + * the writer then waits on the fence to ensure that all in-progress readers are aware of this update. + * When the wait on the fence returns successfully, it means that + * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will + * force the readers to start a new transaction, and this ensures that the changes made by writer are visible + * to the readers. + *

+ * + * In case an in-progress reader commits when the writer is waiting on the fence, then the wait method will retry + * until the given timeout. + *

+ * + * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits after + * this point onwards would see the change. + * + *

+ * Sample reader code: + *

+ *   
+ * TransactionAware fence = VisibilityFence.create(fenceId);
+ * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
+ * readTxContext.start();
+ *
+ * // do operations using table1, table2, etc.
+ *
+ * // finally commit
+ * try {
+ *   readTxContext.finish();
+ * } catch (TransactionConflictException e) {
+ *   // handle conflict by aborting and starting over with a new transaction
+ * }
+ *   
+ * 
+ *

+ * + * Sample writer code: + *

+ *   
+ * // start transaction
+ * // write change
+ * // commit transaction
+ *
+ * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
+ * aware of this change
+ * try {
+ *   FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
+ *   fenceWait.await(50000, 50, TimeUnit.MILLISECONDS);
+ * } catch (TimeoutException e) {
+ *   // await timed out, the change may not be visible to all in-progress readers.
+ *   // Application has two options at this point:
+ *   // 1. Revert the write. Re-try the write and fence wait again.
+ *   // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
+ * }
+ *   
+ * 
+ * + * fenceId in the above samples refers to any id that both the readers and writer know for a given + * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data. + * Typically fenceId uniquely identifies the data in question. + * For example, if the data is a table row, the fenceId can be composed of table name and row key. + * If the data is a table cell, the fenceId can be composed of table name, row key, and column key. + *

+ * + * Note that in this implementation, any reader that starts a transaction after the write is committed, and + * while this read transaction is in-progress, if a writer successfully starts and completes an await on the fence then + * this reader will get a conflict while committing the fence even though this reader has seen the latest changes. + * This is because today there is no way to determine the commit time of a transaction. + */ +public final class VisibilityFence { + private VisibilityFence() { + // Cannot instantiate this class, all functionality is through static methods. + } + + /** + * Used by a reader to get a fence that can be added to its transaction context. + * + * @param fenceId uniquely identifies the data that this fence is used to synchronize. + * If the data is a table cell then this id can be composed of the table name, row key + * and column key for the data. + * @return {@link TransactionAware} to be added to reader's transaction context. + */ + public static TransactionAware create(byte[] fenceId) { + return new ReadFence(fenceId); + } + + /** + * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions. + * + * @param fenceId uniquely identifies the data that this fence is used to synchronize. + * If the data is a table cell then this id can be composed of the table name, row key + * and column key for the data. + * @return {@link FenceWait} object + */ + public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient) + throws TransactionFailureException, InterruptedException, TimeoutException { + return new DefaultFenceWait(new TransactionContext(txClient, new WriteFence(fenceId))); + } +} diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java new file mode 100644 index 00000000..d5a4b3ad --- /dev/null +++ b/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java @@ -0,0 +1,85 @@ +/* + * Copyright © 2015 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.tephra.visibility; + +import co.cask.tephra.Transaction; +import co.cask.tephra.TransactionAware; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedBytes; + +import java.util.Collection; +import java.util.TreeSet; + +/** + * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}. + */ +class WriteFence implements TransactionAware { + private final byte[] fenceId; + private Transaction tx; + private Collection inProgressChanges; + + public WriteFence(byte[] fenceId) { + this.fenceId = fenceId; + } + + @Override + public void startTx(Transaction tx) { + this.tx = tx; + if (inProgressChanges == null) { + inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator()); + for (long inProgressTx : tx.getInProgress()) { + inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx))); + } + } + } + + @Override + public void updateTx(Transaction tx) { + // Fences only need original transaction + } + + @Override + public Collection getTxChanges() { + if (inProgressChanges == null || tx == null) { + throw new IllegalStateException("Transaction has not started yet"); + } + return inProgressChanges; + } + + @Override + public boolean commitTx() throws Exception { + // Nothing to persist + return true; + } + + @Override + public void postTxCommit() { + tx = null; + } + + @Override + public boolean rollbackTx() throws Exception { + // Nothing to rollback + return true; + } + + @Override + public String getTransactionAwareName() { + return getClass().getSimpleName(); + } +} diff --git a/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java b/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java new file mode 100644 index 00000000..99529336 --- /dev/null +++ b/tephra-core/src/test/java/co/cask/tephra/visibility/VisibilityFenceTest.java @@ -0,0 +1,349 @@ +/* + * Copyright © 2015 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package co.cask.tephra.visibility; + +import co.cask.tephra.Transaction; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionConflictException; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TransactionManager; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.inmemory.InMemoryTxSystemClient; +import co.cask.tephra.metrics.TxMetricsCollector; +import co.cask.tephra.persist.InMemoryTransactionStateStorage; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import org.apache.hadoop.conf.Configuration; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The following are all the possible cases when using {@link VisibilityFence}. + * + * In the below table, + * "Read Txn" refers to the transaction that contains the read fence + * "Before Write", "During Write" and "After Write" refer to the write transaction time + * "Before Write Fence", "During Write Fence", "After Write Fence" refer to the write fence transaction time + * + * Timeline is: Before Write < During Write < After Write < Before Write Fence < During Write Fence < + * After Write Fence + * + * +------+----------------------+----------------------+--------------------+--------------------+ + * | Case | Read Txn Start | Read Txn Commit | Conflict on Commit | Conflict on Commit | + * | | | | of Read Txn | of Write Fence | + * +------+----------------------+----------------------+--------------------+--------------------+ + * | 1 | Before Write | Before Write | No | No | + * | 2 | Before Write | During Write | No | No | + * | 3 | Before Write | After Write | No | No | + * | 4 | Before Write | Before Write Fence | No | No | + * | 5 | Before Write | During Write Fence | No | Yes | + * | 6 | Before Write | After Write Fence | Yes | No | + * | | | | | | + * | 7 | During Write | During Write | No | No | + * | 8 | During Write | After Write | No | No | + * | 9 | During Write | Before Write Fence | No | No | + * | 10 | During Write | During Write Fence | No | Yes | + * | 11 | During Write | After Write Fence | Yes | No | + * | | | | | | + * | 12 | After Write | After Write | No | No | + * | 13 | After Write | Before Write Fence | No | No | + * | 14 | After Write | During Write Fence | No | Yes # | + * | 15 | After Write | After Write Fence | Yes # | No | + * | | | | | | + * | 16 | Before Write Fence | Before Write Fence | No | No | + * | 17 | Before Write Fence | During Write Fence | No | Yes # | + * | 18 | Before Write Fence | After Write Fence | Yes # | No | + * | | | | | | + * | 19 | During Write Fence | During Write Fence | No | No | + * | 20 | During Write Fence | After Write Fence | No | No | + * | | | | | | + * | 21 | After Write Fence | After Write Fence | No | No | + * +------+----------------------+----------------------+--------------------+--------------------+ + * + * Note: Cases marked with '#' in conflict column should not conflict, however current implementation causes + * them to conflict. The remaining conflicts are a result of the fence. + * + * In the current implementation of VisibilityFence, read txns that start "Before Write", "During Write", + * and "After Write" can be represented by read txns that start "Before Write Fence". + * Verifying cases 16, 17, 18, 20 and 21 will effectively cover all other cases. + */ +public class VisibilityFenceTest { + private static Configuration conf = new Configuration(); + + private static TransactionManager txManager = null; + + @BeforeClass + public static void before() { + txManager = new TransactionManager(conf, new InMemoryTransactionStateStorage(), new TxMetricsCollector()); + txManager.startAndWait(); + } + + @AfterClass + public static void after() { + txManager.stopAndWait(); + } + + @Test + public void testFence1() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + // Writer updates data here in a separate transaction (code not shown) + // start tx + // update + // commit tx + + // Readers use fence to indicate that they are interested in changes to specific data + TransactionAware readFenceCase16 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase16 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase16); + readTxContextCase16.start(); + readTxContextCase16.finish(); + + TransactionAware readFenceCase17 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase17 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase17); + readTxContextCase17.start(); + + TransactionAware readFenceCase18 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase18 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase18); + readTxContextCase18.start(); + + // Now writer needs to wait for in-progress readers to see the change, it uses write fence to do so + // Start write fence txn + TransactionAware writeFence = new WriteFence(fenceId); + TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence); + writeTxContext.start(); + + TransactionAware readFenceCase20 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase20 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase20); + readTxContextCase20.start(); + + readTxContextCase17.finish(); + + assertTxnConflict(writeTxContext); + writeTxContext.start(); + + // Commit write fence txn can commit without conflicts at this point + writeTxContext.finish(); + + TransactionAware readFenceCase21 = VisibilityFence.create(fenceId); + TransactionContext readTxContextCase21 = + new TransactionContext(new InMemoryTxSystemClient(txManager), readFenceCase21); + readTxContextCase21.start(); + + assertTxnConflict(readTxContextCase18); + readTxContextCase20.finish(); + readTxContextCase21.finish(); + } + + private void assertTxnConflict(TransactionContext txContext) throws Exception { + try { + txContext.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + txContext.abort(); + } + } + + @Test + public void testFence2() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + // Readers use fence to indicate that they are interested in changes to specific data + // Reader 1 + TransactionAware readFence1 = VisibilityFence.create(fenceId); + TransactionContext readTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence1); + readTxContext1.start(); + + // Reader 2 + TransactionAware readFence2 = VisibilityFence.create(fenceId); + TransactionContext readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2); + readTxContext2.start(); + + // Reader 3 + TransactionAware readFence3 = VisibilityFence.create(fenceId); + TransactionContext readTxContext3 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence3); + readTxContext3.start(); + + // Writer updates data here in a separate transaction (code not shown) + // start tx + // update + // commit tx + + // Now writer needs to wait for readers 1, 2, and 3 to see the change, it uses write fence to do so + TransactionAware writeFence = new WriteFence(fenceId); + TransactionContext writeTxContext = new TransactionContext(new InMemoryTxSystemClient(txManager), writeFence); + writeTxContext.start(); + + // Reader 1 commits before writeFence is committed + readTxContext1.finish(); + + try { + // writeFence will throw exception since Reader 1 committed without seeing changes + writeTxContext.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + writeTxContext.abort(); + } + + // Start over writeFence again + writeTxContext.start(); + + // Now, Reader 3 commits before writeFence + // Note that Reader 3 does not conflict with Reader 1 + readTxContext3.finish(); + + try { + // writeFence will throw exception again since Reader 3 committed without seeing changes + writeTxContext.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + writeTxContext.abort(); + } + + // Start over writeFence again + writeTxContext.start(); + // This time writeFence commits before the other readers + writeTxContext.finish(); + + // After this point all readers will see the change + + try { + // Reader 2 commits after writeFence, hence this commit with throw exception + readTxContext2.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected + readTxContext2.abort(); + } + + // Reader 2 has to abort and start over again. It will see the changes now. + readTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), readFence2); + readTxContext2.start(); + readTxContext2.finish(); + } + + @Test + public void testFenceAwait() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence1.start(); + final TransactionContext fence2 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence2.start(); + TransactionContext fence3 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence3.start(); + + final AtomicInteger attempts = new AtomicInteger(); + TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) { + @Override + public Transaction startShort() { + Transaction transaction = super.startShort(); + try { + switch (attempts.getAndIncrement()) { + case 0: + fence1.finish(); + break; + case 1: + fence2.finish(); + break; + case 2: + break; + default: + throw new IllegalStateException("Unexpected state"); + } + } catch (TransactionFailureException e) { + Throwables.propagate(e); + } + return transaction; + } + }; + + FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); + fenceWait.await(1000, TimeUnit.MILLISECONDS); + Assert.assertEquals(3, attempts.get()); + + try { + fence3.finish(); + Assert.fail("Expected transaction to fail"); + } catch (TransactionConflictException e) { + // Expected exception + fence3.abort(); + } + + fence3.start(); + fence3.finish(); + } + + @Test + public void testFenceTimeout() throws Exception { + byte[] fenceId = "test_table".getBytes(Charsets.UTF_8); + + final TransactionContext fence1 = new TransactionContext(new InMemoryTxSystemClient(txManager), + VisibilityFence.create(fenceId)); + fence1.start(); + + final long timeout = 100; + final TimeUnit timeUnit = TimeUnit.MILLISECONDS; + final AtomicInteger attempts = new AtomicInteger(); + TransactionSystemClient customTxClient = new InMemoryTxSystemClient(txManager) { + @Override + public Transaction startShort() { + Transaction transaction = super.startShort(); + try { + switch (attempts.getAndIncrement()) { + case 0: + fence1.finish(); + break; + } + timeUnit.sleep(timeout + 1); + } catch (InterruptedException | TransactionFailureException e) { + Throwables.propagate(e); + } + return transaction; + } + }; + + try { + FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); + fenceWait.await(timeout, timeUnit); + Assert.fail("Expected await to fail"); + } catch (TimeoutException e) { + // Expected exception + } + Assert.assertEquals(1, attempts.get()); + + FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, customTxClient); + fenceWait.await(timeout, timeUnit); + Assert.assertEquals(2, attempts.get()); + } +}