Support schema evolution when restarting the Paimon CDC job
JingsongLi authored and MOBIN-F committed Jul 10, 2024
1 parent 06c0f4e commit b7df66d
Showing 19 changed files with 411 additions and 54 deletions.
22 changes: 11 additions & 11 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -270,7 +270,7 @@
<td><h5>file.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Default file compression. For faster read and write, it is recommended to use LZ4.</td>
<td>Default file compression. For faster read and write, it is recommended to use zstd.</td>
</tr>
<tr>
<td><h5>file.compression.per.level</h5></td>
@@ -359,9 +359,9 @@
</tr>
<tr>
<td><h5>lookup.cache-spill-compression</h5></td>
<td style="word-wrap: break-word;">"lz4"</td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.</td>
<td>Spill compression for lookup cache, currently zstd, none, lz4 and lzo are supported.</td>
</tr>
<tr>
<td><h5>lookup.cache.bloom.filter.enabled</h5></td>
@@ -485,18 +485,18 @@
<td>String</td>
<td>The default partition name in case the dynamic partition column value is null/empty string.</td>
</tr>
<tr>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
<td>String</td>
<td>Specifies the expiration strategy for partition expiration.<br /><br />Possible values:<ul><li>"values-time": A partition expiration policy that compares the time extracted from the partition value with the current time.</li><li>"update-time": A partition expiration policy that compares the last update time of the partition with the current time.</li></ul></td>
</tr>
<tr>
<td><h5>partition.expiration-check-interval</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The check interval of partition expiration.</td>
</tr>
<tr>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
<td><p>Enum</p></td>
<td>Specifies the expiration strategy for partition expiration.<br /><br />Possible values:<ul><li>"values-time": A partition expiration policy that compares the time extracted from the partition value with the current time.</li><li>"update-time": A partition expiration policy that compares the last update time of the partition with the current time.</li></ul></td>
</tr>
<tr>
<td><h5>partition.expiration-time</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -710,9 +710,9 @@
</tr>
<tr>
<td><h5>spill-compression</h5></td>
<td style="word-wrap: break-word;">"LZ4"</td>
<td style="word-wrap: break-word;">"zstd"</td>
<td>String</td>
<td>Compression for spill, currently lz4, lzo and zstd are supported.</td>
<td>Compression for spill, currently zstd, lzo and lz4 are supported.</td>
</tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
6 changes: 6 additions & 0 deletions paimon-common/pom.xml
@@ -64,6 +64,12 @@ under the License.
<version>${lz4.version}</version>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>

<!-- Java compiler -->
<dependency>
<groupId>org.codehaus.janino</groupId>
10 changes: 5 additions & 5 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -157,7 +157,7 @@ public class CoreOptions implements Serializable {
.stringType()
.defaultValue("zstd")
.withDescription(
"Default file compression. For faster read and write, it is recommended to use LZ4.");
"Default file compression. For faster read and write, it is recommended to use zstd.");

public static final ConfigOption<Integer> FILE_COMPRESSION_ZSTD_LEVEL =
key("file.compression.zstd-level")
@@ -344,9 +344,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> SPILL_COMPRESSION =
key("spill-compression")
.stringType()
.defaultValue("LZ4")
.defaultValue("zstd")
.withDescription(
"Compression for spill, currently lz4, lzo and zstd are supported.");
"Compression for spill, currently zstd, lzo and zstd are supported.");

public static final ConfigOption<Boolean> WRITE_ONLY =
key("write-only")
@@ -814,9 +814,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> LOOKUP_CACHE_SPILL_COMPRESSION =
key("lookup.cache-spill-compression")
.stringType()
.defaultValue("lz4")
.defaultValue("zstd")
.withDescription(
"Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.");
"Spill compression for lookup cache, currently zstd, none, lz4 and lzo are supported.");

public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
key("lookup.cache-max-memory-size")
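The keys above are ordinary table options, so the previous lz4 behaviour can still be requested explicitly after this change. A minimal sketch of overriding the new zstd defaults (the CompressionOptionsExample class and the idea of passing the map to table creation are illustrative assumptions, not part of the commit):

import java.util.HashMap;
import java.util.Map;

public class CompressionOptionsExample {
    public static void main(String[] args) {
        // Option keys changed by this commit; zstd is now the default value,
        // but lz4 can still be chosen per table.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("file.compression", "lz4");
        tableOptions.put("spill-compression", "lz4");
        tableOptions.put("lookup.cache-spill-compression", "lz4");
        tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}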
@@ -20,8 +20,6 @@

import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;

import javax.annotation.Nullable;

@@ -46,7 +44,7 @@ static BlockCompressionFactory create(String compression) {
case "LZO":
return new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
case "ZSTD":
return new AirCompressorFactory(new ZstdCompressor(), new ZstdDecompressor());
return new ZstdBlockCompressionFactory();
default:
throw new IllegalStateException("Unknown CompressionMethod " + compression);
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.compression;

/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
public class ZstdBlockCompressionFactory implements BlockCompressionFactory {

@Override
public BlockCompressor getCompressor() {
return new ZstdBlockCompressor();
}

@Override
public BlockDecompressor getDecompressor() {
return new ZstdBlockDecompressor();
}
}
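With the factory registered, "ZSTD" resolves to the new JNI-backed implementation rather than the airlift one. A minimal dispatch sketch (the ZstdFactoryExample class is hypothetical; it relies only on the create, getCompressor and getDecompressor methods shown in this commit):

import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockCompressor;
import org.apache.paimon.compression.BlockDecompressor;

public class ZstdFactoryExample {
    public static void main(String[] args) {
        // "ZSTD" now maps to ZstdBlockCompressionFactory in the switch above.
        BlockCompressionFactory factory = BlockCompressionFactory.create("ZSTD");
        BlockCompressor compressor = factory.getCompressor();
        BlockDecompressor decompressor = factory.getDecompressor();
        System.out.println(
                compressor.getClass().getSimpleName()
                        + " / "
                        + decompressor.getClass().getSimpleName());
    }
}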
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.compression;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import static org.apache.paimon.compression.CompressorUtils.HEADER_LENGTH;

/** A {@link BlockCompressor} for zstd. */
public class ZstdBlockCompressor implements BlockCompressor {

private static final int MAX_BLOCK_SIZE = 128 * 1024;

@Override
public int getMaxCompressedSize(int srcSize) {
return HEADER_LENGTH + zstdMaxCompressedLength(srcSize);
}

private int zstdMaxCompressedLength(int uncompressedSize) {
// refer to io.airlift.compress.zstd.ZstdCompressor
int result = uncompressedSize + (uncompressedSize >>> 8);
if (uncompressedSize < MAX_BLOCK_SIZE) {
result += (MAX_BLOCK_SIZE - uncompressedSize) >>> 11;
}
return result;
}

@Override
public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
throws BufferCompressionException {
ByteArrayOutputStream stream = new ByteArrayOutputStream(dst, dstOff);
try (ZstdOutputStream zstdStream =
new ZstdOutputStream(stream, RecyclingBufferPool.INSTANCE, 1)) {
zstdStream.setWorkers(0);
zstdStream.write(src, srcOff, srcLen);
} catch (IOException e) {
throw new BufferCompressionException(e);
}
return stream.position() - dstOff;
}

private static class ByteArrayOutputStream extends OutputStream {

private final byte[] buf;
private int position;

public ByteArrayOutputStream(byte[] buf, int position) {
this.buf = buf;
this.position = position;
}

@Override
public void write(int b) {
buf[position] = (byte) b;
position += 1;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
throw new IndexOutOfBoundsException();
}
try {
System.arraycopy(b, off, buf, position, len);
} catch (IndexOutOfBoundsException e) {
throw new IOException(e);
}
position += len;
}

int position() {
return position;
}
}
}
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.compression;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;

/** A {@link BlockDecompressor} for zstd. */
public class ZstdBlockDecompressor implements BlockDecompressor {

@Override
public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
throws BufferDecompressionException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(src, srcOff, srcLen);
try (ZstdInputStream decompressorStream =
new ZstdInputStream(inputStream, RecyclingBufferPool.INSTANCE)) {
int decompressedLen = 0;
while (true) {
int offset = dstOff + decompressedLen;
int count = decompressorStream.read(dst, offset, dst.length - offset);
if (count <= 0) {
if (decompressorStream.available() != 0) {
throw new BufferDecompressionException(
"Dst is too small and the decompression was not completed.");
}
break;
}
decompressedLen += count;
}
return decompressedLen;
} catch (IOException e) {
throw new BufferDecompressionException(e);
}
}
}
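To see how the two new classes pair up, here is a round-trip sketch (the ZstdRoundTripExample class is hypothetical; it uses only the getMaxCompressedSize, compress and decompress methods added above, and oversizes the destination buffer so the decompress loop terminates on end-of-stream):

import org.apache.paimon.compression.ZstdBlockCompressor;
import org.apache.paimon.compression.ZstdBlockDecompressor;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ZstdRoundTripExample {
    public static void main(String[] args) throws Exception {
        byte[] input = "hello zstd block compression".getBytes(StandardCharsets.UTF_8);

        // Compress into a buffer sized by the compressor's own upper bound.
        ZstdBlockCompressor compressor = new ZstdBlockCompressor();
        byte[] compressed = new byte[compressor.getMaxCompressedSize(input.length)];
        int compressedLen = compressor.compress(input, 0, input.length, compressed, 0);

        // Decompress; a little slack lets the read loop end on EOF rather than a full buffer.
        ZstdBlockDecompressor decompressor = new ZstdBlockDecompressor();
        byte[] restored = new byte[input.length + 32];
        int restoredLen = decompressor.decompress(compressed, 0, compressedLen, restored, 0);

        boolean ok =
                restoredLen == input.length
                        && Arrays.equals(input, Arrays.copyOf(restored, restoredLen));
        System.out.println("round trip ok: " + ok);
    }
}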
@@ -81,18 +81,23 @@ public static boolean schemaCompatible(
for (DataField field : sourceTableFields) {
int idx = paimonSchema.fieldNames().indexOf(field.name());
if (idx < 0) {
LOG.info("Cannot find field '{}' in Paimon table.", field.name());
return false;
}
DataType type = paimonSchema.fields().get(idx).type();
if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
LOG.info(
"Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
field.name(),
field.type(),
type);
return false;
if (!field.type().isNullable()) {
LOG.info(
"Add column '{}' cannot specify NOT NULL in the Paimon table.",
field.name());
return false;
}
} else {
DataType type = paimonSchema.fields().get(idx).type();
if (UpdatedDataFieldsProcessFunction.canConvert(type, field.type())
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
LOG.info(
"Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
field.name(),
field.type(),
type);
return false;
}
}
}
return true;
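The net effect of the change above: a source column that does not yet exist in the Paimon table no longer fails the compatibility check outright; it is accepted as long as it is nullable, so it can be added by schema evolution when the CDC job restarts, while a NOT NULL addition is still rejected. A simplified sketch of just that rule (SchemaCompatibleSketch and newColumnAcceptable are illustrative helpers, not the commit's own method, and the use of Paimon's DataField/DataTypes API here is an assumption):

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;

import java.util.Arrays;
import java.util.List;

public class SchemaCompatibleSketch {

    /** A source field missing from the Paimon schema is acceptable only if it is nullable. */
    static boolean newColumnAcceptable(List<String> paimonFieldNames, DataField sourceField) {
        return paimonFieldNames.contains(sourceField.name()) || sourceField.type().isNullable();
    }

    public static void main(String[] args) {
        List<String> paimonFields = Arrays.asList("id", "name");
        DataField nullableExtra = new DataField(2, "age", DataTypes.INT());
        DataField notNullExtra = new DataField(3, "ts", DataTypes.INT().copy(false));
        System.out.println(newColumnAcceptable(paimonFields, nullableExtra)); // true
        System.out.println(newColumnAcceptable(paimonFields, notNullExtra)); // false
    }
}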
@@ -124,13 +124,13 @@ protected void beforeBuildingSourceSink() throws Exception {
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
fileStoreTable = alterTableOptions(identifier, fileStoreTable);
try {
Schema retrievedSchema = retrieveSchema();
computedColumns =
buildComputedColumns(computedColumnArgs, retrievedSchema.fields());
Schema paimonSchema = buildPaimonSchema(retrievedSchema);
assertSchemaCompatible(fileStoreTable.schema(), paimonSchema.fields());
fileStoreTable = alterTableSchema(identifier, fileStoreTable, paimonSchema);
} catch (SchemaRetrievalException e) {
LOG.info(
"Failed to retrieve schema from record data but there exists specified Paimon table. "