Skip to content

Commit

Permalink
[PAGOPA-2367] chore
Browse files Browse the repository at this point in the history
  • Loading branch information
jacopocarlini committed Nov 19, 2024
1 parent 6805dac commit a01b52d
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 87 deletions.
2 changes: 1 addition & 1 deletion helm/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ microservice-chart:
- 8080
ingress:
create: true
host: "weudev.bizevents.internal.dev.platform.pagopa.it"
host: "weuprod.bizevents.internal.prod.platform.pagopa.it"
path: /pagopa-biz-pm-ingestion/(.*)
servicePort: 8080
serviceAccount:
Expand Down
53 changes: 53 additions & 0 deletions script/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import requests
import time
from datetime import datetime, timedelta
import os


SUBSCRIPTION_KEY = os.getenv("OCP_APIM_SUBSCRIPTION_KEY")
PM_INGESTION_URL = os.getenv("PM_INGESTION_URL")

if not SUBSCRIPTION_KEY:
raise EnvironmentError("La variabile di ambiente 'OCP_APIM_SUBSCRIPTION_KEY' non è configurata.")

if not PM_INGESTION_URL:
raise EnvironmentError("La variabile di ambiente 'PM_INGESTION_URL' non è configurata.")

# Configurazioni
BASE_URL = f"{PM_INGESTION_URL}/extraction/data"
PM_EXTRACTION_TYPES = ["CARD", "BPAY", "PAYPAL"]
HEADERS = {
"accept": "*/*",
"Content-Type": "application/json",
"Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY, # Ottieni la subkey dall'ambiente
}
INTERVAL_SECONDS = 5 * 60 # Configura qui l'intervallo in secondi (5 minuti = 300 secondi)
current_date = datetime(2023, 3, 31) # Data di partenza
end_date = datetime(2018, 1, 1) # Data finale

def post_requests():
"""Esegue le chiamate POST per ogni tipo di estrazione."""
creation_date = current_date.strftime("%Y-%m-%d")
for pm_type in PM_EXTRACTION_TYPES:
payload = {
"taxCodes": [],
"creationDateFrom": creation_date,
"creationDateTo": creation_date,
}
url = f"{BASE_URL}?pmExtractionType={pm_type}"
try:
response = requests.post(url, headers=HEADERS, json=payload)
print(f"POST to {url} with payload {payload}: {response.status_code} - {response.text}")
except Exception as e:
print(f"Errore durante la richiesta POST per {pm_type}: {e}")

# Loop principale
print(f"Avvio dello script. Data iniziale: {current_date.strftime('%Y-%m-%d')}")

while current_date >= end_date:
post_requests()
current_date -= timedelta(days=1) # Riduci la data di 1 giorno
print(f"Data successiva: {current_date.strftime('%Y-%m-%d')}")
time.sleep(INTERVAL_SECONDS) # Attendi l'intervallo configurato

