implemented code #29
ouslan committed Oct 9, 2024
1 parent e8a1804 commit bebfe8b
Showing 2 changed files with 187 additions and 43 deletions.
102 changes: 59 additions & 43 deletions notebooks/comtrade.qmd
@@ -15,54 +15,68 @@ os.chdir("..")
import polars as pl
import pandas as pd
import time
import requests
from stem import Signal
from stem.control import Controller
import comtradeapicall
from src.jp_imports.data_process import DataProcess
d = DataProcess("data/")
from src.jp_imports.data_pull import DataPull
```

```{python}
DataPull("data/").pull_comtrade_data(end_year=2024, start_year=2024, exports=True, state_iso="584", chunk_size=50, debug=True)
```
```{python}
codes = pl.read_json("data/external/hs.json").to_series().to_list()  # Load the list of HS codes
```
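
A quick look at what was loaded (assuming `hs.json` holds a single flat array of HS code strings):

```{python}
print(f"{len(codes)} HS codes loaded, e.g. {codes[:5]}")
```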

```{python}
time_agg = "yearly"
types = "hs"
df = d.process_int_org(time_agg, types, False).collect()
codes = df.select(pl.col("hs").str.slice(0, 4)).unique().sort(by="hs").to_series().to_list()
# Divide the codes into chunks of 50
chunks = [codes[i:i + 50] for i in range(0, len(codes), 50)]
time.sleep(6)  # Brief pause to stay under the API rate limit
print(f"Total codes: {len(codes)}; total chunks: {len(chunks)}")
```
```{python}
mydf = comtradeapicall.previewFinalData(typeCode='C', freqCode='M', clCode='HS', period='202205',
                                        reporterCode='36', cmdCode='91', flowCode='M', partnerCode=None,
                                        partner2Code=None,
                                        customsCode=None, motCode=None, maxRecords=500, format_output='JSON',
                                        aggregateBy=None, breakdownMode='classic', countOnly=None, includeDesc=True)
mydf
```
```{python}
from stem import Signal
from stem.control import Controller
import polars as pl
# Function to create a new Tor circuit
def renew_tor_ip():
    with Controller.from_port(port=9051) as controller:
        controller.authenticate(password="Ouslan2045")
        controller.signal(Signal.NEWNYM)
# Set up Privoxy as the HTTP proxy
proxy_url = 'http://127.0.0.1:8118'
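# Requests sent through Privoxy on 8118 are forwarded to the local Tor SOCKS port,
# so each NEWNYM signal from renew_tor_ip() should yield a fresh exit IP.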
chunks = [codes[i:i + 50] for i in range(0, len(codes), 50)] # Ensure correct chunk size
empty_df = [
pl.Series("refYear", [], dtype=pl.String),
pl.Series("refMonth", [], dtype=pl.String),
pl.Series("reporterCode", [], dtype=pl.String),
pl.Series("reporterDesc", [], dtype=pl.String),
pl.Series("flowCode", [], dtype=pl.String),
pl.Series("flowDesc", [], dtype=pl.String),
pl.Series("partnerDesc", [], dtype=pl.String),
pl.Series("classificationCode", [], dtype=pl.String),
pl.Series("cmdCode", [], dtype=pl.String),
pl.Series("cmdDesc", [], dtype=pl.String),
pl.Series("cifvalue", [], dtype=pl.String),
pl.Series("fobvalue", [], dtype=pl.String),
pl.Series("primaryValue", [], dtype=pl.String),
pl.Series("netWgt", [], dtype=pl.String),
pl.Series("refYear", [], dtype=pl.String),
pl.Series("refMonth", [], dtype=pl.String),
pl.Series("reporterCode", [], dtype=pl.String),
pl.Series("reporterDesc", [], dtype=pl.String),
pl.Series("flowCode", [], dtype=pl.String),
pl.Series("flowDesc", [], dtype=pl.String),
pl.Series("partnerDesc", [], dtype=pl.String),
pl.Series("classificationCode", [], dtype=pl.String),
pl.Series("cmdCode", [], dtype=pl.String),
pl.Series("cmdDesc", [], dtype=pl.String),
pl.Series("cifvalue", [], dtype=pl.String),
pl.Series("fobvalue", [], dtype=pl.String),
pl.Series("primaryValue", [], dtype=pl.String),
pl.Series("netWgt", [], dtype=pl.String),
]
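# Explicitly typed empty frame so monthly pulls concatenate without schema-inference mismatches.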
master_df = pl.DataFrame(empty_df)
for year in range(2010, 2025):
    for month in range(1, 13):
        if year == 2024 and month >= 10:
@@ -86,31 +100,33 @@ for year in range(2010, 2025):
                    aggregateBy=None,
                    breakdownMode=None,
                    countOnly=None,
                    includeDesc=True,
                    proxy_url=proxy_url
                )
                # Check if mydf is None (rate limited)
                if mydf is None:
                    print("Rate limit reached. Renewing Tor IP and trying again...")
                    renew_tor_ip()  # Renew Tor circuit
                    continue
                # Check if mydf is an empty DataFrame
                elif mydf.empty:
                    print(f"No data returned for {year}-{month}, chunk: {chunk}")
                    break
                # Check if mydf hit the 500-row maxRecords cap
                elif len(mydf) == 500:
                    print(f"Error: {year}-{month} {chunk} returned 500 rows. Renewing Tor IP...")
                    renew_tor_ip()  # Renew Tor circuit on error
                    continue
                else:
                    break
            # Process mydf if it has rows
            if len(mydf) > 0:
                mydf = mydf[["refYear", "refMonth", "reporterCode", "reporterDesc", "flowCode", "flowDesc", "partnerDesc", "classificationCode", "cmdCode", "cmdDesc", "cifvalue", "fobvalue", "primaryValue", "netWgt"]]
                tmp = pl.from_pandas(mydf).cast(pl.String)
                master_df = pl.concat([master_df, tmp], how="vertical")
                print(f"Processed {year}-{str(month).zfill(2)}, {len(tmp)} rows")
```

```{python}
master_df.write_csv("master_df.csv")
```
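
As a last sanity check, the CSV can be read back with every column kept as a string, matching the casts above (a sketch; assumes the loop above completed):

```{python}
check = pl.read_csv("master_df.csv", infer_schema_length=0)  # all columns read as strings
print(check.shape)
```
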
128 changes: 128 additions & 0 deletions src/jp_imports/data_pull.py
@@ -1,5 +1,9 @@
from stem.control import Controller
from dotenv import load_dotenv
from datetime import datetime
from stem import Signal
from tqdm import tqdm
import comtradeapicall
import polars as pl
import requests
import zipfile
@@ -212,6 +216,130 @@ def pull_census_naics(self, end_year:int, start_year:int, exports:bool, state:st

        census_df.write_parquet(saving_path)

    def pull_comtrade_data(self, end_year:int, start_year:int, exports:bool, state_iso:str, chunk_size:int=50, debug:bool=False) -> None:
        """
        Pulls data from the UN Comtrade API and saves it in a parquet file.

        Parameters
        ----------
        end_year: int
            The last year to pull data from.
        start_year: int
            The first year to pull data from.
        exports: bool
            If True, pulls exports data. If False, pulls imports data.
        state_iso: str
            The ISO code number of the state to pull data from (e.g. "630" for Puerto Rico).
        chunk_size: int
            The size of the chunks of HS codes requested from the UN Comtrade API.
        debug: bool
            If True, prints progress and diagnostic messages.

        Returns
        -------
        None
        """
proxy_url = os.getenv("PROXY_URL")
codes = pl.read_json(self.saving_dir + "external/hs.json").to_series().to_list() # TODO: Change to get codes from DB
chunks = [codes[i:i + chunk_size] for i in range(0, len(codes), chunk_size)]
if exports:
flowCode = "X"
else:
flowCode = "M"

empty_df = [
pl.Series("refYear", [], dtype=pl.String),
pl.Series("refMonth", [], dtype=pl.String),
pl.Series("reporterCode", [], dtype=pl.String),
pl.Series("reporterDesc", [], dtype=pl.String),
pl.Series("flowCode", [], dtype=pl.String),
pl.Series("flowDesc", [], dtype=pl.String),
pl.Series("partnerDesc", [], dtype=pl.String),
pl.Series("classificationCode", [], dtype=pl.String),
pl.Series("cmdCode", [], dtype=pl.String),
pl.Series("cmdDesc", [], dtype=pl.String),
pl.Series("cifvalue", [], dtype=pl.String),
pl.Series("fobvalue", [], dtype=pl.String),
pl.Series("primaryValue", [], dtype=pl.String),
pl.Series("netWgt", [], dtype=pl.String),
]

un_comtrade_df = pl.DataFrame(empty_df)

for year in range(start_year, end_year + 1):
for month in range(1, 13):
if year == datetime.now().year and month > datetime.now().month:
break
for chunk in chunks:
while True:
tmp_df = comtradeapicall.previewFinalData(
typeCode='C',
freqCode='M',
clCode='HS',
period=f'{year}{str(month).zfill(2)}',
reporterCode='',
cmdCode=','.join(chunk),
flowCode=flowCode,
partnerCode=state_iso,
partner2Code=None,
customsCode=None,
motCode=None,
maxRecords=500,
format_output='JSON',
aggregateBy=None,
breakdownMode=None,
countOnly=None,
includeDesc=True,
proxy_url=proxy_url # Pass the HTTP proxy URL
)

                        if tmp_df is None:
                            if debug:
                                print("\033[0;31mERROR: \033[0m Rate limit reached. Renewing TOR IP address.")
                            self.renew_tor()  # WARNING: This requires the TOR service to be running & privoxy/torsocks to be installed
                            continue
                        elif tmp_df.empty:
                            if debug:
                                print(f"\033[0;32mINFO: \033[0m No data found for {year}-{str(month).zfill(2)} in chunk {chunk}.")
                            break
                        elif len(tmp_df) == 500:
                            if debug:
                                print(f"\033[0;33mWARNING: \033[0m The chunk {chunk} has 500 records.")
                            break
                        else:
                            break

                    if len(tmp_df) > 0:
                        tmp_df = tmp_df[["refYear", "refMonth", "reporterCode", "reporterDesc", "flowCode", "flowDesc", "partnerDesc", "classificationCode", "cmdCode", "cmdDesc", "cifvalue", "fobvalue", "primaryValue", "netWgt"]]
                        tmp_df = pl.from_pandas(tmp_df).cast(pl.String)

                        if isinstance(un_comtrade_df, pl.DataFrame) and isinstance(tmp_df, pl.DataFrame):
                            un_comtrade_df = pl.concat([un_comtrade_df, tmp_df], how="vertical")
                        else:
                            raise AttributeError("The dataframes are not of the correct type.")

                        if debug:
                            print(f"\033[0;32mINFO: \033[0m Data pulled for {year}-{str(month).zfill(2)}, inserted {len(tmp_df)} records.")

        un_comtrade_df.write_parquet(f"{self.saving_dir}/raw/un_comtrade_{state_iso}_{flowCode}.parquet")

    def renew_tor(self) -> None:
        """
        Renews the TOR IP address.

        Parameters
        ----------
        None

        Returns
        -------
        None
        """
        with Controller.from_port(port=9051) as controller:
            controller.authenticate(password=os.getenv("TOR_PASSWORD"))
            controller.signal(Signal.NEWNYM)  # INFO: Pyright is giving an error here, but it works

    def pull_file(self, url:str, filename:str, verify:bool=True) -> None:
        """
        Pulls a file from a URL and saves it in the filename. Used by the class to pull external files.
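
For reference, a minimal sketch of the environment the new method assumes; the variable names and ports come from the code above, while the values here are illustrative and not part of the commit:

```python
# Illustrative setup only: pull_comtrade_data expects a Tor daemon with its
# control port on 9051 and an HTTP proxy (e.g. Privoxy on 8118) in front of it,
# with these variables resolvable via os.getenv (e.g. from a .env file).
import os

os.environ["PROXY_URL"] = "http://127.0.0.1:8118"       # HTTP proxy that forwards to Tor
os.environ["TOR_PASSWORD"] = "<tor control password>"   # used by renew_tor()

from src.jp_imports.data_pull import DataPull

# Pull 2024 export records for partner ISO code 584, 50 HS codes per request.
DataPull("data/").pull_comtrade_data(end_year=2024, start_year=2024, exports=True,
                                     state_iso="584", chunk_size=50, debug=True)
```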
