Este repositório contém a implementação de uma pipeline ETL utilizando a biblioteca Pandera para validação de contratos de dados em forma de DataFrame. O material foi apresentado em uma live no Youtube no canal do Luciano Galvão Filho.
Os dados são extraídos de um arquivo CSV local, transformados e inseridos em um banco de dados PostgreSQL. Durante o processo, os dados passam por validação em duas etapas: a primeira ocorre ao serem lidos diretamente do arquivo CSV, e a segunda é realizada após a transformação dos dados.
graph TD;
A[Extrai Dados do CSV] -->|Validação de Entrada| B[Calcula Novas Métricas]
B -->|Validação de Saída| C[Carrega Dados no PostgreSQL]
Adicionalmente, foram implementados testes unitários para as validações dos Schemas utilizando pytest, integrados com uma rotina de CI utilizando GitHub Actions para todas as pull requests.
O servidor do banco de dados PostgreSQL, assim como o client PGAdmin, foram criados localmente utilizando Docker.
- Contexto
- Pandera
- Testes com Pytest
- CI com GitHub Actions
- Instruções para executar este projeto localmente
O arquivo data/dados_financeiros.csv
contém o resultado operacional de uma empresa que possui três setores diferentes: vendas, manutenção e reparação. O objetivo é criar uma ETL que leia esses dados, calcule algumas métricas operacionais e carregue os dados em um banco PostgreSQL. Durante o processo, os dados devem seguir a seguinte regra de negócio:
- Dados ao entrar:
Nome da Coluna | Tipo de Dados | Restrições |
---|---|---|
setor_da_empresa | string | deve iniciar com 'VND_', 'REP_' ou 'MNT_' |
receita_operacional | float | maior ou igual a 0 |
data | DateTime | Nenhuma |
percentual_de_imposto | float | entre 0 e 1 |
custo_operacionais | float | maior ou igual a 0 |
- Dados após transformação:
Nome da Coluna | Tipo de Dados | Restrições |
---|---|---|
setor_da_empresa | string | deve iniciar com 'VND_', 'REP_' ou 'MNT_' |
receita_operacional | float | maior ou igual a 0 |
data | DateTime | Nenhuma |
percentual_de_imposto | float | entre 0 e 1 |
custo_operacionais | float | maior ou igual a 0 |
valor_do_imposto | float | maior ou igual a 0 |
custo_total | float | maior ou igual a 0 |
receita_liquida | float | maior ou igual a 0 |
margem_operacional | float | maior ou igual a 0 |
transformado_em | DateTime (opcional) | Nenhuma |
"Data validation for scientists, engineers, and analysts seeking correctness."
Pandera é um projeto open source da Union.ai que oferece uma API flexível e expressiva para realizar validação de dados em objetos semelhantes a dataframes, tornando os pipelines de processamento de dados mais legíveis e robustos.
Integrações:
pip install 'pandera[hypotheses]' # verificações de hipóteses
pip install 'pandera[io]' # utilitários de IO para esquemas yaml/script
pip install 'pandera[strategies]' # estratégias de síntese de dados
pip install 'pandera[mypy]' # habilita a verificação estática de tipos do pandas
pip install 'pandera[fastapi]' # integração com FastAPI
pip install 'pandera[dask]' # validação de dataframes Dask
pip install 'pandera[pyspark]' # validação de dataframes PySpark
pip install 'pandera[modin]' # validação de dataframes Modin
pip install 'pandera[modin-ray]' # validação de dataframes Modin com Ray
pip install 'pandera[modin-dask]' # validação de dataframes Modin com Dask
pip install 'pandera[geopandas]' # validação de geodataframes GeoPandas
pip install 'pandera[polars]' # validação de dataframes Polars
Abaixo segue o Schema do contrato de dados utilizado no projeto na entrada dos dados.
import pandera as pa
import pandas as pd
from pandera.typing import Series
class MetricasFinanceirasBase(pa.DataFrameModel):
setor_da_empresa: Series[str]
receita_operacional: Series[float] = pa.Field(ge=0)
data: Series[pa.DateTime]
percentual_de_imposto: Series[float] = pa.Field(in_range= {"min_value": 0, "max_value": 1})
custo_operacionais: Series[float] = pa.Field(ge=0)
class Config:
strict = True
coerce = True
@pa.check(
"setor_da_empresa", # <--- Coluna que vai receber a checagem customizada
name = "Checagem código dos setores",
error = "Cógido do setor da empresa é inválido")
def checa_codigo_setor(cls, codigo: Series[str]) -> Series[bool]:
return codigo.str[:4].isin(['REP_', 'MNT_', 'VND_'])
-
A classe
MetricasFinanceirasBase
herda depa.DataFrameModel
e define o schema com as colunas/índices sendo atributos de classes. -
pa.Field contém os argumentos nativos da biblioteca e representa características específicas de cada coluna/índice. A lista completa está em Check.
-
Com o decorator @pa.check podemos criar checagens customizadas.
-
Em class Config podemos estabelecer opções de esquema geral.
coerce
define se a classe, ao validar, vai tentar converter a Coluna para o tipo de dados estabelecido.strict
estabelece que a validação não aceita colunas adicionais além das que estão na classe.
-
Lista dos tipos de dados que o pandera aceita.
Abaixo o modelo criado para o Schema do contrato de dados após sua transformação.
class MetricasFinanceirasOut(MetricasFinanceirasBase):
valor_do_imposto: Series[float] = pa.Field(ge=0)
custo_total: Series[float] = pa.Field(ge=0)
receita_liquida: Series[float] = pa.Field(ge=0)
margem_operacional: Series[float] = pa.Field(ge=0)
transformado_em: Optional[pa.DateTime]
@pa.dataframe_check
def checa_margem_operacional(cls, df:pd.DataFrame) -> Series[bool]:
return df["margem_operacional"] == (df["receita_liquida"] / df["receita_operacional"])
-
Sendo apenas uma extensão dos dados de entrada, a classe
MetricasFinanceirasOut
herda da classeMetricasFinanceirasBase
. -
O decorador
@pa.dataframe_check
é utilizado para criarmos checagens a nível do dataframe. Esta abordagem difere do decorador@pa.check
, que chega a nível de colunas isoladas.
-
Com Decoradores:
@pa.check_input(<CONTRATO_ENTRADA>)
: Checa os dados na entrada da função.@pa.check_output(<CONTRATO_SAÍDA>)
: Checa os dados na saída da função.@pa.check_io(df1 =<CONTRATO_ENTRADA> , df2 = <CONTRATO_ENTRADA>, output = <CONTRATO_SAÍDA>)
: Checa os dados na entrada e na saida da função.
-
Chamando diretamente o método
.validate()
da classe criada e passando o dataframe a ser validado como argumento.
MetricasFinanceirasBase.validate(df)
O framework pytest facilita a escrita de testes pequenos e legíveis, e pode escalar para suportar testes funcionais complexos para aplicações e bibliotecas.
A estrutura utilizada para criarmos testes unitários dos contratos de dados criados com o Pandera foi:
def test_<NOME_DO_TESTE>():
df_test = pd.DataFrame(
{<ESTRUTURA-DO-DF>}
)
MetricasFinanceirasBase.validate(df_test)
# ou
with pytest.raises(pa.errors.SchemaError):
MetricasFinanceirasBase.validate(df_test) # Para quebra do contrato de dados
Para rodar os testes:
pytest test/<ARQUIVO-DO-TESTE> -v
A flag -v
serve para vermos todos os resultados dos testes no terminal.
Para criar uma rotina de Integração Contínua (CI) utilizando GitHub Actions, você deve criar um arquivo ci.yaml
dentro da pasta .github/workflows
.
Para ativar o merge de novas pull requests apenas se passar nos testes, siga os passos abaixo:
- Vá para Configurações.
- Selecione Branches no menu lateral esquerdo.
- Clique em Adicionar regra de proteção de branch.
- Escolha o nome da regra (qualquer um que você desejar).
- Marque a caixa para Exigir que verificações de status sejam aprovadas.
- Marque a caixa para Exigir que os branches estejam atualizados antes da fusão.
- Clique em Adicionar verificações.
- Escolha o nome da verificação.
- Salve as alterações.
Os passos abaixo foram executados para um terminal do tipo bash
.
- Clone o repositório localmente:
git clone https://github.com/lealre/pandera-aovivo.git
- Acesse a pasta:
cd pandera-aovivo
- Crie um ambiente virtual:
python -m venv .venv
- Ative o ambiente virtual:
source .venv/bin/activate
- Baixe as dependências:
pip install -r requirements.txt
- Crie o banco de dados com o Docker:
docker compose up -d
Para acessar o banco de dados:
-
Entre no link do localhostem seu navegador:
http://localhost:8888/
-
Acesse o PGAdmin com as credencias estipuladas no arquivo
docker-compose.yaml
:
Username
: [email protected]
Password
: pgadmin
- Defina a senha mestra (ao acessar pela primeira vez).
- Clique com o botão direito no servidor para conectar o pgAdmin ao banco de dados.
- Defina o nome do servidor (pode ser qualquer nome que você queira).
- Conecte-se ao banco de dados usando as credenciais definidas no arquivo
docker-compose.yaml
.
- Nome do host:
db
- Senha:
postgres