print("Script terminato correttamente.")
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import it.gov.pagopa.bizpmingestion.enumeration.PMExtractionType;
import it.gov.pagopa.bizpmingestion.model.DataExtractionOptionsModel;
import it.gov.pagopa.bizpmingestion.model.ExtractionResponse;
import it.gov.pagopa.bizpmingestion.model.ProblemJson;
import jakarta.validation.constraints.NotNull;
import org.springframework.http.MediaType;
Expand All @@ -26,15 +27,15 @@ public interface IPMExtractionController {

@Operation(summary = "Request for data extraction from the PM.", security = {@SecurityRequirement(name = "ApiKey"), @SecurityRequirement(name = "ApiKey")}, operationId = "pmDataExtraction")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Request paid."),
@ApiResponse(responseCode = "200", description = "Request paid.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ExtractionResponse.class))),
@ApiResponse(responseCode = "400", description = "Malformed request.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class))),
@ApiResponse(responseCode = "401", description = "Wrong or missing function key.", content = @Content(schema = @Schema())),
@ApiResponse(responseCode = "404", description = "Not found.", content = @Content(schema = @Schema(implementation = ProblemJson.class))),
@ApiResponse(responseCode = "409", description = "Conflict.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class))),
@ApiResponse(responseCode = "422", description = "Unprocessable entity.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class))),
@ApiResponse(responseCode = "500", description = "Service unavailable.", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ProblemJson.class)))})
@PostMapping(produces = {"application/json"}, consumes = {"application/json"})
ResponseEntity<Void> pmDataExtraction(
ResponseEntity<ExtractionResponse> pmDataExtraction(
@Schema(requiredMode = Schema.RequiredMode.REQUIRED, implementation = PMExtractionType.class) @RequestParam(required = true, name = "pmExtractionType", defaultValue = "CARD") @NotNull PMExtractionType pmExtractionType,
@RequestBody DataExtractionOptionsModel dataExtractionOptionsModel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import it.gov.pagopa.bizpmingestion.controller.IPMExtractionController;
import it.gov.pagopa.bizpmingestion.enumeration.PMExtractionType;
import it.gov.pagopa.bizpmingestion.model.DataExtractionOptionsModel;
import it.gov.pagopa.bizpmingestion.model.ExtractionResponse;
import it.gov.pagopa.bizpmingestion.service.IPMExtractionService;
import it.gov.pagopa.bizpmingestion.util.CommonUtility;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;

Expand All @@ -24,10 +24,10 @@ public PMExtractionController(IPMExtractionService pmExtractionService) {
}

@Override
public ResponseEntity<Void> pmDataExtraction(PMExtractionType pmExtractionType,
DataExtractionOptionsModel dataExtractionOptionsModel) {
public ResponseEntity<ExtractionResponse> pmDataExtraction(PMExtractionType pmExtractionType,
DataExtractionOptionsModel dataExtractionOptionsModel) {
log.info(String.format(LOG_BASE_HEADER_INFO, "POST", "pmDataExtraction", CommonUtility.sanitize(pmExtractionType.toString()) + "; " + CommonUtility.sanitize(dataExtractionOptionsModel.toString())));
pmExtractionService.pmDataExtraction(dataExtractionOptionsModel.getCreationDateFrom(), dataExtractionOptionsModel.getCreationDateTo(), dataExtractionOptionsModel.getTaxCodes(), pmExtractionType);
return ResponseEntity.status(HttpStatus.OK).build();
var body = pmExtractionService.pmDataExtraction(dataExtractionOptionsModel.getCreationDateFrom(), dataExtractionOptionsModel.getCreationDateTo(), dataExtractionOptionsModel.getTaxCodes(), pmExtractionType);
return ResponseEntity.ok(body);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.gov.pagopa.bizpmingestion.model;

import lombok.Builder;

@Builder
public class ExtractionResponse {
private Integer element;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

import java.util.List;

import it.gov.pagopa.bizpmingestion.model.ExtractionResponse;
import org.springframework.http.ResponseEntity;


public interface IPMExtractionService {

ResponseEntity<Void> pmDataExtraction(String dateFrom, String dateTo, List<String> taxCodes, PMExtractionType pmExtractionType);
ExtractionResponse pmDataExtraction(String dateFrom, String dateTo, List<String> taxCodes, PMExtractionType pmExtractionType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package it.gov.pagopa.bizpmingestion.service.impl;

import it.gov.pagopa.bizpmingestion.entity.cosmos.execution.BizEventsPMIngestionExecution;
import it.gov.pagopa.bizpmingestion.entity.pm.PPTransaction;
import it.gov.pagopa.bizpmingestion.enumeration.PaymentMethodType;
import it.gov.pagopa.bizpmingestion.model.pm.PMEvent;
import it.gov.pagopa.bizpmingestion.model.pm.PMEventPaymentDetail;
import it.gov.pagopa.bizpmingestion.model.pm.PMEventToViewResult;
import it.gov.pagopa.bizpmingestion.repository.*;
import it.gov.pagopa.bizpmingestion.service.IPMEventToViewService;
import lombok.extern.slf4j.Slf4j;
import org.modelmapper.ModelMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

@EnableAsync
@Service
@Slf4j
public class AyncService {

private static final String LOG_BASE_HEADER_INFO = "[ClassMethod: %s] - [MethodParamsToLog: %s]";

private final ModelMapper modelMapper;
private final BizEventsViewGeneralRepository bizEventsViewGeneralRepository;
private final BizEventsViewCartRepository bizEventsViewCartRepository;
private final BizEventsViewUserRepository bizEventsViewUserRepository;
private final PMIngestionExecutionRepository pmIngestionExecutionRepository;
private final IPMEventToViewService pmEventToViewService;

@Autowired
public AyncService(ModelMapper modelMapper, PPTransactionRepository ppTransactionRepository,
BizEventsViewGeneralRepository bizEventsViewGeneralRepository, BizEventsViewCartRepository bizEventsViewCartRepository,
BizEventsViewUserRepository bizEventsViewUserRepository, PMIngestionExecutionRepository pmIngestionExecutionRepository,
IPMEventToViewService pmEventToViewService) {
this.modelMapper = modelMapper;
this.bizEventsViewGeneralRepository = bizEventsViewGeneralRepository;
this.bizEventsViewCartRepository = bizEventsViewCartRepository;
this.bizEventsViewUserRepository = bizEventsViewUserRepository;
this.pmIngestionExecutionRepository = pmIngestionExecutionRepository;
this.pmEventToViewService = pmEventToViewService;
}

@Async
public void processDataAsync(List<PPTransaction> ppTrList, PaymentMethodType paymentMethodType, BizEventsPMIngestionExecution pmIngestionExec) {

try {

List<Long> skippedId = Collections.synchronizedList(new ArrayList<>());
pmIngestionExec.setStatus("DONE");

var pmEventList = ppTrList.stream()
.map(ppTransaction -> modelMapper.map(ppTransaction, PMEvent.class))
.toList();

int importedEventsCounter = pmEventList.parallelStream()
.map(pmEvent -> {
try {

PMEventPaymentDetail pmEventPaymentDetail = Optional.ofNullable(pmEvent.getPaymentDetailList())
.orElse(Collections.emptyList())
.stream()
.max(Comparator.comparing(PMEventPaymentDetail::getImporto))
.orElseThrow();

PMEventToViewResult result = pmEventToViewService.mapPMEventToView(pmEvent, pmEventPaymentDetail, paymentMethodType);
if (result != null) {
bizEventsViewGeneralRepository.save(result.getGeneralView());
bizEventsViewCartRepository.save(result.getCartView());
bizEventsViewUserRepository.saveAll(result.getUserViewList());
return 1;
}
return 0;
} catch (Exception e) {
pmIngestionExec.setStatus("DONE WITH SKIP");
skippedId.add(pmEvent.getPkTransactionId());

log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId=" + pmIngestionExec.getId() + "] - Error importing PM event with id=" + pmEvent.getPkTransactionId()
+ " (err desc = " + e.getMessage() + ")"), e);
return 0;
}
})
.reduce(Integer::sum)
.orElse(-1);

pmIngestionExec.setNumRecordIngested(importedEventsCounter);
pmIngestionExec.setSkippedID(skippedId);


} catch (Exception e) {
pmIngestionExec.setStatus("FAILED");

log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId=" + pmIngestionExec.getId() + "] - Error during asynchronous processing: " + e.getMessage()));
} finally {
pmIngestionExec.setEndTime(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(LocalDateTime.now()));
pmIngestionExecutionRepository.save(pmIngestionExec);

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import it.gov.pagopa.bizpmingestion.enumeration.PaymentMethodType;
import it.gov.pagopa.bizpmingestion.exception.AppError;
import it.gov.pagopa.bizpmingestion.exception.AppException;
import it.gov.pagopa.bizpmingestion.model.ExtractionResponse;
import it.gov.pagopa.bizpmingestion.model.pm.PMEvent;
import it.gov.pagopa.bizpmingestion.model.pm.PMEventPaymentDetail;
import it.gov.pagopa.bizpmingestion.model.pm.PMEventToViewResult;
Expand Down Expand Up @@ -53,13 +54,15 @@ public class PMExtractionService implements IPMExtractionService {
private final BizEventsViewUserRepository bizEventsViewUserRepository;
private final PMIngestionExecutionRepository pmIngestionExecutionRepository;
private final IPMEventToViewService pmEventToViewService;

private final Object lock = new Object();


@Autowired
AyncService ayncService;


@Autowired
public PMExtractionService(ModelMapper modelMapper, PPTransactionRepository ppTransactionRepository,
BizEventsViewGeneralRepository bizEventsViewGeneralRepository, BizEventsViewCartRepository bizEventsViewCartRepository,
BizEventsViewUserRepository bizEventsViewUserRepository, PMIngestionExecutionRepository pmIngestionExecutionRepository,
public PMExtractionService(ModelMapper modelMapper, PPTransactionRepository ppTransactionRepository,
BizEventsViewGeneralRepository bizEventsViewGeneralRepository, BizEventsViewCartRepository bizEventsViewCartRepository,
BizEventsViewUserRepository bizEventsViewUserRepository, PMIngestionExecutionRepository pmIngestionExecutionRepository,
IPMEventToViewService pmEventToViewService) {
this.modelMapper = modelMapper;
this.ppTransactionRepository = ppTransactionRepository;
Expand All @@ -73,8 +76,8 @@ public PMExtractionService(ModelMapper modelMapper, PPTransactionRepository ppTr

@Override
@Transactional
public ResponseEntity<Void> pmDataExtraction(String dateFrom, String dateTo, List<String> taxCodes, PMExtractionType pmExtractionType) {
public ExtractionResponse pmDataExtraction(String dateFrom, String dateTo, List<String> taxCodes, PMExtractionType pmExtractionType) {

BizEventsPMIngestionExecution pmIngestionExec = BizEventsPMIngestionExecution.builder()
.id(UUID.randomUUID().toString())
.startTime(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(LocalDateTime.now()))
Expand All @@ -101,78 +104,17 @@ public ResponseEntity<Void> pmDataExtraction(String dateFrom, String dateTo, Lis
default -> throw new AppException(AppError.BAD_REQUEST,
"Invalid PM extraction type [pmExtractionType=" + pmExtractionType + "]");
};

List<PPTransaction> ppTrList = ppTransactionRepository.findAll(Specification.where(spec));

pmIngestionExec.setNumRecordFound(ppTrList.size());

processDataAsync(ppTrList, paymentMethodType, pmIngestionExec);

return ResponseEntity.ok().build();
}

@Async
public void processDataAsync(List<PPTransaction> ppTrList, PaymentMethodType paymentMethodType, BizEventsPMIngestionExecution pmIngestionExec) {

try {

List<Long> skippedId = Collections.synchronizedList(new ArrayList<>());

synchronized (lock) {
pmIngestionExec.setStatus("DONE");
}

var pmEventList = ppTrList.stream()
.map(ppTransaction -> modelMapper.map(ppTransaction, PMEvent.class))
.toList();

int importedEventsCounter = pmEventList.parallelStream()
.map(pmEvent -> {
try {

PMEventPaymentDetail pmEventPaymentDetail = Optional.ofNullable(pmEvent.getPaymentDetailList())
.orElse(Collections.emptyList())
.stream()
.max(Comparator.comparing(PMEventPaymentDetail::getImporto))
.orElseThrow();

PMEventToViewResult result = pmEventToViewService.mapPMEventToView(pmEvent, pmEventPaymentDetail, paymentMethodType);
if (result != null) {
bizEventsViewGeneralRepository.save(result.getGeneralView());
bizEventsViewCartRepository.save(result.getCartView());
bizEventsViewUserRepository.saveAll(result.getUserViewList());
return 1;
}
return 0;
} catch (Exception e) {
synchronized (lock) {
pmIngestionExec.setStatus("DONE WITH SKIP");
skippedId.add(pmEvent.getPkTransactionId());
}
log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId="+pmIngestionExec.getId()+"] - Error importing PM event with id=" + pmEvent.getPkTransactionId()
+ " (err desc = " + e.getMessage() + ")"), e);
return 0;
}
})
.reduce(Integer::sum)
.orElse(-1);

synchronized (lock) {
pmIngestionExec.setNumRecordIngested(importedEventsCounter);
pmIngestionExec.setSkippedID(skippedId);
}

} catch (Exception e) {
synchronized (lock) {
pmIngestionExec.setStatus("FAILED");
}
log.error(String.format(LOG_BASE_HEADER_INFO, "processDataAsync", "[processId="+pmIngestionExec.getId()+"] - Error during asynchronous processing: " + e.getMessage()));
} finally {
synchronized (lock) {
pmIngestionExec.setEndTime(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(LocalDateTime.now()));
pmIngestionExecutionRepository.save(pmIngestionExec);
}
}

ayncService.processDataAsync(ppTrList, paymentMethodType, pmIngestionExec);

return ExtractionResponse.builder()
.element(ppTrList.size())
.build();
}


}

0 comments on commit a01b52d

Please sign in to comment.