diff --git a/README.md b/README.md index 27e89696..23d93a5e 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,15 @@ TBLPROPERTIES ( `dynamodb.type.mapping` and `dynamodb.null.serialization` are optional parameters. +A Hive query will automatically choose the most suitable secondary index, if there is one, based on the +search condition. For an index to be chosen, it must have the following properties: +1. It has all its index keys in the Hive query search condition; +2. It contains all the DynamoDB attributes mentioned in `dynamodb.column.mapping`. (If you have to +map more columns than index attributes in your Hive table but still want to use an index when +running queries that only select the attributes within that index, consider creating another +Hive table and narrowing down the mappings to only include the index attributes. Use that table for +reading the index attributes to reduce table scans.) + ## Example: Input/Output Formats with Spark Using the DynamoDBInputFormat and DynamoDBOutputFormat classes with `spark-shell`: ``` diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java index 110a2485..69105024 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java @@ -63,6 +63,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.RetryResult; +import org.apache.hadoop.dynamodb.filter.DynamoDBIndexInfo; import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; @@ -179,6 +180,12 @@ public RetryResult queryTable( .withLimit(Ints.checkedCast(limit)) .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); + DynamoDBIndexInfo index = 
dynamoDBQueryFilter.getIndex(); + if (index != null) { + log.debug("Using DynamoDB index: " + index.getIndexName()); + queryRequest.setIndexName(index.getIndexName()); + } + RetryResult retryResult = getRetryDriver().runWithRetry( new Callable() { @Override diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBIndexInfo.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBIndexInfo.java new file mode 100644 index 00000000..b2fa97ec --- /dev/null +++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBIndexInfo.java @@ -0,0 +1,46 @@ +package org.apache.hadoop.dynamodb.filter; + +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.Projection; +import java.util.List; + +public class DynamoDBIndexInfo { + + private String indexName; + + private List indexSchema; + private Projection indexProjection; + + public DynamoDBIndexInfo(String indexName, + List indexSchema, + Projection indexProjection) { + this.indexName = indexName; + this.indexSchema = indexSchema; + this.indexProjection = indexProjection; + } + + public String getIndexName() { + return indexName; + } + + public void setIndexName(String indexName) { + this.indexName = indexName; + } + + public List getIndexSchema() { + return indexSchema; + } + + public void setIndexSchema( + List indexSchema) { + this.indexSchema = indexSchema; + } + + public Projection getIndexProjection() { + return indexProjection; + } + + public void setIndexProjection(Projection indexProjection) { + this.indexProjection = indexProjection; + } +} diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBQueryFilter.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBQueryFilter.java index d14d5f38..246e4fdb 100644 --- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBQueryFilter.java +++ 
b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/filter/DynamoDBQueryFilter.java @@ -22,6 +22,16 @@ public class DynamoDBQueryFilter { private final Map keyConditions = new HashMap<>(); private final Map scanFilter = new HashMap<>(); + private DynamoDBIndexInfo index; + + public DynamoDBIndexInfo getIndex() { + return index; + } + + public void setIndex(DynamoDBIndexInfo index) { + this.index = index; + } + public Map getKeyConditions() { return keyConditions; } diff --git a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdown.java b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdown.java index 58421358..696d9a9a 100644 --- a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdown.java +++ b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdown.java @@ -13,7 +13,11 @@ package org.apache.hadoop.hive.dynamodb.filter; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.Projection; +import com.amazonaws.services.dynamodbv2.model.ProjectionType; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -21,10 +25,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.dynamodb.filter.DynamoDBFilter; import org.apache.hadoop.dynamodb.filter.DynamoDBFilterOperator; +import org.apache.hadoop.dynamodb.filter.DynamoDBIndexInfo; import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import 
org.apache.hadoop.hive.ql.index.IndexSearchCondition; @@ -39,13 +45,19 @@ public class DynamoDBFilterPushdown { private final Set eligibleHiveTypes = new HashSet<>(); private final Set eligibleOperatorsForRange = new HashSet<>(); + private final IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + + private static final int HASH_KEY_INDEX = 0; + private static final int RANGE_KEY_INDEX = 1; + private static final String DYNAMODB_KEY_TYPE_HASH = "HASH"; public DynamoDBFilterPushdown() { eligibleHiveTypes.add(serdeConstants.DOUBLE_TYPE_NAME); eligibleHiveTypes.add(serdeConstants.BIGINT_TYPE_NAME); eligibleHiveTypes.add(serdeConstants.STRING_TYPE_NAME); + eligibleHiveTypes.add(serdeConstants.BINARY_TYPE_NAME); - // Not all scan operators are supported by DynanoDB Query API + // Not all scan operators are supported by DynamoDB Query API eligibleOperatorsForRange.add(DynamoDBFilterOperator.EQ); eligibleOperatorsForRange.add(DynamoDBFilterOperator.LE); eligibleOperatorsForRange.add(DynamoDBFilterOperator.LT); @@ -65,7 +77,6 @@ public DecomposedPredicate pushPredicate(Map hiveTypeMapping, Ex } else { List finalSearchCondition = prioritizeSearchConditions(searchConditions); - IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(finalSearchCondition); @@ -74,8 +85,12 @@ public DecomposedPredicate pushPredicate(Map hiveTypeMapping, Ex } } - public DynamoDBQueryFilter predicateToDynamoDBFilter(List schema, Map hiveDynamoDBMapping, Map hiveTypeMapping, ExprNodeDesc predicate) { + public DynamoDBQueryFilter predicateToDynamoDBFilter(List schema, + List localSecondaryIndexes, + List globalSecondaryIndexes, + Map hiveDynamoDBMapping, + Map hiveTypeMapping, + ExprNodeDesc predicate) { List searchConditions = getGenericSearchConditions(hiveTypeMapping, predicate); @@ -111,7 +126,11 @@ public DynamoDBQueryFilter 
predicateToDynamoDBFilter(List sche } } - return getDynamoDBQueryFilter(schema, filterMap); + return getDynamoDBQueryFilter(schema, + localSecondaryIndexes, + globalSecondaryIndexes, + hiveDynamoDBMapping, + filterMap); } private boolean isBetweenFilter(DynamoDBFilterOperator op1, DynamoDBFilterOperator op2) { @@ -139,7 +158,7 @@ private DynamoDBFilter getBetweenFilter(DynamoDBFilter filter1, DynamoDBFilter f private List getGenericSearchConditions(Map hiveTypeMapping, ExprNodeDesc predicate) { - IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + analyzer.clearAllowedColumnNames(); // DynamoDB does not support filters on columns of types set for (Entry entry : hiveTypeMapping.entrySet()) { @@ -204,40 +223,166 @@ private List prioritizeSearchConditions(List schema, Map filterMap) { + private DynamoDBQueryFilter getDynamoDBQueryFilter(List schema, + List localSecondaryIndexes, + List globalSecondaryIndexes, + Map hiveDynamoDBMapping, + Map filterMap) { + DynamoDBQueryFilter filter = new DynamoDBQueryFilter(); + List keyFiltersUseForQuery = getDynamoDBFiltersFromSchema(schema, filterMap); + DynamoDBIndexInfo indexUseForQuery = null; + + if (keyFiltersUseForQuery.size() < schema.size()) { + if (keyFiltersUseForQuery.size() == 0 && globalSecondaryIndexes != null) { + // Hash key not found. Check GSIs. + indexUseForQuery = getIndexUseForQuery( + schema, + globalSecondaryIndexes.stream() + .map(index -> new DynamoDBIndexInfo(index.getIndexName(), + index.getKeySchema(), index.getProjection())) + .collect(Collectors.toList()), + hiveDynamoDBMapping, + filterMap, + keyFiltersUseForQuery); + + // Don't use GSI when it is not a fully match. 
+ // Refer to method comment for detailed explanation + if (indexUseForQuery != null + && indexUseForQuery.getIndexSchema().size() > keyFiltersUseForQuery.size()) { + indexUseForQuery = null; + keyFiltersUseForQuery.clear(); + } + } else if (keyFiltersUseForQuery.size() == 1 && localSecondaryIndexes != null) { + // Hash key found but Range key not found. Check LSIs. + indexUseForQuery = getIndexUseForQuery( + schema, + localSecondaryIndexes.stream() + .map(index -> new DynamoDBIndexInfo(index.getIndexName(), + index.getKeySchema(), index.getProjection())) + .collect(Collectors.toList()), + hiveDynamoDBMapping, + filterMap, + keyFiltersUseForQuery); + } + } + + if (indexUseForQuery != null) { + log.info("Setting index name used for query: " + indexUseForQuery.getIndexName()); + filter.setIndex(indexUseForQuery); + } + for (DynamoDBFilter f : keyFiltersUseForQuery) { + filter.addKeyCondition(f); + } + for (DynamoDBFilter f : filterMap.values()) { + if (!filter.getKeyConditions().containsKey(f.getColumnName())) { + filter.addScanFilter(f); + } + } + return filter; + } + + /* + * This method has side effect to update keyFiltersUseForQuery to be the optimal combination + * seen so far. + * The return value is the most efficient index for querying the matched items. 
+ */ + private DynamoDBIndexInfo getIndexUseForQuery(List tableSchema, + List indexes, + Map hiveDynamoDBMapping, + Map filterMap, + List keyFiltersUseForQuery) { + DynamoDBIndexInfo indexUseForQuery = null; + for (DynamoDBIndexInfo index : indexes) { + List indexFilterList = + getDynamoDBFiltersFromSchema(index.getIndexSchema(), filterMap); + if (indexFilterList.size() > keyFiltersUseForQuery.size() + && indexIncludesAllMappedAttributes(tableSchema, index, hiveDynamoDBMapping)) { + keyFiltersUseForQuery.clear(); + keyFiltersUseForQuery.addAll(indexFilterList); + indexUseForQuery = index; + if (keyFiltersUseForQuery.size() == 2) { + break; + } + } + } + return indexUseForQuery; + } + + private boolean indexIncludesAllMappedAttributes(List tableSchema, + DynamoDBIndexInfo index, Map hiveDynamoDBMapping) { + Projection indexProjection = index.getIndexProjection(); + if (ProjectionType.ALL.toString().equals(indexProjection.getProjectionType())) { + return true; + } + + Set projectionAttributes = new HashSet<>(); + for (KeySchemaElement keySchemaElement: tableSchema) { + projectionAttributes.add(keySchemaElement.getAttributeName()); + } + for (KeySchemaElement keySchemaElement: index.getIndexSchema()) { + projectionAttributes.add(keySchemaElement.getAttributeName()); + } + if (ProjectionType.INCLUDE.toString().equals(indexProjection.getProjectionType())) { + projectionAttributes.addAll(indexProjection.getNonKeyAttributes()); + } + + log.info("Checking if all mapped attributes " + hiveDynamoDBMapping.values() + + " are included in the index " + index.getIndexName() + + " having attributes " + projectionAttributes); + for (String queriedAttribute : hiveDynamoDBMapping.values()) { + if (!projectionAttributes.contains(queriedAttribute)) { + log.info("Not all mapped attributes are included in the index. 
Won't use index: " + + index.getIndexName()); + return false; + } + } + return true; + } + + private List getDynamoDBFiltersFromSchema(List schema, + Map filterMap) { + List dynamoDBFilters = new ArrayList<>(); + boolean hashKeyFilterExists = false; - if (schema.size() > 0 && "HASH".equals(schema.get(0).getKeyType())) { - String hashKeyName = schema.get(0).getAttributeName(); + if (schema.size() > 0 + && DYNAMODB_KEY_TYPE_HASH.equals(schema.get(HASH_KEY_INDEX).getKeyType())) { + String hashKeyName = schema.get(HASH_KEY_INDEX).getAttributeName(); if (filterMap.containsKey(hashKeyName)) { DynamoDBFilter hashKeyFilter = filterMap.get(hashKeyName); if (DynamoDBFilterOperator.EQ.equals(hashKeyFilter.getOperator())) { - filter.addKeyCondition(hashKeyFilter); + dynamoDBFilters.add(hashKeyFilter); hashKeyFilterExists = true; } } } if (hashKeyFilterExists && schema.size() > 1) { - String rangeKeyName = schema.get(1).getAttributeName(); + String rangeKeyName = schema.get(RANGE_KEY_INDEX).getAttributeName(); if (filterMap.containsKey(rangeKeyName)) { DynamoDBFilter rangeKeyFilter = filterMap.get(rangeKeyName); if (eligibleOperatorsForRange.contains(rangeKeyFilter.getOperator())) { - filter.addKeyCondition(rangeKeyFilter); + dynamoDBFilters.add(rangeKeyFilter); } } } - - for (DynamoDBFilter f : filterMap.values()) { - if (!filter.getKeyConditions().containsKey(f.getColumnName())) { - filter.addScanFilter(f); - } - } - - return filter; + return dynamoDBFilters; } } diff --git a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/read/HiveDynamoDBInputFormat.java b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/read/HiveDynamoDBInputFormat.java index ee379a69..75f30d56 100644 --- a/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/read/HiveDynamoDBInputFormat.java +++ b/emr-dynamodb-hive/src/main/java/org/apache/hadoop/hive/dynamodb/read/HiveDynamoDBInputFormat.java @@ -13,7 +13,7 @@ package org.apache.hadoop.hive.dynamodb.read; -import 
com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.TableDescription; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -149,10 +149,13 @@ private DynamoDBQueryFilter getQueryFilter(JobConf conf, Map ShimsLoader.getHiveShims().deserializeExpression(filterExprSerialized); DynamoDBFilterPushdown pushdown = new DynamoDBFilterPushdown(); - List schema = - client.describeTable(conf.get(DynamoDBConstants.TABLE_NAME)).getKeySchema(); + TableDescription tableDescription = + client.describeTable(conf.get(DynamoDBConstants.TABLE_NAME)); DynamoDBQueryFilter queryFilter = pushdown.predicateToDynamoDBFilter( - schema, hiveDynamoDBMapping, hiveTypeMapping, filterExpr); + tableDescription.getKeySchema(), + tableDescription.getLocalSecondaryIndexes(), + tableDescription.getGlobalSecondaryIndexes(), + hiveDynamoDBMapping, hiveTypeMapping, filterExpr); return queryFilter; } diff --git a/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdownTest.java b/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdownTest.java new file mode 100644 index 00000000..957b15de --- /dev/null +++ b/emr-dynamodb-hive/src/test/java/org/apache/hadoop/hive/dynamodb/filter/DynamoDBFilterPushdownTest.java @@ -0,0 +1,511 @@ +package org.apache.hadoop.hive.dynamodb.filter; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.Condition; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.Projection; +import com.amazonaws.services.dynamodbv2.model.ProjectionType; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import org.apache.hadoop.dynamodb.filter.DynamoDBFilterOperator; +import org.apache.hadoop.dynamodb.filter.DynamoDBQueryFilter; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DynamoDBFilterPushdownTest { + + private DynamoDBFilterPushdown dynamoDBFilterPushdown; + + private static final String HASH_KEY_NAME = "hashKey"; + private static final String HASH_KEY_TYPE = "HASH"; + private static final String HASH_KEY_VALUE = "1"; + private static final String RANGE_KEY_NAME = "rangeKey"; + private static final String RANGE_KEY_TYPE = "RANGE"; + private static final String RANGE_KEY_VALUE = "2"; + private static final String COLUMN1_NAME = "column1"; + private static final String COLUMN1_VALUE = "3"; + private static final String COLUMN2_NAME = "column2"; + private static final String COLUMN2_VALUE = "4"; + private static final String COLUMN3_NAME = "column3"; + private static final String COLUMN4_NAME = "column4"; + private static final String LOCAL_SECONDARY_INDEX_NAME = "LSI"; + private static final String GLOBAL_SECONDARY_INDEX_NAME = "GSI"; + + private final ExprNodeDesc hashKeyPredicate = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, HASH_KEY_NAME, null, false), + new 
ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, HASH_KEY_VALUE) + )); + private final ExprNodeDesc rangeKeyPredicate = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, RANGE_KEY_NAME, null, false), + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, RANGE_KEY_VALUE) + )); + private final ExprNodeDesc column1Predicate = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, COLUMN1_NAME, null, false), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, COLUMN1_VALUE) + )); + private final ExprNodeDesc column2Predicate = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, COLUMN2_NAME, null, false), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, COLUMN2_VALUE) + )); + + private final List tableKeySchema = + createKeySchema(HASH_KEY_NAME, RANGE_KEY_NAME); + private final Map hiveDynamoDBMapping = initHiveDynamoDBMapping(); + private final Map hiveTypeMapping = initHiveTypeMapping(); + + @Before + public void setup() { + dynamoDBFilterPushdown = new DynamoDBFilterPushdown(); + } + + @Test + public void testPushPredicate() { + assertPushablePredicate(serdeConstants.DOUBLE_TYPE_NAME, + new ExprNodeConstantDesc(TypeInfoFactory.doubleTypeInfo, 1.0)); + assertPushablePredicate(serdeConstants.BIGINT_TYPE_NAME, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 1)); + assertPushablePredicate(serdeConstants.STRING_TYPE_NAME, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "1")); + assertPushablePredicate(serdeConstants.BINARY_TYPE_NAME, + new ExprNodeConstantDesc(TypeInfoFactory.binaryTypeInfo, new byte[]{1})); + + assertUnpushablePredicate(serdeConstants.BOOLEAN_TYPE_NAME, + new 
ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, true)); + assertUnpushablePredicate(serdeConstants.LIST_TYPE_NAME, + new ExprNodeConstantDesc(TypeInfoFactory.getListTypeInfo(TypeInfoFactory.longTypeInfo), + Lists.newArrayList(1))); + } + + private void assertPushablePredicate(String hiveType, ExprNodeDesc constantDesc) { + Map hiveTypeMapping = new HashMap<>(); + hiveTypeMapping.put("column", hiveType); + + ExprNodeDesc predicate = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "column", null, false), + constantDesc + )); + + DecomposedPredicate decomposedPredicate = + dynamoDBFilterPushdown.pushPredicate(hiveTypeMapping, predicate); + Assert.assertEquals(decomposedPredicate.pushedPredicate.toString(), + predicate.toString()); + } + + private void assertUnpushablePredicate(String hiveType, ExprNodeDesc constantDesc) { + Map hiveTypeMapping = new HashMap<>(); + hiveTypeMapping.put("column", hiveType); + + ExprNodeDesc predicate = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "column", null, false), + constantDesc + )); + + DecomposedPredicate decomposedPredicate = + dynamoDBFilterPushdown.pushPredicate(hiveTypeMapping, predicate); + Assert.assertNull(decomposedPredicate); + } + + @Test + public void testPushPredicateDoubleTypeNotInHiveTypeMappingNotPushed() { + Map hiveTypeMapping = new HashMap<>(); + + ExprNodeDesc predicate = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + new GenericUDFOPEqual(), Lists.newArrayList( + new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "column", null, false), + new ExprNodeConstantDesc(TypeInfoFactory.doubleTypeInfo, 1.0) + )); + + DecomposedPredicate decomposedPredicate = + dynamoDBFilterPushdown.pushPredicate(hiveTypeMapping, predicate); + 
Assert.assertNull(decomposedPredicate); + } + + @Test + public void testPredicateToDynamoDBFilterWithNoIndexesAndNoHashKey() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column2Predicate)); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(0, dynamoDBQueryFilter.getKeyConditions().size()); + } + + @Test + public void testPredicateToDynamoDBFilterWithNoIndexesAndTableOnlyHasHashKey() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + column1Predicate)); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(1, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(HASH_KEY_NAME, HASH_KEY_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithHashKeyAndRangeKey() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + rangeKeyPredicate, + column1Predicate, + column2Predicate)); + + GlobalSecondaryIndexDescription gsi = createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(2, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(HASH_KEY_NAME, HASH_KEY_VALUE, dynamoDBQueryFilter); + assertKeyCondition(RANGE_KEY_NAME, RANGE_KEY_VALUE, 
dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithLSIAllProjection() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + column1Predicate, + column2Predicate)); + + LocalSecondaryIndexDescription lsi = createLSI(LOCAL_SECONDARY_INDEX_NAME, + HASH_KEY_NAME, COLUMN1_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, Lists.newArrayList(lsi), null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertEquals(LOCAL_SECONDARY_INDEX_NAME, dynamoDBQueryFilter.getIndex().getIndexName()); + Assert.assertEquals(2, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(HASH_KEY_NAME, HASH_KEY_VALUE, dynamoDBQueryFilter); + assertKeyCondition(COLUMN1_NAME, COLUMN1_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithLSIKeysOnlyProjection() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + column1Predicate, + column2Predicate)); + + LocalSecondaryIndexDescription lsi = createLSI(LOCAL_SECONDARY_INDEX_NAME, + HASH_KEY_NAME, COLUMN1_NAME, ProjectionType.KEYS_ONLY.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, Lists.newArrayList(lsi), null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(1, dynamoDBQueryFilter.getKeyConditions().size()); + } + + @Test + public void testPredicateToDynamoDBFilterWithLSIIncludeProjectionNotContainAllMapping() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + column1Predicate, + column2Predicate)); + + LocalSecondaryIndexDescription lsi = createLSI(LOCAL_SECONDARY_INDEX_NAME, + HASH_KEY_NAME, COLUMN1_NAME, ProjectionType.INCLUDE.toString(), + 
Lists.newArrayList(COLUMN2_NAME)); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, Lists.newArrayList(lsi), null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(1, dynamoDBQueryFilter.getKeyConditions().size()); + } + + @Test + public void testPredicateToDynamoDBFilterWithLSIIncludeProjectionContainAllMapping() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + column1Predicate, + column2Predicate)); + + LocalSecondaryIndexDescription lsi = createLSI(LOCAL_SECONDARY_INDEX_NAME, + HASH_KEY_NAME, COLUMN1_NAME, ProjectionType.INCLUDE.toString(), + Lists.newArrayList(COLUMN2_NAME, COLUMN3_NAME, COLUMN4_NAME)); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, Lists.newArrayList(lsi), null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertEquals(LOCAL_SECONDARY_INDEX_NAME, dynamoDBQueryFilter.getIndex().getIndexName()); + Assert.assertEquals(2, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(HASH_KEY_NAME, HASH_KEY_VALUE, dynamoDBQueryFilter); + assertKeyCondition(COLUMN1_NAME, COLUMN1_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithHashKeyAndMismatchLSI() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + hashKeyPredicate, + column2Predicate)); + + LocalSecondaryIndexDescription lsi = createLSI(LOCAL_SECONDARY_INDEX_NAME, + HASH_KEY_NAME, COLUMN1_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, Lists.newArrayList(lsi), null, + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(1, 
dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(HASH_KEY_NAME, HASH_KEY_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithGSIAllProjection() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column1Predicate, + column2Predicate)); + + GlobalSecondaryIndexDescription gsi = createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertEquals(GLOBAL_SECONDARY_INDEX_NAME, dynamoDBQueryFilter.getIndex().getIndexName()); + Assert.assertEquals(2, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(COLUMN1_NAME, COLUMN1_VALUE, dynamoDBQueryFilter); + assertKeyCondition(COLUMN2_NAME, COLUMN2_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithGSIKeysOnlyProjection() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column1Predicate, + column2Predicate)); + + GlobalSecondaryIndexDescription gsi = createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.KEYS_ONLY.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(0, dynamoDBQueryFilter.getKeyConditions().size()); + } + + @Test + public void testPredicateToDynamoDBFilterWithGSIIncludeProjectionNotContainAllMapping() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column1Predicate, + column2Predicate)); + + GlobalSecondaryIndexDescription gsi = 
createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.INCLUDE.toString(), + Lists.newArrayList(COLUMN3_NAME)); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(0, dynamoDBQueryFilter.getKeyConditions().size()); + } + + @Test + public void testPredicateToDynamoDBFilterWithGSIIncludeProjectionContainAllMapping() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column1Predicate, + column2Predicate)); + + GlobalSecondaryIndexDescription gsi = createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.INCLUDE.toString(), + Lists.newArrayList(COLUMN3_NAME, COLUMN4_NAME)); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertEquals(GLOBAL_SECONDARY_INDEX_NAME, dynamoDBQueryFilter.getIndex().getIndexName()); + Assert.assertEquals(2, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(COLUMN1_NAME, COLUMN1_VALUE, dynamoDBQueryFilter); + assertKeyCondition(COLUMN2_NAME, COLUMN2_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithMultipleGSIsHavingTheSameHashKey() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column1Predicate)); + + GlobalSecondaryIndexDescription gsi1 = createGSI(GLOBAL_SECONDARY_INDEX_NAME + "1", + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.ALL.toString(), null); + GlobalSecondaryIndexDescription gsi2 = createGSI(GLOBAL_SECONDARY_INDEX_NAME + "2", + COLUMN1_NAME, RANGE_KEY_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = 
dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi1, gsi2), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertEquals(GLOBAL_SECONDARY_INDEX_NAME + "2", + dynamoDBQueryFilter.getIndex().getIndexName()); + Assert.assertEquals(2, dynamoDBQueryFilter.getKeyConditions().size()); + assertKeyCondition(COLUMN1_NAME, COLUMN1_VALUE, dynamoDBQueryFilter); + assertKeyCondition(RANGE_KEY_NAME, RANGE_KEY_VALUE, dynamoDBQueryFilter); + } + + @Test + public void testPredicateToDynamoDBFilterWithGSIHashKeyOnly() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column1Predicate)); + + GlobalSecondaryIndexDescription gsi = createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(0, dynamoDBQueryFilter.getKeyConditions().size()); + } + + @Test + public void testPredicateToDynamoDBFilterWithoutAnyMatchedGSI() { + ExprNodeDesc combinedPredicate = buildPredicate(Lists.newArrayList( + rangeKeyPredicate, + column2Predicate)); + + GlobalSecondaryIndexDescription gsi = createGSI(GLOBAL_SECONDARY_INDEX_NAME, + COLUMN1_NAME, COLUMN2_NAME, ProjectionType.ALL.toString(), null); + + DynamoDBQueryFilter dynamoDBQueryFilter = dynamoDBFilterPushdown.predicateToDynamoDBFilter( + tableKeySchema, null, Lists.newArrayList(gsi), + hiveDynamoDBMapping, hiveTypeMapping, combinedPredicate); + + Assert.assertNull(dynamoDBQueryFilter.getIndex()); + Assert.assertEquals(0, dynamoDBQueryFilter.getKeyConditions().size()); + } + + private ExprNodeDesc buildPredicate(List predicates) { + ExprNodeDesc combinedPredicate = null; + for (ExprNodeDesc predicate : predicates) { + 
if (combinedPredicate == null) { + combinedPredicate = predicate; + continue; + } + List children = new ArrayList<>(); + children.add(combinedPredicate); + children.add(predicate); + combinedPredicate = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getGenericUDFForAnd(), + children); + } + return combinedPredicate; + } + + private List createKeySchema(String hashKeyName, String rangeKeyName) { + List schema = new ArrayList<>(); + schema.add(new KeySchemaElement(hashKeyName, HASH_KEY_TYPE)); + if (rangeKeyName != null) { + schema.add(new KeySchemaElement(rangeKeyName, RANGE_KEY_TYPE)); + } + return schema; + } + + private Map initHiveDynamoDBMapping() { + Map hiveDynamoDBMapping = new HashMap<>(); + hiveDynamoDBMapping.put(HASH_KEY_NAME, HASH_KEY_NAME); + hiveDynamoDBMapping.put(RANGE_KEY_NAME, RANGE_KEY_NAME); + hiveDynamoDBMapping.put(COLUMN1_NAME, COLUMN1_NAME); + hiveDynamoDBMapping.put(COLUMN2_NAME, COLUMN2_NAME); + hiveDynamoDBMapping.put(COLUMN3_NAME, COLUMN3_NAME); + hiveDynamoDBMapping.put(COLUMN4_NAME, COLUMN4_NAME); + return hiveDynamoDBMapping; + } + + private Map initHiveTypeMapping() { + Map hiveTypeMapping = new HashMap<>(); + hiveTypeMapping.put(HASH_KEY_NAME, serdeConstants.STRING_TYPE_NAME); + hiveTypeMapping.put(RANGE_KEY_NAME, serdeConstants.STRING_TYPE_NAME); + hiveTypeMapping.put(COLUMN1_NAME, serdeConstants.STRING_TYPE_NAME); + hiveTypeMapping.put(COLUMN2_NAME, serdeConstants.STRING_TYPE_NAME); + hiveTypeMapping.put(COLUMN3_NAME, serdeConstants.STRING_TYPE_NAME); + hiveTypeMapping.put(COLUMN4_NAME, serdeConstants.STRING_TYPE_NAME); + return hiveTypeMapping; + } + + private GlobalSecondaryIndexDescription createGSI(String indexName, String hashKeyName, + String rangeKeyName, String projectionType, List nonKeyAttributes) { + List schema = createKeySchema(hashKeyName, rangeKeyName); + Projection projection = new Projection() + .withProjectionType(projectionType) + .withNonKeyAttributes(nonKeyAttributes); + 
return new GlobalSecondaryIndexDescription() + .withIndexName(indexName) + .withKeySchema(schema) + .withProjection(projection); + } + + private LocalSecondaryIndexDescription createLSI(String indexName, String hashKeyName, + String rangeKeyName, String projectionType, List nonKeyAttributes) { + List schema = createKeySchema(hashKeyName, rangeKeyName); + Projection projection = new Projection() + .withProjectionType(projectionType) + .withNonKeyAttributes(nonKeyAttributes); + return new LocalSecondaryIndexDescription() + .withIndexName(indexName) + .withKeySchema(schema) + .withProjection(projection); + } + + private void assertKeyCondition(String columnName, String columnValue, + DynamoDBQueryFilter filter) { + Condition hashKeyCondition = new Condition(); + List hashKeyAttributeValueList = new ArrayList<>(); + hashKeyAttributeValueList.add(new AttributeValue(columnValue)); + hashKeyCondition.setAttributeValueList(hashKeyAttributeValueList); + hashKeyCondition.setComparisonOperator(DynamoDBFilterOperator.EQ.getDynamoDBName()); + Assert.assertEquals((hashKeyCondition.toString()), + filter.getKeyConditions().get(columnName).toString()); + } +}