Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop experimental.table-writer-merge-operator-enabled config #24145

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions presto-docs/src/main/sphinx/presto_cpp/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ Presto C++ workers.
optimizer.optimize-hash-generation=false
regex-library=RE2J
use-alternative-function-signatures=true
experimental.table-writer-merge-operator-enabled=false

These Presto coordinator configuration properties are described here, in
alphabetical order.
Expand All @@ -45,15 +44,6 @@ alphabetical order.

Set this property to ``0`` to disable canceling.

``experimental.table-writer-merge-operator-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

Merge TableWriter output before sending to TableFinishOperator. This property must be set to
``false``.

``native-execution-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ public final class SystemSessionProperties
public static final String MAX_CONCURRENT_MATERIALIZATIONS = "max_concurrent_materializations";
public static final String PUSHDOWN_SUBFIELDS_ENABLED = "pushdown_subfields_enabled";
public static final String PUSHDOWN_SUBFIELDS_FROM_LAMBDA_ENABLED = "pushdown_subfields_from_lambda_enabled";
public static final String TABLE_WRITER_MERGE_OPERATOR_ENABLED = "table_writer_merge_operator_enabled";
public static final String INDEX_LOADER_TIMEOUT = "index_loader_timeout";
public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning";
public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy";
Expand Down Expand Up @@ -1064,11 +1063,6 @@ public SystemSessionProperties(
"Experimental: enable dereference pushdown",
featuresConfig.isPushdownDereferenceEnabled(),
false),
booleanProperty(
TABLE_WRITER_MERGE_OPERATOR_ENABLED,
"Experimental: enable table writer merge operator",
featuresConfig.isTableWriterMergeOperatorEnabled(),
false),
new PropertyMetadata<>(
INDEX_LOADER_TIMEOUT,
"Timeout for loading indexes for index joins",
Expand Down Expand Up @@ -2515,11 +2509,6 @@ public static boolean isPushdownDereferenceEnabled(Session session)
return session.getSystemProperty(PUSHDOWN_DEREFERENCE_ENABLED, Boolean.class);
}

public static boolean isTableWriterMergeOperatorEnabled(Session session)
{
return session.getSystemProperty(TABLE_WRITER_MERGE_OPERATOR_ENABLED, Boolean.class);
}

public static Duration getIndexLoaderTimeout(Session session)
{
return session.getSystemProperty(INDEX_LOADER_TIMEOUT, Duration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR;
Expand Down Expand Up @@ -327,52 +326,32 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(

// Disabled by default. Enable when the column statistics are essential for future runtime adaptive plan optimizations
boolean enableStatsCollectionForTemporaryTable = SystemSessionProperties.isEnableStatsCollectionForTemporaryTable(session);

if (isTableWriterMergeOperatorEnabled(session)) {
StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager());
tableWriterMerge = new TableWriterMergeNode(
sourceLocation,
idAllocator.getNextId(),
gatheringExchange(
idAllocator.getNextId(),
LOCAL,
new TableWriterNode(
sourceLocation,
idAllocator.getNextId(),
writerSource,
Optional.of(insertReference),
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty());
}
else {
tableWriterMerge = new TableWriterNode(
sourceLocation,
idAllocator.getNextId(),
writerSource,
Optional.of(insertReference),
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE));
}
StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager());
tableWriterMerge = new TableWriterMergeNode(
sourceLocation,
idAllocator.getNextId(),
gatheringExchange(
idAllocator.getNextId(),
LOCAL,
new TableWriterNode(
sourceLocation,
idAllocator.getNextId(),
writerSource,
Optional.of(insertReference),
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty());

return new TableFinishNode(
sourceLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
"deprecated.legacy-join-using",
"use-legacy-scheduler",
"max-stage-retries",
"deprecated.group-by-uses-equal"})
"deprecated.group-by-uses-equal",
"experimental.table-writer-merge-operator-enabled"})
public class FeaturesConfig
{
@VisibleForTesting
Expand Down Expand Up @@ -177,8 +178,6 @@ public class FeaturesConfig
private boolean pushdownSubfieldsEnabled;
private boolean pushdownSubfieldsFromLambdaEnabled;

private boolean tableWriterMergeOperatorEnabled = true;

private Duration indexLoaderTimeout = new Duration(20, SECONDS);

private boolean listBuiltInFunctionsOnly = true;
Expand Down Expand Up @@ -1693,18 +1692,6 @@ public boolean isPushdownDereferenceEnabled()
return pushdownDereferenceEnabled;
}

public boolean isTableWriterMergeOperatorEnabled()
{
return tableWriterMergeOperatorEnabled;
}

@Config("experimental.table-writer-merge-operator-enabled")
public FeaturesConfig setTableWriterMergeOperatorEnabled(boolean tableWriterMergeOperatorEnabled)
{
this.tableWriterMergeOperatorEnabled = tableWriterMergeOperatorEnabled;
return this;
}

@Config("index-loader-timeout")
@ConfigDescription("Time limit for loading indexes for index joins")
public FeaturesConfig setIndexLoaderTimeout(Duration indexLoaderTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
Expand Down Expand Up @@ -168,7 +167,6 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan,
* - One table writer per task
*/
boolean recoverable = isRecoverableGroupedExecutionEnabled(session) &&
isTableWriterMergeOperatorEnabled(session) &&
parentContainsTableFinish &&
(fragment.getRoot() instanceof TableWriterMergeNode || fragment.getRoot() instanceof TableWriterNode) &&
properties.isRecoveryEligible();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
Expand Down Expand Up @@ -549,10 +548,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session));
}

