Skip to content

Commit

Permalink
Drop experimental.table-writer-merge-operator-enabled config
Browse files Browse the repository at this point in the history
Enabled by default for a very long time
  • Loading branch information
arhimondr committed Nov 26, 2024
1 parent 7cc07c8 commit b7501f9
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 97 deletions.
10 changes: 0 additions & 10 deletions presto-docs/src/main/sphinx/presto_cpp/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ Presto C++ workers.
optimizer.optimize-hash-generation=false
regex-library=RE2J
use-alternative-function-signatures=true
experimental.table-writer-merge-operator-enabled=false
These Presto coordinator configuration properties are described here, in
alphabetical order.
Expand All @@ -45,15 +44,6 @@ alphabetical order.

Set this property to ``0`` to disable canceling.

``experimental.table-writer-merge-operator-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

Merge TableWriter output before sending to TableFinishOperator. This property must be set to
``false``.

``native-execution-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ public final class SystemSessionProperties
public static final String MAX_CONCURRENT_MATERIALIZATIONS = "max_concurrent_materializations";
public static final String PUSHDOWN_SUBFIELDS_ENABLED = "pushdown_subfields_enabled";
public static final String PUSHDOWN_SUBFIELDS_FROM_LAMBDA_ENABLED = "pushdown_subfields_from_lambda_enabled";
public static final String TABLE_WRITER_MERGE_OPERATOR_ENABLED = "table_writer_merge_operator_enabled";
public static final String INDEX_LOADER_TIMEOUT = "index_loader_timeout";
public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning";
public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy";
Expand Down Expand Up @@ -1064,11 +1063,6 @@ public SystemSessionProperties(
"Experimental: enable dereference pushdown",
featuresConfig.isPushdownDereferenceEnabled(),
false),
booleanProperty(
TABLE_WRITER_MERGE_OPERATOR_ENABLED,
"Experimental: enable table writer merge operator",
featuresConfig.isTableWriterMergeOperatorEnabled(),
false),
new PropertyMetadata<>(
INDEX_LOADER_TIMEOUT,
"Timeout for loading indexes for index joins",
Expand Down Expand Up @@ -2515,11 +2509,6 @@ public static boolean isPushdownDereferenceEnabled(Session session)
return session.getSystemProperty(PUSHDOWN_DEREFERENCE_ENABLED, Boolean.class);
}

public static boolean isTableWriterMergeOperatorEnabled(Session session)
{
return session.getSystemProperty(TABLE_WRITER_MERGE_OPERATOR_ENABLED, Boolean.class);
}

public static Duration getIndexLoaderTimeout(Session session)
{
return session.getSystemProperty(INDEX_LOADER_TIMEOUT, Duration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR;
Expand Down Expand Up @@ -327,52 +326,32 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(

// Disabled by default. Enable when the column statistics are essential for future runtime adaptive plan optimizations
boolean enableStatsCollectionForTemporaryTable = SystemSessionProperties.isEnableStatsCollectionForTemporaryTable(session);

if (isTableWriterMergeOperatorEnabled(session)) {
StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager());
tableWriterMerge = new TableWriterMergeNode(
sourceLocation,
idAllocator.getNextId(),
gatheringExchange(
idAllocator.getNextId(),
LOCAL,
new TableWriterNode(
sourceLocation,
idAllocator.getNextId(),
writerSource,
Optional.of(insertReference),
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty());
}
else {
tableWriterMerge = new TableWriterNode(
sourceLocation,
idAllocator.getNextId(),
writerSource,
Optional.of(insertReference),
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE));
}
StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager());
tableWriterMerge = new TableWriterMergeNode(
sourceLocation,
idAllocator.getNextId(),
gatheringExchange(
idAllocator.getNextId(),
LOCAL,
new TableWriterNode(
sourceLocation,
idAllocator.getNextId(),
writerSource,
Optional.of(insertReference),
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("partialfragments", VARBINARY),
variableAllocator.newVariable("partialtablecommitcontext", VARBINARY),
outputs,
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty());

return new TableFinishNode(
sourceLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
"deprecated.legacy-join-using",
"use-legacy-scheduler",
"max-stage-retries",
"deprecated.group-by-uses-equal"})
"deprecated.group-by-uses-equal",
"experimental.table-writer-merge-operator-enabled"})
public class FeaturesConfig
{
@VisibleForTesting
Expand Down Expand Up @@ -177,8 +178,6 @@ public class FeaturesConfig
private boolean pushdownSubfieldsEnabled;
private boolean pushdownSubfieldsFromLambdaEnabled;

private boolean tableWriterMergeOperatorEnabled = true;

private Duration indexLoaderTimeout = new Duration(20, SECONDS);

private boolean listBuiltInFunctionsOnly = true;
Expand Down Expand Up @@ -1693,18 +1692,6 @@ public boolean isPushdownDereferenceEnabled()
return pushdownDereferenceEnabled;
}

public boolean isTableWriterMergeOperatorEnabled()
{
return tableWriterMergeOperatorEnabled;
}

@Config("experimental.table-writer-merge-operator-enabled")
public FeaturesConfig setTableWriterMergeOperatorEnabled(boolean tableWriterMergeOperatorEnabled)
{
this.tableWriterMergeOperatorEnabled = tableWriterMergeOperatorEnabled;
return this;
}

@Config("index-loader-timeout")
@ConfigDescription("Time limit for loading indexes for index joins")
public FeaturesConfig setIndexLoaderTimeout(Duration indexLoaderTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
Expand Down Expand Up @@ -168,7 +167,6 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan,
* - One table writer per task
*/
boolean recoverable = isRecoverableGroupedExecutionEnabled(session) &&
isTableWriterMergeOperatorEnabled(session) &&
parentContainsTableFinish &&
(fragment.getRoot() instanceof TableWriterMergeNode || fragment.getRoot() instanceof TableWriterNode) &&
properties.isRecoveryEligible();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled;
import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
Expand Down Expand Up @@ -549,10 +548,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo
return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session));
}

