Skip to content

Commit

Permalink
Remove ConnectorJoinNode from the SPI
Browse files Browse the repository at this point in the history
With JoinNode moved to the SPI, ConnectorJoinNode is no longer
necessary.  Remove it and replace references with JoinNode.
  • Loading branch information
rschlussel committed Nov 13, 2024
1 parent 84e3c18 commit 90354c9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import com.facebook.presto.spi.function.FunctionHandle;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.ConnectorJoinNode;
import com.facebook.presto.spi.plan.EquiJoinClause;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
Expand Down Expand Up @@ -248,14 +248,19 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
table,
deleteGroupInfo);

parentNode = new ConnectorJoinNode(idAllocator.getNextId(),
Arrays.asList(parentNode, deleteTableScan),
parentNode = new JoinNode(
Optional.empty(),
idAllocator.getNextId(),
JoinType.LEFT,
clauses,
Sets.newHashSet(versionFilter),
Optional.empty(), // Allow stats to determine join distribution
Stream.concat(parentNode.getOutputVariables().stream(), deleteTableScan.getOutputVariables().stream()).collect(Collectors.toList()));
parentNode,
deleteTableScan,
ImmutableList.copyOf(clauses),
Stream.concat(parentNode.getOutputVariables().stream(), deleteTableScan.getOutputVariables().stream()).collect(Collectors.toList()),
Optional.of(versionFilter),
Optional.empty(),
Optional.empty(),
Optional.empty(), // Allow stats to determine join distribution,
ImmutableMap.of());
}

FilterNode filter = new FilterNode(Optional.empty(), idAllocator.getNextId(), Optional.empty(), parentNode,
Expand All @@ -270,8 +275,8 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
}

private static ImmutableMap<Set<Integer>, DeleteSetInfo> collectDeleteInformation(Table icebergTable,
TupleDomain<IcebergColumnHandle> predicate,
long snapshotId)
TupleDomain<IcebergColumnHandle> predicate,
long snapshotId)
{
// Delete schemas can repeat, so using a normal hashmap to dedup, will be converted to immutable at the end of the function.
HashMap<Set<Integer>, DeleteSetInfo> deleteInformations = new HashMap<>();
Expand Down Expand Up @@ -478,17 +483,17 @@ public ImmutableMap<Integer, PartitionFieldInfo> getPartitionFields()
public List<Types.NestedField> allFields(Schema schema)
{
return Stream.concat(equalityFieldIds
.stream()
.map(schema::findField),
partitionFields
.values()
.stream()
.map(partitionFieldInfo -> {
if (partitionFieldInfo.partitionField.transform().isIdentity()) {
return schema.findField(partitionFieldInfo.partitionField.sourceId());
}
return partitionFieldInfo.nestedField;
}))
.stream()
.map(schema::findField),
partitionFields
.values()
.stream()
.map(partitionFieldInfo -> {
if (partitionFieldInfo.partitionField.transform().isIdentity()) {
return schema.findField(partitionFieldInfo.partitionField.sourceId());
}
return partitionFieldInfo.nestedField;
}))
.collect(Collectors.toList());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.expressions.LogicalRowExpressions;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.ConnectorJoinNode;
import com.facebook.presto.spi.plan.CteConsumerNode;
import com.facebook.presto.spi.plan.CteProducerNode;
import com.facebook.presto.spi.plan.CteReferenceNode;
Expand All @@ -40,8 +38,6 @@
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -162,8 +158,6 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
"the connector optimizer from %s returns a node that does not cover all output before optimization",
connectorId);

newNode = SimplePlanRewriter.rewriteWith(new ConnectorToInternalJoinRewriter(), newNode);

updates.put(node, newNode);
}
}
Expand Down Expand Up @@ -310,26 +304,4 @@ private static <T> boolean containsAll(Set<T> container, Collection<T> test)
}
return true;
}

private static class ConnectorToInternalJoinRewriter
extends SimplePlanRewriter<Void>
{
@Override
public PlanNode visitConnectorJoinNode(ConnectorJoinNode node, RewriteContext<Void> context)
{
return new JoinNode(node.getSourceLocation(),
node.getId(),
node.getStatsEquivalentPlanNode(),
node.getType(),
context.rewrite(node.getSources().get(0)),
context.rewrite(node.getSources().get(1)),
ImmutableList.copyOf(node.getCriteria()),
node.getOutputVariables(),
node.getFilters().isEmpty() ? Optional.empty() : Optional.of(LogicalRowExpressions.and(node.getFilters())),
Optional.empty(),
Optional.empty(),
node.getDistributionType(),
ImmutableMap.of());
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ public R visitCteConsumer(CteConsumerNode node, C context)
return visitPlan(node, context);
}

public R visitConnectorJoinNode(ConnectorJoinNode node, C context)
{
return visitPlan(node, context);
}

public R visitWindow(WindowNode node, C context)
{
return visitPlan(node, context);
Expand Down

0 comments on commit 90354c9

Please sign in to comment.