Skip to content

Commit

Permalink
Tests related mainly to OpenSearchIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-soszynski-eliatra committed Jul 18, 2024
1 parent 0c4e488 commit 51ce97e
Show file tree
Hide file tree
Showing 5 changed files with 488 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.lookup;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.nested;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.rareTopN;
Expand All @@ -48,7 +47,6 @@
import org.opensearch.sql.ast.tree.RareTopN.CommandType;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.data.model.ExprBooleanValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.expression.DSL;
Expand All @@ -60,7 +58,6 @@
import org.opensearch.sql.expression.window.WindowDefinition;
import org.opensearch.sql.expression.window.ranking.RowNumberFunction;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalLookup;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanDSL;
Expand Down Expand Up @@ -125,27 +122,22 @@ public void visit_should_return_default_physical_operator() {
nested(
limit(
LogicalPlanDSL.dedupe(
lookup(
rareTopN(
sort(
eval(
remove(
rename(
aggregation(
filter(values(emptyList()), filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
"lookup_index_name",
Map.of(),
false,
Map.of()),
rareTopN(
sort(
eval(
remove(
rename(
aggregation(
filter(values(emptyList()), filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
dedupeField),
limit,
offset),
Expand All @@ -160,30 +152,24 @@ public void visit_should_return_default_physical_operator() {
PhysicalPlanDSL.nested(
PhysicalPlanDSL.limit(
PhysicalPlanDSL.dedupe(
PhysicalPlanDSL.lookup(
PhysicalPlanDSL.rareTopN(
PhysicalPlanDSL.sort(
PhysicalPlanDSL.eval(
PhysicalPlanDSL.remove(
PhysicalPlanDSL.rename(
PhysicalPlanDSL.agg(
PhysicalPlanDSL.filter(
PhysicalPlanDSL.values(emptyList()),
filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
"lookup_index_name",
Map.of(),
false,
Map.of(),
null),
PhysicalPlanDSL.rareTopN(
PhysicalPlanDSL.sort(
PhysicalPlanDSL.eval(
PhysicalPlanDSL.remove(
PhysicalPlanDSL.rename(
PhysicalPlanDSL.agg(
PhysicalPlanDSL.filter(
PhysicalPlanDSL.values(emptyList()),
filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
dedupeField),
limit,
offset),
Expand Down Expand Up @@ -292,37 +278,4 @@ public void visitPaginate_should_remove_it_from_tree() {
new ProjectOperator(new ValuesOperator(List.of(List.of())), List.of(), List.of());
assertEquals(physicalPlanTree, logicalPlanTree.accept(implementor, null));
}

/**
 * Verifies that visiting a logical lookup node with the implementor under test yields a physical
 * plan equal to the one built by {@code PhysicalPlanDSL.lookup} with the same index name, maps and
 * flag, and a {@code null} final argument.
 */
@Test
public void visitLookup_should_build_LookupOperator() {
  // Logical side: a single-literal values node wrapped in a lookup.
  LogicalPlan source = values(List.of(DSL.literal("to be or not to be")));
  LogicalPlan lookupPlan = lookup(source, "lookup_index_name", Map.of(), false, Map.of());

  // Physical side: the same values wrapped in the lookup operator we expect to be produced.
  ValuesOperator physicalSource =
      new ValuesOperator(List.of(List.of(DSL.literal("to be or not to be"))));
  PhysicalPlan expected =
      PhysicalPlanDSL.lookup(
          physicalSource, "lookup_index_name", Map.of(), false, Map.of(), null);

  PhysicalPlan actual = lookupPlan.accept(implementor, null);

  assertEquals(expected, actual);
}

/**
 * Verifies that the operator returned by {@code implementor.visitLookup} throws an
 * {@link UnsupportedOperationException} with the expected message when {@code next()} is called.
 */
@Test
public void visitLookup_should_throw_unsupportedOperationException() {
  LogicalLookup lookupNode = mock(LogicalLookup.class);
  LogicalPlan childPlan = mock(LogicalPlan.class);
  PhysicalPlan childOperator = mock(PhysicalPlan.class);

  // The mocked lookup node exposes one child, which resolves to the mocked physical operator.
  when(childPlan.accept(implementor, null)).thenReturn(childOperator);
  when(lookupNode.getChild()).thenReturn(List.of(childPlan));

  PhysicalPlan operatorUnderTest = implementor.visitLookup(lookupNode, null);

  // Stubbed only after the operator is built, exactly as in the original ordering.
  when(childOperator.next()).thenReturn(ExprValueUtils.tupleValue(Map.of("field", "value")));

  UnsupportedOperationException thrown =
      assertThrows(UnsupportedOperationException.class, () -> operatorUnderTest.next());

  assertEquals("Lookup not implemented by DefaultImplementor", thrown.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.Nullable;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.type.ExprCoreType;
Expand Down Expand Up @@ -217,66 +221,79 @@ public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) {

@Override
public PhysicalPlan visitLookup(LogicalLookup node, OpenSearchIndexScan context) {
SingleRowQuery singleRowQuery = new SingleRowQuery(client);
return new LookupOperator(
visitChild(node, context),
node.getIndexName(),
node.getMatchFieldMap(),
node.getAppendOnly(),
node.getCopyFieldMap(),
lookup());
lookup(singleRowQuery));
}

BiFunction<String, Map<String, Object>, Map<String, Object>> lookup() {

if (client.getNodeClient() == null) {
throw new RuntimeException(
"Can not perform lookup because openSearchClient was null. This is likely a bug.");
}

BiFunction<String, Map<String, Object>, Map<String, Object>> lookup(
SingleRowQuery singleRowQuery) {
Objects.requireNonNull(singleRowQuery, "SingleRowQuery is required to perform lookup");
return (indexName, inputMap) -> {
Map<String, Object> matchMap = (Map<String, Object>) inputMap.get("_match");
Set<String> copySet = (Set<String>) inputMap.get("_copy");

BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

for (Map.Entry<String, Object> f : matchMap.entrySet()) {
BoolQueryBuilder orQueryBuilder = new BoolQueryBuilder();

// Todo: Search with term and a match query? Or terms only?
orQueryBuilder.should(new TermQueryBuilder(f.getKey(), f.getValue().toString()));
orQueryBuilder.should(new MatchQueryBuilder(f.getKey(), f.getValue().toString()));
orQueryBuilder.minimumShouldMatch(1);

// filter is the same as "must" but ignores scoring
boolQueryBuilder.filter(orQueryBuilder);
}

SearchResponse result =
client
.getNodeClient()
.search(
new SearchRequest(indexName)
.source(
SearchSourceBuilder.searchSource()
.fetchSource(
copySet == null ? null : copySet.toArray(new String[0]), null)
.query(boolQueryBuilder)
.size(2)))
.actionGet();

int hits = result.getHits().getHits().length;

if (hits == 0) {
// null indicates no hits for the lookup found
return null;
}

if (hits != 1) {
throw new RuntimeException("too many hits for " + indexName + " (" + hits + ")");
}

return result.getHits().getHits()[0].getSourceAsMap();
return singleRowQuery.executeQuery(indexName, matchMap, copySet);
};
}
}

/**
 * Runs a lookup search against an index and returns the source of the single matching document.
 *
 * <p>The search asks for at most two hits, which is enough to tell "exactly one match" apart from
 * "more than one match".
 */
static class SingleRowQuery {

  private final NodeClient nodeClient;

  /**
   * Captures the node client used to execute searches.
   *
   * @param openSearchClient client that provides the {@link NodeClient}
   * @throws NullPointerException if {@code openSearchClient} or its node client is {@code null}
   */
  public SingleRowQuery(OpenSearchClient openSearchClient) {
    Objects.requireNonNull(openSearchClient, "Opensearch client is required to perform lookup");
    NodeClient resolvedNodeClient = openSearchClient.getNodeClient();
    this.nodeClient =
        Objects.requireNonNull(
            resolvedNodeClient,
            "Can not perform lookup because openSearchClient was null. This is likely a bug.");
  }

  /**
   * Searches {@code indexName} for a document matching every entry of {@code matchMap}.
   *
   * @param indexName index to search
   * @param matchMap field name to value pairs; every pair must match (values are compared via
   *     their {@code toString()} form)
   * @param copySet source fields to fetch, or {@code null} to pass no include list
   * @return the source map of the only hit, or {@code null} when nothing matched
   * @throws RuntimeException if more than one document matched
   */
  public @Nullable Map<String, Object> executeQuery(
      String indexName, Map<String, Object> matchMap, Set<String> copySet) {
    BoolQueryBuilder allFieldsQuery = new BoolQueryBuilder();
    for (Map.Entry<String, Object> matchField : matchMap.entrySet()) {
      // filter is the same as "must" but ignores scoring
      allFieldsQuery.filter(
          termOrMatch(matchField.getKey(), matchField.getValue().toString()));
    }

    String[] includedFields = null;
    if (copySet != null) {
      includedFields = copySet.toArray(new String[0]);
    }

    SearchRequest request =
        new SearchRequest(indexName)
            .source(
                SearchSourceBuilder.searchSource()
                    .fetchSource(includedFields, null)
                    .query(allFieldsQuery)
                    .size(2));
    SearchResponse response = nodeClient.search(request).actionGet();

    SearchHit[] found = response.getHits().getHits();
    if (found.length == 0) {
      // null indicates no hits for the lookup found
      return null;
    }
    if (found.length > 1) {
      throw new RuntimeException("too many hits for " + indexName + " (" + found.length + ")");
    }
    return found[0].getSourceAsMap();
  }

  /** Builds a clause satisfied when either a term query or a match query hits the field. */
  private static BoolQueryBuilder termOrMatch(String field, String text) {
    BoolQueryBuilder eitherQuery = new BoolQueryBuilder();
    // Todo: Search with term and a match query? Or terms only?
    eitherQuery.should(new TermQueryBuilder(field, text));
    eitherQuery.should(new MatchQueryBuilder(field, text));
    eitherQuery.minimumShouldMatch(1);
    return eitherQuery;
  }
}
}
Loading

0 comments on commit 51ce97e

Please sign in to comment.