Skip to content
This repository has been archived by the owner on Nov 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #107 from caskdata/feature/visibility-barrier
Browse files Browse the repository at this point in the history
TEPHRA-157 Visibility guarantees using transaction
  • Loading branch information
poornachandra committed Jan 12, 2016
2 parents 7ba342a + 4a6a051 commit ba91969
Show file tree
Hide file tree
Showing 6 changed files with 754 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2016 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.tephra.visibility;

import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Default implementation of {@link FenceWait}.
 * <p>
 * Each call to {@link #await(long, TimeUnit)} repeatedly starts and finishes a transaction on the
 * {@link TransactionContext} supplied at construction, until one attempt finishes successfully or the
 * timeout elapses.
 */
public class DefaultFenceWait implements FenceWait {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class);

  // Upper bound on the pause between two attempts: 500 milliseconds, expressed in microseconds.
  private static final long MAX_SLEEP_MICROS = 500 * 1000;

  private final TransactionContext txContext;

  /**
   * @param txContext transaction context that is started and finished on every wait attempt
   */
  DefaultFenceWait(TransactionContext txContext) {
    this.txContext = txContext;
  }

  /**
   * Retries the fence transaction until it finishes successfully or the timeout is reached.
   * <p>
   * Between two failed attempts the thread sleeps for a tenth of the timeout, bounded to the range of
   * 1 microsecond to 500 milliseconds, and never for longer than the time still left before the timeout.
   *
   * @param timeout maximum time to wait
   * @param timeUnit {@link TimeUnit} of {@code timeout}
   * @throws TransactionFailureException propagated from starting or aborting the fence transaction
   * @throws InterruptedException if the thread is interrupted while sleeping between attempts
   * @throws TimeoutException if no attempt finished successfully before the timeout elapsed
   */
  @Override
  public void await(long timeout, TimeUnit timeUnit)
    throws TransactionFailureException, InterruptedException, TimeoutException {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    long timeoutMicros = timeUnit.toMicros(timeout);
    // Have nominal sleep time to be within 1 microsecond and 500 milliseconds
    long sleepTimeMicros = Math.max(Math.min(timeoutMicros / 10, MAX_SLEEP_MICROS), 1);
    while (stopwatch.elapsedTime(timeUnit) < timeout) {
      txContext.start();
      try {
        txContext.finish();
        return;
      } catch (TransactionFailureException e) {
        // Never sleep past the deadline: cap the pause at the time still remaining (and at zero, since the
        // remaining time can be non-positive once the deadline has been reached).
        long remainingMicros = timeoutMicros - stopwatch.elapsedTime(TimeUnit.MICROSECONDS);
        long actualSleepMicros = Math.max(Math.min(sleepTimeMicros, remainingMicros), 0);
        LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", actualSleepMicros, e);
        txContext.abort();
        TimeUnit.MICROSECONDS.sleep(actualSleepMicros);
      }
    }
    throw new TimeoutException("Timeout waiting for fence");
  }
}
44 changes: 44 additions & 0 deletions tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2016 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.tephra.visibility;

import co.cask.tephra.TransactionFailureException;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Handle that a writer uses to wait on a fence, so that its changes become visible to all readers that
 * have in-progress transactions.
 */
public interface FenceWait {
  /**
   * Blocks until the fence is complete or the given timeout expires, whichever happens first. The fence
   * wait transaction is re-tried several times within the timeout.
   * <p>
   * A timeout means that some readers with in-progress transactions have still not seen the change.
   * In that case the wait has to be retried using the same {@code FenceWait} object.
   *
   * @param timeout maximum time to wait
   * @param timeUnit {@link TimeUnit} in which {@code timeout} is expressed
   * @throws TransactionFailureException when not able to start the fence wait transaction
   * @throws InterruptedException on any interrupt
   * @throws TimeoutException when the timeout is reached
   */
  void await(long timeout, TimeUnit timeUnit)
    throws TransactionFailureException, InterruptedException, TimeoutException;
}
77 changes: 77 additions & 0 deletions tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright © 2015 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.tephra.visibility;

import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionAware;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Longs;

import java.util.Collection;
import java.util.Collections;

/**
 * {@link TransactionAware} returned by {@link VisibilityFence#create(byte[])} for use by readers.
 * <p>
 * It persists nothing. Its only contribution to a transaction is a single change key made of the fence id
 * followed by the bytes of the id of the transaction it was started with.
 */
class ReadFence implements TransactionAware {
  private final byte[] fenceId;
  // Transaction this fence currently takes part in; null when no transaction is active.
  private Transaction tx;

  /**
   * @param fenceId id of the fence; used as the prefix of the change key reported by {@link #getTxChanges()}
   */
  public ReadFence(byte[] fenceId) {
    this.fenceId = fenceId;
  }

  @Override
  public void startTx(Transaction tx) {
    this.tx = tx;
  }

  @Override
  public void updateTx(Transaction tx) {
    // Fences only need original transaction
  }

