From aeceac3e96e6cc12bb974c679fc7bc4fc26d98cb Mon Sep 17 00:00:00 2001 From: Pedro Nascimento Date: Tue, 29 Oct 2024 14:01:16 -0300 Subject: [PATCH 01/11] Add Terms of Use acceptance feature for User model --- app/models.py | 3 +++ app/routers/frontend.py | 20 ++++++++++++++++++++ migrations/app/35_20241029140029_update.py | 14 ++++++++++++++ 3 files changed, 37 insertions(+) create mode 100644 migrations/app/35_20241029140029_update.py diff --git a/app/models.py b/app/models.py index 0096d066..73a2eecd 100644 --- a/app/models.py +++ b/app/models.py @@ -41,6 +41,9 @@ class User(Model): is_2fa_required = fields.BooleanField(default=False) is_2fa_activated = fields.BooleanField(default=False) is_ergon_validation_required = fields.BooleanField(default=False) + # Terms of use + is_use_terms_accepted = fields.BooleanField(default=False) + use_terms_accepted_at = fields.DatetimeField(null=True) # Metadata is_active = fields.BooleanField(default=True) is_superuser = fields.BooleanField(default=False) diff --git a/app/routers/frontend.py b/app/routers/frontend.py index 6bc729ea..3ce243cc 100644 --- a/app/routers/frontend.py +++ b/app/routers/frontend.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import asyncio +import datetime from typing import Annotated, List from fastapi import APIRouter, Depends, Request from fastapi_limiter.depends import RateLimiter @@ -45,6 +46,7 @@ async def get_user_info( "role": user.role.job_title if user.role else None, "email": user.email, "username": user.username, + "is_use_terms_accepted": user.is_use_terms_accepted, "cpf": cpf, } @@ -150,6 +152,24 @@ async def get_patient_encounters( return [] +@router_request( + method="POST", + router=router, + path="/user/accept-terms/", + response_model=bool, +) +async def accept_use_terms( + user: Annotated[User, Depends(assert_user_is_active)], + request: Request, +) -> List[Encounter]: + + user.is_use_terms_accepted = True + user.use_terms_accepted_at = datetime.datetime.now() + await user.save() + + return user + + @router.get("/patient/filter_tags") async def get_filter_tags(_: Annotated[User, Depends(assert_user_is_active)]) -> List[str]: return [ diff --git a/migrations/app/35_20241029140029_update.py b/migrations/app/35_20241029140029_update.py new file mode 100644 index 00000000..e131aab1 --- /dev/null +++ b/migrations/app/35_20241029140029_update.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +from tortoise import BaseDBAsyncClient + + +async def upgrade(db: BaseDBAsyncClient) -> str: + return """ + ALTER TABLE "user" ADD "use_terms_accepted_at" TIMESTAMPTZ; + ALTER TABLE "user" ADD "is_use_terms_accepted" BOOL NOT NULL DEFAULT False;""" + + +async def downgrade(db: BaseDBAsyncClient) -> str: + return """ + ALTER TABLE "user" DROP COLUMN "use_terms_accepted_at"; + ALTER TABLE "user" DROP COLUMN "is_use_terms_accepted";""" From 185ce215faf06c7fca58290c5a6be57183b93559 Mon Sep 17 00:00:00 2001 From: Pedro Nascimento Date: Tue, 29 Oct 2024 14:40:24 -0300 Subject: [PATCH 02/11] Applying Models in Configuration --- app/enums.py | 4 +++ app/routers/frontend.py | 58 +++++++++++++++++++++++++++-------------- app/types/errors.py | 10 +++++-- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/app/enums.py b/app/enums.py index f380d187..767ba04f 100644 --- a/app/enums.py +++ b/app/enums.py @@ -18,6 +18,10 @@ class LoginErrorEnum(str, Enum): INACTIVE_EMPLOYEE = "inactive_employee" REQUIRE_2FA = "require_2fa" +class AcceptTermsEnum(str, Enum): + SUCCESS = "success" + FAILURE = "failure" + class AccessErrorEnum(str, Enum): 
NOT_FOUND = "NOT_FOUND" diff --git a/app/routers/frontend.py b/app/routers/frontend.py index 03912a01..70459a7e 100644 --- a/app/routers/frontend.py +++ b/app/routers/frontend.py @@ -4,6 +4,7 @@ from typing import Annotated, List from fastapi import APIRouter, Depends, Request from fastapi_limiter.depends import RateLimiter +from fastapi.responses import JSONResponse from app.decorators import router_request from app.dependencies import assert_user_is_active, assert_cpf_is_valid @@ -14,6 +15,7 @@ Encounter, UserInfo, ) +from app.types.errors import AcceptTermsEnum from app.utils import read_bq, validate_user_access_to_patient_data from app.config import ( BIGQUERY_PROJECT, @@ -24,7 +26,8 @@ REQUEST_LIMIT_WINDOW_SIZE, ) from app.types.errors import ( - AccessErrorModel + AccessErrorModel, + TermAcceptanceErrorModel ) router = APIRouter(prefix="/frontend", tags=["Frontend Application"]) @@ -51,6 +54,41 @@ async def get_user_info( } +@router_request( + method="POST", + router=router, + path="/user/accept-terms/", + response_model=TermAcceptanceErrorModel, + responses={ + 500: {"model": TermAcceptanceErrorModel}, + }, +) +async def accept_use_terms( + user: Annotated[User, Depends(assert_user_is_active)], + request: Request, +) -> TermAcceptanceErrorModel: + + try: + user.is_use_terms_accepted = True + user.use_terms_accepted_at = datetime.datetime.now() + await user.save() + return JSONResponse( + status_code=200, + content={ + "message": "Success", + "type": AcceptTermsEnum.SUCCESS, + }, + ) + except Exception: + return JSONResponse( + status_code=500, + content={ + "message": "Patient not found", + "type": AcceptTermsEnum.FAILURE, + }, + ) + + @router_request( method="GET", router=router, @@ -152,24 +190,6 @@ async def get_patient_encounters( return [] -@router_request( - method="POST", - router=router, - path="/user/accept-terms/", - response_model=bool, -) -async def accept_use_terms( - user: Annotated[User, Depends(assert_user_is_active)], - request: Request, -) -> List[Encounter]: - - user.is_use_terms_accepted = True - user.use_terms_accepted_at = datetime.datetime.now() - await user.save() - - return user - - @router.get("/patient/filter_tags") async def get_filter_tags(_: Annotated[User, Depends(assert_user_is_active)]) -> List[str]: return [ diff --git a/app/types/errors.py b/app/types/errors.py index a8b76f08..069dfb3c 100644 --- a/app/types/errors.py +++ b/app/types/errors.py @@ -3,7 +3,8 @@ from app.enums import ( LoginErrorEnum, - AccessErrorEnum + AccessErrorEnum, + AcceptTermsEnum ) @@ -14,4 +15,9 @@ class AuthenticationErrorModel(BaseModel): class AccessErrorModel(BaseModel): message: str - type: AccessErrorEnum \ No newline at end of file + type: AccessErrorEnum + + +class TermAcceptanceErrorModel(BaseModel): + message: str + type: AcceptTermsEnum From 77a5d19b10dc9d2c6ab91752e6fc56766e217193 Mon Sep 17 00:00:00 2001 From: Pedro Nascimento Date: Tue, 29 Oct 2024 15:09:18 -0300 Subject: [PATCH 03/11] Add is_use_terms_accepted field to UserInfo model --- app/types/frontend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/app/types/frontend.py b/app/types/frontend.py index fa1bbe50..1da647c4 100644 --- a/app/types/frontend.py +++ b/app/types/frontend.py @@ -78,6 +78,7 @@ class UserInfo(BaseModel): name: Optional[str] cpf: Optional[str] username: Optional[str] + is_use_terms_accepted: Optional[bool] email: Optional[str] role: Optional[str] From 6306fbc0de98aede7264524ad0c96a4082ec4926 Mon Sep 17 00:00:00 2001 From: Pedro Nascimento Date: Tue, 29 Oct 2024 18:04:35 -0300 
Subject: [PATCH 04/11] Update dependencies: google-cloud-bigquery to version 3.26.0 --- app/datalake/models.py | 18 ++--- app/datalake/uploader.py | 127 +++++++++++++++++++++++++----------- app/routers/entities_raw.py | 24 +++---- poetry.lock | 22 +++---- pyproject.toml | 1 + 5 files changed, 118 insertions(+), 74 deletions(-) diff --git a/app/datalake/models.py b/app/datalake/models.py index ae56f3c8..dea75522 100644 --- a/app/datalake/models.py +++ b/app/datalake/models.py @@ -109,10 +109,8 @@ class SMSRioPaciente(BaseModel): class Config: dataset_id = "brutos_plataforma_smsrio" table_id = "paciente_cadastro_eventos" - partition_by_date = True - partition_column = "source_updated_at" - biglake_table = True - dataset_is_public = False + biglake_table = False + time_partition_column = "datalake_loaded_at" # =============== @@ -203,10 +201,8 @@ class VitacarePaciente(BaseModel): class Config: dataset_id = "brutos_prontuario_vitacare" table_id = "paciente_eventos" - partition_by_date = True - partition_column = "source_updated_at" - biglake_table = True - dataset_is_public = False + biglake_table = False + time_partition_column = "datalake_loaded_at" class VitacareAtendimento(BaseModel): @@ -248,7 +244,5 @@ class VitacareAtendimento(BaseModel): class Config: dataset_id = "brutos_prontuario_vitacare" table_id = "atendimento_eventos" - partition_by_date = True - partition_column = "source_updated_at" - biglake_table = True - dataset_is_public = False + biglake_table = False + time_partition_column = "datalake_loaded_at" diff --git a/app/datalake/uploader.py b/app/datalake/uploader.py index f0e62f75..2e52b543 100644 --- a/app/datalake/uploader.py +++ b/app/datalake/uploader.py @@ -4,6 +4,7 @@ import shutil import base64 from typing import Optional +from google.cloud import bigquery import pandas as pd import basedosdados as bd @@ -13,22 +14,8 @@ class DatalakeUploader: - def __init__( - self, - if_exists: str = "append", - if_storage_data_exists: str = "replace", - dump_mode: str = "append", - csv_delimiter: str = ";", - force_unique_file_name: bool = False, - ) -> None: - self.if_exists = if_exists - self.if_storage_data_exists = if_storage_data_exists - self.dump_mode = dump_mode - self.csv_delimiter = csv_delimiter - self.force_unique_file_name = force_unique_file_name - + def __init__(self) -> None: self._base_path = os.path.join(os.getcwd(), "files") - self._validate_envs() def _validate_envs(self) -> None: @@ -37,9 +24,7 @@ def _validate_envs(self) -> None: "BASEDOSDADOS_CREDENTIALS_STAGING", "BASEDOSDADOS_CONFIG", ] - missing_envs = [ - env for env in mandatory_envs if env not in os.environ - ] + missing_envs = [env for env in mandatory_envs if env not in os.environ] if len(missing_envs) > 0: raise ValueError(f"Missing environment variables: {missing_envs}") @@ -52,7 +37,6 @@ def _prepare_gcp_credential(self) -> None: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/tmp/credentials.json" return - def _split_dataframe_per_day( self, df: pd.DataFrame, @@ -62,7 +46,7 @@ def _split_dataframe_per_day( if df.empty: logger.warning("Empty dataframe. Preparing to send file with only headers") - dfs = [(str(now.date()), df)] + dfs = [(now.date(), df)] else: logger.warning("Non Empty dataframe. 
Splitting Dataframe in multiple files by day") df["partition_date"] = pd.to_datetime(df[date_column]).dt.date @@ -77,11 +61,7 @@ def _split_dataframe_per_day( return dfs - def _create_file_name( - self, - table_id: str, - unique: bool = False - ) -> str: + def _create_file_name(self, table_id: str, unique: bool = False) -> str: if unique: return f"{table_id}-{uuid.uuid4()}.parquet" else: @@ -101,6 +81,10 @@ def _upload_files_in_folder( source_format: str = "parquet", biglake_table: bool = True, dataset_is_public: bool = False, + if_exists: str = "append", + if_storage_data_exists: str = "replace", + dump_mode: str = "append", + csv_delimiter: str = ";", ) -> None: self._prepare_gcp_credential() @@ -121,19 +105,19 @@ def _upload_files_in_folder( tb.create( path=folder_path, source_format=source_format, - csv_delimiter=self.csv_delimiter, - if_storage_data_exists=self.if_storage_data_exists, + csv_delimiter=csv_delimiter, + if_storage_data_exists=if_storage_data_exists, biglake_table=biglake_table, dataset_is_public=dataset_is_public, ) else: - if self.dump_mode == "append": + if dump_mode == "append": logger.info( f"TABLE ALREADY EXISTS APPENDING DATA TO STORAGE: {dataset_id}.{table_id}" ) - tb.append(filepath=folder_path, if_exists=self.if_exists) - elif self.dump_mode == "overwrite": + tb.append(filepath=folder_path, if_exists=if_exists) + elif dump_mode == "overwrite": logger.info( "MODE OVERWRITE: Table ALREADY EXISTS, DELETING OLD DATA!\n" f"{storage_path}\n" @@ -153,25 +137,26 @@ def _upload_files_in_folder( tb.create( path=folder_path, source_format=source_format, - csv_delimiter=self.csv_delimiter, - if_storage_data_exists=self.if_storage_data_exists, - biglake_table=self.biglake_table, - dataset_is_public=self.dataset_is_public, + csv_delimiter=csv_delimiter, + if_storage_data_exists=if_storage_data_exists, + biglake_table=biglake_table, + dataset_is_public=dataset_is_public, ) logger.info("Data uploaded to BigQuery") - def upload( + def _upload_as_biglake( self, dataframe: pd.DataFrame, dataset_id: str, table_id: str, - biglake_table: bool = True, dataset_is_public: bool = False, partition_by_date: bool = False, partition_column: Optional[str] = None, source_format: str = "parquet", - **kwargs + force_unique_file_name: bool = False, + **kwargs, ) -> None: + biglake_table = (True,) upload_id = uuid.uuid4() upload_folder = os.path.join(self._base_path, str(upload_id)) @@ -201,14 +186,14 @@ def upload( dataframe = self._cast_to_string(dataframe) dataframe.to_parquet( os.path.join( - folder_path, self._create_file_name(table_id, self.force_unique_file_name) + folder_path, self._create_file_name(table_id, force_unique_file_name) ) ) else: os.makedirs(upload_folder, exist_ok=True) dataframe.to_parquet( os.path.join( - upload_folder, self._create_file_name(table_id, self.force_unique_file_name) + upload_folder, self._create_file_name(table_id, force_unique_file_name) ) ) @@ -226,3 +211,67 @@ def upload( logger.error(f"Error uploading data to BigQuery: {e}") finally: shutil.rmtree(upload_folder) + + def _upload_as_native_table( + self, + dataframe: pd.DataFrame, + dataset_id: str, + table_id: str, + date_partition_column: Optional[str], + create_disposition: str = "CREATE_IF_NEEDED", + write_disposition: str = "WRITE_APPEND", + source_format: str = "PARQUET", + ) -> None: + """ + Uploads a pandas DataFrame to a Google BigQuery table as a native table. + Args: + dataframe (pd.DataFrame): The DataFrame to upload. + dataset_id (str): The ID of the dataset containing the table. 
+ table_id (str): The ID of the table to upload the DataFrame to. + create_disposition (str, optional): Specifies whether the table should be created + if it does not exist. Defaults to "CREATE_IF_NEEDED". + write_disposition (str, optional): Specifies the action that occurs if the + destination table already exists. Defaults to "WRITE_APPEND". + source_format (str, optional): The format of the source data. Defaults to "PARQUET". + date_partition_column (str, optional): The name of the column to use for date + partitioning. + Returns: + bool: True if the upload was successful, False otherwise. + """ + self._prepare_gcp_credential() + + client = bigquery.Client().from_service_account_json("/tmp/credentials.json") + + dataset_ref = client.dataset(dataset_id) + table_ref = dataset_ref.table(table_id) + + job_config_params = { + "create_disposition": create_disposition, + "write_disposition": write_disposition, + "source_format": source_format, + } + if date_partition_column: + if date_partition_column not in dataframe.columns: + raise ValueError( + f"Partition column '{date_partition_column}' not found in DataFrame columns" + ) + job_config_params["time_partitioning"] = bigquery.TimePartitioning( + type_=bigquery.TimePartitioningType.DAY, + field=date_partition_column + ) + + job_result = client.load_table_from_dataframe( + dataframe=dataframe, + destination=table_ref, + job_config=bigquery.LoadJobConfig(**job_config_params), + num_retries=5, + ) + job = client.get_job(job_result.result().job_id) + + return job.state == "DONE" + + def upload(self, dataframe: pd.DataFrame, config: dict) -> None: + if config["biglake_table"]: + self._upload_as_biglake(dataframe, **config) + else: + self._upload_as_native_table(dataframe, **config) diff --git a/app/routers/entities_raw.py b/app/routers/entities_raw.py index a910390a..a2472c51 100644 --- a/app/routers/entities_raw.py +++ b/app/routers/entities_raw.py @@ -26,7 +26,6 @@ async def create_raw_data( entity_name: Literal["patientrecords", "patientconditions", "encounter"], _: Annotated[User, Depends(assert_user_has_pipeline_write_permition)], raw_data: RawDataListModel, - upload_to_datalake: bool = True, ) -> BulkInsertOutputModel: records = raw_data.dict().get("data_list") @@ -56,18 +55,19 @@ async def create_raw_data( system=data_source.system.value, entity=entity_name ) - if upload_to_datalake and formatter: - uploader = DatalakeUploader( - dump_mode="append", - force_unique_file_name=True, + if not formatter: + return HTMLResponse( + status_code=500, + content=f"Formatter not found for {entity_name} and {data_source.cnes}" ) - for config, dataframe in apply_formatter(records, formatter).items(): - uploader.upload( - dataframe=dataframe, - **convert_model_config_to_dict(config) - ) - datalake_status['success'] = True - datalake_status['message'] = "Data uploaded to Datalake" + for config, dataframe in apply_formatter(records, formatter).items(): + uploader = DatalakeUploader() + uploader.upload( + dataframe=dataframe, + config=convert_model_config_to_dict(config) + ) + datalake_status['success'] = True + datalake_status['message'] = "Data uploaded to Datalake" except WrongFormatException as e: return HTMLResponse(status_code=400, content=f"Invalid Format: {e}") except Exception as e: diff --git a/poetry.lock b/poetry.lock index 955aedc5..703e9cf5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -789,30 +789,30 @@ tool = ["click (>=6.0.0)"] [[package]] name = "google-cloud-bigquery" -version = "3.25.0" +version = "3.26.0" description = "Google BigQuery 
API client library" optional = false python-versions = ">=3.7" files = [ - {file = "google-cloud-bigquery-3.25.0.tar.gz", hash = "sha256:5b2aff3205a854481117436836ae1403f11f2594e6810a98886afd57eda28509"}, - {file = "google_cloud_bigquery-3.25.0-py2.py3-none-any.whl", hash = "sha256:7f0c371bc74d2a7fb74dacbc00ac0f90c8c2bec2289b51dd6685a275873b1ce9"}, + {file = "google_cloud_bigquery-3.26.0-py2.py3-none-any.whl", hash = "sha256:e0e9ad28afa67a18696e624cbccab284bf2c0a3f6eeb9eeb0426c69b943793a8"}, + {file = "google_cloud_bigquery-3.26.0.tar.gz", hash = "sha256:edbdc788beea659e04c0af7fe4dcd6d9155344b98951a0d5055bd2f15da4ba23"}, ] [package.dependencies] -google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0dev", extras = ["grpc"]} +google-api-core = {version = ">=2.11.1,<3.0.0dev", extras = ["grpc"]} google-auth = ">=2.14.1,<3.0.0dev" -google-cloud-core = ">=1.6.0,<3.0.0dev" -google-resumable-media = ">=0.6.0,<3.0dev" +google-cloud-core = ">=2.4.1,<3.0.0dev" +google-resumable-media = ">=2.0.0,<3.0dev" packaging = ">=20.0.0" -python-dateutil = ">=2.7.2,<3.0dev" +python-dateutil = ">=2.7.3,<3.0dev" requests = ">=2.21.0,<3.0.0dev" [package.extras] -all = ["Shapely (>=1.8.4,<3.0.0dev)", "db-dtypes (>=0.3.0,<2.0.0dev)", "geopandas (>=0.9.0,<1.0dev)", "google-cloud-bigquery-storage (>=2.6.0,<3.0.0dev)", "grpcio (>=1.47.0,<2.0dev)", "grpcio (>=1.49.1,<2.0dev)", "importlib-metadata (>=1.0.0)", "ipykernel (>=6.0.0)", "ipython (>=7.23.1,!=8.1.0)", "ipywidgets (>=7.7.0)", "opentelemetry-api (>=1.1.0)", "opentelemetry-instrumentation (>=0.20b0)", "opentelemetry-sdk (>=1.1.0)", "pandas (>=1.1.0)", "proto-plus (>=1.15.0,<2.0.0dev)", "protobuf (>=3.19.5,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev)", "pyarrow (>=3.0.0)", "tqdm (>=4.7.4,<5.0.0dev)"] -bigquery-v2 = ["proto-plus (>=1.15.0,<2.0.0dev)", "protobuf (>=3.19.5,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev)"] +all = ["Shapely (>=1.8.4,<3.0.0dev)", "bigquery-magics (>=0.1.0)", "db-dtypes (>=0.3.0,<2.0.0dev)", "geopandas (>=0.9.0,<1.0dev)", "google-cloud-bigquery-storage (>=2.6.0,<3.0.0dev)", "grpcio (>=1.47.0,<2.0dev)", "grpcio (>=1.49.1,<2.0dev)", "importlib-metadata (>=1.0.0)", "ipykernel (>=6.0.0)", "ipywidgets (>=7.7.0)", "opentelemetry-api (>=1.1.0)", "opentelemetry-instrumentation (>=0.20b0)", "opentelemetry-sdk (>=1.1.0)", "pandas (>=1.1.0)", "proto-plus (>=1.22.3,<2.0.0dev)", "protobuf (>=3.20.2,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<6.0.0dev)", "pyarrow (>=3.0.0)", "tqdm (>=4.7.4,<5.0.0dev)"] +bigquery-v2 = ["proto-plus (>=1.22.3,<2.0.0dev)", "protobuf (>=3.20.2,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<6.0.0dev)"] bqstorage = ["google-cloud-bigquery-storage (>=2.6.0,<3.0.0dev)", "grpcio (>=1.47.0,<2.0dev)", "grpcio (>=1.49.1,<2.0dev)", "pyarrow (>=3.0.0)"] geopandas = ["Shapely (>=1.8.4,<3.0.0dev)", "geopandas (>=0.9.0,<1.0dev)"] -ipython = ["ipykernel (>=6.0.0)", "ipython (>=7.23.1,!=8.1.0)"] +ipython = ["bigquery-magics (>=0.1.0)"] ipywidgets = ["ipykernel (>=6.0.0)", "ipywidgets (>=7.7.0)"] opentelemetry = ["opentelemetry-api (>=1.1.0)", "opentelemetry-instrumentation (>=0.20b0)", "opentelemetry-sdk (>=1.1.0)"] pandas = ["db-dtypes (>=0.3.0,<2.0.0dev)", "importlib-metadata (>=1.0.0)", "pandas (>=1.1.0)", "pyarrow (>=3.0.0)"] @@ -2916,4 +2916,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = 
"fa36cb1477a80860e9b3571a284ff588647175ef2ed4ff2e663497425524e9cf" +content-hash = "3e4740d0388a7b40649aae504d6922357306084c7b6a5caeb1a2afba94c01d74" diff --git a/pyproject.toml b/pyproject.toml index 14d4d8a0..2b299937 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ pyotp = "^2.9.0" fastapi-simple-rate-limiter = "^0.0.4" fastapi-limiter = "^0.1.6" asgi-lifespan = "^2.1.0" +google-cloud-bigquery = "^3.26.0" [tool.poetry.group.dev.dependencies] From 6c9e87951b93d132c1032637918fa0991118f315 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Wed, 30 Oct 2024 09:32:58 -0300 Subject: [PATCH 05/11] chore: Update table IDs in datalake models --- app/datalake/models.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/datalake/models.py b/app/datalake/models.py index dea75522..3829eccd 100644 --- a/app/datalake/models.py +++ b/app/datalake/models.py @@ -108,7 +108,7 @@ class SMSRioPaciente(BaseModel): class Config: dataset_id = "brutos_plataforma_smsrio" - table_id = "paciente_cadastro_eventos" + table_id = "_paciente_eventos" biglake_table = False time_partition_column = "datalake_loaded_at" @@ -200,7 +200,7 @@ class VitacarePaciente(BaseModel): class Config: dataset_id = "brutos_prontuario_vitacare" - table_id = "paciente_eventos" + table_id = "_paciente_eventos" biglake_table = False time_partition_column = "datalake_loaded_at" @@ -243,6 +243,6 @@ class VitacareAtendimento(BaseModel): class Config: dataset_id = "brutos_prontuario_vitacare" - table_id = "atendimento_eventos" + table_id = "_atendimento_eventos" biglake_table = False time_partition_column = "datalake_loaded_at" From 1618ff40043023d671b2dbb241fe9088112f0c59 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 31 Oct 2024 09:13:53 -0300 Subject: [PATCH 06/11] feat: Add **kwargs to handle unexpected keyword arguments --- app/datalake/uploader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/app/datalake/uploader.py b/app/datalake/uploader.py index 2e52b543..4cd2e7a6 100644 --- a/app/datalake/uploader.py +++ b/app/datalake/uploader.py @@ -221,6 +221,7 @@ def _upload_as_native_table( create_disposition: str = "CREATE_IF_NEEDED", write_disposition: str = "WRITE_APPEND", source_format: str = "PARQUET", + **kwargs, ) -> None: """ Uploads a pandas DataFrame to a Google BigQuery table as a native table. 
From 1a4b22a9ca31e83e6d6db32ec4c69a88f7da1567 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 31 Oct 2024 09:19:25 -0300 Subject: [PATCH 07/11] chore: Update date partition column name in datalake models --- app/datalake/models.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/datalake/models.py b/app/datalake/models.py index 3829eccd..3269f44b 100644 --- a/app/datalake/models.py +++ b/app/datalake/models.py @@ -110,7 +110,7 @@ class Config: dataset_id = "brutos_plataforma_smsrio" table_id = "_paciente_eventos" biglake_table = False - time_partition_column = "datalake_loaded_at" + date_partition_column = "datalake_loaded_at" # =============== @@ -202,7 +202,7 @@ class Config: dataset_id = "brutos_prontuario_vitacare" table_id = "_paciente_eventos" biglake_table = False - time_partition_column = "datalake_loaded_at" + date_partition_column = "datalake_loaded_at" class VitacareAtendimento(BaseModel): @@ -245,4 +245,4 @@ class Config: dataset_id = "brutos_prontuario_vitacare" table_id = "_atendimento_eventos" biglake_table = False - time_partition_column = "datalake_loaded_at" + date_partition_column = "datalake_loaded_at" From d70fb221a907db90d920bc8a06aeb915ccfa67f9 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 31 Oct 2024 10:27:19 -0300 Subject: [PATCH 08/11] chore: Update date partition column name and generate BigQuery schema in DatalakeUploader --- app/datalake/uploader.py | 9 +++++++- app/datalake/utils.py | 44 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/app/datalake/uploader.py b/app/datalake/uploader.py index 4cd2e7a6..bbb6af27 100644 --- a/app/datalake/uploader.py +++ b/app/datalake/uploader.py @@ -11,6 +11,7 @@ from loguru import logger +from app.datalake.utils import generate_bigquery_schema class DatalakeUploader: @@ -256,9 +257,15 @@ def _upload_as_native_table( raise ValueError( f"Partition column '{date_partition_column}' not found in DataFrame columns" ) + + dataframe["data_particao"] = pd.to_datetime(dataframe[date_partition_column]) job_config_params["time_partitioning"] = bigquery.TimePartitioning( type_=bigquery.TimePartitioningType.DAY, - field=date_partition_column + field="data_particao" + ) + job_config_params["schema"]=generate_bigquery_schema( + dataframe, + datetime_as="DATE" ) job_result = client.load_table_from_dataframe( diff --git a/app/datalake/utils.py b/app/datalake/utils.py index 9cc9e29b..8b121605 100644 --- a/app/datalake/utils.py +++ b/app/datalake/utils.py @@ -1,8 +1,10 @@ # -*- coding: utf-8 -*- +import os import json import pandas as pd from typing import Callable from loguru import logger +from google.cloud import bigquery REGISTERED_FORMATTERS = {} @@ -143,3 +145,45 @@ def apply_formatter(records: list[dict], formatter: Callable) -> dict: tables[table_config] = pd.DataFrame(rows) return tables + + +def generate_bigquery_schema(df: pd.DataFrame, datetime_as="TIMESTAMP") -> list[bigquery.SchemaField]: + """ + Generates a BigQuery schema based on the provided DataFrame. + + Args: + df (pd.DataFrame): The DataFrame for which the BigQuery schema needs to be generated. + + Returns: + list[bigquery.SchemaField]: The generated BigQuery schema as a list of SchemaField objects. 
+ """ + TYPE_MAPPING = { + "i": "INTEGER", + "u": "NUMERIC", + "b": "BOOLEAN", + "f": "FLOAT", + "O": "STRING", + "S": "STRING", + "U": "STRING", + "M": datetime_as, + } + schema = [] + for column, dtype in df.dtypes.items(): + val = df[column].iloc[0] + mode = "REPEATED" if isinstance(val, list) else "NULLABLE" + + if isinstance(val, dict) or (mode == "REPEATED" and isinstance(val[0], dict)): + fields = generate_bigquery_schema(pd.json_normalize(val)) + else: + fields = () + + _type = "RECORD" if fields else TYPE_MAPPING.get(dtype.kind) + schema.append( + bigquery.SchemaField( + name=column, + field_type=_type, + mode=mode, + fields=fields, + ) + ) + return schema \ No newline at end of file From c45fb273480e7d0086b786a36d5b67c382f84997 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Thu, 31 Oct 2024 10:46:45 -0300 Subject: [PATCH 09/11] Implemented Async call through bigquery client --- app/datalake/uploader.py | 15 ++++++----- app/routers/entities_raw.py | 2 +- poetry.lock | 54 ++++++++++++++++++++++++++++++++++++- pyproject.toml | 1 + 4 files changed, 63 insertions(+), 9 deletions(-) diff --git a/app/datalake/uploader.py b/app/datalake/uploader.py index bbb6af27..f91bb934 100644 --- a/app/datalake/uploader.py +++ b/app/datalake/uploader.py @@ -5,7 +5,7 @@ import base64 from typing import Optional from google.cloud import bigquery - +from asyncify import asyncify import pandas as pd import basedosdados as bd @@ -145,7 +145,7 @@ def _upload_files_in_folder( ) logger.info("Data uploaded to BigQuery") - def _upload_as_biglake( + async def _upload_as_biglake( self, dataframe: pd.DataFrame, dataset_id: str, @@ -213,7 +213,7 @@ def _upload_as_biglake( finally: shutil.rmtree(upload_folder) - def _upload_as_native_table( + async def _upload_as_native_table( self, dataframe: pd.DataFrame, dataset_id: str, @@ -274,12 +274,13 @@ def _upload_as_native_table( job_config=bigquery.LoadJobConfig(**job_config_params), num_retries=5, ) - job = client.get_job(job_result.result().job_id) + result = await asyncify(job_result.result)() + job = client.get_job(result.job_id) return job.state == "DONE" - def upload(self, dataframe: pd.DataFrame, config: dict) -> None: + async def upload(self, dataframe: pd.DataFrame, config: dict) -> None: if config["biglake_table"]: - self._upload_as_biglake(dataframe, **config) + await self._upload_as_biglake(dataframe, **config) else: - self._upload_as_native_table(dataframe, **config) + await self._upload_as_native_table(dataframe, **config) diff --git a/app/routers/entities_raw.py b/app/routers/entities_raw.py index a2472c51..855883b0 100644 --- a/app/routers/entities_raw.py +++ b/app/routers/entities_raw.py @@ -62,7 +62,7 @@ async def create_raw_data( ) for config, dataframe in apply_formatter(records, formatter).items(): uploader = DatalakeUploader() - uploader.upload( + await uploader.upload( dataframe=dataframe, config=convert_model_config_to_dict(config) ) diff --git a/poetry.lock b/poetry.lock index 703e9cf5..54729dc7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -54,6 +54,17 @@ files = [ [package.dependencies] typing_extensions = ">=3.7.2" +[[package]] +name = "annotated-types" +version = "0.7.0" +description = "Reusable constraint types to use with typing.Annotated" +optional = false +python-versions = ">=3.8" +files = [ + {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"}, + {file = "annotated_types-0.7.0.tar.gz", hash = 
"sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, +] + [[package]] name = "anyio" version = "4.3.0" @@ -113,6 +124,21 @@ files = [ [package.dependencies] anyio = ">=3.4.0,<5.0" +[[package]] +name = "asyncify" +version = "0.10.0" +description = "sync 2 async" +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "asyncify-0.10.0-py3-none-any.whl", hash = "sha256:9e2bbf1f88ec8a2238b1ff2690c98c1710db967abdc50e51b64b4b22a03a38f3"}, + {file = "asyncify-0.10.0.tar.gz", hash = "sha256:548f7bf14e418bc6b37fa5b1384d76a007c4f8704022c521951f081fdf3efead"}, +] + +[package.dependencies] +funkify = ">=0.4.0" +xtyping = ">=0.5.0" + [[package]] name = "asyncpg" version = "0.29.0" @@ -688,6 +714,17 @@ mccabe = ">=0.7.0,<0.8.0" pycodestyle = ">=2.11.0,<2.12.0" pyflakes = ">=3.1.0,<3.2.0" +[[package]] +name = "funkify" +version = "0.4.5" +description = "Funkify modules so that they are callable" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "funkify-0.4.5-py3-none-any.whl", hash = "sha256:43f1e6c27263468a60ba560dfc13e6e4df57aa75376438a62f741ffc7c83cdfe"}, + {file = "funkify-0.4.5.tar.gz", hash = "sha256:42df845f4afa63e0e66239a986d26b6572ab0b7ad600d7d6365d44d8a0cff3d5"}, +] + [[package]] name = "google-api-core" version = "2.19.1" @@ -2913,7 +2950,22 @@ files = [ [package.extras] dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] +[[package]] +name = "xtyping" +version = "0.8.2" +description = "xtyping = typing + typing_extensions" +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "xtyping-0.8.2-py3-none-any.whl", hash = "sha256:9c80bbf5bcc9aa4ec5d5ad4b05c1778ac5312476a7169502e79f31eedbf77590"}, + {file = "xtyping-0.8.2.tar.gz", hash = "sha256:219d14d2782d986e86d7310e4190701e2b521f158350cf9b5afc0f0c793cb98f"}, +] + +[package.dependencies] +annotated-types = ">=0.5.0" +typing-extensions = ">=4.4.0" + [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "3e4740d0388a7b40649aae504d6922357306084c7b6a5caeb1a2afba94c01d74" +content-hash = "f7d6eabcb4f3b42549234e5c9b1c52987d3cda99fc546a740d4ae3e333a476c2" diff --git a/pyproject.toml b/pyproject.toml index 2b299937..6a467b14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ fastapi-simple-rate-limiter = "^0.0.4" fastapi-limiter = "^0.1.6" asgi-lifespan = "^2.1.0" google-cloud-bigquery = "^3.26.0" +asyncify = "^0.10.0" [tool.poetry.group.dev.dependencies] From c10feaceb2a3cd888d6780c46d644f8eaef345f9 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Fri, 1 Nov 2024 13:01:12 -0300 Subject: [PATCH 10/11] chore: Update user_cnes in validate_user_access_to_patient_data function --- app/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/utils.py b/app/utils.py index b6ad7a84..c3b3d6fe 100644 --- a/app/utils.py +++ b/app/utils.py @@ -198,7 +198,7 @@ async def validate_user_access_to_patient_data(user: User, cpf: str) -> tuple[bo user_permition_filter = user.role.permition.filter_clause.format( user_cpf=user.cpf, user_ap=user.data_source.ap, - user_cnes=user.data_source, + user_cnes=user.data_source.cnes, ) # Build the query From f417478f7d09f4c26f3b2b2c1565125d823dc950 Mon Sep 17 00:00:00 2001 From: Pedro Marques Date: Fri, 1 Nov 2024 13:02:34 -0300 Subject: [PATCH 11/11] chore: Update permission.csv to use UNNEST for unit and AP checks --- data/permition.csv | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/permition.csv b/data/permition.csv index df613443..4e1b5c41 100644 --- a/data/permition.csv 
+++ b/data/permition.csv @@ -3,6 +3,6 @@ pipeline_write,Não permite ao usuário acesso a dados do HCI,"1 = 0" pipeline_read,Não permite ao usuário acesso a dados do HCI,"1 = 0" pipeline_readwrite,Não permite ao usuário acesso a dados do HCI,"1 = 0" only_from_same_cpf,Permite ao usuário acesso a apenas dados do próprio paciente,"cpf = '{user_cpf}'" -only_from_same_unit,Permite ao usuário acesso apenas a dados de pacientes com algum cadastro na sua unidade de trabalho,"'{user_cnes}' IN exibicao.unidades_cadastro" -only_from_same_ap,Permite ao usuário acesso apenas a dados de pacientes com algum cadastro na sua AP,"'{user_ap}' IN exibicao.ap_cadastro" +only_from_same_unit,Permite ao usuário acesso apenas a dados de pacientes com algum cadastro na sua unidade de trabalho,"'{user_cnes}' IN UNNEST(exibicao.unidades_cadastro)" +only_from_same_ap,Permite ao usuário acesso apenas a dados de pacientes com algum cadastro na sua AP,"'{user_ap}' IN UNNEST(exibicao.ap_cadastro)" full_permition,Permite acesso a todos os dados de paciente,"1 = 1" \ No newline at end of file
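
The terms-of-use columns added in patch 01 sit on top of a TIMESTAMPTZ column (`use_terms_accepted_at`), while the endpoint stores `datetime.datetime.now()`, which is timezone-naive. Below is a minimal sketch of the same update using an aware UTC timestamp, assuming the `User` model from app/models.py; the helper name is illustrative and not part of the patches.

import datetime

from app.models import User  # fields added in patch 01


async def mark_terms_accepted(user: User) -> User:
    # Record acceptance with an aware UTC timestamp, since the
    # underlying column is TIMESTAMPTZ.
    user.is_use_terms_accepted = True
    user.use_terms_accepted_at = datetime.datetime.now(datetime.timezone.utc)
    await user.save()
    return user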
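
A client-side sketch of the endpoint as it ends up after patch 02: POST /frontend/user/accept-terms/ returns a body of the shape {"message": ..., "type": "success" | "failure"}. The base URL, the bearer-token header and the httpx dependency are assumptions for illustration, not something the patches establish.

import httpx

BASE_URL = "http://localhost:8000"   # placeholder
TOKEN = "<access-token-from-login>"  # placeholder

response = httpx.post(
    f"{BASE_URL}/frontend/user/accept-terms/",
    headers={"Authorization": f"Bearer {TOKEN}"},  # assumed auth scheme
)
# Expected body on success: {"message": "Success", "type": "success"}
print(response.status_code, response.json())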
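
After patches 04, 07 and 09, DatalakeUploader.upload is async and dispatches on the model Config: biglake_table = False together with date_partition_column routes the DataFrame to _upload_as_native_table, which day-partitions on a derived data_particao column. A calling sketch, assuming the BASEDOSDADOS_* credential variables checked by _validate_envs are set; the sample DataFrame is illustrative and the dataset/table/partition values mirror SMSRioPaciente.Config.

import asyncio

import pandas as pd

from app.datalake.uploader import DatalakeUploader


async def main() -> None:
    df = pd.DataFrame(
        {
            "patient_cpf": ["11111111111"],             # illustrative payload
            "datalake_loaded_at": [pd.Timestamp.now()],
        }
    )
    config = {
        "dataset_id": "brutos_plataforma_smsrio",
        "table_id": "_paciente_eventos",
        "biglake_table": False,                          # native-table path
        "date_partition_column": "datalake_loaded_at",   # becomes the DAY partition
    }
    # Requires BASEDOSDADOS_CREDENTIALS_PROD / _STAGING / _CONFIG in the environment.
    await DatalakeUploader().upload(dataframe=df, config=config)


asyncio.run(main())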
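
Patch 08 introduces generate_bigquery_schema, which maps pandas dtype kinds to BigQuery field types (datetime columns to the type passed as datetime_as) and recurses into dict or list values as RECORD fields. A usage sketch with made-up sample data; only the function and its module path come from the patch.

import pandas as pd

from app.datalake.utils import generate_bigquery_schema

df = pd.DataFrame(
    {
        "patient_cpf": ["11111111111"],
        "payload_cnes": ["2269376"],
        "datalake_loaded_at": [pd.Timestamp("2024-10-31 10:00:00")],
    }
)

# datetime64 columns (dtype kind "M") are emitted as DATE fields here,
# matching how _upload_as_native_table builds its load schema.
for field in generate_bigquery_schema(df, datetime_as="DATE"):
    print(field.name, field.field_type, field.mode)
# patient_cpf STRING NULLABLE
# payload_cnes STRING NULLABLE
# datalake_loaded_at DATE NULLABLE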
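
Patch 09 keeps the event loop responsive by wrapping the blocking LoadJob.result() call in asyncify, which runs the function on a worker thread and awaits it. A minimal standalone sketch of the same pattern; wait_for_job is a stand-in for the BigQuery call, not part of the patches.

import asyncio
import time

from asyncify import asyncify


def wait_for_job(seconds: float) -> str:
    time.sleep(seconds)  # blocking work, like LoadJob.result()
    return "DONE"


async def main() -> None:
    # Same shape as `result = await asyncify(job_result.result)()` in the uploader.
    state = await asyncify(wait_for_job)(0.5)
    print(state)


asyncio.run(main())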
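
Patches 10 and 11 pass user.data_source.cnes into the {user_cnes} placeholder and switch the membership checks in data/permition.csv to IN UNNEST(...), the BigQuery form for testing membership in an ARRAY column. A small sketch of how a stored filter_clause expands into a WHERE fragment; the cpf/ap/cnes literals are examples only.

# Clause text as stored in data/permition.csv for only_from_same_unit.
filter_clause = "'{user_cnes}' IN UNNEST(exibicao.unidades_cadastro)"

# app/utils.py formats every placeholder; str.format ignores unused keywords.
where_fragment = filter_clause.format(
    user_cpf="11111111111",
    user_ap="33",
    user_cnes="2269376",
)
print(where_fragment)
# '2269376' IN UNNEST(exibicao.unidades_cadastro)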