
Merge pull request #110 from caskdata/feature/co-processor-filter
TEPHRA-162 Ensure TransactionVisibilityFilter cooperates with other filters
poornachandra committed Jan 8, 2016
2 parents 0eaa9e0 + 50b8936 commit 7ba342a
Showing 20 changed files with 993 additions and 305 deletions.
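In short, the coprocessor's preGetOp, preScannerOpen, and createStoreScanner hooks (first file below) no longer combine filters with Filters.combine; they now pass any filter already set on the Get or Scan into getTransactionFilter, and TransactionVisibilityFilter wraps that filter directly, delegating reset, filterRowKey, filterRow, hints, and isFamilyEssential to it and returning the more restrictive of the two per-cell decisions.

A minimal client-side sketch, not part of this commit, assuming the HBase filter classes imported in the new test below and a TransactionAwareHTable instance named transactionAwareHTable:

    // A user-supplied filter on a transactional scan is chained inside the
    // TransactionVisibilityFilter applied by the region coprocessor.
    Scan scan = new Scan();
    scan.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS,
        new BinaryComparator(Longs.toByteArray(10))));
    try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
      for (Result result : scanner) {
        // each returned cell is visible to the current transaction AND has a value < 10
      }
    }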


@@ -21,14 +21,11 @@
import co.cask.tephra.TxConstants;
import co.cask.tephra.coprocessor.TransactionStateCache;
import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
import co.cask.tephra.hbase96.Filters;
import co.cask.tephra.persist.TransactionVisibilityState;
import co.cask.tephra.util.TxUtils;

import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
@@ -160,7 +157,7 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, L
get.setMaxVersions();
get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = Filters.combine(getTransactionFilter(tx, ScanType.USER_SCAN), get.getFilter());
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter());
get.setFilter(newFilter);
}
}
@@ -216,7 +213,7 @@ public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment
scan.setMaxVersions();
scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData),
TxUtils.getMaxVisibleTimestamp(tx));
Filter newFilter = Filters.combine(getTransactionFilter(tx, ScanType.USER_SCAN), scan.getFilter());
Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter());
scan.setFilter(newFilter);
}
return s;
@@ -294,7 +291,7 @@ protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, S
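// Flush and compaction scans built here have no client-supplied filter,
// so null is passed as the sub-filter.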
scan.setFilter(
new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(),
snapshot.getInvalid(),
getTransactionFilter(dummyTx, type)));
getTransactionFilter(dummyTx, type, null)));

return new StoreScanner(store, store.getScanInfo(), scan, scanners,
type, store.getSmallestReadPoint(), earliestPutTs);
@@ -313,10 +310,10 @@ private Transaction getFromOperation(OperationWithAttributes op) throws IOExcept
* transaction.
*
* @param tx the current transaction to apply
* @param scanType the type of scan operation being performed
* @param type the type of scan operation being performed
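* @param filter an additional filter to combine with the transaction visibility filter, or {@code null} if none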
*/
protected Filter getTransactionFilter(Transaction tx, ScanType scanType) {
return new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType);
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
return new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
}

/**
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

@@ -115,14 +116,18 @@ public ReturnCode filterKeyValue(Cell cell) throws IOException {
} else if (tx.isVisible(kvTimestamp)) {
// Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL
if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) {
return ReturnCode.INCLUDE;
// cell is visible
// visibility SNAPSHOT_ALL needs all matches
return runSubFilter(ReturnCode.INCLUDE, cell);
}
if (DeleteTracker.isFamilyDelete(cell)) {
deleteTracker.addFamilyDelete(cell);
if (clearDeletes) {
return ReturnCode.NEXT_COL;
} else {
return ReturnCode.INCLUDE_AND_NEXT_COL;
// cell is visible
// as soon as we find a KV to include we can move to the next column
return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
}
}
// check if masked by family delete
@@ -136,21 +141,43 @@ public ReturnCode filterKeyValue(Cell cell) throws IOException {
return ReturnCode.NEXT_COL;
} else {
// keep the marker but skip any remaining versions
return ReturnCode.INCLUDE_AND_NEXT_COL;
return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
}
}
// cell is visible
if (cellFilter != null) {
return cellFilter.filterKeyValue(cell);
} else {
// as soon as we find a KV to include we can move to the next column
return ReturnCode.INCLUDE_AND_NEXT_COL;
}
// as soon as we find a KV to include we can move to the next column
return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell);
} else {
return ReturnCode.SKIP;
}
}

private ReturnCode runSubFilter(ReturnCode includeCode, Cell cell) throws IOException {
if (cellFilter != null) {
ReturnCode filterCode = cellFilter.filterKeyValue(cell);
// Return the more restrictive of the two filter responses
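// e.g. if this filter would return INCLUDE_AND_NEXT_COL but the sub-filter returns SKIP, the result is NEXT_COL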
switch (filterCode) {
case INCLUDE:
return includeCode;
case INCLUDE_AND_NEXT_COL:
return ReturnCode.INCLUDE_AND_NEXT_COL;
case SKIP:
return includeCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL;
default:
return filterCode;
}
}
return includeCode;
}

@Override
public boolean filterRow() throws IOException {
if (cellFilter != null) {
return cellFilter.filterRow();
}
return super.filterRow();
}

@Override
public Cell transformCell(Cell cell) throws IOException {
// Convert Tephra deletes back into HBase deletes
Expand All @@ -172,8 +199,69 @@ public Cell transformCell(Cell cell) throws IOException {
}

@Override
public void reset() {
public void reset() throws IOException {
deleteTracker.reset();
if (cellFilter != null) {
cellFilter.reset();
}
}

@Override
public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
if (cellFilter != null) {
return cellFilter.filterRowKey(buffer, offset, length);
}
return super.filterRowKey(buffer, offset, length);
}

@Override
public boolean filterAllRemaining() throws IOException {
if (cellFilter != null) {
return cellFilter.filterAllRemaining();
}
return super.filterAllRemaining();
}

@Override
public void filterRowCells(List<Cell> kvs) throws IOException {
if (cellFilter != null) {
cellFilter.filterRowCells(kvs);
} else {
super.filterRowCells(kvs);
}
}

@Override
public boolean hasFilterRow() {
if (cellFilter != null) {
return cellFilter.hasFilterRow();
}
return super.hasFilterRow();
}

@SuppressWarnings("deprecation")
@Override
public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
if (cellFilter != null) {
return cellFilter.getNextKeyHint(currentKV);
}
return super.getNextKeyHint(currentKV);
}

@Override
public Cell getNextCellHint(Cell currentKV) throws IOException {
if (cellFilter != null) {
return cellFilter.getNextCellHint(currentKV);
}
return super.getNextCellHint(currentKV);
}

@Override
public boolean isFamilyEssential(byte[] name) throws IOException {
if (cellFilter != null) {
return cellFilter.isFamilyEssential(name);
}
return super.isFamilyEssential(name);
}

@Override
@@ -28,6 +28,7 @@
import co.cask.tephra.persist.TransactionStateStorage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -52,6 +53,9 @@
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
@@ -1457,4 +1461,93 @@ public void testVisibilityAll() throws Exception {
verifyScan(txTable, scan, expected);
txContext.finish();
}

@Test
public void testFilters() throws Exception {
// Add some values to table
transactionContext.start();
Put put = new Put(TestBytes.row);
byte[] val1 = Bytes.toBytes(1L);
put.add(TestBytes.family, TestBytes.qualifier, val1);
transactionAwareHTable.put(put);
put = new Put(TestBytes.row2);
byte[] val2 = Bytes.toBytes(2L);
put.add(TestBytes.family, TestBytes.qualifier, val2);
transactionAwareHTable.put(put);
put = new Put(TestBytes.row3);
byte[] val3 = Bytes.toBytes(3L);
put.add(TestBytes.family, TestBytes.qualifier, val3);
transactionAwareHTable.put(put);
put = new Put(TestBytes.row4);
byte[] val4 = Bytes.toBytes(4L);
put.add(TestBytes.family, TestBytes.qualifier, val4);
transactionAwareHTable.put(put);
transactionContext.finish();

// Delete cell with value 2
transactionContext.start();
Delete delete = new Delete(TestBytes.row2);
delete.deleteColumn(TestBytes.family, TestBytes.qualifier);
transactionAwareHTable.delete(delete);
transactionContext.finish();

// Scan for values less than 4, should get only values 1 and 3
transactionContext.start();
Scan scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS,
new BinaryComparator(Longs.toByteArray(4))));
try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
Result result = scanner.next();
assertNotNull(result);
assertArrayEquals(TestBytes.row, result.getRow());
assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
result = scanner.next();
assertNotNull(result);
assertArrayEquals(TestBytes.row3, result.getRow());
assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
result = scanner.next();
assertNull(result);
}
transactionContext.finish();

// Run a Get with a filter for less than 10 on row4, should get value 4
transactionContext.start();
Get get = new Get(TestBytes.row4);
get.setFilter(new ValueFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Longs.toByteArray(10))));
Result result = transactionAwareHTable.get(get);
assertFalse(result.isEmpty());
assertArrayEquals(val4, result.getValue(TestBytes.family, TestBytes.qualifier));
transactionContext.finish();

// Change value of row4 to 40
transactionContext.start();
put = new Put(TestBytes.row4);
byte[] val40 = Bytes.toBytes(40L);
put.add(TestBytes.family, TestBytes.qualifier, val40);
transactionAwareHTable.put(put);
transactionContext.finish();

// Scan for values less than 10, should get only values 1 and 3
transactionContext.start();
scan = new Scan(TestBytes.row, new ValueFilter(CompareFilter.CompareOp.LESS,
new BinaryComparator(Longs.toByteArray(10))));
try (ResultScanner scanner = transactionAwareHTable.getScanner(scan)) {
result = scanner.next();
assertNotNull(result);
assertArrayEquals(TestBytes.row, result.getRow());
assertArrayEquals(val1, result.getValue(TestBytes.family, TestBytes.qualifier));
result = scanner.next();
assertNotNull(result);
assertArrayEquals(TestBytes.row3, result.getRow());
assertArrayEquals(val3, result.getValue(TestBytes.family, TestBytes.qualifier));
result = scanner.next();
assertNull(result);
}
transactionContext.finish();

// Run the Get again with a filter for less than 10 on row4, this time should not get any results
transactionContext.start();
result = transactionAwareHTable.get(get);
assertTrue(result.isEmpty());
transactionContext.finish();
}
}

