Skip to content

Commit

Permalink
supported_schemaEvolution_when_restarting_the_paimon_cdc_job
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored and MOBIN-F committed Jul 10, 2024
1 parent 4c064b5 commit 77b9d82
Show file tree
Hide file tree
Showing 19 changed files with 345 additions and 180 deletions.
102 changes: 0 additions & 102 deletions docs/content/append-table/append-table.md

This file was deleted.

58 changes: 58 additions & 0 deletions docs/content/append-table/overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
title: "Overview"
weight: 1
type: docs
aliases:
- /append-table/overview.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Overview

If a table does not have a primary key defined, it is an append table. Compared to the primary key table, it does not
have the ability to directly receive changelogs. It cannot be directly updated with data through upsert. It can only
receive incoming data from append data.

{{< tabs "create-append-table" >}}
{{< tab "Flink" >}}
```sql
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
-- 'target-file-size' = '256 MB',
-- 'file.format' = 'parquet',
-- 'file.compression' = 'zstd',
-- 'file.compression.zstd-level' = '3'
);
```
{{< /tab >}}
{{< /tabs >}}

Batch write and batch read in typical application scenarios, similar to a regular Hive partition table, but compared to
the Hive table, it can bring:

1. Object storage (S3, OSS) friendly
2. Time Travel and Rollback
3. DELETE / UPDATE with low cost
4. Automatic small file merging in streaming sink
5. Streaming read & write like a queue
6. High performance query with order and index
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: "File Index"
weight: 4
title: "Query"
weight: 3
type: docs
aliases:
- /append-table/file-index.html
- /append-table/query.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
Expand All @@ -24,21 +24,40 @@ specific language governing permissions and limitations
under the License.
-->

# Data File Index
# Query

## Data Skipping By Order

Paimon by default records the maximum and minimum values of each field in the manifest file.

In the query, according to the `WHERE` condition of the query, according to the statistics in the manifest do files
filtering, if the filtering effect is good, the query would have been minutes of the query will be accelerated to
milliseconds to complete the execution.

Often the data distribution is not always effective filtering, so if we can sort the data by the field in `WHERE` condition?
You can take a look to [Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}) or
[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}) or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}).

## Data Skipping By File Index

You can use file index too, it filters files by index on the read side.

```sql
CREATE TABLE <PAIMON_TABLE> (<COLUMN> <COLUMN_TYPE> , ...) WITH (
'file-index.bloom-filter.columns' = 'c1,c2',
'file-index.bloom-filter.c1.items' = '200'
);
```

Define `file-index.bloom-filter.columns`, Paimon will create its corresponding index file for each file. If the index
file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file
corresponds to an index file, which has a separate file definition and can contain different types of indexes with
multiple columns.

## Concept

Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will
be stored directly in the manifest, otherwise in the directory of the data file. Each data file corresponds to an index file,
which has a separate file definition and can contain different types of indexes with multiple columns.

## Usage

Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup
scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter
currently, but other types of index will be supported in the future.
Expand All @@ -52,8 +71,6 @@ Currently, file index is only supported in append-only table.

More filter types will be supported...

## Procedure

If you want to add file index to existing table, without any rewrite, you can use `rewrite_file_index` procedure. Before
we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config
`file-index.<filter-type>.columns` to the table.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: "Bucketed Append"
weight: 3
title: "Streaming"
weight: 2
type: docs
aliases:
- /append-table/bucketed-append.html
- /append-table/streaming.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
Expand All @@ -24,14 +24,45 @@ specific language governing permissions and limitations
under the License.
-->

# Bucketed Append
# Streaming

## Definition
You can streaming write to the Append table in a very flexible way through Flink, or through read the Append table
Flink, using it like a queue. The only difference is that its latency is in minutes. Its advantages are very low cost
and the ability to push down filters and projection.

## Automatic small file merging

In streaming writing job, without bucket definition, there is no compaction in writer, instead, will use
`Compact Coordinator` to scan the small files and pass compaction task to `Compact Worker`. In streaming mode, if you
run insert sql in flink, the topology will be like this:

{{< img src="/img/unaware-bucket-topo.png">}}

Do not worry about backpressure, compaction never backpressure.

