Drop getPreferredShuffleLayout methods from SPI #24106

Merged: 1 commit, Nov 22, 2024
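Condensed from the diff below: the experimental getPreferredShuffleLayoutForNewTable and getPreferredShuffleLayoutForInsert methods are dropped from the SPI and from the engine's Metadata/planner plumbing, and the Hive connector now returns the shuffle layout for a partitioned, non-bucketed table directly from getInsertLayout and getNewTableLayout. A minimal sketch of the merged getInsertLayout branch, assuming the variables (session, table, hiveBucketHandle) and helpers already present in HiveMetadata:

// Sketch: the branch of HiveMetadata#getInsertLayout that replaces getPreferredShuffleLayoutForInsert
// (condensed from the first hunk below; session, table and hiveBucketHandle are the values already
// computed earlier in the method, and the helpers are the existing HiveMetadata ones).
if (!hiveBucketHandle.isPresent()) {
    // Unbucketed table: only shuffle on the partition columns when the session property is enabled.
    if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
        return Optional.empty();
    }

    // TODO (carried over from the diff): the shuffle partitioning could use a better hash function
    // than the Hive bucket function.
    HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
            SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
            table.getPartitionColumns().stream()
                    .map(Column::getType)
                    .collect(toList()),
            OptionalInt.empty());
    List<String> partitionedBy = table.getPartitionColumns().stream()
            .map(Column::getName)
            .collect(toList());

    return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}
// Bucketed tables are unchanged: the bucketing scheme itself is returned as the insert layout.

getNewTableLayout receives the equivalent treatment for CTAS, and the remaining hunks delete the getPreferredShuffleLayout* plumbing from Metadata, MetadataManager, TableWriterNode construction, LogicalPlanner, and the optimizer rules.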
108 changes: 37 additions & 71 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
@@ -2995,7 +2995,22 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (!hiveBucketHandle.isPresent()) {
return Optional.empty();
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
return Optional.empty();
}

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
table.getPartitionColumns().stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());
List<String> partitionedBy = table.getPartitionColumns().stream()
.map(Column::getName)
.collect(toList());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty()
.orElseThrow(() -> new NoSuchElementException("Bucket property should be set"));
@@ -3031,40 +3046,6 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns));
}

@Override
public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
MetastoreContext metastoreContext = getMetastoreContext(session);
Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (hiveBucketHandle.isPresent()) {
// For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected,
// and there is no additional preferred shuffle partitioning
return Optional.empty();
}

if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
return Optional.empty();
}

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
table.getPartitionColumns().stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());
List<String> partitionedBy = table.getPartitionColumns().stream()
.map(Column::getName)
.collect(toList());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}

@Override
public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
@@ -3073,7 +3054,27 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
validateCsvColumns(tableMetadata);
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
if (!bucketProperty.isPresent()) {
return Optional.empty();
List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
return Optional.empty();
}

List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
Map<String, HiveColumnHandle> columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName);
List<Column> partitionColumns = partitionedBy.stream()
.map(columnHandlesByName::get)
.map(columnHandle -> columnHandleToColumn(session, columnHandle))
.collect(toList());

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
partitionColumns.stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}
checkArgument(bucketProperty.get().getBucketFunctionType().equals(BucketFunctionType.HIVE_COMPATIBLE),
"bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s",
@@ -3096,41 +3097,6 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
bucketedBy));
}

@Override
public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
validatePartitionColumns(tableMetadata);
validateBucketColumns(tableMetadata);
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
if (bucketProperty.isPresent()) {
// For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected,
// and there is no additional preferred shuffle partitioning
return Optional.empty();
}

List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
return Optional.empty();
}

List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
Map<String, HiveColumnHandle> columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName);
List<Column> partitionColumns = partitionedBy.stream()
.map(columnHandlesByName::get)
.map(columnHandle -> columnHandleToColumn(session, columnHandle))
.collect(toList());

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
partitionColumns.stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{

(next file)
@@ -280,12 +280,6 @@ public Optional<NewTableLayout> getNewTableLayout(Session session, String catalo
return delegate.getNewTableLayout(session, catalogName, tableMetadata);
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
return delegate.getPreferredShuffleLayoutForNewTable(session, catalogName, tableMetadata);
}

@Override
public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<NewTableLayout> layout)
{
@@ -308,12 +302,6 @@ public Optional<NewTableLayout> getInsertLayout(Session session, TableHandle tar
return delegate.getInsertLayout(session, target);
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle target)
{
return delegate.getPreferredShuffleLayoutForInsert(session, target);
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{

(next file)
@@ -246,9 +246,6 @@ public interface Metadata

Optional<NewTableLayout> getNewTableLayout(Session session, String catalogName, ConnectorTableMetadata tableMetadata);

@Experimental
Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata);

/**
* Begin the atomic creation of a table with data.
*/
@@ -261,9 +258,6 @@ public interface Metadata

Optional<NewTableLayout> getInsertLayout(Session session, TableHandle target);

@Experimental
Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle target);

/**
* Describes statistics that must be collected during a write.
*/

(next file)
@@ -733,17 +733,6 @@ public Optional<NewTableLayout> getInsertLayout(Session session, TableHandle tab
.map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle table)
{
ConnectorId connectorId = table.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadata();

return metadata.getPreferredShuffleLayoutForInsert(session.toConnectorSession(connectorId), table.getConnectorHandle())
.map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
@@ -795,19 +784,6 @@ public Optional<NewTableLayout> getNewTableLayout(Session session, String catalo
.map(layout -> new NewTableLayout(connectorId, transactionHandle, layout));
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
ConnectorId connectorId = catalogMetadata.getConnectorId();
ConnectorMetadata metadata = catalogMetadata.getMetadata();

ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(connectorId);
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
return metadata.getPreferredShuffleLayoutForNewTable(connectorSession, tableMetadata)
.map(layout -> new NewTableLayout(connectorId, transactionHandle, layout));
}

@Override
public void beginQuery(Session session, Set<ConnectorId> connectors)
{

(next file)
@@ -212,7 +212,6 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE)),
Optional.of(insertReference),
outputVar,
@@ -349,7 +348,6 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
@@ -371,7 +369,6 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE));

(next file)
@@ -255,9 +255,6 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentPr
if (node.getTablePartitioningScheme().isPresent()) {
context.get().setDistribution(node.getTablePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
if (node.getPreferredShufflePartitioningScheme().isPresent()) {
context.get().setDistribution(node.getPreferredShufflePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
return context.defaultRewrite(node, context.get());
}


(next file)
@@ -228,7 +228,6 @@ public Optional<PlanNode> visitTableWriter(TableWriterNode node, Context context
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
context.addPlan(node, new CanonicalPlan(result, strategy));
return Optional.of(result);

(next file)
@@ -275,8 +275,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
analysis.getParameters(),
analysis.getCreateTableComment());
Optional<NewTableLayout> newTableLayout = metadata.getNewTableLayout(session, destination.getCatalogName(), tableMetadata);
Optional<NewTableLayout> preferredShuffleLayout = metadata.getPreferredShuffleLayoutForNewTable(session, destination.getCatalogName(), tableMetadata);

List<String> columnNames = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.map(ColumnMetadata::getName)
@@ -291,7 +289,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
columnNames,
tableMetadata.getColumns(),
newTableLayout,
preferredShuffleLayout,
statisticsMetadata);
}

@@ -372,7 +369,6 @@ private RelationPlan buildInternalInsertPlan(
plan = new RelationPlan(projectNode, scope, projectNode.getOutputVariables());

Optional<NewTableLayout> newTableLayout = metadata.getInsertLayout(session, tableHandle);
Optional<NewTableLayout> preferredShuffleLayout = metadata.getPreferredShuffleLayoutForInsert(session, tableHandle);

String catalogName = tableHandle.getConnectorId().getCatalogName();
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, catalogName, tableMetadata.getMetadata());
@@ -384,7 +380,6 @@ private RelationPlan buildInternalInsertPlan(
visibleTableColumnNames,
visibleTableColumns,
newTableLayout,
preferredShuffleLayout,
statisticsMetadata);
}

@@ -395,11 +390,8 @@ private RelationPlan createTableWriterPlan(
List<String> columnNames,
List<ColumnMetadata> columnMetadataList,
Optional<NewTableLayout> writeTableLayout,
Optional<NewTableLayout> preferredShuffleLayout,
TableStatisticsMetadata statisticsMetadata)
{
verify(!(writeTableLayout.isPresent() && preferredShuffleLayout.isPresent()), "writeTableLayout and preferredShuffleLayout cannot both exist");

PlanNode source = plan.getRoot();

if (!analysis.isCreateTableAsSelectWithData()) {
@@ -408,7 +400,6 @@ private RelationPlan createTableWriterPlan(

List<VariableReferenceExpression> variables = plan.getFieldMappings();
Optional<PartitioningScheme> tablePartitioningScheme = getPartitioningSchemeForTableWrite(writeTableLayout, columnNames, variables);
Optional<PartitioningScheme> preferredShufflePartitioningScheme = getPartitioningSchemeForTableWrite(preferredShuffleLayout, columnNames, variables);

verify(columnNames.size() == variables.size(), "columnNames.size() != variables.size(): %s and %s", columnNames, variables);
Map<String, VariableReferenceExpression> columnToVariableMap = zip(columnNames.stream(), plan.getFieldMappings().stream(), SimpleImmutableEntry::new)
@@ -440,7 +431,6 @@ private RelationPlan createTableWriterPlan(
columnNames,
notNullColumnVariables,
tablePartitioningScheme,
preferredShufflePartitioningScheme,
// partial aggregation is run within the TableWriteOperator to calculate the statistics for
// the data consumed by the TableWriteOperator
Optional.of(aggregations.getPartialAggregation()),
@@ -471,7 +461,6 @@ private RelationPlan createTableWriterPlan(
columnNames,
notNullColumnVariables,
tablePartitioningScheme,
preferredShufflePartitioningScheme,
Optional.empty(),
Optional.empty(),
Optional.of(Boolean.FALSE)),

(next file)
@@ -52,7 +52,7 @@ public class PushTableWriteThroughUnion
// guaranteed regardless of this optimizer. The level of local parallelism will be
// determined by LocalExecutionPlanner separately, and shouldn't be a concern of
// this optimizer.
.matching(tableWriter -> !(tableWriter.getTablePartitioningScheme().isPresent() || tableWriter.getPreferredShufflePartitioningScheme().isPresent()))
.matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent())
.with(source().matching(union().capturedAs(CHILD)));

@Override

(next file)
@@ -587,7 +587,6 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getColumnNames(),
node.getNotNullColumnVariables(),
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
rewrittenStatisticsAggregation,
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter()));

(next file)
@@ -78,7 +78,6 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getColumnNames(),
node.getNotNullColumnVariables(),
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
node.getStatisticsAggregation(),
Optional.of(initialTaskNumber),
node.getIsTemporaryTableWriter()));