Skip to content

Commit 848c187

Browse files
committed
Upgrade debezium version to 2.5.4.Final
1 parent 570fec2 commit 848c187

File tree

4 files changed

+22
-22
lines changed

4 files changed

+22
-22
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ flexible messaging model and an intuitive client API.</description>
197197
<opensearch.version>1.2.4</opensearch.version>
198198
<elasticsearch-java.version>8.12.1</elasticsearch-java.version>
199199
<trino.version>368</trino.version>
200-
<debezium.version>1.9.7.Final</debezium.version>
200+
<debezium.version>2.5.4.Final</debezium.version>
201201
<debezium.postgresql.version>42.5.5</debezium.postgresql.version>
202202
<debezium.mysql.version>8.0.30</debezium.mysql.version>
203203
<!-- Override version that brings CVE-2022-3143 with debezium -->

pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
8888
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
8989

9090
// database.history : implementation class for database history.
91-
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
91+
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);
9292

9393
// database.history.pulsar.service.url
9494
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());

pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import io.debezium.config.Configuration;
2727
import io.debezium.config.Field;
2828
import io.debezium.document.DocumentReader;
29-
import io.debezium.relational.history.AbstractDatabaseHistory;
30-
import io.debezium.relational.history.DatabaseHistory;
31-
import io.debezium.relational.history.DatabaseHistoryException;
32-
import io.debezium.relational.history.DatabaseHistoryListener;
29+
import io.debezium.relational.history.AbstractSchemaHistory;
30+
import io.debezium.relational.history.SchemaHistory;
31+
import io.debezium.relational.history.SchemaHistoryException;
32+
import io.debezium.relational.history.SchemaHistoryListener;
3333
import io.debezium.relational.history.HistoryRecord;
3434
import io.debezium.relational.history.HistoryRecordComparator;
3535
import java.io.IOException;
@@ -52,12 +52,12 @@
5252
import org.apache.pulsar.client.api.Schema;
5353

5454
/**
55-
* A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified
55+
* A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified
5656
* topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
5757
*/
5858
@Slf4j
5959
@ThreadSafe
60-
public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
60+
public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
6161

6262
public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
6363
.withDisplayName("Database history topic name")
@@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
9797
TOPIC,
9898
SERVICE_URL,
9999
CLIENT_BUILDER,
100-
DatabaseHistory.NAME,
100+
SchemaHistory.NAME,
101101
READER_CONFIG);
102102

103103
private final ObjectMapper mapper = new ObjectMapper();
@@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
113113
public void configure(
114114
Configuration config,
115115
HistoryRecordComparator comparator,
116-
DatabaseHistoryListener listener,
116+
SchemaHistoryListener listener,
117117
boolean useCatalogBeforeSchema) {
118118
super.configure(config, comparator, listener, useCatalogBeforeSchema);
119119
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
@@ -148,7 +148,7 @@ public void configure(
148148
}
149149

150150
// Copy the relevant portions of the configuration and add useful defaults ...
151-
this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
151+
this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
152152

153153
log.info("Configure to store the debezium database history {} to pulsar topic {}",
154154
dbHistoryName, topicName);
@@ -201,7 +201,7 @@ public void start() {
201201
}
202202

203203
@Override
204-
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
204+
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
205205
if (this.producer == null) {
206206
throw new IllegalStateException("No producer is available. Ensure that 'start()'"
207207
+ " is called before storing database history records.");
@@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
212212
try {
213213
producer.send(record.toString());
214214
} catch (PulsarClientException e) {
215-
throw new DatabaseHistoryException(e);
215+
throw new SchemaHistoryException(e);
216216
}
217217
}
218218

pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
2828
import io.debezium.relational.Tables;
2929
import io.debezium.relational.ddl.DdlParser;
30-
import io.debezium.relational.history.DatabaseHistory;
31-
import io.debezium.relational.history.DatabaseHistoryListener;
30+
import io.debezium.relational.history.SchemaHistory;
31+
import io.debezium.relational.history.SchemaHistoryListener;
3232
import io.debezium.text.ParsingException;
3333
import io.debezium.util.Collect;
3434

@@ -80,8 +80,8 @@ protected void cleanup() throws Exception {
8080
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
8181
Configuration.Builder configBuidler = Configuration.create()
8282
.with(PulsarDatabaseHistory.TOPIC, topicName)
83-
.with(DatabaseHistory.NAME, "my-db-history")
84-
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
83+
.with(SchemaHistory.NAME, "my-db-history")
84+
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
8585

8686
if (testWithClientBuilder) {
8787
ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString());
@@ -101,7 +101,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
101101
}
102102

103103
// Start up the history ...
104-
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
104+
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
105105
history.start();
106106

107107
// Should be able to call start more than once ...
@@ -160,7 +160,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
160160
// Stop the history (which should stop the producer) ...
161161
history.stop();
162162
history = new PulsarDatabaseHistory();
163-
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
163+
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
164164
// no need to start
165165

166166
// Recover from the very beginning to just past the first change ...
@@ -240,11 +240,11 @@ public void testExists() throws Exception {
240240
Configuration config = Configuration.create()
241241
.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
242242
.with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
243-
.with(DatabaseHistory.NAME, "my-db-history")
244-
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
243+
.with(SchemaHistory.NAME, "my-db-history")
244+
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
245245
.build();
246246

247-
history.configure(config, null, DatabaseHistoryListener.NOOP, true);
247+
history.configure(config, null, SchemaHistoryListener.NOOP, true);
248248
history.start();
249249

250250
// dummytopic should not exist yet

0 commit comments

Comments
 (0)