  /**
   * Returns the single change key of this fence: the fence id concatenated with the transaction id bytes.
   *
   * @throws IllegalStateException if no transaction is currently active on this fence
   */
  @Override
  public Collection<byte[]> getTxChanges() {
    if (tx == null) {
      throw new IllegalStateException("Transaction has not started yet");
    }
    return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId())));
  }

  @Override
  public boolean commitTx() throws Exception {
    // Nothing to persist
    return true;
  }

  @Override
  public void postTxCommit() {
    tx = null;
  }

  @Override
  public boolean rollbackTx() throws Exception {
    // Nothing to rollback, but forget the transaction just like postTxCommit() does, so that a later
    // getTxChanges() without a new startTx() fails fast instead of reporting a stale transaction id.
    tx = null;
    return true;
  }

  @Override
  public String getTransactionAwareName() {
    return getClass().getSimpleName();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright © 2015 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.tephra.visibility;

import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;

import java.util.concurrent.TimeoutException;

/**
* VisibilityFence is used to ensure that after a given point in time, all readers see an updated change
* that got committed.
* <p>
*
* Typically a reader will never conflict with a writer, since a reader only sees committed changes when its
* transaction started. However to ensure that after a given point all readers are aware of a change,
* we have to introduce a conflict between a reader and a writer that act on the same data concurrently.
*<p>
*
* This is done by the reader indicating that it is interested in changes to a piece of data by using a fence
* in its transaction. If there are no changes to the data when reader tries to commit the transaction
* containing the fence, the commit succeeds.
* <p>
*
* On the other hand, a writer updates the same data in a transaction. After the write transaction is committed,
* the writer then waits on the fence to ensure that all in-progress readers are aware of this update.
* When the wait on the fence returns successfully, it means that
* any in-progress readers that have not seen the change will not be allowed to commit anymore. This will
* force the readers to start a new transaction, and this ensures that the changes made by writer are visible
* to the readers.
*<p>
*
* In case an in-progress reader commits when the writer is waiting on the fence, then the wait method will retry
* until the given timeout.
* <p>
*
* Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits after
* this point onwards would see the change.
*
* <p>
* Sample reader code:
* <pre>
* <code>
* TransactionAware fence = VisibilityFence.create(fenceId);
* TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
* readTxContext.start();
*
* // do operations using table1, table2, etc.
*
* // finally commit
* try {
* readTxContext.finish();
* } catch (TransactionConflictException e) {
* // handle conflict by aborting and starting over with a new transaction
* }
* </code>
* </pre>
*<p>
*
* Sample writer code:
* <pre>
* <code>
* // start transaction
* // write change
* // commit transaction
*
* // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
* // aware of this change
* try {
* FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
* fenceWait.await(50000, TimeUnit.MILLISECONDS);
* } catch (TimeoutException e) {
* // await timed out, the change may not be visible to all in-progress readers.
* // Application has two options at this point:
* // 1. Revert the write. Re-try the write and fence wait again.
* // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
* }
* </code>
* </pre>
*
* fenceId in the above samples refers to any id that both the readers and writer know for a given
* piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data.
* Typically fenceId uniquely identifies the data in question.
* For example, if the data is a table row, the fenceId can be composed of table name and row key.
* If the data is a table cell, the fenceId can be composed of table name, row key, and column key.
*<p>
*
* Note that in this implementation a reader may get a conflict even though it has already seen the latest
* changes. This happens when the reader starts its transaction after the write is committed, and a writer
* successfully starts and completes an await on the fence while that read transaction is still in-progress:
* the reader will then get a conflict while committing the fence.
* This is because today there is no way to determine the commit time of a transaction.
*/
public final class VisibilityFence {
  private VisibilityFence() {
    // Not instantiable: this class only exposes static factory methods.
  }

  /**
   * Creates the reader side of a fence, to be added to the reader's transaction context.
   *
   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
   *                If the data is a table cell then this id can be composed of the table name, row key
   *                and column key for the data.
   * @return {@link TransactionAware} to be added to reader's transaction context.
   */
  public static TransactionAware create(byte[] fenceId) {
    return new ReadFence(fenceId);
  }

  /**
   * Creates the writer side of a fence: an object the writer can wait on so that changes are visible to
   * all readers with in-progress transactions.
   *
   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
   *                If the data is a table cell then this id can be composed of the table name, row key
   *                and column key for the data.
   * @param txClient transaction system client used to build the {@link TransactionContext} that the
   *                 returned {@link FenceWait} operates on
   * @return {@link FenceWait} object
   */
  public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient)
    throws TransactionFailureException, InterruptedException, TimeoutException {
    WriteFence writeFence = new WriteFence(fenceId);
    TransactionContext waitTxContext = new TransactionContext(txClient, writeFence);
    return new DefaultFenceWait(waitTxContext);
  }
}
Loading

0 comments on commit ba91969

Please sign in to comment.