Skip to content

Commit

Permalink
PIN-5680 - Fix datalake data export (#1212)
Browse files Browse the repository at this point in the history
  • Loading branch information
Carminepo2 authored Nov 22, 2024
1 parent 0a287ea commit 086ddd3
Show file tree
Hide file tree
Showing 43 changed files with 1,637 additions and 3,486 deletions.
20 changes: 14 additions & 6 deletions .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: "Build & Push"
on:
push:
branches:
- "main"
- "develop"
tags:
- "*"
paths:
Expand Down Expand Up @@ -55,8 +55,16 @@ jobs:
id: login-ecr
uses: aws-actions/amazon-ecr-login@062b18b96a7aff071d4dc91bc00c4c1a7945b076 # v2

- name: (latest) Docker metadata
id: meta_latest
- name: Normalize ref name
  id: norm_ref
  # Exports NORM_REF (the ref name with every "/" replaced by "-") to later
  # steps through $GITHUB_ENV.
  run: |
    set -euo pipefail
    # Read the ref name from the runner-provided GITHUB_REF_NAME variable
    # instead of interpolating the github.ref_name expression into the script:
    # the expression is expanded before the shell runs, so a branch or tag name
    # containing shell metacharacters would be executed as code.
    # NOTE(review): uses bash parameter expansion — assumes the step runs under
    # bash (the default `run` shell on Linux runners); confirm the runner OS.
    NORM_REF="${GITHUB_REF_NAME//\//-}"
    echo "NORM_REF=$NORM_REF" >> "$GITHUB_ENV"
- name: (branch) Docker metadata
id: meta_branch
if: ${{ github.ref_type == 'branch' }}
uses: docker/metadata-action@60a0d343a0d8a18aedee9d34e62251f752153bdb
with:
Expand All @@ -66,7 +74,7 @@ jobs:
prefix=
suffix=
tags: |
type=raw,value=2.x-latest
type=raw,value=${{ env.NORM_REF }}
- name: (tag) Docker metadata
id: meta_tag
Expand All @@ -88,5 +96,5 @@ jobs:
context: .
file: packages/${{ matrix.package }}/Dockerfile
push: true
tags: ${{ github.ref_type == 'branch' && steps.meta_latest.outputs.tags || steps.meta_tag.outputs.tags }}
labels: ${{ github.ref_type == 'branch' && steps.meta_latest.outputs.labels || steps.meta_tag.outputs.labels }}
tags: ${{ github.ref_type == 'branch' && steps.meta_branch.outputs.tags || steps.meta_tag.outputs.tags }}
labels: ${{ github.ref_type == 'branch' && steps.meta_branch.outputs.labels || steps.meta_tag.outputs.labels }}
4 changes: 2 additions & 2 deletions packages/agreement-platformstate-writer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
"vitest": "1.6.0"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "3.637.0",
"@aws-sdk/util-dynamodb": "3.637.0",
"@aws-sdk/client-dynamodb": "3.693.0",
"@aws-sdk/util-dynamodb": "3.693.0",
"@protobuf-ts/runtime": "2.9.4",
"connection-string": "4.4.0",
"dotenv-flow": "4.1.0",
Expand Down
16 changes: 7 additions & 9 deletions packages/anac-certified-attributes-importer/.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ READMODEL_DB_NAME="readmodel"
READMODEL_DB_USERNAME="root"
READMODEL_DB_PASSWORD="example"
READMODEL_DB_PORT=27017
SFTP_HOST=""
SFTP_PORT=0
SFTP_USERNAME=""
SFTP_PASSWORD=""

SFTP_HOST="127.0.0.1"
SFTP_PORT=2022
SFTP_USERNAME="test"
SFTP_PASSWORD="test"
SFTP_FILENAME_PREFIX=""
SFTP_PATH=""
FORCE_REMOTE_FILE_NAME=""
FORCE_REMOTE_FILE_NAME="anac-test-csv.csv"

INTERNAL_JWT_KID=ffcc9b5b-4612-49b1-9374-9d203a3834f2
INTERNAL_JWT_SUBJECT="dev-refactor.interop-eservice-descriptors-archiver"
Expand All @@ -22,6 +22,4 @@ INTERNAL_JWT_SECONDS_DURATION=60

TENANT_PROCESS_URL="http://localhost:3500"
RECORDS_PROCESS_BATCH_SIZE=5
ANAC_TENANT_ID=""

AWS_CONFIG_FILE=aws.config.local
ANAC_TENANT_ID="69e2865e-65ab-4e48-a638-2037a9ee2ee7"
1 change: 1 addition & 0 deletions packages/anac-certified-attributes-importer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ WORKDIR /app
COPY package.json /app/
COPY pnpm-lock.yaml /app/
COPY pnpm-workspace.yaml /app/
COPY .npmrc /app/

COPY ./packages/anac-certified-attributes-importer/package.json /app/packages/anac-certified-attributes-importer/package.json
COPY ./packages/commons/package.json /app/packages/commons/package.json
Expand Down
4 changes: 0 additions & 4 deletions packages/anac-certified-attributes-importer/aws.config.local

This file was deleted.

6 changes: 2 additions & 4 deletions packages/anac-certified-attributes-importer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"lint:autofix": "eslint . --ext .ts,.tsx --fix",
"format:check": "prettier --check src",
"format:write": "prettier --write src",
"start": "node --loader ts-node/esm -r 'dotenv-flow/config' --watch ./src/index.ts",
"start": "tsx -r 'dotenv-flow/config' --watch ./src/index.ts",
"build": "tsc",
"check": "tsc --project tsconfig.check.json"
},
Expand All @@ -24,16 +24,14 @@
"@types/ssh2-sftp-client": "9.0.4",
"pagopa-interop-commons-test": "workspace:*",
"prettier": "2.8.8",
"@types/uuid": "9.0.8",
"testcontainers": "10.9.0",
"ts-node": "10.9.2",
"tsx": "4.19.1",
"typescript": "5.4.5",
"vitest": "1.6.0"
},
"dependencies": {
"csv": "^6.3.2",
"dotenv-flow": "4.1.0",
"uuid": "10.0.0",
"pagopa-interop-commons": "workspace:*",
"pagopa-interop-models": "workspace:*",
"pagopa-interop-api-clients": "workspace:*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const AnacCertifiedAttributesImporterConfig = LoggerConfig.and(
z
.object({
TENANT_PROCESS_URL: APIEndpoint,
RECORDS_PROCESS_BATCH_SIZE: z.number(),
RECORDS_PROCESS_BATCH_SIZE: z.coerce.number(),
ANAC_TENANT_ID: z.string(),
})
.transform((c) => ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from "zod";
export const SftpConfig = z
.object({
SFTP_HOST: z.string(),
SFTP_PORT: z.coerce.number().min(1001),
SFTP_PORT: z.coerce.number(),
SFTP_USERNAME: z.string(),
SFTP_PASSWORD: z.string(),
SFTP_FILENAME_PREFIX: z.string(),
Expand Down
5 changes: 5 additions & 0 deletions packages/anac-certified-attributes-importer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ await importAttributes(
loggerInstance,
correlationId
);

process.exit(0);
// process.exit() should not be required;
// however, something in this script hangs on exit.
// TODO: figure out why and remove this workaround.
11 changes: 11 additions & 0 deletions packages/anac-certified-attributes-importer/src/model/tenant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Tenant } from "pagopa-interop-models";
import { z } from "zod";

/**
 * Zod schema for the tenant shape used by this importer: the `Tenant` schema
 * narrowed to its `id`, `externalId`, `attributes` and `features` fields.
 *
 * NOTE(review): presumably the narrowing exists because the read model queries
 * project only a subset of the tenant fields, so parsing their results against
 * the full `Tenant` schema would fail — confirm against the query projection.
 */
export const AnacReadModelTenant = Tenant.pick({
id: true,
externalId: true,
attributes: true,
features: true,
});

/** Static type inferred from the `AnacReadModelTenant` schema. */
export type AnacReadModelTenant = z.infer<typeof AnacReadModelTenant>;
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
/* eslint-disable max-params */
import { parse } from "csv/sync";
import { Logger, RefreshableInteropToken, zipBy } from "pagopa-interop-commons";
import { CorrelationId, Tenant } from "pagopa-interop-models";
import { CorrelationId } from "pagopa-interop-models";
import {
AnacAttributes,
AttributeIdentifiers,
BatchParseResult,
} from "../model/processorModel.js";
import { CsvRow, NonPaRow, PaRow } from "../model/csvRowModel.js";
import { InteropContext } from "../model/interopContextModel.js";
import { AnacReadModelTenant } from "../model/tenant.js";
import { TenantProcessService } from "./tenantProcessService.js";
import { ReadModelQueries } from "./readmodelQueriesService.js";
import { SftpClient } from "./sftpService.js";
Expand Down Expand Up @@ -196,7 +197,9 @@ async function getAttributesIdentifiers(
readModel: ReadModelQueries,
anacTenantId: string
): Promise<AnacAttributes> {
const anacTenant: Tenant = await readModel.getTenantById(anacTenantId);
const anacTenant: AnacReadModelTenant = await readModel.getTenantById(
anacTenantId
);
const certifier = anacTenant.features.find(
(f) => f.type === "PersistentCertifier"
);
Expand Down Expand Up @@ -253,7 +256,7 @@ const prepareTenantsProcessor = (
async function processTenants<T extends CsvRow>(
orgs: T[],
extractTenantCode: (org: T) => string,
retrieveTenants: (codes: string[]) => Promise<Tenant[]>
retrieveTenants: (codes: string[]) => Promise<AnacReadModelTenant[]>
): Promise<void> {
if (orgs.length === 0) {
return;
Expand Down Expand Up @@ -344,7 +347,7 @@ const prepareTenantsProcessor = (
async function assignAttribute(
tenantProcess: TenantProcessService,
refreshableToken: RefreshableInteropToken,
tenant: Tenant,
tenant: AnacReadModelTenant,
attribute: AttributeIdentifiers,
logger: Logger,
correlationId: CorrelationId
Expand All @@ -371,7 +374,7 @@ async function assignAttribute(
async function unassignAttribute(
tenantProcess: TenantProcessService,
refreshableToken: RefreshableInteropToken,
tenant: Tenant,
tenant: AnacReadModelTenant,
attribute: AttributeIdentifiers,
logger: Logger,
correlationId: CorrelationId
Expand All @@ -395,7 +398,10 @@ async function unassignAttribute(
}
}

function tenantContainsAttribute(tenant: Tenant, attributeId: string): boolean {
function tenantContainsAttribute(
tenant: AnacReadModelTenant,
attributeId: string
): boolean {
return (
tenant.attributes.find((attribute) => attribute.id === attributeId) !==
undefined
Expand All @@ -404,7 +410,7 @@ function tenantContainsAttribute(tenant: Tenant, attributeId: string): boolean {

function getMissingTenants(
expectedExternalId: string[],
tenants: Tenant[]
tenants: AnacReadModelTenant[]
): string[] {
const existingSet = new Set(tenants.map((t) => t.externalId.value));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ReadModelRepository } from "pagopa-interop-commons";
import { Attribute, Tenant } from "pagopa-interop-models";
import { Attribute } from "pagopa-interop-models";
import { AnacReadModelTenant } from "../model/tenant.js";

const projectUnrevokedCertifiedAttributes = {
_id: 0,
Expand All @@ -26,7 +27,9 @@ export class ReadModelQueries {
/**
* Retrieve all PA tenants that match the given IPA codes, with their unrevoked certified attributes
*/
public async getPATenants(ipaCodes: string[]): Promise<Tenant[]> {
public async getPATenants(
ipaCodes: string[]
): Promise<AnacReadModelTenant[]> {
return await this.readModelClient.tenants
.aggregate([
{
Expand All @@ -39,14 +42,16 @@ export class ReadModelQueries {
$project: projectUnrevokedCertifiedAttributes,
},
])
.map(({ data }) => Tenant.parse(data))
.map(({ data }) => AnacReadModelTenant.parse(data))
.toArray();
}

/**
* Retrieve all non-PA tenants that match the given tax codes, with their unrevoked certified attributes
*/
public async getNonPATenants(taxCodes: string[]): Promise<Tenant[]> {
public async getNonPATenants(
taxCodes: string[]
): Promise<AnacReadModelTenant[]> {
return await this.readModelClient.tenants
.aggregate([
{
Expand All @@ -59,11 +64,11 @@ export class ReadModelQueries {
$project: projectUnrevokedCertifiedAttributes,
},
])
.map(({ data }) => Tenant.parse(data))
.map(({ data }) => AnacReadModelTenant.parse(data))
.toArray();
}

public async getTenantById(tenantId: string): Promise<Tenant> {
public async getTenantById(tenantId: string): Promise<AnacReadModelTenant> {
const result = await this.readModelClient.tenants
.aggregate([
{
Expand All @@ -75,7 +80,7 @@ export class ReadModelQueries {
$project: projectUnrevokedCertifiedAttributes,
},
])
.map(({ data }) => Tenant.parse(data))
.map(({ data }) => AnacReadModelTenant.parse(data))
.toArray();

if (result.length === 0) {
Expand Down Expand Up @@ -111,7 +116,7 @@ export class ReadModelQueries {

public async getTenantsWithAttributes(
attributeIds: string[]
): Promise<Tenant[]> {
): Promise<AnacReadModelTenant[]> {
return await this.readModelClient.tenants
.aggregate([
{
Expand All @@ -123,7 +128,7 @@ export class ReadModelQueries {
$project: projectUnrevokedCertifiedAttributes,
},
])
.map(({ data }) => Tenant.parse(data))
.map(({ data }) => AnacReadModelTenant.parse(data))
.toArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export class SftpClient {

logger.info(`Loading file ${fileName}`);

const file = await sftpClient.get(this.config.folderPath + fileName);
const file = await sftpClient.get(
[this.config.folderPath, fileName].join("/")
);

await sftpClient.end();

Expand Down
4 changes: 2 additions & 2 deletions packages/backend-for-frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
"vitest": "1.6.0"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "3.602.0",
"@aws-sdk/util-dynamodb": "3.602.0",
"@aws-sdk/client-dynamodb": "3.693.0",
"@aws-sdk/util-dynamodb": "3.693.0",
"@zodios/core": "10.9.6",
"@zodios/express": "10.6.1",
"axios": "1.7.4",
Expand Down
4 changes: 2 additions & 2 deletions packages/catalog-platformstate-writer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
"vitest": "1.6.0"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "3.637.0",
"@aws-sdk/util-dynamodb": "3.637.0",
"@aws-sdk/client-dynamodb": "3.693.0",
"@aws-sdk/util-dynamodb": "3.693.0",
"@protobuf-ts/runtime": "2.9.4",
"connection-string": "4.4.0",
"dotenv-flow": "4.1.0",
Expand Down
6 changes: 3 additions & 3 deletions packages/commons-test/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
"license": "Apache-2.0",
"devDependencies": {
"@anatine/zod-mock": "3.13.4",
"@aws-sdk/client-dynamodb": "3.637.0",
"@aws-sdk/client-sesv2": "3.620.1",
"@aws-sdk/util-dynamodb": "3.658.1",
"@aws-sdk/client-dynamodb": "3.693.0",
"@aws-sdk/client-sesv2": "3.693.0",
"@aws-sdk/util-dynamodb": "3.693.0",
"@pagopa/eslint-config": "3.0.0",
"@protobuf-ts/runtime": "2.9.4",
"@testcontainers/postgresql": "10.9.0",
Expand Down
8 changes: 4 additions & 4 deletions packages/commons/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
},
"license": "Apache-2.0",
"dependencies": {
"@aws-sdk/client-kms": "3.600.0",
"@aws-sdk/client-s3": "3.600.0",
"@aws-sdk/client-sesv2": "3.620.1",
"@aws-sdk/s3-request-presigner": "3.623.0",
"@aws-sdk/client-kms": "3.693.0",
"@aws-sdk/client-s3": "3.693.0",
"@aws-sdk/client-sesv2": "3.693.0",
"@aws-sdk/s3-request-presigner": "3.693.0",
"@zodios/core": "10.9.6",
"@zodios/express": "10.6.1",
"axios": "1.7.4",
Expand Down
1 change: 1 addition & 0 deletions packages/datalake-data-export/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ WORKDIR /app
COPY package.json /app/
COPY pnpm-lock.yaml /app/
COPY pnpm-workspace.yaml /app/
COPY .npmrc /app/

COPY ./packages/commons/package.json /app/packages/commons/package.json
COPY ./packages/models/package.json /app/packages/models/package.json
Expand Down
2 changes: 1 addition & 1 deletion packages/datalake-data-export/aws.config.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[default]
aws_access_key_id=testawskey
aws_secret_access_key=testawssecret
region=eu-central-1
region=eu-south-1
Loading

0 comments on commit 086ddd3

Please sign in to comment.