Skip to content

Commit

Permalink
Merge pull request #2928 from RusJaI/cdc-mi
Browse files Browse the repository at this point in the history
CDC Support for MI
  • Loading branch information
RusJaI authored Sep 22, 2023
2 parents 8457467 + 2df82c4 commit 40857f8
Show file tree
Hide file tree
Showing 130 changed files with 953 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>business-adaptors</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/business-adaptors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>mi-component-parent</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>crypto-service</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>crypto-service</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/crypto-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>mi-component-parent</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-services</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-services</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-services</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-services</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-services</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/data/data-services/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>data</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>mi-component-parent</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/javax.cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>mi-component-parent</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-publishers</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>data-publishers</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>data-publishers</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/mediation/data-publishers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>mediation</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>extensions</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>org.wso2.micro.integrator.security.handlers</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.wso2.ei</groupId>
<artifactId>extensions</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion components/mediation/extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>mediation</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<artifactId>inbound-endpoints</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>inbound-endpoints</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>inbound-endpoints</artifactId>
<groupId>org.wso2.ei</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -170,6 +170,10 @@
<groupId>org.apache.axis2.transport</groupId>
<artifactId>axis2-transport-rabbitmq-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.orbit.debezium</groupId>
<artifactId>debezium</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
Expand All @@ -180,6 +184,14 @@
<artifactId>log4j-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.securevault</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down Expand Up @@ -212,7 +224,8 @@
</Private-Package>
<Import-Package>
org.wso2.carbon.inbound.endpoint.persistence,
org.wso2.carbon.inbound.endpoint.osgi.service;
org.wso2.carbon.inbound.endpoint.osgi.service,
*;resolution:=optional
</Import-Package>
<DynamicImport-Package>*</DynamicImport-Package>
<Fragment-Host>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.synapse.inbound.InboundProcessorParams;
import org.apache.synapse.inbound.InboundRequestProcessor;
import org.apache.synapse.inbound.InboundRequestProcessorFactory;
import org.wso2.carbon.inbound.endpoint.protocol.cdc.CDCProcessor;
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericEventBasedConsumer;
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericEventBasedListener;
import org.wso2.carbon.inbound.endpoint.protocol.generic.GenericInboundListener;
Expand All @@ -44,7 +45,7 @@
*/
public class InboundRequestProcessorFactoryImpl implements InboundRequestProcessorFactory {

public static enum Protocols {jms, file, http, https, hl7, kafka, mqtt, rabbitmq, ws, wss, grpc, httpws, httpswss}
public static enum Protocols {jms, file, http, https, hl7, kafka, mqtt, rabbitmq, ws, wss, grpc, httpws, httpswss, cdc}

/**
* return underlying Request Processor Implementation according to protocol
Expand Down Expand Up @@ -97,6 +98,8 @@ public InboundRequestProcessor createInboundProcessor(InboundProcessorParams par
case grpc:
inboundRequestProcessor = new InboundGRPCListener(params);
break;
case cdc:
inboundRequestProcessor = new CDCProcessor(params);
}
} else if (params.getClassImpl() != null) {
if (GenericInboundListener.isListeningInboundEndpoint(params)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.inbound.endpoint.protocol.cdc;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.debezium.engine.ChangeEvent;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.AFTER;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.BEFORE;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DB;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.OP;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.PAYLOAD;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.SOURCE;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TABLE;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TS_MS;

public class CDCEventOutput {

private JsonObject payload;

private enum operations {c, r, u, d};

CDCEventOutput(ChangeEvent event) {
String valueString = event.value().toString();
JsonObject value = new Gson().fromJson(valueString, JsonObject.class);
this.payload = value.getAsJsonObject(PAYLOAD);
}

public JsonElement getJsonPayloadBeforeEvent() {
return payload.get(BEFORE);
}

public JsonElement getJsonPayloadAfterEvent() {
return payload.get(AFTER);
}

public Long getTs_ms() {
if (payload.has(TS_MS)) {
return payload.get(TS_MS).getAsLong();
}
return null;
}

public String getDatabase() {
if (getSource() != null && getSource().has(DB)) {
return getSource().get(DB).getAsString();
}
return null;
}

public JsonElement getTable() {
JsonElement tableObject = null;
if (getSource() != null && getSource().has(TABLE)) {
tableObject = getSource().get(TABLE);
}
return tableObject;
}

private JsonObject getSource () {
if (payload.has(SOURCE)) {
return payload.getAsJsonObject(SOURCE);
}
return null;
}

public String getOp() {
if (payload.has(OP)) {
return getOpString(payload.get(OP).getAsString());
}
return null;
}

private String getOpString(String op) {
if (op != null) {
switch (operations.valueOf(op)) {
case c:
return "CREATE";
case r:
return "READ";
case u:
return "UPDATE";
case d:
return "DELETE";
}
}
return null;
}

public JsonObject getOutputJsonPayload () {
if (payload == null) {
return null;
}
JsonObject jsonPayload = new JsonObject();
jsonPayload.addProperty(OP, getOp());
jsonPayload.add(BEFORE, getJsonPayloadBeforeEvent());
jsonPayload.add(AFTER, getJsonPayloadAfterEvent());
return jsonPayload;
}
}
Loading

0 comments on commit 40857f8

Please sign in to comment.