Skip to content

Commit

Permalink
MOSIP-23025 : using activemq artemis
Browse files Browse the repository at this point in the history
  • Loading branch information
Monobikash Das committed Sep 20, 2022
1 parent 698d258 commit 91a1b72
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.assertj.core.util.Arrays;
import org.json.simple.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -380,7 +380,7 @@ public void consumerListener(Message message, String abisInBoundAddress, MosipQu
TextMessage textMessage = (TextMessage) message;
response =textMessage.getText();
} else
response = new String(((ActiveMQBytesMessage) message).getContent().data);
response = ((ActiveMQBytesMessage) message).readUTF();
JSONObject inserOrIdentifyResponse = JsonUtil.objectMapperReadValue(response, JSONObject.class);
String requestId = JsonUtil.getJSONValue(inserOrIdentifyResponse, REQUESTID);
String batchId = packetInfoManager.getBatchIdByRequestId(requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.mosip.registration.processor.core.exception.RegistrationProcessorCheckedException;
import io.mosip.registration.processor.packet.storage.utils.Utilities;

import javax.jms.JMSException;

/**
* All the configuration validations will be done in this class
*/
Expand Down Expand Up @@ -57,7 +59,7 @@ private void validateReprocessElapseTimeConfig() {
" it should should be greater than all the queue expiry put together with an" +
" additional buffer {}", reprocessorElapseTime, allowedReprocessTime);
}
} catch (RegistrationProcessorCheckedException e) {
} catch (RegistrationProcessorCheckedException | JMSException e) {
logger.error("Abis queue details invalid", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import java.util.List;

import io.mosip.registration.processor.core.util.PropertiesUtil;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.aspectj.apache.bcel.util.ByteSequence;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -61,6 +62,8 @@
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;

import javax.jms.JMSException;

@RunWith(PowerMockRunner.class)
@PrepareForTest({ JsonUtil.class })
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*","javax.management.*", "javax.net.ssl.*" })
Expand Down Expand Up @@ -162,9 +165,9 @@ public Integer getPort() {
private PropertiesUtil propertiesUtil;

@Before
public void setUp() throws RegistrationProcessorCheckedException {
public void setUp() throws RegistrationProcessorCheckedException, JMSException {
MockitoAnnotations.openMocks(this);
ReflectionTestUtils.setField(stage, "messageFormat", "byte");
ReflectionTestUtils.setField(stage, "messageFormat", "text");
ReflectionTestUtils.setField(stage, "workerPoolSize", 10);
ReflectionTestUtils.setField(stage, "messageExpiryTimeLimit", Long.valueOf(0));
ReflectionTestUtils.setField(stage, "clusterManagerUrl", "/dummyPath");
Expand Down Expand Up @@ -450,15 +453,14 @@ public void testException() {


@Test
public void testConsumerListener() throws RegistrationProcessorCheckedException {
public void testConsumerListener() throws RegistrationProcessorCheckedException, JMSException {

String failedInsertResponse = "{\"id\":\"mosip.abis.insert\",\"requestId\":\"5b64e806-8d5f-4ba1-b641-0b55cf40c0e1\",\"responsetime\":"
+ null + ",\"returnValue\":2,\"failureReason\":7}\r\n"
+ "";
ActiveMQBytesMessage amq = new ActiveMQBytesMessage();
ByteSequence byteSeq = new ByteSequence();
byteSeq.setData(failedInsertResponse.getBytes());
amq.setContent(byteSeq);
ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class);
amq.setText(failedInsertResponse);
Mockito.when(amq.getText()).thenReturn(failedInsertResponse);
Vertx vertx = Mockito.mock(Vertx.class);
//MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware.");
MosipEventBus evenBus = Mockito.mock(MosipEventBus.class);
Expand All @@ -472,8 +474,8 @@ public void testConsumerListener() throws RegistrationProcessorCheckedException
String sucessfulResponse = "{\"id\":\"mosip.abis.insert\",\"requestId\":\"5b64e806-8d5f-4ba1-b641-0b55cf40c0e1\",\"responsetime\":"
+ null + ",\"returnValue\":1,\"failureReason\":null}\r\n"
+ "";
byteSeq.setData(sucessfulResponse.getBytes());
amq.setContent(byteSeq);
amq.setText(sucessfulResponse);
Mockito.when(amq.getText()).thenReturn(sucessfulResponse);
AbisRequestDto abisCommonRequestDto1 = new AbisRequestDto();
abisCommonRequestDto1.setRequestType("INSERT");
abisCommonRequestDto1.setAbisAppCode("Abis1");
Expand All @@ -490,10 +492,8 @@ public void testConsumerListener() throws RegistrationProcessorCheckedException
public void testConsumerListenerForIdentifyReq() throws RegistrationProcessorCheckedException, IOException {

String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}";
ActiveMQBytesMessage amq = new ActiveMQBytesMessage();
ByteSequence byteSeq = new ByteSequence();
byteSeq.setData(failedInsertResponse.getBytes());
amq.setContent(byteSeq);
ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(amq.getText()).thenReturn(failedInsertResponse);
Vertx vertx = Mockito.mock(Vertx.class);
//MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware.");
MosipEventBus evenBus = Mockito.mock(MosipEventBus.class);
Expand All @@ -517,10 +517,8 @@ public void testConsumerListenerForIdentifyReqIOException() throws RegistrationP

// test for IO Exception
String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}";
ActiveMQBytesMessage amq = new ActiveMQBytesMessage();
ByteSequence byteSeq = new ByteSequence();
byteSeq.setData(failedInsertResponse.getBytes());
amq.setContent(byteSeq);
ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(amq.getText()).thenReturn(failedInsertResponse);
MosipEventBus evenBus = Mockito.mock(MosipEventBus.class);
MosipQueue queue = Mockito.mock(MosipQueue.class);

Expand All @@ -541,10 +539,8 @@ public void batchIdNull() throws RegistrationProcessorCheckedException {
String sucessfulResponse = "{\"id\":\"mosip.abis.insert\",\"requestId\":\"5b64e806-8d5f-4ba1-b641-0b55cf40c0e1\",\"responsetime\":"
+ null + ",\"returnValue\":1,\"failureReason\":null}\r\n"
+ "";
ActiveMQBytesMessage amq1 = new ActiveMQBytesMessage();
ByteSequence byteSeq1 = new ByteSequence();
byteSeq1.setData(sucessfulResponse.getBytes());
amq1.setContent(byteSeq1);
ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(amq.getText()).thenReturn(sucessfulResponse);
Vertx vertx1 = Mockito.mock(Vertx.class);
//MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware.");
MosipEventBus eventBus = Mockito.mock(MosipEventBus.class);
Expand All @@ -554,7 +550,7 @@ public void batchIdNull() throws RegistrationProcessorCheckedException {
abisCommonRequestDto1.setAbisAppCode("Abis1");
//Mockito.when(packetInfoManager.getAbisRequestByRequestId(Mockito.any())).thenReturn(abisCommonRequestDto1);
Mockito.when(packetInfoManager.getBatchIdByRequestId(Mockito.anyString())).thenReturn(null);
stage.consumerListener(amq1, "abis1_inboundAddress", queue1, eventBus, messageTTL);
stage.consumerListener(amq, "abis1_inboundAddress", queue1, eventBus, messageTTL);

}

Expand All @@ -567,10 +563,8 @@ public void testIdentifyConsumerListener() throws RegistrationProcessorCheckedEx
// test for identify succes response - no duplicates
String identifySucessfulResponse = "{\"id\":\"mosip.abis.identify\",\"requestId\":\"8a3effd4-5fba-44e0-8cbb-3083ba098209\",\"responsetime\":"
+ null + ",\"returnValue\":1,\"failureReason\":null,\"candidateList\":null}";
ActiveMQBytesMessage amq1 = new ActiveMQBytesMessage();
ByteSequence byteSeq1 = new ByteSequence();
byteSeq1.setData(identifySucessfulResponse.getBytes());
amq1.setContent(byteSeq1);
ActiveMQTextMessage amq1 = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(amq1.getText()).thenReturn(identifySucessfulResponse);
Vertx vertx1 = Mockito.mock(Vertx.class);
//MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware.");
MosipEventBus evenBus1 = Mockito.mock(MosipEventBus.class);
Expand All @@ -586,8 +580,7 @@ public void testIdentifyConsumerListener() throws RegistrationProcessorCheckedEx
// test for identify failed response
String identifyFailedResponse = "{\"id\":\"mosip.abis.identify\",\"requestId\":\"8a3effd4-5fba-44e0-8cbb-3083ba098209\",\"responsetime\":"
+ null + ",\"returnValue\":2,\"failureReason\":3,\"candidateList\":null}";
byteSeq1.setData(identifyFailedResponse.getBytes());
amq1.setContent(byteSeq1);
Mockito.when(amq1.getText()).thenReturn(identifyFailedResponse);
abisCommonRequestDto1.setStatusCode("SENT");
stage.consumerListener(amq1, "abis1_inboundAddress", queue1, evenBus1, messageTTL);

Expand All @@ -599,26 +592,23 @@ public void testIdentifyConsumerListener() throws RegistrationProcessorCheckedEx
String duplicateIdentifySuccessResponse = "{\"id\":\"mosip.abis.identify\",\"requestId\":\"f4b1f6fd-466c-462f-aa8b-c218596542ec\",\"responsetime\":"
+ null
+ ",\"returnValue\":1,\"failureReason\":null,\"candidateList\":{\"count\":\"1\",\"candidates\":[{\"referenceId\":\"d1070375-0960-4e90-b12c-72ab6186444d\",\"analytics\":null,\"modalities\":null}]}}";
byteSeq1.setData(duplicateIdentifySuccessResponse.getBytes());
amq1.setContent(byteSeq1);
Mockito.when(amq1.getText()).thenReturn(duplicateIdentifySuccessResponse);
abisCommonRequestDto1.setStatusCode("SENT");
stage.consumerListener(amq1, "abis1_inboundAddress", queue1, evenBus1, messageTTL);

}

@Test(expected = RegistrationProcessorUnCheckedException.class)
public void testDeployVerticle() throws RegistrationProcessorCheckedException {
public void testDeployVerticle() throws RegistrationProcessorCheckedException, JMSException {
Mockito.when(utility.getAbisQueueDetails()).thenThrow(RegistrationProcessorCheckedException.class);
stage.deployVerticle();
}

@Test
public void testConsumerListenerForIdentifyRequest() throws RegistrationProcessorCheckedException, IOException {
String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}";
ActiveMQBytesMessage amq = new ActiveMQBytesMessage();
ByteSequence byteSeq = new ByteSequence();
byteSeq.setData(failedInsertResponse.getBytes());
amq.setContent(byteSeq);
ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(amq.getText()).thenReturn(failedInsertResponse);
Vertx vertx = Mockito.mock(Vertx.class);
//MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware.");
MosipEventBus evenBus = Mockito.mock(MosipEventBus.class);
Expand All @@ -639,7 +629,7 @@ public void testConsumerListenerForIdentifyRequest() throws RegistrationProcesso
candidates[0] = candidate;
candidateList.setCandidates(candidates);
abisIdentifyResponseDto.setCandidateList(candidateList);
String response = new String(((ActiveMQBytesMessage) amq).getContent().data);
String response = new String(((ActiveMQTextMessage) amq).getText());
PowerMockito.mockStatic(JsonUtil.class);
PowerMockito.when(JsonUtil.objectMapperReadValue(response, AbisIdentifyResponseDto.class)).thenReturn(abisIdentifyResponseDto);
PowerMockito.when(JsonUtil.readValueWithUnknownProperties(failedInsertResponse,
Expand All @@ -652,12 +642,8 @@ public void testConsumerListenerForIdentifyRequest() throws RegistrationProcesso
@Test(expected = RegistrationProcessorCheckedException.class)
public void testConsumerListenerForIdentifyReqException() throws RegistrationProcessorCheckedException, IOException {
String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}";
ActiveMQBytesMessage amq = new ActiveMQBytesMessage();
ByteSequence byteSeq = new ByteSequence();
byteSeq.setData(failedInsertResponse.getBytes());
amq.setContent(byteSeq);
Vertx vertx = Mockito.mock(Vertx.class);
//MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware.");
ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(amq.getText()).thenReturn(failedInsertResponse);
MosipEventBus evenBus = Mockito.mock(MosipEventBus.class);
MosipQueue queue = Mockito.mock(MosipQueue.class);
AbisRequestDto abisCommonRequestDto = new AbisRequestDto();
Expand All @@ -676,7 +662,7 @@ public void testConsumerListenerForIdentifyReqException() throws RegistrationPro
candidates[0] = candidate;
candidateList.setCandidates(candidates);
abisIdentifyResponseDto.setCandidateList(candidateList);
String response = new String(((ActiveMQBytesMessage) amq).getContent().data);
String response = new String(((ActiveMQTextMessage) amq).getText());
PowerMockito.mockStatic(JsonUtil.class);
PowerMockito.when(JsonUtil.readValueWithUnknownProperties(response, AbisIdentifyResponseDto.class)).thenThrow(IOException.class);
stage.consumerListener(amq, "abis1_inboundAddress", queue, evenBus, messageTTL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.mosip.registration.processor.core.exception.RegistrationProcessorCheckedException;
import io.mosip.registration.processor.packet.storage.utils.Utilities;

import javax.jms.JMSException;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { AbisMiddlewareAppConfigurationsValidatorTest.MyConfig.class })
public class AbisMiddlewareAppConfigurationsValidatorTest {
Expand All @@ -45,7 +47,7 @@ public void setup() throws Exception {
}

@Test
public void validateConfigurationsSuccessTest() throws RegistrationProcessorCheckedException {
public void validateConfigurationsSuccessTest() throws RegistrationProcessorCheckedException, JMSException {

AbisQueueDetails details = new AbisQueueDetails();
details.setInboundMessageTTL(100);
Expand All @@ -55,7 +57,7 @@ public void validateConfigurationsSuccessTest() throws RegistrationProcessorChec
}

@Test
public void validateConfigurationsElapseTimeTest() throws RegistrationProcessorCheckedException {
public void validateConfigurationsElapseTimeTest() throws RegistrationProcessorCheckedException, JMSException {

ReflectionTestUtils.setField(abisMiddlewareAppConfigurationsValidator, "reprocessorElapseTime", 1000);
AbisQueueDetails details = new AbisQueueDetails();
Expand All @@ -66,7 +68,7 @@ public void validateConfigurationsElapseTimeTest() throws RegistrationProcessorC
}

@Test
public void validateConfigurationsFailureTest() throws RegistrationProcessorCheckedException {
public void validateConfigurationsFailureTest() throws RegistrationProcessorCheckedException, JMSException {

Mockito.when(utility.getAbisQueueDetails()).thenThrow(RegistrationProcessorCheckedException.class);
abisMiddlewareAppConfigurationsValidator.validateConfigurations(listener.events.peek());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.util.ReflectionTestUtils;

import javax.jms.JMSException;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.*;

Expand All @@ -42,7 +44,7 @@ public void setup() {
}

@Test
public void validateConfigurationsTest() throws RegistrationProcessorCheckedException {
public void validateConfigurationsTest() throws RegistrationProcessorCheckedException, JMSException {
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
final Appender<ILoggingEvent> mockAppender = mock(Appender.class);
Expand All @@ -67,7 +69,7 @@ public boolean matches(ILoggingEvent argument) {
}

@Test
public void validateConfigurationsExceptionTest() throws RegistrationProcessorCheckedException {
public void validateConfigurationsExceptionTest() throws RegistrationProcessorCheckedException, JMSException {
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
final Appender<ILoggingEvent> mockAppender = mock(Appender.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.mosip.registration.processor.status.dto.InternalRegistrationStatusDto;
import io.mosip.registration.processor.status.dto.RegistrationStatusDto;
import io.mosip.registration.processor.status.service.RegistrationStatusService;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
Expand All @@ -35,6 +35,7 @@
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

Expand Down Expand Up @@ -192,15 +193,19 @@ public MessageDTO process(MessageDTO object) {
}

private MosipQueue getQueueConnection() {
String failOverBrokerUrl = FAIL_OVER + url + "," + url + RANDOMIZE_FALSE;
return mosipConnectionFactory.createConnection(typeOfQueue, username, password, failOverBrokerUrl);
try {
return mosipConnectionFactory.createConnection(typeOfQueue, username, password, url);
} catch (JMSException e) {
regProcLogger.error("","","", ExceptionUtils.getStackTrace(e));
return null;
}
}

public void consumerListener(Message message) {
try {
String response = null;
if (message instanceof ActiveMQBytesMessage) {
response = new String(((ActiveMQBytesMessage) message).getContent().data);
response = ((ActiveMQBytesMessage) message).readUTF();
} else if (message instanceof ActiveMQTextMessage) {
TextMessage textMessage = (TextMessage) message;
response = textMessage.getText();
Expand Down
Loading

0 comments on commit 91a1b72

Please sign in to comment.