There are different configurations for the index that can affect read performance or the writing process. Here is a summary of some of them.
We designed the QbeastCatalog to work as an entry point for other formats' Catalogs as well. However, you can also handle different Catalogs simultaneously.
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
Using the spark_catalog configuration, you can write qbeast and delta (or upcoming formats ;)) tables into the default namespace.
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.saveAsTable("qbeast_table")
df.write
.format("delta")
.saveAsTable("delta_table")
To use more than one Catalog in the same session, you can set up the QbeastCatalog under a different name.
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
--conf spark.sql.catalog.qbeast_catalog=io.qbeast.catalog.QbeastCatalog
Notice that the QbeastCatalog conf parameter is no longer spark_catalog, but has a customized name such as qbeast_catalog. Each table written using the qbeast implementation should have the prefix qbeast_catalog.
For example:
// DataFrame API
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.saveAsTable("qbeast_catalog.default.qbeast_table")
// SQL
spark.sql("CREATE TABLE qbeast_catalog.default.qbeast_table USING qbeast AS SELECT * FROM ecommerce")
These are the columns you want to index. Try to find those that are relevant for your queries or your data pipelines.
You can specify different advanced options for the columns to index:
- Type: The type of index you want to create on that column. Can be linear (numeric) or hash (string). By default, it is inferred from the column's data type.
df.write.format("qbeast").option("columnsToIndex", "column:type,column2:type...")
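For example, explicitly indexing a numeric column as linear and a String column as hash might look like this (the column names and path are illustrative):
df.write
  .format("qbeast")
  .option("columnsToIndex", "user_id:linear,brand:hash")
  .save("/tmp/table")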
To avoid specifying the columnsToIndex option, you can enable the auto indexer through the Spark configuration:
--conf spark.qbeast.index.columnsToIndex.auto=true \
--conf spark.qbeast.index.columnsToIndex.auto.max=10
And write the DataFrame without any extra option:
df.write.format("qbeast").save("path/to/table")
Read more about it in the Columns to Index selector section.
The cubeSize option lets you specify the maximum size of a cube, in number of records. By default, it is set to 5M.
df.write.format("qbeast").option("cubeSize", "10000")
One feature of the Qbeast Format is the Revision.
A Revision contains some characteristics of the index, such as the columns to index or the cube size. It also stores information about the space being written (minimum and maximum values of the indexed columns).
This space is computed from the dataset currently being indexed, and appending records that fall outside this space will trigger a new Revision.
Having many Revisions can be painful for the reading process, since we have to query each one separately.
To avoid that, you can tune the space dimensions by adding the columnStats option.
df.write.format("qbeast")
.option("columnsToIndex", "a,b")
.option("columnStats","""{"a_min":0,"a_max":10,"b_min":20.0,"b_max":70.0}""")
.save("/tmp/table")
In a JSON
string, you can pass the minimum and maximum values of the columns you are indexing with the following schema:
{
  "columnName_min": value,
  "columnName_max": value
}
These options are used to make the writes idempotent.
The option txnAppId
identifies an application writing data to the table. It is
the responsibility of the user to assign unique identifiers to the applications
writing data to the table.
The option txnVersion
identifies the transaction issued by the application.
The value of this option must be a valid string representation of a positive
long number.
df.write.format("qbeast")
  .option("columnsToIndex", "a")
  .option("txnAppId", "ingestionService")
  .option("txnVersion", "1")
  .save("/tmp/table")
If the table already contains data written by some other transaction with the same txnAppId and txnVersion, then the requested write will be ignored.
// The data is written
df.write.format("qbeast")
  .option("columnsToIndex", "a")
  .option("txnAppId", "ingestionService")
  .option("txnVersion", "1")
  .save("/tmp/table")
...
// The data is ignored because the same txnAppId and txnVersion were already committed
df.write.format("qbeast")
  .mode("append")
  .option("txnAppId", "ingestionService")
  .option("txnVersion", "1")
  .save("/tmp/table")
For indexing Timestamps or Dates with columnStats (min and max values), notice that the values need to be properly formatted (following the "yyyy-MM-dd HH:mm:ss.SSSSSS'Z'" pattern) for Qbeast to be able to parse them.
Here's a snippet that can help you format the dates:
import java.text.SimpleDateFormat
val minTimestamp = df.selectExpr("min(date)").first().getTimestamp(0)
val maxTimestamp = df.selectExpr("max(date)").first().getTimestamp(0)
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS'Z'")
val columnStats =
s"""{ "date_min":"${formatter.format(minTimestamp)}",
|"date_max":"${formatter.format(maxTimestamp)}" }""".stripMargin
WARNING: This is an Experimental Feature, and the API might change in the near future.
The default column transformation for Strings (HashTransformation) has limited range query support since the lexicographic ordering of the String values is not preserved. On the numeric side, the default transformation is LinearTransformation, a simple linear transformation that preserves the ordering of the values.
This can be addressed by introducing a custom quantile-based sequence in the form of a sorted Seq, which can lead to several improvements, including:
- more efficient file-pruning because of its reduced file-level column min/max
- support for range queries on String columns
- improved overall query speed
The following code snippet demonstrates the extraction of a Quantile-based CDF from the source data:
import io.qbeast.utils.QbeastUtils
val columnQuantiles = QbeastUtils.computeQuantilesForColumn(df, "brand")
val columnStats = s"""{"brand_quantiles":$columnQuantiles}"""
(df
.write
.mode("overwrite")
.format("qbeast")
.option("columnsToIndex", "brand:quantiles")
.option("columnStats", columnStats)
.save("/tmp/qbeast_table_quantiles"))
This is only necessary for the first write; unless otherwise made explicit, all subsequent appends will reuse the same quantile calculation. Any new custom quantiles provided during an append force the creation of a new Revision.
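For instance, a later append to the same table can simply omit columnStats and keep using the quantiles from the first write (a sketch; newDf stands for the batch being appended):
newDf.write
  .mode("append")
  .format("qbeast")
  .save("/tmp/qbeast_table_quantiles")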
The computeQuantilesForColumn method computes the quantiles for the specified column and returns a Seq of quantile values. The Seq is then serialized into a String and passed through the columnStats option, while the quantiles transformation type for the column is declared in columnsToIndex.
You can tune the number of quantiles and the relative error for numeric columns using the QbeastUtils API.
val columnQuantiles =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName)
val columnQuantilesNumberOfQuantiles =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName, numberOfQuantiles = 100)
// For numeric columns, you can also specify the relative error
// For String columns, the relativeError is ignored
val columnQuantilesRelativeError =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName, relativeError = 0.3)
val columnQuantilesNumAndError =
QbeastUtils.computeQuantilesForColumn(df = df, columnName = columnName, numberOfQuantiles = 100, relativeError = 0.3)
If you don't specify the cubeSize at the DataFrame level, the default value is used. This is set to 5M, so if you want to change it for testing or production purposes, you can do so through the Spark configuration:
--conf spark.qbeast.index.defaultCubeSize=100000
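If you prefer to configure this programmatically rather than on the command line, the same property can be set when building the SparkSession (a minimal sketch; the application name is illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("qbeast-ingestion")
  .config("spark.qbeast.index.defaultCubeSize", "100000")
  .getOrCreate()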
The current indexing algorithm uses a greedy approach to estimate the data distribution without additional shuffling.
Still, there is a tradeoff between the quality of this estimation and the memory required during the computation.
The cubeWeightsBufferCapacity property controls this tradeoff by defining the maximum number of elements stored in a memory buffer when indexing. It follows the formula below, which you can see in the estimateGroupCubeSize() method from io.qbeast.core.model.CubeWeights.scala:
numGroups = MAX(numPartitions, (numElements / cubeWeightsBufferCapacity))
groupCubeSize = desiredCubeSize / numGroups
As you can infer from the formula, the number of working groups used when scanning the dataset influences the quality of the data distribution. A lower number of groups will result in a higher index precision, while having more groups and fewer elements per group will lead to worse indexes.
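As a rough worked example of the formula (the numbers are purely illustrative, using the default 5M cube size from above):
// 100M rows in 200 partitions, with a buffer capacity of 100k elements
val numPartitions = 200L
val numElements = 100000000L
val cubeWeightsBufferCapacity = 100000L
val desiredCubeSize = 5000000L

val numGroups = math.max(numPartitions, numElements / cubeWeightsBufferCapacity) // max(200, 1000) = 1000
val groupCubeSize = desiredCubeSize / numGroups                                  // 5000000 / 1000 = 5000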
You can change this number through the Spark Configuration:
--conf spark.qbeast.index.cubeWeightsBufferCapacity=10000
You can change the number of retries for the LocalKeeper in order to test it.
--conf spark.qbeast.index.numberOfRetries=10000
You can set up the SparkSession
with a data staging area for all your Qbeast table writes.
A staging area is where you can put the data you don't yet want to index but still want to be available for your queries. To activate staging, set the following configuration to a non-negative value.
--conf spark.qbeast.index.stagingSizeInBytes=1000000000
When the staging area is not full, all writes are staged without indexing (written in delta).
When the staging size reaches the defined value, the current data is merged with the staged data and written at once.
The feature can be helpful when your workflow does frequent small appends. Setting up a staging area makes sure that all index appends are at least of the staging size.
We can empty the staging area with a given write by setting the staging size to 0:
--conf spark.qbeast.index.stagingSizeInBytes=0
Pre-commit hooks enable the execution of custom code just before a write or optimization is committed.
To implement such hooks, extend io.qbeast.spark.delta.hook.PreCommitHook by implementing its run method, which has access to the sequence of Actions created by the operation.
The same method returns a Map[String, String], which will be used as tags for the transaction's CommitInfo:
{
"commitInfo": {
"timestamp": 1718787341410,
"operation": "WRITE",
...
"tags": {
"HookOutputKey": "HookOutputValue"
},
...
}
}
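As an illustration, a hook like the SimpleHook used below could look roughly like this sketch. It only assumes the run signature described above (a Seq of Delta Actions in, a tag map out); the actual trait may declare additional members.
import io.qbeast.spark.delta.hook.PreCommitHook
import org.apache.spark.sql.delta.actions.{Action, AddFile}

// Hypothetical hook: counts the AddFile actions in the commit and
// exposes the count as a tag in the transaction's CommitInfo.
class SimpleHook extends PreCommitHook {
  override def run(actions: Seq[Action]): Map[String, String] = {
    val addedFiles = actions.collect { case a: AddFile => a }.size
    Map("numAddedFiles" -> addedFiles.toString)
  }
}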
- You can use more than one hook, as shown in the example below with myHook1 and myHook2.
- For each hook you want to use, provide its class name with an option named qbeastPreCommitHook.<custom-hook-name>.
- For hooks that take initialization arguments, add an option named qbeastPreCommitHook.<custom-hook-name>.arg. Currently, only one String argument is allowed for each hook.
// Hooks for Writes
df
.write
.format("qbeast")
.option("qbeastPreCommitHook.myHook1", classOf[SimpleHook].getCanonicalName)
.option("qbeastPreCommitHook.myHook2", classOf[StatefulHook].getCanonicalName)
.option("qbeastPreCommitHook.myHook2.arg", myStringHookArg)
.save(pathToTable)
// Hooks for Optimizations
import io.qbeast.table.QbeastTable
val qt = QbeastTable.forPath(spark, tablePath)
val options = Map(
"qbeastPreCommitHook.myHook1" -> classOf[SimpleHook].getCanonicalName,
"qbeastPreCommitHook.myHook2" -> classOf[StatefulHook].getCanonicalName,
"qbeastPreCommitHook.myHook2.arg" -> "myStringHookArg"
)
qt.optimize(filesToOptimize, options)
In addition to the IndexMetrics class, we provide handy access to the low-level details of the Qbeast index through QbeastTable: the getDenormalizedBlocks() method returns a Dataset[DenormalizedBlock] that contains all the index metadata in an easy-to-analyze format.
import io.qbeast.table.QbeastTable
import org.apache.spark.sql.functions.col
val qt = QbeastTable.forPath(spark, tablePath)
val dnb = qt.getDenormalizedBlocks()
dnb.select("filePath").distinct.count() // number of files
dnb.count() // number of blocks
dnb.groupBy("filePath").count().orderBy(col("count").desc).show() // Show the files with the most blocks
dnb.groupBy("cubeId").count().orderBy(col("count").desc).show() // Show the cubeId with the most blocks