Skip to content

Commit

Permalink
set the output format as json in a way a user cannot override
Browse files Browse the repository at this point in the history
Get the param as allowed.operations instead of skipped.operations
Change the property names of the sequence
Add isDebugEnable check where string concaterntions are done.
  • Loading branch information
RusJaI committed Sep 11, 2023
1 parent 0f29089 commit c875f42
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import java.util.Map;
import java.util.Properties;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DATABASE_NAME;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TABLES;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TS_MS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_DATABASE_NAME;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TABLES;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TS_MS;

public class CDCInjectHandler {

Expand Down Expand Up @@ -73,9 +73,7 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE

ChangeEvent<String, String> eventRecord = (ChangeEvent<String, String>) object;
if (eventRecord == null || eventRecord.value() == null) {
if (logger.isDebugEnabled()) {
logger.debug("CDC Source Handler received empty event record");
}
logger.debug("CDC Source Handler received empty event record");
} else {
InputStream in = null;
try {
Expand All @@ -87,10 +85,10 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE
CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName());

CDCEventOutput cdcEventOutput = new CDCEventOutput(eventRecord);
msgCtx.setProperty(DATABASE_NAME, cdcEventOutput.getDatabase());
msgCtx.setProperty(TABLES, cdcEventOutput.getTable().toString());
msgCtx.setProperty(OPERATIONS, cdcEventOutput.getOp());
msgCtx.setProperty(TS_MS, cdcEventOutput.getTs_ms().toString());
msgCtx.setProperty(CDC_DATABASE_NAME, cdcEventOutput.getDatabase());
msgCtx.setProperty(CDC_TABLES, cdcEventOutput.getTable().toString());
msgCtx.setProperty(CDC_OPERATIONS, cdcEventOutput.getOp());
msgCtx.setProperty(CDC_TS_MS, cdcEventOutput.getTs_ms().toString());

if (logger.isDebugEnabled()) {
logger.debug("Processed event : " + eventRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
package org.wso2.carbon.inbound.endpoint.protocol.cdc;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.core.SynapseEnvironment;

import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.CDCProcessor.inboundEpEventQueueMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* This class implement the processing logic related to inbound CDC protocol.
Expand All @@ -42,6 +44,8 @@ public class CDCPollingConsumer {
private long scanInterval;
private Long lastRanTime;
private CDCInjectHandler injectHandler;
private ExecutorService executorService = null;
private DebeziumEngine<ChangeEvent<String, String>> engine = null;

public CDCPollingConsumer(Properties cdcProperties, String inboundEndpointName, SynapseEnvironment synapseEnvironment,
long scanInterval) {
Expand Down Expand Up @@ -95,24 +99,41 @@ public void execute() {
* according to the registered handler
*/
public ChangeEvent<String, String> poll() {
logger.debug("Start : listening to DB events : ");
listenDataChanges();
logger.debug("End : Listening to DB events : ");
return null;
}

if (logger.isDebugEnabled()) {
logger.debug("Start : listening to DB events : ");
}
private void listenDataChanges () {
executorService = Executors.newSingleThreadExecutor();

BlockingQueue<ChangeEvent<String, String>> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName);
while (!eventQueue.isEmpty()) {
injectHandler.invoke(eventQueue.poll(), inboundEndpointName);
}
if (engine == null || executorService.isShutdown()) {
engine = DebeziumEngine.create(Json.class)
.using(this.cdcProperties)
.notifying(record -> {
injectHandler.invoke(record, this.inboundEndpointName);
}).build();

if (logger.isDebugEnabled()) {
logger.debug("End : Listening to DB events : ");
executorService.execute(engine);
}
return null;
}

protected Properties getInboundProperties() {
return cdcProperties;
}

protected void destroy () {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
try {
if (engine != null) {
engine.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

package org.wso2.carbon.inbound.endpoint.protocol.cdc;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -39,13 +36,15 @@

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_ALLOWED_OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_PASSWORD;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_KEY_CONVERTER;
Expand All @@ -58,6 +57,7 @@
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_TOPIC_PREFIX;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SKIPPED_OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TRUE;

public class CDCProcessor extends InboundRequestProcessorImpl implements TaskStartupObserver, InboundTaskProcessor {
Expand All @@ -74,8 +74,9 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta
private static final String FILE_OFFSET_STORAGE_CLASS = "org.apache.kafka.connect.storage.FileOffsetBackingStore";
private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory";
private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class);
protected static Map<String, BlockingQueue> inboundEpEventQueueMap = new HashMap();
private ExecutorService executorService = null;

private enum operations {create, update, delete, truncate};
private enum opCodes {c, u, d, t};

public CDCProcessor(InboundProcessorParams params) {
this.name = params.getName();
Expand All @@ -100,10 +101,6 @@ public CDCProcessor(InboundProcessorParams params) {
if (cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION) != null) {
this.coordination = Boolean.parseBoolean(cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION));
}
if (!inboundEpEventQueueMap.containsKey(this.name)) {
BlockingQueue<ChangeEvent<String, String>> eventQueue = new LinkedBlockingQueue<>();
inboundEpEventQueueMap.put(this.name, eventQueue);
}
}

private void setProperties () {
Expand Down Expand Up @@ -137,22 +134,20 @@ private void setProperties () {
this.cdcProperties.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, TRUE);
}

if (this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS) != null) {
this.cdcProperties.setProperty(DEBEZIUM_SKIPPED_OPERATIONS,
getSkippedOperationsString(this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS)));
}

if (this.cdcProperties.getProperty(DEBEZIUM_TOPIC_PREFIX) == null) {
this.cdcProperties.setProperty(DEBEZIUM_TOPIC_PREFIX, this.name +"_topic");
}

if (this.cdcProperties.getProperty(DEBEZIUM_VALUE_CONVERTER) == null) {
this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
}
if (this.cdcProperties.getProperty(DEBEZIUM_KEY_CONVERTER) == null) {
this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
}
if (this.cdcProperties.getProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE) == null) {
this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE, TRUE);
}
if (this.cdcProperties.getProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE) == null) {
this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE, TRUE);
}
// set the output format as json in a way a user cannot override
this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE, TRUE);
this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE, TRUE);

if (this.cdcProperties.getProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL) == null) {
this.cdcProperties.setProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL, FILE_SCHEMA_HISTORY_STORAGE_CLASS);
Expand Down Expand Up @@ -220,19 +215,6 @@ public void init() {
pollingConsumer = new CDCPollingConsumer(cdcProperties, name, synapseEnvironment, interval);
pollingConsumer.registerHandler(new CDCInjectHandler(injectingSeq, onErrorSeq, sequential,
synapseEnvironment, cdcProperties));

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(this.cdcProperties)
.notifying(record -> {
try {
inboundEpEventQueueMap.get(this.name).offer(record, interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).build();

executorService = Executors.newSingleThreadExecutor();
executorService.execute(engine);
start();
}

Expand All @@ -254,12 +236,42 @@ public void start() {
public void destroy(boolean removeTask) {
if (removeTask) {
destroy();
executorService.shutdown();
pollingConsumer.destroy();
}
}

@Override
public void update() {
// This will not be called for inbound endpoints
}

private String getOpCode(String op) {
if (op != null) {
switch (operations.valueOf(op)) {
case create:
return opCodes.c.toString();
case update:
return opCodes.u.toString();
case delete:
return opCodes.d.toString();
case truncate:
return opCodes.t.toString();
}
}
return "";
}

/**
* Get the comma separated list containing allowed operations and returns the string of skipped operation codes
* @param allowedOperationsString string
* @return the coma separated string of skipped operation codes
*/
private String getSkippedOperationsString(String allowedOperationsString) {
List<String> allOperations = Stream.of(opCodes.values()).map(Enum :: toString).collect(Collectors.toList());
Set<String> allowedOperationsSet = Stream.of(allowedOperationsString.split(",")).
map(String :: trim).map(String :: toLowerCase).map(op -> getOpCode(op)).
collect(Collectors.toSet());
allOperations.removeAll(allowedOperationsSet);
return String.join(",", allOperations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public CDCTask(CDCPollingConsumer pollingConsumer, long interval) {
}

protected void taskExecute() {
if (logger.isDebugEnabled()) {
logger.debug("CDC Task executing.");
}
logger.debug("CDC Task executing.");
pollingConsumer.execute();
}

Expand All @@ -54,14 +52,10 @@ public Properties getInboundProperties() {
}

public void init(SynapseEnvironment synapseEnvironment) {
if (logger.isDebugEnabled()) {
logger.debug("Initializing Task.");
}
logger.debug("Initializing Task.");
}

public void destroy() {
if (logger.isDebugEnabled()) {
logger.debug("Destroying Task. ");
}
logger.debug("Destroying Task. ");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ class InboundCDCConstants {

public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL = "schema.history.internal";
public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME = "schema.history.internal.file.filename";
public static final String DEBEZIUM_SKIPPED_OPERATIONS = "skipped.operations";
public static final String DEBEZIUM_ALLOWED_OPERATIONS = "allowed.operations";


/** Output Properties **/
public static final String DATABASE_NAME = "database";
public static final String TABLES ="tables";
public static final String OPERATIONS ="operations";
public static final String CDC_DATABASE_NAME = "cdc.database";
public static final String CDC_TABLES ="cdc.tables";
public static final String CDC_OPERATIONS ="cdc.operations";
public static final String CDC_TS_MS = "cdc.ts_ms";
public static final String TS_MS = "ts_ms";

public static final String BEFORE = "before";
Expand Down

0 comments on commit c875f42

Please sign in to comment.