if (!isTableWriterMergeOperatorEnabled(session)) {
return planAndEnforceChildren(originalTableWriterNode, fixedParallelism(), fixedParallelism());
}

Optional<StatisticAggregations.Parts> statisticAggregations = originalTableWriterNode
.getStatisticsAggregation()
.map(aggregations -> splitIntoPartialAndIntermediate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public void testDefaults()
.setPushdownSubfieldsEnabled(false)
.setPushdownSubfieldsFromLambdaEnabled(false)
.setPushdownDereferenceEnabled(false)
.setTableWriterMergeOperatorEnabled(true)
.setIndexLoaderTimeout(new Duration(20, SECONDS))
.setOptimizedRepartitioningEnabled(false)
.setListBuiltInFunctionsOnly(true)
Expand Down Expand Up @@ -349,7 +348,6 @@ public void testExplicitPropertyMappings()
.put("experimental.pushdown-subfields-enabled", "true")
.put("pushdown-subfields-from-lambda-enabled", "true")
.put("experimental.pushdown-dereference-enabled", "true")
.put("experimental.table-writer-merge-operator-enabled", "false")
.put("index-loader-timeout", "10s")
.put("experimental.optimized-repartitioning", "true")
.put("list-built-in-functions-only", "false")
Expand Down Expand Up @@ -542,7 +540,6 @@ public void testExplicitPropertyMappings()
.setPushdownSubfieldsEnabled(true)
.setPushdownSubfieldsFromLambdaEnabled(true)
.setPushdownDereferenceEnabled(true)
.setTableWriterMergeOperatorEnabled(false)
.setIndexLoaderTimeout(new Duration(10, SECONDS))
.setOptimizedRepartitioningEnabled(true)
.setListBuiltInFunctionsOnly(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public static Map<String, String> getNativeWorkerSystemProperties()
// To achieve that, we set inline-sql-functions to false.
.put("inline-sql-functions", "false")
.put("use-alternative-function-signatures", "true")
.put("experimental.table-writer-merge-operator-enabled", "false")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void testTableFormats()
{
Session session = Session.builder(getSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("table_writer_merge_operator_enabled", "true")
.setSystemProperty("task_writer_count", "1")
.setSystemProperty("task_partitioned_writer_count", "2")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
Expand Down Expand Up @@ -441,7 +440,6 @@ private Session buildSessionForTableWrite()
{
return Session.builder(getSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("table_writer_merge_operator_enabled", "true")
.setSystemProperty("task_writer_count", "1")
.setSystemProperty("task_partitioned_writer_count", "2")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void testJsonFileBasedFunction()
public void testAggregationCompanionFunction()
{
Session session = Session.builder(getSession())
.setSystemProperty("table_writer_merge_operator_enabled", "false")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "false")
.setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD")
.build();
Expand Down

0 comments on commit b7501f9

Please sign in to comment.