Skip to content

Commit

Permalink
Add GSI/LSI pushdown support for Hive
Browse files Browse the repository at this point in the history
* Check LSIs and GSIs for optimal pushdown key conditions when the query predicate does not contain both the hash key and the range key of the DynamoDB table
* Pull analyzer initialization outside methods
* Update pushdown eligibleHiveType to include binary type since it is also allowed for primary key attributes according to [Amazon DynamoDB doc](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.CoreComponents.html#HowItWorks.CoreComponents.PrimaryKey)
* Add unit tests for DynamoDBFilterPushdown.java
  • Loading branch information
luyuanhao authored and JunyangLi committed Sep 17, 2020
1 parent 8d96fcd commit be5320d
Show file tree
Hide file tree
Showing 7 changed files with 757 additions and 26 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ TBLPROPERTIES (

`dynamodb.type.mapping` and `dynamodb.null.serialization` are optional parameters.

A Hive query will automatically choose the most suitable secondary index, if one exists, based on
the search conditions. For an index to be eligible, it must have the following properties:
1. All of its index keys appear in the Hive query's search conditions;
2. It contains all the DynamoDB attributes mentioned in `dynamodb.column.mapping`. (If you have to
map more columns than the index attributes in your Hive table but still want to use an index when
running queries that only select the attributes within that index, consider creating another
Hive table and narrowing down its mappings to only the index attributes. Use that table for
reading the index attributes to reduce table scans.)

## Example: Input/Output Formats with Spark
Using the DynamoDBInputFormat and DynamoDBOutputFormat classes with `spark-shell`:
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.RetryResult;
import org.apache.hadoop.dynamodb.filter.DynamoDBIndexInfo;
import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
Expand Down Expand Up @@ -179,6 +180,12 @@ public RetryResult<QueryResult> queryTable(
.withLimit(Ints.checkedCast(limit))
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

DynamoDBIndexInfo index = dynamoDBQueryFilter.getIndex();
if (index != null) {
log.debug("Using DynamoDB index: " + index.getIndexName());
queryRequest.setIndexName(index.getIndexName());
}

RetryResult<QueryResult> retryResult = getRetryDriver().runWithRetry(
new Callable<QueryResult>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.apache.hadoop.dynamodb.filter;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.Projection;
import java.util.List;

/**
 * Describes a DynamoDB secondary index (GSI or LSI): its name, its key schema,
 * and the projection that determines which table attributes the index contains.
 * Mutable holder used when selecting an index for query pushdown.
 */
public class DynamoDBIndexInfo {

  private String indexName;
  private List<KeySchemaElement> indexSchema;
  private Projection indexProjection;

  /**
   * @param indexName       name of the secondary index
   * @param indexSchema     key schema elements (hash key, and optionally range key)
   * @param indexProjection attributes projected into the index
   */
  public DynamoDBIndexInfo(String indexName,
      List<KeySchemaElement> indexSchema,
      Projection indexProjection) {
    this.indexName = indexName;
    this.indexSchema = indexSchema;
    this.indexProjection = indexProjection;
  }

  public String getIndexName() {
    return indexName;
  }

  public List<KeySchemaElement> getIndexSchema() {
    return indexSchema;
  }

  public Projection getIndexProjection() {
    return indexProjection;
  }

  public void setIndexName(String indexName) {
    this.indexName = indexName;
  }

  public void setIndexSchema(List<KeySchemaElement> indexSchema) {
    this.indexSchema = indexSchema;
  }

  public void setIndexProjection(Projection indexProjection) {
    this.indexProjection = indexProjection;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public class DynamoDBQueryFilter {
private final Map<String, Condition> keyConditions = new HashMap<>();
private final Map<String, Condition> scanFilter = new HashMap<>();

// Secondary index (GSI/LSI) chosen for this query, or null when the query
// runs directly against the base table's key schema.
private DynamoDBIndexInfo index;

/** Returns the secondary index selected for this query, or null if none was selected. */
public DynamoDBIndexInfo getIndex() {
return index;
}

/** Records the secondary index that the query should be issued against. */
public void setIndex(DynamoDBIndexInfo index) {
this.index = index;
}

public Map<String, Condition> getKeyConditions() {
return keyConditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@

package org.apache.hadoop.hive.dynamodb.filter;

import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription;
import com.amazonaws.services.dynamodbv2.model.Projection;
import com.amazonaws.services.dynamodbv2.model.ProjectionType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dynamodb.filter.DynamoDBFilter;
import org.apache.hadoop.dynamodb.filter.DynamoDBFilterOperator;
import org.apache.hadoop.dynamodb.filter.DynamoDBIndexInfo;
import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
Expand All @@ -39,13 +45,19 @@ public class DynamoDBFilterPushdown {

private final Set<String> eligibleHiveTypes = new HashSet<>();
private final Set<DynamoDBFilterOperator> eligibleOperatorsForRange = new HashSet<>();
private final IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();

private static final int HASH_KEY_INDEX = 0;
private static final int RANGE_KEY_INDEX = 1;
private static final String DYNAMODB_KEY_TYPE_HASH = "HASH";

public DynamoDBFilterPushdown() {
eligibleHiveTypes.add(serdeConstants.DOUBLE_TYPE_NAME);
eligibleHiveTypes.add(serdeConstants.BIGINT_TYPE_NAME);
eligibleHiveTypes.add(serdeConstants.STRING_TYPE_NAME);
eligibleHiveTypes.add(serdeConstants.BINARY_TYPE_NAME);

// Not all scan operators are supported by DynanoDB Query API
// Not all scan operators are supported by DynamoDB Query API
eligibleOperatorsForRange.add(DynamoDBFilterOperator.EQ);
eligibleOperatorsForRange.add(DynamoDBFilterOperator.LE);
eligibleOperatorsForRange.add(DynamoDBFilterOperator.LT);
Expand All @@ -65,7 +77,6 @@ public DecomposedPredicate pushPredicate(Map<String, String> hiveTypeMapping, Ex
} else {
List<IndexSearchCondition> finalSearchCondition =
prioritizeSearchConditions(searchConditions);
IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
decomposedPredicate.pushedPredicate =
analyzer.translateSearchConditions(finalSearchCondition);
Expand All @@ -74,8 +85,12 @@ public DecomposedPredicate pushPredicate(Map<String, String> hiveTypeMapping, Ex
}
}

public DynamoDBQueryFilter predicateToDynamoDBFilter(List<KeySchemaElement> schema, Map<String,
String> hiveDynamoDBMapping, Map<String, String> hiveTypeMapping, ExprNodeDesc predicate) {
public DynamoDBQueryFilter predicateToDynamoDBFilter(List<KeySchemaElement> schema,
List<LocalSecondaryIndexDescription> localSecondaryIndexes,
List<GlobalSecondaryIndexDescription> globalSecondaryIndexes,
Map<String, String> hiveDynamoDBMapping,
Map<String, String> hiveTypeMapping,
ExprNodeDesc predicate) {
List<IndexSearchCondition> searchConditions = getGenericSearchConditions(hiveTypeMapping,
predicate);

Expand Down Expand Up @@ -111,7 +126,11 @@ public DynamoDBQueryFilter predicateToDynamoDBFilter(List<KeySchemaElement> sche
}
}

return getDynamoDBQueryFilter(schema, filterMap);
return getDynamoDBQueryFilter(schema,
localSecondaryIndexes,
globalSecondaryIndexes,
hiveDynamoDBMapping,
filterMap);
}

private boolean isBetweenFilter(DynamoDBFilterOperator op1, DynamoDBFilterOperator op2) {
Expand Down Expand Up @@ -139,7 +158,7 @@ private DynamoDBFilter getBetweenFilter(DynamoDBFilter filter1, DynamoDBFilter f
private List<IndexSearchCondition> getGenericSearchConditions(Map<String, String> hiveTypeMapping,
ExprNodeDesc predicate) {

IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
analyzer.clearAllowedColumnNames();

// DynamoDB does not support filters on columns of types set
for (Entry<String, String> entry : hiveTypeMapping.entrySet()) {
Expand Down Expand Up @@ -204,40 +223,166 @@ private List<IndexSearchCondition> prioritizeSearchConditions(List<IndexSearchCo
}

/*
* This method sets the query filter / scan filter parameters appropriately
* This method sets the query filter / scan filter parameters appropriately.
* It first checks the table key schema to see whether any query condition matches the table's
* hash and range keys. If not all of the table keys are found in the query conditions, it checks
* the following cases:
*
* 1. If no table key matches the query conditions, iterate through the GSIs
* (Global Secondary Indexes) to see whether both the hash key and the range key of a GSI appear
* in the query conditions. NOTE: if only the GSI hash key is found but not its range key (for a
* GSI with 2 keys), that GSI still cannot be used, because items might be missed in the query
* result. e.g. the item {'hk': 'hash key value', 'rk': 'range key value', 'column1': 'value'}
* will not exist in a GSI with 'column1' as hash key and 'column2' as range key: the item is
* never written to the GSI because its index range key 'column2' is missing, so a query with
* "where column1 = 'value'" against that GSI would not return the item.
*
* 2. If only the table hash key is found, iterate through the LSIs (Local Secondary Indexes) to
* see whether any LSI has an index range key that matches a query condition.
*/
private DynamoDBQueryFilter getDynamoDBQueryFilter(List<KeySchemaElement> schema, Map<String,
DynamoDBFilter> filterMap) {
private DynamoDBQueryFilter getDynamoDBQueryFilter(List<KeySchemaElement> schema,
    List<LocalSecondaryIndexDescription> localSecondaryIndexes,
    List<GlobalSecondaryIndexDescription> globalSecondaryIndexes,
    Map<String, String> hiveDynamoDBMapping,
    Map<String, DynamoDBFilter> filterMap) {

  DynamoDBQueryFilter queryFilter = new DynamoDBQueryFilter();

  // Key filters that will be issued as DynamoDB key conditions; getIndexUseForQuery
  // mutates this list in place when a better index-based combination is found.
  List<DynamoDBFilter> keyFilters = getDynamoDBFiltersFromSchema(schema, filterMap);
  DynamoDBIndexInfo chosenIndex = null;

  if (keyFilters.size() < schema.size()) {
    if (keyFilters.size() == 0 && globalSecondaryIndexes != null) {
      // No table hash key condition: a GSI is the only way to query instead of scan.
      List<DynamoDBIndexInfo> gsiCandidates = globalSecondaryIndexes.stream()
          .map(gsi -> new DynamoDBIndexInfo(gsi.getIndexName(),
              gsi.getKeySchema(), gsi.getProjection()))
          .collect(Collectors.toList());
      chosenIndex = getIndexUseForQuery(schema, gsiCandidates, hiveDynamoDBMapping,
          filterMap, keyFilters);

      // A partially matched GSI is unusable: items missing the index range key are
      // never written to the GSI, so querying it could silently drop results.
      // Refer to the method comment for the detailed explanation.
      if (chosenIndex != null
          && chosenIndex.getIndexSchema().size() > keyFilters.size()) {
        chosenIndex = null;
        keyFilters.clear();
      }
    } else if (keyFilters.size() == 1 && localSecondaryIndexes != null) {
      // Table hash key matched but range key did not: an LSI range key may narrow it.
      List<DynamoDBIndexInfo> lsiCandidates = localSecondaryIndexes.stream()
          .map(lsi -> new DynamoDBIndexInfo(lsi.getIndexName(),
              lsi.getKeySchema(), lsi.getProjection()))
          .collect(Collectors.toList());
      chosenIndex = getIndexUseForQuery(schema, lsiCandidates, hiveDynamoDBMapping,
          filterMap, keyFilters);
    }
  }

  if (chosenIndex != null) {
    log.info("Setting index name used for query: " + chosenIndex.getIndexName());
    queryFilter.setIndex(chosenIndex);
  }

  // Key conditions go to the Query API; everything else becomes a filter applied
  // after the key lookup.
  for (DynamoDBFilter keyFilter : keyFilters) {
    queryFilter.addKeyCondition(keyFilter);
  }
  for (DynamoDBFilter candidate : filterMap.values()) {
    if (!queryFilter.getKeyConditions().containsKey(candidate.getColumnName())) {
      queryFilter.addScanFilter(candidate);
    }
  }
  return queryFilter;
}

/*
* This method has side effect to update keyFiltersUseForQuery to be the optimal combination
* seen so far.
* The return value is the most efficient index for querying the matched items.
*/
/*
 * Side effect: keyFiltersUseForQuery is updated in place to the best key-filter
 * combination seen so far. Returns the most efficient index for the query, or
 * null when no candidate improves on the current combination.
 */
private DynamoDBIndexInfo getIndexUseForQuery(List<KeySchemaElement> tableSchema,
    List<DynamoDBIndexInfo> indexes,
    Map<String, String> hiveDynamoDBMapping,
    Map<String, DynamoDBFilter> filterMap,
    List<DynamoDBFilter> keyFiltersUseForQuery) {
  DynamoDBIndexInfo bestIndex = null;
  for (DynamoDBIndexInfo candidate : indexes) {
    List<DynamoDBFilter> candidateFilters =
        getDynamoDBFiltersFromSchema(candidate.getIndexSchema(), filterMap);
    // Skip candidates that match no more keys, or that don't project every mapped attribute.
    if (candidateFilters.size() <= keyFiltersUseForQuery.size()
        || !indexIncludesAllMappedAttributes(tableSchema, candidate, hiveDynamoDBMapping)) {
      continue;
    }
    keyFiltersUseForQuery.clear();
    keyFiltersUseForQuery.addAll(candidateFilters);
    bestIndex = candidate;
    if (keyFiltersUseForQuery.size() == 2) {
      // Both hash and range key matched; no candidate can do better.
      break;
    }
  }
  return bestIndex;
}

/*
 * Returns true when every DynamoDB attribute mapped in the Hive table is available
 * from this index: trivially true for ALL projections; otherwise the available set
 * is the table keys, the index keys, and (for INCLUDE) the projected non-key attributes.
 */
private boolean indexIncludesAllMappedAttributes(List<KeySchemaElement> tableSchema,
    DynamoDBIndexInfo index, Map<String, String> hiveDynamoDBMapping) {
  Projection projection = index.getIndexProjection();
  if (ProjectionType.ALL.toString().equals(projection.getProjectionType())) {
    return true;
  }

  // Table keys and index keys are always projected into a secondary index.
  Set<String> availableAttributes = new HashSet<>();
  for (KeySchemaElement element : tableSchema) {
    availableAttributes.add(element.getAttributeName());
  }
  for (KeySchemaElement element : index.getIndexSchema()) {
    availableAttributes.add(element.getAttributeName());
  }
  if (ProjectionType.INCLUDE.toString().equals(projection.getProjectionType())) {
    availableAttributes.addAll(projection.getNonKeyAttributes());
  }

  log.info("Checking if all mapped attributes " + hiveDynamoDBMapping.values()
      + " are included in the index " + index.getIndexName()
      + " having attributes " + availableAttributes);
  for (String queriedAttribute : hiveDynamoDBMapping.values()) {
    if (!availableAttributes.contains(queriedAttribute)) {
      log.info("Not all mapped attributes are included in the index. Won't use index: "
          + index.getIndexName());
      return false;
    }
  }
  return true;
}

private List<DynamoDBFilter> getDynamoDBFiltersFromSchema(List<KeySchemaElement> schema,
Map<String, DynamoDBFilter> filterMap) {
List<DynamoDBFilter> dynamoDBFilters = new ArrayList<>();

boolean hashKeyFilterExists = false;
if (schema.size() > 0 && "HASH".equals(schema.get(0).getKeyType())) {
String hashKeyName = schema.get(0).getAttributeName();
if (schema.size() > 0
&& DYNAMODB_KEY_TYPE_HASH.equals(schema.get(HASH_KEY_INDEX).getKeyType())) {
String hashKeyName = schema.get(HASH_KEY_INDEX).getAttributeName();
if (filterMap.containsKey(hashKeyName)) {
DynamoDBFilter hashKeyFilter = filterMap.get(hashKeyName);
if (DynamoDBFilterOperator.EQ.equals(hashKeyFilter.getOperator())) {
filter.addKeyCondition(hashKeyFilter);
dynamoDBFilters.add(hashKeyFilter);
hashKeyFilterExists = true;
}
}
}

if (hashKeyFilterExists && schema.size() > 1) {
String rangeKeyName = schema.get(1).getAttributeName();
String rangeKeyName = schema.get(RANGE_KEY_INDEX).getAttributeName();
if (filterMap.containsKey(rangeKeyName)) {
DynamoDBFilter rangeKeyFilter = filterMap.get(rangeKeyName);
if (eligibleOperatorsForRange.contains(rangeKeyFilter.getOperator())) {
filter.addKeyCondition(rangeKeyFilter);
dynamoDBFilters.add(rangeKeyFilter);
}
}
}

for (DynamoDBFilter f : filterMap.values()) {
if (!filter.getKeyConditions().containsKey(f.getColumnName())) {
filter.addScanFilter(f);
}
}

return filter;
return dynamoDBFilters;
}
}
Loading

0 comments on commit be5320d

Please sign in to comment.