diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d25fd47ed680b..9ef2f0c8cf598 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -270,7 +270,7 @@
file.compression |
"zstd" |
String |
- Default file compression. For faster read and write, it is recommended to use LZ4. |
+ Default file compression. For faster read and write, it is recommended to use zstd. |
file.compression.per.level |
@@ -359,9 +359,9 @@
lookup.cache-spill-compression |
- "lz4" |
+ "zstd" |
String |
- Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported. |
+ Spill compression for lookup cache, currently zstd, none, lz4 and lzo are supported. |
lookup.cache.bloom.filter.enabled |
@@ -485,18 +485,18 @@
String |
The default partition name in case the dynamic partition column value is null/empty string. |
-
- partition.expiration-strategy |
- values-time |
- String |
- Specifies the expiration strategy for partition expiration. Possible values:- values-time: A partition expiration policy that compares the time extracted from the partition value with the current time.
- update-time: A partition expiration policy that compares the last update time of the partition with the current time.
Possible values:- "values-time": The strategy compares the time extracted from the partition value with the current time.
- "update-time": The strategy compares the last update time of the partition with the current time.
|
-
partition.expiration-check-interval |
1 h |
Duration |
The check interval of partition expiration. |
+
+ partition.expiration-strategy |
+ values-time |
+ Enum |
+ Specifies the expiration strategy for partition expiration. Possible values:- values-time: A partition expiration policy that compares the time extracted from the partition value with the current time.
- update-time: A partition expiration policy that compares the last update time of the partition with the current time.
Possible values:- "values-time": The strategy compares the time extracted from the partition value with the current time.
- "update-time": The strategy compares the last update time of the partition with the current time.
|
+
partition.expiration-time |
(none) |
@@ -710,9 +710,9 @@
spill-compression |
- "LZ4" |
+ "zstd" |
String |
- Compression for spill, currently lz4, lzo and zstd are supported. |
+ Compression for spill, currently zstd, lz4 and lzo are supported. |
streaming-read-mode |
diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml
index a694db70f9a4e..ab95c8332440f 100644
--- a/paimon-common/pom.xml
+++ b/paimon-common/pom.xml
@@ -64,6 +64,12 @@ under the License.
${lz4.version}
+ <dependency>
+     <groupId>com.github.luben</groupId>
+     <artifactId>zstd-jni</artifactId>
+     <version>${zstd-jni.version}</version>
+ </dependency>
+
org.codehaus.janino
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 515b4ec27709d..ccdedbd7b1503 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -157,7 +157,7 @@ public class CoreOptions implements Serializable {
.stringType()
.defaultValue("zstd")
.withDescription(
- "Default file compression. For faster read and write, it is recommended to use LZ4.");
+ "Default file compression. For faster read and write, it is recommended to use zstd.");
public static final ConfigOption FILE_COMPRESSION_ZSTD_LEVEL =
key("file.compression.zstd-level")
@@ -344,9 +344,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> SPILL_COMPRESSION =
key("spill-compression")
.stringType()
- .defaultValue("LZ4")
+ .defaultValue("zstd")
.withDescription(
- "Compression for spill, currently lz4, lzo and zstd are supported.");
+ "Compression for spill, currently zstd, lzo and zstd are supported.");
public static final ConfigOption WRITE_ONLY =
key("write-only")
@@ -814,9 +814,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> LOOKUP_CACHE_SPILL_COMPRESSION =
key("lookup.cache-spill-compression")
.stringType()
- .defaultValue("lz4")
+ .defaultValue("zstd")
.withDescription(
- "Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.");
+ "Spill compression for lookup cache, currently zstd, none, lz4 and lzo are supported.");
public static final ConfigOption LOOKUP_CACHE_MAX_MEMORY_SIZE =
key("lookup.cache-max-memory-size")
diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index 621370b3dcaa6..90ef2002d5c9e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -20,8 +20,6 @@
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
-import io.airlift.compress.zstd.ZstdCompressor;
-import io.airlift.compress.zstd.ZstdDecompressor;
import javax.annotation.Nullable;
@@ -46,7 +44,7 @@ static BlockCompressionFactory create(String compression) {
case "LZO":
return new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
case "ZSTD":
- return new AirCompressorFactory(new ZstdCompressor(), new ZstdDecompressor());
+ return new ZstdBlockCompressionFactory();
default:
throw new IllegalStateException("Unknown CompressionMethod " + compression);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
new file mode 100644
index 0000000000000..5e97cb3e5089d
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.compression;
+
+/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
+public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+
+ @Override
+ public BlockCompressor getCompressor() {
+ return new ZstdBlockCompressor();
+ }
+
+ @Override
+ public BlockDecompressor getDecompressor() {
+ return new ZstdBlockDecompressor();
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
new file mode 100644
index 0000000000000..bd6d2a5a6bfd7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.compression;
+
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.ZstdOutputStream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.paimon.compression.CompressorUtils.HEADER_LENGTH;
+
+/** A {@link BlockCompressor} for zstd. */
+public class ZstdBlockCompressor implements BlockCompressor {
+
+ private static final int MAX_BLOCK_SIZE = 128 * 1024;
+
+ @Override
+ public int getMaxCompressedSize(int srcSize) {
+ return HEADER_LENGTH + zstdMaxCompressedLength(srcSize);
+ }
+
+ private int zstdMaxCompressedLength(int uncompressedSize) {
+ // refer to io.airlift.compress.zstd.ZstdCompressor
+ int result = uncompressedSize + (uncompressedSize >>> 8);
+ if (uncompressedSize < MAX_BLOCK_SIZE) {
+ result += (MAX_BLOCK_SIZE - uncompressedSize) >>> 11;
+ }
+ return result;
+ }
+
+ @Override
+ public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+ throws BufferCompressionException {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream(dst, dstOff);
+ try (ZstdOutputStream zstdStream =
+ new ZstdOutputStream(stream, RecyclingBufferPool.INSTANCE, 1)) {
+ zstdStream.setWorkers(0);
+ zstdStream.write(src, srcOff, srcLen);
+ } catch (IOException e) {
+ throw new BufferCompressionException(e);
+ }
+ return stream.position() - dstOff;
+ }
+
+ private static class ByteArrayOutputStream extends OutputStream {
+
+ private final byte[] buf;
+ private int position;
+
+ public ByteArrayOutputStream(byte[] buf, int position) {
+ this.buf = buf;
+ this.position = position;
+ }
+
+ @Override
+ public void write(int b) {
+ buf[position] = (byte) b;
+ position += 1;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
+ throw new IndexOutOfBoundsException();
+ }
+ try {
+ System.arraycopy(b, off, buf, position, len);
+ } catch (IndexOutOfBoundsException e) {
+ throw new IOException(e);
+ }
+ position += len;
+ }
+
+ int position() {
+ return position;
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockDecompressor.java b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockDecompressor.java
new file mode 100644
index 0000000000000..e87667416d902
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockDecompressor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.compression;
+
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.ZstdInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/** A {@link BlockDecompressor} for zstd. */
+public class ZstdBlockDecompressor implements BlockDecompressor {
+
+ @Override
+ public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+ throws BufferDecompressionException {
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(src, srcOff, srcLen);
+ try (ZstdInputStream decompressorStream =
+ new ZstdInputStream(inputStream, RecyclingBufferPool.INSTANCE)) {
+ int decompressedLen = 0;
+ while (true) {
+ int offset = dstOff + decompressedLen;
+ int count = decompressorStream.read(dst, offset, dst.length - offset);
+ if (count <= 0) {
+ if (decompressorStream.available() != 0) {
+ throw new BufferDecompressionException(
+ "Dst is too small and the decompression was not completed.");
+ }
+ break;
+ }
+ decompressedLen += count;
+ }
+ return decompressedLen;
+ } catch (IOException e) {
+ throw new BufferDecompressionException(e);
+ }
+ }
+}
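Illustration only (not part of the patch): a minimal round trip through the new zstd block codec, assuming the BlockCompressionFactory lookup and the BlockCompressor/BlockDecompressor signatures shown above. The wrapper class name and the sample payload are hypothetical.

import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockCompressor;
import org.apache.paimon.compression.BlockDecompressor;

import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of compressing and decompressing one block with the zstd codec. */
public class ZstdRoundTripSketch {
    public static void main(String[] args) throws Exception {
        byte[] src = "paimon zstd block".getBytes(StandardCharsets.UTF_8);

        // "ZSTD" now resolves to ZstdBlockCompressionFactory (see the factory change above).
        BlockCompressionFactory factory = BlockCompressionFactory.create("ZSTD");

        BlockCompressor compressor = factory.getCompressor();
        byte[] compressed = new byte[compressor.getMaxCompressedSize(src.length)];
        int compressedLen = compressor.compress(src, 0, src.length, compressed, 0);

        BlockDecompressor decompressor = factory.getDecompressor();
        byte[] restored = new byte[src.length];
        int restoredLen = decompressor.decompress(compressed, 0, compressedLen, restored, 0);

        System.out.println(compressedLen + " compressed bytes -> " + restoredLen + " restored bytes");
    }
}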
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 3e85c7c88bf83..e2052162c5398 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -81,18 +81,23 @@ public static boolean schemaCompatible(
for (DataField field : sourceTableFields) {
int idx = paimonSchema.fieldNames().indexOf(field.name());
if (idx < 0) {
- LOG.info("Cannot find field '{}' in Paimon table.", field.name());
- return false;
- }
- DataType type = paimonSchema.fields().get(idx).type();
- if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
- != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
- LOG.info(
- "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
- field.name(),
- field.type(),
- type);
- return false;
+ if (!field.type().isNullable()) {
+ LOG.info(
+ "Add column '{}' cannot specify NOT NULL in the Paimon table.",
+ field.name());
+ return false;
+ }
+ } else {
+ DataType type = paimonSchema.fields().get(idx).type();
+ if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type())
+ != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
+ LOG.info(
+ "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
+ field.name(),
+ field.type(),
+ type);
+ return false;
+ }
}
}
return true;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index e335fc2be3484..d25cb2b8eb386 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -124,13 +124,13 @@ protected void beforeBuildingSourceSink() throws Exception {
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
- fileStoreTable = alterTableOptions(identifier, fileStoreTable);
try {
Schema retrievedSchema = retrieveSchema();
computedColumns =
buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
Schema paimonSchema = buildPaimonSchema(retrievedSchema);
assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields());
+ fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema);
} catch (SchemaRetrievalException e) {
LOG.info(
"Failed to retrieve schema from record data but there exists specified Paimon table. "
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 944185347c353..9f99ad35e67c2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -27,8 +27,11 @@
import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -79,7 +82,6 @@ public SynchronizationActionBase(
this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig);
this.syncJobHandler = syncJobHandler;
this.caseSensitive = catalog.caseSensitive();
-
this.syncJobHandler.registerJdbcDriver();
}
@@ -177,7 +179,8 @@ protected abstract void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory);
- protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
+ protected FileStoreTable alterTableSchema(
+ Identifier identifier, FileStoreTable table, Schema paimonSchema) {
// doesn't support altering bucket here
Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
dynamicOptions.remove(CoreOptions.BUCKET.key());
@@ -199,6 +202,14 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable
.map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
+ // alter the table schema
+ List<SchemaChange> columnChanges =
+ UpdatedDataFieldsProcessFunction.extractSchemaChanges(
+ new SchemaManager(table.fileIO(), table.location()),
+ paimonSchema.fields(),
+ caseSensitive);
+
+ optionChanges.addAll(columnChanges);
try {
catalog.alterTable(identifier, optionChanges, false);
} catch (Catalog.TableNotExistException
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index a33f2c978321e..2059293385fbd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -160,7 +160,7 @@ protected void beforeBuildingSourceSink() throws Exception {
Supplier<String> errMsg =
incompatibleMessage(table.schema(), tableInfo, identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
- table = alterTableOptions(identifier, table);
+ table = alterTableSchema(identifier, table, fromMySql);
tables.add(table);
monitoredTables.addAll(tableInfo.identifiers());
} else {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java
index 1f3ca3035975b..d677b0e481d1d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresTypeUtils.java
@@ -136,18 +136,14 @@ public static DataType toDataType(
return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
case PG_CHAR:
case PG_CHARACTER:
- return DataTypes.CHAR(precision);
- case PG_CHAR_ARRAY:
- case PG_CHARACTER_ARRAY:
- return DataTypes.ARRAY(DataTypes.CHAR(precision));
case PG_CHARACTER_VARYING:
- return DataTypes.VARCHAR(precision);
- case PG_CHARACTER_VARYING_ARRAY:
- return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
case PG_TEXT:
case PG_JSON:
case PG_ENUM:
return DataTypes.STRING();
+ case PG_CHAR_ARRAY:
+ case PG_CHARACTER_ARRAY:
+ case PG_CHARACTER_VARYING_ARRAY:
case PG_TEXT_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PG_TIMESTAMP:
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 3d832d33949b6..7cf5fb2aea3ca 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -192,8 +192,13 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
return ConvertAction.EXCEPTION;
}
- protected List<SchemaChange> extractSchemaChanges(
+ public List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, List<DataField> updatedDataFields) {
+ return extractSchemaChanges(schemaManager, updatedDataFields, caseSensitive);
+ }
+
+ public static List<SchemaChange> extractSchemaChanges(
+ SchemaManager schemaManager, List<DataField> updatedDataFields, boolean caseSensitive) {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
for (DataField oldField : oldRowType.getFields()) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9568b1c3b7fa9..b9fd80cd842b5 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1361,8 +1361,6 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_exist_column_comment_change");
- // Flink cdc 2.3 does not support collecting field comments, and existing paimon table field
- // comments will not be changed.
MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withPrimaryKeys("pk")
@@ -1374,13 +1372,96 @@ public void testColumnCommentChangeInExistingTable() throws Exception {
Map<String, DataField> actual =
table.schema().fields().stream()
.collect(Collectors.toMap(DataField::name, Function.identity()));
- assertThat(actual.get("pk").description()).isEqualTo("pk comment");
- assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
+ assertThat(actual.get("pk").description()).isEqualTo("pk new_comment");
+ assertThat(actual.get("c1").description()).isEqualTo("c1 new_comment");
assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
}
@Test
@Timeout(60)
+ public void testColumnAlterInExistingTableBeforeStartJob() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "1");
+ options.put("sink.parallelism", "1");
+
+ RowType rowType =
+ RowType.builder()
+ .field("pk", DataTypes.INT().notNull())
+ .field("a", DataTypes.BIGINT())
+ .field("b", DataTypes.VARCHAR(20))
+ .build();
+
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ Collections.emptyList(),
+ options);
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_exist_column_alter");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withPrimaryKeys("pk")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable();
+
+ Map<String, DataField> actual =
+ table.schema().fields().stream()
+ .collect(Collectors.toMap(DataField::name, Function.identity()));
+
+ assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
+ assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT());
+ assertThat(actual.get("b").type()).isEqualTo(DataTypes.VARCHAR(30));
+ assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testAssertSchemaCompatibleWithAddColumnISNOTNULL() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "1");
+ options.put("sink.parallelism", "1");
+
+ RowType rowType =
+ RowType.builder()
+ .field("pk", DataTypes.INT().notNull())
+ .field("a", DataTypes.BIGINT())
+ .field("b", DataTypes.VARCHAR(20))
+ .build();
+
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ Collections.emptyList(),
+ options);
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "assert_schema_compatible");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withPrimaryKeys("pk")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+
+ assertThatThrownBy(action::run)
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Paimon schema and source table schema are not compatible.\n"
+ + "Paimon fields are: [`pk` INT NOT NULL, `a` BIGINT, `b` VARCHAR(20)].\n"
+ + "Source table fields are: [`pk` INT NOT NULL '', `a` BIGINT '', `b` VARCHAR(30) '', `c` INT NOT NULL 'Add column cannot specify NOT NULL in the Paimon table']"));
+ }
+
+ @Test
+ @Timeout(60)
public void testWriteOnlyAndSchemaEvolution() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "write_only_and_schema_evolution");
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index b5b36888ebe06..8dd9bb7d92fac 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -20,6 +20,7 @@
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -38,6 +39,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -77,7 +80,7 @@ public void testSchemaEvolution() throws Exception {
checkTableSchema(
"[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\"},"
+ "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},"
- + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}]");
+ + "{\"id\":2,\"name\":\"v1\",\"type\":\"STRING\"}]");
try (Statement statement = getStatement(DATABASE_NAME)) {
testSchemaEvolutionImpl(statement);
@@ -245,9 +248,9 @@ public void testMultipleSchemaEvolutions() throws Exception {
checkTableSchema(
"[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},"
- + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"},"
+ + "{\"id\":1,\"name\":\"v1\",\"type\":\"STRING\"},"
+ "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\"},"
- + "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\"}]");
+ + "{\"id\":3,\"name\":\"v3\",\"type\":\"STRING\"}]");
try (Statement statement = getStatement(DATABASE_NAME)) {
testSchemaEvolutionMultipleImpl(statement);
@@ -786,6 +789,50 @@ public void testCatalogAndTableConfig() {
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
+ @Test
+ @Timeout(60)
+ public void testColumnAlterInExistingTableWhenStartJob() throws Exception {
+ String tableName = "test_exist_column_alter";
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "1");
+ options.put("sink.parallelism", "1");
+
+ RowType rowType =
+ RowType.builder()
+ .field("pk", DataTypes.INT().notNull())
+ .field("a", DataTypes.BIGINT())
+ .field("b", DataTypes.STRING())
+ .build();
+
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ Collections.emptyList(),
+ options);
+
+ Map<String, String> postgresConfig = getBasicPostgresConfig();
+ postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
+ postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
+ postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tableName);
+
+ PostgresSyncTableAction action =
+ syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build();
+
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable();
+
+ Map<String, DataField> actual =
+ table.schema().fields().stream()
+ .collect(Collectors.toMap(DataField::name, Function.identity()));
+
+ assertThat(actual.get("pk").type()).isEqualTo(DataTypes.INT().notNull());
+ assertThat(actual.get("a").type()).isEqualTo(DataTypes.BIGINT());
+ assertThat(actual.get("b").type()).isEqualTo(DataTypes.STRING());
+ assertThat(actual.get("c").type()).isEqualTo(DataTypes.INT());
+ }
+
private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 676185fb9291f..c073eea3a37f4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -317,12 +317,28 @@ CREATE TABLE test_exist_options_change (
);
CREATE TABLE test_exist_column_comment_change (
- pk INT,
- c1 DATE,
- c2 VARCHAR(10) not null comment 'c2 comment',
+ pk INT comment 'pk new_comment',
+ c1 DATE comment 'c1 new_comment',
+ c2 VARCHAR(10) NOT NULL comment 'c2 comment',
PRIMARY KEY (pk)
);
+CREATE TABLE test_exist_column_alter (
+ pk INT,
+ a BIGINT,
+ b VARCHAR(30),
+ c INT,
+ PRIMARY KEY (pk)
+);
+
+CREATE TABLE assert_schema_compatible (
+ pk INT,
+ a BIGINT,
+ b VARCHAR(30),
+ c INT NOT NULL comment 'Add column cannot specify NOT NULL in the Paimon table',
+ PRIMARY KEY (pk)
+);
+
-- ################################################################################
-- testSyncShard
-- ################################################################################
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql
index 373eb3880f403..a451cb1c7b6ad 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_table_setup.sql
@@ -229,6 +229,15 @@ CREATE TABLE test_options_change (
ALTER TABLE test_options_change
REPLICA IDENTITY FULL;
+CREATE TABLE test_exist_column_alter (
+ pk INT,
+ a BIGINT,
+ b VARCHAR(30),
+ c INT,
+ PRIMARY KEY (pk)
+);
+
+
-- ################################################################################
-- testMetadataColumns
-- ################################################################################
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index bd20fa65d9bb3..57467f8513e77 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -37,7 +37,6 @@ under the License.
2.5
1.6
3.12.0
- <zstd-jni.version>1.5.5-11</zstd-jni.version>
2.8.1
3.19.6
diff --git a/pom.xml b/pom.xml
index d7da87d65bc63..80dc1b0c4f1a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@ under the License.
3.0.1-1.18
+ <zstd-jni.version>1.5.5-11</zstd-jni.version>
3.0.11
3.4.6
2.3.1