diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index cb55230ed41de..abad533ec81d8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -2995,7 +2995,22 @@ public Optional getInsertLayout(ConnectorSession sessio Optional hiveBucketHandle = getHiveBucketHandle(session, table); if (!hiveBucketHandle.isPresent()) { - return Optional.empty(); + if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) { + return Optional.empty(); + } + + // TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function) + HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle( + SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, + table.getPartitionColumns().stream() + .map(Column::getType) + .collect(toList()), + OptionalInt.empty()); + List partitionedBy = table.getPartitionColumns().stream() + .map(Column::getName) + .collect(toList()); + + return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); } HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty() .orElseThrow(() -> new NoSuchElementException("Bucket property should be set")); @@ -3031,40 +3046,6 @@ public Optional getInsertLayout(ConnectorSession sessio return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns)); } - @Override - public Optional getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle) - { - HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle; - SchemaTableName tableName = hiveTableHandle.getSchemaTableName(); - MetastoreContext metastoreContext = getMetastoreContext(session); - Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()) - .orElseThrow(() -> new TableNotFoundException(tableName)); - - Optional hiveBucketHandle = getHiveBucketHandle(session, table); - if (hiveBucketHandle.isPresent()) { - // For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected, - // and there is no additional preferred shuffle partitioning - return Optional.empty(); - } - - if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) { - return Optional.empty(); - } - - // TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function) - HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle( - SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, - table.getPartitionColumns().stream() - .map(Column::getType) - .collect(toList()), - OptionalInt.empty()); - List partitionedBy = table.getPartitionColumns().stream() - .map(Column::getName) - .collect(toList()); - - return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); - } - @Override public Optional getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata) { @@ -3073,7 +3054,27 @@ public Optional getNewTableLayout(ConnectorSession sess validateCsvColumns(tableMetadata); Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); if (!bucketProperty.isPresent()) { - return Optional.empty(); + List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); + if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) { + return Optional.empty(); + } + + List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator); + Map columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName); + List partitionColumns = partitionedBy.stream() + .map(columnHandlesByName::get) + .map(columnHandle -> columnHandleToColumn(session, columnHandle)) + .collect(toList()); + + // TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function) + HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle( + SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, + partitionColumns.stream() + .map(Column::getType) + .collect(toList()), + OptionalInt.empty()); + + return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); } checkArgument(bucketProperty.get().getBucketFunctionType().equals(BucketFunctionType.HIVE_COMPATIBLE), "bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s", @@ -3096,41 +3097,6 @@ public Optional getNewTableLayout(ConnectorSession sess bucketedBy)); } - @Override - public Optional getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) - { - validatePartitionColumns(tableMetadata); - validateBucketColumns(tableMetadata); - Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); - if (bucketProperty.isPresent()) { - // For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected, - // and there is no additional preferred shuffle partitioning - return Optional.empty(); - } - - List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); - if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) { - return Optional.empty(); - } - - List columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator); - Map columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName); - List partitionColumns = partitionedBy.stream() - .map(columnHandlesByName::get) - .map(columnHandle -> columnHandleToColumn(session, columnHandle)) - .collect(toList()); - - // TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function) - HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle( - SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE, - partitionColumns.stream() - .map(Column::getType) - .collect(toList()), - OptionalInt.empty()); - - return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); - } - @Override public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata) { diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java index 6d69af471ee9e..8954086f99db4 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java @@ -280,12 +280,6 @@ public Optional getNewTableLayout(Session session, String catalo return delegate.getNewTableLayout(session, catalogName, tableMetadata); } - @Override - public Optional getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata) - { - return delegate.getPreferredShuffleLayoutForNewTable(session, catalogName, tableMetadata); - } - @Override public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout) { @@ -308,12 +302,6 @@ public Optional getInsertLayout(Session session, TableHandle tar return delegate.getInsertLayout(session, target); } - @Override - public Optional getPreferredShuffleLayoutForInsert(Session session, TableHandle target) - { - return delegate.getPreferredShuffleLayoutForInsert(session, target); - } - @Override public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata) { diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java index d47d5fe1bb630..1cc99fcce3b9b 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java @@ -246,9 +246,6 @@ public interface Metadata Optional getNewTableLayout(Session session, String catalogName, ConnectorTableMetadata tableMetadata); - @Experimental - Optional getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata); - /** * Begin the atomic creation of a table with data. */ @@ -261,9 +258,6 @@ public interface Metadata Optional getInsertLayout(Session session, TableHandle target); - @Experimental - Optional getPreferredShuffleLayoutForInsert(Session session, TableHandle target); - /** * Describes statistics that must be collected during a write. */ diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java index f001d50b43271..7f9812c48f891 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java @@ -733,17 +733,6 @@ public Optional getInsertLayout(Session session, TableHandle tab .map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout)); } - @Override - public Optional getPreferredShuffleLayoutForInsert(Session session, TableHandle table) - { - ConnectorId connectorId = table.getConnectorId(); - CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, connectorId); - ConnectorMetadata metadata = catalogMetadata.getMetadata(); - - return metadata.getPreferredShuffleLayoutForInsert(session.toConnectorSession(connectorId), table.getConnectorHandle()) - .map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout)); - } - @Override public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata) { @@ -795,19 +784,6 @@ public Optional getNewTableLayout(Session session, String catalo .map(layout -> new NewTableLayout(connectorId, transactionHandle, layout)); } - @Override - public Optional getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata) - { - CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName); - ConnectorId connectorId = catalogMetadata.getConnectorId(); - ConnectorMetadata metadata = catalogMetadata.getMetadata(); - - ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(connectorId); - ConnectorSession connectorSession = session.toConnectorSession(connectorId); - return metadata.getPreferredShuffleLayoutForNewTable(connectorSession, tableMetadata) - .map(layout -> new NewTableLayout(connectorId, transactionHandle, layout)); - } - @Override public void beginQuery(Session session, Set connectors) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java index a88000274d3c5..df31b5d4c8544 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java @@ -212,7 +212,6 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges( Optional.empty(), Optional.empty(), Optional.empty(), - Optional.empty(), Optional.of(Boolean.TRUE)), Optional.of(insertReference), outputVar, @@ -349,7 +348,6 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( outputColumnNames, outputNotNullColumnVariables, Optional.of(partitioningScheme), - Optional.empty(), enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(), Optional.empty(), Optional.of(Boolean.TRUE))), @@ -371,7 +369,6 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( outputColumnNames, outputNotNullColumnVariables, Optional.of(partitioningScheme), - Optional.empty(), enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(), Optional.empty(), Optional.of(Boolean.TRUE)); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index 18a2e199468cf..eba7eeb25efd6 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -255,9 +255,6 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext visitTableWriter(TableWriterNode node, Context context Optional.empty(), Optional.empty(), Optional.empty(), - Optional.empty(), Optional.empty()); context.addPlan(node, new CanonicalPlan(result, strategy)); return Optional.of(result); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index e7d79325c6cd2..9d0d1aaaf1804 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -275,8 +275,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query) analysis.getParameters(), analysis.getCreateTableComment()); Optional newTableLayout = metadata.getNewTableLayout(session, destination.getCatalogName(), tableMetadata); - Optional preferredShuffleLayout = metadata.getPreferredShuffleLayoutForNewTable(session, destination.getCatalogName(), tableMetadata); - List columnNames = tableMetadata.getColumns().stream() .filter(column -> !column.isHidden()) .map(ColumnMetadata::getName) @@ -291,7 +289,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query) columnNames, tableMetadata.getColumns(), newTableLayout, - preferredShuffleLayout, statisticsMetadata); } @@ -372,7 +369,6 @@ private RelationPlan buildInternalInsertPlan( plan = new RelationPlan(projectNode, scope, projectNode.getOutputVariables()); Optional newTableLayout = metadata.getInsertLayout(session, tableHandle); - Optional preferredShuffleLayout = metadata.getPreferredShuffleLayoutForInsert(session, tableHandle); String catalogName = tableHandle.getConnectorId().getCatalogName(); TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, catalogName, tableMetadata.getMetadata()); @@ -384,7 +380,6 @@ private RelationPlan buildInternalInsertPlan( visibleTableColumnNames, visibleTableColumns, newTableLayout, - preferredShuffleLayout, statisticsMetadata); } @@ -395,11 +390,8 @@ private RelationPlan createTableWriterPlan( List columnNames, List columnMetadataList, Optional writeTableLayout, - Optional preferredShuffleLayout, TableStatisticsMetadata statisticsMetadata) { - verify(!(writeTableLayout.isPresent() && preferredShuffleLayout.isPresent()), "writeTableLayout and preferredShuffleLayout cannot both exist"); - PlanNode source = plan.getRoot(); if (!analysis.isCreateTableAsSelectWithData()) { @@ -408,7 +400,6 @@ private RelationPlan createTableWriterPlan( List variables = plan.getFieldMappings(); Optional tablePartitioningScheme = getPartitioningSchemeForTableWrite(writeTableLayout, columnNames, variables); - Optional preferredShufflePartitioningScheme = getPartitioningSchemeForTableWrite(preferredShuffleLayout, columnNames, variables); verify(columnNames.size() == variables.size(), "columnNames.size() != variables.size(): %s and %s", columnNames, variables); Map columnToVariableMap = zip(columnNames.stream(), plan.getFieldMappings().stream(), SimpleImmutableEntry::new) @@ -440,7 +431,6 @@ private RelationPlan createTableWriterPlan( columnNames, notNullColumnVariables, tablePartitioningScheme, - preferredShufflePartitioningScheme, // partial aggregation is run within the TableWriteOperator to calculate the statistics for // the data consumed by the TableWriteOperator Optional.of(aggregations.getPartialAggregation()), @@ -471,7 +461,6 @@ private RelationPlan createTableWriterPlan( columnNames, notNullColumnVariables, tablePartitioningScheme, - preferredShufflePartitioningScheme, Optional.empty(), Optional.empty(), Optional.of(Boolean.FALSE)), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java index 09745d2619435..9df8e9d8963a2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java @@ -52,7 +52,7 @@ public class PushTableWriteThroughUnion // guaranteed regardless of this optimizer. The level of local parallelism will be // determined by LocalExecutionPlanner separately, and shouldn't be a concern of // this optimizer. - .matching(tableWriter -> !(tableWriter.getTablePartitioningScheme().isPresent() || tableWriter.getPreferredShufflePartitioningScheme().isPresent())) + .matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent()) .with(source().matching(union().capturedAs(CHILD))); @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java index 9dd42cbc639e2..f448b8024fcb1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RowExpressionRewriteRuleSet.java @@ -587,7 +587,6 @@ public Result apply(TableWriterNode node, Captures captures, Context context) node.getColumnNames(), node.getNotNullColumnVariables(), node.getTablePartitioningScheme(), - node.getPreferredShufflePartitioningScheme(), rewrittenStatisticsAggregation, node.getTaskCountIfScaledWriter(), node.getIsTemporaryTableWriter())); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java index 73fad6e2e7089..527242616a3a2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ScaledWriterRule.java @@ -78,7 +78,6 @@ public Result apply(TableWriterNode node, Captures captures, Context context) node.getColumnNames(), node.getNotNullColumnVariables(), node.getTablePartitioningScheme(), - node.getPreferredShufflePartitioningScheme(), node.getStatisticsAggregation(), Optional.of(initialTaskNumber), node.getIsTemporaryTableWriter())); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 5644d778ff417..6e96c4b29c2c9 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -650,10 +650,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProper PlanWithProperties source = accept(node.getSource(), preferredProperties); Optional shufflePartitioningScheme = node.getTablePartitioningScheme(); - if (!shufflePartitioningScheme.isPresent()) { - shufflePartitioningScheme = node.getPreferredShufflePartitioningScheme(); - } - if (!shufflePartitioningScheme.isPresent()) { if (scaleWriters) { shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables())); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 9764defd1aaaf..0fde898e14801 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -580,7 +580,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo originalTableWriterNode.getColumnNames(), originalTableWriterNode.getNotNullColumnVariables(), originalTableWriterNode.getTablePartitioningScheme(), - originalTableWriterNode.getPreferredShufflePartitioningScheme(), statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), originalTableWriterNode.getTaskCountIfScaledWriter(), originalTableWriterNode.getIsTemporaryTableWriter()), @@ -606,7 +605,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo originalTableWriterNode.getColumnNames(), originalTableWriterNode.getNotNullColumnVariables(), originalTableWriterNode.getTablePartitioningScheme(), - originalTableWriterNode.getPreferredShufflePartitioningScheme(), statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), originalTableWriterNode.getTaskCountIfScaledWriter(), originalTableWriterNode.getIsTemporaryTableWriter()), @@ -636,7 +634,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo originalTableWriterNode.getColumnNames(), originalTableWriterNode.getNotNullColumnVariables(), originalTableWriterNode.getTablePartitioningScheme(), - originalTableWriterNode.getPreferredShufflePartitioningScheme(), statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), originalTableWriterNode.getTaskCountIfScaledWriter(), originalTableWriterNode.getIsTemporaryTableWriter()), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index adf739944fb31..fe8c1e11ec756 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -727,11 +727,6 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext canonicalize(partitioningScheme, source)), - node.getPreferredShufflePartitioningScheme().map(partitioningScheme -> canonicalize(partitioningScheme, source)), node.getStatisticsAggregation().map(this::map), node.getTaskCountIfScaledWriter(), node.getIsTemporaryTableWriter()); diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java index b822bfbf2a90f..48210df7d5594 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java @@ -320,12 +320,6 @@ public Optional getNewTableLayout(Session session, String catalo throw new UnsupportedOperationException(); } - @Override - public Optional getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata) - { - throw new UnsupportedOperationException(); - } - @Override public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional layout) { @@ -344,12 +338,6 @@ public Optional getInsertLayout(Session session, TableHandle tar throw new UnsupportedOperationException(); } - @Override - public Optional getPreferredShuffleLayoutForInsert(Session session, TableHandle target) - { - throw new UnsupportedOperationException(); - } - @Override public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index 41674ca683335..fa44b4d837e4e 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -894,7 +894,6 @@ public TableWriterNode tableWriter(List columns, Li Optional.empty(), Optional.empty(), Optional.empty(), - Optional.empty(), Optional.empty()); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java index 8225d1d129a0f..06ebb4acbc12d 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java @@ -413,19 +413,6 @@ default Optional getNewTableLayout(ConnectorSession ses return Optional.empty(); } - /** - * A connector can have preferred shuffle layout for table write. - * For example, Hive connector might prefer to shuffle on partitioned columns for partitioned unbucketed table. - * - * @apiNote this method and {@link #getNewTableLayout} cannot both return non-empty table layout. - * @see #getPreferredShuffleLayoutForInsert - */ - @Experimental - default Optional getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) - { - return Optional.empty(); - } - /** * Get the physical layout for a inserting into an existing table. */ @@ -448,19 +435,6 @@ default Optional getInsertLayout(ConnectorSession sessi return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns)); } - /** - * A connector can have preferred shuffle layout for table write. - * For example, Hive connector might prefer to shuffle on partitioned columns for partitioned unbucketed table. - * - * @apiNote this method and {@link #getInsertLayout} cannot both return non-empty table layout. - * @see #getPreferredShuffleLayoutForNewTable - */ - @Experimental - default Optional getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle) - { - return Optional.empty(); - } - /** * Describes statistics that must be collected during a write. */ diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 2201e0b98b0bb..f1e5407176141 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -155,14 +155,6 @@ public Optional getNewTableLayout(ConnectorSession sess } } - @Override - public Optional getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) - { - try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { - return delegate.getPreferredShuffleLayoutForNewTable(session, tableMetadata); - } - } - @Override public Optional getInsertLayout(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -171,14 +163,6 @@ public Optional getInsertLayout(ConnectorSession sessio } } - @Override - public Optional getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle) - { - try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { - return delegate.getPreferredShuffleLayoutForInsert(session, tableHandle); - } - } - @Override public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata) { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java index 98f5a39e4cabc..0f15ad358d8b2 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java @@ -51,7 +51,6 @@ public class TableWriterNode private final List columnNames; private final Set notNullColumnVariables; private final Optional tablePartitioningScheme; - private final Optional preferredShufflePartitioningScheme; private final Optional statisticsAggregation; private final List outputs; private final Optional taskCountIfScaledWriter; @@ -70,12 +69,26 @@ public TableWriterNode( @JsonProperty("columnNames") List columnNames, @JsonProperty("notNullColumnVariables") Set notNullColumnVariables, @JsonProperty("partitioningScheme") Optional tablePartitioningScheme, - @JsonProperty("preferredShufflePartitioningScheme") Optional preferredShufflePartitioningScheme, @JsonProperty("statisticsAggregation") Optional statisticsAggregation, @JsonProperty("taskCountIfScaledWriter") Optional taskCountIfScaledWriter, @JsonProperty("isTemporaryTableWriter") Optional isTemporaryTableWriter) { - this(sourceLocation, id, Optional.empty(), source, target, rowCountVariable, fragmentVariable, tableCommitContextVariable, columns, columnNames, notNullColumnVariables, tablePartitioningScheme, preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter, isTemporaryTableWriter); + this( + sourceLocation, + id, + Optional.empty(), + source, + target, + rowCountVariable, + fragmentVariable, + tableCommitContextVariable, + columns, + columnNames, + notNullColumnVariables, + tablePartitioningScheme, + statisticsAggregation, + taskCountIfScaledWriter, + isTemporaryTableWriter); } public TableWriterNode( @@ -91,7 +104,6 @@ public TableWriterNode( List columnNames, Set notNullColumnVariables, Optional tablePartitioningScheme, - Optional preferredShufflePartitioningScheme, Optional statisticsAggregation, Optional taskCountIfScaledWriter, Optional isTemporaryTableWriter) @@ -101,9 +113,6 @@ public TableWriterNode( requireNonNull(columns, "columns is null"); requireNonNull(columnNames, "columnNames is null"); checkArgument(columns.size() == columnNames.size(), "columns and columnNames sizes don't match"); - checkArgument( - !(tablePartitioningScheme.isPresent() && preferredShufflePartitioningScheme.isPresent()), - "tablePartitioningScheme and preferredShufflePartitioningScheme cannot both exist"); this.source = requireNonNull(source, "source is null"); this.target = requireNonNull(target, "target is null"); @@ -114,7 +123,6 @@ public TableWriterNode( this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames)); this.notNullColumnVariables = Collections.unmodifiableSet(new LinkedHashSet<>(requireNonNull(notNullColumnVariables, "notNullColumns is null"))); this.tablePartitioningScheme = requireNonNull(tablePartitioningScheme, "partitioningScheme is null"); - this.preferredShufflePartitioningScheme = requireNonNull(preferredShufflePartitioningScheme, "preferredShufflePartitioningScheme is null"); this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null"); List outputsList = new ArrayList<>(); @@ -184,12 +192,6 @@ public Optional getTablePartitioningScheme() return tablePartitioningScheme; } - @JsonProperty - public Optional getPreferredShufflePartitioningScheme() - { - return preferredShufflePartitioningScheme; - } - @JsonProperty public Optional getStatisticsAggregation() { @@ -243,7 +245,6 @@ public PlanNode replaceChildren(List newChildren) columnNames, notNullColumnVariables, tablePartitioningScheme, - preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter, isTemporaryTableWriter); } @@ -264,7 +265,6 @@ public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalent columnNames, notNullColumnVariables, tablePartitioningScheme, - preferredShufflePartitioningScheme, statisticsAggregation, taskCountIfScaledWriter, isTemporaryTableWriter); }