From c875f42603710191e3d441b8a11c5130498865e2 Mon Sep 17 00:00:00 2001 From: rusirijayodaillesinghe Date: Fri, 28 Jul 2023 13:28:17 +0530 Subject: [PATCH] set the output format as json in a way a user cannot override Get the param as allowed.operations instead of skipped.operations Change the property names of the sequence Add isDebugEnable check where string concaterntions are done. --- .../protocol/cdc/CDCInjectHandler.java | 20 ++--- .../protocol/cdc/CDCPollingConsumer.java | 47 +++++++--- .../endpoint/protocol/cdc/CDCProcessor.java | 88 +++++++++++-------- .../endpoint/protocol/cdc/CDCTask.java | 12 +-- .../protocol/cdc/InboundCDCConstants.java | 10 ++- 5 files changed, 103 insertions(+), 74 deletions(-) diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java index 00e5fe2127..baad141530 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCInjectHandler.java @@ -40,10 +40,10 @@ import java.util.Map; import java.util.Properties; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DATABASE_NAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.OPERATIONS; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TABLES; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TS_MS; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_DATABASE_NAME; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_OPERATIONS; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TABLES; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TS_MS; public class CDCInjectHandler { @@ -73,9 +73,7 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE ChangeEvent eventRecord = (ChangeEvent) object; if (eventRecord == null || eventRecord.value() == null) { - if (logger.isDebugEnabled()) { - logger.debug("CDC Source Handler received empty event record"); - } + logger.debug("CDC Source Handler received empty event record"); } else { InputStream in = null; try { @@ -87,10 +85,10 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName()); CDCEventOutput cdcEventOutput = new CDCEventOutput(eventRecord); - msgCtx.setProperty(DATABASE_NAME, cdcEventOutput.getDatabase()); - msgCtx.setProperty(TABLES, cdcEventOutput.getTable().toString()); - msgCtx.setProperty(OPERATIONS, cdcEventOutput.getOp()); - msgCtx.setProperty(TS_MS, cdcEventOutput.getTs_ms().toString()); + msgCtx.setProperty(CDC_DATABASE_NAME, cdcEventOutput.getDatabase()); + msgCtx.setProperty(CDC_TABLES, cdcEventOutput.getTable().toString()); + msgCtx.setProperty(CDC_OPERATIONS, cdcEventOutput.getOp()); + msgCtx.setProperty(CDC_TS_MS, cdcEventOutput.getTs_ms().toString()); if (logger.isDebugEnabled()) { logger.debug("Processed event : " + eventRecord); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java index dbd56d18f2..c10698dd6f 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java @@ -18,15 +18,17 @@ package org.wso2.carbon.inbound.endpoint.protocol.cdc; import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.synapse.core.SynapseEnvironment; +import java.io.IOException; import java.util.Date; import java.util.Properties; -import java.util.concurrent.BlockingQueue; - -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.CDCProcessor.inboundEpEventQueueMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * This class implement the processing logic related to inbound CDC protocol. @@ -42,6 +44,8 @@ public class CDCPollingConsumer { private long scanInterval; private Long lastRanTime; private CDCInjectHandler injectHandler; + private ExecutorService executorService = null; + private DebeziumEngine> engine = null; public CDCPollingConsumer(Properties cdcProperties, String inboundEndpointName, SynapseEnvironment synapseEnvironment, long scanInterval) { @@ -95,24 +99,41 @@ public void execute() { * according to the registered handler */ public ChangeEvent poll() { + logger.debug("Start : listening to DB events : "); + listenDataChanges(); + logger.debug("End : Listening to DB events : "); + return null; + } - if (logger.isDebugEnabled()) { - logger.debug("Start : listening to DB events : "); - } + private void listenDataChanges () { + executorService = Executors.newSingleThreadExecutor(); - BlockingQueue> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName); - while (!eventQueue.isEmpty()) { - injectHandler.invoke(eventQueue.poll(), inboundEndpointName); - } + if (engine == null || executorService.isShutdown()) { + engine = DebeziumEngine.create(Json.class) + .using(this.cdcProperties) + .notifying(record -> { + injectHandler.invoke(record, this.inboundEndpointName); + }).build(); - if (logger.isDebugEnabled()) { - logger.debug("End : Listening to DB events : "); + executorService.execute(engine); } - return null; } protected Properties getInboundProperties() { return cdcProperties; } + protected void destroy () { + if (!executorService.isShutdown()) { + executorService.shutdown(); + } + try { + if (engine != null) { + engine.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java index f5353e2acd..41479d471e 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java @@ -18,9 +18,6 @@ package org.wso2.carbon.inbound.endpoint.protocol.cdc; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.Json; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,13 +36,15 @@ import java.io.File; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.util.List; import java.util.Properties; -import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_ALLOWED_OPERATIONS; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_PASSWORD; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_KEY_CONVERTER; @@ -58,6 +57,7 @@ import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_TOPIC_PREFIX; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE; +import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SKIPPED_OPERATIONS; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TRUE; public class CDCProcessor extends InboundRequestProcessorImpl implements TaskStartupObserver, InboundTaskProcessor { @@ -74,8 +74,9 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta private static final String FILE_OFFSET_STORAGE_CLASS = "org.apache.kafka.connect.storage.FileOffsetBackingStore"; private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory"; private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class); - protected static Map inboundEpEventQueueMap = new HashMap(); - private ExecutorService executorService = null; + + private enum operations {create, update, delete, truncate}; + private enum opCodes {c, u, d, t}; public CDCProcessor(InboundProcessorParams params) { this.name = params.getName(); @@ -100,10 +101,6 @@ public CDCProcessor(InboundProcessorParams params) { if (cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION) != null) { this.coordination = Boolean.parseBoolean(cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION)); } - if (!inboundEpEventQueueMap.containsKey(this.name)) { - BlockingQueue> eventQueue = new LinkedBlockingQueue<>(); - inboundEpEventQueueMap.put(this.name, eventQueue); - } } private void setProperties () { @@ -137,22 +134,20 @@ private void setProperties () { this.cdcProperties.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, TRUE); } + if (this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS) != null) { + this.cdcProperties.setProperty(DEBEZIUM_SKIPPED_OPERATIONS, + getSkippedOperationsString(this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS))); + } + if (this.cdcProperties.getProperty(DEBEZIUM_TOPIC_PREFIX) == null) { this.cdcProperties.setProperty(DEBEZIUM_TOPIC_PREFIX, this.name +"_topic"); } - if (this.cdcProperties.getProperty(DEBEZIUM_VALUE_CONVERTER) == null) { - this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); - } - if (this.cdcProperties.getProperty(DEBEZIUM_KEY_CONVERTER) == null) { - this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); - } - if (this.cdcProperties.getProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE) == null) { - this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE, TRUE); - } - if (this.cdcProperties.getProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE) == null) { - this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE, TRUE); - } + // set the output format as json in a way a user cannot override + this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); + this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); + this.cdcProperties.setProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE, TRUE); + this.cdcProperties.setProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE, TRUE); if (this.cdcProperties.getProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL) == null) { this.cdcProperties.setProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL, FILE_SCHEMA_HISTORY_STORAGE_CLASS); @@ -220,19 +215,6 @@ public void init() { pollingConsumer = new CDCPollingConsumer(cdcProperties, name, synapseEnvironment, interval); pollingConsumer.registerHandler(new CDCInjectHandler(injectingSeq, onErrorSeq, sequential, synapseEnvironment, cdcProperties)); - - DebeziumEngine> engine = DebeziumEngine.create(Json.class) - .using(this.cdcProperties) - .notifying(record -> { - try { - inboundEpEventQueueMap.get(this.name).offer(record, interval, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }).build(); - - executorService = Executors.newSingleThreadExecutor(); - executorService.execute(engine); start(); } @@ -254,7 +236,7 @@ public void start() { public void destroy(boolean removeTask) { if (removeTask) { destroy(); - executorService.shutdown(); + pollingConsumer.destroy(); } } @@ -262,4 +244,34 @@ public void destroy(boolean removeTask) { public void update() { // This will not be called for inbound endpoints } + + private String getOpCode(String op) { + if (op != null) { + switch (operations.valueOf(op)) { + case create: + return opCodes.c.toString(); + case update: + return opCodes.u.toString(); + case delete: + return opCodes.d.toString(); + case truncate: + return opCodes.t.toString(); + } + } + return ""; + } + + /** + * Get the comma separated list containing allowed operations and returns the string of skipped operation codes + * @param allowedOperationsString string + * @return the coma separated string of skipped operation codes + */ + private String getSkippedOperationsString(String allowedOperationsString) { + List allOperations = Stream.of(opCodes.values()).map(Enum :: toString).collect(Collectors.toList()); + Set allowedOperationsSet = Stream.of(allowedOperationsString.split(",")). + map(String :: trim).map(String :: toLowerCase).map(op -> getOpCode(op)). + collect(Collectors.toSet()); + allOperations.removeAll(allowedOperationsSet); + return String.join(",", allOperations); + } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java index 4795fab44a..a81d0b6fa9 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCTask.java @@ -42,9 +42,7 @@ public CDCTask(CDCPollingConsumer pollingConsumer, long interval) { } protected void taskExecute() { - if (logger.isDebugEnabled()) { - logger.debug("CDC Task executing."); - } + logger.debug("CDC Task executing."); pollingConsumer.execute(); } @@ -54,14 +52,10 @@ public Properties getInboundProperties() { } public void init(SynapseEnvironment synapseEnvironment) { - if (logger.isDebugEnabled()) { - logger.debug("Initializing Task."); - } + logger.debug("Initializing Task."); } public void destroy() { - if (logger.isDebugEnabled()) { - logger.debug("Destroying Task. "); - } + logger.debug("Destroying Task. "); } } diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java index c23931d9b2..386cb7bae7 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java @@ -36,11 +36,15 @@ class InboundCDCConstants { public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL = "schema.history.internal"; public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME = "schema.history.internal.file.filename"; + public static final String DEBEZIUM_SKIPPED_OPERATIONS = "skipped.operations"; + public static final String DEBEZIUM_ALLOWED_OPERATIONS = "allowed.operations"; + /** Output Properties **/ - public static final String DATABASE_NAME = "database"; - public static final String TABLES ="tables"; - public static final String OPERATIONS ="operations"; + public static final String CDC_DATABASE_NAME = "cdc.database"; + public static final String CDC_TABLES ="cdc.tables"; + public static final String CDC_OPERATIONS ="cdc.operations"; + public static final String CDC_TS_MS = "cdc.ts_ms"; public static final String TS_MS = "ts_ms"; public static final String BEFORE = "before";