if (!isTableWriterMergeOperatorEnabled(session)) {
return planAndEnforceChildren(originalTableWriterNode, fixedParallelism(), fixedParallelism());
}

Optional<StatisticAggregations.Parts> statisticAggregations = originalTableWriterNode
.getStatisticsAggregation()
.map(aggregations -> splitIntoPartialAndIntermediate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public void testDefaults()
.setPushdownSubfieldsEnabled(false)
.setPushdownSubfieldsFromLambdaEnabled(false)
.setPushdownDereferenceEnabled(false)
.setTableWriterMergeOperatorEnabled(true)
.setIndexLoaderTimeout(new Duration(20, SECONDS))
.setOptimizedRepartitioningEnabled(false)
.setListBuiltInFunctionsOnly(true)
Expand Down Expand Up @@ -349,7 +348,6 @@ public void testExplicitPropertyMappings()
.put("experimental.pushdown-subfields-enabled", "true")
.put("pushdown-subfields-from-lambda-enabled", "true")
.put("experimental.pushdown-dereference-enabled", "true")
.put("experimental.table-writer-merge-operator-enabled", "false")
.put("index-loader-timeout", "10s")
.put("experimental.optimized-repartitioning", "true")
.put("list-built-in-functions-only", "false")
Expand Down Expand Up @@ -542,7 +540,6 @@ public void testExplicitPropertyMappings()
.setPushdownSubfieldsEnabled(true)
.setPushdownSubfieldsFromLambdaEnabled(true)
.setPushdownDereferenceEnabled(true)
.setTableWriterMergeOperatorEnabled(false)
.setIndexLoaderTimeout(new Duration(10, SECONDS))
.setOptimizedRepartitioningEnabled(true)
.setListBuiltInFunctionsOnly(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public static Map<String, String> getNativeWorkerSystemProperties()
// To achieve that, we set inline-sql-functions to false.
.put("inline-sql-functions", "false")
.put("use-alternative-function-signatures", "true")
.put("experimental.table-writer-merge-operator-enabled", "false")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void testTableFormats()
{
Session session = Session.builder(getSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("table_writer_merge_operator_enabled", "true")
.setSystemProperty("task_writer_count", "1")
.setSystemProperty("task_partitioned_writer_count", "2")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
Expand Down Expand Up @@ -441,7 +440,6 @@ private Session buildSessionForTableWrite()
{
return Session.builder(getSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("table_writer_merge_operator_enabled", "true")
.setSystemProperty("task_writer_count", "1")
.setSystemProperty("task_partitioned_writer_count", "2")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void testJsonFileBasedFunction()
public void testAggregationCompanionFunction()
{
Session session = Session.builder(getSession())
.setSystemProperty("table_writer_merge_operator_enabled", "false")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "false")
.setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD")
.build();
Expand Down
Loading