diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java new file mode 100644 index 0000000000000..d1694d6bfd4a3 --- /dev/null +++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pulsar.io.debezium.oracle;
+
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
+import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
+
+import java.util.Map;
+
+/**
+ * Abstract base for a Debezium-backed source.
+ *
+ * <p>{@link #open(Map, SourceContext)} invokes the subclass hook {@link #setDbConnectorTask(Map)},
+ * tries to load the database credentials from secrets, fills in defaults for the converters, the
+ * schema history implementation and the topic names, and finally delegates to the parent class.
+ */
+@Slf4j
+public abstract class DebeziumSource extends KafkaConnectSource {
+    private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
+    private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.oracle.PulsarDatabaseHistory";
+    private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
+    private static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";
+
+    /**
+     * Makes sure {@code key} ends up mapped to {@code value}.
+     *
+     * <p>When the key has no (or a {@code null}) value, {@code value} is stored. When it already
+     * holds an equal value nothing happens.
+     *
+     * @param config the configuration map, modified in place
+     * @param key    the configuration key to check
+     * @param value  the value the key is required to have
+     * @throws IllegalArgumentException if the key already holds a value that is not equal to {@code value}
+     */
+    public static void throwExceptionIfConfigNotMatch(Map config,
+                                                      String key,
+                                                      String value) throws IllegalArgumentException {
+        final Object existing = config.get(key);
+        if (existing != null) {
+            if (existing.equals(value)) {
+                return;
+            }
+            // a conflicting value was supplied by the caller
+            throw new IllegalArgumentException("Expected " + value + " but has " + existing);
+        }
+        config.put(key, value);
+    }
+
+    /**
+     * Stores {@code value} under {@code key} unless the map already has a non-null value for it.
+     *
+     * @param config the configuration map, modified in place
+     * @param key    the configuration key
+     * @param value  the default to apply
+     */
+    public static void setConfigIfNull(Map config, String key, String value) {
+        config.putIfAbsent(key, value);
+    }
+
+    /**
+     * Builds the {@code tenant/namespace} prefix used for the output topics.
+     *
+     * @param sourceContext context supplying the tenant and the namespace
+     * @return the tenant and namespace joined by {@code /}; an empty tenant is replaced by
+     *         {@link TopicName#PUBLIC_TENANT} and an empty namespace by {@link TopicName#DEFAULT_NAMESPACE}
+     */
+    public static String topicNamespace(SourceContext sourceContext) {
+        String tenantPart = sourceContext.getTenant();
+        String namespacePart = sourceContext.getNamespace();
+
+        if (StringUtils.isEmpty(tenantPart)) {
+            tenantPart = TopicName.PUBLIC_TENANT;
+        }
+        if (StringUtils.isEmpty(namespacePart)) {
+            namespacePart = TopicName.DEFAULT_NAMESPACE;
+        }
+        return tenantPart + "/" + namespacePart;
+    }
+
+    /**
+     * Best-effort lookup of a secret: when the context returns a non-null secret for
+     * {@code secretName}, it is written into {@code config} under the same name. Any failure
+     * while reading the secret is logged and otherwise ignored.
+     *
+     * @param secretName name of the secret, also used as the configuration key
+     * @param config     the configuration map, modified in place
+     * @param context    context used to resolve the secret
+     */
+    public static void tryLoadingConfigSecret(String secretName, Map config, SourceContext context) {
+        try {
+            final String secretValue = context.getSecret(secretName);
+            if (secretValue == null) {
+                return;
+            }
+            config.put(secretName, secretValue);
+            log.info("Config key {} set from secret.", secretName);
+        } catch (Exception e) {
+            log.warn("Failed to read secret {}.", secretName, e);
+        }
+    }
+
+    /**
+     * Subclass hook invoked first by {@link #open(Map, SourceContext)} with the source configuration.
+     *
+     * @param config the configuration map
+     * @throws Exception if the subclass rejects the configuration
+     */
+    public abstract void setDbConnectorTask(Map config) throws Exception;
+
+    /**
+     * Completes the configuration with defaults and opens the underlying source.
+     *
+     * @param config        the source configuration, modified in place
+     * @param sourceContext the context of the running source
+     * @throws Exception if the subclass hook, the client builder serialization or the parent
+     *                   {@code open} fails
+     */
+    @Override
+    public void open(Map config, SourceContext sourceContext) throws Exception {
+        setDbConnectorTask(config);
+
+        // credentials: a secret, when available, replaces the plain configuration entry
+        for (String secretKey : new String[] {"database.user", "database.password"}) {
+            tryLoadingConfigSecret(secretKey, config, sourceContext);
+        }
+
+        // key and value converters
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
+
+        // implementation class for the schema history
+        setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);
+
+        // service url explicitly configured for the history storage, if any
+        final String historyServiceUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
+
+        // namespace of the output topics
+        final String outputNamespace = topicNamespace(sourceContext);
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, outputNamespace);
+
+        // the history topic and the offset topic share the "<namespace>/<source name>-" prefix
+        final String topicPrefix = outputNamespace + "/" + sourceContext.getSourceName() + "-";
+        setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(), topicPrefix + DEFAULT_HISTORY_TOPIC);
+        setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
+                topicPrefix + DEFAULT_OFFSET_TOPIC);
+
+        // without an explicit service url the history reuses the client builder of this source
+        if (StringUtils.isEmpty(historyServiceUrl)) {
+            config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(),
+                    SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
+        }
+
+        super.open(config, sourceContext);
+    }
+
+}
diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java
new file mode 100644
index 0000000000000..31e7c90accbf0
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.oracle;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+/**
+ * A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified
+ * topic, and that recovers the history by creating a Pulsar {@link Reader} that re-processes all messages on
+ * that topic from the earliest one.
+ */
+@Slf4j
+@ThreadSafe
+public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
+
+    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
+        .withDisplayName("Database history topic name")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("The name of the topic for the database schema history")
+        .withValidation(Field::isRequired);
+
+    public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
+        .withDisplayName("Pulsar service url")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("Pulsar service url")
+        .withValidation(Field::isOptional);
+
+    public static final Field CLIENT_BUILDER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder")
+        .withDisplayName("Pulsar client builder")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("Pulsar client builder")
+        .withValidation(Field::isOptional);
+
+    public static final Field READER_CONFIG = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.reader.config")
+        .withDisplayName("Extra configs of the reader")
+        .withType(Type.STRING)
+        .withWidth(Width.LONG)
+        .withImportance(Importance.HIGH)
+        .withDescription("The configs of the reader for the database schema history topic, "
+            + "in the form of a JSON string with key-value pairs")
+        .withDefault((String) null)
+        .withValidation(Field::isOptional);
+
+    public static final Field.Set ALL_FIELDS = Field.setOf(
+        TOPIC,
+        SERVICE_URL,
+        CLIENT_BUILDER,
+        SchemaHistory.NAME,
+        READER_CONFIG);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    private String topicName;
+    private Map<String, Object> readerConfigMap = new HashMap<>();
+    private String dbHistoryName;
+    private ClientBuilder clientBuilder;
+    private volatile PulsarClient pulsarClient;
+    private volatile Producer<String> producer;
+
+    /**
+     * Validates the configuration and prepares (but does not yet build) the Pulsar client.
+     *
+     * <p>The client builder is taken from the Base64-encoded {@link #CLIENT_BUILDER} value when present,
+     * otherwise a fresh builder pointing at {@link #SERVICE_URL} is used.
+     *
+     * @throws IllegalArgumentException if validation fails or neither a service url nor a client builder
+     *                                  is configured
+     */
+    @Override
+    public void configure(
+            Configuration config,
+            HistoryRecordComparator comparator,
+            SchemaHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        super.configure(config, comparator, listener, useCatalogBeforeSchema);
+        if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
+            throw new IllegalArgumentException("Error configuring an instance of "
+                + getClass().getSimpleName() + "; check the logs for details");
+        }
+        this.topicName = config.getString(TOPIC);
+        this.readerConfigMap = parseReaderConfig(config.getString(READER_CONFIG));
+
+        String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
+        if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
+            throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
+        }
+        this.clientBuilder = PulsarClient.builder();
+        if (!isBlank(clientBuilderBase64Encoded)) {
+            // deserialize the client builder to the same classloader
+            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded,
+                    this.clientBuilder.getClass().getClassLoader());
+        } else {
+            this.clientBuilder.serviceUrl(config.getString(SERVICE_URL));
+        }
+
+        // Copy the relevant portions of the configuration and add useful defaults ...
+        this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
+
+        log.info("Configure to store the debezium database history {} to pulsar topic {}",
+            dbHistoryName, topicName);
+    }
+
+    /**
+     * Parses the optional JSON reader configuration.
+     *
+     * @param configString JSON object with key-value pairs, may be {@code null}
+     * @return the parsed map, or an empty map when the value is absent, is a JSON {@code null} or is invalid
+     */
+    private Map<String, Object> parseReaderConfig(String configString) {
+        if (configString == null) {
+            return Collections.emptyMap();
+        }
+        try {
+            // a JSON object always has string keys, so the raw Map is safe to view as Map<String, Object>
+            @SuppressWarnings("unchecked")
+            Map<String, Object> parsed = mapper.readValue(configString, Map.class);
+            // the literal JSON "null" deserializes to null; never hand a null map to the reader builder
+            return parsed != null ? parsed : Collections.emptyMap();
+        } catch (JsonProcessingException exception) {
+            log.warn("The provided reader configs are invalid, "
+                + "will not passing any extra config to the reader builder.", exception);
+            return Collections.emptyMap();
+        }
+    }
+
+    /**
+     * Creates the history topic by publishing one empty message to it.
+     *
+     * @throws RuntimeException if the client or the temporary producer cannot be created, or the send fails
+     */
+    @Override
+    public void initializeStorage() {
+        super.initializeStorage();
+
+        // the client is created lazily; make sure it exists even when start() has not run yet
+        setupClientIfNeeded();
+
+        // try simple to publish an empty string to create topic
+        try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
+            p.send("");
+        } catch (PulsarClientException pce) {
+            log.error("Failed to initialize storage", pce);
+            throw new RuntimeException("Failed to initialize storage", pce);
+        }
+    }
+
+    /**
+     * Builds the Pulsar client from the configured builder unless one already exists.
+     *
+     * @throws RuntimeException if the client cannot be built
+     */
+    void setupClientIfNeeded() {
+        if (null == this.pulsarClient) {
+            try {
+                pulsarClient = clientBuilder.build();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException("Failed to create pulsar client to pulsar cluster", e);
+            }
+        }
+    }
+
+    /**
+     * Creates the producer for the history topic (and the client, if necessary) unless one already exists.
+     *
+     * @throws RuntimeException if the client or the producer cannot be created
+     */
+    void setupProducerIfNeeded() {
+        setupClientIfNeeded();
+        if (null == this.producer) {
+            try {
+                this.producer = pulsarClient.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .producerName(dbHistoryName)
+                    .blockIfQueueFull(true)
+                    .create();
+            } catch (PulsarClientException e) {
+                log.error("Failed to create pulsar producer to topic '{}'", topicName);
+                throw new RuntimeException("Failed to create pulsar producer to topic '"
+                    + topicName + "'", e);
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        setupProducerIfNeeded();
+    }
+
+    /**
+     * Publishes the string form of {@code record} to the history topic.
+     *
+     * @throws IllegalStateException  if no producer has been created yet
+     * @throws SchemaHistoryException if the send fails
+     */
+    @Override
+    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
+        if (this.producer == null) {
+            throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+                + " is called before storing database history records.");
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Storing record into database history: {}", record);
+        }
+        try {
+            producer.send(record.toString());
+        } catch (PulsarClientException e) {
+            throw new SchemaHistoryException(e);
+        }
+    }
+
+    /**
+     * Flushes and closes the producer, then closes the client. Each step is attempted independently,
+     * so a failure while flushing or closing the producer does not prevent the client from being closed.
+     */
+    @Override
+    public void stop() {
+        final Producer<String> currentProducer = this.producer;
+        this.producer = null;
+        if (currentProducer != null) {
+            try {
+                currentProducer.flush();
+            } catch (PulsarClientException pce) {
+                // ignore the error to ensure the client is eventually closed
+                log.warn("Failed to flush pulsar producer", pce);
+            }
+            try {
+                currentProducer.close();
+            } catch (PulsarClientException pce) {
+                log.warn("Failed to close pulsar producer", pce);
+            }
+        }
+
+        final PulsarClient currentClient = this.pulsarClient;
+        this.pulsarClient = null;
+        if (currentClient != null) {
+            try {
+                currentClient.close();
+            } catch (PulsarClientException pe) {
+                log.warn("Failed to closing pulsar client", pe);
+            }
+        }
+    }
+
+    /**
+     * Reads the history topic from the earliest message to the end and feeds every valid record to
+     * {@code records}. Blank messages are skipped; records that cannot be parsed are logged and skipped.
+     *
+     * @throws RuntimeException if the reader cannot be created or reading from the topic fails
+     */
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        setupClientIfNeeded();
+        try (Reader<String> historyReader = createHistoryReader()) {
+            log.info("Scanning the database history topic '{}'", topicName);
+
+            // Read all messages in the topic ...
+            MessageId lastProcessedMessageId = null;
+
+            // read the topic until the end
+            while (historyReader.hasMessageAvailable()) {
+                Message<String> msg = historyReader.readNext();
+                try {
+                    if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) {
+                        if (!isBlank(msg.getValue())) {
+                            HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue()));
+                            if (log.isTraceEnabled()) {
+                                log.trace("Recovering database history: {}", recordObj);
+                            }
+                            if (!recordObj.isValid()) {
+                                log.warn("Skipping invalid database history record '{}'. This is often not an issue,"
+                                        + " but if it happens repeatedly please check the '{}' topic.",
+                                    recordObj, topicName);
+                            } else {
+                                records.accept(recordObj);
+                                log.trace("Recovered database history: {}", recordObj);
+                            }
+                        }
+                        lastProcessedMessageId = msg.getMessageId();
+                    }
+                } catch (IOException ioe) {
+                    // a record that cannot be parsed is reported and skipped; scanning continues
+                    log.error("Error while deserializing history record '{}'", msg.getValue(), ioe);
+                }
+            }
+            log.info("Successfully completed scanning the database history topic '{}'", topicName);
+        } catch (IOException ioe) {
+            log.error("Encountered issues on recovering history records", ioe);
+            throw new RuntimeException("Encountered issues on recovering history records", ioe);
+        }
+    }
+
+    /**
+     * @return {@code true} when the history topic has at least one message available from the earliest position
+     * @throws RuntimeException if the reader cannot be created or queried
+     */
+    @Override
+    public boolean exists() {
+        setupClientIfNeeded();
+        try (Reader<String> historyReader = createHistoryReader()) {
+            return historyReader.hasMessageAvailable();
+        } catch (IOException e) {
+            log.error("Encountered issues on checking existence of database history", e);
+            throw new RuntimeException("Encountered issues on checking existence of database history", e);
+        }
+    }
+
+    @Override
+    public boolean storageExists() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        if (topicName != null) {
+            return "Pulsar topic (" + topicName + ")";
+        }
+        return "Pulsar topic";
+    }
+
+    /**
+     * Creates a reader on the history topic positioned at the earliest message, applying the extra
+     * reader configuration. The client must already have been created.
+     */
+    @VisibleForTesting
+    Reader<String> createHistoryReader() throws PulsarClientException {
+        return pulsarClient.newReader(Schema.STRING)
+            .topic(topicName)
+            .startMessageId(MessageId.earliest)
+            .loadConf(readerConfigMap)
+            .create();
+    }
+}
diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java
new file mode 100644
index 0000000000000..1b9a062de254a
--- /dev/null
+++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.debezium.oracle;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+import java.util.Base64;
+
+/**
+ * Helpers that convert an object to a Base64 string through Java serialization and back.
+ */
+@Slf4j
+public class SerDeUtils {
+    /**
+     * Decodes a Base64 string and deserializes the object it contains, resolving classes through
+     * {@code classLoader} first.
+     *
+     * <p>NOTE(security): this uses native Java deserialization, which is unsafe on untrusted data.
+     * The input should only ever be a value produced by {@link #serialize(Object)}; review whether
+     * the configuration value feeding this method can be supplied by an untrusted party.
+     *
+     * @param objectBase64Encoded Base64 encoding of a serialized object
+     * @param classLoader         class loader tried first when resolving classes of the stream
+     * @return the deserialized object
+     * @throws RuntimeException if the stream cannot be read or a class cannot be resolved
+     */
+    public static Object deserialize(String objectBase64Encoded, ClassLoader classLoader) {
+        byte[] data = Base64.getDecoder().decode(objectBase64Encoded);
+        try (InputStream bai = new ByteArrayInputStream(data);
+             PulsarClientBuilderInputStream ois = new PulsarClientBuilderInputStream(bai, classLoader)) {
+            return ois.readObject();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to initialize the pulsar client to store debezium database history", e);
+        }
+    }
+
+    /**
+     * Serializes {@code obj} with Java serialization and returns the bytes as a Base64 string.
+     *
+     * @param obj the object to serialize
+     * @return Base64 encoding of the serialized form
+     * @throws Exception if the object cannot be written
+     */
+    public static String serialize(Object obj) throws Exception {
+        try (ByteArrayOutputStream bao = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bao)) {
+            oos.writeObject(obj);
+            oos.flush();
+            byte[] data = bao.toByteArray();
+            return Base64.getEncoder().encodeToString(data);
+        }
+    }
+
+    /**
+     * An {@link ObjectInputStream} that resolves classes through a given class loader before
+     * falling back to the default resolution.
+     */
+    static class PulsarClientBuilderInputStream extends ObjectInputStream {
+        private final ClassLoader classLoader;
+        public PulsarClientBuilderInputStream(InputStream in, ClassLoader ldr) throws IOException {
+            super(in);
+            this.classLoader = ldr;
+        }
+
+        /**
+         * Looks the class up in the configured class loader; when that fails the failure is logged
+         * and the lookup is delegated to {@link ObjectInputStream#resolveClass(ObjectStreamClass)}.
+         */
+        @Override
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+            try {
+                return Class.forName(desc.getName(), true, classLoader);
+            } catch (Exception ex) {
+                log.warn("PulsarClientBuilderInputStream resolveClass failed {} {}", desc.getName(), ex);
+            }
+            return super.resolveClass(desc);
+        }
+    }
+}