Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development #256

Merged
merged 15 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions app/datalake/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,9 @@ class SMSRioPaciente(BaseModel):

class Config:
dataset_id = "brutos_plataforma_smsrio"
table_id = "paciente_cadastro_eventos"
partition_by_date = True
partition_column = "source_updated_at"
biglake_table = True
dataset_is_public = False
table_id = "_paciente_eventos"
biglake_table = False
date_partition_column = "datalake_loaded_at"


# ===============
Expand Down Expand Up @@ -202,11 +200,9 @@ class VitacarePaciente(BaseModel):

class Config:
dataset_id = "brutos_prontuario_vitacare"
table_id = "paciente_eventos"
partition_by_date = True
partition_column = "source_updated_at"
biglake_table = True
dataset_is_public = False
table_id = "_paciente_eventos"
biglake_table = False
date_partition_column = "datalake_loaded_at"


class VitacareAtendimento(BaseModel):
Expand Down Expand Up @@ -247,8 +243,6 @@ class VitacareAtendimento(BaseModel):

class Config:
dataset_id = "brutos_prontuario_vitacare"
table_id = "atendimento_eventos"
partition_by_date = True
partition_column = "source_updated_at"
biglake_table = True
dataset_is_public = False
table_id = "_atendimento_eventos"
biglake_table = False
date_partition_column = "datalake_loaded_at"
138 changes: 98 additions & 40 deletions app/datalake/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,19 @@
import shutil
import base64
from typing import Optional

from google.cloud import bigquery
from asyncify import asyncify
import pandas as pd
import basedosdados as bd

from loguru import logger

from app.datalake.utils import generate_bigquery_schema

class DatalakeUploader:

def __init__(
self,
if_exists: str = "append",
if_storage_data_exists: str = "replace",
dump_mode: str = "append",
csv_delimiter: str = ";",
force_unique_file_name: bool = False,
) -> None:
self.if_exists = if_exists
self.if_storage_data_exists = if_storage_data_exists
self.dump_mode = dump_mode
self.csv_delimiter = csv_delimiter
self.force_unique_file_name = force_unique_file_name

def __init__(self) -> None:
self._base_path = os.path.join(os.getcwd(), "files")

self._validate_envs()

def _validate_envs(self) -> None:
Expand All @@ -37,9 +25,7 @@ def _validate_envs(self) -> None:
"BASEDOSDADOS_CREDENTIALS_STAGING",
"BASEDOSDADOS_CONFIG",
]
missing_envs = [
env for env in mandatory_envs if env not in os.environ
]
missing_envs = [env for env in mandatory_envs if env not in os.environ]
if len(missing_envs) > 0:
raise ValueError(f"Missing environment variables: {missing_envs}")

Expand All @@ -52,7 +38,6 @@ def _prepare_gcp_credential(self) -> None:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/tmp/credentials.json"
return


