diff --git a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStage.java b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStage.java index a0c5ce83249..4f0ccac6c3d 100644 --- a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStage.java +++ b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStage.java @@ -11,7 +11,7 @@ import javax.jms.Message; import javax.jms.TextMessage; -import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; import org.assertj.core.util.Arrays; import org.json.simple.JSONObject; import org.springframework.beans.factory.annotation.Autowired; @@ -380,7 +380,7 @@ public void consumerListener(Message message, String abisInBoundAddress, MosipQu TextMessage textMessage = (TextMessage) message; response =textMessage.getText(); } else - response = new String(((ActiveMQBytesMessage) message).getContent().data); + response = ((ActiveMQBytesMessage) message).readUTF(); JSONObject inserOrIdentifyResponse = JsonUtil.objectMapperReadValue(response, JSONObject.class); String requestId = JsonUtil.getJSONValue(inserOrIdentifyResponse, REQUESTID); String batchId = packetInfoManager.getBatchIdByRequestId(requestId); diff --git a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidator.java b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidator.java index 50bd72ab943..c4d7a122d69 100644 --- a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidator.java +++ b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/main/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidator.java @@ -13,6 +13,8 @@ import io.mosip.registration.processor.core.exception.RegistrationProcessorCheckedException; import io.mosip.registration.processor.packet.storage.utils.Utilities; +import javax.jms.JMSException; + /** * All the configuration validations will be done in this class */ @@ -57,7 +59,7 @@ private void validateReprocessElapseTimeConfig() { " it should should be greater than all the queue expiry put together with an" + " additional buffer {}", reprocessorElapseTime, allowedReprocessTime); } - } catch (RegistrationProcessorCheckedException e) { + } catch (RegistrationProcessorCheckedException | JMSException e) { logger.error("Abis queue details invalid", e); } } diff --git a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStageTest.java b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStageTest.java index 3e6d1861d26..4c513e57c44 100644 --- a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStageTest.java +++ b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/stage/AbisMiddleWareStageTest.java @@ -8,8 +8,9 @@ import java.util.List; import io.mosip.registration.processor.core.util.PropertiesUtil; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.aspectj.apache.bcel.util.ByteSequence; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,6 +62,8 @@ import io.vertx.core.Vertx; import io.vertx.ext.web.Router; +import javax.jms.JMSException; + @RunWith(PowerMockRunner.class) @PrepareForTest({ JsonUtil.class }) @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*","javax.management.*", "javax.net.ssl.*" }) @@ -162,9 +165,9 @@ public Integer getPort() { private PropertiesUtil propertiesUtil; @Before - public void setUp() throws RegistrationProcessorCheckedException { + public void setUp() throws RegistrationProcessorCheckedException, JMSException { MockitoAnnotations.openMocks(this); - ReflectionTestUtils.setField(stage, "messageFormat", "byte"); + ReflectionTestUtils.setField(stage, "messageFormat", "text"); ReflectionTestUtils.setField(stage, "workerPoolSize", 10); ReflectionTestUtils.setField(stage, "messageExpiryTimeLimit", Long.valueOf(0)); ReflectionTestUtils.setField(stage, "clusterManagerUrl", "/dummyPath"); @@ -450,15 +453,14 @@ public void testException() { @Test - public void testConsumerListener() throws RegistrationProcessorCheckedException { + public void testConsumerListener() throws RegistrationProcessorCheckedException, JMSException { String failedInsertResponse = "{\"id\":\"mosip.abis.insert\",\"requestId\":\"5b64e806-8d5f-4ba1-b641-0b55cf40c0e1\",\"responsetime\":" + null + ",\"returnValue\":2,\"failureReason\":7}\r\n" + ""; - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(failedInsertResponse.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(failedInsertResponse); + Mockito.when(amq.getText()).thenReturn(failedInsertResponse); Vertx vertx = Mockito.mock(Vertx.class); //MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware."); MosipEventBus evenBus = Mockito.mock(MosipEventBus.class); @@ -472,8 +474,8 @@ public void testConsumerListener() throws RegistrationProcessorCheckedException String sucessfulResponse = "{\"id\":\"mosip.abis.insert\",\"requestId\":\"5b64e806-8d5f-4ba1-b641-0b55cf40c0e1\",\"responsetime\":" + null + ",\"returnValue\":1,\"failureReason\":null}\r\n" + ""; - byteSeq.setData(sucessfulResponse.getBytes()); - amq.setContent(byteSeq); + amq.setText(sucessfulResponse); + Mockito.when(amq.getText()).thenReturn(sucessfulResponse); AbisRequestDto abisCommonRequestDto1 = new AbisRequestDto(); abisCommonRequestDto1.setRequestType("INSERT"); abisCommonRequestDto1.setAbisAppCode("Abis1"); @@ -490,10 +492,8 @@ public void testConsumerListener() throws RegistrationProcessorCheckedException public void testConsumerListenerForIdentifyReq() throws RegistrationProcessorCheckedException, IOException { String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}"; - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(failedInsertResponse.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + Mockito.when(amq.getText()).thenReturn(failedInsertResponse); Vertx vertx = Mockito.mock(Vertx.class); //MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware."); MosipEventBus evenBus = Mockito.mock(MosipEventBus.class); @@ -517,10 +517,8 @@ public void testConsumerListenerForIdentifyReqIOException() throws RegistrationP // test for IO Exception String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}"; - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(failedInsertResponse.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + Mockito.when(amq.getText()).thenReturn(failedInsertResponse); MosipEventBus evenBus = Mockito.mock(MosipEventBus.class); MosipQueue queue = Mockito.mock(MosipQueue.class); @@ -541,10 +539,8 @@ public void batchIdNull() throws RegistrationProcessorCheckedException { String sucessfulResponse = "{\"id\":\"mosip.abis.insert\",\"requestId\":\"5b64e806-8d5f-4ba1-b641-0b55cf40c0e1\",\"responsetime\":" + null + ",\"returnValue\":1,\"failureReason\":null}\r\n" + ""; - ActiveMQBytesMessage amq1 = new ActiveMQBytesMessage(); - ByteSequence byteSeq1 = new ByteSequence(); - byteSeq1.setData(sucessfulResponse.getBytes()); - amq1.setContent(byteSeq1); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + Mockito.when(amq.getText()).thenReturn(sucessfulResponse); Vertx vertx1 = Mockito.mock(Vertx.class); //MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware."); MosipEventBus eventBus = Mockito.mock(MosipEventBus.class); @@ -554,7 +550,7 @@ public void batchIdNull() throws RegistrationProcessorCheckedException { abisCommonRequestDto1.setAbisAppCode("Abis1"); //Mockito.when(packetInfoManager.getAbisRequestByRequestId(Mockito.any())).thenReturn(abisCommonRequestDto1); Mockito.when(packetInfoManager.getBatchIdByRequestId(Mockito.anyString())).thenReturn(null); - stage.consumerListener(amq1, "abis1_inboundAddress", queue1, eventBus, messageTTL); + stage.consumerListener(amq, "abis1_inboundAddress", queue1, eventBus, messageTTL); } @@ -567,10 +563,8 @@ public void testIdentifyConsumerListener() throws RegistrationProcessorCheckedEx // test for identify succes response - no duplicates String identifySucessfulResponse = "{\"id\":\"mosip.abis.identify\",\"requestId\":\"8a3effd4-5fba-44e0-8cbb-3083ba098209\",\"responsetime\":" + null + ",\"returnValue\":1,\"failureReason\":null,\"candidateList\":null}"; - ActiveMQBytesMessage amq1 = new ActiveMQBytesMessage(); - ByteSequence byteSeq1 = new ByteSequence(); - byteSeq1.setData(identifySucessfulResponse.getBytes()); - amq1.setContent(byteSeq1); + ActiveMQTextMessage amq1 = Mockito.mock(ActiveMQTextMessage.class); + Mockito.when(amq1.getText()).thenReturn(identifySucessfulResponse); Vertx vertx1 = Mockito.mock(Vertx.class); //MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware."); MosipEventBus evenBus1 = Mockito.mock(MosipEventBus.class); @@ -586,8 +580,7 @@ public void testIdentifyConsumerListener() throws RegistrationProcessorCheckedEx // test for identify failed response String identifyFailedResponse = "{\"id\":\"mosip.abis.identify\",\"requestId\":\"8a3effd4-5fba-44e0-8cbb-3083ba098209\",\"responsetime\":" + null + ",\"returnValue\":2,\"failureReason\":3,\"candidateList\":null}"; - byteSeq1.setData(identifyFailedResponse.getBytes()); - amq1.setContent(byteSeq1); + Mockito.when(amq1.getText()).thenReturn(identifyFailedResponse); abisCommonRequestDto1.setStatusCode("SENT"); stage.consumerListener(amq1, "abis1_inboundAddress", queue1, evenBus1, messageTTL); @@ -599,15 +592,14 @@ public void testIdentifyConsumerListener() throws RegistrationProcessorCheckedEx String duplicateIdentifySuccessResponse = "{\"id\":\"mosip.abis.identify\",\"requestId\":\"f4b1f6fd-466c-462f-aa8b-c218596542ec\",\"responsetime\":" + null + ",\"returnValue\":1,\"failureReason\":null,\"candidateList\":{\"count\":\"1\",\"candidates\":[{\"referenceId\":\"d1070375-0960-4e90-b12c-72ab6186444d\",\"analytics\":null,\"modalities\":null}]}}"; - byteSeq1.setData(duplicateIdentifySuccessResponse.getBytes()); - amq1.setContent(byteSeq1); + Mockito.when(amq1.getText()).thenReturn(duplicateIdentifySuccessResponse); abisCommonRequestDto1.setStatusCode("SENT"); stage.consumerListener(amq1, "abis1_inboundAddress", queue1, evenBus1, messageTTL); } @Test(expected = RegistrationProcessorUnCheckedException.class) - public void testDeployVerticle() throws RegistrationProcessorCheckedException { + public void testDeployVerticle() throws RegistrationProcessorCheckedException, JMSException { Mockito.when(utility.getAbisQueueDetails()).thenThrow(RegistrationProcessorCheckedException.class); stage.deployVerticle(); } @@ -615,10 +607,8 @@ public void testDeployVerticle() throws RegistrationProcessorCheckedException { @Test public void testConsumerListenerForIdentifyRequest() throws RegistrationProcessorCheckedException, IOException { String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}"; - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(failedInsertResponse.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + Mockito.when(amq.getText()).thenReturn(failedInsertResponse); Vertx vertx = Mockito.mock(Vertx.class); //MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware."); MosipEventBus evenBus = Mockito.mock(MosipEventBus.class); @@ -639,7 +629,7 @@ public void testConsumerListenerForIdentifyRequest() throws RegistrationProcesso candidates[0] = candidate; candidateList.setCandidates(candidates); abisIdentifyResponseDto.setCandidateList(candidateList); - String response = new String(((ActiveMQBytesMessage) amq).getContent().data); + String response = new String(((ActiveMQTextMessage) amq).getText()); PowerMockito.mockStatic(JsonUtil.class); PowerMockito.when(JsonUtil.objectMapperReadValue(response, AbisIdentifyResponseDto.class)).thenReturn(abisIdentifyResponseDto); PowerMockito.when(JsonUtil.readValueWithUnknownProperties(failedInsertResponse, @@ -652,12 +642,8 @@ public void testConsumerListenerForIdentifyRequest() throws RegistrationProcesso @Test(expected = RegistrationProcessorCheckedException.class) public void testConsumerListenerForIdentifyReqException() throws RegistrationProcessorCheckedException, IOException { String failedInsertResponse = "{\"id\":\"mosip.id.identify\",\"requestId\":\"01234567-89AB-CDEF-0123-456789ABCDEF\",\"responsetime\":\"2020-03-29T07:01:24.692Z\",\"returnValue\":\"2\",\"failureReason\":\"7\"}"; - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(failedInsertResponse.getBytes()); - amq.setContent(byteSeq); - Vertx vertx = Mockito.mock(Vertx.class); - //MosipEventBus evenBus = new MosipEventBusFactory().getEventBus(vertx, "vertx", "mosip.regproc.abis.middleware."); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + Mockito.when(amq.getText()).thenReturn(failedInsertResponse); MosipEventBus evenBus = Mockito.mock(MosipEventBus.class); MosipQueue queue = Mockito.mock(MosipQueue.class); AbisRequestDto abisCommonRequestDto = new AbisRequestDto(); @@ -676,7 +662,7 @@ public void testConsumerListenerForIdentifyReqException() throws RegistrationPro candidates[0] = candidate; candidateList.setCandidates(candidates); abisIdentifyResponseDto.setCandidateList(candidateList); - String response = new String(((ActiveMQBytesMessage) amq).getContent().data); + String response = new String(((ActiveMQTextMessage) amq).getText()); PowerMockito.mockStatic(JsonUtil.class); PowerMockito.when(JsonUtil.readValueWithUnknownProperties(response, AbisIdentifyResponseDto.class)).thenThrow(IOException.class); stage.consumerListener(amq, "abis1_inboundAddress", queue, evenBus, messageTTL); diff --git a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validator/AbisMiddlewareAppConfigurationsValidatorTest.java b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validator/AbisMiddlewareAppConfigurationsValidatorTest.java index 28d4aeae1d4..29661a073ef 100644 --- a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validator/AbisMiddlewareAppConfigurationsValidatorTest.java +++ b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validator/AbisMiddlewareAppConfigurationsValidatorTest.java @@ -23,6 +23,8 @@ import io.mosip.registration.processor.core.exception.RegistrationProcessorCheckedException; import io.mosip.registration.processor.packet.storage.utils.Utilities; +import javax.jms.JMSException; + @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = { AbisMiddlewareAppConfigurationsValidatorTest.MyConfig.class }) public class AbisMiddlewareAppConfigurationsValidatorTest { @@ -45,7 +47,7 @@ public void setup() throws Exception { } @Test - public void validateConfigurationsSuccessTest() throws RegistrationProcessorCheckedException { + public void validateConfigurationsSuccessTest() throws RegistrationProcessorCheckedException, JMSException { AbisQueueDetails details = new AbisQueueDetails(); details.setInboundMessageTTL(100); @@ -55,7 +57,7 @@ public void validateConfigurationsSuccessTest() throws RegistrationProcessorChec } @Test - public void validateConfigurationsElapseTimeTest() throws RegistrationProcessorCheckedException { + public void validateConfigurationsElapseTimeTest() throws RegistrationProcessorCheckedException, JMSException { ReflectionTestUtils.setField(abisMiddlewareAppConfigurationsValidator, "reprocessorElapseTime", 1000); AbisQueueDetails details = new AbisQueueDetails(); @@ -66,7 +68,7 @@ public void validateConfigurationsElapseTimeTest() throws RegistrationProcessorC } @Test - public void validateConfigurationsFailureTest() throws RegistrationProcessorCheckedException { + public void validateConfigurationsFailureTest() throws RegistrationProcessorCheckedException, JMSException { Mockito.when(utility.getAbisQueueDetails()).thenThrow(RegistrationProcessorCheckedException.class); abisMiddlewareAppConfigurationsValidator.validateConfigurations(listener.events.peek()); diff --git a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidatorTest.java b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidatorTest.java index 4eea8ce44d4..7c99fed13ef 100644 --- a/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidatorTest.java +++ b/registration-processor/core-processor/registration-processor-abis-middleware-stage/src/test/java/io/mosip/registartion/processor/abis/middleware/validators/AbisMiddlewareAppConfigurationsValidatorTest.java @@ -19,6 +19,8 @@ import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.util.ReflectionTestUtils; +import javax.jms.JMSException; + import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.*; @@ -42,7 +44,7 @@ public void setup() { } @Test - public void validateConfigurationsTest() throws RegistrationProcessorCheckedException { + public void validateConfigurationsTest() throws RegistrationProcessorCheckedException, JMSException { ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); final Appender mockAppender = mock(Appender.class); @@ -67,7 +69,7 @@ public boolean matches(ILoggingEvent argument) { } @Test - public void validateConfigurationsExceptionTest() throws RegistrationProcessorCheckedException { + public void validateConfigurationsExceptionTest() throws RegistrationProcessorCheckedException, JMSException { ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); final Appender mockAppender = mock(Appender.class); diff --git a/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/main/java/io/mosip/registration/processor/adjudication/stage/ManualAdjudicationStage.java b/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/main/java/io/mosip/registration/processor/adjudication/stage/ManualAdjudicationStage.java index 95777d68463..ec8652082b1 100644 --- a/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/main/java/io/mosip/registration/processor/adjudication/stage/ManualAdjudicationStage.java +++ b/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/main/java/io/mosip/registration/processor/adjudication/stage/ManualAdjudicationStage.java @@ -25,8 +25,8 @@ import io.mosip.registration.processor.status.dto.InternalRegistrationStatusDto; import io.mosip.registration.processor.status.dto.RegistrationStatusDto; import io.mosip.registration.processor.status.service.RegistrationStatusService; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; @@ -35,6 +35,7 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; @@ -192,15 +193,19 @@ public MessageDTO process(MessageDTO object) { } private MosipQueue getQueueConnection() { - String failOverBrokerUrl = FAIL_OVER + url + "," + url + RANDOMIZE_FALSE; - return mosipConnectionFactory.createConnection(typeOfQueue, username, password, failOverBrokerUrl); + try { + return mosipConnectionFactory.createConnection(typeOfQueue, username, password, url); + } catch (JMSException e) { + regProcLogger.error("","","", ExceptionUtils.getStackTrace(e)); + return null; + } } public void consumerListener(Message message) { try { String response = null; if (message instanceof ActiveMQBytesMessage) { - response = new String(((ActiveMQBytesMessage) message).getContent().data); + response = ((ActiveMQBytesMessage) message).readUTF(); } else if (message instanceof ActiveMQTextMessage) { TextMessage textMessage = (TextMessage) message; response = textMessage.getText(); diff --git a/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/test/java/io/mosip/registration/processor/verification/stage/ManualAdjudicationStageTest.java b/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/test/java/io/mosip/registration/processor/verification/stage/ManualAdjudicationStageTest.java index 719975bf496..6271f0bd103 100644 --- a/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/test/java/io/mosip/registration/processor/verification/stage/ManualAdjudicationStageTest.java +++ b/registration-processor/core-processor/registration-processor-manual-adjudication-stage/src/test/java/io/mosip/registration/processor/verification/stage/ManualAdjudicationStageTest.java @@ -5,6 +5,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import java.io.ByteArrayInputStream; import java.io.File; import java.util.HashSet; import java.util.List; @@ -25,9 +26,9 @@ import io.mosip.registration.processor.core.spi.queue.MosipQueueConnectionFactory; import io.mosip.registration.processor.core.spi.queue.MosipQueueManager; import io.mosip.registration.processor.adjudication.util.ManualVerificationRequestValidator; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.aspectj.apache.bcel.util.ByteSequence; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,6 +68,8 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.Session; +import javax.jms.JMSException; + @RunWith(SpringRunner.class) public class ManualAdjudicationStageTest { @@ -128,7 +131,7 @@ public Integer getPort() { } }; @Before - public void setUp() throws java.io.IOException, ApisResourceAccessException, PacketManagerException, JsonProcessingException { + public void setUp() throws java.io.IOException, ApisResourceAccessException, PacketManagerException, JsonProcessingException, JMSException { ReflectionTestUtils.setField(manualverificationstage, "mosipConnectionFactory", mosipConnectionFactory); ReflectionTestUtils.setField(manualverificationstage, "mosipQueueManager", mosipQueueManager); //ReflectionTestUtils.setField(manualverificationstage, "contextPath", "/registrationprocessor/v1/manualverification"); @@ -508,7 +511,7 @@ public boolean removeEndHandler(int handlerID) { } @Test - public void testConsumeListener() throws JsonProcessingException { + public void testConsumeListener() throws JsonProcessingException, JMSException { ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); final Appender mockAppender = mock(Appender.class); @@ -523,10 +526,9 @@ public void testConsumeListener() throws JsonProcessingException { String response = JsonUtils.javaObjectToJsonString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); Mockito.when(manualAdjudicationService.updatePacketStatus(any(), any(), any())).thenReturn(true); diff --git a/registration-processor/core-processor/registration-processor-verification-stage/src/main/java/io/mosip/registration/processor/verification/stage/VerificationStage.java b/registration-processor/core-processor/registration-processor-verification-stage/src/main/java/io/mosip/registration/processor/verification/stage/VerificationStage.java index 65b7e782b79..e0f4d5f4d76 100644 --- a/registration-processor/core-processor/registration-processor-verification-stage/src/main/java/io/mosip/registration/processor/verification/stage/VerificationStage.java +++ b/registration-processor/core-processor/registration-processor-verification-stage/src/main/java/io/mosip/registration/processor/verification/stage/VerificationStage.java @@ -25,8 +25,8 @@ import io.mosip.registration.processor.status.dto.InternalRegistrationStatusDto; import io.mosip.registration.processor.status.dto.RegistrationStatusDto; import io.mosip.registration.processor.status.service.RegistrationStatusService; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; @@ -35,6 +35,7 @@ import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; @@ -193,15 +194,19 @@ public MessageDTO process(MessageDTO object) { } private MosipQueue getQueueConnection() { - String failOverBrokerUrl = FAIL_OVER + url + "," + url + RANDOMIZE_FALSE; - return mosipConnectionFactory.createConnection(typeOfQueue, username, password, failOverBrokerUrl); + try { + return mosipConnectionFactory.createConnection(typeOfQueue, username, password, url); + } catch (JMSException e) { + regProcLogger.error("","","", ExceptionUtils.getStackTrace(e)); + return null; + } } public void consumerListener(Message message) { try { String response = null; if (message instanceof ActiveMQBytesMessage) { - response = new String(((ActiveMQBytesMessage) message).getContent().data); + response = ((ActiveMQBytesMessage) message).readUTF(); } else if (message instanceof ActiveMQTextMessage) { TextMessage textMessage = (TextMessage) message; response = textMessage.getText(); diff --git a/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/service/VerificationServiceTest.java b/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/service/VerificationServiceTest.java index 0c1886ca76a..f77b8635a67 100644 --- a/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/service/VerificationServiceTest.java +++ b/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/service/VerificationServiceTest.java @@ -43,8 +43,9 @@ import io.mosip.registration.processor.verification.response.dto.VerificationResponseDTO; import io.mosip.registration.processor.verification.service.impl.VerificationServiceImpl; import io.mosip.registration.processor.verification.stage.VerificationStage; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.aspectj.apache.bcel.util.ByteSequence; import org.json.simple.JSONObject; import org.junit.Before; import org.junit.Ignore; @@ -61,6 +62,7 @@ import org.springframework.core.env.Environment; import org.springframework.test.util.ReflectionTestUtils; +import javax.jms.JMSException; import java.sql.Timestamp; import java.util.*; @@ -413,15 +415,14 @@ public void testVerificationInvalidRId() { } @Test(expected = InvalidRidException.class) - public void testInvalidRidException() throws JsonProcessingException, com.fasterxml.jackson.core.JsonProcessingException { + public void testInvalidRidException() throws JsonProcessingException, com.fasterxml.jackson.core.JsonProcessingException, JMSException { String response = objectMapper.writeValueAsString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); resp.setRequestId("2344"); @@ -429,15 +430,14 @@ public void testInvalidRidException() throws JsonProcessingException, com.faster } @Test - public void testNoRecordAssignedException() throws JsonProcessingException, com.fasterxml.jackson.core.JsonProcessingException { + public void testNoRecordAssignedException() throws JsonProcessingException, com.fasterxml.jackson.core.JsonProcessingException, JMSException { String response = objectMapper.writeValueAsString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); boolean result = verificationService.updatePacketStatus(resp, stageName, queue); @@ -446,16 +446,15 @@ public void testNoRecordAssignedException() throws JsonProcessingException, com. @Test @Ignore - public void testUpdateStatusSuccess() throws com.fasterxml.jackson.core.JsonProcessingException { + public void testUpdateStatusSuccess() throws com.fasterxml.jackson.core.JsonProcessingException, JMSException { Mockito.when(basePacketRepository.getAssignedVerificationRecord(anyString(), anyString())).thenReturn(entities); String response = objectMapper.writeValueAsString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); boolean result = verificationService.updatePacketStatus(resp, stageName, queue); @@ -463,16 +462,15 @@ public void testUpdateStatusSuccess() throws com.fasterxml.jackson.core.JsonProc } @Test - public void testUpdateStatusResend() throws com.fasterxml.jackson.core.JsonProcessingException { + public void testUpdateStatusResend() throws com.fasterxml.jackson.core.JsonProcessingException, JMSException { Mockito.when(basePacketRepository.getAssignedVerificationRecord(anyString(), anyString())).thenReturn(entities); String response = objectMapper.writeValueAsString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); // for resend resp.setReturnValue(2); @@ -485,16 +483,15 @@ public void testUpdateStatusResend() throws com.fasterxml.jackson.core.JsonProce @Test @Ignore - public void testUpdateStatusRejected() throws JsonProcessingException, com.fasterxml.jackson.core.JsonProcessingException { + public void testUpdateStatusRejected() throws JsonProcessingException, com.fasterxml.jackson.core.JsonProcessingException, JMSException { Mockito.when(basePacketRepository.getAssignedVerificationRecord(anyString(), anyString())).thenReturn(entities); String response = objectMapper.writeValueAsString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); // for rejected resp.setReturnValue(3); diff --git a/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/stage/VerificationStageTest.java b/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/stage/VerificationStageTest.java index 2b3fdfc3b6e..47d9f24ec5d 100644 --- a/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/stage/VerificationStageTest.java +++ b/registration-processor/core-processor/registration-processor-verification-stage/src/test/java/io/mosip/registration/processor/verification/stage/VerificationStageTest.java @@ -24,8 +24,9 @@ import io.vertx.ext.web.FileUpload; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.aspectj.apache.bcel.util.ByteSequence; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,6 +39,7 @@ import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.util.ReflectionTestUtils; +import javax.jms.JMSException; import java.io.File; import static org.junit.Assert.assertTrue; @@ -106,7 +108,7 @@ public Integer getPort() { } }; @Before - public void setUp() throws java.io.IOException, ApisResourceAccessException, PacketManagerException, JsonProcessingException { + public void setUp() throws java.io.IOException, ApisResourceAccessException, PacketManagerException, JsonProcessingException, JMSException { ReflectionTestUtils.setField(verificationstage, "mosipConnectionFactory", mosipConnectionFactory); ReflectionTestUtils.setField(verificationstage, "mosipQueueManager", mosipQueueManager); //ReflectionTestUtils.setField(manualverificationstage, "contextPath", "/registrationprocessor/v1/manualverification"); @@ -152,7 +154,7 @@ public void testAllProcess() { } @Test - public void testConsumeListener() throws JsonProcessingException { + public void testConsumeListener() throws JsonProcessingException, JMSException { ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory .getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME); final Appender mockAppender = mock(Appender.class); @@ -167,10 +169,9 @@ public void testConsumeListener() throws JsonProcessingException { String response = JsonUtils.javaObjectToJsonString(resp); - ActiveMQBytesMessage amq = new ActiveMQBytesMessage(); - ByteSequence byteSeq = new ByteSequence(); - byteSeq.setData(response.getBytes()); - amq.setContent(byteSeq); + ActiveMQTextMessage amq = Mockito.mock(ActiveMQTextMessage.class); + amq.setText(response); + Mockito.when(amq.getText()).thenReturn(response); Mockito.when(verificationService.updatePacketStatus(any(), any(), any())).thenReturn(true); diff --git a/registration-processor/registration-processor-core/pom.xml b/registration-processor/registration-processor-core/pom.xml index 41f0268f633..303884f2de9 100644 --- a/registration-processor/registration-processor-core/pom.xml +++ b/registration-processor/registration-processor-core/pom.xml @@ -35,25 +35,6 @@ vertx-web ${vertx.version} - - org.apache.hadoop - hadoop-client - 2.8.1 - - - org.slf4j - slf4j-api - - - org.slf4j - slf4j-log4j12 - - - com.google.code.gson - gson - - - org.springframework.boot spring-boot-starter-actuator @@ -64,14 +45,9 @@ vertx-health-check ${vertx.version} - - org.apache.activemq - activemq-core - 5.7.0 - org.springframework.boot - spring-boot-starter-activemq + spring-boot-starter-artemis io.vertx @@ -131,6 +107,10 @@ dom4j dom4j + + commons-beanutils + commons-beanutils + diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/StageHealthCheckHandler.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/StageHealthCheckHandler.java index 9f02ca4822b..178a6537deb 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/StageHealthCheckHandler.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/StageHealthCheckHandler.java @@ -23,16 +23,10 @@ import io.mosip.registration.processor.core.queue.factory.MosipActiveMq; import io.mosip.registration.processor.core.queue.factory.MosipQueue; import io.mosip.registration.processor.core.queue.factory.QueueListener; -import io.mosip.registration.processor.core.queue.impl.TransportExceptionListener; import io.mosip.registration.processor.core.spi.queue.MosipQueueConnectionFactory; import io.mosip.registration.processor.core.spi.queue.MosipQueueManager; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.env.Environment; @@ -88,9 +82,6 @@ public class StageHealthCheckHandler implements HealthCheckHandler { private int virusScannerPort; private File currentWorkingDirPath; private VirusScanner virusScanner; - private FileSystem configuredFileSystem; - private Path hadoopLibPath; - private static final String HADOOP_HOME = "hadoop-lib"; private static final String WIN_UTIL = "winutils.exe"; private static final String CLASSPATH_PREFIX = "classpath:"; private static final int THRESHOLD = 10485760; @@ -148,7 +139,6 @@ public StageHealthCheckHandler register(String name, long timeout, Handler promise, MosipQueueManager mosipQueueManager, MosipQueueConnectionFactory mosipConnectionFactory) { - try { final String msg = "Ping"; @@ -156,7 +146,7 @@ public void queueHealthChecker(Promise promise, MosipQueueManager promise, String msg) { - String res = new String(((ActiveMQBytesMessage) message).getContent().data); + public void consumerListener(Message message, Promise promise, String msg) throws JMSException { + String res = ((ActiveMQTextMessage) message).getText(); if (res == null || !msg.equalsIgnoreCase(res)) { final JsonObject result = resultBuilder.create().add(HealthConstant.ERROR, "Could not read response from queue").build(); promise.complete(Status.KO(result)); } final JsonObject result = resultBuilder.create().add(HealthConstant.RESPONSE, res).build(); - promise.complete(Status.OK(result)); - } - - /** - * @param configuration - * @return - * @throws Exception - */ - private void initSecurityConfiguration(Configuration configuration) throws Exception { - configuration.set("dfs.data.transfer.protection", "authentication"); - configuration.set("hadoop.security.authentication", "kerberos"); - InputStream krbStream = getClass().getClassLoader().getResourceAsStream("krb5.conf"); - File krbPath = FileUtils.getFile(hadoopLibPath.toString(), "krb5.conf"); - FileUtils.copyInputStreamToFile(krbStream, krbPath); - System.setProperty("java.security.krb5.conf", krbPath.toString()); - UserGroupInformation.setConfiguration(configuration); - } - - /** - * @param user - * @param keytabPath - * @throws Exception - */ - private void loginWithKeyTab(String user, String keytabPath) throws Exception { - File keyPath = null; - Resource resource = resourceLoader.getResource(keytabPath); - File dataPath = FileUtils.getFile(hadoopLibPath.toString(), "data"); - boolean created = dataPath.mkdir(); - if (resource.exists() && created) { - keyPath = FileUtils.getFile(dataPath.toString(), resource.getFilename()); - FileUtils.copyInputStreamToFile(resource.getInputStream(), keyPath); - } else { - throw new Exception("KEYTAB_FILE_NOT_FOUND_EXCEPTION: " + keytabPath); - } - try { - UserGroupInformation.loginUserFromKeytab(user, keyPath.toString()); - } catch (IOException e) { - throw new Exception("LOGIN_EXCEPTION", e); - } + promise.complete(Status.OK(result)); } /** diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipActiveMq.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipActiveMq.java index 41a84aee361..ca39dad03dc 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipActiveMq.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipActiveMq.java @@ -1,6 +1,8 @@ package io.mosip.registration.processor.core.queue.factory; -import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; + +import javax.jms.JMSException; public class MosipActiveMq extends MosipQueue{ @@ -10,12 +12,12 @@ public class MosipActiveMq extends MosipQueue{ private String brokerUrl; private ActiveMQConnectionFactory activeMQConnectionFactory; - public MosipActiveMq(String queueName, String username, String password, String brokerUrl) { + public MosipActiveMq(String queueName, String username, String password, String brokerUrl) throws JMSException { this.queueName = queueName; this.username = username; this.password = password; this.brokerUrl = brokerUrl; - createConnection(username, password, brokerUrl); + createConnection(brokerUrl, username, password); } public String getUsername() { @@ -35,8 +37,11 @@ public ActiveMQConnectionFactory getActiveMQConnectionFactory() { } @Override - public void createConnection(String username, String password, String brokerUrl) { - this.activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl); + public void createConnection(String brokerUrl, String username, String password) throws JMSException { + if (activeMQConnectionFactory == null || activeMQConnectionFactory.getServerLocator() == null || activeMQConnectionFactory.getServerLocator().isClosed()) { + this.activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl, username, password); + activeMQConnectionFactory.createConnection(); + } } @Override diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueue.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueue.java index c994006f9cd..ae0b7dc72c6 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueue.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueue.java @@ -1,8 +1,10 @@ package io.mosip.registration.processor.core.queue.factory; +import javax.jms.JMSException; + public abstract class MosipQueue{ - public abstract void createConnection(String username, String password, String brokerUrl); + public abstract void createConnection(String username, String password, String brokerUrl) throws JMSException; public abstract String getQueueName(); diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueueConnectionFactoryImpl.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueueConnectionFactoryImpl.java index 86131219c7c..4c300a21823 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueueConnectionFactoryImpl.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/factory/MosipQueueConnectionFactoryImpl.java @@ -2,11 +2,13 @@ import io.mosip.registration.processor.core.spi.queue.MosipQueueConnectionFactory; +import javax.jms.JMSException; + public class MosipQueueConnectionFactoryImpl implements MosipQueueConnectionFactory { @Override public MosipQueue createConnection(String typeOfQueue, String username, String password, - String url) { + String url) throws JMSException { if(typeOfQueue.equalsIgnoreCase("ACTIVEMQ")) { return new MosipActiveMq(typeOfQueue, username, password, url); } diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/MosipActiveMqImpl.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/MosipActiveMqImpl.java index 4a54c65f1ae..f4cc7c3850b 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/MosipActiveMqImpl.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/MosipActiveMqImpl.java @@ -12,8 +12,8 @@ import io.mosip.registration.processor.core.queue.impl.exception.InvalidConnectionException; import io.mosip.registration.processor.core.queue.impl.exception.QueueConnectionException; import io.mosip.registration.processor.core.spi.queue.MosipQueueManager; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.commons.lang.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Value; @@ -42,7 +42,7 @@ public class MosipActiveMqImpl implements MosipQueueManager private Connection connection; private Session session; - private Destination destination; + private Destination destination = null; private static final String LINE_SEPERATOR = "----------------"; @Value("${registration.processor.queue.connection.retry.count:10}") private int retryCount; @@ -56,14 +56,12 @@ private void setup(MosipActiveMq mosipActiveMq) { regProcLogger.debug(LINE_SEPERATOR, LINE_SEPERATOR, "In ActiveMq setUp ", LINE_SEPERATOR); try { ActiveMQConnection activemQConn = (ActiveMQConnection) connection; - if (activemQConn == null || activemQConn.isClosed()) { + if (activemQConn == null || !activemQConn.isStarted()) { regProcLogger.debug(LINE_SEPERATOR, LINE_SEPERATOR, "-----INITIAL CONNECTION-----", LINE_SEPERATOR + this.connection); regProcLogger.debug(LINE_SEPERATOR, LINE_SEPERATOR, "-----INITIAL SESSION-----", LINE_SEPERATOR + this.session); connection = mosipActiveMq.getActiveMQConnectionFactory().createConnection(); - activemQConn = (ActiveMQConnection) connection; - activemQConn.addTransportListener(new TransportExceptionListener()); if (session == null) { connection.start(); this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -149,9 +147,13 @@ public Boolean send(MosipQueue mosipQueue, String message, String address, int m initialSetup(mosipQueue); try { // fix for activemq connection issue - if (session == null) + if (session == null) { initialSetup(mosipQueue); - destination = session.createQueue(address); + } + + if (destination == null) { + destination = session.createQueue(address); + } MessageProducer messageProducer = session.createProducer(destination); TextMessage textMessage = session.createTextMessage(); textMessage.setText(message); diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/TransportExceptionListener.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/TransportExceptionListener.java index 2de70b55314..7fa8cc347a3 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/TransportExceptionListener.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/queue/impl/TransportExceptionListener.java @@ -1,3 +1,4 @@ +/* package io.mosip.registration.processor.core.queue.impl; import java.io.IOException; @@ -7,11 +8,13 @@ import io.mosip.kernel.core.exception.ExceptionUtils; import io.mosip.kernel.core.logger.spi.Logger; import io.mosip.registration.processor.core.logger.RegProcessorLogger; +*/ /** * * @author Girish Yarru * - */ + *//* + public class TransportExceptionListener implements TransportListener{ private static Logger regProcLogger = RegProcessorLogger.getLogger(TransportExceptionListener.class); private static final String LINE_SEPERATOR = "----------------"; @@ -41,3 +44,4 @@ public void transportResumed() { } } +*/ diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/spi/queue/MosipQueueConnectionFactory.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/spi/queue/MosipQueueConnectionFactory.java index 631afe3593b..84846a9232e 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/spi/queue/MosipQueueConnectionFactory.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/spi/queue/MosipQueueConnectionFactory.java @@ -1,5 +1,7 @@ package io.mosip.registration.processor.core.spi.queue; +import javax.jms.JMSException; + /** * @author Pranav Kumar * @@ -16,6 +18,6 @@ public interface MosipQueueConnectionFactory { * @param Url Url of installation path * @return */ - public Q createConnection(String typeOfQueue, String username, String password, String Url); + public Q createConnection(String typeOfQueue, String username, String password, String Url) throws JMSException; } diff --git a/registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java b/registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java index a803d2730d7..d0e72f16eb2 100644 --- a/registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java +++ b/registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java @@ -61,6 +61,8 @@ import io.mosip.registration.processor.status.entity.RegistrationStatusEntity; import lombok.Data; +import javax.jms.JMSException; + /** * The Class Utilities. * @@ -439,7 +441,7 @@ public boolean isUinMissingFromIdAuth(String errorCode, String id, String idType * @throws RegistrationProcessorCheckedException * the registration processor checked exception */ - public List getAbisQueueDetails() throws RegistrationProcessorCheckedException { + public List getAbisQueueDetails() throws RegistrationProcessorCheckedException, JMSException { List abisQueueDetailsList = new ArrayList<>(); String registrationProcessorAbis = Utilities.getJson(configServerFileStorageURL, registrationProcessorAbisJson); JSONObject regProcessorAbisJson; @@ -464,7 +466,7 @@ public List getAbisQueueDetails() throws RegistrationProcessor String queueName = validateAbisQueueJsonAndReturnValue(json, NAME); int inboundMessageTTL = validateAbisQueueJsonAndReturnIntValue(json, INBOUNDMESSAGETTL); MosipQueue mosipQueue = mosipConnectionFactory.createConnection(typeOfQueue, userName, password, - failOverBrokerUrl); + brokerUrl); if (mosipQueue == null) throw new QueueConnectionNotFound( PlatformErrorMessages.RPR_PIS_ABIS_QUEUE_CONNECTION_NULL.getMessage());