Skip to content

Commit

Permalink
Merge pull request #154 from catenax-ng/release_refactor_pcf_dt_edc_2405
Browse files Browse the repository at this point in the history
feat | Release refactor pcf dt edc code to fix breaking change
  • Loading branch information
almadigabor authored May 9, 2024
2 parents 37f08fe + d798beb commit 021035a
Show file tree
Hide file tree
Showing 26 changed files with 385 additions and 191 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Supporting new submodel Singlelevelbomasplanned.
- Support EDC 7.
- Added new files for digital twin access rule support.
- Refactor code for pcf, dt access API, EDC 7.

### Fixed
- Remove garbage character from 'edc_request_template' path. Fixed [#147](https://github.com/eclipse-tractusx/managed-simple-data-exchanger-backend/issues/147).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,8 @@ public class PcfResponseEntity {

@Column(name = "last_updated_time")
private Long lastUpdatedTime;

@Column(name = "message", columnDefinition = "TEXT")
private String message;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
@SuppressWarnings("unchecked")
public class AsyncPushPCFDataForApproveRequest {

private static final String PRODUCT_ID = "product_id";
private static final String PRODUCT_ID = "productId";

private final PCFRepositoryService pcfRepositoryService;

Expand All @@ -54,13 +54,15 @@ public void pushPCFDataForApproveRequest(List<JsonObject> jsonObjectList, Policy
List<String> accessBPNList = PolicyOperationUtil.getAccessBPNList(policy);

List<String> productList = jsonObjectList.stream()
.map(ele -> JsonObjectUtility.getValueFromJsonObject(ele, PRODUCT_ID)).toList();
.map(obj-> obj.get("csv").getAsJsonObject())
.map(ele -> JsonObjectUtility.getValueFromJsonObject(ele, PRODUCT_ID))
.toList();

markedPCFDataForPendingProviderRequestAsRequested(productList, jsonObjectList);

PagingResponse pcfData = pcfRepositoryService.getPcfData(
List.of(PCFRequestStatusEnum.PUSHED, PCFRequestStatusEnum.PUSHED_UPDATED_DATA), PCFTypeEnum.PROVIDER, 0,
1000);
100000);
List<PcfRequestModel> requestList = (List<PcfRequestModel>) pcfData.getItems();

if (!requestList.isEmpty()) {
Expand All @@ -75,9 +77,13 @@ public void pushPCFDataForApproveRequest(List<JsonObject> jsonObjectList, Policy
request.setStatus(PCFRequestStatusEnum.PUSHING_UPDATED_DATA);

JsonObject calculatedPCFValue = jsonObjectList.stream()
.filter(ele -> request.getProductId()
.equals(JsonObjectUtility.getValueFromJsonObject(ele, PRODUCT_ID)))
.findAny().orElseThrow(() -> new NoDataFoundException(
.filter(ele -> {
ele = ele.get("csv").getAsJsonObject();
return request.getProductId().equals(JsonObjectUtility.getValueFromJsonObject(ele, PRODUCT_ID));
})
.map(obj-> obj.get("json").getAsJsonObject())
.findAny()
.orElseThrow(() -> new NoDataFoundException(
"No data found for product_id " + request.getProductId()));

PCFRequestStatusEnum status = pcfRepositoryService.identifyRunningStatus(request.getRequestId(),
Expand All @@ -86,7 +92,7 @@ public void pushPCFDataForApproveRequest(List<JsonObject> jsonObjectList, Policy
// push api call
Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status,
calculatedPCFValue, request.getProductId(), request.getBpnNumber(),
request.getRequestId());
request.getRequestId(), request.getMessage());

new Thread(runnable).start();

Expand All @@ -113,7 +119,7 @@ public void markedPCFDataForPendingProviderRequestAsRequested(List<String> produ
List<JsonObject> jsonObjectList) {

PagingResponse pcfData = pcfRepositoryService
.getPcfData(List.of(PCFRequestStatusEnum.PENDING_DATA_FROM_PROVIDER), PCFTypeEnum.PROVIDER, 0, 1000);
.getPcfData(List.of(PCFRequestStatusEnum.PENDING_DATA_FROM_PROVIDER), PCFTypeEnum.PROVIDER, 0, 100000);
List<PcfRequestModel> requestList = (List<PcfRequestModel>) pcfData.getItems();

if (!requestList.isEmpty()) {
Expand All @@ -122,7 +128,7 @@ public void markedPCFDataForPendingProviderRequestAsRequested(List<String> produ
String msg = "";
try {

JsonObject calculatedPCFValue = jsonObjectList.stream()
jsonObjectList.stream()
.filter(ele -> request.getProductId()
.equals(JsonObjectUtility.getValueFromJsonObject(ele, PRODUCT_ID)))
.findAny().orElseThrow(() -> new NoDataFoundException(
Expand All @@ -145,4 +151,4 @@ public void markedPCFDataForPendingProviderRequestAsRequested(List<String> produ
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ public PCFRequestStatusEnum updatePCFPushStatus(PCFRequestStatusEnum status, Str
&& SUCCESS.equalsIgnoreCase(sendNotificationStatus)) {
status = PCFRequestStatusEnum.PUSHED_UPDATED_DATA;
sendNotificationStatus ="PCF updated data successfuly pushed";
} else if (PCFRequestStatusEnum.REJECTED.equals(status) && SUCCESS.equalsIgnoreCase(sendNotificationStatus)) {
} else if ((PCFRequestStatusEnum.REJECTED.equals(status)
|| PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION.equals(status))
&& SUCCESS.equalsIgnoreCase(sendNotificationStatus)) {
status = PCFRequestStatusEnum.REJECTED;
sendNotificationStatus ="PCF request rejected successfuly";
sendNotificationStatus = "PCF request rejected successfuly";
} else if (PCFRequestStatusEnum.APPROVED.equals(status)
|| PCFRequestStatusEnum.FAILED_TO_PUSH_DATA.equals(status)
|| PCFRequestStatusEnum.PUSHING_DATA.equals(status)
|| PCFRequestStatusEnum.PUSHING_UPDATED_DATA.equals(status)) {
status = PCFRequestStatusEnum.FAILED_TO_PUSH_DATA;
} else if (PCFRequestStatusEnum.REJECTED.equals(status)
|| PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION.equals(status)
|| PCFRequestStatusEnum.FAILED_TO_SEND_REJECT_NOTIFICATION.equals(status))
status = PCFRequestStatusEnum.FAILED_TO_SEND_REJECT_NOTIFICATION;
else {
Expand Down Expand Up @@ -127,6 +130,7 @@ public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum sta

log.info(LogUtil.encode("'" + pcfRequestEntity.getProductId() + "' pcf request saved in the database successfully as " +
status));

pcfRequestRepository.save(pcfRequestEntity);
return pcfRequestEntity;

Expand All @@ -147,4 +151,4 @@ public PagingResponse getPcfData(List<PCFRequestStatusEnum> status, PCFTypeEnum
.totalItems(result.getTotalElements()).build();
}

}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/********************************************************************************
* Copyright (c) 2024 T-Systems International GmbH
* Copyright (c) 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2023,2024 T-Systems International GmbH
* Copyright (c) 2023,2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -60,8 +60,10 @@ public class PcfExchangeServiceImpl implements IPCFExchangeService {
private final PCFRepositoryService pcfRepositoryService;
private final PcfReqsponseRepository pcfReqsponseRepository;
private final EDCAssetUrlCacheService edcAssetUrlCacheService;

@Qualifier("DatabaseUsecaseHandler")
private final DatabaseUsecaseStep databaseUsecaseStep;

private final ProxyRequestInterface proxyRequestInterface;

@SneakyThrows
Expand Down Expand Up @@ -133,14 +135,15 @@ public String actionOnPcfRequestAndSendNotificationToConsumer(PcfRequestModel pc
try {

JsonObject calculatedPCFValue = databaseUsecaseStep.readCreatedTwinsBySpecifyColomn(
"urn:bamm:io.catenax.pcf", "productId", pcfRequestModel.getProductId());

"urn:bamm:io.catenax.pcf", pcfRequestModel.getProductId()).get("json").getAsJsonObject();
PCFRequestStatusEnum status = pcfRepositoryService.identifyRunningStatus(pcfRequestModel.getRequestId(),
pcfRequestModel.getStatus());

// push api call
Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status, calculatedPCFValue,
pcfRequestModel.getProductId(), pcfRequestModel.getBpnNumber(), pcfRequestModel.getRequestId());
pcfRequestModel.getProductId(), pcfRequestModel.getBpnNumber(), pcfRequestModel.getRequestId(),
pcfRequestModel.getMessage());

new Thread(runnable).start();

Expand All @@ -153,7 +156,7 @@ public String actionOnPcfRequestAndSendNotificationToConsumer(PcfRequestModel pc
pcfRepositoryService.savePcfRequestData(pcfRequestModel.getRequestId(), pcfRequestModel.getProductId(),
pcfRequestModel.getBpnNumber(), pcfRequestModel.getMessage(), PCFTypeEnum.PROVIDER,
PCFRequestStatusEnum.FAILED, remark);
log.error(LogUtil.encode(remark));
log.warn(LogUtil.encode(remark));
throw new ValidationException(e.getMessage());
} catch (Exception e) {
pcfRepositoryService.savePcfStatus(pcfRequestModel.getRequestId(), PCFRequestStatusEnum.FAILED);
Expand All @@ -167,8 +170,7 @@ public PcfRequestModel savePcfRequestData(String requestId, String productId, St
PCFRequestStatusEnum status = PCFRequestStatusEnum.REQUESTED;
String remark = "";
try {
databaseUsecaseStep.readCreatedTwinsBySpecifyColomn("urn:bamm:io.catenax.pcf", "productId",
productId);
databaseUsecaseStep.readCreatedTwinsBySpecifyColomn("urn:bamm:io.catenax.pcf", productId);
} catch (NoDataFoundException e) {
String msg = "The PCF calculated value does not exist in system, please upload PCF value for '" + productId
+ "' in systems using Manual/Recurring Upload";
Expand All @@ -180,27 +182,33 @@ public PcfRequestModel savePcfRequestData(String requestId, String productId, St
status, remark);
}

@SneakyThrows
@Override
public void recievedPCFData(String productId, String bpnNumber, String requestId, String message,
JsonNode pcfData) {

PCFRequestStatusEnum status = PCFRequestStatusEnum.FAILED;
try {
status = PCFRequestStatusEnum.valueOf(message);
} catch (Exception e) {
log.error("Unable to find PCF value status " + e.getMessage());
}
PCFRequestStatusEnum status = null;

PcfResponseEntity entity = PcfResponseEntity.builder().pcfData(pcfData).requestId(requestId)
.responseId(UUID.randomUUID().toString()).lastUpdatedTime(Instant.now().getEpochSecond()).build();
PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(requestId)
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

if (PCFRequestStatusEnum.APPROVED.equals(status) || PCFRequestStatusEnum.PUSHING_DATA.equals(status)
|| PCFRequestStatusEnum.PUSHING_UPDATED_DATA.equals(status)) {
if(StringUtils.isBlank(requestId))
throw new ServiceException("RequestId not recieved from provider to marked PCF exchange request");

if (pcfData != null && !pcfData.isEmpty() && !pcfData.asText().equals("{}")) {
status = PCFRequestStatusEnum.RECEIVED;
}

} else if (StringUtils.isNotBlank(requestId)) {
status = PCFRequestStatusEnum.REJECTED;
} else
status = PCFRequestStatusEnum.FAILED;

pcfRepositoryService.savePcfStatus(requestId, status);

}
Expand All @@ -222,4 +230,4 @@ public PcfResponseEntity viewForPcfDataOffer(String requestId) {
return findById.get();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void requestToProviderForPCFValue(String productId, StringBuilder sb, Str

@SneakyThrows
public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject calculatedPCFValue,
String productId, String bpnNumber, String requestId) {
String productId, String bpnNumber, String requestId, String message) {

// 1 fetch EDC connectors and DTR Assets from EDC connectors
List<QueryDataOfferModel> pcfExchangeUrlOffers = edcAssetUrlCacheService.getPCFExchangeUrlFromTwin(bpnNumber);
Expand All @@ -112,9 +112,9 @@ public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject c
pcfExchangeUrlOffers.parallelStream().forEach(dtOffer -> {

if (PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION.equals(status)) {
sendNotification(null, productId, bpnNumber, requestId, dtOffer, status);
sendNotification(null, productId, bpnNumber, requestId, dtOffer, status, message);
} else {
sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status);
sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status, message);
}

});
Expand All @@ -123,11 +123,9 @@ public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject c

@SneakyThrows
private void sendNotification(JsonObject calculatedPCFValue, String productId, String bpnNumber, String requestId,
QueryDataOfferModel dtOffer, PCFRequestStatusEnum status) {
QueryDataOfferModel dtOffer, PCFRequestStatusEnum status, String message) {
String sendNotificationStatus = "";
try {
String message = status.name();

EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(bpnNumber, dtOffer);

if (edrToken != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/********************************************************************************
#* Copyright (c) 2024 T-Systems International GmbH
#* Copyright (c) 2024 Contributors to the Eclipse Foundation
#* Copyright (c) 2022,2024 T-Systems International GmbH
#* Copyright (c) 2022,2024 Contributors to the Eclipse Foundation
#*
#* See the NOTICE file(s) distributed with this work for additional
#* information regarding copyright ownership.
Expand Down Expand Up @@ -28,7 +28,7 @@
@Configuration
@Data
public class DigitalTwinConfigurationProperties {

@Value("${digital-twins.hostname:default}")
private String digitalTwinsHostname;

Expand All @@ -50,10 +50,10 @@ public class DigitalTwinConfigurationProperties {
@Value(value = "${digital-twins.authentication.grantType}")
private String digitalTwinsAuthenticationGrantType;

@Value("${digital-twins.registry.uri:/api/v3.0}")
@Value("${digital-twins.registry.uri:/api/v3}")
private String digitalTwinsRegistryPath;

@Value("${digital-twins.lookup.uri:/api/v3.0}")
@Value("${digital-twins.lookup.uri:/api/v3}")
private String digitalTwinsLookupPath;

@Value(value = "${manufacturerId}")
Expand All @@ -76,4 +76,4 @@ public String getDigitalTwinEdcDataplaneEndpoint() {
return this.edcHostname+this.edcDataplaneEndpointpath;
}

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/********************************************************************************
* Copyright (c) 2022 BMW GmbH
* Copyright (c) 2022, 2023 T-Systems International GmbH
* Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2022,2024 T-Systems International GmbH
* Copyright (c) 2022,2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -50,7 +50,8 @@ private CommonConstants() {

public static final String EXTERNAL_REFERENCE = "ExternalReference";
public static final String GLOBAL_REFERENCE = "GlobalReference";
public static final String SUBMODEL = "Submodel";
public static final String BODY_ENCODING = "plain";
public static final String INTERFACE = "SUBMODEL-3.0";

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ private SubmoduleCommonColumnsConstant() {
}

public static final String SHELL_ID = "shell_id";
public static final String SHELL_ACCESS_RULE_IDS = "shell_access_rule_ids";
public static final String SUBMODULE_ID = "sub_model_id";
public static final String ASSET_ID = "asset_id";
public static final String USAGE_POLICY_ID = "usage_policy_id";
Expand All @@ -37,4 +38,4 @@ private SubmoduleCommonColumnsConstant() {
public static final String UPDATED = "updated";
public static final String MANUFACTURER_PART_ID_FIELD = "manufacturer_part_id";

}
}
Loading

0 comments on commit 021035a

Please sign in to comment.