def _split_dataframe_per_day(
self,
df: pd.DataFrame,
Expand All @@ -62,7 +47,7 @@ def _split_dataframe_per_day(

if df.empty:
logger.warning("Empty dataframe. Preparing to send file with only headers")
dfs = [(str(now.date()), df)]
dfs = [(now.date(), df)]
else:
logger.warning("Non Empty dataframe. Splitting Dataframe in multiple files by day")
df["partition_date"] = pd.to_datetime(df[date_column]).dt.date
Expand All @@ -77,11 +62,7 @@ def _split_dataframe_per_day(

return dfs

def _create_file_name(
self,
table_id: str,
unique: bool = False
) -> str:
def _create_file_name(self, table_id: str, unique: bool = False) -> str:
if unique:
return f"{table_id}-{uuid.uuid4()}.parquet"
else:
Expand All @@ -101,6 +82,10 @@ def _upload_files_in_folder(
source_format: str = "parquet",
biglake_table: bool = True,
dataset_is_public: bool = False,
if_exists: str = "append",
if_storage_data_exists: str = "replace",
dump_mode: str = "append",
csv_delimiter: str = ";",
) -> None:
self._prepare_gcp_credential()

Expand All @@ -121,19 +106,19 @@ def _upload_files_in_folder(
tb.create(
path=folder_path,
source_format=source_format,
csv_delimiter=self.csv_delimiter,
if_storage_data_exists=self.if_storage_data_exists,
csv_delimiter=csv_delimiter,
if_storage_data_exists=if_storage_data_exists,
biglake_table=biglake_table,
dataset_is_public=dataset_is_public,
)
else:
if self.dump_mode == "append":
if dump_mode == "append":
logger.info(
f"TABLE ALREADY EXISTS APPENDING DATA TO STORAGE: {dataset_id}.{table_id}"
)

tb.append(filepath=folder_path, if_exists=self.if_exists)
elif self.dump_mode == "overwrite":
tb.append(filepath=folder_path, if_exists=if_exists)
elif dump_mode == "overwrite":
logger.info(
"MODE OVERWRITE: Table ALREADY EXISTS, DELETING OLD DATA!\n"
f"{storage_path}\n"
Expand All @@ -153,25 +138,26 @@ def _upload_files_in_folder(
tb.create(
path=folder_path,
source_format=source_format,
csv_delimiter=self.csv_delimiter,
if_storage_data_exists=self.if_storage_data_exists,
biglake_table=self.biglake_table,
dataset_is_public=self.dataset_is_public,
csv_delimiter=csv_delimiter,
if_storage_data_exists=if_storage_data_exists,
biglake_table=biglake_table,
dataset_is_public=dataset_is_public,
)
logger.info("Data uploaded to BigQuery")

def upload(
async def _upload_as_biglake(
self,
dataframe: pd.DataFrame,
dataset_id: str,
table_id: str,
biglake_table: bool = True,
dataset_is_public: bool = False,
partition_by_date: bool = False,
partition_column: Optional[str] = None,
source_format: str = "parquet",
**kwargs
force_unique_file_name: bool = False,
**kwargs,
) -> None:
biglake_table = (True,)
upload_id = uuid.uuid4()
upload_folder = os.path.join(self._base_path, str(upload_id))

Expand Down Expand Up @@ -201,14 +187,14 @@ def upload(
dataframe = self._cast_to_string(dataframe)
dataframe.to_parquet(
os.path.join(
folder_path, self._create_file_name(table_id, self.force_unique_file_name)
folder_path, self._create_file_name(table_id, force_unique_file_name)
)
)
else:
os.makedirs(upload_folder, exist_ok=True)
dataframe.to_parquet(
os.path.join(
upload_folder, self._create_file_name(table_id, self.force_unique_file_name)
upload_folder, self._create_file_name(table_id, force_unique_file_name)
)
)

Expand All @@ -226,3 +212,75 @@ def upload(
logger.error(f"Error uploading data to BigQuery: {e}")
finally:
shutil.rmtree(upload_folder)

async def _upload_as_native_table(
self,
dataframe: pd.DataFrame,
dataset_id: str,
table_id: str,
date_partition_column: Optional[str],
create_disposition: str = "CREATE_IF_NEEDED",
write_disposition: str = "WRITE_APPEND",
source_format: str = "PARQUET",
**kwargs,
) -> None:
"""
Uploads a pandas DataFrame to a Google BigQuery table as a native table.
Args:
dataframe (pd.DataFrame): The DataFrame to upload.
dataset_id (str): The ID of the dataset containing the table.
table_id (str): The ID of the table to upload the DataFrame to.
create_disposition (str, optional): Specifies whether the table should be created
if it does not exist. Defaults to "CREATE_IF_NEEDED".
write_disposition (str, optional): Specifies the action that occurs if the
destination table already exists. Defaults to "WRITE_APPEND".
source_format (str, optional): The format of the source data. Defaults to "PARQUET".
date_partition_column (str, optional): The name of the column to use for date
partitioning.
Returns:
bool: True if the upload was successful, False otherwise.
"""
self._prepare_gcp_credential()

client = bigquery.Client().from_service_account_json("/tmp/credentials.json")

dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

job_config_params = {
"create_disposition": create_disposition,
"write_disposition": write_disposition,
"source_format": source_format,
}
if date_partition_column:
if date_partition_column not in dataframe.columns:
raise ValueError(
f"Partition column '{date_partition_column}' not found in DataFrame columns"
)

dataframe["data_particao"] = pd.to_datetime(dataframe[date_partition_column])
job_config_params["time_partitioning"] = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="data_particao"
)
job_config_params["schema"]=generate_bigquery_schema(
dataframe,
datetime_as="DATE"
)

job_result = client.load_table_from_dataframe(
dataframe=dataframe,
destination=table_ref,
job_config=bigquery.LoadJobConfig(**job_config_params),
num_retries=5,
)
result = await asyncify(job_result.result)()
job = client.get_job(result.job_id)

return job.state == "DONE"

async def upload(self, dataframe: pd.DataFrame, config: dict) -> None:
if config["biglake_table"]:
await self._upload_as_biglake(dataframe, **config)
else:
await self._upload_as_native_table(dataframe, **config)
44 changes: 44 additions & 0 deletions app/datalake/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-
import os
import json
import pandas as pd
from typing import Callable
from loguru import logger
from google.cloud import bigquery

REGISTERED_FORMATTERS = {}

Expand Down Expand Up @@ -143,3 +145,45 @@ def apply_formatter(records: list[dict], formatter: Callable) -> dict:
tables[table_config] = pd.DataFrame(rows)

return tables


def generate_bigquery_schema(df: pd.DataFrame, datetime_as="TIMESTAMP") -> list[bigquery.SchemaField]:
"""
Generates a BigQuery schema based on the provided DataFrame.

Args:
df (pd.DataFrame): The DataFrame for which the BigQuery schema needs to be generated.

Returns:
list[bigquery.SchemaField]: The generated BigQuery schema as a list of SchemaField objects.
"""
TYPE_MAPPING = {
"i": "INTEGER",
"u": "NUMERIC",
"b": "BOOLEAN",
"f": "FLOAT",
"O": "STRING",
"S": "STRING",
"U": "STRING",
"M": datetime_as,
}
schema = []
for column, dtype in df.dtypes.items():
val = df[column].iloc[0]
mode = "REPEATED" if isinstance(val, list) else "NULLABLE"

if isinstance(val, dict) or (mode == "REPEATED" and isinstance(val[0], dict)):
fields = generate_bigquery_schema(pd.json_normalize(val))
else:
fields = ()

_type = "RECORD" if fields else TYPE_MAPPING.get(dtype.kind)
schema.append(
bigquery.SchemaField(
name=column,
field_type=_type,
mode=mode,
fields=fields,
)
)
return schema
4 changes: 4 additions & 0 deletions app/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class LoginErrorEnum(str, Enum):
INACTIVE_EMPLOYEE = "inactive_employee"
REQUIRE_2FA = "require_2fa"

class AcceptTermsEnum(str, Enum):
SUCCESS = "success"
FAILURE = "failure"


class AccessErrorEnum(str, Enum):
NOT_FOUND = "NOT_FOUND"
Expand Down
3 changes: 3 additions & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class User(Model):
is_2fa_required = fields.BooleanField(default=False)
is_2fa_activated = fields.BooleanField(default=False)
is_ergon_validation_required = fields.BooleanField(default=False)
# Terms of use
is_use_terms_accepted = fields.BooleanField(default=False)
use_terms_accepted_at = fields.DatetimeField(null=True)
# Metadata
is_active = fields.BooleanField(default=True)
is_superuser = fields.BooleanField(default=False)
Expand Down
24 changes: 12 additions & 12 deletions app/routers/entities_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ async def create_raw_data(
entity_name: Literal["patientrecords", "patientconditions", "encounter"],
_: Annotated[User, Depends(assert_user_has_pipeline_write_permition)],
raw_data: RawDataListModel,
upload_to_datalake: bool = True,
) -> BulkInsertOutputModel:

records = raw_data.dict().get("data_list")
Expand Down Expand Up @@ -56,18 +55,19 @@ async def create_raw_data(
system=data_source.system.value,
entity=entity_name
)
if upload_to_datalake and formatter:
uploader = DatalakeUploader(
dump_mode="append",
force_unique_file_name=True,
if not formatter:
return HTMLResponse(
status_code=500,
content=f"Formatter not found for {entity_name} and {data_source.cnes}"
)
for config, dataframe in apply_formatter(records, formatter).items():
uploader.upload(
dataframe=dataframe,
**convert_model_config_to_dict(config)
)
datalake_status['success'] = True
datalake_status['message'] = "Data uploaded to Datalake"
for config, dataframe in apply_formatter(records, formatter).items():
uploader = DatalakeUploader()
await uploader.upload(
dataframe=dataframe,
config=convert_model_config_to_dict(config)
)
datalake_status['success'] = True
datalake_status['message'] = "Data uploaded to Datalake"
except WrongFormatException as e:
return HTMLResponse(status_code=400, content=f"Invalid Format: {e}")
except Exception as e:
Expand Down
Loading