ORC: Grow list and map child vectors with a growth factor of 3 (#2218)
770120041 authored Jun 30, 2023
1 parent 3e20679 commit ca4f26b
Showing 6 changed files with 274 additions and 9 deletions.
@@ -430,7 +430,7 @@ public void nonNullWrite(int rowId, List<T> value, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child);
@@ -466,8 +466,8 @@ public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -476,4 +476,11 @@ public void nonNullWrite(int rowId, Map<K, V> map, ColumnVector output) {
       }
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
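The same private helper is added to each of the three ORC writers in this commit. The point of the factor of 3: if the child vector were only ever grown to the exact requested size, nearly every appended row would trigger another reallocation, whereas over-allocating by 3x lets the capacity grow geometrically so reallocations quickly become rare. A minimal standalone sketch of that effect (plain Java; the row counts, list size, and class name are illustrative and not part of this commit or the ORC ColumnVector API):

// Standalone illustration (not Iceberg/ORC code): count how many times a child
// array would have to be re-allocated when capacity grows to exactly the
// requested size versus to three times the requested size.
public class GrowthFactorDemo {

  public static void main(String[] args) {
    int rows = 100_000;   // number of nonNullWrite calls simulated
    int listSize = 10;    // child elements appended per call

    System.out.println("exact-size growth: " + countReallocations(rows, listSize, 1));
    System.out.println("3x growth:         " + countReallocations(rows, listSize, 3));
  }

  // Mirrors the growColumnVector check: only grow when the current capacity is
  // smaller than the requested child count, then grow to requestedSize * factor.
  private static int countReallocations(int rows, int listSize, int factor) {
    long capacity = 0;
    long childCount = 0;
    int reallocations = 0;
    for (int row = 0; row < rows; row++) {
      childCount += listSize;
      if (capacity < childCount) {
        capacity = childCount * factor;
        reallocations++;
      }
    }
    return reallocations;
  }
}

With 100,000 simulated rows of 10 elements each, exact-size growth reallocates on every row (100,000 times) while the 3x policy reallocates only 11 times.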
@@ -247,7 +247,7 @@ public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough.
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);

       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         Object value = elementGetter.getElementOrNull(data, e);
@@ -287,8 +287,8 @@ public void nonNullWrite(int rowId, MapData data, ColumnVector output) {
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -330,4 +330,11 @@ public void nonNullWrite(int rowId, RowData data, ColumnVector output) {
       }
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
@@ -238,7 +238,7 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.child.ensureSize(cv.childCount, true);
+      growColumnVector(cv.child, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
@@ -266,8 +266,8 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       cv.offsets[rowId] = cv.childCount;
       cv.childCount = (int) (cv.childCount + cv.lengths[rowId]);
       // make sure the child is big enough
-      cv.keys.ensureSize(cv.childCount, true);
-      cv.values.ensureSize(cv.childCount, true);
+      growColumnVector(cv.keys, cv.childCount);
+      growColumnVector(cv.values, cv.childCount);
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int pos = (int) (e + cv.offsets[rowId]);
@@ -276,4 +276,11 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       }
     }
   }
+
+  private static void growColumnVector(ColumnVector cv, int requestedSize) {
+    if (cv.isNull.length < requestedSize) {
+      // Use growth factor of 3 to avoid frequent array allocations
+      cv.ensureSize(requestedSize * 3, true);
+    }
+  }
 }
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public abstract class IcebergSourceNestedListDataBenchmark extends IcebergSourceBenchmark {

@Override
protected Configuration initHadoopConf() {
return new Configuration();
}

@Override
protected final Table initTable() {
Schema schema = new Schema(
required(0, "id", Types.LongType.get()),
optional(1, "outerlist", Types.ListType.ofOptional(2,
Types.StructType.of(required(3, "innerlist", Types.ListType.ofRequired(4, Types.StringType.get())))
))
);
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
HadoopTables tables = new HadoopTables(hadoopConf());
Map<String, String> properties = Maps.newHashMap();
properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
return tables.create(schema, partitionSpec, properties, newTableLocation());
}
}
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source.orc;

import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;

import static org.apache.spark.sql.functions.array_repeat;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.struct;

/**
* A benchmark that evaluates the performance of writing nested ORC data using Iceberg
* and the built-in file source in Spark.
*
* To run this benchmark:
* <code>
* ./gradlew :iceberg-spark2:jmh
* -PjmhIncludeRegex=IcebergSourceNestedListORCDataWriteBenchmark
* -PjmhOutputPath=benchmark/iceberg-source-nested-list-orc-data-write-benchmark-result.txt
* </code>
*/
public class IcebergSourceNestedListORCDataWriteBenchmark extends IcebergSourceNestedListDataBenchmark {

@Setup
public void setupBenchmark() {
setupSpark();
}

@TearDown
public void tearDownBenchmark() throws IOException {
tearDownSpark();
cleanupFiles();
}

@Param({"2000", "20000"})
private int numRows;

@Benchmark
@Threads(1)
public void writeIceberg() {
String tableLocation = table().location();
benchmarkData().write().format("iceberg").option("write-format", "orc")
.mode(SaveMode.Append).save(tableLocation);
}

@Benchmark
@Threads(1)
public void writeIcebergDictionaryOff() {
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put("orc.dictionary.key.threshold", "0");
withTableProperties(tableProperties, () -> {
String tableLocation = table().location();
benchmarkData().write().format("iceberg").option("write-format", "orc")
.mode(SaveMode.Append).save(tableLocation);
});
}

@Benchmark
@Threads(1)
public void writeFileSource() {
benchmarkData().write().mode(SaveMode.Append).orc(dataLocation());
}

private Dataset<Row> benchmarkData() {
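    // Each of the numRows rows gets an "outerlist" of 10 structs, and each struct's
    // "innerlist" repeats the row id (cast to string) 1,000 times; coalesce(1)
    // collapses the data to a single partition so it is written by one task.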
return spark().range(numRows)
.withColumn("outerlist", array_repeat(struct(
expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
10))
.coalesce(1);
}
}
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.source.parquet;

import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.source.IcebergSourceNestedListDataBenchmark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.internal.SQLConf;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;

import static org.apache.spark.sql.functions.array_repeat;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.struct;

/**
* A benchmark that evaluates the performance of writing nested Parquet data using Iceberg
* and the built-in file source in Spark.
*
* To run this benchmark:
* <code>
* ./gradlew :iceberg-spark2:jmh
* -PjmhIncludeRegex=IcebergSourceNestedListParquetDataWriteBenchmark
* -PjmhOutputPath=benchmark/iceberg-source-nested-list-parquet-data-write-benchmark-result.txt
* </code>
*/
public class IcebergSourceNestedListParquetDataWriteBenchmark extends IcebergSourceNestedListDataBenchmark {

@Setup
public void setupBenchmark() {
setupSpark();
}

@TearDown
public void tearDownBenchmark() throws IOException {
tearDownSpark();
cleanupFiles();
}

@Param({"2000", "20000"})
private int numRows;

@Benchmark
@Threads(1)
public void writeIceberg() {
String tableLocation = table().location();
benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
}

@Benchmark
@Threads(1)
public void writeFileSource() {
Map<String, String> conf = Maps.newHashMap();
conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
}

private Dataset<Row> benchmarkData() {
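    // Same shape as the ORC benchmark: 10 structs per row, each with an inner list of
    // 1,000 id strings, written from a single partition via coalesce(1).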
return spark().range(numRows)
.withColumn("outerlist", array_repeat(struct(
expr("array_repeat(CAST(id AS string), 1000) AS innerlist")),
10))
.coalesce(1);
}
}
