Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added lookup operator tests #2

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ public ExplainResponseNode visitLookup(LookupOperator node, Object context) {
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"copyfields", node.getCopyFieldMap(),
"matchfields", node.getMatchFieldMap(),
"copyfields", node.getCopyFieldMap().toString(),
"matchfields", node.getMatchFieldMap().toString(),
"indexname", node.getIndexName(),
"appendonly", node.getAppendOnly())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.expression.ReferenceExpression;

/** Lookup operator. Perform lookup on another OpenSearch index and enrich the results. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class LookupOperator extends PhysicalPlan {
@Getter private final PhysicalPlan input;
@Getter private final String indexName;
@Getter private final Map<ReferenceExpression, ReferenceExpression> matchFieldMap;
@Getter private final Map<ReferenceExpression, ReferenceExpression> copyFieldMap;
@Getter private final Boolean appendOnly;

@EqualsAndHashCode.Exclude
private final BiFunction<String, Map<String, Object>, Map<String, Object>> lookup;

/** Lookup Constructor. */
Expand Down Expand Up @@ -96,37 +100,29 @@ public ExprValue next() {
finalMap.put("_copy", copyMap.keySet());
}

Map<String, Object> source = lookup.apply(indexName, finalMap);
Map<String, Object> lookupResult = lookup.apply(indexName, finalMap);

