diff --git a/notebooks/comtrade.qmd b/notebooks/comtrade.qmd
index 0dedcdf..66af125 100644
--- a/notebooks/comtrade.qmd
+++ b/notebooks/comtrade.qmd
@@ -15,54 +15,68 @@ os.chdir("..")
 import polars as pl
 import pandas as pd
 import time
-import requests
+from stem import Signal
+from stem.control import Controller
 import comtradeapicall
 from src.jp_imports.data_process import DataProcess
-d = DataProcess("data/")
+from src.jp_imports.data_pull import DataPull
 ```
 
 ```{python}
-mydf = comtradeapicall.previewFinalData(typeCode='C', freqCode='M', clCode='HS', period='202403',
-                                        reporterCode='', cmdCode=','.join(chunks[0]), flowCode='Xx', partnerCode='584',
-                                        partner2Code=None,
-                                        customsCode=None, motCode=None, maxRecords=500, format_output='JSON',
-                                        aggregateBy=None, breakdownMode=None, countOnly=None, includeDesc=True)
+DataPull("data/").pull_comtrade_data(end_year=2024, start_year=2024, exports=True, state_iso="584", chunk_size=50, debug=True)
+```
-
-if mydf == None:
-    print("hit rate limit")
+
+```{python}
+codes = pl.read_json("data/external/hs.json").to_series().to_list()  # Load the list of HS codes
+codes
 ```
 
 ```{python}
-time = "yearly"
-types = "hs"
-df = d.process_int_org(time, types, False).collect()
-codes = df.select(pl.col("hs").str.slice(0, 4)).unique().sort(by="hs").to_series().to_list()
-len(codes)
-# devide it in chunks of 20
-chunks = [codes[i:i+50] for i in range(0, len(codes), 20)]
-len(chunks)
-time.sleep(6)
-pritn(f"Total chunks: {len(chunks)}")
-```
+import comtradeapicall
+mydf = comtradeapicall._previewFinalData(typeCode='C', freqCode='M', clCode='HS', period='202205',
+                                         reporterCode='36', cmdCode='91', flowCode='M', partnerCode=None,
+                                         partner2Code=None,
+                                         customsCode=None, motCode=None, maxRecords=500, format_output='JSON',
+                                         aggregateBy=None, breakdownMode='classic', countOnly=None, includeDesc=True)
+
+mydf
+```
 
 ```{python}
+from stem import Signal
+from stem.control import Controller
+import polars as pl
+
+# Function to create a new Tor circuit
+def renew_tor_ip():
+    with Controller.from_port(port=9051) as controller:
+        controller.authenticate(password="Ouslan2045")
+        controller.signal(Signal.NEWNYM)
+
+# Set up Privoxy as the HTTP proxy
+proxy_url = 'http://127.0.0.1:8118'
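+
+# NOTE: This cell assumes a local Tor daemon with its control port enabled and
+# Privoxy forwarding HTTP requests into Tor. A minimal sketch of that setup
+# (default ports shown; the hashed password is a placeholder):
+#   /etc/tor/torrc:       ControlPort 9051
+#                         HashedControlPassword <output of `tor --hash-password <pw>`>
+#   /etc/privoxy/config:  forward-socks5t / 127.0.0.1:9050 .
+# Privoxy then listens on 127.0.0.1:8118 and routes traffic through Tor.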
+
+chunks = [codes[i:i + 50] for i in range(0, len(codes), 50)]  # Ensure correct chunk size
+
 empty_df = [
-pl.Series("refYear", [], dtype=pl.String),
-pl.Series("refMonth", [], dtype=pl.String),
-pl.Series("reporterCode", [], dtype=pl.String),
-pl.Series("reporterDesc", [], dtype=pl.String),
-pl.Series("flowCode", [], dtype=pl.String),
-pl.Series("flowDesc", [], dtype=pl.String),
-pl.Series("partnerDesc", [], dtype=pl.String),
-pl.Series("classificationCode", [], dtype=pl.String),
-pl.Series("cmdCode", [], dtype=pl.String),
-pl.Series("cmdDesc", [], dtype=pl.String),
-pl.Series("cifvalue", [], dtype=pl.String),
-pl.Series("fobvalue", [], dtype=pl.String),
-pl.Series("primaryValue", [], dtype=pl.String),
-pl.Series("netWgt", [], dtype=pl.String),
+    pl.Series("refYear", [], dtype=pl.String),
+    pl.Series("refMonth", [], dtype=pl.String),
+    pl.Series("reporterCode", [], dtype=pl.String),
+    pl.Series("reporterDesc", [], dtype=pl.String),
+    pl.Series("flowCode", [], dtype=pl.String),
+    pl.Series("flowDesc", [], dtype=pl.String),
+    pl.Series("partnerDesc", [], dtype=pl.String),
+    pl.Series("classificationCode", [], dtype=pl.String),
+    pl.Series("cmdCode", [], dtype=pl.String),
+    pl.Series("cmdDesc", [], dtype=pl.String),
+    pl.Series("cifvalue", [], dtype=pl.String),
+    pl.Series("fobvalue", [], dtype=pl.String),
+    pl.Series("primaryValue", [], dtype=pl.String),
+    pl.Series("netWgt", [], dtype=pl.String),
 ]
+
 master_df = pl.DataFrame(empty_df)
+
 for year in range(2010, 2025):
     for month in range(1, 13):
         if year == 2024 and month >= 10:
@@ -86,26 +100,24 @@ for year in range(2010, 2025):
                 aggregateBy=None,
                 breakdownMode=None,
                 countOnly=None,
-                includeDesc=True
+                includeDesc=True,
+                proxy_url=proxy_url
             )
 
-            # Check if mydf is None
             if mydf is None:
-                print("Rate limit reached. Waiting to try again...")
-                time.sleep(60)
+                print("Rate limit reached. Renewing Tor IP and trying again...")
+                renew_tor_ip()  # Renew Tor circuit
                 continue
-            # Check if mydf is an empty DataFrame
             elif mydf.empty:
                 print(f"No data returned for {year}-{month}, chunk: {chunk}")
                 break
-            # Check if mydf has 500 rows
             elif len(mydf) == 500:
-                print(f"Error: {year}-{month} {chunk}, {len(mydf)} rows")
-                break
+                print(f"Error: {year}-{month} {chunk} returned 500 rows. Renewing Tor IP...")
+                renew_tor_ip()  # Renew Tor circuit on error
+                continue
             else:
                 break
 
-        # Process mydf if it has rows
         if len(mydf) > 0:
             mydf = mydf[["refYear", "refMonth", "reporterCode", "reporterDesc", "flowCode", "flowDesc", "partnerDesc", "classificationCode", "cmdCode", "cmdDesc", "cifvalue", "fobvalue", 'primaryValue', 'netWgt']]
             tmp = pl.from_pandas(mydf).cast(pl.String)
@@ -113,4 +125,8 @@ for year in range(2010, 2025):
             print(f"Processed {year}-{str(month).zfill(2)}, {len(tmp)} rows")
 
 master_df.to_csv("data/master_df.csv")
+```
+
+```{python}
+master_df.write_csv("master_df.csv")
 ```
\ No newline at end of file
diff --git a/src/jp_imports/data_pull.py b/src/jp_imports/data_pull.py
index 613b2ea..a0f5b72 100644
--- a/src/jp_imports/data_pull.py
+++ b/src/jp_imports/data_pull.py
@@ -1,5 +1,9 @@
+from stem.control import Controller
 from dotenv import load_dotenv
+from datetime import datetime
+from stem import Signal
 from tqdm import tqdm
+import comtradeapicall
 import polars as pl
 import requests
 import zipfile
@@ -212,6 +216,130 @@ def pull_census_naics(self, end_year:int, start_year:int, exports:bool, state:st
 
         census_df.write_parquet(saving_path)
 
+    def pull_comtrade_data(self, end_year:int, start_year:int, exports:bool, state_iso:str, chunk_size:int=50, debug:bool=False) -> None:
+        """
+        Pulls data from UN Comtrade and saves it in a parquet file.
+
+        Parameters
+        ----------
+        end_year: int
+            The last year to pull data from.
+        start_year: int
+            The first year to pull data from.
+        exports: bool
+            If True, pulls exports data. If False, pulls imports data.
+        state_iso: str
+            The ISO 3166 numeric code of the state to pull data for (e.g. "630" for Puerto Rico).
+        chunk_size: int
+            The number of HS codes sent to the UN Comtrade API per request.
+        debug: bool
+            If True, prints progress, warning, and error messages while pulling.
+
+        Returns
+        -------
+        None
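+
+        Examples
+        --------
+        A minimal sketch of the intended usage (mirrors the notebook cell in
+        notebooks/comtrade.qmd; assumes "data/external/hs.json" exists):
+
+        >>> from src.jp_imports.data_pull import DataPull
+        >>> DataPull("data/").pull_comtrade_data(end_year=2024, start_year=2024,
+        ...     exports=True, state_iso="584", chunk_size=50, debug=True)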
+        """
+        proxy_url = os.getenv("PROXY_URL")
+        codes = pl.read_json(self.saving_dir + "external/hs.json").to_series().to_list()  # TODO: Change to get codes from DB
+        chunks = [codes[i:i + chunk_size] for i in range(0, len(codes), chunk_size)]
+        if exports:
+            flowCode = "X"
+        else:
+            flowCode = "M"
+
+        empty_df = [
+            pl.Series("refYear", [], dtype=pl.String),
+            pl.Series("refMonth", [], dtype=pl.String),
+            pl.Series("reporterCode", [], dtype=pl.String),
+            pl.Series("reporterDesc", [], dtype=pl.String),
+            pl.Series("flowCode", [], dtype=pl.String),
+            pl.Series("flowDesc", [], dtype=pl.String),
+            pl.Series("partnerDesc", [], dtype=pl.String),
+            pl.Series("classificationCode", [], dtype=pl.String),
+            pl.Series("cmdCode", [], dtype=pl.String),
+            pl.Series("cmdDesc", [], dtype=pl.String),
+            pl.Series("cifvalue", [], dtype=pl.String),
+            pl.Series("fobvalue", [], dtype=pl.String),
+            pl.Series("primaryValue", [], dtype=pl.String),
+            pl.Series("netWgt", [], dtype=pl.String),
+        ]
+
+        un_comtrade_df = pl.DataFrame(empty_df)
+
+        for year in range(start_year, end_year + 1):
+            for month in range(1, 13):
+                if year == datetime.now().year and month > datetime.now().month:
+                    break
+                for chunk in chunks:
+                    while True:
+                        tmp_df = comtradeapicall.previewFinalData(
+                            typeCode='C',
+                            freqCode='M',
+                            clCode='HS',
+                            period=f'{year}{str(month).zfill(2)}',
+                            reporterCode='',
+                            cmdCode=','.join(chunk),
+                            flowCode=flowCode,
+                            partnerCode=state_iso,
+                            partner2Code=None,
+                            customsCode=None,
+                            motCode=None,
+                            maxRecords=500,
+                            format_output='JSON',
+                            aggregateBy=None,
+                            breakdownMode=None,
+                            countOnly=None,
+                            includeDesc=True,
+                            proxy_url=proxy_url  # Pass the HTTP proxy URL
+                        )
+
+                        if tmp_df is None:
+                            if debug:
+                                print("\033[0;31mERROR: \033[0m Rate limit reached. Renewing TOR IP address.")
+                            self.renew_tor()  # WARNING: This requires the TOR service to be running & privoxy/torsocks to be installed
+                            continue
+                        elif tmp_df.empty:
+                            if debug:
+                                print(f"\033[0;32mINFO: \033[0m No data found for {year}-{str(month).zfill(2)} in chunk {chunk}.")
+                            break
+                        elif len(tmp_df) == 500:
+                            # maxRecords=500 is the preview cap, so a full batch likely means truncated results
+                            if debug:
+                                print(f"\033[0;33mWARNING: \033[0m The chunk {chunk} has 500 records.")
+                            break
+                        else:
+                            break
+
+                    if len(tmp_df) > 0:
+                        tmp_df = tmp_df[["refYear", "refMonth", "reporterCode", "reporterDesc", "flowCode", "flowDesc", "partnerDesc", "classificationCode", "cmdCode", "cmdDesc", "cifvalue", "fobvalue", "primaryValue", "netWgt"]]
+                        tmp_df = pl.from_pandas(tmp_df).cast(pl.String)
+
+                        if isinstance(un_comtrade_df, pl.DataFrame) and isinstance(tmp_df, pl.DataFrame):
+                            un_comtrade_df = pl.concat([un_comtrade_df, tmp_df], how="vertical")
+                        else:
+                            raise AttributeError("The dataframes are not of the correct type.")
+
+                        if debug:
+                            print(f"\033[0;32mINFO: \033[0m Data pulled for {year}-{str(month).zfill(2)}, inserted {len(tmp_df)} records.")
+
+        un_comtrade_df.write_parquet(f"{self.saving_dir}/raw/un_comtrade_{state_iso}_{flowCode}.parquet")
+
+    def renew_tor(self) -> None:
+        """
+        Renews the TOR IP address by requesting a new circuit over the Tor control port.
+
+        Parameters
+        ----------
+        None
+
+        Returns
+        -------
+        None
+        """
+        with Controller.from_port(port=9051) as controller:
+            controller.authenticate(password=os.getenv("TOR_PASSWORD"))
+            controller.signal(Signal.NEWNYM)  # INFO: Pyright is giving an error here, but it works
+
     def pull_file(self, url:str, filename:str, verify:bool=True) -> None:
         """
         Pulls a file from a URL and saves it in the filename. Used by the class to pull external files.