diff --git a/docs/content/append-table/append-table.md b/docs/content/append-table/append-table.md deleted file mode 100644 index 2eba1f44f5a8d..0000000000000 --- a/docs/content/append-table/append-table.md +++ /dev/null @@ -1,102 +0,0 @@ ---- -title: "Append Table" -weight: 1 -type: docs -aliases: -- /append-table/append-table.html ---- - - -# Append Table - -If a table does not have a primary key defined, it is an append table by default. - -You can only insert a complete record into the table in streaming. This type of table is suitable for use cases that -do not require streaming updates (such as log data synchronization). - -{{< tabs "create-append-table" >}} -{{< tab "Flink" >}} -```sql -CREATE TABLE my_table ( - product_id BIGINT, - price DOUBLE, - sales BIGINT -) WITH ( - 'file.compression.zstd-level' = '3' -); -``` -{{< /tab >}} -{{< /tabs >}} - -## Automatic small file merging - -In streaming writing job, without bucket definition, there is no compaction in writer, instead, will use -`Compact Coordinator` to scan the small files and pass compaction task to `Compact Worker`. In streaming mode, if you -run insert sql in flink, the topology will be like this: - -{{< img src="/img/unaware-bucket-topo.png">}} - -Do not worry about backpressure, compaction never backpressure. - -If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` will be removed in the topology. - -The auto compaction is only supported in Flink engine streaming mode. You can also start a compaction job in flink by -flink action in paimon and disable all the other compaction by set `write-only`. - -## Streaming Query - -You can stream the Append table and use it like a Message Queue. As with primary key tables, there are two options -for streaming reads: -1. By default, Streaming read produces the latest snapshot on the table upon first startup, and continue to read the - latest incremental records. -2. You can specify `scan.mode` or `scan.snapshot-id` or `scan.timestamp-millis` or `scan.file-creation-time-millis` to - streaming read incremental only. - -Similar to flink-kafka, order is not guaranteed by default, if your data has some sort of order requirement, you also -need to consider defining a `bucket-key`, see [Bucketed Append]({{< ref "append-table/bucketed-append" >}}). - -## OLAP Query - -### Data Skipping By Order - -Paimon by default records the maximum and minimum values of each field in the manifest file. - -In the query, according to the `WHERE` condition of the query, according to the statistics in the manifest do files -filtering, if the filtering effect is good, the query would have been minutes of the query will be accelerated to -milliseconds to complete the execution. - -Often the data distribution is not always effective filtering, so if we can sort the data by the field in `WHERE` condition? -You can take a look to [Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}) or -[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}) or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}). - -### Data Skipping By File Index - -You can use file index too, it filters files by index on the read side. - -```sql -CREATE TABLE ( , ...) WITH ( - 'file-index.bloom-filter.columns' = 'c1,c2', - 'file-index.bloom-filter.c1.items' = '200' -); -``` - -## DELETE & UPDATE - -Now, only Spark SQL supports DELETE & UPDATE, you can take a look to [Spark Write]({{< ref "spark/sql-write" >}}). 
\ No newline at end of file diff --git a/docs/content/append-table/overview.md b/docs/content/append-table/overview.md new file mode 100644 index 0000000000000..67d063c584e9e --- /dev/null +++ b/docs/content/append-table/overview.md @@ -0,0 +1,58 @@ +--- +title: "Overview" +weight: 1 +type: docs +aliases: +- /append-table/overview.html +--- + + +# Overview + +If a table does not have a primary key defined, it is an append table. Compared to the primary key table, it does not +have the ability to directly receive changelogs and cannot be updated with data through upsert. It can only +accept appended data. + +{{< tabs "create-append-table" >}} +{{< tab "Flink" >}} +```sql +CREATE TABLE my_table ( + product_id BIGINT, + price DOUBLE, + sales BIGINT +) WITH ( + -- 'target-file-size' = '256 MB', + -- 'file.format' = 'parquet', + -- 'file.compression' = 'zstd', + -- 'file.compression.zstd-level' = '3' +); +``` +{{< /tab >}} +{{< /tabs >}} + +Its typical scenarios are batch write and batch read, similar to a regular Hive partitioned table, but compared to +a Hive table, it brings: + +1. Object storage (S3, OSS) friendly +2. Time Travel and Rollback +3. DELETE / UPDATE with low cost +4. Automatic small file merging in streaming sink +5. Streaming read & write like a queue +6. High performance query with order and index diff --git a/docs/content/append-table/file-index.md b/docs/content/append-table/query.md similarity index 70% rename from docs/content/append-table/file-index.md rename to docs/content/append-table/query.md index 4bf769d6eea31..fc4cbeb6c29d8 100644 --- a/docs/content/append-table/file-index.md +++ b/docs/content/append-table/query.md @@ -1,9 +1,9 @@ --- -title: "File Index" -weight: 4 +title: "Query" +weight: 3 type: docs aliases: -- /append-table/file-index.html +- /append-table/query.html --- -# Data File Index +# Query + +## Data Skipping By Order + +Paimon by default records the maximum and minimum values of each field in the manifest file. + +At query time, files are filtered according to the statistics in the manifest based on the `WHERE` condition of the +query. If the filtering is effective, a query that would have taken minutes can be accelerated to +complete in milliseconds. + +The natural data distribution does not always filter effectively, so what if we sort the data by the fields in the `WHERE` condition? +You can take a look at the [Flink COMPACT Action]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}) or +[Flink COMPACT Procedure]({{< ref "flink/procedures" >}}) or [Spark COMPACT Procedure]({{< ref "spark/procedures" >}}). + +## Data Skipping By File Index + +You can also use a file index, which filters files by index on the read side. + +```sql +CREATE TABLE ( , ...) WITH ( + 'file-index.bloom-filter.columns' = 'c1,c2', + 'file-index.bloom-filter.c1.items' = '200' +); +``` Define `file-index.bloom-filter.columns`, Paimon will create its corresponding index file for each file. If the index file is too small, it will be stored directly in the manifest, or in the directory of the data file. Each data file corresponds to an index file, which has a separate file definition and can contain different types of indexes with multiple columns. -## Concept - Data file index is an external index file corresponding to a certain data file. If the index file is too small, it will be stored directly in the manifest, otherwise in the directory of the data file.
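As a rough illustration of when the bloom filter index helps (a sketch only; the table name `my_table` is hypothetical, while `c1` is the indexed column from the snippet above), an equality point lookup is the kind of predicate that lets the reader consult the per-file index and skip files on the read side:

```sql
-- Point lookup on an indexed column: data files whose bloom filter reports that
-- the value is definitely absent can be skipped without being read.
SELECT * FROM my_table WHERE c1 = 'some_key';
```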
Each data file corresponds to an index file, which has a separate file definition and can contain different types of indexes with multiple columns. -## Usage - Different file index may be efficient in different scenario. For example bloom filter may speed up query in point lookup scenario. Using a bitmap may consume more space but can result in greater accuracy. Though we only realize bloom filter currently, but other types of index will be supported in the future. @@ -52,8 +71,6 @@ Currently, file index is only supported in append-only table. More filter types will be supported... -## Procedure - If you want to add file index to existing table, without any rewrite, you can use `rewrite_file_index` procedure. Before we use the procedure, you should config appropriate configurations in target table. You can use ALTER clause to config `file-index..columns` to the table. diff --git a/docs/content/append-table/bucketed-append.md b/docs/content/append-table/streaming.md similarity index 76% rename from docs/content/append-table/bucketed-append.md rename to docs/content/append-table/streaming.md index 4ef05b6ea561e..c3d64a6500f02 100644 --- a/docs/content/append-table/bucketed-append.md +++ b/docs/content/append-table/streaming.md @@ -1,9 +1,9 @@ --- -title: "Bucketed Append" -weight: 3 +title: "Streaming" -weight: 2 type: docs aliases: -- /append-table/bucketed-append.html +- /append-table/streaming.html --- -# Bucketed Append +# Streaming -## Definition +You can write to the Append table in streaming mode in a very flexible way through Flink, or read the Append table +through Flink, using it like a queue. The only difference is that its latency is in minutes. Its advantages are very low cost +and the ability to push down filters and projections. + +## Automatic small file merging + +In a streaming write job without a bucket definition, there is no compaction in the writer; instead, a +`Compact Coordinator` will scan the small files and pass compaction tasks to a `Compact Worker`. In streaming mode, if you +run an insert SQL in Flink, the topology will look like this: + +{{< img src="/img/unaware-bucket-topo.png">}} + +Do not worry about backpressure; compaction never causes backpressure. + +If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` will be removed from the topology. + +Automatic compaction is only supported in Flink engine streaming mode. You can also start a compaction job in Flink via +the Flink action in Paimon and disable all other compaction by setting `write-only`. + +## Streaming Query + +You can stream the Append table and use it like a Message Queue. As with primary key tables, there are two options +for streaming reads: +1. By default, streaming read produces the latest snapshot of the table upon first startup, and then continues to read the + latest incremental records. +2. You can specify `scan.mode`, `scan.snapshot-id`, `scan.timestamp-millis` or `scan.file-creation-time-millis` to + stream read incremental records only. + +Similar to flink-kafka, order is not guaranteed by default. If your data has some ordering requirement, you also +need to consider defining a `bucket-key`. + +## Bucketed Append An ordinary Append table has no strict ordering guarantees for its streaming writes and reads, but there are some cases where you need to define a key similar to Kafka's. -You can define the `bucket` and `bucket-key` to get a bucketed append table. Every record in the same bucket is ordered +You can define the `bucket` and `bucket-key` to get a bucketed append table.
Every record in the same bucket is ordered strictly, streaming read will transfer the record to down-stream exactly in the order of writing. To use this mode, you do not need to config special configurations, all the data will go into one bucket as a queue. @@ -55,7 +86,7 @@ CREATE TABLE my_table ( {{< /tab >}} {{< /tabs >}} -## Compaction +### Compaction in Bucket By default, the sink node will automatically perform compaction to control the number of files. The following options control the strategy of compaction: @@ -97,17 +128,13 @@ control the strategy of compaction: -## Streaming Source - -Streaming source behavior is only supported in Flink engine at present. - ### Streaming Read Order For streaming reads, records are produced in the following order: * For any two records from two different partitions - * If `scan.plan-sort-partition` is set to true, the record with a smaller partition value will be produced first. - * Otherwise, the record with an earlier partition creation time will be produced first. + * If `scan.plan-sort-partition` is set to true, the record with a smaller partition value will be produced first. + * Otherwise, the record with an earlier partition creation time will be produced first. * For any two records from the same partition and the same bucket, the first written record will be produced first. * For any two records from the same partition but two different buckets, different buckets are processed by different tasks, there is no order guarantee between them. diff --git a/docs/content/append-table/deletion-vectors.md b/docs/content/append-table/update.md similarity index 57% rename from docs/content/append-table/deletion-vectors.md rename to docs/content/append-table/update.md index 663cdd9241030..d50b6be574ac0 100644 --- a/docs/content/append-table/deletion-vectors.md +++ b/docs/content/append-table/update.md @@ -1,9 +1,9 @@ --- -title: "Deletion Vectors" -weight: 5 +title: "Update" +weight: 4 type: docs aliases: -- /append-table/deletion-vectors.html +- /append-table/update.html --- -# Deletion Vectors +# Update -{{< hint info >}} -Currently only Spark SQL supports `DELETE`, `UPDATE` for append table. -{{< /hint >}} +Currently, only Spark SQL supports DELETE & UPDATE; you can take a look at [Spark Write]({{< ref "spark/sql-write" >}}). -Consider SQL: +Example: ```sql DELETE FROM my_table WHERE currency = 'UNKNOWN'; ``` -By default, it will search for the hit files and then rewrite each file to remove the -data that needs to be deleted from the files. This operation is costly. +Updating an append table has two modes: -Deletion vectors mode only marks certain records of the corresponding file for deletion -and writes the deletion file, without rewriting the entire file. - -## Usage - -By specifying `'deletion-vectors.enabled' = 'true'`, the Deletion Vectors mode can be enabled. +1. COW (Copy on Write): searches for the hit files and then rewrites each file to remove the data that needs to be deleted + from the files. This operation is costly. +2. MOW (Merge on Write): by specifying `'deletion-vectors.enabled' = 'true'`, the Deletion Vectors mode can be enabled. + It only marks certain records of the corresponding file for deletion and writes a deletion file, without rewriting the entire file.
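To make the two modes concrete, here is a minimal Spark SQL sketch, reusing the illustrative `my_table` and `currency` names from the example above and assuming Spark's standard `ALTER TABLE ... SET TBLPROPERTIES` syntax is used to change the table option:

```sql
-- Switch the append table from the default COW path to MOW by enabling deletion vectors.
ALTER TABLE my_table SET TBLPROPERTIES ('deletion-vectors.enabled' = 'true');

-- With deletion vectors enabled, this delete only writes deletion files marking the
-- affected rows, instead of rewriting every data file that contains a hit.
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```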
diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md index f1e305ef4bb6f..2c496c849618d 100644 --- a/docs/content/flink/sql-write.md +++ b/docs/content/flink/sql-write.md @@ -51,9 +51,9 @@ For multiple jobs to write the same table, you can refer to [dedicated compactio ### Clustering -In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}}) +In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/overview" >}}) based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream -tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/append-table#Append Table" >}}) +tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is only supported for [Append Table]({{< ref "append-table/overview" >}}) and batch execution mode. To utilize clustering, you can specify the columns you want to cluster when creating or writing to a table. Here's a simple example of how to enable clustering: diff --git a/docs/content/learn-paimon/understand-files.md b/docs/content/learn-paimon/understand-files.md index 258b7e0c17b91..3d68a7b7b979c 100644 --- a/docs/content/learn-paimon/understand-files.md +++ b/docs/content/learn-paimon/understand-files.md @@ -455,7 +455,7 @@ this means that there are at least 5 files in a bucket. If you want to reduce th By default, Append also does automatic compaction to reduce the number of small files. However, for Bucketed Append table, it will only compact the files within the Bucket for sequential -purposes, which may keep more small files. See [Bucketed Append]({{< ref "append-table/bucketed-append" >}}). +purposes, which may keep more small files. See [Bucketed Append]({{< ref "append-table/streaming#bucketed-append" >}}). ### Understand Full-Compaction diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md index d77e7e41c9ab9..dab957da1da4f 100644 --- a/docs/content/maintenance/dedicated-compaction.md +++ b/docs/content/maintenance/dedicated-compaction.md @@ -231,7 +231,7 @@ For more usage of the compact_database action, see ## Sort Compact If your table is configured with [dynamic bucket primary key table]({{< ref "primary-key-table/data-distribution#dynamic-bucket" >}}) -or [append table]({{< ref "append-table/append-table" >}}) , +or [append table]({{< ref "append-table/overview" >}}) , you can trigger a compact with specified column sort to speed up queries. ```bash diff --git a/docs/content/migration/migration-from-hive.md b/docs/content/migration/migration-from-hive.md index dd11324442d4c..8327e8a928ef7 100644 --- a/docs/content/migration/migration-from-hive.md +++ b/docs/content/migration/migration-from-hive.md @@ -28,7 +28,7 @@ under the License. Apache Hive supports ORC, Parquet file formats that could be migrated to Paimon. When migrating data to a paimon table, the origin table will be permanently disappeared. So please back up your data if you -still need the original table. The migrated table will be [append table]({{< ref "append-table/append-table" >}}). +still need the original table. The migrated table will be [append table]({{< ref "append-table/overview" >}}). 
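As a hedged sketch of such a migration using the Migrate Table Procedure described just below (the procedure name `sys.migrate_table`, its argument order, and the option string are assumptions to verify against the procedures documentation, and `default.hivetable` is a hypothetical source table), a Flink SQL call might look like:

```sql
-- Migrate a Hive ORC table in place to a Paimon append table; the original Hive
-- table is consumed by the migration, so back it up first if you still need it.
CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc');
```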
Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate File Procedure to totally migrate a table from hive to paimon. At the same time, you can use paimon hive catalog with Migrate Database Procedure to fully synchronize all tables in the database to paimon. diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 3e85c7c88bf83..e2052162c5398 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -81,18 +81,23 @@ public static boolean schemaCompatible( for (DataField field : sourceTableFields) { int idx = paimonSchema.fieldNames().indexOf(field.name()); if (idx < 0) { - LOG.info("Cannot find field '{}' in Paimon table.", field.name()); - return false; - } - DataType type = paimonSchema.fields().get(idx).type(); - if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type) - != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { - LOG.info( - "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", - field.name(), - field.type(), - type); - return false; + if (!field.type().isNullable()) { + LOG.info( + "Add column '{}' cannot specify NOT NULL in the Paimon table.", + field.name()); + return false; + } + } else { + DataType type = paimonSchema.fields().get(idx).type(); + if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type()) + != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { + LOG.info( + "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", + field.name(), + field.type(), + type); + return false; + } } } return true; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index e335fc2be3484..d25cb2b8eb386 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -124,13 +124,13 @@ protected void beforeBuildingSourceSink() throws Exception { // Check if table exists before trying to get or create it if (catalog.tableExists(identifier)) { fileStoreTable = (FileStoreTable) catalog.getTable(identifier); - fileStoreTable = alterTableOptions(identifier, fileStoreTable); try { Schema retrievedSchema = retrieveSchema(); computedColumns = buildComputedColumns(computedColumnArgs, retrievedSchema.fields()); Schema paimonSchema = buildPaimonSchema(retrievedSchema); assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields()); + fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema); } catch (SchemaRetrievalException e) { LOG.info( "Failed to retrieve schema from record data but there exists specified Paimon table. 
" diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 944185347c353..9f99ad35e67c2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -27,8 +27,11 @@ import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -79,7 +82,6 @@ public SynchronizationActionBase( this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig); this.syncJobHandler = syncJobHandler; this.caseSensitive = catalog.caseSensitive(); - this.syncJobHandler.registerJdbcDriver(); } @@ -177,7 +179,8 @@ protected abstract void buildSink( DataStream input, EventParser.Factory parserFactory); - protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) { + protected FileStoreTable alterTableSchema( + Identifier identifier, FileStoreTable table, Schema paimonSchema) { // doesn't support altering bucket here Map dynamicOptions = new HashMap<>(tableConfig); dynamicOptions.remove(CoreOptions.BUCKET.key()); @@ -199,6 +202,14 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable .map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue())) .collect(Collectors.toList()); + // alter the table schema + List columnChanges = + UpdatedDataFieldsProcessFunction.extractSchemaChanges( + new SchemaManager(table.fileIO(), table.location()), + paimonSchema.fields(), + caseSensitive); + + optionChanges.addAll(columnChanges); try { catalog.alterTable(identifier, optionChanges, false); } catch (Catalog.TableNotExistException diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index a33f2c978321e..2059293385fbd 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -160,7 +160,7 @@ protected void beforeBuildingSourceSink() throws Exception { Supplier errMsg = incompatibleMessage(table.schema(), tableInfo, identifier); if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) { - table = alterTableOptions(identifier, table); + table = alterTableSchema(identifier, table, fromMySql); tables.add(table); monitoredTables.addAll(tableInfo.identifiers()); } else { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java index 1f3ca3035975b..d677b0e481d1d 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java @@ -136,18 +136,14 @@ public static DataType toDataType( return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)); case PG_CHAR: case PG_CHARACTER: - return DataTypes.CHAR(precision); - case PG_CHAR_ARRAY: - case PG_CHARACTER_ARRAY: - return DataTypes.ARRAY(DataTypes.CHAR(precision)); case PG_CHARACTER_VARYING: - return DataTypes.VARCHAR(precision); - case PG_CHARACTER_VARYING_ARRAY: - return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); case PG_TEXT: case PG_JSON: case PG_ENUM: return DataTypes.STRING(); + case PG_CHAR_ARRAY: + case PG_CHARACTER_ARRAY: + case PG_CHARACTER_VARYING_ARRAY: case PG_TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); case PG_TIMESTAMP: diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 3d832d33949b6..7cf5fb2aea3ca 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -192,8 +192,13 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) { return ConvertAction.EXCEPTION; } - protected List extractSchemaChanges( + public List extractSchemaChanges( SchemaManager schemaManager, List updatedDataFields) { + return extractSchemaChanges(schemaManager, updatedDataFields, caseSensitive); + } + + public static List extractSchemaChanges( + SchemaManager schemaManager, List updatedDataFields, boolean caseSensitive) { RowType oldRowType = schemaManager.latest().get().logicalRowType(); Map oldFields = new HashMap<>(); for (DataField oldField : oldRowType.getFields()) { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 9568b1c3b7fa9..b9fd80cd842b5 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1361,8 +1361,6 @@ public void testColumnCommentChangeInExistingTable() throws Exception { mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "test_exist_column_comment_change"); - // Flink cdc 2.3 does not support collecting field comments, and existing paimon table field - // comments will not be changed. 
MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig) .withPrimaryKeys("pk") @@ -1374,13 +1372,96 @@ public void testColumnCommentChangeInExistingTable() throws Exception { Map actual = table.schema().fields().stream() .collect(Collectors.toMap(DataField::name, Function.identity())); - assertThat(actual.get("pk").description()).isEqualTo("pk comment"); - assertThat(actual.get("c1").description()).isEqualTo("c1 comment"); + assertThat(actual.get("pk").description()).isEqualTo("pk new_comment"); + assertThat(actual.get("c1").description()).isEqualTo("c1 new_comment"); assertThat(actual.get("c2").description()).isEqualTo("c2 comment"); } @Test @Timeout(60) + public void testColumnAlterInExistingTableBeforeStartJob() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.VARCHAR(20)) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "test_exist_column_alter"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(getBasicTableConfig()) + .build(); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + Map actual = + table.schema().fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + + assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull()); + assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); + assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30)); + assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); + } + + @Test + @Timeout(60) + public void testAssertSchemaCompatibleWithAddColumnISNOTNULL() throws Exception { + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.VARCHAR(20)) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", DATABASE_NAME); + mySqlConfig.put("table-name", "assert_schema_compatible"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withTableConfig(getBasicTableConfig()) + .build(); + + assertThatThrownBy(action::run) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Paimon schema and source table schema are not compatible.\n" + + "Paimon fields are: [`pk` INT NOT NULL, `a` BIGINT, `b` VARCHAR(20)].\n" + + "Source table fields are: [`pk` INT NOT NULL '', `a` BIGINT '', `b` VARCHAR(30) '', `c` INT NOT NULL 'Add column cannot specify NOT NULL in the Paimon table']")); + } + public void testWriteOnlyAndSchemaEvolution() throws Exception { Map mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", "write_only_and_schema_evolution"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index b5b36888ebe06..8dd9bb7d92fac 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -38,6 +39,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -77,7 +80,7 @@ public void testSchemaEvolution() throws Exception { checkTableSchema( "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\"}," + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\"}," - + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}]"); + + "{\"id\":2,\"name\":\"v1\",\"type\":\"STRING\"}]"); try (Statement statement = getStatement(DATABASE_NAME)) { testSchemaEvolutionImpl(statement); @@ -245,9 +248,9 @@ public void testMultipleSchemaEvolutions() throws Exception { checkTableSchema( "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\"}," - + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}," + + "{\"id\":1,\"name\":\"v1\",\"type\":\"STRING\"}," + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\"}," - + "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\"}]"); + + "{\"id\":3,\"name\":\"v3\",\"type\":\"STRING\"}]"); try (Statement statement = getStatement(DATABASE_NAME)) { testSchemaEvolutionMultipleImpl(statement); @@ -786,6 +789,50 @@ public void testCatalogAndTableConfig() { .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } + @Test + @Timeout(60) + public void testColumnAlterInExistingTableWhenStartJob() throws Exception { + String tableName = "test_exist_column_alter"; + Map options = new HashMap<>(); + options.put("bucket", "1"); + options.put("sink.parallelism", "1"); + + RowType rowType = + RowType.builder() + .field("pk", DataTypes.INT().notNull()) + .field("a", DataTypes.BIGINT()) + .field("b", DataTypes.STRING()) + .build(); + + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk"), + Collections.emptyList(), + options); + + Map postgresConfig = getBasicPostgresConfig(); + postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME); + postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME); + postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tableName); + + PostgresSyncTableAction action = + syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build(); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + + Map actual = + table.schema().fields().stream() + .collect(Collectors.toMap(DataField::name, Function.identity())); + + assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull()); + assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT()); + assertThat(actual.get("b").type()).isEqualTo(DataTypes.STRING()); + 
assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT()); + } + private FileStoreTable getFileStoreTable() throws Exception { return getFileStoreTable(tableName); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 676185fb9291f..c073eea3a37f4 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -317,12 +317,28 @@ CREATE TABLE test_exist_options_change ( ); CREATE TABLE test_exist_column_comment_change ( - pk INT, - c1 DATE, - c2 VARCHAR(10) not null comment 'c2 comment', + pk INT comment 'pk new_comment', + c1 DATE comment 'c1 new_comment', + c2 VARCHAR(10) NOT NULL comment 'c2 comment', PRIMARY KEY (pk) ); +CREATE TABLE test_exist_column_alter ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT, + PRIMARY KEY (pk) +); + +CREATE TABLE assert_schema_compatible ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT NOT NULL comment 'Add column cannot specify NOT NULL in the Paimon table', + PRIMARY KEY (pk) +); + -- ################################################################################ -- testSyncShard -- ################################################################################ diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql index 373eb3880f403..a451cb1c7b6ad 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql @@ -229,6 +229,15 @@ CREATE TABLE test_options_change ( ALTER TABLE test_options_change REPLICA IDENTITY FULL; +CREATE TABLE test_exist_column_alter ( + pk INT, + a BIGINT, + b VARCHAR(30), + c INT, + PRIMARY KEY (pk) +); + + -- ################################################################################ -- testMetadataColumns -- ################################################################################