diff --git a/docs/content/append-table/append-table.md b/docs/content/append-table/append-table.md deleted file mode 100644 index 2eba1f44f5a8..000000000000 --- a/docs/content/append-table/append-table.md +++ /dev/null @@ -1,102 +0,0 @@ ---- -title: "Append Table" -weight: 1 -type: docs -aliases: -- /append-table/append-table.html ---- - - -# Append Table - -If a table does not have a primary key defined, it is an append table by default. - -You can only insert a complete record into the table in streaming. This type of table is suitable for use cases that -do not require streaming updates (such as log data synchronization). - -{{< tabs "create-append-table" >}} -{{< tab "Flink" >}} -```sql -CREATE TABLE my_table ( - product_id BIGINT, - price DOUBLE, - sales BIGINT -) WITH ( - 'file.compression.zstd-level' = '3' -); -``` -{{< /tab >}} -{{< /tabs >}} - -## Automatic small file merging - -In streaming writing job, without bucket definition, there is no compaction in writer, instead, will use -`Compact Coordinator` to scan the small files and pass compaction task to `Compact Worker`. In streaming mode, if you -run insert sql in flink, the topology will be like this: - -{{< img src="/img/unaware-bucket-topo.png">}} - -Do not worry about backpressure, compaction never backpressure. - -If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` will be removed in the topology. - -The auto compaction is only supported in Flink engine streaming mode. You can also start a compaction job in flink by -flink action in paimon and disable all the other compaction by set `write-only`. - -## Streaming Query - -You can stream the Append table and use it like a Message Queue. As with primary key tables, there are two options -for streaming reads: -1. By default, Streaming read produces the latest snapshot on the table upon first startup, and continue to read the - latest incremental records. -2. You can specify `scan.mode` or `scan.snapshot-id` or `scan.timestamp-millis` or `scan.file-creation-time-millis` to - streaming read incremental only. - -Similar to flink-kafka, order is not guaranteed by default, if your data has some sort of order requirement, you also -need to consider defining a `bucket-key`, see [Bucketed Append]({{< ref "append-table/bucketed-append" >}}). - -## OLAP Query - -### Data Skipping By Order - -Paimon by default records the maximum and minimum values of each field in the manifest file. - -In the query, according to the `WHERE` condition of the query, according to the statistics in the manifest do files -filtering, if the filtering effect is good, the query would have been minutes of the query will be accelerated to -milliseconds to complete the execution. - -Often the data distribution is not always effective filtering, so if we can sort the data by the field in `WHERE` condition? -You can take a look to [Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}) or -[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}) or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}). - -### Data Skipping By File Index - -You can use file index too, it filters files by index on the read side. - -```sql -CREATE TABLE ( , ...) WITH ( - 'file-index.bloom-filter.columns' = 'c1,c2', - 'file-index.bloom-filter.c1.items' = '200' -); -``` - -## DELETE & UPDATE - -Now, only Spark SQL supports DELETE & UPDATE, you can take a look to [Spark Write]({{< ref "spark/sql-write" >}}). 
\ No newline at end of file diff --git a/docs/content/append-table/overview.md b/docs/content/append-table/overview.md new file mode 100644 index 000000000000..67d063c584e9 --- /dev/null +++ b/docs/content/append-table/overview.md @@ -0,0 +1,58 @@ +--- +title: "Overview" +weight: 1 +type: docs +aliases: +- /append-table/overview.html +--- + + +# Overview + +If a table does not have a primary key defined, it is an append table. Compared to the primary key table, it does not +have the ability to directly receive changelogs. It cannot be directly updated with data through upsert; it can only +accept appended data. + +{{< tabs "create-append-table" >}} +{{< tab "Flink" >}} +```sql +CREATE TABLE my_table ( + product_id BIGINT, + price DOUBLE, + sales BIGINT +) WITH ( + -- 'target-file-size' = '256 MB', + -- 'file.format' = 'parquet', + -- 'file.compression' = 'zstd', + -- 'file.compression.zstd-level' = '3' +); +``` +{{< /tab >}} +{{< /tabs >}} + +Typical application scenarios are batch write and batch read, similar to a regular Hive partitioned table, but compared +to the Hive table, it brings: + +1. Object storage (S3, OSS) friendly +2. Time Travel and Rollback +3. DELETE / UPDATE with low cost +4. Automatic small file merging in streaming sink +5. Streaming read & write like a queue +6. High performance query with order and index diff --git a/docs/content/append-table/file-index.md b/docs/content/append-table/query.md similarity index 70% rename from docs/content/append-table/file-index.md rename to docs/content/append-table/query.md index 4bf769d6eea3..fc4cbeb6c29d 100644 --- a/docs/content/append-table/file-index.md +++ b/docs/content/append-table/query.md @@ -1,9 +1,9 @@ --- -title: "File Index" -weight: 4 +title: "Query" +weight: 3 type: docs aliases: -- /append-table/file-index.html +- /append-table/query.html --- -# Data File Index +# Query + +## Data Skipping By Order + +Paimon by default records the maximum and minimum values of each field in the manifest file. + +At query time, Paimon filters data files according to the `WHERE` condition using the statistics recorded in the +manifest. If the filtering is effective, a query that would otherwise take minutes can finish in milliseconds. + +However, the data distribution is not always effective for filtering, so what if the data could be sorted by the fields +in the `WHERE` condition? Take a look at [Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}), +[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}) or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}). + +## Data Skipping By File Index + +You can also use a file index, which filters files by index on the read side. + +```sql +CREATE TABLE my_table (...) WITH ( + 'file-index.bloom-filter.columns' = 'c1,c2', + 'file-index.bloom-filter.c1.items' = '200' +); +``` Define `file-index.bloom-filter.columns`, Paimon will create its corresponding index file for each file. If the index file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file corresponds to an index file, which has a separate file definition and can contain different types of indexes with multiple columns. -## Concept - Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will be stored directly in the manifest, otherwise in the directory of the data file.
Each data file corresponds to an index file, which has a separate file definition and can contain different types of indexes with multiple columns. -## Usage - Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter currently, but other types of index will be supported in the future. @@ -52,8 +71,6 @@ Currently, file index is only supported in append-only table. More filter types will be supported... -## Procedure - If you want to add file index to existing table, without any rewrite, you can use `rewrite_file_index` procedure. Before we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config `file-index.<filter-type>.columns` to the table. diff --git a/docs/content/append-table/bucketed-append.md b/docs/content/append-table/streaming.md similarity index 76% rename from docs/content/append-table/bucketed-append.md rename to docs/content/append-table/streaming.md index 4ef05b6ea561..c3d64a6500f0 100644 --- a/docs/content/append-table/bucketed-append.md +++ b/docs/content/append-table/streaming.md @@ -1,9 +1,9 @@ --- -title: "Bucketed Append" -weight: 3 +title: "Streaming" +weight: 2 type: docs aliases: -- /append-table/bucketed-append.html +- /append-table/streaming.html --- -# Bucketed Append +# Streaming -## Definition +You can stream write to the Append table in a very flexible way through Flink, or read the Append table through +Flink, using it like a queue. The only difference is that its latency is in minutes. Its advantages are very low cost +and the ability to push down filters and projections. + +## Automatic small file merging + +In a streaming write job without bucket definition, there is no compaction in the writer; instead, a +`Compact Coordinator` scans the small files and passes compaction tasks to a `Compact Worker`. In streaming mode, if you +run an insert SQL in Flink, the topology will be like this: + +{{< img src="/img/unaware-bucket-topo.png">}} + +Do not worry about backpressure; compaction never causes backpressure. + +If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` will be removed from the topology. + +Automatic compaction is only supported in Flink engine streaming mode. You can also start a compaction job in Flink +with the Paimon Flink action and disable all other compaction by setting `write-only`. + +## Streaming Query + +You can stream the Append table and use it like a Message Queue. As with primary key tables, there are two options +for streaming reads: +1. By default, streaming read produces the latest snapshot of the table upon first startup, and continues to read the + latest incremental records. +2. You can specify `scan.mode`, `scan.snapshot-id`, `scan.timestamp-millis` or `scan.file-creation-time-millis` to + streaming read incremental records only. + +Similar to Flink with Kafka, order is not guaranteed by default. If your data has some ordering requirement, you also +need to consider defining a `bucket-key` (see Bucketed Append below). + +## Bucketed Append An ordinary Append table has no strict ordering guarantees for its streaming writes and reads, but there are some cases where you need to define a key similar to Kafka's. -You can define the `bucket` and `bucket-key` to get a bucketed append table. Every record in the same bucket is ordered +You can define the `bucket` and `bucket-key` to get a bucketed append table.
Every record in the same bucket is ordered strictly, streaming read will transfer the record to down-stream exactly in the order of writing. To use this mode, you do not need to config special configurations, all the data will go into one bucket as a queue. @@ -55,7 +86,7 @@ CREATE TABLE my_table ( {{< /tab >}} {{< /tabs >}} -## Compaction +### Compaction in Bucket By default, the sink node will automatically perform compaction to control the number of files. The following options control the strategy of compaction: @@ -97,17 +128,13 @@ control the strategy of compaction: -## Streaming Source - -Streaming source behavior is only supported in Flink engine at present. - ### Streaming Read Order For streaming reads, records are produced in the following order: * For any two records from two different partitions - * If `scan.plan-sort-partition` is set to true, the record with a smaller partition value will be produced first. - * Otherwise, the record with an earlier partition creation time will be produced first. + * If `scan.plan-sort-partition` is set to true, the record with a smaller partition value will be produced first. + * Otherwise, the record with an earlier partition creation time will be produced first. * For any two records from the same partition and the same bucket, the first written record will be produced first. * For any two records from the same partition but two different buckets, different buckets are processed by different tasks, there is no order guarantee between them. diff --git a/docs/content/append-table/deletion-vectors.md b/docs/content/append-table/update.md similarity index 57% rename from docs/content/append-table/deletion-vectors.md rename to docs/content/append-table/update.md index 663cdd924103..d50b6be574ac 100644 --- a/docs/content/append-table/deletion-vectors.md +++ b/docs/content/append-table/update.md @@ -1,9 +1,9 @@ --- -title: "Deletion Vectors" -weight: 5 +title: "Update" +weight: 4 type: docs aliases: -- /append-table/deletion-vectors.html +- /append-table/update.html --- -# Deletion Vectors +# Update -{{< hint info >}} -Currently only Spark SQL supports `DELETE`, `UPDATE` for append table. -{{< /hint >}} +Currently, only Spark SQL supports DELETE & UPDATE; take a look at [Spark Write]({{< ref "spark/sql-write" >}}). -Consider SQL: +Example: ```sql DELETE FROM my_table WHERE currency = 'UNKNOWN'; ``` -By default, it will search for the hit files and then rewrite each file to remove the -data that needs to be deleted from the files. This operation is costly. +Updating an append table has two modes: -Deletion vectors mode only marks certain records of the corresponding file for deletion -and writes the deletion file, without rewriting the entire file. - -## Usage - -By specifying `'deletion-vectors.enabled' = 'true'`, the Deletion Vectors mode can be enabled. +1. COW (Copy on Write): searches for the hit files and then rewrites each file to remove the data that needs to be + deleted from the files. This operation is costly. +2. MOW (Merge on Write): by specifying `'deletion-vectors.enabled' = 'true'`, the deletion vectors mode can be enabled. + It only marks certain records of the corresponding file for deletion and writes a deletion file, without rewriting the entire file (see the sketch below).
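+
+A minimal sketch of the MOW path in Spark SQL, assuming the `my_table` append table from the example above (the
+`ALTER TABLE ... SET TBLPROPERTIES` statement is standard Spark SQL syntax for setting table options):
+
+```sql
+-- Enable deletion vectors (MOW) on an existing append table.
+ALTER TABLE my_table SET TBLPROPERTIES ('deletion-vectors.enabled' = 'true');
+
+-- This delete now only writes deletion files for the hit records
+-- instead of rewriting the whole data files.
+DELETE FROM my_table WHERE currency = 'UNKNOWN';
+```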
diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md index f1e305ef4bb6..2c496c849618 100644 --- a/docs/content/flink/sql-write.md +++ b/docs/content/flink/sql-write.md @@ -51,9 +51,9 @@ For multiple jobs to write the same table, you can refer to [dedicated compactio ### Clustering -In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}}) +In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/overview" >}}) based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream -tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/append-table#Append Table" >}}) +tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/overview" >}}) and batch execution mode. To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering: diff --git a/docs/content/learn-paimon/understand-files.md b/docs/content/learn-paimon/understand-files.md index 258b7e0c17b9..3d68a7b7b979 100644 --- a/docs/content/learn-paimon/understand-files.md +++ b/docs/content/learn-paimon/understand-files.md @@ -455,7 +455,7 @@ this means that there are at least 5 files in a bucket. If you want to reduce th By default, Append also does automatic compaction to reduce the number of small files. However, for Bucketed Append table, it will only compact the files within the Bucket for sequential -purposes, which may keep more small files. See [Bucketed Append]({{< ref "append-table/bucketed-append" >}}). +purposes, which may keep more small files. See [Bucketed Append]({{< ref "append-table/streaming#bucketed-append" >}}). ### Understand Full-Compaction diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md index d77e7e41c9ab..dab957da1da4 100644 --- a/docs/content/maintenance/dedicated-compaction.md +++ b/docs/content/maintenance/dedicated-compaction.md @@ -231,7 +231,7 @@ For more usage of the compact_database action, see ## Sort Compact If your table is configured with [dynamic bucket primary key table]({{< ref "primary-key-table/data-distribution#dynamic-bucket" >}}) -or [append table]({{< ref "append-table/append-table" >}}) , +or [append table]({{< ref "append-table/overview" >}}) , you can trigger a compact with specified column sort to speed up queries. ```bash diff --git a/docs/content/migration/migration-from-hive.md b/docs/content/migration/migration-from-hive.md index dd11324442d4..8327e8a928ef 100644 --- a/docs/content/migration/migration-from-hive.md +++ b/docs/content/migration/migration-from-hive.md @@ -28,7 +28,7 @@ under the License. Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon. When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you -still need the original table. The migrated table will be [append table]({{< ref "append-table/append-table" >}}). +still need the original table. The migrated table will be [append table]({{< ref "append-table/overview" >}}). 
Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon. At the same time, you can use paimon hive catalog with Migrate Database Procedure to fully synchronize all tables in the database to paimon.
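+
+As a sketch, a Flink SQL invocation of the migrate procedure could look like the following; the procedure name
+`sys.migrate_table`, the argument order (connector, source table, options) and the `default.hive_orders` table are
+assumptions for illustration only:
+
+```sql
+-- Migrate an existing Hive ORC table in place into a Paimon append table.
+CALL sys.migrate_table('hive', 'default.hive_orders', 'file.format=orc');
+```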