If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` will be removed in the topology.

The auto compaction is only supported in Flink engine streaming mode. You can also start a compaction job in flink by
flink action in paimon and disable all the other compaction by set `write-only`.

## Streaming Query

You can stream the Append table and use it like a Message Queue. As with primary key tables, there are two options
for streaming reads:
1. By default, Streaming read produces the latest snapshot on the table upon first startup, and continue to read the
latest incremental records.
2. You can specify `scan.mode` or `scan.snapshot-id` or `scan.timestamp-millis` or `scan.file-creation-time-millis` to
streaming read incremental only.

Similar to flink-kafka, order is not guaranteed by default, if your data has some sort of order requirement, you also
need to consider defining a `bucket-key`.

## Bucketed Append

An ordinary Append table has no strict ordering guarantees for its streaming writes and reads, but there are some cases
where you need to define a key similar to Kafka's.

You can define the `bucket` and `bucket-key` to get a bucketed append table. Every record in the same bucket is ordered
You can define the `bucket` and `bucket-key` to get a bucketed append table. Every record in the same bucket is ordered
strictly, streaming read will transfer the record to down-stream exactly in the order of writing. To use this mode, you
do not need to config special configurations, all the data will go into one bucket as a queue.

Expand All @@ -55,7 +86,7 @@ CREATE TABLE my_table (
{{< /tab >}}
{{< /tabs >}}

## Compaction
### Compaction in Bucket

By default, the sink node will automatically perform compaction to control the number of files. The following options
control the strategy of compaction:
Expand Down Expand Up @@ -97,17 +128,13 @@ control the strategy of compaction:
</tbody>
</table>

## Streaming Source

Streaming source behavior is only supported in Flink engine at present.

### Streaming Read Order

For streaming reads, records are produced in the following order:

* For any two records from two different partitions
* If `scan.plan-sort-partition` is set to true, the record with a smaller partition value will be produced first.
* Otherwise, the record with an earlier partition creation time will be produced first.
* If `scan.plan-sort-partition` is set to true, the record with a smaller partition value will be produced first.
* Otherwise, the record with an earlier partition creation time will be produced first.
* For any two records from the same partition and the same bucket, the first written record will be produced first.
* For any two records from the same partition but two different buckets, different buckets are processed by different tasks, there is no order guarantee between them.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: "Deletion Vectors"
weight: 5
title: "Update"
weight: 4
type: docs
aliases:
- /append-table/deletion-vectors.html
- /append-table/update.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
Expand All @@ -24,23 +24,18 @@ specific language governing permissions and limitations
under the License.
-->

# Deletion Vectors
# Update

{{< hint info >}}
Currently only Spark SQL supports `DELETE`, `UPDATE` for append table.
{{< /hint >}}
Now, only Spark SQL supports DELETE & UPDATE, you can take a look to [Spark Write]({{< ref "spark/sql-write" >}}).

Consider SQL:
Example:
```sql
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```

By default, it will search for the hit files and then rewrite each file to remove the
data that needs to be deleted from the files. This operation is costly.
Update append table has two modes:

Deletion vectors mode only marks certain records of the corresponding file for deletion
and writes the deletion file, without rewriting the entire file.

## Usage

By specifying `'deletion-vectors.enabled' = 'true'`, the Deletion Vectors mode can be enabled.
1. COW (Copy on Write): search for the hit files and then rewrite each file to remove the data that needs to be deleted
from the files. This operation is costly.
2. MOW (Merge on Write): By specifying `'deletion-vectors.enabled' = 'true'`, the Deletion Vectors mode can be enabled.
Only marks certain records of the corresponding file for deletion and writes the deletion file, without rewriting the entire file.
4 changes: 2 additions & 2 deletions docs/content/flink/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ For multiple jobs to write the same table, you can refer to [dedicated compactio

### Clustering

In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}})
In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/overview" >}})
based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream
tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/append-table#Append Table" >}})
tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/overview" >}})
and batch execution mode.

To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering:
Expand Down
2 changes: 1 addition & 1 deletion docs/content/learn-paimon/understand-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ this means that there are at least 5 files in a bucket. If you want to reduce th
By default, Append also does automatic compaction to reduce the number of small files.
However, for Bucketed Append table, it will only compact the files within the Bucket for sequential
purposes, which may keep more small files. See [Bucketed Append]({{< ref "append-table/bucketed-append" >}}).
purposes, which may keep more small files. See [Bucketed Append]({{< ref "append-table/streaming#bucketed-append" >}}).
### Understand Full-Compaction
Expand Down
2 changes: 1 addition & 1 deletion docs/content/maintenance/dedicated-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ For more usage of the compact_database action, see
## Sort Compact
If your table is configured with [dynamic bucket primary key table]({{< ref "primary-key-table/data-distribution#dynamic-bucket" >}})
or [append table]({{< ref "append-table/append-table" >}}) ,
or [append table]({{< ref "append-table/overview" >}}) ,
you can trigger a compact with specified column sort to speed up queries.
```bash
Expand Down
Loading

0 comments on commit 77b9d82

Please sign in to comment.