diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
index 591bdde7..19ffc6aa 100644
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
@@ -156,6 +156,9 @@ public class TransactionManager extends AbstractService {
   private final Lock logReadLock = logLock.readLock();
   private final Lock logWriteLock = logLock.writeLock();
 
+  // fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration
+  // TODO: REMOVE WITH txnBackwardsCompatCheck()
+  private final long longTimeoutTolerance;
 
   public TransactionManager(Configuration config) {
     this(config, new NoOpTransactionStateStorage(new SnapshotCodecProvider(config)), new TxMetricsCollector());
@@ -176,6 +179,12 @@ public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage p
     // must always keep at least 1 snapshot
     snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN,
                                                TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1);
+
+    // intentionally not using a constant, as this config should not be exposed
+    // TODO: REMOVE WITH txnBackwardsCompatCheck()
+    longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);
+
+    //
     this.txMetricsCollector = txMetricsCollector;
     clear();
   }
@@ -480,7 +489,7 @@ private void restoreSnapshot(TransactionSnapshot snapshot) {
     readPointer = snapshot.getReadPointer();
     lastWritePointer = snapshot.getWritePointer();
     invalid.addAll(snapshot.getInvalid());
-    inProgress.putAll(txnBackwardsCompatCheck(snapshot.getInProgress()));
+    inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
     committingChangeSets.putAll(snapshot.getCommittingChangeSets());
     committedChangeSets.putAll(snapshot.getCommittedChangeSets());
   }
