Skip to content
This repository has been archived by the owner on Nov 17, 2024. It is now read-only.

Commit

Permalink
TEPHRA-101 Add SnapshotCodecV3 to enable CDAP migration
Browse files Browse the repository at this point in the history
  • Loading branch information
ghelmling committed May 14, 2015
1 parent 92e9951 commit 65684c9
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 7 deletions.
40 changes: 33 additions & 7 deletions tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ public class TransactionManager extends AbstractService {
private final Lock logReadLock = logLock.readLock();
private final Lock logWriteLock = logLock.writeLock();

// fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration
// TODO: REMOVE WITH txnBackwardsCompatCheck()
private final long longTimeoutTolerance;

public TransactionManager(Configuration config) {
this(config, new NoOpTransactionStateStorage(new SnapshotCodecProvider(config)), new TxMetricsCollector());
Expand All @@ -176,6 +179,12 @@ public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage p
// must always keep at least 1 snapshot
snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN,
TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1);

// intentionally not using a constant, as this config should not be exposed
// TODO: REMOVE WITH txnBackwardsCompatCheck()
longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);

//
this.txMetricsCollector = txMetricsCollector;
clear();
}
Expand Down Expand Up @@ -480,7 +489,7 @@ private void restoreSnapshot(TransactionSnapshot snapshot) {
readPointer = snapshot.getReadPointer();
lastWritePointer = snapshot.getWritePointer();
invalid.addAll(snapshot.getInvalid());
inProgress.putAll(txnBackwardsCompatCheck(snapshot.getInProgress()));
inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
committingChangeSets.putAll(snapshot.getCommittingChangeSets());
committedChangeSets.putAll(snapshot.getCommittedChangeSets());
}
Expand All @@ -490,13 +499,21 @@ private void restoreSnapshot(TransactionSnapshot snapshot) {
* This is required for backwards compatibility, when long running transactions were represented
* with expiration time -1. This can be removed when we stop supporting SnapshotCodec version 1.
*/
private Map<Long, InProgressTx> txnBackwardsCompatCheck(Map<Long, InProgressTx> inProgress) {
public static Map<Long, InProgressTx> txnBackwardsCompatCheck(int defaultLongTimeout, long longTimeoutTolerance,
Map<Long, InProgressTx> inProgress) {
for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
if (entry.getValue().getExpiration() < 0) {
long writePointer = entry.getKey();
long expiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
long writePointer = entry.getKey();
long expiration = entry.getValue().getExpiration();
// LONG transactions will either have a negative expiration or expiration set to the long timeout
// use a fudge factor on the expiration check, since expiraton is set based on system time, not the write pointer
if (entry.getValue().getType() == null &&
(expiration < 0 ||
(getTxExpirationFromWritePointer(writePointer, defaultLongTimeout) - expiration
< longTimeoutTolerance))) {
// handle null expiration
long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
InProgressTx compatTx =
new InProgressTx(entry.getValue().getVisibilityUpperBound(), expiration, TransactionType.LONG);
new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG);
entry.setValue(compatTx);
} else if (entry.getValue().getType() == null) {
InProgressTx compatTx =
Expand Down Expand Up @@ -696,7 +713,7 @@ private static long getTxExpiration(long timeoutInSeconds) {
return currentTime + TimeUnit.SECONDS.toMillis(timeoutInSeconds);
}

private static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) {
public static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) {
return writePointer / TxConstants.MAX_TX_PER_MS + TimeUnit.SECONDS.toMillis(timeoutInSeconds);
}

Expand Down Expand Up @@ -1252,5 +1269,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hashCode(visibilityUpperBound, expiration, type);
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("visibilityUpperBound", visibilityUpperBound)
.add("expiration", expiration)
.add("type", type)
.toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2015 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.tephra.snapshot;

/**
* Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
* and its elements to {@code byte[]}.
*
* <p>The serialization/deserialization of this codec is the same as that performed by {@link SnapshotCodecV2},
* but a new version number is used to allow easy migration from projects using deprecated codecs with
* conflicting version numbers.</p>
*/
public class SnapshotCodecV3 extends SnapshotCodecV2 {
@Override
public int getVersion() {
return 3;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright © 2015 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.tephra.snapshot;

import co.cask.tephra.ChangeId;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionType;
import co.cask.tephra.TxConstants;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.persist.TransactionStateStorage;
import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionModules;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

/**
* Tests related to {@link SnapshotCodec} implementations.
*/
public class SnapshotCodecTest {
@ClassRule
public static TemporaryFolder tmpDir = new TemporaryFolder();

/**
* In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
* the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
* correctly when the snapshot is loaded.
*/
@Test
public void testDefaultToV3Compatibility() throws Exception {
long now = System.currentTimeMillis();
long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
/*
* Snapshot consisting of transactions at:
*/
long tInvalid = nowWritePointer - 5; // t1 - invalid
long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
long tLong = nowWritePointer - 3; // t3 - in-progress LONG
long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)

TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
tLong, new TransactionManager.InProgressTx(readPtr,
TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
TransactionType.LONG),
tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));

TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
Lists.newArrayList(tInvalid), // invalid
inProgress,
ImmutableMap.<Long, Set<ChangeId>>of(
tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
ImmutableMap.<Long, Set<ChangeId>>of(
tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));

Configuration conf1 = new Configuration();
conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);

ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
provider1.encode(out, snapshot);
} finally {
out.close();
}

TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
// in-progress transactions will have missing types
assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());

// after fixing in-progress, full snapshot should match
Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
assertEquals(snapshot.getInProgress(), fixedInProgress);
assertEquals(snapshot, snapshot2);
}

/**
* Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
*/
@Test
public void testDefaultToV3Migration() throws Exception {
File testDir = tmpDir.newFolder("testDefaultToV3Migration");
Configuration conf = new Configuration();
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());

Injector injector = Guice.createInjector(new ConfigModule(conf),
new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());

TransactionManager txManager = injector.getInstance(TransactionManager.class);
txManager.startAndWait();

txManager.startLong();

// shutdown to force a snapshot
txManager.stopAndWait();

TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
txStorage.startAndWait();

// confirm that the in-progress entry is missing a type
TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
assertNotNull(snapshot);
assertEquals(1, snapshot.getInProgress().size());
Map.Entry<Long, TransactionManager.InProgressTx> entry =
snapshot.getInProgress().entrySet().iterator().next();
assertNull(entry.getValue().getType());


// start a new Tx manager to test fixup
Configuration conf2 = new Configuration();
conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());

TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
txManager2.startAndWait();

// state should be recovered
TransactionSnapshot snapshot2 = txManager2.getCurrentState();
assertEquals(1, snapshot2.getInProgress().size());
Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
snapshot2.getInProgress().entrySet().iterator().next();
assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());

// save a new snapshot
txManager2.stopAndWait();

TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
txStorage2.startAndWait();

TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
// full snapshot should have deserialized correctly without any fixups
assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
assertEquals(snapshot2, snapshot3);
}
}

0 comments on commit 65684c9

Please sign in to comment.