add pit backend: FilePITStorage
John Lyu committed Oct 26, 2023
1 parent a144bc9 commit 3ed3f17
Showing 4 changed files with 463 additions and 54 deletions.
34 changes: 20 additions & 14 deletions qlib/data/data.py
@@ -47,7 +47,10 @@ class ProviderBackendMixin:

     def get_default_backend(self):
         backend = {}
-        provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2]
+        if hasattr(self, "provider_name"):
+            provider_name = getattr(self, "provider_name")
+        else:
+            provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2]
         # set default storage class
         backend.setdefault("class", f"File{provider_name}Storage")
         # set default storage module
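The regex fallback splits the class name on capital letters, which works for camel-case names such as LocalFeatureProvider but breaks on acronyms; that is why PITProvider below declares provider_name explicitly. A minimal standalone sketch of the failure mode (derive is a hypothetical helper, not qlib code):

    import re

    def derive(class_name: str) -> str:
        # same heuristic as get_default_backend's fallback branch
        return re.findall("[A-Z][^A-Z]*", class_name)[-2]

    print(f"File{derive('LocalFeatureProvider')}Storage")  # FileFeatureStorage
    print(f"File{derive('LocalPITProvider')}Storage")      # FileTStorage, not FilePITStorage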
@@ -335,6 +338,10 @@ def feature(self, instrument, field, start_time, end_time, freq):


 class PITProvider(abc.ABC):
+    @property
+    def provider_name(self):
+        return "PIT"
+
     @abc.abstractmethod
     def period_feature(
         self,
@@ -741,10 +748,15 @@ def feature(self, instrument, field, start_index, end_index, freq):
         return self.backend_obj(instrument=instrument, field=field, freq=freq)[start_index : end_index + 1]


-class LocalPITProvider(PITProvider):
+class LocalPITProvider(PITProvider, ProviderBackendMixin):
     # TODO: Add PIT backend file storage
     # NOTE: This class is not multi-threading-safe!!!!

+    def __init__(self, remote=False, backend={}):
+        super().__init__()
+        self.remote = remote
+        self.backend = backend
+
     def period_feature(self, instrument, field, start_offset, end_offset, cur_time, period=None, start_time=None):
         """get raw data from PIT
         we have 3 modes to query data from PIT; all of them require the current datetime
@@ -764,17 +776,11 @@ def period_feature(self, instrument, field, start_offset, end_offset, cur_time, period=None, start_time=None):

         assert end_offset <= 0  # PIT doesn't support querying future data

-        DATA_RECORDS = [
-            ("date", C.pit_record_type["date"]),
-            ("period", C.pit_record_type["period"]),
-            ("value", C.pit_record_type["value"]),
-            ("_next", C.pit_record_type["index"]),
-        ]
-        VALUE_DTYPE = C.pit_record_type["value"]
-
         field = str(field).lower()[2:]
         instrument = code_to_fname(instrument)

+        backend_obj = self.backend_obj(instrument=instrument, field=field)
+
         # {For acceleration
         # start_index, end_index, cur_index = kwargs["info"]
         # if cur_index == start_index:
@@ -803,8 +809,8 @@ def period_feature(self, instrument, field, start_offset, end_offset, cur_time, period=None, start_time=None):
             ## so we cannot find out the offset for a given date
             ## stop using index in this version
             # start_point = get_pitdata_offset(index_path, period, )
-            data = np.fromfile(data_path, dtype=DATA_RECORDS)
-            df = pd.DataFrame(data, columns=[i[0] for i in DATA_RECORDS])
+            data = backend_obj.np_data()
+            df = pd.DataFrame(data)
             df.sort_values(by=["date", "period"], inplace=True)
             df["date"] = pd.to_datetime(df["date"].astype(str))
             H["f"][key] = df
@@ -823,7 +829,7 @@ def period_feature(self, instrument, field, start_offset, end_offset, cur_time, period=None, start_time=None):
             df_sim = df[s_sign].drop_duplicates(subset=["date"], keep="last")
             s_part = df_sim.set_index("date")[start_time:]["value"]
             if s_part.empty:
-                return pd.Series(dtype=VALUE_DTYPE)
+                return pd.Series(index=backend_obj.columns, dtype="float64")
             if start_time != s_part.index[0] and start_time >= df["date"].iloc[0]:
                 # add previous value to result to avoid nan in the first period
                 pre_value = pd.Series(df[df["date"] < start_time]["value"].iloc[-1], index=[start_time])
@@ -832,7 +838,7 @@ def period_feature(self, instrument, field, start_offset, end_offset, cur_time, period=None, start_time=None):
         else:
             df_remain = df[(df["date"] <= cur_time)]
             if df_remain.empty:
-                return pd.Series(dtype=VALUE_DTYPE)
+                return pd.Series(index=backend_obj.columns, dtype="float64")
             last_observe_date = df_remain["date"].iloc[-1]
             # keep only the latest period value
             df_remain = df_remain.sort_values(by=["period"]).drop_duplicates(subset=["period"], keep="last")
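The sort-then-deduplicate step is what enforces point-in-time semantics: among all records observed up to cur_time, only the most recently disclosed value of each financial period survives. A small pandas illustration with synthetic rows (assumes the frame is already date-sorted, as df is here):

    import pandas as pd

    df_remain = pd.DataFrame({
        "date": pd.to_datetime(["2008-03-01", "2008-03-13"]),
        "period": [200704, 200704],
        "value": [0.347900, 0.395989],
    })
    latest = df_remain.sort_values(by=["period"]).drop_duplicates(subset=["period"], keep="last")
    # period 200704 now resolves to 0.395989, the value revised on 2008-03-13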
209 changes: 202 additions & 7 deletions qlib/data/storage/file_storage.py
@@ -7,13 +7,21 @@

 import numpy as np
 import pandas as pd
+from qlib.data.storage.storage import PITStorage

 from qlib.utils.time import Freq
 from qlib.utils.resam import resam_calendar
 from qlib.config import C
 from qlib.data.cache import H
 from qlib.log import get_module_logger
-from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT
+from qlib.data.storage import (
+    CalendarStorage,
+    InstrumentStorage,
+    FeatureStorage,
+    CalVT,
+    InstKT,
+    InstVT,
+)

 logger = get_module_logger("file_storage")

@@ -48,7 +56,10 @@ def support_freq(self) -> List[str]:
         if len(self.provider_uri) == 1 and C.DEFAULT_FREQ in self.provider_uri:
             freq_l = filter(
                 lambda _freq: not _freq.endswith("_future"),
-                map(lambda x: x.stem, self.dpm.get_data_uri(C.DEFAULT_FREQ).joinpath("calendars").glob("*.txt")),
+                map(
+                    lambda x: x.stem,
+                    self.dpm.get_data_uri(C.DEFAULT_FREQ).joinpath("calendars").glob("*.txt"),
+                ),
             )
         else:
             freq_l = self.provider_uri.keys()
@@ -140,7 +151,10 @@ def data(self) -> List[CalVT]:
         _calendar = self._read_calendar()
         if Freq(self._freq_file) != Freq(self.freq):
             _calendar = resam_calendar(
-                np.array(list(map(pd.Timestamp, _calendar))), self._freq_file, self.freq, self.region
+                np.array(list(map(pd.Timestamp, _calendar))),
+                self._freq_file,
+                self.freq,
+                self.region,
             )
         return _calendar

@@ -287,6 +301,7 @@ def __init__(self, instrument: str, field: str, freq: str, provider_uri: dict = None, **kwargs):
         super(FileFeatureStorage, self).__init__(instrument, field, freq, **kwargs)
         self._provider_uri = None if provider_uri is None else C.DataPathManager.format_provider_uri(provider_uri)
         self.file_name = f"{instrument.lower()}/{field.lower()}.{freq.lower()}.bin"
+        self._start_index = None

     def clear(self):
         with self.uri.open("wb") as _:
@@ -303,6 +318,7 @@ def write(self, data_array: Union[List, np.ndarray], index: int = None) -> None:
                 "if you need to clear the FeatureStorage, please execute: FeatureStorage.clear"
             )
             return
+        self._start_index = None
         if not self.uri.exists():
             # write
             index = 0 if index is None else index
@@ -320,7 +336,9 @@ def write(self, data_array: Union[List, np.ndarray], index: int = None) -> None:
                 _old_data = np.fromfile(fp, dtype="<f")
                 _old_index = _old_data[0]
                 _old_df = pd.DataFrame(
-                    _old_data[1:], index=range(_old_index, _old_index + len(_old_data) - 1), columns=["old"]
+                    _old_data[1:],
+                    index=range(_old_index, _old_index + len(_old_data) - 1),
+                    columns=["old"],
                 )
                 fp.seek(0)
                 _new_df = pd.DataFrame(data_array, index=range(index, index + len(data_array)), columns=["new"])
@@ -332,9 +350,10 @@ def write(self, data_array: Union[List, np.ndarray], index: int = None) -> None:
     def start_index(self) -> Union[int, None]:
         if not self.uri.exists():
             return None
-        with self.uri.open("rb") as fp:
-            index = int(np.frombuffer(fp.read(4), dtype="<f")[0])
-        return index
+        if self._start_index is None:
+            with self.uri.open("rb") as fp:
+                self._start_index = int(np.frombuffer(fp.read(4), dtype="<f")[0])
+        return self._start_index

     @property
     def end_index(self) -> Union[int, None]:
@@ -377,3 +396,179 @@ def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Series]:
    def __len__(self) -> int:
        self.check()
        return self.uri.stat().st_size // 4 - 1


class FilePITStorage(FileStorageMixin, PITStorage):
    """PIT data is a special case of feature data. It looks like:

            date  period     value       _next
        0  20070428  200701  0.090219  4294967295
        1  20070817  200702  0.139330  4294967295
        2  20071023  200703  0.245863  4294967295
        3  20080301  200704  0.347900          80
        4  20080313  200704  0.395989  4294967295

    It is sorted by [date, period].
    The `_next` field is currently unused; it is kept for forward compatibility.
    """

    # NOTE:
    # PIT data should have two files: an index file and a data file.

    # pseudo code:
    # date_index = calendar.index(date)
    # data_start_index, data_end_index = index_file[date_index]
    # data = data_file[data_start_index:data_end_index]

    # The index file is like a feature's data file, but given a start index into it,
    # it returns the first and last observation indices in the data file.
    # The data file has three columns: the observation date, the financial period,
    # and the value.

    # So given a start and end date, we can get start_index and end_index from the
    # calendar, use them to read two lines from the index file, and thereby obtain
    # the start and end indices into the data file.

    # But with this implementation the index file would have roughly 50 times as
    # many lines as the data file. Is that a good idea?
    # And if we instead create an index file with the same number of lines as the
    # data file, we have to read the whole index file for any time-slice search,
    # so why not just read the whole data file?
def __init__(self, instrument: str, field: str, freq: str = "day", provider_uri: dict = None, **kwargs):
super(FilePITStorage, self).__init__(instrument, field, freq, **kwargs)

if not field.endswith("_q") and not field.endswith("_a"):
raise ValueError("period field must ends with '_q' or '_a'")
self.quarterly = field.endswith("_q")

self._provider_uri = None if provider_uri is None else C.DataPathManager.format_provider_uri(provider_uri)
self.file_name = f"{instrument.lower()}/{field.lower()}.data"
self.raw_dtype = [
("date", C.pit_record_type["date"]),
("period", C.pit_record_type["period"]),
("value", C.pit_record_type["value"]),
("_next", C.pit_record_type["index"]), # not used in current implementation
]
self.dtypes = np.dtype(self.raw_dtype)
self.itemsize = self.dtypes.itemsize
self.dtype_string = "".join([i[1] for i in self.raw_dtype])
self.columns = [i[0] for i in self.raw_dtype]
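    # For reference, assuming qlib's default C.pit_record_type (uint32 for
    # date/period/index and float64 for value), one packed record is
    # self.itemsize == 20 bytes, self.dtype_string is the concatenated per-field
    # type codes, and self.columns == ["date", "period", "value", "_next"].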

    @property
    def uri(self) -> Path:
        if self.freq not in self.support_freq:
            raise ValueError(f"{self.storage_name}: {self.provider_uri} does not contain data for {self.freq}")
        return self.dpm.get_data_uri(self.freq).joinpath(f"{self.storage_name}", self.file_name)

    def clear(self):
        with self.uri.open("wb") as _:
            pass

    @property
    def data(self) -> pd.DataFrame:
        return self[:]
    def update(self, data_array: np.ndarray) -> None:
        """update data in storage, replacing current records between the update's
        start date and end date with the given data_array

        Args:
            data_array: structured array containing date, period, value and _next,
                matching self.raw_dtype
        """
        if not self.uri.exists():
            # write from scratch
            index = 0
            new_data = data_array
        else:
            # sort it
            data_array = np.sort(data_array, order=["date", "period"])
            # dates are sorted, so searchsorted finds the first record >= the update's
            # start date and the first record > its end date (this also handles pure appends)
            update_start_date = data_array[0][0]
            update_end_date = data_array[-1][0]
            current_data = self.np_data()
            index = np.searchsorted(current_data["date"], update_start_date, side="left")
            end_index = np.searchsorted(current_data["date"], update_end_date, side="right")
            new_data = np.concatenate([data_array, current_data[end_index:]])
        self.write(new_data, index)
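    # Hedged usage sketch (synthetic values; `storage` stands for an existing
    # FilePITStorage instance):
    #   rec = np.array([(20080313, 200704, 0.395989, 4294967295)],
    #                  dtype=storage.raw_dtype)
    #   storage.update(rec)  # records dated 20080313 are replaced; later ones kept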

    def write(self, data_array: np.ndarray, index: int = None) -> None:
        """write data to storage starting at a specific index

        Args:
            data_array: structured array containing date, period, value and _next
            index: record index at which writing starts. Defaults to None.
        """
        if len(data_array) == 0:
            logger.info(
                "len(data_array) == 0, nothing to write; "
                "if you need to clear the storage, please execute: FilePITStorage.clear"
            )
            return

        # sort data_array by [date, period]
        data_array = np.sort(data_array, order=["date", "period"])

        if not self.uri.exists():
            # write
            index = 0 if index is None else index
            with self.uri.open("wb") as fp:
                data_array.tofile(fp)
        else:
            with self.uri.open("rb+") as fp:
                fp.seek(index * self.itemsize)
                data_array.tofile(fp)

    @property
    def start_index(self) -> Union[int, None]:
        return 0

    @property
    def end_index(self) -> Union[int, None]:
        if not self.uri.exists():
            return None
        # The next appending index point will be `end_index + 1`
        return self.start_index + len(self) - 1

    def np_data(self, i: Union[int, slice] = None) -> np.ndarray:
        if not self.uri.exists():
            if isinstance(i, int):
                return None
            elif i is None or isinstance(i, slice):
                return np.empty(0, dtype=self.dtypes)
            else:
                raise TypeError(f"type(i) = {type(i)}")

        if i is None:
            i = slice(None, None)
        storage_start_index = self.start_index
        storage_end_index = self.end_index
        with self.uri.open("rb") as fp:
            if isinstance(i, int):
                if storage_start_index > i:
                    raise IndexError(f"{i}: start index is {storage_start_index}")
                fp.seek(i * self.itemsize)
                # read one packed record as a length-1 structured array
                return np.frombuffer(fp.read(self.itemsize), dtype=self.dtypes)
            elif isinstance(i, slice):
                start_index = storage_start_index if i.start is None else i.start
                end_index = storage_end_index if i.stop is None else i.stop - 1
                si = max(start_index, storage_start_index)
                if si > end_index:
                    return np.empty(0, dtype=self.dtypes)
                fp.seek(si * self.itemsize)
                # read `count` whole records
                count = end_index - si + 1
                return np.frombuffer(fp.read(self.itemsize * count), dtype=self.dtypes)
            else:
                raise TypeError(f"type(i) = {type(i)}")
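    # Semantics recap (comment-only sketch): np_data(2) returns record 2 as a
    # length-1 structured array, np_data(slice(2, 4)) returns records 2 and 3,
    # and np_data() returns the whole file.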

    def __getitem__(self, i: Union[int, slice]) -> Union[pd.Series, pd.DataFrame]:
        if isinstance(i, int):
            # expose a single record as a Series keyed by column name
            rec = self.np_data(i)[0]
            return pd.Series([rec[c] for c in self.columns], index=self.columns, name=i)
        elif isinstance(i, slice):
            data = self.np_data(i)
            si = self.start_index if i.start is None else i.start
            if si < 0:
                si = len(self) + si
            return pd.DataFrame(data, index=pd.RangeIndex(si, si + len(data)), columns=self.columns)
        else:
            raise TypeError(f"type(i) = {type(i)}")

    def __len__(self) -> int:
        self.check()
        return self.uri.stat().st_size // self.itemsize