From afe690690f366af84525469c3c4f4ad568f67d55 Mon Sep 17 00:00:00 2001 From: rusirijayodaillesinghe Date: Tue, 25 Jul 2023 11:59:33 +0530 Subject: [PATCH] Add dependency for ojdbc --- .../protocol/cdc/CDCEndpointManager.java | 256 ------------------ .../protocol/cdc/CDCEventExecutorManager.java | 50 ---- .../endpoint/protocol/cdc/CDCEventOutput.java | 142 ---------- .../endpoint/protocol/cdc/CDCListener.java | 50 ---- .../protocol/cdc/CDCSourceHandler.java | 165 ----------- .../protocol/cdc/InboundCDCConstants.java | 51 ---- .../protocol/cdc/InboundCDCEventExecutor.java | 49 ---- .../protocol/cdcPolling/CDCEventOutput.java | 34 +-- pom.xml | 9 +- 9 files changed, 9 insertions(+), 797 deletions(-) delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEndpointManager.java delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventExecutorManager.java delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventOutput.java delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCListener.java delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCSourceHandler.java delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java delete mode 100644 components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCEventExecutor.java diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEndpointManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEndpointManager.java deleted file mode 100644 index ad03eb7512..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEndpointManager.java +++ /dev/null @@ -1,256 +0,0 @@ -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.Json; -import org.apache.synapse.mediators.Value; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.synapse.MessageContext; -import org.apache.synapse.SynapseException; -import org.apache.synapse.core.SynapseEnvironment; -import org.apache.synapse.inbound.InboundProcessorParams; -import org.apache.synapse.util.xpath.SynapseXPath; -import org.jaxen.JaxenException; -import org.wso2.carbon.inbound.endpoint.common.AbstractInboundEndpointManager; - -import java.io.File; -import java.io.IOException; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.wso2.carbon.inbound.endpoint.common.Constants.SUPER_TENANT_DOMAIN_NAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_CONNECTOR_CLASS; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_DBNAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_HOSTNAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_PASSWORD; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_PORT; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_SERVER_ID; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_USER; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_KEY_CONVERTER; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_OFFSET_FLUSH_INTERVAL_MS; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_OFFSET_STORAGE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_OFFSET_STORAGE_FILE_FILENAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_OPERATIONS_EXCLUDE_LIST; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_NAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SCHEMA_HISTORY_INTERNAL; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SNAPSHOT_MODE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_TABLES_INCLUDE_LIST; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_TOPIC_PREFIX; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TRUE; - -public class CDCEndpointManager extends AbstractInboundEndpointManager { - - private static final Log log = LogFactory.getLog(CDCEndpointManager.class); - - private static CDCEndpointManager instance = null; - private InboundCDCEventExecutor eventExecutor; - - private static final String SECURE_VAULT_REGEX = "(wso2:vault-lookup\\('(.*?)'\\))"; - private static Pattern vaultLookupPattern = Pattern.compile(SECURE_VAULT_REGEX); - - private CDCEndpointManager() { - super(); - } - - public static CDCEndpointManager getInstance() { - if (instance == null) { - instance = new CDCEndpointManager(); - } - return instance; - } - - @Override - public boolean startListener(int port, String name, InboundProcessorParams inboundParameters) { - - if (CDCEventExecutorManager.getInstance().isRegisteredExecutor(port)) { - log.info("CDC Listener already started on port " + port); - return true; - } - - log.info("Starting CDC Listener on port " + port); - - eventExecutor = new InboundCDCEventExecutor(); - CDCEventExecutorManager.getInstance().registerEventExecutor(port, eventExecutor); - Properties props = setProperties(inboundParameters); - - CDCSourceHandler sourceHandler = new CDCSourceHandler(port, inboundParameters); - DebeziumEngine> engine = DebeziumEngine.create(Json.class) - .using(props) - .notifying(record -> { - sourceHandler.requestReceived(record); - System.out.println(record); - }).build(); - - eventExecutor.getExecutorService().execute(engine); - log.info("Debezium engine started"); - - return true; - } - - private Properties setProperties (InboundProcessorParams params) { - Properties inboundProperties = params.getProperties(); - log.info("Initializing the properties"); - final Properties props = new Properties(); - try { - props.setProperty(DEBEZIUM_NAME, inboundProperties.getProperty(DEBEZIUM_NAME)); - if (inboundProperties.getProperty(DEBEZIUM_SNAPSHOT_MODE) != null) { - props.setProperty(DEBEZIUM_SNAPSHOT_MODE, inboundProperties.getProperty(DEBEZIUM_SNAPSHOT_MODE)); - } - - if (inboundProperties.getProperty(DEBEZIUM_OFFSET_STORAGE) != null) { - props.setProperty(DEBEZIUM_OFFSET_STORAGE, inboundProperties.getProperty(DEBEZIUM_OFFSET_STORAGE)); - } else { - props.setProperty(DEBEZIUM_OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore"); - } - - if (inboundProperties.getProperty(DEBEZIUM_OFFSET_STORAGE_FILE_FILENAME) != null) { - props.setProperty(DEBEZIUM_OFFSET_STORAGE_FILE_FILENAME, inboundProperties.getProperty(DEBEZIUM_OFFSET_STORAGE_FILE_FILENAME)); - } else { - String filePath = "cdc/offsetStorage/" + params.getName() + "_.dat"; - createFile(filePath); - props.setProperty(DEBEZIUM_OFFSET_STORAGE_FILE_FILENAME, filePath); - } - - if (inboundProperties.getProperty(DEBEZIUM_OFFSET_FLUSH_INTERVAL_MS) != null) { - props.setProperty(DEBEZIUM_OFFSET_FLUSH_INTERVAL_MS, inboundProperties.getProperty(DEBEZIUM_OFFSET_FLUSH_INTERVAL_MS)); - } else { - props.setProperty(DEBEZIUM_OFFSET_FLUSH_INTERVAL_MS, "1000"); - } - - /* begin connector properties */ - props.setProperty(DEBEZIUM_CONNECTOR_CLASS, inboundProperties.getProperty(DEBEZIUM_CONNECTOR_CLASS)); - props.setProperty(DEBEZIUM_DATABASE_HOSTNAME, inboundProperties.getProperty(DEBEZIUM_DATABASE_HOSTNAME)); - props.setProperty(DEBEZIUM_DATABASE_PORT, inboundProperties.getProperty(DEBEZIUM_DATABASE_PORT)); - props.setProperty(DEBEZIUM_DATABASE_USER, inboundProperties.getProperty(DEBEZIUM_DATABASE_USER)); - - String passwordString = inboundProperties.getProperty(DEBEZIUM_DATABASE_PASSWORD); - SynapseEnvironment synapseEnvironment = params.getSynapseEnvironment(); - MessageContext messageContext = synapseEnvironment.createMessageContext(); - - props.setProperty(DEBEZIUM_DATABASE_PASSWORD, resolveSecureVault(messageContext, passwordString)); - - props.setProperty(DEBEZIUM_DATABASE_DBNAME, inboundProperties.getProperty(DEBEZIUM_DATABASE_DBNAME)); - props.setProperty(DEBEZIUM_DATABASE_SERVER_ID, inboundProperties.getProperty(DEBEZIUM_DATABASE_SERVER_ID)); - - if (inboundProperties.getProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL) != null) { - props.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, inboundProperties.getProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL)); - } else { - props.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, TRUE); - } - - if (inboundProperties.getProperty(DEBEZIUM_TOPIC_PREFIX) != null) { - props.setProperty(DEBEZIUM_TOPIC_PREFIX, inboundProperties.getProperty(DEBEZIUM_TOPIC_PREFIX)); - } else { - props.setProperty(DEBEZIUM_TOPIC_PREFIX, params.getName() +"_topic"); - } - - props.setProperty(DEBEZIUM_VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); - props.setProperty(DEBEZIUM_KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter"); - props.setProperty(DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE, TRUE); - props.setProperty(DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE, TRUE); - - if (inboundProperties.getProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL) != null) { - props.setProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL, inboundProperties.getProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL)); - } else { - props.setProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL, "io.debezium.storage.file.history.FileSchemaHistory"); - } - - if (inboundProperties.getProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME) != null) { - props.setProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME, inboundProperties.getProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME)); - } else { - String filePath = "cdc/schemaHistory/" + params.getName() + "_.dat"; - createFile(filePath); - props.setProperty(DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME, filePath); - } - - if (inboundProperties.getProperty(DEBEZIUM_TABLES_INCLUDE_LIST) != null) { - props.setProperty(DEBEZIUM_TABLES_INCLUDE_LIST, inboundProperties.getProperty(DEBEZIUM_TABLES_INCLUDE_LIST)); - } - - if (inboundProperties.getProperty(DEBEZIUM_OPERATIONS_EXCLUDE_LIST) != null) { - props.setProperty(DEBEZIUM_OPERATIONS_EXCLUDE_LIST, inboundProperties.getProperty(DEBEZIUM_OPERATIONS_EXCLUDE_LIST)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } catch (NullPointerException e) { - log.error("A required property value is not defined", e); - throw new RuntimeException(e); - } - return props; - } - - private static synchronized String resolveSecureVault(MessageContext messageContext, String passwordString) { - if (passwordString == null) { - return null; - } - Matcher lookupMatcher = vaultLookupPattern.matcher(passwordString); - String resolvedValue = ""; - if (lookupMatcher.find()) { - Value expression; - String expressionStr = lookupMatcher.group(1); - try { - expression = new Value(new SynapseXPath(expressionStr)); - - } catch (JaxenException e) { - throw new SynapseException("Error while building the expression : " + expressionStr, e); - } - resolvedValue = expression.evaluateValue(messageContext); - if (StringUtils.isEmpty(resolvedValue)) { - log.warn("Found Empty value for expression : " + expression.getExpression()); - resolvedValue = ""; - } - } else { - resolvedValue = passwordString; - } - return resolvedValue; - } - - @Override - public boolean startEndpoint(int port, String name, InboundProcessorParams inboundParameters) { - log.info("Starting CDC Endpoint on port " + port); - dataStore.registerListeningEndpoint(port, SUPER_TENANT_DOMAIN_NAME, InboundCDCConstants.CDC, name, - inboundParameters); - - boolean start = startListener(port, name, inboundParameters); - - if (start) { - //do nothing - } else { - dataStore.unregisterListeningEndpoint(port, SUPER_TENANT_DOMAIN_NAME); - return false; - } - return true; - } - - private void createFile (String filePath) throws IOException { - File file = new File(filePath); - file.getParentFile().mkdirs(); - if(!file.exists()) { - file.createNewFile(); - } - } - - @Override - public void closeEndpoint(int port) { - log.info("Closing CDC Endpoint on port " + port); - eventExecutor.getExecutorService().shutdown(); - dataStore.unregisterListeningEndpoint(port, SUPER_TENANT_DOMAIN_NAME); - - if (!CDCEventExecutorManager.getInstance().isRegisteredExecutor(port)) { - log.info("Listener Endpoint is not started"); - return; - } else if (dataStore.isEndpointRegistryEmpty(port)) { - CDCEventExecutorManager.getInstance().shutdownExecutor(port); - } - - } - -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventExecutorManager.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventExecutorManager.java deleted file mode 100644 index 80040b285f..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventExecutorManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - - -import java.util.concurrent.ConcurrentHashMap; - -public class CDCEventExecutorManager { - - private ConcurrentHashMap executorPoolMap = new ConcurrentHashMap(); - - private static CDCEventExecutorManager instance = null; - - public static CDCEventExecutorManager getInstance() { - if (instance == null) { - instance = new CDCEventExecutorManager(); - } - return instance; - } - - public void shutdownExecutor(int port) { - executorPoolMap.get(port).shutdownEventExecutor(); - executorPoolMap.remove(port); - } - - public void registerEventExecutor(int port, InboundCDCEventExecutor eventExecutor) { - executorPoolMap.put(port, eventExecutor); - } - - public boolean isRegisteredExecutor(int port) { - return executorPoolMap.containsKey(port); - } - -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventOutput.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventOutput.java deleted file mode 100644 index df9439c101..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCEventOutput.java +++ /dev/null @@ -1,142 +0,0 @@ -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - -import io.debezium.engine.ChangeEvent; -import org.json.JSONObject; - -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.AFTER; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.BEFORE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DB; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.OP; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.PAYLOAD; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.SOURCE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TABLE; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TS_MS; - - -public class CDCEventOutput { - - private Object beforeEvent; - private Object afterEvent; - private Long ts_ms; - - private String database; - private Object table; - private String op; - private JSONObject payload; - - private enum operations {c, r, u, d}; - - CDCEventOutput (ChangeEvent event) { - String valueString = event.value().toString(); - JSONObject value = new JSONObject(valueString); - this.payload = value.getJSONObject(PAYLOAD); - } - - public Object getJsonPayloadBeforeEvent() { - Object beforeObject = null; - if (payload.has(BEFORE)) { - beforeObject = payload.get(BEFORE); - } - return beforeObject; - - } - - public void setJsonPayloadBeforeEvent(Object beforeEvent) { - this.beforeEvent = beforeEvent; - } - - public Object getJsonPayloadAfterEvent() { - Object afterObject = null; - if (payload.has(AFTER)) { - afterObject = payload.get(AFTER); - } - return afterObject; - } - - public void setJsonPayloadAfterEvent(Object afterEvent) { - this.afterEvent = afterEvent; - } - - public Long getTs_ms() { - if (payload.has(TS_MS)) { - return payload.getLong(TS_MS); - } - return null; - } - - public void setTs_ms(Long ts_ms) { - this.ts_ms = ts_ms; - } - - public String getDatabase() { - if (getSource() != null) { - if (getSource().has(DB)) { - return getSource().getString(DB); - } - return null; - } - return null; - } - - public void setDatabase(String database) { - this.database = database; - } - - public Object getTable() { - Object tableObject = null; - if (getSource() != null) { - if (getSource().has(TABLE)) { - tableObject = getSource().get(TABLE); - } - } - return tableObject; - } - - public void setTable(Object table) { - this.table = table; - } - - private JSONObject getSource () { - if (payload.has(SOURCE)) { - return payload.getJSONObject(SOURCE); - } - return null; - } - - public String getOp() { - if (payload.has(OP)) { - return getOpString(payload.getString(OP)); - } - return null; - } - private String getOpString(String op) { - if (op != null) { - switch (operations.valueOf(op)) { - case c: - return "CREATE"; - case r: - return "READ"; - case u: - return "UPDATE"; - case d: - return "DELETE"; - } - } - return null; - } - - public void setOp(String op) { - this.op = op; - } - - public JSONObject getOutputJsonPayload () { - if (payload == null) { - return null; - } - JSONObject jsonPayload = new JSONObject(); - jsonPayload.put(OP, getOp()); - jsonPayload.put(BEFORE, getJsonPayloadBeforeEvent()); - jsonPayload.put(AFTER, getJsonPayloadAfterEvent()); - return jsonPayload; - } -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCListener.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCListener.java deleted file mode 100644 index d249a37677..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCListener.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.synapse.SynapseException; -import org.apache.synapse.inbound.InboundProcessorParams; -import org.apache.synapse.inbound.InboundRequestProcessor; -import org.wso2.carbon.inbound.endpoint.persistence.PersistenceUtils; - -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.INBOUND_ENDPOINT_PARAMETER_CDC_PORT; - -public class CDCListener implements InboundRequestProcessor { - - private InboundProcessorParams processorParams; - private int port; - private String name; - private static final Log LOGGER = LogFactory.getLog(CDCListener.class); - - public CDCListener(InboundProcessorParams params) { - - processorParams = params; - String portParam = params.getProperties() - .getProperty(INBOUND_ENDPOINT_PARAMETER_CDC_PORT); - try { - port = Integer.parseInt(portParam); - } catch (NumberFormatException e) { - handleException("Validation failed for the port parameter " + portParam, e); - } - name = params.getName(); - } - - protected void handleException(String msg, Exception e) { - LOGGER.error(msg, e); - throw new SynapseException(msg, e); - } - - @Override - public void init() { - System.out.println("Init called"); - int offsetPort = port + PersistenceUtils.getPortOffset(processorParams.getProperties()); - CDCEndpointManager.getInstance().startEndpoint(offsetPort, name, processorParams); - } - - @Override - public void destroy() { - System.out.println("Destroy called"); - int offsetPort = port + PersistenceUtils.getPortOffset(processorParams.getProperties()); - CDCEndpointManager.getInstance().closeEndpoint(offsetPort); - } -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCSourceHandler.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCSourceHandler.java deleted file mode 100644 index 7df87f60ff..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCSourceHandler.java +++ /dev/null @@ -1,165 +0,0 @@ -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - -import io.debezium.engine.ChangeEvent; -import org.apache.axiom.om.OMAbstractFactory; -import org.apache.axiom.om.OMElement; -import org.apache.axiom.soap.SOAPEnvelope; -import org.apache.axiom.soap.SOAPFactory; -import org.apache.axiom.util.UIDGenerator; -import org.apache.axis2.AxisFault; -import org.apache.axis2.builder.Builder; -import org.apache.axis2.builder.BuilderUtil; -import org.apache.axis2.context.OperationContext; -import org.apache.axis2.context.ServiceContext; -import org.apache.axis2.description.InOutAxisOperation; -import org.apache.axis2.transport.TransportUtils; -import org.apache.commons.io.input.AutoCloseInputStream; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.synapse.MessageContext; -import org.apache.synapse.SynapseConstants; -import org.apache.synapse.api.ApiConstants; -import org.apache.synapse.core.axis2.Axis2MessageContext; -import org.apache.synapse.core.axis2.MessageContextCreatorForAxis2; -import org.apache.synapse.inbound.InboundProcessorParams; -import org.apache.synapse.mediators.MediatorFaultHandler; -import org.apache.synapse.mediators.base.SequenceMediator; -import org.wso2.carbon.inbound.endpoint.osgi.service.ServiceReferenceHolder; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.IOException; - - -import static org.wso2.carbon.inbound.endpoint.common.Constants.SUPER_TENANT_DOMAIN_NAME; -import static org.wso2.carbon.inbound.endpoint.common.Constants.TENANT_DOMAIN; -import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.*; - - -public class CDCSourceHandler { - - private int port; - private InboundProcessorParams params; - private static final String tenantDomain = SUPER_TENANT_DOMAIN_NAME; - private static final Log log = LogFactory.getLog(CDCSourceHandler.class); - private static final String contentType = "application/json"; - - public CDCSourceHandler(int port, InboundProcessorParams params) { - this.port = port; - this.params = params; - } - - public void requestReceived(ChangeEvent eventRecord) { - if (eventRecord == null || eventRecord.value() == null) { - log.debug("CDC Source Handler received empty event record"); - } else { - log.debug("CDC Source Handler request received"); - - MessageContext synCtx = null; - try { - - synCtx = getSynapseMessageContext(tenantDomain); - - CDCEventOutput cdcEventOutput = new CDCEventOutput(eventRecord); - synCtx.setProperty(DATABASE_NAME, cdcEventOutput.getDatabase()); - synCtx.setProperty(TABLES, cdcEventOutput.getTable().toString()); - synCtx.setProperty(OPERATIONS, cdcEventOutput.getOp()); - synCtx.setProperty(TS_MS, cdcEventOutput.getTs_ms().toString()); - - org.apache.axis2.context.MessageContext axis2MsgCtx = ((org.apache.synapse.core.axis2.Axis2MessageContext) synCtx) - .getAxis2MessageContext(); - Builder builder = BuilderUtil.getBuilderFromSelector(contentType, axis2MsgCtx); - - if (builder != null) { - String serializedChangeEvent = cdcEventOutput.getOutputJsonPayload().toString(); - InputStream in = new AutoCloseInputStream( - new ByteArrayInputStream(serializedChangeEvent.getBytes())); - - OMElement documentElement = builder.processDocument(in, contentType, axis2MsgCtx); - synCtx.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement)); - - if (log.isDebugEnabled()) { - log.debug("CDCEvent being injected to Sequence"); - } - injectForMediation(synCtx); - return; - } - - } catch (AxisFault e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - private static org.apache.axis2.context.MessageContext createAxis2MessageContext() { - org.apache.axis2.context.MessageContext axis2MsgCtx = new org.apache.axis2.context.MessageContext(); - axis2MsgCtx.setMessageID(UIDGenerator.generateURNString()); - axis2MsgCtx.setConfigurationContext( - ServiceReferenceHolder.getInstance().getConfigurationContextService().getServerConfigContext()); - axis2MsgCtx.setProperty(org.apache.axis2.context.MessageContext.CLIENT_API_NON_BLOCKING, Boolean.TRUE); - axis2MsgCtx.setServerSide(true); - - return axis2MsgCtx; - } - - private static org.apache.synapse.MessageContext createSynapseMessageContext(String tenantDomain) throws AxisFault { - org.apache.axis2.context.MessageContext axis2MsgCtx = createAxis2MessageContext(); - ServiceContext svcCtx = new ServiceContext(); - OperationContext opCtx = new OperationContext(new InOutAxisOperation(), svcCtx); - axis2MsgCtx.setServiceContext(svcCtx); - axis2MsgCtx.setOperationContext(opCtx); - - axis2MsgCtx.setProperty(TENANT_DOMAIN, tenantDomain); - - SOAPFactory fac = OMAbstractFactory.getSOAP11Factory(); - SOAPEnvelope envelope = fac.getDefaultEnvelope(); - axis2MsgCtx.setEnvelope(envelope); - return MessageContextCreatorForAxis2.getSynapseMessageContext(axis2MsgCtx); - } - - public org.apache.synapse.MessageContext getSynapseMessageContext(String tenantDomain) throws AxisFault { - MessageContext synCtx = createSynapseMessageContext(tenantDomain); - synCtx.setProperty(SynapseConstants.IS_INBOUND, true); - ((Axis2MessageContext) synCtx).getAxis2MessageContext().setProperty(SynapseConstants.IS_INBOUND, true); - - return synCtx; - } - - - private void injectForMediation(org.apache.synapse.MessageContext synCtx) { - SequenceMediator faultSequence = getFaultSequence(synCtx); - - MediatorFaultHandler mediatorFaultHandler = new MediatorFaultHandler(faultSequence); - synCtx.pushFaultHandler(mediatorFaultHandler); - if (log.isDebugEnabled()) { - log.debug("injecting message to sequence : " + params.getInjectingSeq()); - } - synCtx.setProperty("inbound.endpoint.name", params.getName()); - synCtx.setProperty(ApiConstants.API_CALLER, params.getName()); - - SequenceMediator injectingSequence = null; - if (params.getInjectingSeq() != null) { - injectingSequence = (SequenceMediator) synCtx.getSequence(params.getInjectingSeq()); - } - if (injectingSequence == null) { - injectingSequence = (SequenceMediator) synCtx.getMainSequence(); - } - - synCtx.getEnvironment().injectMessage(synCtx, injectingSequence); - - } - - private SequenceMediator getFaultSequence(org.apache.synapse.MessageContext synCtx) { - SequenceMediator faultSequence = null; - if (params.getOnErrorSeq() != null) { - faultSequence = (SequenceMediator) synCtx.getSequence(params.getOnErrorSeq()); - } - if (faultSequence == null) { - faultSequence = (SequenceMediator) synCtx.getFaultSequence(); - } - return faultSequence; - } - -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java deleted file mode 100644 index 7e99edf490..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCConstants.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - -class InboundCDCConstants { - - /** Inbound Endpoint Parameters **/ - public static final String CDC = "cdc"; - public static final String INBOUND_ENDPOINT_PARAMETER_CDC_PORT = "inbound.cdc.port"; - - //debezium - public static final String DEBEZIUM_NAME = "name"; - public static final String DEBEZIUM_SNAPSHOT_MODE = "snapshot.mode"; - public static final String DEBEZIUM_MAX_THREADS = "snapshot.max.threads"; - public static final String DEBEZIUM_OFFSET_STORAGE = "offset.storage"; - public static final String DEBEZIUM_OFFSET_STORAGE_FILE_FILENAME = "offset.storage.file.filename"; - public static final String DEBEZIUM_OFFSET_FLUSH_INTERVAL_MS = "offset.flush.interval.ms"; - public static final String DEBEZIUM_CONNECTOR_CLASS = "connector.class"; - public static final String DEBEZIUM_DATABASE_HOSTNAME = "database.hostname"; - public static final String DEBEZIUM_DATABASE_PORT = "database.port"; - public static final String DEBEZIUM_DATABASE_USER = "database.user"; - public static final String DEBEZIUM_DATABASE_PASSWORD = "database.password"; - public static final String DEBEZIUM_DATABASE_DBNAME = "database.dbname"; - public static final String DEBEZIUM_TABLES_INCLUDE_LIST = "table.include.list"; - public static final String DEBEZIUM_OPERATIONS_EXCLUDE_LIST = "skipped.operations"; - public static final String DEBEZIUM_DATABASE_SERVER_ID = "database.server.id"; - public static final String DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL = "database.allowPublicKeyRetrieval"; - public static final String DEBEZIUM_TOPIC_PREFIX = "topic.prefix"; - - public static final String DEBEZIUM_VALUE_CONVERTER = "value.converter"; - public static final String DEBEZIUM_KEY_CONVERTER = "key.converter"; - public static final String DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE = "value.converter.schemas.enable"; - public static final String DEBEZIUM_KEY_CONVERTER_SCHEMAS_ENABLE = "key.converter.schemas.enable"; - - public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL = "schema.history.internal"; - public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME = "schema.history.internal.file.filename"; - - /** Output Properties **/ - public static final String DATABASE_NAME = "database"; - public static final String TABLES ="tables"; - public static final String OPERATIONS ="operations"; - public static final String TS_MS = "ts_ms"; - - public static final String BEFORE = "before"; - public static final String AFTER = "after"; - public static final String SOURCE = "source"; - public static final String OP = "op"; - public static final String PAYLOAD = "payload"; - public static final String DB = "db"; - public static final String TABLE = "table"; - - public static final String TRUE = "true"; -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCEventExecutor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCEventExecutor.java deleted file mode 100644 index 37e3fbd70d..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/InboundCDCEventExecutor.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.inbound.endpoint.protocol.cdc; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -public class InboundCDCEventExecutor { - - private ExecutorService executorService; - - public InboundCDCEventExecutor () { - executorService = Executors.newSingleThreadExecutor(); - } - - public ExecutorService getExecutorService() { - return this.executorService; - } - - public void shutdownEventExecutor() { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - } - } - - -} diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdcPolling/CDCEventOutput.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdcPolling/CDCEventOutput.java index b9307fe472..6618a7b8ee 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdcPolling/CDCEventOutput.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdcPolling/CDCEventOutput.java @@ -21,7 +21,6 @@ import io.debezium.engine.ChangeEvent; import org.json.JSONObject; - import static org.wso2.carbon.inbound.endpoint.protocol.cdcPolling.InboundCDCConstants.AFTER; import static org.wso2.carbon.inbound.endpoint.protocol.cdcPolling.InboundCDCConstants.BEFORE; import static org.wso2.carbon.inbound.endpoint.protocol.cdcPolling.InboundCDCConstants.DB; @@ -31,16 +30,8 @@ import static org.wso2.carbon.inbound.endpoint.protocol.cdcPolling.InboundCDCConstants.TABLE; import static org.wso2.carbon.inbound.endpoint.protocol.cdcPolling.InboundCDCConstants.TS_MS; - public class CDCEventOutput { - private Object beforeEvent; - private Object afterEvent; - private Long ts_ms; - - private String database; - private Object table; - private String op; private JSONObject payload; private enum operations {c, r, u, d}; @@ -60,10 +51,6 @@ public Object getJsonPayloadBeforeEvent() { } - public void setJsonPayloadBeforeEvent(Object beforeEvent) { - this.beforeEvent = beforeEvent; - } - public Object getJsonPayloadAfterEvent() { Object afterObject = null; if (payload.has(AFTER)) { @@ -72,10 +59,6 @@ public Object getJsonPayloadAfterEvent() { return afterObject; } - public void setJsonPayloadAfterEvent(Object afterEvent) { - this.afterEvent = afterEvent; - } - public Long getTs_ms() { if (payload.has(TS_MS)) { return payload.getLong(TS_MS); @@ -83,10 +66,6 @@ public Long getTs_ms() { return null; } - public void setTs_ms(Long ts_ms) { - this.ts_ms = ts_ms; - } - public String getDatabase() { if (getSource() != null) { if (getSource().has(DB)) { @@ -97,10 +76,6 @@ public String getDatabase() { return null; } - public void setDatabase(String database) { - this.database = database; - } - public Object getTable() { Object tableObject = null; if (getSource() != null) { @@ -111,10 +86,6 @@ public Object getTable() { return tableObject; } - public void setTable(Object table) { - this.table = table; - } - private JSONObject getSource () { if (payload.has(SOURCE)) { return payload.getJSONObject(SOURCE); @@ -128,6 +99,7 @@ public String getOp() { } return null; } + private String getOpString(String op) { if (op != null) { switch (operations.valueOf(op)) { @@ -144,10 +116,6 @@ private String getOpString(String op) { return null; } - public void setOp(String op) { - this.op = op; - } - public JSONObject getOutputJsonPayload () { if (payload == null) { return null; diff --git a/pom.xml b/pom.xml index 1afba131df..f98c76327c 100644 --- a/pom.xml +++ b/pom.xml @@ -1104,6 +1104,12 @@ ${mysql.connector.version} test + + com.oracle.database.jdbc + ojdbc8 + ${ojdbc8.version} + test + org.apache.derby.wso2 derby @@ -1659,11 +1665,12 @@ 2.4.1.Final + 19.8.0.0 3.3.1 - 4.0.0-wso2v38 + 4.0.0-wso2v40 [4.0.0, 4.0.1) 4.7.175 1.1.3