26
26
import io .debezium .config .Configuration ;
27
27
import io .debezium .config .Field ;
28
28
import io .debezium .document .DocumentReader ;
29
- import io .debezium .relational .history .AbstractDatabaseHistory ;
30
- import io .debezium .relational .history .DatabaseHistory ;
31
- import io .debezium .relational .history .DatabaseHistoryException ;
32
- import io .debezium .relational .history .DatabaseHistoryListener ;
29
+ import io .debezium .relational .history .AbstractSchemaHistory ;
33
30
import io .debezium .relational .history .HistoryRecord ;
34
31
import io .debezium .relational .history .HistoryRecordComparator ;
32
+ import io .debezium .relational .history .SchemaHistory ;
33
+ import io .debezium .relational .history .SchemaHistoryException ;
34
+ import io .debezium .relational .history .SchemaHistoryListener ;
35
35
import java .io .IOException ;
36
36
import java .util .Collections ;
37
37
import java .util .HashMap ;
52
52
import org .apache .pulsar .client .api .Schema ;
53
53
54
54
/**
55
- * A {@link DatabaseHistory } implementation that records schema changes as normal pulsar messages on the specified
55
+ * A {@link SchemaHistory } implementation that records schema changes as normal pulsar messages on the specified
56
56
* topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
57
57
*/
58
58
@ Slf4j
59
59
@ ThreadSafe
60
- public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
60
+ public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
61
61
62
62
public static final Field TOPIC = Field .create (CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic" )
63
63
.withDisplayName ("Database history topic name" )
@@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
97
97
TOPIC ,
98
98
SERVICE_URL ,
99
99
CLIENT_BUILDER ,
100
- DatabaseHistory .NAME ,
100
+ SchemaHistory .NAME ,
101
101
READER_CONFIG );
102
102
103
103
private final ObjectMapper mapper = new ObjectMapper ();
@@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
113
113
public void configure (
114
114
Configuration config ,
115
115
HistoryRecordComparator comparator ,
116
- DatabaseHistoryListener listener ,
116
+ SchemaHistoryListener listener ,
117
117
boolean useCatalogBeforeSchema ) {
118
118
super .configure (config , comparator , listener , useCatalogBeforeSchema );
119
119
if (!config .validateAndRecord (ALL_FIELDS , logger ::error )) {
@@ -148,7 +148,7 @@ public void configure(
148
148
}
149
149
150
150
// Copy the relevant portions of the configuration and add useful defaults ...
151
- this .dbHistoryName = config .getString (DatabaseHistory .NAME , UUID .randomUUID ().toString ());
151
+ this .dbHistoryName = config .getString (SchemaHistory .NAME , UUID .randomUUID ().toString ());
152
152
153
153
log .info ("Configure to store the debezium database history {} to pulsar topic {}" ,
154
154
dbHistoryName , topicName );
@@ -201,7 +201,7 @@ public void start() {
201
201
}
202
202
203
203
@ Override
204
- protected void storeRecord (HistoryRecord record ) throws DatabaseHistoryException {
204
+ protected void storeRecord (HistoryRecord record ) throws SchemaHistoryException {
205
205
if (this .producer == null ) {
206
206
throw new IllegalStateException ("No producer is available. Ensure that 'start()'"
207
207
+ " is called before storing database history records." );
@@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
212
212
try {
213
213
producer .send (record .toString ());
214
214
} catch (PulsarClientException e ) {
215
- throw new DatabaseHistoryException (e );
215
+ throw new SchemaHistoryException (e );
216
216
}
217
217
}
218
218
0 commit comments