Welcome to the documentation of the Qbeast-Spark project.
These sections provide guides to help you understand the technology behind the open format and start working with it in just a few lines of code.
Here's a summary of the topics covered in this document:
- Pre-requisites
- Launch a Spark Shell
- Set up an Application
- Creating a Table
- Append
- Read
- QbeastTable API
- Optimize
- Deletes
- Cloud Providers Setup
- Index Visualizer
- Dependencies and Version Compatibility
You can run the qbeast-spark library in two different ways:
- Interactively: Start the Spark shell (Scala or Python) with Qbeast Spark and Delta Lake and run the code snippets.
- As a Project: Set up a Maven or SBT project (Scala or Java), copy the code snippets into a source file, and run the project.
Before starting, ensure you have the following:
- Java 8+: Ensure that Java is installed and properly configured.
- SBT/Gradle/Maven: This is for managing dependencies if running in a development environment.
- Apache Spark 3.5+: A Spark installation with support for Scala 2.12
As mentioned in the official Apache Spark installation instructions here, make sure you have a valid Java version installed (8, 11, or 17) and that Java is configured correctly on your system, using either the system PATH or the JAVA_HOME environment variable.
Windows users should follow the instructions in this blog, making sure to use a version of Apache Spark that is compatible with the versions of Delta Lake (3.1.0) and Qbeast Spark (0.7.0) used in this guide.
Download a compatible version of Apache Spark with Hadoop, and create the SPARK_HOME environment variable:
ℹ️ Note: You can use Hadoop 2.7 if desired, but you may run into issues with some cloud providers' storage; read more about it here.
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzvf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.5.0-bin-hadoop3
To run the code interactively, open a pyspark session (Python) or a spark-shell session (Scala).
ℹ️ Warning: Different cloud providers may require specific versions of Spark or Hadoop, or specific libraries. Refer here to check compatibilities.
Install a pyspark version that is compatible with the latest version of Qbeast Spark:
pip install pyspark==<compatible-spark-version>
Run the pyspark shell:
pyspark --packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
Run a spark-shell from the binaries:
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
Alternatively, run spark-sql:
$SPARK_HOME/bin/spark-sql \
--packages io.qbeast:qbeast-spark_2.12:0.7.0,io.delta:delta-spark_2.12:3.1.0 \
--conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.catalog.QbeastCatalog
The Spark configuration can help improve write and read performance. Here are a few configuration properties for Qbeast.
Configuration | Definition | Default |
---|---|---|
spark.qbeast.index.defaultCubeSize | Default cube size for all datasets written in the session. | 5000000 |
spark.qbeast.index.cubeDomainsBufferCapacity | Default buffer capacity for intermediate results. | 100000 |
spark.qbeast.index.columnsToIndex.auto | Automatically select columns to index. | false |
spark.qbeast.index.columnsToIndex.auto.max | Maximum number of columns to index automatically. | 10 |
spark.qbeast.index.numberOfRetries | Number of retries for writing data. | 2 |
Consult the Qbeast-Spark advanced configuration for more information.
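These properties can also be set when building the session. Below is a minimal Scala sketch using the configuration keys from the table above; the values chosen are only illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: set Qbeast session-level defaults when building the SparkSession.
// The keys come from the table above; the values are only examples.
val spark = SparkSession.builder()
  .appName("QbeastTuningExample")
  .config("spark.sql.extensions", "io.qbeast.sql.QbeastSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "io.qbeast.catalog.QbeastCatalog")
  .config("spark.qbeast.index.defaultCubeSize", "1000000")   // smaller cubes than the 5M default
  .config("spark.qbeast.index.columnsToIndex.auto", "true")  // let Qbeast pick the columns to index
  .config("spark.qbeast.index.columnsToIndex.auto.max", "5") // cap the number of auto-selected columns
  .getOrCreate()
```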
You can use the following Maven coordinates to build a project using Qbeast Spark binaries from Maven Central Repository.
You include Qbeast Spark in your Maven project by adding it as a dependency in your POM file:
<dependency>
<groupId>io.qbeast</groupId>
<artifactId>qbeast-spark_2.12</artifactId>
<version>0.7.0</version>
</dependency>
You include Qbeast Spark in your SBT project by adding the following line to your build.sbt file:
libraryDependencies += "io.qbeast" %% "qbeast-spark" % "0.7.0"
To use a SNAPSHOT (NOT RECOMMENDED), add the Snapshots URL to the list of repositories:
ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"
To set up a Python project (for example, for unit testing), you can configure the SparkSession with the following options.
import pyspark

# If the session is already configured (e.g., through spark-submit), just retrieve it
spark = pyspark.sql.SparkSession.builder.appName("MyApp").getOrCreate()

# Otherwise, build a session with the Qbeast extension and catalog configured
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.qbeast.sql.QbeastSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "io.qbeast.catalog.QbeastCatalog") \
    .getOrCreate()
You can create a table with the Qbeast layout using plain Spark APIs.
data = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], "id: int, age:string")
data.write.mode("overwrite").option("columnsToIndex", "id,age").saveAsTable("qbeast_table")
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "age")
data.write.mode("overwrite").option("columnsToIndex", "id,age").saveAsTable("qbeast_table")
CREATE TABLE qbeast_table (id INT, age STRING)
USING qbeast
OPTIONS ('columnsToIndex'='id,age');
It is also possible to specify the table location in SQL:
CREATE TABLE qbeast_table (id INT, age STRING)
USING qbeast
LOCATION '/tmp/qbeast_table'
OPTIONS ('columnsToIndex'='id,age');
There are different options to fine-tune the underlying index.
Option | Definition | Example |
---|---|---|
columnsToIndex | Indicates which DataFrame columns are used to index. We recommend selecting the columns that are queried most often to maximize the layout efficiency. | "id,age" |
cubeSize | Maximum number of elements a cube should contain. Default is 5 million. It is a soft limit, which means it can be exceeded; however, it is a bad sign if the final cube sizes are two or three times the cubeSize. | 1000 |
columnStats | Minimum and maximum values of the columns to index, as a JSON string. The space is normally computed at write time, but if you know the stats in advance, this skips that step and provides a more relevant index for your data. | """{"a_min":0,"a_max":10,"b_min":20.0,"b_max":70.0}""" |
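As an illustration, these options can be combined in a single write with the DataFrame API. The sketch below reuses the id/age example from above; the columnStats keys follow the column_min/column_max pattern shown in the table, and the values are only examples.

```scala
// Sketch: write with an explicit cube size and pre-computed column stats
data.write
  .mode("overwrite")
  .format("qbeast")
  .option("columnsToIndex", "id,age")
  .option("cubeSize", "1000")
  .option("columnStats", """{"id_min":0,"id_max":10}""")
  .save("/tmp/qbeast_table")
```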
Append data using the DataFrame API in "append" mode, or with the SQL INSERT INTO clause.
append = spark.createDataFrame([(4, "d"), (5, "e")], "id: int, age:string")
# Save
append.write.\
mode("append").\
insertInto("qbeast_table")
append = spark.createDataFrame([(4, "d"), (5, "e")], "id: int, age:string")
# Save
append.write.\
mode("append").\
option("columnsToIndex", "id,age").\
format("qbeast").\
save("/tmp/qbeast_table")
val append = Seq((4, "d"), (5, "e")).toDF("id", "age")
// Save
append.write.
mode("append").
insertInto("qbeast_table")
val append = Seq((4, "d"), (5, "e")).toDF("id", "age")
// Save
append.write.
mode("append").
option("columnsToIndex", "id,age").
format("qbeast").
save("/tmp/qbeast_table")
Use INSERT INTO to add records to the new table. The index is updated dynamically when new data is inserted.
INSERT INTO qbeast_table VALUES (4, 'd'), (5, 'e');
Read data from a Qbeast table by specifying the path or the table name.
qbeast_df = spark.read.format("qbeast").load("/tmp/qbeast_table")
val qbeastDF = spark.read.format("qbeast").load("/tmp/qbeast_table")
SELECT * FROM qbeast_table;
Sampling is the process of selecting a subset of data from a larger dataset to analyze and make inferences. It is beneficial because it reduces computational costs, speeds up analysis, and simplifies data handling while still providing accurate and reliable insights if the sample is representative.
Thanks to the Qbeast metadata, you can use the sample method (and TABLESAMPLE in SQL) to select a fraction of the data directly from storage, instead of loading all the records into memory and computing the result there.
qbeast_df.sample(0.3).show()
qbeastDF.sample(0.3).show()
SELECT * FROM qbeast_table TABLESAMPLE (30 PERCENT);
To check sampling performance, open your Spark Web UI and observe how the sample operator is converted into a filter and pushed down to the source!
qbeastDF.sample(0.3).explain()
== Physical Plan ==
*(1) Filter ((qbeast_hash(ss_cdemo_sk#1091, ss_cdemo_sk#1091, 42) < -1717986918) AND (qbeast_hash(ss_cdemo_sk#1091, ss_cdemo_sk#1091, 42) >= -2147483648))
+- *(1) ColumnarToRow
+- FileScan parquet [ss_sold_time_sk#1088,ss_item_sk#1089,ss_customer_sk#1090,ss_cdemo_sk#1091,ss_hdemo_sk#1092,ss_addr_sk#1093,ss_store_sk#1094,ss_promo_sk#1095,ss_ticket_number#1096L,ss_quantity#1097,ss_wholesale_cost#1098,ss_list_price#1099,ss_sales_price#1100,ss_ext_discount_amt#1101,ss_ext_sales_price#1102,ss_ext_wholesale_cost#1103,ss_ext_list_price#1104,ss_ext_tax#1105,ss_coupon_amt#1106,ss_net_paid#1107,ss_net_paid_inc_tax#1108,ss_net_profit#1109,ss_sold_date_sk#1110] Batched: true, DataFilters: [(qbeast_hash(ss_cdemo_sk#1091, ss_cdemo_sk#1091, 42) < -1717986918), (qbeast_hash(ss_cdemo_sk#10..., Format: Parquet, Location: OTreeIndex[file:/tmp/qbeast-test-data/qtable], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ss_sold_time_sk:int,ss_item_sk:int,ss_customer_sk:int,ss_cdemo_sk:int,ss_hdemo_sk:int,ss_a...
Notice that the sample operator is no longer present in the physical plan. It is converted into a Filter (qbeast_hash) and used to select files during the scan (see the DataFilters entry of the FileScan node). This way, many files are skipped entirely, resulting in less I/O.
Get insights into the data using the QbeastTable interface, available in Scala.
import io.qbeast.table.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")
qbeastTable.getIndexMetrics()
Method | Definition |
---|---|
revisionIDs(): List[Long] | Returns a list with all the Revision ID’s present in the Table. |
latestRevision(): Revision | Returns the Latest Revision available in the Table. |
latestRevisionID(): Long | Returns the Latest Revision ID available in the Table. |
revision(revisionID: Option[Int]): Revision | Returns the Revision information of a particular Revision ID (if specified) or the latest one |
indexedColumns(revisionID: Option[Int]): Seq[String] | Returns the indexed column names of a particular Revision ID (if specified) or the latest one. |
cubeSize(revisionID: Option[Int]) | Returns the Cube Size of a particular Revision ID (if specified) or the latest one. |
getDenormalizedBlocks(revisionID: Option[Int]) | Get the Denormalized Blocks information of all files of a particular Revision ID (if specified) or the latest one. |
getIndexMetrics(revisionID: Option[Int]) | Output the IndexMetrics information a particular Revision ID (if specified) or the latest one available. It is useful to know the state of the different levels of the index and the respective cube sizes. |
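Here is a small sketch combining a few of these methods. It assumes the no-argument defaults used in the snippet above, which operate on the latest revision.

```scala
import io.qbeast.table.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")

// Inspect the latest revision of the index
println(s"Latest revision: ${qbeastTable.latestRevisionID()}")
println(s"Indexed columns: ${qbeastTable.indexedColumns()}")
println(s"Cube size:       ${qbeastTable.cubeSize()}")

// Print the index metrics for the latest revision
println(qbeastTable.getIndexMetrics())
```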
Optimize is an expensive operation that consists of rewriting part of the files to achieve a better layout and improve query performance.
To minimize the write amplification of this command, we execute it on subsets of the table, such as Revision IDs or specific files.
Read more about Revision and find an example here.
There are three ways of executing the optimize operation:
// Optimizes the last Revision Available.
// This does NOT include previous Revisions' optimizations.
qbeastTable.optimize()
// Optimizes the Revision number 2.
qbeastTable.optimize(2L)
// Optimizes the specified files
qbeastTable.optimize(Seq("file1", "file2"))
If you want to optimize the full table, you must loop through the revisions:
// 1. Get all the Revision IDs available in the table.
val revisions = qbeastTable.revisionsIDs()
// 2. For each revision, call the Optimize method
revisions.foreach(revision =>
qbeastTable.optimize(revision)
)
Go to QbeastTable documentation for more detailed information.
WARNING: Data can be removed from a Qbeast table with Delta Lake API, but, as currently constructed, it will leave the index in an inconsistent state. See issue #327.
You can delete rows from a table using the DeltaTable API; afterwards, the table should only be read using the delta format.
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/qbeast/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
Specify the tolerance you are willing to accept and let Qbeast compute the optimal sampling fraction to use.
import io.qbeast.spark.implicits._
qbeastDf.agg(avg("user_id")).tolerance(0.1).show()
You can set up reads and writes on Amazon S3 with both public and private buckets.
🚧 Amazon Web Services S3 does not work with Hadoop 2.7. For this provider, you'll need Hadoop 3.2.
- If you are using a public bucket:

$SPARK_HOME/bin/spark-shell \
  --conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
  --packages io.qbeast:qbeast-spark_2.12:0.7.0,\
io.delta:delta-spark_2.12:3.1.0,\
com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0
- If you are using private buckets:

$SPARK_HOME/bin/spark-shell \
  --conf spark.sql.extensions=io.qbeast.sql.QbeastSparkSessionExtension \
  --conf spark.hadoop.fs.s3a.access.key=${AWS_ACCESS_KEY_ID} \
  --conf spark.hadoop.fs.s3a.secret.key=${AWS_SECRET_ACCESS_KEY} \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --packages io.qbeast:qbeast-spark_2.12:0.7.0,\
io.delta:delta-spark_2.12:3.1.0,\
com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0
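If you prefer to set this up from application code rather than the command line, the same properties can be set on the SparkSession. Below is a minimal Scala sketch for a private bucket; the credentials are read from environment variables here, and the qbeast, delta, and hadoop-aws jars must still be on the classpath (e.g. via --packages).

```scala
import org.apache.spark.sql.SparkSession

// Sketch: the same s3a settings as the spark-shell example above, set programmatically
val spark = SparkSession.builder()
  .appName("QbeastOnS3")
  .config("spark.sql.extensions", "io.qbeast.sql.QbeastSparkSessionExtension")
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .config("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .config("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
  .getOrCreate()
```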
Several Google Cloud services can be used with Qbeast, including Cloud Storage, BigQuery, BigLake, and DataProc. The steps below cover the minimum setup needed to query Qbeast tables from BigQuery using external tables.
1. Install the GCS Cloud Storage connector for Hadoop v3.
2. Provision one or more GCS Buckets.
3. Navigate to Google DataProc and select Metastore Services -> Metastore in the sidebar. Click Create, and configure the metastore config overrides, setting hive.metastore.warehouse.dir: gs://<bucket name>/<nested path>/hive-warehouse. The nested path is optional.
4. Select an existing Spark node (in GCE or GKE) and modify its properties to enable the Google Cloud and Qbeast configurations:

   # Configure the Spark worker to use the Qbeast format library
   spark.sql.extensions io.qbeast.sql.QbeastSparkSessionExtension
   spark.sql.catalog.spark_catalog io.qbeast.catalog.QbeastCatalog

5. Create a schema in BigQuery Studio in the same region as the GCS bucket.
6. Create an external connection with a connection type of Apache Spark, and configure it to point to the DataProc metastore described in step #3.
7. Create an external connection for BigQuery to access the Cloud Storage bucket:
   - Click Add, select Connections to external data sources, select Vertex AI remote models, remote functions and BigLake (Cloud Resource), choose a connection ID, and select the region used for the GCS Bucket.
   - Select the external connection created (matching the name) in the left sidebar and copy the Service Account ID. Grant this service account permissions on the GCS Bucket by navigating to the bucket in Cloud Storage, clicking Grant Access, entering the BQ Service Account as the principal, and assigning a Storage Admin role (to be refined later).
8. Create an external table within BigQuery targeting the Qbeast-formatted table (using the Delta Lake connector):

   CREATE EXTERNAL TABLE `<project>.<schema>.<table name>`
     WITH CONNECTION `<connection id>`
     OPTIONS (format = "DELTA_LAKE", uris = ['<bucket location>']);
Use the Python index visualizer on your indexed table to visually examine the index structure and gather sampling metrics.
Qbeast Spark version | Apache Spark | Hadoop | Delta Lake |
---|---|---|---|
0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
0.3.x | 3.2.x | 3.3.x | 1.2.x |
0.4.x | 3.3.x | 3.3.x | 2.1.x |
0.5.x | 3.4.x | 3.3.x | 2.4.x |
0.6.x | 3.5.x | 3.3.x | 3.1.x |
0.7.x | 3.5.x | 3.3.x | 3.1.x |
Check here for Delta Lake and Apache Spark version compatibility.