From 7f3257ff675ce05af03e6516856d32e6f54fc48f Mon Sep 17 00:00:00 2001 From: Gary Helmling Date: Tue, 23 Jun 2015 18:08:24 -0700 Subject: [PATCH] TEPHRA-104 Use cell timestamps when generating family delete markers --- .../hbase96/TransactionAwareHTable.java | 4 +- .../coprocessor/TransactionProcessor.java | 3 +- .../coprocessor/TransactionProcessorTest.java | 60 ++++++++++++++++ .../coprocessor/TransactionProcessor.java | 3 +- .../coprocessor/TransactionProcessorTest.java | 60 ++++++++++++++++ .../coprocessor/TransactionProcessor.java | 3 +- .../coprocessor/TransactionProcessorTest.java | 60 ++++++++++++++++ .../coprocessor/TransactionProcessor.java | 3 +- .../coprocessor/TransactionProcessorTest.java | 68 ++++++++++++++++++- 9 files changed, 256 insertions(+), 8 deletions(-) diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java index 84bfac14..2b66b407 100644 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java +++ b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java @@ -558,7 +558,9 @@ private Delete transactionalizeAction(Delete delete) throws IOException { if (conflictLevel == TxConstants.ConflictDetection.ROW || conflictLevel == TxConstants.ConflictDetection.NONE) { // no need to identify individual columns deleted - txDelete.deleteFamily(family); + // Older versions of HBase 0.96 lack HBASE-10964, so family deletes do not correctly + // inherit the common Delete timestamp, so must explicitly set the timestamp here. + txDelete.deleteFamily(family, transactionTimestamp); addToChangeSet(deleteRow, null, null); } else { Result result = get(new Get(delete.getRow()).addFamily(family)); diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java index f3b3990b..bbd2b8a9 100644 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java @@ -176,7 +176,8 @@ public void preDelete(ObserverContext e, Delete de for (byte[] family : delete.getFamilyCellMap().keySet()) { List familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { - deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); + deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), + HConstants.EMPTY_BYTE_ARRAY); } else { int cellSize = familyCells.size(); for (int i = 0; i < cellSize; i++) { diff --git a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java index 54b42802..1c4b678f 100644 --- a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java @@ -366,6 +366,66 @@ public void testDeleteMarkerCleanup() throws Exception { } } + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. + */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.add(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.deleteFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], cell.getQualifier()); + assertArrayEquals(new byte[0], cell.getValue()); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, cell.getValue()); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(new TransactionVisibilityFilter( + TxUtils.createDummyTransaction(txSnapshot), new TreeMap(), false, ScanType.USER_SCAN)); + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cfd = new HColumnDescriptor(family); diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java index 8478141f..9c97ab42 100644 --- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessor.java @@ -176,7 +176,8 @@ public void preDelete(ObserverContext e, Delete de for (byte[] family : delete.getFamilyCellMap().keySet()) { List familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { - deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); + deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), + HConstants.EMPTY_BYTE_ARRAY); } else { int cellSize = familyCells.size(); for (int i = 0; i < cellSize; i++) { diff --git a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java index a8656628..7283630a 100644 --- a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java @@ -372,6 +372,66 @@ public void testDeleteMarkerCleanup() throws Exception { } } + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. + */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.add(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.deleteFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], cell.getQualifier()); + assertArrayEquals(new byte[0], cell.getValue()); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, cell.getValue()); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(new TransactionVisibilityFilter( + TxUtils.createDummyTransaction(txSnapshot), new TreeMap(), false, ScanType.USER_SCAN)); + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cfd = new HColumnDescriptor(family); diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessor.java index cda6988a..d79d055f 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessor.java @@ -176,7 +176,8 @@ public void preDelete(ObserverContext e, Delete de for (byte[] family : delete.getFamilyCellMap().keySet()) { List familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { - deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); + deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), + HConstants.EMPTY_BYTE_ARRAY); } else { int cellSize = familyCells.size(); for (int i = 0; i < cellSize; i++) { diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java index ba42df2a..68b444d6 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/co/cask/tephra/hbase10cdh/coprocessor/TransactionProcessorTest.java @@ -351,6 +351,66 @@ public void testDeleteMarkerCleanup() throws Exception { } } + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. + */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.add(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.deleteFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], cell.getQualifier()); + assertArrayEquals(new byte[0], cell.getValue()); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, cell.getValue()); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(new TransactionVisibilityFilter( + TxUtils.createDummyTransaction(txSnapshot), new TreeMap(), false, ScanType.USER_SCAN)); + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cfd = new HColumnDescriptor(family); diff --git a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessor.java index ccfb7ba5..ad27f219 100644 --- a/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessor.java @@ -176,7 +176,8 @@ public void preDelete(ObserverContext e, Delete de for (byte[] family : delete.getFamilyCellMap().keySet()) { List familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { - deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); + deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), + HConstants.EMPTY_BYTE_ARRAY); } else { int cellSize = familyCells.size(); for (int i = 0; i < cellSize; i++) { diff --git a/tephra-hbase-compat-1.0/src/test/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessorTest.java index 34a05b68..f7926010 100644 --- a/tephra-hbase-compat-1.0/src/test/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/co/cask/tephra/hbase10/coprocessor/TransactionProcessorTest.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; @@ -170,7 +172,7 @@ public void testDataJanitorRegionScanner() throws Exception { // first returned value should be "4" with version "4" results.clear(); assertTrue(regionScanner.next(results)); - assertKeyValueMatches(results, 4, new long[] {V[4]}); + assertKeyValueMatches(results, 4, new long[]{V[4]}); results.clear(); assertTrue(regionScanner.next(results)); @@ -178,11 +180,11 @@ public void testDataJanitorRegionScanner() throws Exception { results.clear(); assertTrue(regionScanner.next(results)); - assertKeyValueMatches(results, 6, new long[] {V[6], V[4]}); + assertKeyValueMatches(results, 6, new long[]{V[6], V[4]}); results.clear(); assertTrue(regionScanner.next(results)); - assertKeyValueMatches(results, 7, new long[] {V[6], V[4]}); + assertKeyValueMatches(results, 7, new long[]{V[6], V[4]}); results.clear(); assertFalse(regionScanner.next(results)); @@ -352,6 +354,66 @@ public void testDeleteMarkerCleanup() throws Exception { } } + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. + */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.add(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.deleteFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], cell.getQualifier()); + assertArrayEquals(new byte[0], cell.getValue()); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, cell.getValue()); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(new TransactionVisibilityFilter( + TxUtils.createDummyTransaction(txSnapshot), new TreeMap(), false, ScanType.USER_SCAN)); + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cfd = new HColumnDescriptor(family);