From a01b52d91d550ff4507685a2534fa5256100277a Mon Sep 17 00:00:00 2001 From: Jacopo Carlini Date: Tue, 19 Nov 2024 09:22:01 +0100 Subject: [PATCH] [PAGOPA-2367] chore --- helm/values-prod.yaml | 2 +- script/trigger.py | 53 +++++++++ .../controller/IPMExtractionController.java | 5 +- .../impl/PMExtractionController.java | 10 +- .../model/ExtractionResponse.java | 8 ++ .../service/IPMExtractionService.java | 3 +- .../service/impl/AyncService.java | 106 ++++++++++++++++++ .../service/impl/PMExtractionService.java | 98 ++++------------ 8 files changed, 198 insertions(+), 87 deletions(-) create mode 100644 script/trigger.py create mode 100644 src/main/java/it/gov/pagopa/bizpmingestion/model/ExtractionResponse.java create mode 100644 src/main/java/it/gov/pagopa/bizpmingestion/service/impl/AyncService.java diff --git a/helm/values-prod.yaml b/helm/values-prod.yaml index 1dfa7d2..db5bdc5 100644 --- a/helm/values-prod.yaml +++ b/helm/values-prod.yaml @@ -29,7 +29,7 @@ microservice-chart: - 8080 ingress: create: true - host: "weudev.bizevents.internal.dev.platform.pagopa.it" + host: "weuprod.bizevents.internal.prod.platform.pagopa.it" path: /pagopa-biz-pm-ingestion/(.*) servicePort: 8080 serviceAccount: diff --git a/script/trigger.py b/script/trigger.py new file mode 100644 index 0000000..2c10096 --- /dev/null +++ b/script/trigger.py @@ -0,0 +1,53 @@ +import requests +import time +from datetime import datetime, timedelta +import os + + +SUBSCRIPTION_KEY = os.getenv("OCP_APIM_SUBSCRIPTION_KEY") +PM_INGESTION_URL = os.getenv("PM_INGESTION_URL") + +if not SUBSCRIPTION_KEY: + raise EnvironmentError("La variabile di ambiente 'OCP_APIM_SUBSCRIPTION_KEY' non è configurata.") + +if not PM_INGESTION_URL: + raise EnvironmentError("La variabile di ambiente 'PM_INGESTION_URL' non è configurata.") + +# Configurazioni +BASE_URL = f"{PM_INGESTION_URL}/extraction/data" +PM_EXTRACTION_TYPES = ["CARD", "BPAY", "PAYPAL"] +HEADERS = { + "accept": "*/*", + "Content-Type": "application/json", + "Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY, # Ottieni la subkey dall'ambiente +} +INTERVAL_SECONDS = 5 * 60 # Configura qui l'intervallo in secondi (5 minuti = 300 secondi) +current_date = datetime(2023, 3, 31) # Data di partenza +end_date = datetime(2018, 1, 1) # Data finale + +def post_requests(): + """Esegue le chiamate POST per ogni tipo di estrazione.""" + creation_date = current_date.strftime("%Y-%m-%d") + for pm_type in PM_EXTRACTION_TYPES: + payload = { + "taxCodes": [], + "creationDateFrom": creation_date, + "creationDateTo": creation_date, + } + url = f"{BASE_URL}?pmExtractionType={pm_type}" + try: + response = requests.post(url, headers=HEADERS, json=payload) + print(f"POST to {url} with payload {payload}: {response.status_code} - {response.text}") + except Exception as e: + print(f"Errore durante la richiesta POST per {pm_type}: {e}") + +# Loop principale +print(f"Avvio dello script. Data iniziale: {current_date.strftime('%Y-%m-%d')}") + +while current_date >= end_date: + post_requests() + current_date -= timedelta(days=1) # Riduci la data di 1 giorno + print(f"Data successiva: {current_date.strftime('%Y-%m-%d')}") + time.sleep(INTERVAL_SECONDS) # Attendi l'intervallo configurato + +print("Script terminato correttamente.") diff --git a/src/main/java/it/gov/pagopa/bizpmingestion/controller/IPMExtractionController.java b/src/main/java/it/gov/pagopa/bizpmingestion/controller/IPMExtractionController.java index 0050189..1ae78b8 100644 --- a/src/main/java/it/gov/pagopa/bizpmingestion/controller/IPMExtractionController.java +++ b/src/main/java/it/gov/pagopa/bizpmingestion/controller/IPMExtractionController.java @@ -9,6 +9,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import it.gov.pagopa.bizpmingestion.enumeration.PMExtractionType; import it.gov.pagopa.bizpmingestion.model.DataExtractionOptionsModel; +import it.gov.pagopa.bizpmingestion.model.ExtractionResponse; import it.gov.pagopa.bizpmingestion.model.ProblemJson; import jakarta.validation.constraints.NotNull; import org.springframework.http.MediaType; @@ -26,7 +27,7 @@ public interface IPMExtractionController { @Operation(summary = "Request for data extraction from the PM.", security = {@SecurityRequirement(name = "ApiKey"), @SecurityRequirement(name = "ApiKey")}, operationId = "pmDataExtraction") @ApiResponses(value = { - @ApiResponse(responseCode = "200", description = "Request paid."), + @ApiResponse(responseCode = "200", description = "Request paid.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ExtractionResponse.class))), @ApiResponse(responseCode = "400", description = "Malformed request.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class))), @ApiResponse(responseCode = "401", description = "Wrong or missing function key.", content = @Content(schema = @Schema())), @ApiResponse(responseCode = "404", description = "Not found.", content = @Content(schema = @Schema(implementation = ProblemJson.class))), @@ -34,7 +35,7 @@ public interface IPMExtractionController { @ApiResponse(responseCode = "422", description = "Unprocessable entity.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class))), @ApiResponse(responseCode = "500", description = "Service unavailable.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class)))}) @PostMapping(produces = {"application/json"}, consumes = {"application/json"}) - ResponseEntity pmDataExtraction( + ResponseEntity pmDataExtraction( @Schema(requiredMode = Schema.RequiredMode.REQUIRED, implementation = PMExtractionType.class) @RequestParam(required = true, name = "pmExtractionType", defaultValue = "CARD") @NotNull PMExtractionType pmExtractionType, @RequestBody DataExtractionOptionsModel dataExtractionOptionsModel); diff --git a/src/main/java/it/gov/pagopa/bizpmingestion/controller/impl/PMExtractionController.java b/src/main/java/it/gov/pagopa/bizpmingestion/controller/impl/PMExtractionController.java index e4a8b6b..e804b65 100644 --- a/src/main/java/it/gov/pagopa/bizpmingestion/controller/impl/PMExtractionController.java +++ b/src/main/java/it/gov/pagopa/bizpmingestion/controller/impl/PMExtractionController.java @@ -3,11 +3,11 @@ import it.gov.pagopa.bizpmingestion.controller.IPMExtractionController; import it.gov.pagopa.bizpmingestion.enumeration.PMExtractionType; import it.gov.pagopa.bizpmingestion.model.DataExtractionOptionsModel; +import it.gov.pagopa.bizpmingestion.model.ExtractionResponse; import it.gov.pagopa.bizpmingestion.service.IPMExtractionService; import it.gov.pagopa.bizpmingestion.util.CommonUtility; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; @@ -24,10 +24,10 @@ public PMExtractionController(IPMExtractionService pmExtractionService) { } @Override - public ResponseEntity pmDataExtraction(PMExtractionType pmExtractionType, - DataExtractionOptionsModel dataExtractionOptionsModel) { + public ResponseEntity pmDataExtraction(PMExtractionType pmExtractionType, + DataExtractionOptionsModel dataExtractionOptionsModel) { log.info(String.format(LOG_BASE_HEADER_INFO, "POST", "pmDataExtraction", CommonUtility.sanitize(pmExtractionType.toString()) + "; " + CommonUtility.sanitize(dataExtractionOptionsModel.toString()))); - pmExtractionService.pmDataExtraction(dataExtractionOptionsModel.getCreationDateFrom(), dataExtractionOptionsModel.getCreationDateTo(), dataExtractionOptionsModel.getTaxCodes(), pmExtractionType); - return ResponseEntity.status(HttpStatus.OK).build(); + var body = pmExtractionService.pmDataExtraction(dataExtractionOptionsModel.getCreationDateFrom(), dataExtractionOptionsModel.getCreationDateTo(), dataExtractionOptionsModel.getTaxCodes(), pmExtractionType); + return ResponseEntity.ok(body); } } diff --git a/src/main/java/it/gov/pagopa/bizpmingestion/model/ExtractionResponse.java b/src/main/java/it/gov/pagopa/bizpmingestion/model/ExtractionResponse.java new file mode 100644 index 0000000..6d018b3 --- /dev/null +++ b/src/main/java/it/gov/pagopa/bizpmingestion/model/ExtractionResponse.java @@ -0,0 +1,8 @@ +package it.gov.pagopa.bizpmingestion.model; + +import lombok.Builder; + +@Builder +public class ExtractionResponse { + private Integer element; +} diff --git a/src/main/java/it/gov/pagopa/bizpmingestion/service/IPMExtractionService.java b/src/main/java/it/gov/pagopa/bizpmingestion/service/IPMExtractionService.java index bccc792..c97b761 100644 --- a/src/main/java/it/gov/pagopa/bizpmingestion/service/IPMExtractionService.java +++ b/src/main/java/it/gov/pagopa/bizpmingestion/service/IPMExtractionService.java @@ -4,10 +4,11 @@ import java.util.List; +import it.gov.pagopa.bizpmingestion.model.ExtractionResponse; import org.springframework.http.ResponseEntity; public interface IPMExtractionService { - ResponseEntity pmDataExtraction(String dateFrom, String dateTo, List taxCodes, PMExtractionType pmExtractionType); + ExtractionResponse pmDataExtraction(String dateFrom, String dateTo, List taxCodes, PMExtractionType pmExtractionType); } diff --git a/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/AyncService.java b/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/AyncService.java new file mode 100644 index 0000000..5778e7e --- /dev/null +++ b/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/AyncService.java @@ -0,0 +1,106 @@ +package it.gov.pagopa.bizpmingestion.service.impl; + +import it.gov.pagopa.bizpmingestion.entity.cosmos.execution.BizEventsPMIngestionExecution; +import it.gov.pagopa.bizpmingestion.entity.pm.PPTransaction; +import it.gov.pagopa.bizpmingestion.enumeration.PaymentMethodType; +import it.gov.pagopa.bizpmingestion.model.pm.PMEvent; +import it.gov.pagopa.bizpmingestion.model.pm.PMEventPaymentDetail; +import it.gov.pagopa.bizpmingestion.model.pm.PMEventToViewResult; +import it.gov.pagopa.bizpmingestion.repository.*; +import it.gov.pagopa.bizpmingestion.service.IPMEventToViewService; +import lombok.extern.slf4j.Slf4j; +import org.modelmapper.ModelMapper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; + +@EnableAsync +@Service +@Slf4j +public class AyncService { + + private static final String LOG_BASE_HEADER_INFO = "[ClassMethod: %s] - [MethodParamsToLog: %s]"; + + private final ModelMapper modelMapper; + private final BizEventsViewGeneralRepository bizEventsViewGeneralRepository; + private final BizEventsViewCartRepository bizEventsViewCartRepository; + private final BizEventsViewUserRepository bizEventsViewUserRepository; + private final PMIngestionExecutionRepository pmIngestionExecutionRepository; + private final IPMEventToViewService pmEventToViewService; + + @Autowired + public AyncService(ModelMapper modelMapper, PPTransactionRepository ppTransactionRepository, + BizEventsViewGeneralRepository bizEventsViewGeneralRepository, BizEventsViewCartRepository bizEventsViewCartRepository, + BizEventsViewUserRepository bizEventsViewUserRepository, PMIngestionExecutionRepository pmIngestionExecutionRepository, + IPMEventToViewService pmEventToViewService) { + this.modelMapper = modelMapper; + this.bizEventsViewGeneralRepository = bizEventsViewGeneralRepository; + this.bizEventsViewCartRepository = bizEventsViewCartRepository; + this.bizEventsViewUserRepository = bizEventsViewUserRepository; + this.pmIngestionExecutionRepository = pmIngestionExecutionRepository; + this.pmEventToViewService = pmEventToViewService; + } + + @Async + public void processDataAsync(List ppTrList, PaymentMethodType paymentMethodType, BizEventsPMIngestionExecution pmIngestionExec) { + + try { + + List skippedId = Collections.synchronizedList(new ArrayList<>()); + pmIngestionExec.setStatus("DONE"); + + var pmEventList = ppTrList.stream() + .map(ppTransaction -> modelMapper.map(ppTransaction, PMEvent.class)) + .toList(); + + int importedEventsCounter = pmEventList.parallelStream() + .map(pmEvent -> { + try { + + PMEventPaymentDetail pmEventPaymentDetail = Optional.ofNullable(pmEvent.getPaymentDetailList()) + .orElse(Collections.emptyList()) + .stream() + .max(Comparator.comparing(PMEventPaymentDetail::getImporto)) + .orElseThrow(); + + PMEventToViewResult result = pmEventToViewService.mapPMEventToView(pmEvent, pmEventPaymentDetail, paymentMethodType); + if (result != null) { + bizEventsViewGeneralRepository.save(result.getGeneralView()); + bizEventsViewCartRepository.save(result.getCartView()); + bizEventsViewUserRepository.saveAll(result.getUserViewList()); + return 1; + } + return 0; + } catch (Exception e) { + pmIngestionExec.setStatus("DONE WITH SKIP"); + skippedId.add(pmEvent.getPkTransactionId()); + + log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId=" + pmIngestionExec.getId() + "] - Error importing PM event with id=" + pmEvent.getPkTransactionId() + + " (err desc = " + e.getMessage() + ")"), e); + return 0; + } + }) + .reduce(Integer::sum) + .orElse(-1); + + pmIngestionExec.setNumRecordIngested(importedEventsCounter); + pmIngestionExec.setSkippedID(skippedId); + + + } catch (Exception e) { + pmIngestionExec.setStatus("FAILED"); + + log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId=" + pmIngestionExec.getId() + "] - Error during asynchronous processing: " + e.getMessage())); + } finally { + pmIngestionExec.setEndTime(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(LocalDateTime.now())); + pmIngestionExecutionRepository.save(pmIngestionExec); + + } + } + +} diff --git a/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/PMExtractionService.java b/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/PMExtractionService.java index 6c13be1..021ae71 100644 --- a/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/PMExtractionService.java +++ b/src/main/java/it/gov/pagopa/bizpmingestion/service/impl/PMExtractionService.java @@ -6,6 +6,7 @@ import it.gov.pagopa.bizpmingestion.enumeration.PaymentMethodType; import it.gov.pagopa.bizpmingestion.exception.AppError; import it.gov.pagopa.bizpmingestion.exception.AppException; +import it.gov.pagopa.bizpmingestion.model.ExtractionResponse; import it.gov.pagopa.bizpmingestion.model.pm.PMEvent; import it.gov.pagopa.bizpmingestion.model.pm.PMEventPaymentDetail; import it.gov.pagopa.bizpmingestion.model.pm.PMEventToViewResult; @@ -53,13 +54,15 @@ public class PMExtractionService implements IPMExtractionService { private final BizEventsViewUserRepository bizEventsViewUserRepository; private final PMIngestionExecutionRepository pmIngestionExecutionRepository; private final IPMEventToViewService pmEventToViewService; - - private final Object lock = new Object(); - + + @Autowired + AyncService ayncService; + + @Autowired - public PMExtractionService(ModelMapper modelMapper, PPTransactionRepository ppTransactionRepository, - BizEventsViewGeneralRepository bizEventsViewGeneralRepository, BizEventsViewCartRepository bizEventsViewCartRepository, - BizEventsViewUserRepository bizEventsViewUserRepository, PMIngestionExecutionRepository pmIngestionExecutionRepository, + public PMExtractionService(ModelMapper modelMapper, PPTransactionRepository ppTransactionRepository, + BizEventsViewGeneralRepository bizEventsViewGeneralRepository, BizEventsViewCartRepository bizEventsViewCartRepository, + BizEventsViewUserRepository bizEventsViewUserRepository, PMIngestionExecutionRepository pmIngestionExecutionRepository, IPMEventToViewService pmEventToViewService) { this.modelMapper = modelMapper; this.ppTransactionRepository = ppTransactionRepository; @@ -73,8 +76,8 @@ public PMExtractionService(ModelMapper modelMapper, PPTransactionRepository ppTr @Override @Transactional - public ResponseEntity pmDataExtraction(String dateFrom, String dateTo, List taxCodes, PMExtractionType pmExtractionType) { - + public ExtractionResponse pmDataExtraction(String dateFrom, String dateTo, List taxCodes, PMExtractionType pmExtractionType) { + BizEventsPMIngestionExecution pmIngestionExec = BizEventsPMIngestionExecution.builder() .id(UUID.randomUUID().toString()) .startTime(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(LocalDateTime.now())) @@ -101,78 +104,17 @@ public ResponseEntity pmDataExtraction(String dateFrom, String dateTo, Lis default -> throw new AppException(AppError.BAD_REQUEST, "Invalid PM extraction type [pmExtractionType=" + pmExtractionType + "]"); }; - + List ppTrList = ppTransactionRepository.findAll(Specification.where(spec)); - + pmIngestionExec.setNumRecordFound(ppTrList.size()); - - processDataAsync(ppTrList, paymentMethodType, pmIngestionExec); - - return ResponseEntity.ok().build(); - } - - @Async - public void processDataAsync(List ppTrList, PaymentMethodType paymentMethodType, BizEventsPMIngestionExecution pmIngestionExec) { - - try { - - List skippedId = Collections.synchronizedList(new ArrayList<>()); - - synchronized (lock) { - pmIngestionExec.setStatus("DONE"); - } - - var pmEventList = ppTrList.stream() - .map(ppTransaction -> modelMapper.map(ppTransaction, PMEvent.class)) - .toList(); - - int importedEventsCounter = pmEventList.parallelStream() - .map(pmEvent -> { - try { - - PMEventPaymentDetail pmEventPaymentDetail = Optional.ofNullable(pmEvent.getPaymentDetailList()) - .orElse(Collections.emptyList()) - .stream() - .max(Comparator.comparing(PMEventPaymentDetail::getImporto)) - .orElseThrow(); - - PMEventToViewResult result = pmEventToViewService.mapPMEventToView(pmEvent, pmEventPaymentDetail, paymentMethodType); - if (result != null) { - bizEventsViewGeneralRepository.save(result.getGeneralView()); - bizEventsViewCartRepository.save(result.getCartView()); - bizEventsViewUserRepository.saveAll(result.getUserViewList()); - return 1; - } - return 0; - } catch (Exception e) { - synchronized (lock) { - pmIngestionExec.setStatus("DONE WITH SKIP"); - skippedId.add(pmEvent.getPkTransactionId()); - } - log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId="+pmIngestionExec.getId()+"] - Error importing PM event with id=" + pmEvent.getPkTransactionId() - + " (err desc = " + e.getMessage() + ")"), e); - return 0; - } - }) - .reduce(Integer::sum) - .orElse(-1); - - synchronized (lock) { - pmIngestionExec.setNumRecordIngested(importedEventsCounter); - pmIngestionExec.setSkippedID(skippedId); - } - - } catch (Exception e) { - synchronized (lock) { - pmIngestionExec.setStatus("FAILED"); - } - log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId="+pmIngestionExec.getId()+"] - Error during asynchronous processing: " + e.getMessage())); - } finally { - synchronized (lock) { - pmIngestionExec.setEndTime(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(LocalDateTime.now())); - pmIngestionExecutionRepository.save(pmIngestionExec); - } - } + + ayncService.processDataAsync(ppTrList, paymentMethodType, pmIngestionExec); + + return ExtractionResponse.builder() + .element(ppTrList.size()) + .build(); } + }