diff --git a/src/main/java/com/uci/outbound/controller/MessageController.java b/src/main/java/com/uci/outbound/controller/MessageController.java index 43960c9..c74ad9d 100644 --- a/src/main/java/com/uci/outbound/controller/MessageController.java +++ b/src/main/java/com/uci/outbound/controller/MessageController.java @@ -1,11 +1,14 @@ package com.uci.outbound.controller; import com.fasterxml.jackson.databind.JsonNode; +import com.uci.adapter.provider.factory.IProvider; +import com.uci.adapter.provider.factory.ProviderFactory; +import com.uci.dao.models.XMessageDAO; +import com.uci.dao.repository.XMessageRepository; +import com.uci.dao.utils.XMessageDAOUtils; import com.uci.outbound.consumers.OutboundKafkaController; import com.uci.outbound.model.MessageRequest; import com.uci.utils.BotService; -import com.uci.utils.model.ApiResponse; -import com.uci.utils.model.ApiResponseParams; import com.uci.utils.model.HttpApiResponse; import lombok.extern.slf4j.Slf4j; import messagerosa.core.model.*; @@ -15,10 +18,10 @@ import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Mono; -import javax.ws.rs.BadRequestException; import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; +import java.util.function.Consumer; import java.util.function.Function; @Slf4j @@ -31,6 +34,12 @@ public class MessageController { @Autowired public OutboundKafkaController outboundService; + @Autowired + private ProviderFactory factoryProvider; + + @Autowired + private XMessageRepository xMessageRepo; + @RequestMapping(value = "/send", method = RequestMethod.POST, produces = {"application/json", "text/json"}) public Mono> sendMessage(@RequestBody MessageRequest request) { HttpApiResponse response = HttpApiResponse.builder() @@ -64,9 +73,9 @@ public Mono> sendMessage(@RequestBody MessageReq XMessagePayload payload = request.payload; return botService.getAdapterByID(request.getAdapterId()) - .map(new Function>(){ + .map(new Function>>(){ @Override - public ResponseEntity apply(JsonNode adapter) { + public Mono> apply(JsonNode adapter) { XMessage xmsg = new XMessage().builder() .app("Global Outbound Bot") .adapterId(request.getAdapterId()) @@ -93,7 +102,7 @@ public ResponseEntity apply(JsonNode adapter) { response.setStatus(HttpStatus.BAD_REQUEST.value()); response.setError(HttpStatus.BAD_REQUEST.getReasonPhrase()); response.setMessage("Media is allowed only for gupshup whatsapp & netcore whatsapp adapter."); - return ResponseEntity.badRequest().body(response); + return Mono.just(ResponseEntity.badRequest().body(response)); } /* Template id required check for cdac sms adapter */ @@ -103,7 +112,7 @@ public ResponseEntity apply(JsonNode adapter) { response.setStatus(HttpStatus.BAD_REQUEST.value()); response.setError(HttpStatus.BAD_REQUEST.getReasonPhrase()); response.setMessage("Template id in meta of to is required for firebase adapter messaging."); - return ResponseEntity.badRequest().body(response); + return Mono.just(ResponseEntity.badRequest().body(response)); } else { HashMap transformerMeta = new HashMap<>(); transformerMeta.put("templateId", request.getTo().getMeta().get("templateId")); @@ -122,17 +131,69 @@ public ResponseEntity apply(JsonNode adapter) { response.setStatus(HttpStatus.BAD_REQUEST.value()); response.setError(HttpStatus.BAD_REQUEST.getReasonPhrase()); response.setMessage("FCM token in meta of to is required for firebase adapter messaging."); - return ResponseEntity.badRequest().body(response); + return Mono.just(ResponseEntity.badRequest().body(response)); } try { - outboundService.sendOutboundMessage(xmsg); - response.setMessage("Message processed."); + String channel = xmsg.getChannelURI(); + String provider = xmsg.getProviderURI(); + IProvider iprovider = factoryProvider.getProvider(provider, channel); + return iprovider.processOutBoundMessageF(xmsg) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) { + log.error("Exception in processOutBoundMessageF:"+e.getMessage()); + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + response.setError(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase()); + response.setMessage("Exception while sending message: "+e.getMessage()); + } + }).map(new Function>>() { + @Override + public Mono> apply(XMessage xMessage) { + XMessageDAO dao = XMessageDAOUtils.convertXMessageToDAO(xMessage); + + return xMessageRepo + .insert(dao) + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) { + log.error("Exception in xMsg Dao Save:"+e.getMessage()); + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + response.setError(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase()); + response.setMessage("Exception in saving xMessage: "+e.getMessage()); + } + }) + .map(new Function>() { + @Override + public ResponseEntity apply(XMessageDAO xMessageDAO) { + log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); + response.setMessage("Message sent."); + Map resultNode = new HashMap<>(); + resultNode.put("messageId", xMessageDAO.getMessageId()); + response.setResult(resultNode); + return ResponseEntity.ok(response); + } + }); + } + }).flatMap(new Function>, Mono>>() { + @Override + public Mono> apply(Mono> n) { + return n; + } + }); } catch (Exception e) { log.error("Exception while sending outbound message: "+e.getMessage()); + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); + response.setError(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase()); + response.setMessage("Exception while sending message: "+e.getMessage()); } - return ResponseEntity.ok(response); + return Mono.just(ResponseEntity.internalServerError().body(response)); + } + }).flatMap(new Function>, Mono>>() { + @Override + public Mono> apply(Mono> n) { + return n; } }); } diff --git a/src/main/java/com/uci/outbound/health/ServiceStatusController.java b/src/main/java/com/uci/outbound/health/ServiceStatusController.java index ee7deab..34789c9 100644 --- a/src/main/java/com/uci/outbound/health/ServiceStatusController.java +++ b/src/main/java/com/uci/outbound/health/ServiceStatusController.java @@ -28,6 +28,26 @@ public class ServiceStatusController { @Autowired private HealthService healthService; + /** + * In use by sunbird team - to check service liveliness & readliness + * @return + * @throws JsonProcessingException + */ + @RequestMapping(value = "/health", method = RequestMethod.GET, produces = { "application/json", "text/json" }) + public ResponseEntity statusCheck() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode resultNode = mapper.readTree("{\"healthy\":true}"); + + ApiResponse response = ApiResponse.builder() + .id("api.service.health.cassandra") + .params(ApiResponseParams.builder().build()) + .responseCode(HttpStatus.OK.name()) + .result(resultNode) + .build(); + + return ResponseEntity.ok(response); + } + @RequestMapping(value = "/health/cassandra", method = RequestMethod.GET, produces = { "application/json", "text/json" }) public ResponseEntity cassandraStatusCheck() throws IOException, JsonProcessingException { ApiResponse response = ApiResponse.builder()