From 33151ddece1b15dc8d347e9b99de296d7e71787c Mon Sep 17 00:00:00 2001 From: ChetanT-System Date: Mon, 1 Apr 2024 22:39:43 +0530 Subject: [PATCH 1/2] - PCF exchange service implementation --- .../AsyncPushPCFDataForApproveRequest.java | 142 +++++++++++ .../service/impl/PCFRepositoryService.java | 149 ++++++++++++ .../service/impl/PcfExchangeServiceImpl.java | 220 ++++++++++++++++++ .../service/impl/ProxyRequestInterface.java | 163 +++++++++++++ 4 files changed, 674 insertions(+) create mode 100644 modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/AsyncPushPCFDataForApproveRequest.java create mode 100644 modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PCFRepositoryService.java create mode 100644 modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PcfExchangeServiceImpl.java create mode 100644 modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/ProxyRequestInterface.java diff --git a/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/AsyncPushPCFDataForApproveRequest.java b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/AsyncPushPCFDataForApproveRequest.java new file mode 100644 index 000000000..6a1169a2a --- /dev/null +++ b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/AsyncPushPCFDataForApproveRequest.java @@ -0,0 +1,142 @@ +/******************************************************************************** + * Copyright (c) 2024 T-Systems International GmbH + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +package org.eclipse.tractusx.sde.pcfexchange.service.impl; + +import java.util.List; + +import org.eclipse.tractusx.sde.common.entities.PolicyModel; +import org.eclipse.tractusx.sde.common.exception.NoDataFoundException; +import org.eclipse.tractusx.sde.common.model.PagingResponse; +import org.eclipse.tractusx.sde.common.utils.PolicyOperationUtil; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFRequestStatusEnum; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFTypeEnum; +import org.eclipse.tractusx.sde.pcfexchange.request.PcfRequestModel; +import org.eclipse.tractusx.sde.submodels.pcf.entity.PcfEntity; +import org.eclipse.tractusx.sde.submodels.pcf.service.PcfService; +import org.springframework.stereotype.Component; + +import com.google.gson.JsonObject; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +@SuppressWarnings("unchecked") +public class AsyncPushPCFDataForApproveRequest { + + private final PCFRepositoryService pcfRepositoryService; + + private final PcfService pcfService; + + private final ProxyRequestInterface proxyRequestInterface; + + + public void pushPCFDataForApproveRequest(String processId, PolicyModel policy) { + + List accessBPNList = PolicyOperationUtil.getAccessBPNList(policy); + + List productList = pcfService.readCreatedTwins(processId).stream().map(PcfEntity::getProductId) + .toList(); + + markedPCFDataForPendingProviderRequestAsRequested(productList); + + PagingResponse pcfData = pcfRepositoryService.getPcfData( + List.of(PCFRequestStatusEnum.PUSHED, PCFRequestStatusEnum.PUSHED_UPDATED_DATA), PCFTypeEnum.PROVIDER, 0, + 1000); + List requestList = (List) pcfData.getItems(); + + if (!requestList.isEmpty()) { + + requestList.forEach(request -> { + + if (productList.contains(request.getProductId()) && accessBPNList.contains(request.getBpnNumber())) { + + String msg = ""; + + try { + request.setStatus(PCFRequestStatusEnum.PUSHING_UPDATED_DATA); + + JsonObject calculatedPCFValue = pcfService + .readCreatedTwinsDetailsByProductId(request.getProductId()).get("json") + .getAsJsonObject(); + + PCFRequestStatusEnum status = pcfRepositoryService.identifyRunningStatus(request.getRequestId(), + request.getStatus()); + + // push api call + Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status, + calculatedPCFValue, request.getProductId(), request.getBpnNumber(), + request.getRequestId()); + + new Thread(runnable).start(); + + msg = "PCF request '" + request.getStatus() + + "' and asynchronously sending notification to consumer"; + + } catch (NoDataFoundException e) { + msg = "Unable to take action on PCF request becasue pcf calculated value does not exist, please provide/upload PCF value for " + + request.getProductId() + ", requestId " + request.getRequestId(); + log.error("Async PushPCFDataForApproveRequest" + msg); + throw new NoDataFoundException(msg); + } catch (Exception e) { + pcfRepositoryService.savePcfStatus(request.getRequestId(), PCFRequestStatusEnum.FAILED); + } + } + }); + } else { + log.debug("Async PushPCFDataForApproveRequest - No APPROVED request found"); + } + + } + + public void markedPCFDataForPendingProviderRequestAsRequested(List productList) { + + PagingResponse pcfData = pcfRepositoryService.getPcfData(List.of(PCFRequestStatusEnum.PENDING_DATA_FROM_PROVIDER), + PCFTypeEnum.PROVIDER, 0, 1000); + List requestList = (List) pcfData.getItems(); + + if (!requestList.isEmpty()) { + requestList.forEach(request -> { + if (productList.contains(request.getProductId())) { + String msg = ""; + try { + pcfService.readCreatedTwinsDetailsByProductId(request.getProductId()).get("json") + .getAsJsonObject(); + pcfRepositoryService.savePcfStatus(request.getRequestId(), PCFRequestStatusEnum.REQUESTED); + + } catch (NoDataFoundException e) { + msg = "Unable to markedPCFDataForPendingProviderRequestAsRequested becasue pcf calculated value does not exist " + + request.getProductId() + ", requestId " + request.getRequestId(); + log.error("Async PushPCFDataForApproveRequest" + msg); + throw new NoDataFoundException(msg); + } catch (Exception e) { + pcfRepositoryService.savePcfStatus(request.getRequestId(), PCFRequestStatusEnum.FAILED); + } + } + }); + } else { + log.debug("Async PushPCFDataForApproveRequest - No 'PENDING_DATA_FROM_PROVIDER' request found"); + } + + } +} diff --git a/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PCFRepositoryService.java b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PCFRepositoryService.java new file mode 100644 index 000000000..4a51743ce --- /dev/null +++ b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PCFRepositoryService.java @@ -0,0 +1,149 @@ +/******************************************************************************** + * Copyright (c) 2024 T-Systems International GmbH + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +package org.eclipse.tractusx.sde.pcfexchange.service.impl; + +import java.time.Instant; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.eclipse.tractusx.sde.common.model.PagingResponse; +import org.eclipse.tractusx.sde.pcfexchange.entity.PcfRequestEntity; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFRequestStatusEnum; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFTypeEnum; +import org.eclipse.tractusx.sde.pcfexchange.mapper.PcfExchangeMapper; +import org.eclipse.tractusx.sde.pcfexchange.repository.PcfRequestRepository; +import org.eclipse.tractusx.sde.pcfexchange.request.PcfRequestModel; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class PCFRepositoryService { + + private static final String SUCCESS = "SUCCESS"; + private final PcfRequestRepository pcfRequestRepository; + private final PcfExchangeMapper pcfMapper; + + public PcfRequestModel savePcfRequestData(String requestId, String productId, String bpnNumber, String message, + PCFTypeEnum type, PCFRequestStatusEnum status, String remark) { + + PcfRequestModel pojo = PcfRequestModel.builder().requestId(requestId).productId(productId).bpnNumber(bpnNumber) + .status(status).message(message).type(type).requestedTime(Instant.now().getEpochSecond()) + .lastUpdatedTime(Instant.now().getEpochSecond()).remark(remark).build(); + + PcfRequestEntity entity = pcfMapper.mapFrom(pojo); + + return pcfMapper.mapFrom(pcfRequestRepository.save(entity)); + } + + public PCFRequestStatusEnum updatePCFPushStatus(PCFRequestStatusEnum status, String requestId, String sendNotificationStatus) { + + if ((PCFRequestStatusEnum.APPROVED.equals(status) || PCFRequestStatusEnum.PUSHING_DATA.equals(status)) + && SUCCESS.equalsIgnoreCase(sendNotificationStatus)) { + status = PCFRequestStatusEnum.PUSHED; + sendNotificationStatus ="PCF data successfuly pushed"; + } else if (PCFRequestStatusEnum.PUSHING_UPDATED_DATA.equals(status) + && SUCCESS.equalsIgnoreCase(sendNotificationStatus)) { + status = PCFRequestStatusEnum.PUSHED_UPDATED_DATA; + sendNotificationStatus ="PCF updated data successfuly pushed"; + } else if (PCFRequestStatusEnum.REJECTED.equals(status) && SUCCESS.equalsIgnoreCase(sendNotificationStatus)) { + status = PCFRequestStatusEnum.REJECTED; + sendNotificationStatus ="PCF request rejected successfuly"; + } else if (PCFRequestStatusEnum.APPROVED.equals(status) + || PCFRequestStatusEnum.FAILED_TO_PUSH_DATA.equals(status) + || PCFRequestStatusEnum.PUSHING_DATA.equals(status) + || PCFRequestStatusEnum.PUSHING_UPDATED_DATA.equals(status)) { + status = PCFRequestStatusEnum.FAILED_TO_PUSH_DATA; + } else if (PCFRequestStatusEnum.REJECTED.equals(status) + || PCFRequestStatusEnum.FAILED_TO_SEND_REJECT_NOTIFICATION.equals(status)) + status = PCFRequestStatusEnum.FAILED_TO_SEND_REJECT_NOTIFICATION; + else { + status = PCFRequestStatusEnum.FAILED; + } + + savePcfStatus(requestId, status, sendNotificationStatus); + + return status; + } + + public PCFRequestStatusEnum identifyRunningStatus(String requestId, PCFRequestStatusEnum status) { + + boolean isApproval = PCFRequestStatusEnum.APPROVED.equals(status) + || PCFRequestStatusEnum.FAILED_TO_PUSH_DATA.equals(status); + + boolean isRejection = PCFRequestStatusEnum.REJECTED.equals(status) + || PCFRequestStatusEnum.FAILED_TO_SEND_REJECT_NOTIFICATION.equals(status); + + if (isApproval) { + status = PCFRequestStatusEnum.PUSHING_DATA; + } else if (isRejection) { + status = PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION; + } + + savePcfStatus(requestId, status); + + return status; + } + + @SneakyThrows + public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum status) { + return savePcfStatus(requestId, status, null); + } + + @SneakyThrows + public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum status, String remark) { + + PcfRequestEntity pcfRequestEntity = pcfRequestRepository.getReferenceById(requestId); + pcfRequestEntity.setLastUpdatedTime(Instant.now().getEpochSecond()); + pcfRequestEntity.setStatus(status); + + if(StringUtils.isNotBlank(remark)) + pcfRequestEntity.setRemark(remark); + + log.info("'" + pcfRequestEntity.getProductId() + "' pcf request saved in the database successfully as {}", + status); + pcfRequestRepository.save(pcfRequestEntity); + return pcfRequestEntity; + + } + + public PagingResponse getPcfData(List status, PCFTypeEnum type, Integer page, Integer pageSize) { + + Page result = null; + if (status == null || status.isEmpty()) { + result = pcfRequestRepository.findByType(PageRequest.of(page, pageSize), type); + } else { + result = pcfRequestRepository.findByTypeAndStatusIn(PageRequest.of(page, pageSize), type, status); + } + + List requestList = result.stream().map(pcfMapper::mapFrom).toList(); + + return PagingResponse.builder().items(requestList).pageSize(result.getSize()).page(result.getNumber()) + .totalItems(result.getTotalElements()).build(); + } + +} diff --git a/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PcfExchangeServiceImpl.java b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PcfExchangeServiceImpl.java new file mode 100644 index 000000000..54c09594d --- /dev/null +++ b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/PcfExchangeServiceImpl.java @@ -0,0 +1,220 @@ +/******************************************************************************** + * Copyright (c) 2024 T-Systems International GmbH + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +package org.eclipse.tractusx.sde.pcfexchange.service.impl; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import org.apache.commons.lang3.StringUtils; +import org.eclipse.tractusx.sde.common.entities.PolicyModel; +import org.eclipse.tractusx.sde.common.exception.NoDataFoundException; +import org.eclipse.tractusx.sde.common.exception.ServiceException; +import org.eclipse.tractusx.sde.common.exception.ValidationException; +import org.eclipse.tractusx.sde.common.model.PagingResponse; +import org.eclipse.tractusx.sde.edc.model.request.ConsumerRequest; +import org.eclipse.tractusx.sde.edc.model.response.QueryDataOfferModel; +import org.eclipse.tractusx.sde.edc.util.EDCAssetUrlCacheService; +import org.eclipse.tractusx.sde.pcfexchange.entity.PcfResponseEntity; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFRequestStatusEnum; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFTypeEnum; +import org.eclipse.tractusx.sde.pcfexchange.repository.PcfReqsponseRepository; +import org.eclipse.tractusx.sde.pcfexchange.request.PcfRequestModel; +import org.eclipse.tractusx.sde.pcfexchange.service.IPCFExchangeService; +import org.eclipse.tractusx.sde.submodels.pcf.service.PcfService; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.gson.JsonObject; + +import feign.FeignException; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class PcfExchangeServiceImpl implements IPCFExchangeService { + + private final PCFRepositoryService pcfRepositoryService; + private final PcfReqsponseRepository pcfReqsponseRepository; + private final EDCAssetUrlCacheService edcAssetUrlCacheService; + private final PcfService pcfService; + private final ProxyRequestInterface proxyRequestInterface; + + @SneakyThrows + @Override + public String requestForPcfDataExistingOffer(String productId, ConsumerRequest consumerRequest) { + + StringBuilder sb = new StringBuilder(); + consumerRequest.getOffers().stream().forEach(offer -> { + + String requestId = UUID.randomUUID().toString(); + String providerBPNNumber = offer.getConnectorId(); + + String message = "Please provide PCF value for " + productId; + + pcfRepositoryService.savePcfRequestData(requestId, productId, providerBPNNumber, message, + PCFTypeEnum.CONSUMER, PCFRequestStatusEnum.SENDING_REQUEST, ""); + + QueryDataOfferModel queryDataOfferModel = QueryDataOfferModel.builder().assetId(offer.getAssetId()) + .offerId(offer.getOfferId()).policyId(offer.getPolicyId()).connectorId(providerBPNNumber) + .connectorOfferUrl(offer.getConnectorOfferUrl()) + .policy(PolicyModel.builder().usagePolicies(consumerRequest.getUsagePolicies()).build()).build(); + + proxyRequestInterface.requestToProviderForPCFValue(productId, sb, requestId, message, queryDataOfferModel, false); + }); + + return sb.toString(); + + } + + @Override + public Object requestForPcfNotExistDataOffer(PcfRequestModel pcfRequestModel) { + StringBuilder sb = new StringBuilder(); + String requestId = UUID.randomUUID().toString(); + try { + pcfRepositoryService.savePcfRequestData(requestId, pcfRequestModel.getProductId(), + pcfRequestModel.getBpnNumber(), pcfRequestModel.getMessage(), PCFTypeEnum.CONSUMER, + PCFRequestStatusEnum.SENDING_REQUEST, ""); + + // 1 fetch EDC connectors and DTR Assets from EDC connectors + List pcfExchangeUrlOffers = edcAssetUrlCacheService + .getPCFExchangeUrlFromTwin(pcfRequestModel.getBpnNumber()); + + // 2 request for PCF value for non existing sub model and send notification to + // call provider for data + pcfExchangeUrlOffers.parallelStream().forEach(dtOffer -> proxyRequestInterface.requestToProviderForPCFValue( + pcfRequestModel.getProductId(), sb, requestId, pcfRequestModel.getMessage(), dtOffer, true)); + + } catch (FeignException e) { + log.error("FeignRequest requestForPcfNotExistDataOffer:" + e.request()); + String errorMsg = "Unable to request to data provider because: " + + (StringUtils.isBlank(e.contentUTF8()) ? e.getMessage() : e.contentUTF8()); + log.error("FeignException requestForPcfNotExistDataOffer: " + errorMsg); + } + + if (sb.isEmpty()) + throw new ValidationException("Not requested to provider for '" + pcfRequestModel.getProductId() + + "' because there is no PCF exchange endpoint found"); + + return sb.toString(); + } + + @SneakyThrows + @Override + public String actionOnPcfRequestAndSendNotificationToConsumer(PcfRequestModel pcfRequestModel) { + String remark = ""; + try { + + JsonObject calculatedPCFValue = pcfService + .readCreatedTwinsDetailsByProductId(pcfRequestModel.getProductId()).get("json").getAsJsonObject(); + + + PCFRequestStatusEnum status = pcfRepositoryService.identifyRunningStatus(pcfRequestModel.getRequestId(), + pcfRequestModel.getStatus()); + + // push api call + Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status, + calculatedPCFValue, pcfRequestModel.getProductId(), pcfRequestModel.getBpnNumber(), + pcfRequestModel.getRequestId()); + + new Thread(runnable).start(); + + remark = "PCF push request accepted for '" + pcfRequestModel.getProductId() + + "' and asynchronously pushing notification to consumer"; + + } catch (NoDataFoundException e) { + remark = "Unable to take action on PCF request becasue PCF calculated value does not exist, please upload PCF value for " + + pcfRequestModel.getProductId() + " in systems using Manual/Recurring Upload"; + pcfRepositoryService.savePcfRequestData(pcfRequestModel.getRequestId(), pcfRequestModel.getProductId(), + pcfRequestModel.getBpnNumber(), pcfRequestModel.getMessage(), PCFTypeEnum.PROVIDER, + PCFRequestStatusEnum.FAILED, remark); + log.error(remark); + throw new ValidationException(e.getMessage()); + } catch (Exception e) { + pcfRepositoryService.savePcfStatus(pcfRequestModel.getRequestId(), PCFRequestStatusEnum.FAILED); + throw new ServiceException(e.getMessage()); + } + return remark; + } + + @Override + public PcfRequestModel savePcfRequestData(String requestId, String productId, String bpnNumber, String message) { + PCFRequestStatusEnum status = PCFRequestStatusEnum.REQUESTED; + String remark = ""; + try { + pcfService.readCreatedTwinsDetailsByProductId(productId).get("json").getAsJsonObject(); + } catch (NoDataFoundException e) { + String msg = "The PCF calculated value does not exist in system, please upload PCF value for '" + productId + + "' in systems using Manual/Recurring Upload"; + log.warn(msg); + remark = msg; + status = PCFRequestStatusEnum.PENDING_DATA_FROM_PROVIDER; + } + return pcfRepositoryService.savePcfRequestData(requestId, productId, bpnNumber, message, PCFTypeEnum.PROVIDER, + status, remark); + } + + @Override + public void recievedPCFData(String productId, String bpnNumber, String requestId, String message, + JsonNode pcfData) { + + PCFRequestStatusEnum status = PCFRequestStatusEnum.FAILED; + try { + status = PCFRequestStatusEnum.valueOf(message); + } catch (Exception e) { + log.error("Unable to find PCF value status " + e.getMessage()); + } + + PcfResponseEntity entity = PcfResponseEntity.builder().pcfData(pcfData).requestId(requestId) + .responseId(UUID.randomUUID().toString()).lastUpdatedTime(Instant.now().getEpochSecond()).build(); + + pcfReqsponseRepository.save(entity); + + if (PCFRequestStatusEnum.APPROVED.equals(status) || PCFRequestStatusEnum.PUSHING_DATA.equals(status) + || PCFRequestStatusEnum.PUSHING_UPDATED_DATA.equals(status)) { + status = PCFRequestStatusEnum.RECEIVED; + } + + pcfRepositoryService.savePcfStatus(requestId, status); + + } + + @Override + public PagingResponse getPcfData(PCFRequestStatusEnum status, PCFTypeEnum type, Integer page, Integer pageSize) { + List statusLs = null; + if (status != null) + statusLs = List.of(status); + return pcfRepositoryService.getPcfData(statusLs, type, page, pageSize); + } + + @Override + public PcfResponseEntity viewForPcfDataOffer(String requestId) { + Optional findById = pcfReqsponseRepository + .findFirstByRequestIdOrderByLastUpdatedTimeDesc(requestId); + if (!findById.isPresent()) + throw new NoDataFoundException("No data found uuid " + requestId); + return findById.get(); + } + +} diff --git a/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/ProxyRequestInterface.java b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/ProxyRequestInterface.java new file mode 100644 index 000000000..f76bca9db --- /dev/null +++ b/modules/pcf-exchange/src/main/java/org/eclipse/tractusx/sde/pcfexchange/service/impl/ProxyRequestInterface.java @@ -0,0 +1,163 @@ +/******************************************************************************** + * Copyright (c) 2024 T-Systems International GmbH + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +package org.eclipse.tractusx.sde.pcfexchange.service.impl; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.eclipse.tractusx.sde.common.mapper.JsonObjectMapper; +import org.eclipse.tractusx.sde.edc.model.edr.EDRCachedByIdResponse; +import org.eclipse.tractusx.sde.edc.model.response.QueryDataOfferModel; +import org.eclipse.tractusx.sde.edc.util.EDCAssetUrlCacheService; +import org.eclipse.tractusx.sde.pcfexchange.enums.PCFRequestStatusEnum; +import org.eclipse.tractusx.sde.pcfexchange.proxy.PCFExchangeProxy; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import com.google.gson.JsonObject; + +import feign.FeignException; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ProxyRequestInterface { + + private static final String PRODUCT_IDS = "productIds"; + private static final String SLASH_DELIMETER = "/"; + private final PCFExchangeProxy pcfExchangeProxy; + private final PCFRepositoryService pcfRepositoryService; + private final EDCAssetUrlCacheService edcAssetUrlCacheService; + private final JsonObjectMapper jsonObjectMapper; + + @Value(value = "${manufacturerId}") + private String manufacturerId; + + @Value(value = "${digital-twins.managed.thirdparty:false}") + private boolean managedThirdParty; + + @SneakyThrows + public void requestToProviderForPCFValue(String productId, StringBuilder sb, String requestId, String message, + QueryDataOfferModel dataset, boolean isRequestToNonexistingTwin) { + + EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(dataset.getConnectorId(), dataset); + + if (!sb.isEmpty()) + sb.append("\n"); + + if (edrToken != null) { + + URI pcfpushEnpoint = null; + + if(isRequestToNonexistingTwin) + pcfpushEnpoint = new URI( + edrToken.getEndpoint() + SLASH_DELIMETER + PRODUCT_IDS + SLASH_DELIMETER + productId); + else + pcfpushEnpoint = new URI(edrToken.getEndpoint()); + + Map header = new HashMap<>(); + header.put(edrToken.getAuthKey(), edrToken.getAuthCode()); + + // Send request to data provider for PCF value push + pcfExchangeProxy.getPcfByProduct(pcfpushEnpoint, header, manufacturerId, + requestId, message); + + sb.append(productId + ": requested for PCF value"); + pcfRepositoryService.savePcfStatus(requestId, PCFRequestStatusEnum.REQUESTED); + } else { + sb.append(productId + ": Unable to request for PCF value becasue the EDR token status is null"); + log.warn("EDC connector " + dataset.getConnectorOfferUrl() + ": {},{},{}", requestId, productId, + "Unable to request for PCF value becasue the EDR token status is null"); + pcfRepositoryService.savePcfStatus(requestId, PCFRequestStatusEnum.FAILED); + } + } + + @SneakyThrows + public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject calculatedPCFValue, + String productId, String bpnNumber, String requestId) { + + // 1 fetch EDC connectors and DTR Assets from EDC connectors + List pcfExchangeUrlOffers = edcAssetUrlCacheService.getPCFExchangeUrlFromTwin(bpnNumber); + + // 2 lookup shell for PCF sub model and send notification to call consumer + // request + if(pcfExchangeUrlOffers.isEmpty()) { + pcfRepositoryService.updatePCFPushStatus(status, requestId, "Unable to find PCF exchange endpoint"); + } + else { + pcfExchangeUrlOffers.parallelStream().forEach(dtOffer -> { + + if (PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION.equals(status)) { + sendNotification(null, productId, bpnNumber, requestId, dtOffer, status); + } else { + sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status); + } + + }); + } + } + + @SneakyThrows + private void sendNotification(JsonObject calculatedPCFValue, String productId, String bpnNumber, String requestId, + QueryDataOfferModel dtOffer, PCFRequestStatusEnum status) { + String sendNotificationStatus = ""; + try { + String message = status.name(); + + EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(bpnNumber, dtOffer); + + if (edrToken != null) { + + URI pcfpushEnpoint = new URI( + edrToken.getEndpoint() + SLASH_DELIMETER + PRODUCT_IDS + SLASH_DELIMETER + productId); + + Map header = new HashMap<>(); + header.put(edrToken.getAuthKey(), edrToken.getAuthCode()); + + pcfExchangeProxy.uploadPcfSubmodel(pcfpushEnpoint, header, bpnNumber, requestId, message, + jsonObjectMapper.gsonObjectToJsonNode(calculatedPCFValue)); + + sendNotificationStatus = "SUCCESS"; + } else { + String warn="EDC connector " + dtOffer.getConnectorOfferUrl() + + ", The EDR token is null to find pcf exchange asset"; + log.warn(warn); + sendNotificationStatus = warn; + } + } catch (FeignException e) { + log.error("FeignRequest:" + e.request()); + String errorMsg = "Unable to send notification to consumer because: " + + (StringUtils.isBlank(e.contentUTF8()) ? e.getMessage() : e.contentUTF8()); + log.error("FeignException : " + errorMsg); + sendNotificationStatus = errorMsg; + } finally { + pcfRepositoryService.updatePCFPushStatus(status, requestId, sendNotificationStatus); + } + } + + + +} From cb164678fdbd6f1c5282c2c945f13ad63234b7d8 Mon Sep 17 00:00:00 2001 From: ChetanT-System Date: Mon, 1 Apr 2024 23:04:26 +0530 Subject: [PATCH 2/2] - Changelog file updated --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c488b2cf..54bbe6167 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Updated supported sub-model implementation classes. - EDC asset update refactored in supported submodels. - Support for pcf v6.0.0 submodel. +- Updated PCF exchange service implementation code. ## [2.3.6] - 2024-03-06