diff --git a/assembly/pom.xml b/assembly/pom.xml index 8070f84007f7d..5b164b9ed22de 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index e1c3425f36de6..cbf5e0af8b322 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 9408b7dea33ba..1211d85f19d49 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 4617eec4073cb..dbb85c555d018 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 1f09e7fa300b5..fd4cd94e5a6bc 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 8de157814be66..0a98259d2d12e 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/common/tags/pom.xml b/common/tags/pom.xml index bc617b080b390..5e6dd46ea07d3 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index d41ef5a7afa89..a6161a9c1d9bd 100644 --- 
a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/core/pom.xml b/core/pom.xml index b73bfce744d78..078d6e4fc5ce4 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/examples/pom.xml b/examples/pom.xml index 19dc828f0a0a3..d79ebc51c71fc 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/external/avro/pom.xml b/external/avro/pom.xml index cb6ac88a84758..cf788b08b3e6b 100644 --- a/external/avro/pom.xml +++ b/external/avro/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index e2318ba773019..56a972b71c5a6 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index d08060bb5f13a..a034e5c1ea4e4 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 98f0c701e65cb..cac1a0083afd2 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 7a0a12c44c2c6..8f10fe1f6b641 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark 
spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml index 43e806322f919..061969b866bd1 100644 --- a/external/kafka-0-10-assembly/pom.xml +++ b/external/kafka-0-10-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 079498333d1d1..841b62a818564 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 15a5ad3bc52a2..d64690e287a3e 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml index 5aad1e62f16b8..e76eacce5defb 100644 --- a/external/kafka-0-8-assembly/pom.xml +++ b/external/kafka-0-8-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml index 8f7dc212a1ca9..b82930f6b344c 100644 --- a/external/kafka-0-8/pom.xml +++ b/external/kafka-0-8/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml index e6bcdeb3fd535..477a76fc67569 100644 --- a/external/kinesis-asl-assembly/pom.xml +++ b/external/kinesis-asl-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml index 5e5648c964848..2bc6e328db582 
100644 --- a/external/kinesis-asl/pom.xml +++ b/external/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml index 96bea68537ff8..70a4d0818193a 100644 --- a/external/spark-ganglia-lgpl/pom.xml +++ b/external/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 1db98cf7504ee..4d8eb7bd61501 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index 3f55ad96057a0..dd7758acd45f5 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index 7a481f00b1d7f..93ffdbbdefe9e 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/mllib-local/pom.xml b/mllib-local/pom.xml index 56e444250af13..51f6ff5d816a6 100644 --- a/mllib-local/pom.xml +++ b/mllib-local/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index a29180bb4e09b..0f421ee8745f1 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/pom.xml b/pom.xml index 28019d0b7dc16..db10a7dd40f6d 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index 
ae23855238437..59eff8c7f991c 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index a6ea7315ef016..7464d8e424353 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../../pom.xml diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index def232e1b8906..9263f4123fb21 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../../pom.xml diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml index 2a8a448d89208..a70c83af51e87 100644 --- a/resource-managers/mesos/pom.xml +++ b/resource-managers/mesos/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index b2c58213795ce..a3a56dd9b7ac5 100644 --- a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 4792199c8b209..61431203ee5bc 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index dc73c59f3e37c..b52ac97c97742 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 
../../pom.xml diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index ea4f1592a7c2e..345ab385b959b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.ParquetMetrics import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel @@ -64,6 +65,8 @@ class ParquetFileFormat // here. private val parquetLogRedirector = ParquetLogRedirector.INSTANCE + private val metric = new ParquetMetric + override def shortName(): String = "parquet" override def toString: String = "Parquet" @@ -326,7 +329,9 @@ class ParquetFileFormat val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - + if (!metric.isRegistered()) { + metric.register(sparkSession.sparkContext) + } // TODO: if you move this into the closure it reverts to the default values. // If true, enable using the custom RecordReader for parquet. This only works for // a subset of the types (no complex types). 
@@ -409,7 +414,7 @@ class ParquetFileFormat ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { + val iter = if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) @@ -453,6 +458,36 @@ class ParquetFileFormat .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues))) } } + val metrics = ParquetMetrics.get() + metric._pageReadHeaderTime.setValue(metrics.getPageReadHeaderTime) + metric._pageReadHeaderCnt.setValue(metrics.getPageReadHeaderCnt) + + metric._footerReadTime.setValue(metrics.getFooterReadTime) + metric._footerReadCnt.setValue(metrics.getFooterReadCnt) + + metric._groupReadTime.setValue(metrics.getGroupReadTime) + metric._groupReadCnt.setValue(metrics.getGroupReadCnt) + + metric._pageReadTime.setValue(metrics.getPageReadTime) + metric._pageReadCnt.setValue(metrics.getPageReadCnt) + + metric._filteredGroupReadTime.setValue(metrics.getFilteredGroupReadTime) + metric._filteredGroupReadCnt.setValue(metrics.getFilteredGroupReadCnt) + + metric._pageReadDecompressTime.setValue(metrics.getPageReadDecompressTime) + metric._pageReadUncompressBytes.setValue(metrics.getPageReadUncompressBytes) + metric._pageReadDecompressBytes.setValue(metrics.getPageReadDecompressBytes) + + metric._totalPages.setValue(metrics.getTotalPages) + + metric._colPageIndexReadTime.setValue(metrics.getColPageIndexReadTime) + metric._colPageIndexReadCnt.setValue(metrics.getColPageIndexReadCnt) + + metric._offsetPageIndexReadTime.setValue(metrics.getOffsetPageIndexReadTime) + metric._offsetPageIndexReadCnt.setValue(metrics.getOffsetPageIndexReadCnt) + + metric._totalTime.setValue(metrics.getTotalTime) + iter } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetMetric.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetMetric.scala new file mode 100644 index 0000000000000..3146b7bfedfa6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetMetric.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import scala.collection.mutable + +import org.apache.spark.SparkContext +import org.apache.spark.util.LongAccumulator + +class ParquetMetric extends Serializable { + val _pageReadHeaderTime = new LongAccumulator + val _pageReadHeaderCnt = new LongAccumulator + + val _footerReadTime = new LongAccumulator + val _footerReadCnt = new LongAccumulator + + val _groupReadTime = new LongAccumulator + val _groupReadCnt = new LongAccumulator + + val _pageReadTime = new LongAccumulator + val _pageReadCnt = new LongAccumulator + + val _filteredGroupReadTime = new LongAccumulator + val _filteredGroupReadCnt = new LongAccumulator + + val _pageReadUncompressBytes = new LongAccumulator + val _pageReadDecompressBytes = new LongAccumulator + val _pageReadDecompressTime = new LongAccumulator + + val _totalPages = new LongAccumulator + + val _colPageIndexReadCnt = new LongAccumulator + val _colPageIndexReadTime = new LongAccumulator + + val _offsetPageIndexReadCnt = new LongAccumulator + val _offsetPageIndexReadTime = new LongAccumulator + + val _totalTime = new LongAccumulator + + val nameToAccums: mutable.LinkedHashMap[String, LongAccumulator] = mutable.LinkedHashMap( + ParquetMetric.PARQUET_PAGE_READ_HEADER_TIME -> _pageReadHeaderTime, + ParquetMetric.PARQUET_PAGE_READ_HEADER_CNT -> _pageReadHeaderCnt, + ParquetMetric.PARQUET_FOOTER_READ_TIME -> _footerReadTime, + ParquetMetric.PARQUET_FOOTER_READ_CNT -> _footerReadCnt, + ParquetMetric.PARQUET_GROUP_READ_TIME -> _groupReadTime, + ParquetMetric.PARQUET_GROUP_READ_CNT -> _groupReadCnt, + ParquetMetric.PARQUET_PAGE_READ_TIME -> _pageReadTime, + ParquetMetric.PARQUET_PAGE_READ_CNT -> _pageReadCnt, + ParquetMetric.PARQUET_FILTERED_GROUP_READ_TIME -> _filteredGroupReadTime, + ParquetMetric.PARQUET_FILTERED_GROUP_READ_CNT -> _filteredGroupReadCnt, + ParquetMetric.PARQUET_PAGE_READ_UNCOMPRESS_BYTES -> _pageReadUncompressBytes, + 
ParquetMetric.PARQUET_PAGE_READ_DECOMPRESS_BYTES -> _pageReadDecompressBytes, + ParquetMetric.PARQUET_PAGE_READ_DECOMPRESS_TIME -> _pageReadDecompressTime, + ParquetMetric.PARQUET_TOTAL_PAGES -> _totalPages, + ParquetMetric.PARQUET_COL_PAGE_INDEX_READ_CNT -> _colPageIndexReadCnt, + ParquetMetric.PARQUET_COL_PAGE_INDEX_READ_TIME -> _colPageIndexReadTime, + ParquetMetric.PARQUET_OFFSET_PAGE_INDEX_READ_CNT -> _offsetPageIndexReadCnt, + ParquetMetric.PARQUET_OFFSET_PAGE_INDEX_READ_TIME -> _offsetPageIndexReadTime, + ParquetMetric.PARQUET_TOTAL_TIME -> _totalTime + ) + + private[spark] def register(sc: SparkContext): Unit = { + nameToAccums.foreach { + case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true) + } + } + + def isRegistered(): Boolean = { + nameToAccums.values.forall(_.isRegistered) + } +} + +object ParquetMetric { + val PARQUET_PAGE_READ_HEADER_TIME = "parquetPageReadHeaderTime" + val PARQUET_PAGE_READ_HEADER_CNT = "parquetPageReadHeaderCnt" + val PARQUET_FOOTER_READ_TIME = "parquetFooterReadTime" + val PARQUET_FOOTER_READ_CNT = "parquetFooterReadCnt" + val PARQUET_GROUP_READ_TIME = "parquetGroupReadTime" + val PARQUET_GROUP_READ_CNT = "parquetGroupReadCnt" + val PARQUET_PAGE_READ_TIME = "parquetPageReadTime" + val PARQUET_PAGE_READ_CNT = "parquetPageReadCnt" + val PARQUET_FILTERED_GROUP_READ_TIME = "parquetFilteredGroupReadTime" + val PARQUET_FILTERED_GROUP_READ_CNT = "parquetFilteredGroupReadCnt" + val PARQUET_PAGE_READ_UNCOMPRESS_BYTES = "parquetPageReadUncompressBytes" + val PARQUET_PAGE_READ_DECOMPRESS_BYTES = "parquetPageReadDecompressBytes" + val PARQUET_PAGE_READ_DECOMPRESS_TIME = "parquetPageReadDecompressTime" + val PARQUET_TOTAL_PAGES = "parquetTotalPages" + val PARQUET_COL_PAGE_INDEX_READ_CNT = "parquetColPageIndexReadCnt" + val PARQUET_COL_PAGE_INDEX_READ_TIME = "parquetColPageIndexReadTime" + val PARQUET_OFFSET_PAGE_INDEX_READ_CNT = "parquetOffsetPageIndexReadCnt" + val PARQUET_OFFSET_PAGE_INDEX_READ_TIME = 
"parquetOffsetPageIndexReadTime" + val PARQUET_TOTAL_TIME = "parquetTotalTime" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetMetricAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetMetricAccumulator.scala new file mode 100644 index 0000000000000..2482df7812996 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetMetricAccumulator.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.util.AccumulatorV2 + +class ParquetMetricAccumulator extends AccumulatorV2[String, String] { + + private var res = "" + + /** + * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero + * value; for a list accumulator, Nil is zero value. + */ + override def isZero: Boolean = res.isEmpty + + /** + * Creates a new copy of this accumulator. + */ + override def copy(): AccumulatorV2[String, String] = { + val newAccu = new ParquetMetricAccumulator + newAccu.res = this.res + newAccu + } + + /** + * Resets this accumulator, which is zero value. i.e. 
call `isZero` must + * return true. + */ + override def reset(): Unit = { + res = "" + } + + /** + * Takes the inputs and accumulates. + */ + override def add(v: String): Unit = { + throw new UnsupportedOperationException + } + + def setValue(newValue: String): Unit = { + this.res = newValue + } + + + /** + * Merges another same-type accumulator into this one and update its state, i.e. this should be + * merge-in-place. + */ + override def merge(other: AccumulatorV2[String, String]): Unit = { + throw new UnsupportedOperationException + } + + /** + * Defines the current value of this accumulator + */ + override def value: String = res +} diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index e9d0341e4c50b..d6da5cd7649fb 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 9f4b516f7a57a..ea43a0a5c7682 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 4e12d72e3beb0..3ec97d7cc0f39 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index b7d905c07e35b..6cec68bf83b60 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.11 - 2.4.1-kylin-r20 + 2.4.1-kylin-r22 ../pom.xml