Skip to content

Commit

Permalink
modified the logic of update_data_to_bin
Browse files Browse the repository at this point in the history
  • Loading branch information
SunsetWolf committed Sep 27, 2023
1 parent 4574d05 commit ab3e6d3
Showing 1 changed file with 19 additions and 57 deletions.
76 changes: 19 additions & 57 deletions scripts/data_collector/yahoo/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,70 +524,31 @@ def __init__(
super(YahooNormalize1dExtend, self).__init__(date_field_name, symbol_field_name)
self._first_close_field = "first_close"
self._ori_close_field = "ori_close"
self.column_list = ["open", "high", "low", "close", "volume", "factor", "change"]
self.old_qlib_data = self._get_old_data(old_qlib_data_dir)

def _get_old_data(self, qlib_data_dir: [str, Path]):
    """Load the stored fields of every instrument from an existing qlib data directory.

    qlib is initialised against ``qlib_data_dir`` with ``expression_cache`` and
    ``dataset_cache`` both set to ``None``, then ``$<name>`` is queried for each
    name in ``self.column_list`` over ``D.instruments("all")``.

    Parameters
    ----------
    qlib_data_dir : str or Path
        Location of the qlib data; ``~`` is expanded and the path is resolved
        before being handed to ``qlib.init``.

    Returns
    -------
    The object returned by ``D.features``, with its columns renamed to the
    plain names in ``self.column_list``.
    """
    qlib_data_dir = str(Path(qlib_data_dir).expanduser().resolve())
    qlib.init(provider_uri=qlib_data_dir, expression_cache=None, dataset_cache=None)
    # Query once: an extra D.features call whose result is immediately
    # overwritten only costs a full pass over all instruments.
    fields = ["$" + col for col in self.column_list]
    df = D.features(D.instruments("all"), fields)
    df.columns = self.column_list
    return df

def _get_close(self, df: pd.DataFrame, field_name: str):
    """Return ``field_name`` from the last valid row stored for the symbol in ``df``.

    The symbol is read from the first valid entry of ``df``'s symbol column and
    upper-cased before it is looked up in ``self.old_qlib_data``.

    Raises
    ------
    KeyError
        Propagated from the lookup when the symbol (or ``field_name``) is not
        present in ``self.old_qlib_data``.
    """
    symbol_col = self._symbol_field_name
    first_row = df.loc[df[symbol_col].first_valid_index()]
    symbol = first_row[symbol_col].upper()
    # Select along the row axis so the symbol is matched against the index.
    history = self.old_qlib_data.loc(axis=0)[symbol]
    return history.loc[history.last_valid_index()][field_name]

def _get_first_close(self, df: pd.DataFrame) -> float:
    """Return the stored ``self._first_close_field`` value for the symbol in ``df``.

    When ``_get_close`` raises ``KeyError``, the parent class's
    ``_get_first_close`` is used instead.
    """
    try:
        return self._get_close(df, field_name=self._first_close_field)
    except KeyError:
        # Nothing usable in the stored data: defer to the parent implementation.
        return super(YahooNormalize1dExtend, self)._get_first_close(df)

def _get_last_close(self, df: pd.DataFrame) -> float:
    """Return the stored ``self._ori_close_field`` value for the symbol in ``df``.

    Returns ``None`` when ``_get_close`` raises ``KeyError``.
    """
    try:
        return self._get_close(df, field_name=self._ori_close_field)
    except KeyError:
        return None

def _get_last_date(self, df: pd.DataFrame) -> pd.Timestamp:
    """Return the largest index value stored for the symbol in ``df``.

    The symbol is read from the first valid entry of ``df``'s symbol column and
    upper-cased. Returns ``None`` when the lookup in ``self.old_qlib_data``
    raises ``KeyError``.
    """
    symbol_col = self._symbol_field_name
    symbol = df.loc[df[symbol_col].first_valid_index()][symbol_col].upper()
    try:
        return self.old_qlib_data.loc(axis=0)[symbol].index.max()
    except KeyError:
        return None

# Normalize one symbol's frame relative to the data already stored in qlib.
# NOTE(review): indentation is missing in this span and it contains two
# consecutive bodies (one ending at the first `return df`, a second starting at
# the `super(...).normalize(df)` call) — confirm which body is current before
# relying on the notes below.
def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
# Last stored close for this symbol; _get_last_close yields None on KeyError.
_last_close = self._get_last_close(df)
# reindex
# Largest stored date for this symbol; _get_last_date yields None on KeyError.
_last_date = self._get_last_date(df)
if _last_date is not None:
# Index by the date column, coerce it to datetimes and keep only the first
# row for any duplicated date.
df = df.set_index(self._date_field_name)
df.index = pd.to_datetime(df.index)
df = df[~df.index.duplicated(keep="first")]
# Align to the calendar up to the newest date present, then keep only rows
# strictly after the last stored date.
_max_date = df.index.max()
df = df.reindex(self._calendar_list).loc[:_max_date].reset_index()
df = df[df[self._date_field_name] > _last_date]
# Nothing newer than the stored data: hand back an empty frame.
if df.empty:
return pd.DataFrame()
# Warn about leading rows whose "close" is missing.
_si = df["close"].first_valid_index()
if _si > df.index[0]:
logger.warning(
f"{df.loc[_si][self._symbol_field_name]} missing data: {df.loc[:_si - 1][self._date_field_name].to_list()}"
)
# normalize
df = self.normalize_yahoo(
df, self._calendar_list, self._date_field_name, self._symbol_field_name, last_close=_last_close
)
# adjusted price
df = self.adjusted_price(df)
df = self._manual_adj_data(df)
return df
# Second body: run the parent normalisation, then rescale against stored data.
df = super(YahooNormalize1dExtend, self).normalize(df)
df.set_index(self._date_field_name, inplace=True)
symbol_name = df[self._symbol_field_name].iloc[0]
# NOTE(review): .loc raises KeyError when the symbol is absent from
# old_qlib_data — confirm callers guarantee it is present.
old_df = self.old_qlib_data.loc[str(symbol_name).upper()]
# Last index entry of the stored rows; presumably the most recent date —
# assumes the index is sorted, TODO confirm.
latest_date = old_df.index[-1]
# NOTE(review): raises KeyError when latest_date is not among df's dates.
new_latest_data = df.loc[latest_date]
old_latest_data = old_df.loc[latest_date]
# column_list[:-1] leaves out the final entry of column_list.
for col in self.column_list[:-1]:
# Each column is rescaled so that its value at latest_date equals the stored
# value. NOTE(review): df / (new / old) and df * (old / new) are algebraically
# the same scaling; they differ only in floating-point evaluation and in how a
# zero value behaves — confirm the separate volume branch is intentional.
if col == "volume":
df[col] = df[col] / (new_latest_data[col] / old_latest_data[col])
else:
df[col] = df[col] * (old_latest_data[col] / new_latest_data[col])
# Keep rows starting at the calendar entry that follows latest_date.
# NOTE(review): assuming _calendar_list is a list, .index raises ValueError
# when latest_date is absent, and the +1 lookup raises IndexError when
# latest_date is the final calendar entry — verify.
df = df.loc[self._calendar_list[self._calendar_list.index(latest_date) + 1]:]
return df.reset_index()


class YahooNormalize1min(YahooNormalize, ABC):
Expand Down Expand Up @@ -1019,7 +980,8 @@ def update_data_to_bin(

# download data from yahoo
# NOTE: when downloading data from YahooFinance, max_workers is recommended to be 1
self.download_data(delay=delay, start=trading_date, end=end_date, check_data_length=check_data_length)
trading_date = (pd.Timestamp(trading_date) - pd.Timedelta(days=2)).strftime("%Y-%m-%d")
# self.download_data(delay=delay, start=trading_date, end=end_date, check_data_length=check_data_length)
# NOTE: a larger max_workers setting here would be faster
self.max_workers = (
max(multiprocessing.cpu_count() - 2, 1)
Expand Down

0 comments on commit ab3e6d3

Please sign in to comment.