if (source == null || source.isEmpty()) {
if (lookupResult == null || lookupResult.isEmpty()) {
// no lookup found or lookup is empty, so we just return the original input value
return inputValue;
}

Map<String, ExprValue> tupleValue = ExprValueUtils.getTupleValue(inputValue);
Map<String, ExprValue> resultBuilder = new HashMap<>();
resultBuilder.putAll(tupleValue);

if (appendOnly) {

for (Map.Entry<String, Object> sourceField : source.entrySet()) {
String u = copyMap.get(sourceField.getKey());
resultBuilder.putIfAbsent(
u == null ? sourceField.getKey() : u.toString(),
ExprValueUtils.fromObjectValue(sourceField.getValue()));
}
} else {
// default

for (Map.Entry<String, Object> sourceField : source.entrySet()) {
String u = copyMap.get(sourceField.getKey());
resultBuilder.put(
u == null ? sourceField.getKey() : u.toString(),
ExprValueUtils.fromObjectValue(sourceField.getValue()));
Map<String, ExprValue> tupleInputValue = ExprValueUtils.getTupleValue(inputValue);
Map<String, ExprValue> resultTupleBuilder = new HashMap<>();
resultTupleBuilder.putAll(tupleInputValue);
for (Map.Entry<String, Object> sourceOfAdditionalField : lookupResult.entrySet()) {
String lookedUpFieldName = sourceOfAdditionalField.getKey();
Object lookedUpFieldValue = sourceOfAdditionalField.getValue();
String finalFieldName = copyMap.getOrDefault(lookedUpFieldName, lookedUpFieldName);
ExprValue value = ExprValueUtils.fromObjectValue(lookedUpFieldValue);
if (appendOnly) {
resultTupleBuilder.putIfAbsent(finalFieldName, value);
} else {
resultTupleBuilder.put(finalFieldName, value);
}
}

return ExprTupleValue.fromExprValueMap(resultBuilder);
return ExprTupleValue.fromExprValueMap(resultTupleBuilder);

} else {
return inputValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.RareTopN.CommandType;
Expand Down Expand Up @@ -83,16 +84,10 @@ public static LookupOperator lookup(
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean appendOnly,
Map<ReferenceExpression, ReferenceExpression> copyFieldMap) {
Map<ReferenceExpression, ReferenceExpression> copyFieldMap,
BiFunction<String, Map<String, Object>, Map<String, Object>> lookupFunction) {
return new LookupOperator(
input,
indexName,
matchFieldMap,
appendOnly,
copyFieldMap,
(a, b) -> {
throw new RuntimeException("not implemented by PhysicalPlanDSL");
});
input, indexName, matchFieldMap, appendOnly, copyFieldMap, lookupFunction);
}

public WindowOperator window(
Expand Down
28 changes: 28 additions & 0 deletions core/src/test/java/org/opensearch/sql/executor/ExplainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.nested;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN;
Expand Down Expand Up @@ -235,6 +236,33 @@ void can_explain_nested() {
explain.apply(plan));
}

@Test
void can_explain_lookup() {
PhysicalPlan plan =
lookup(
tableScan,
"lookup_index_name",
Map.of(ref("lookup_index_field", STRING), ref("query_index_field", STRING)),
true,
Map.of(ref("lookup_index_field_name", STRING), ref("renamed_field", STRING)),
null);
assertEquals(
new ExplainResponse(
new ExplainResponseNode(
"LookupOperator",
Map.of(
"copyfields",
"{lookup_index_field_name=renamed_field}",
"matchfields",
"{lookup_index_field=query_index_field}",
"indexname",
"lookup_index_name",
"appendonly",
true),
singletonList(tableScan.explainNode()))),
explain.apply(plan));
}

private static class FakeTableScan extends TableScanOperator {
@Override
public boolean hasNext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.lookup;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.nested;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.rareTopN;
Expand Down Expand Up @@ -122,22 +123,27 @@ public void visit_should_return_default_physical_operator() {
nested(
limit(
LogicalPlanDSL.dedupe(
rareTopN(
sort(
eval(
remove(
rename(
aggregation(
filter(values(emptyList()), filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
lookup(
rareTopN(
sort(
eval(
remove(
rename(
aggregation(
filter(values(emptyList()), filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
"lookup_index_name",
Map.of(),
false,
Map.of()),
dedupeField),
limit,
offset),
Expand All @@ -152,24 +158,30 @@ public void visit_should_return_default_physical_operator() {
PhysicalPlanDSL.nested(
PhysicalPlanDSL.limit(
PhysicalPlanDSL.dedupe(
PhysicalPlanDSL.rareTopN(
PhysicalPlanDSL.sort(
PhysicalPlanDSL.eval(
PhysicalPlanDSL.remove(
PhysicalPlanDSL.rename(
PhysicalPlanDSL.agg(
PhysicalPlanDSL.filter(
PhysicalPlanDSL.values(emptyList()),
filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
PhysicalPlanDSL.lookup(
PhysicalPlanDSL.rareTopN(
PhysicalPlanDSL.sort(
PhysicalPlanDSL.eval(
PhysicalPlanDSL.remove(
PhysicalPlanDSL.rename(
PhysicalPlanDSL.agg(
PhysicalPlanDSL.filter(
PhysicalPlanDSL.values(emptyList()),
filterExpr),
aggregators,
groupByExprs),
mappings),
exclude),
newEvalField),
sortField),
CommandType.TOP,
topByExprs,
rareTopNField),
"lookup_index_name",
Map.of(),
false,
Map.of(),
null),
dedupeField),
limit,
offset),
Expand Down Expand Up @@ -278,4 +290,22 @@ public void visitPaginate_should_remove_it_from_tree() {
new ProjectOperator(new ValuesOperator(List.of(List.of())), List.of(), List.of());
assertEquals(physicalPlanTree, logicalPlanTree.accept(implementor, null));
}

@Test
public void visitLookup_should_build_LookupOperator() {
LogicalPlan values = values(List.of(DSL.literal("to be or not to be")));
var logicalPlan = lookup(values, "lookup_index_name", Map.of(), false, Map.of());
var expectedPhysicalPlan =
PhysicalPlanDSL.lookup(
new ValuesOperator(List.of(List.of(DSL.literal("to be or not to be")))),
"lookup_index_name",
Map.of(),
false,
Map.of(),
null);

PhysicalPlan lookupOperator = logicalPlan.accept(implementor, null);

assertEquals(expectedPhysicalPlan, lookupOperator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public TableWriteOperator build(PhysicalPlan child) {
LogicalPlan ad = new LogicalAD(relation, Map.of());
LogicalPlan ml = new LogicalML(relation, Map.of());
LogicalPlan paginate = new LogicalPaginate(42, List.of(relation));
LogicalPlan lookup = new LogicalLookup(relation, "lookup_index", Map.of(), true, Map.of());

List<Map<String, ReferenceExpression>> nestedArgs =
List.of(
Expand Down Expand Up @@ -163,7 +164,8 @@ public TableWriteOperator build(PhysicalPlan child) {
paginate,
nested,
cursor,
closeCursor)
closeCursor,
lookup)
.map(Arguments::of);
}

Expand Down Expand Up @@ -214,5 +216,14 @@ public Integer visitRareTopN(LogicalRareTopN plan, Object context) {
.mapToInt(Integer::intValue)
.sum();
}

@Override
public Integer visitLookup(LogicalLookup plan, Object context) {
return 1
+ plan.getChild().stream()
.map(child -> child.accept(this, context))
.mapToInt(Integer::intValue)
.sum();
}
}
}
Loading
Loading