-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpostgre_insert_velocidade_contratada.py
29 lines (21 loc) · 1.22 KB
/
postgre_insert_velocidade_contratada.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from postgre_functions import chunk, insert
import pandas as pd
from config import PARQUET_PATH
# Endereço localizado no arquivo config.py
parquet_path = PARQUET_PATH
if __name__ == "__main__":
try:
print("Reading parquet")
# Leitura arquivo parquet e conversao para dataframe pandas
df_velocidade_contratada = pd.read_parquet(f"{parquet_path}/velocidade_contratada")
df_velocidade_contratada['velocidade_contratada_mbps'] = df_velocidade_contratada['velocidade_contratada_mbps'].astype(float, errors = 'raise')
print(">> Parquet read and converted to pandas dataframe")
print(df_velocidade_contratada)
print(df_velocidade_contratada.info())
# Fatiamento dos dados em blocos para insercao
chunked_velocidade_contratada = chunk(df_velocidade_contratada, 50000)
# Insercao no Postgre
insert(chunked_velocidade_contratada, f"INSERT INTO velocidade_contratada (ano, mes, razao_social, cnpj, velocidade_contratada_mbps, uf, municipio, codigo_ibge, acessos, tipo, municipio_uf) values %s;")
print("Ingested data into Postgre")
except Exception as e:
print(str(e))