@@ -490,13 +499,21 @@ private void
restoreSnapshot(TransactionSnapshot snapshot) {
    * This is required for backwards compatibility, when long running transactions were represented
    * with expiration time -1. This can be removed when we stop supporting SnapshotCodec version 1.
    */
-  private Map<Long, InProgressTx> txnBackwardsCompatCheck(Map<Long, InProgressTx> inProgress) {
+  public static Map<Long, InProgressTx> txnBackwardsCompatCheck(int defaultLongTimeout, long longTimeoutTolerance,
+                                                                 Map<Long, InProgressTx> inProgress) {
     for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
-      if (entry.getValue().getExpiration() < 0) {
-        long writePointer = entry.getKey();
-        long expiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
+      long writePointer = entry.getKey();
+      long expiration = entry.getValue().getExpiration();
+      // LONG transactions will either have a negative expiration or expiration set to the long timeout
+      // use a fudge factor on the expiration check, since expiration is set based on system time, not the write pointer
+      if (entry.getValue().getType() == null &&
+          (expiration < 0 ||
+              (getTxExpirationFromWritePointer(writePointer, defaultLongTimeout) - expiration
+                  < longTimeoutTolerance))) {
+        // untyped entry that looks LONG: re-derive its expiration from the write pointer and tag it LONG
+        long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout);
         InProgressTx compatTx =
-          new InProgressTx(entry.getValue().getVisibilityUpperBound(), expiration, TransactionType.LONG);
+          new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG);
         entry.setValue(compatTx);
       } else if (entry.getValue().getType() == null) {
         InProgressTx compatTx =
@@ -696,7 +713,7 @@ private static long getTxExpiration(long timeoutInSeconds) {
     return currentTime + TimeUnit.SECONDS.toMillis(timeoutInSeconds);
   }
 
-  private static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) {
+  public static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) {
     return writePointer / TxConstants.MAX_TX_PER_MS + TimeUnit.SECONDS.toMillis(timeoutInSeconds);
   }
 
@@ -1252,5 +1269,14 @@ public boolean equals(Object o) {
     public int hashCode() {
       return Objects.hashCode(visibilityUpperBound, expiration, type);
     }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("visibilityUpperBound", visibilityUpperBound)
+          .add("expiration", expiration)
+          .add("type", type)
+          .toString();
+    }
   }
 }
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
new file mode 100644
index 00000000..eccef403
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2015 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.snapshot;
+
+/**
+ * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
+ * and its elements to {@code byte[]}.
+ *
+ * <p>The serialization/deserialization of this codec is the same as that performed by {@link SnapshotCodecV2},
+ * but a new version number is used to allow easy migration from projects using deprecated codecs with
+ * conflicting version numbers.</p>
+ */
+public class SnapshotCodecV3 extends SnapshotCodecV2 {
+  @Override
+  public int getVersion() {
+    return 3;
+  }
+}
diff --git a/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
new file mode 100644
index 00000000..49304799
--- /dev/null
+++ b/tephra-core/src/test/java/co/cask/tephra/snapshot/SnapshotCodecTest.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright © 2015 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.tephra.snapshot;
+
+import co.cask.tephra.ChangeId;
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TransactionType;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.persist.TransactionSnapshot;
+import co.cask.tephra.persist.TransactionStateStorage;
+import co.cask.tephra.runtime.ConfigModule;
+import co.cask.tephra.runtime.DiscoveryModules;
+import co.cask.tephra.runtime.TransactionModules;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests related to {@link SnapshotCodec} implementations.
+ */
+public class SnapshotCodecTest {
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  /**
+   * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
+   * the data.  Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
+   * correctly when the snapshot is loaded.
+   */
+  @Test
+  public void testDefaultToV3Compatibility() throws Exception {
+    long now = System.currentTimeMillis();
+    long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
+    /*
+     * Snapshot consisting of transactions at:
+     */
+    long tInvalid = nowWritePointer - 5;    // t1 - invalid
+    long readPtr = nowWritePointer - 4;     // t2 - here and earlier committed
+    long tLong = nowWritePointer - 3;       // t3 - in-progress LONG
+    long tCommitted = nowWritePointer - 2;  // t4 - committed, changeset (r1, r2)
+    long tShort = nowWritePointer - 1;      // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
+
+    TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
+        tLong, new TransactionManager.InProgressTx(readPtr,
+            TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
+            TransactionType.LONG),
+        tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));
+
+    TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
+        Lists.newArrayList(tInvalid), // invalid
+        inProgress,
+        ImmutableMap.<Long, Set<ChangeId>>of(
+            tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
+        ImmutableMap.<Long, Set<ChangeId>>of(
+            tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
+
+    Configuration conf1 = new Configuration();
+    conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+    SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      provider1.encode(out, snapshot);
+    } finally {
+      out.close();
+    }
+
+    TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
+    assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
+    assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
+    assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
+    // in-progress transactions will have missing types
+    assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
+    assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
+    assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
+
+    // after fixing in-progress, full snapshot should match
+    Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
+        TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
+    assertEquals(snapshot.getInProgress(), fixedInProgress);
+    assertEquals(snapshot, snapshot2);
+  }
+
+  /**
+   * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
+   */
+  @Test
+  public void testDefaultToV3Migration() throws Exception {
+    File testDir = tmpDir.newFolder("testDefaultToV3Migration");
+    Configuration conf = new Configuration();
+    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+
+    Injector injector = Guice.createInjector(new ConfigModule(conf),
+        new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+    TransactionManager txManager = injector.getInstance(TransactionManager.class);
+    txManager.startAndWait();
+
+    txManager.startLong();
+
+    // shutdown to force a snapshot
+    txManager.stopAndWait();
+
+    TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
+    txStorage.startAndWait();
+
+    // confirm that the in-progress entry is missing a type
+    TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
+    assertNotNull(snapshot);
+    assertEquals(1, snapshot.getInProgress().size());
+    Map.Entry<Long, TransactionManager.InProgressTx> entry =
+        snapshot.getInProgress().entrySet().iterator().next();
+    assertNull(entry.getValue().getType());
+
+
+    // start a new Tx manager to test fixup
+    Configuration conf2 = new Configuration();
+    conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
+    conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
+        DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
+    Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
+        new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
+
+    TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
+    txManager2.startAndWait();
+
+    // state should be recovered
+    TransactionSnapshot snapshot2 = txManager2.getCurrentState();
+    assertEquals(1, snapshot2.getInProgress().size());
+    Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
+        snapshot2.getInProgress().entrySet().iterator().next();
+    assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());
+
+    // save a new snapshot
+    txManager2.stopAndWait();
+
+    TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
+    txStorage2.startAndWait();
+
+    TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
+    // full snapshot should have deserialized correctly without any fixups
+    assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
+    assertEquals(snapshot2, snapshot3);
+  }
+}