From 941369e223e53da417ef4ad02932d48de241882c Mon Sep 17 00:00:00 2001 From: zengbin93 Date: Sat, 7 Oct 2023 10:46:42 +0800 Subject: [PATCH] =?UTF-8?q?V0.9.30=20=E6=9B=B4=E6=96=B0=E4=B8=80=E6=89=B9?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=20(#170)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 0.9.30 first commit * update * 0.9.30 优化 factor、event * 0.9.30 update * 0.9.30 优化 RedisWeightsClient * fix docs * 0.9.30 优化部分功能代码 * 0.9.30 update RedisWeightsClient * 0.9.30 update * 0.9.30 update * 0.9.30 新增几个 streamlit 组件 * 0.9.30 RedisWeightsClient 支持权限管控 --- .github/workflows/pythonpackage.yml | 2 +- czsc/__init__.py | 16 ++- czsc/objects.py | 19 ++- czsc/traders/rwc.py | 44 ++++--- czsc/traders/weight_backtest.py | 67 ++++++---- czsc/utils/__init__.py | 1 + czsc/utils/bar_generator.py | 18 +++ czsc/utils/calendar.py | 12 +- czsc/utils/cross.py | 14 +- czsc/utils/features.py | 28 ++++ czsc/utils/st_components.py | 121 +++++++++++++++++- czsc/utils/stats.py | 29 +++-- docs/requirements.txt | 3 +- ...42\346\200\201\351\200\211\350\202\241.py" | 96 ++++++++++++++ examples/run_dummy_backtest.py | 4 - examples/test_offline/test_rwc.py | 41 ++++++ examples/test_offline/test_weight_backtest.py | 40 ++++++ test/test_bar_generator.py | 30 +++++ test/test_utils.py | 25 ++++ 19 files changed, 523 insertions(+), 87 deletions(-) create mode 100644 czsc/utils/features.py create mode 100644 "examples/TS\346\225\260\346\215\256\346\272\220\347\232\204\345\275\242\346\200\201\351\200\211\350\202\241.py" create mode 100644 examples/test_offline/test_rwc.py create mode 100644 examples/test_offline/test_weight_backtest.py diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 2231ac947..af6e93c59 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -5,7 +5,7 @@ name: Python package on: push: - branches: [ master, V0.9.29 ] + branches: [ master, V0.9.30 ] pull_request: branches: [ master ] diff --git a/czsc/__init__.py b/czsc/__init__.py index 19b3a844c..bdcff5ed0 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -42,6 +42,10 @@ BarGenerator, freq_end_time, resample_bars, + is_trading_time, + get_intraday_times, + check_freq_and_market, + dill_dump, dill_load, read_json, @@ -81,6 +85,10 @@ # streamlit 量化分析组件 from czsc.utils.st_components import ( show_daily_return, + show_correlation, + show_sectional_ic, + show_factor_returns, + show_factor_layering, ) from czsc.utils.bi_info import ( @@ -88,11 +96,15 @@ symbols_bi_infos, ) +from czsc.utils.features import ( + normalize_feature, +) + -__version__ = "0.9.29" +__version__ = "0.9.30" __author__ = "zengbin93" __email__ = "zeng_bin8888@163.com" -__date__ = "20230904" +__date__ = "20230925" def welcome(): diff --git a/czsc/objects.py b/czsc/objects.py index b7e7ae6e7..ad6f1b5ec 100644 --- a/czsc/objects.py +++ b/czsc/objects.py @@ -510,8 +510,9 @@ class Factor: def __post_init__(self): if not self.signals_all: raise ValueError("signals_all 不能为空") - str_signals = str(self.dump()) - sha256 = hashlib.sha256(str_signals.encode("utf-8")).hexdigest().upper()[:8] + _fatcor = self.dump() + _fatcor.pop("name") + sha256 = hashlib.sha256(str(_fatcor).encode("utf-8")).hexdigest().upper()[:8] self.name = f"{self.name}#{sha256}" if self.name else sha256 @property @@ -552,6 +553,7 @@ def dump(self) -> dict: signals_not = [x.signal for x in self.signals_not] if self.signals_not else [] raw = { + "name": self.name, "signals_all": signals_all, "signals_any": signals_any, "signals_not": signals_not, @@ -604,9 +606,14 @@ class Event: def __post_init__(self): if not self.factors: raise ValueError("factors 不能为空") - str_factors = str(self.dump()) - sha256 = hashlib.sha256(str_factors.encode("utf-8")).hexdigest().upper()[:8] - self.name = f"{self.operate.value}#{sha256}" + _event = self.dump() + _event.pop("name") + sha256 = hashlib.sha256(str(_event).encode("utf-8")).hexdigest().upper()[:8] + if self.name: + self.name = f"{self.name}#{sha256}" + else: + self.name = f"{self.operate.value}#{sha256}" + self.sha256 = sha256 @property def unique_signals(self) -> List[str]: @@ -681,6 +688,7 @@ def dump(self) -> dict: factors = [x.dump() for x in self.factors] raw = { + "name": self.name, "operate": self.operate.value, "signals_all": signals_all, "signals_any": signals_any, @@ -712,6 +720,7 @@ def load(cls, raw: dict): assert raw["factors"], "factors can not be empty" e = Event( + name=raw.get("name", ""), operate=Operate.__dict__["_value2member_map_"][raw["operate"]], factors=[Factor.load(x) for x in raw["factors"]], signals_all=[Signal(x) for x in raw.get("signals_all", [])], diff --git a/czsc/traders/rwc.py b/czsc/traders/rwc.py index d6e884d82..3221f71e3 100644 --- a/czsc/traders/rwc.py +++ b/czsc/traders/rwc.py @@ -17,6 +17,8 @@ class RedisWeightsClient: """策略持仓权重收发客户端""" + version = "V231006" + def __init__(self, strategy_name, redis_url, **kwargs): """ :param strategy_name: str, 策略名 @@ -39,6 +41,7 @@ def __init__(self, strategy_name, redis_url, **kwargs): """ self.strategy_name = strategy_name self.redis_url = redis_url + self.key_prefix = kwargs.get("key_prefix", "Weights") self.heartbeat_client = redis.from_url(redis_url, decode_responses=True) self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat") @@ -47,22 +50,33 @@ def __init__(self, strategy_name, redis_url, **kwargs): self.r = redis.Redis(connection_pool=thread_safe_pool) self.lua_publish = RedisWeightsClient.register_lua_publish(self.r) - self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True) - self.heartbeat_thread.start() + if kwargs.get('send_heartbeat', True): + self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True) + self.heartbeat_thread.start() def set_metadata(self, base_freq, description, author, outsample_sdt, **kwargs): """设置策略元数据""" + key = f'{self.key_prefix}:META:{self.strategy_name}' + if self.r.exists(key): + if not kwargs.pop('overwrite', False): + logger.warning(f'已存在 {self.strategy_name} 的元数据,如需覆盖请设置 overwrite=True') + return + else: + self.r.delete(key) + logger.warning(f'删除 {self.strategy_name} 的元数据,重新写入') + outsample_sdt = pd.to_datetime(outsample_sdt).strftime('%Y%m%d') - meta = {'name': self.strategy_name, 'base_freq': base_freq, + meta = {'name': self.strategy_name, 'base_freq': base_freq, 'key_prefix': self.key_prefix, 'description': description, 'author': author, 'outsample_sdt': outsample_sdt, 'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'kwargs': json.dumps(kwargs)} - self.r.hset(f'{self.strategy_name}:meta', mapping=meta) + self.r.hset(key, mapping=meta) @property def metadata(self): """获取策略元数据""" - return self.r.hgetall(f'{self.strategy_name}:meta') + key = f'{self.key_prefix}:META:{self.strategy_name}' + return self.r.hgetall(key) def publish(self, symbol, dt, weight, price=0, ref=None, overwrite=False): """发布单个策略持仓权重 @@ -79,7 +93,7 @@ def publish(self, symbol, dt, weight, price=0, ref=None, overwrite=False): dt = pd.to_datetime(dt) udt = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - key = f'Weights:{self.strategy_name}:{symbol}:{dt.strftime("%Y%m%d%H%M%S")}' + key = f'{self.key_prefix}:{self.strategy_name}:{symbol}:{dt.strftime("%Y%m%d%H%M%S")}' ref = ref if ref else '{}' ref_str = json.dumps(ref) if isinstance(ref, dict) else ref return self.lua_publish(keys=[key], args=[1 if overwrite else 0, udt, weight, price, ref_str]) @@ -103,7 +117,7 @@ def publish_dataframe(self, df, overwrite=False, batch_size=10000): keys, args = [], [] for row in df[['symbol', 'dt', 'weight', 'price', 'ref']].to_numpy(): - key = f'Weights:{self.strategy_name}:{row[0]}:{row[1].strftime("%Y%m%d%H%M%S")}' + key = f'{self.key_prefix}:{self.strategy_name}:{row[0]}:{row[1].strftime("%Y%m%d%H%M%S")}' keys.append(key) args.append(row[2]) @@ -130,7 +144,7 @@ def publish_dataframe(self, df, overwrite=False, batch_size=10000): def __heartbeat(self): while True: - key = f'{self.heartbeat_prefix}:{self.strategy_name}' + key = f'{self.key_prefix}:{self.heartbeat_prefix}:{self.strategy_name}' try: self.heartbeat_client.set(key, datetime.now().strftime('%Y-%m-%d %H:%M:%S')) except Exception: @@ -138,13 +152,13 @@ def __heartbeat(self): time.sleep(15) def get_keys(self, pattern): - """使用 lua 获取 redis 中指定 pattern 的 keys""" - return self.r.eval('''local pattern = ARGV[1]\nreturn redis.call('KEYS', pattern)''', 0, pattern) + """获取 redis 中指定 pattern 的 keys""" + return self.r.keys(pattern) def clear_all(self): """删除该策略所有记录""" - self.r.delete(f"{self.strategy_name}:meta") - keys = self.get_keys(f'Weights:{self.strategy_name}*') + self.r.delete(f'{self.key_prefix}:META:{self.strategy_name}') + keys = self.get_keys(f'{self.key_prefix}:{self.strategy_name}*') if keys is not None and len(keys) > 0: self.r.delete(*keys) @@ -195,7 +209,7 @@ def register_lua_publish(client): def get_symbols(self): """获取策略交易的品种列表""" - keys = self.get_keys(f'Weights:{self.strategy_name}*') + keys = self.get_keys(f'{self.key_prefix}:{self.strategy_name}*') symbols = {x.split(":")[2] for x in keys} return list(symbols) @@ -204,7 +218,7 @@ def get_last_weights(self, symbols=None): symbols = symbols if symbols else self.get_symbols() with self.r.pipeline() as pipe: for symbol in symbols: - pipe.hgetall(f"Weights:{self.strategy_name}:{symbol}:LAST") + pipe.hgetall(f'{self.key_prefix}:{self.strategy_name}:{symbol}:LAST') rows = pipe.execute() dfw = pd.DataFrame(rows) @@ -216,7 +230,7 @@ def get_hist_weights(self, symbol, sdt, edt) -> pd.DataFrame: """获取单个品种的持仓权重历史数据""" start_score = pd.to_datetime(sdt).strftime('%Y%m%d%H%M%S') end_score = pd.to_datetime(edt).strftime('%Y%m%d%H%M%S') - model_key = f'Weights:{self.strategy_name}:{symbol}' + model_key = f'{self.key_prefix}:{self.strategy_name}:{symbol}' key_list = self.r.zrangebyscore(model_key, start_score, end_score) if len(key_list) == 0: diff --git a/czsc/traders/weight_backtest.py b/czsc/traders/weight_backtest.py index 383f6093e..7aaeefa09 100644 --- a/czsc/traders/weight_backtest.py +++ b/czsc/traders/weight_backtest.py @@ -133,7 +133,11 @@ def get_ensemble_weight(trader: CzscTrader, method: Union[AnyStr, Callable] = 'm class WeightBacktest: - """持仓权重回测""" + """持仓权重回测 + + 飞书文档:https://s0cqcxuy3p.feishu.cn/wiki/Pf1fw1woQi4iJikbKJmcYToznxb + """ + version = "V231005" def __init__(self, dfw, digits=2, **kwargs) -> None: """持仓权重回测 @@ -142,7 +146,7 @@ def __init__(self, dfw, digits=2, **kwargs) -> None: dt 为K线结束时间,必须是连续的交易时间序列,不允许有时间断层 symbol 为合约代码, - weight 为K线结束时间对应的持仓权重, + weight 为K线结束时间对应的持仓权重,品种之间的权重是独立的,不会互相影响 price 为结束时间对应的交易价格,可以是当前K线的收盘价,或者下一根K线的开盘价,或者未来N根K线的TWAP、VWAP等 数据样例如下: @@ -169,10 +173,7 @@ def __init__(self, dfw, digits=2, **kwargs) -> None: self.fee_rate = kwargs.get('fee_rate', 0.0002) self.dfw['weight'] = self.dfw['weight'].round(digits) self.symbols = list(self.dfw['symbol'].unique().tolist()) - self.res_path = Path(kwargs.get('res_path', "weight_backtest")) - self.res_path.mkdir(exist_ok=True, parents=True) - logger.add(self.res_path.joinpath("weight_backtest.log"), rotation="1 week") - logger.info(f"持仓权重回测参数:digits={digits}, fee_rate={self.fee_rate},res_path={self.res_path},kwargs={kwargs}") + self.results = self.backtest() def get_symbol_daily(self, symbol): """获取某个合约的每日收益率 @@ -285,39 +286,53 @@ def __add_operate(dt, bar_id, volume, price, operate): def backtest(self): """回测所有合约的收益率""" + symbols = self.symbols res = {} - for symbol in self.symbols: + for symbol in symbols: daily = self.get_symbol_daily(symbol) pairs = self.get_symbol_pairs(symbol) res[symbol] = {"daily": daily, "pairs": pairs} - pd.to_pickle(res, self.res_path.joinpath("res.pkl")) - logger.info(f"回测结果已保存到 {self.res_path.joinpath('res.pkl')}") - - # 品种等权费后日收益率 - dret = pd.concat([v['daily'] for v in res.values()], ignore_index=True) + dret = pd.concat([v['daily'] for k, v in res.items() if k in symbols], ignore_index=True) dret = pd.pivot_table(dret, index='date', columns='symbol', values='return').fillna(0) dret['total'] = dret[list(res.keys())].mean(axis=1) + res['品种等权日收益'] = dret + stats = {"开始日期": dret.index.min().strftime("%Y%m%d"), "结束日期": dret.index.max().strftime("%Y%m%d")} stats.update(daily_performance(dret['total'])) - logger.info(f"品种等权费后日收益率:{stats}") - dret.to_excel(self.res_path.joinpath("daily_return.xlsx"), index=True) - logger.info(f"品种等权费后日收益率已保存到 {self.res_path.joinpath('daily_return.xlsx')}") + dfp = pd.concat([v['pairs'] for k, v in res.items() if k in symbols], ignore_index=True) + pairs_stats = evaluate_pairs(dfp) + pairs_stats = {k: v for k, v in pairs_stats.items() if k in ['单笔收益', '持仓K线数', '交易胜率', '持仓天数']} + stats.update(pairs_stats) + + res['绩效评价'] = stats + return res + + def report(self, res_path): + """回测报告""" + res_path = Path(res_path) + res_path.mkdir(exist_ok=True, parents=True) + logger.add(res_path.joinpath("weight_backtest.log"), rotation="1 week") + logger.info(f"持仓权重回测参数:digits={self.digits}, fee_rate={self.fee_rate},res_path={res_path}") + + res = self.results + pd.to_pickle(res, res_path.joinpath("res.pkl")) + logger.info(f"回测结果已保存到 {res_path.joinpath('res.pkl')}") + + # 品种等权费后日收益率 + dret = res['品种等权日收益'].copy() + dret.to_excel(res_path.joinpath("daily_return.xlsx"), index=True) + logger.info(f"品种等权费后日收益率已保存到 {res_path.joinpath('daily_return.xlsx')}") # 品种等权费后日收益率资金曲线绘制 dret = dret.cumsum() fig = px.line(dret, y=dret.columns.to_list(), title="费后日收益率资金曲线") fig.for_each_trace(lambda trace: trace.update(visible=True if trace.name == 'total' else 'legendonly')) - fig.write_html(self.res_path.joinpath("daily_return.html")) - logger.info(f"费后日收益率资金曲线已保存到 {self.res_path.joinpath('daily_return.html')}") + fig.write_html(res_path.joinpath("daily_return.html")) + logger.info(f"费后日收益率资金曲线已保存到 {res_path.joinpath('daily_return.html')}") # 所有开平交易记录的表现 - dfp = pd.concat([v['pairs'] for v in res.values()], ignore_index=True) - pairs_stats = evaluate_pairs(dfp) - pairs_stats = {k: v for k, v in pairs_stats.items() if k in ['单笔收益', '持仓K线数', '交易胜率', '持仓天数']} - logger.info(f"所有开平交易记录的表现:{pairs_stats}") - stats.update(pairs_stats) - logger.info(f"策略评价:{stats}") - save_json(stats, self.res_path.joinpath("stats.json")) - res['stats'] = stats - return res + stats = res['绩效评价'].copy() + logger.info(f"绩效评价:{stats}") + save_json(stats, res_path.joinpath("stats.json")) + logger.info(f"绩效评价已保存到 {res_path.joinpath('stats.json')}") diff --git a/czsc/utils/__init__.py b/czsc/utils/__init__.py index a7778c42a..802fb0cea 100644 --- a/czsc/utils/__init__.py +++ b/czsc/utils/__init__.py @@ -11,6 +11,7 @@ from .word_writer import WordWriter from .corr import nmi_matrix, single_linear, cross_sectional_ic from .bar_generator import BarGenerator, freq_end_time, resample_bars +from .bar_generator import is_trading_time, get_intraday_times, check_freq_and_market from .io import dill_dump, dill_load, read_json, save_json from .sig import check_pressure_support, check_gap_info, is_bis_down, is_bis_up, get_sub_elements from .sig import same_dir_counts, fast_slow_cross, count_last_same, create_single_signal diff --git a/czsc/utils/bar_generator.py b/czsc/utils/bar_generator.py index 70974cd49..ceb5bcf53 100644 --- a/czsc/utils/bar_generator.py +++ b/czsc/utils/bar_generator.py @@ -22,6 +22,24 @@ freq_edt_map[f"{_f}_{_m}"] = {k: v for k, v in dfg[["time", _f]].values} +def is_trading_time(dt: datetime = datetime.now(), market="A股"): + """判断指定时间是否是交易时间""" + hm = dt.strftime("%H:%M") + times = freq_market_times[f"1分钟_{market}"] + return True if hm in times else False + + +def get_intraday_times(freq='1分钟', market="A股"): + """获取指定市场的交易时间段 + + :param market: 市场名称,可选值:A股、期货、默认 + :return: 交易时间段列表 + """ + assert market in ['A股', '期货', '默认'], "market 参数必须为 A股 或 期货 或 默认" + assert freq.endswith("分钟"), "freq 参数必须为分钟级别的K线周期" + return freq_market_times[f"{freq}_{market}"] + + def check_freq_and_market(time_seq: List[AnyStr]): """检查时间序列是否为同一周期,是否为同一市场 diff --git a/czsc/utils/calendar.py b/czsc/utils/calendar.py index a846c335c..27f66c2e7 100644 --- a/czsc/utils/calendar.py +++ b/czsc/utils/calendar.py @@ -7,35 +7,35 @@ """ import pandas as pd from pathlib import Path +from datetime import datetime calendar = pd.read_feather(Path(__file__).parent / "china_calendar.feather") -def is_trading_date(date): +def is_trading_date(date=datetime.now()): """判断是否是交易日""" date = pd.to_datetime(date) is_open = calendar[calendar['cal_date'] == date].iloc[0]['is_open'] return is_open == 1 -def next_trading_date(date, n=1): +def next_trading_date(date=datetime.now(), n=1): """获取未来第N个交易日""" date = pd.to_datetime(date) df = calendar[calendar['cal_date'] > date] return df[df['is_open'] == 1].iloc[n - 1]['cal_date'] -def prev_trading_date(date, n=1): +def prev_trading_date(date=datetime.now(), n=1): """获取过去第N个交易日""" date = pd.to_datetime(date) df = calendar[calendar['cal_date'] < date] return df[df['is_open'] == 1].iloc[-n]['cal_date'] -def get_trading_dates(sdt, edt): +def get_trading_dates(sdt, edt=datetime.now()): """获取两个日期之间的所有交易日""" - sdt = pd.to_datetime(sdt) - edt = pd.to_datetime(edt) + sdt, edt = pd.to_datetime(sdt), pd.to_datetime(edt) df = calendar[(calendar['cal_date'] >= sdt) & (calendar['cal_date'] <= edt)] return df[df['is_open'] == 1]['cal_date'].tolist() diff --git a/czsc/utils/cross.py b/czsc/utils/cross.py index 52428a790..481eabe2e 100644 --- a/czsc/utils/cross.py +++ b/czsc/utils/cross.py @@ -190,15 +190,11 @@ def cross_sectional_ranker(df, x_cols, y_col, **kwargs): :return: df, 包含预测分数和排序列 """ + from lightgbm import LGBMRanker from sklearn.model_selection import TimeSeriesSplit - try: - from lightgbm import LGBMRanker - except: - logger.warning("lightgbm not installed, please install it first! (pip install lightgbm -U)") - return df - + assert "symbol" in df.columns, "df must have column 'symbol'" - assert "dt" in df.columns, f"df must have column 'dt'" + assert "dt" in df.columns, "df must have column 'dt'" if kwargs.get('copy', True): df = df.copy() @@ -213,11 +209,11 @@ def cross_sectional_ranker(df, x_cols, y_col, **kwargs): for train_index, test_index in tss.split(dfd): train_dts = dfd[train_index][:, 0] - test_dts= dfd[test_index][:, 0] + test_dts = dfd[test_index][:, 0] # 拆分训练集和测试集 train, test = df[df['dt'].isin(train_dts)], df[df['dt'].isin(test_dts)] - X_train, X_test, y_train = train[x_cols], test[x_cols], train[y_col] + X_train, X_test, y_train = train[x_cols], test[x_cols], train[y_col] query_train = train.groupby('dt')['symbol'].count().values # 训练模型 & 预测 diff --git a/czsc/utils/features.py b/czsc/utils/features.py new file mode 100644 index 000000000..37af87670 --- /dev/null +++ b/czsc/utils/features.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2023/10/06 15:01 +describe: 因子(特征)处理 +""" +from loguru import logger +from sklearn.preprocessing import scale + + +def normalize_feature(df, x_col, **kwargs): + """因子标准化:缩尾,然后标准化 + + :param df: pd.DataFrame,数据源 + :param x_col: str,因子列名 + :param kwargs: + + - q: float,缩尾比例, 默认 0.05 + """ + df = df.copy() + if df[x_col].isna().sum() > 0: + logger.warning(f"因子列 {x_col} 存在缺失值,已自动剔除,这有可能导致后续分析结果不准确") + df = df.dropna(subset=[x_col]) + + q = kwargs.get("q", 0.05) # 缩尾比例 + df[x_col] = df.groupby("dt")[x_col].transform(lambda x: scale(x.clip(lower=x.quantile(q), upper=x.quantile(1 - q)))) + return df diff --git a/czsc/utils/st_components.py b/czsc/utils/st_components.py index d4b36af56..a0ca2050f 100644 --- a/czsc/utils/st_components.py +++ b/czsc/utils/st_components.py @@ -2,10 +2,11 @@ import pandas as pd import streamlit as st import plotly.express as px +from sklearn.linear_model import LinearRegression def show_daily_return(df, **kwargs): - """用streamlit展示日收益""" + """用 streamlit 展示日收益""" assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" type_ = "持有日" if kwargs.get("none_zero", False) else "交易日" @@ -17,7 +18,7 @@ def show_daily_return(df, **kwargs): stats.append(col_stats) stats = pd.DataFrame(stats).set_index('日收益名称') - fmt_cols = ['年化', '夏普', '最大回撤', '卡玛', '年化波动率', '非零覆盖'] + fmt_cols = ['年化', '夏普', '最大回撤', '卡玛', '年化波动率', '非零覆盖', '日胜率', '盈亏平衡点'] stats = stats.style.background_gradient(cmap='RdYlGn_r', axis=None).format('{:.4f}', subset=fmt_cols) df = df.cumsum() @@ -25,8 +26,120 @@ def show_daily_return(df, **kwargs): for col in kwargs.get("legend_only_cols", []): fig.update_traces(visible="legendonly", selector=dict(name=col)) + title = kwargs.get("title", "") with st.container(): - st.subheader(f'{kwargs.get("title", "日收益表现评价")}({type_})') - st.divider() + if title: + st.subheader(title) + st.divider() st.dataframe(stats, use_container_width=True) st.plotly_chart(fig, use_container_width=True) + + +def show_correlation(df, cols=None, method='pearson', **kwargs): + """用 streamlit 展示相关性 + + :param df: pd.DataFrame,数据源 + :param cols: list,分析相关性的字段 + :param method: str,计算相关性的方法,可选 pearson 和 spearman + """ + cols = cols or df.columns.to_list() + dfr = df[cols].corr(method=method) + dfr['total'] = dfr.sum(axis=1) - 1 + dfr = dfr.style.background_gradient(cmap='RdYlGn_r', axis=None).format('{:.4f}', na_rep='MISS') + st.dataframe(dfr, use_container_width=kwargs.get("use_container_width", True)) + + +def show_sectional_ic(df, x_col, y_col, method='pearson', **kwargs): + """使用 streamlit 展示截面IC + + :param df: pd.DataFrame,数据源 + :param x_col: str,因子列名 + :param y_col: str,收益列名 + :param method: str,计算IC的方法,可选 pearson 和 spearman + """ + dfc, res = czsc.cross_sectional_ic(df, x_col=x_col, y_col=y_col, dt_col='dt', method=method) + + col1, col2, col3, col4 = st.columns([1, 1, 1, 5]) + col1.metric("IC均值", res['IC均值']) + col1.metric("IC标准差", res['IC标准差']) + col2.metric("ICIR", res['ICIR']) + col2.metric("IC胜率", res['IC胜率']) + col3.metric("IC绝对值>2%占比", res['IC绝对值>2%占比']) + col3.metric("品种数量", df['symbol'].nunique()) + + dfc[['year', 'month']] = dfc.dt.apply(lambda x: pd.Series([x.year, x.month])) + dfm = dfc.groupby(['year', 'month']).agg({'ic': 'mean'}).reset_index() + dfm = pd.pivot_table(dfm, index='year', columns='month', values='ic') + + col4.write("月度IC分析结果:") + col4.dataframe(dfm.style.background_gradient(cmap='RdYlGn_r', axis=None).format('{:.4f}', na_rep='MISS'), + use_container_width=True) + + fig = px.histogram(df, x=x_col, marginal="box", title="因子数据分布图") + st.plotly_chart(fig, use_container_width=True) + + +def show_factor_returns(df, x_col, y_col): + """使用 streamlit 展示因子收益率 + + :param df: pd.DataFrame,数据源 + :param x_col: str,因子列名 + :param y_col: str,收益列名 + """ + assert 'dt' in df.columns, "时间列必须为 dt" + + res = [] + for dt, dfg in df.groupby("dt"): + dfg = dfg.copy().dropna(subset=[x_col, y_col]) + X = dfg[x_col].values.reshape(-1, 1) + y = dfg[y_col].values.reshape(-1, 1) + model = LinearRegression(fit_intercept=False).fit(X, y) + res.append([dt, model.coef_[0][0]]) + + res = pd.DataFrame(res, columns=["dt", "因子收益率"]) + res['dt'] = pd.to_datetime(res['dt']) + + col1, col2 = st.columns(2) + fig = px.bar(res, x='dt', y="因子收益率", title="因子逐K收益率") + col1.plotly_chart(fig, use_container_width=True) + + res["因子累计收益率"] = res["因子收益率"].cumsum() + fig = px.line(res, x='dt', y="因子累计收益率", title="因子累计收益率") + col2.plotly_chart(fig, use_container_width=True) + + +def show_factor_layering(df, x_col, y_col='n1b', **kwargs): + """使用 streamlit 绘制因子分层收益率图 + + :param df: 因子数据 + :param x_col: 因子列名 + :param y_col: 收益列名 + :param kwargs: + + - n: 分层数量,默认为10 + - long: 多头组合,例如 "第10层" + - short: 空头组合,例如 "第01层" + + """ + n = kwargs.get("n", 10) + if df[y_col].max() > 100: # 收益率单位为BP, 转换为万分之一 + df[y_col] = df[y_col] / 10000 + + def _layering(x): + return pd.qcut(x, q=n, labels=False, duplicates='drop') + df[f'{x_col}分层'] = df.groupby('dt')[x_col].transform(_layering) + + mr = df.groupby(["dt", f'{x_col}分层'])[y_col].mean().reset_index() + mrr = mr.pivot(index='dt', columns=f'{x_col}分层', values=y_col).fillna(0) + mrr.columns = [f'第{str(i).zfill(2)}层' for i in range(1, n + 1)] + + tabs = st.tabs(["分层收益率", "多空组合"]) + with tabs[0]: + show_daily_return(mrr) + + with tabs[1]: + long = kwargs.get("long", f"第{n}层") + short = kwargs.get("short", "第01层") + st.write(f"多头:{long},空头:{short}") + mrr['多空组合'] = (mrr[long] - mrr[short]) / 2 + show_daily_return(mrr[['多空组合']]) diff --git a/czsc/utils/stats.py b/czsc/utils/stats.py index 1f65baec2..809409041 100644 --- a/czsc/utils/stats.py +++ b/czsc/utils/stats.py @@ -7,7 +7,18 @@ """ import numpy as np import pandas as pd -from typing import List + + +def cal_break_even_point(seq) -> float: + """计算单笔收益序列的盈亏平衡点 + + :param seq: 单笔收益序列,数据样例:[0.01, 0.02, -0.01, 0.03, 0.02, -0.02, 0.01, -0.01, 0.02, 0.01] + :return: 盈亏平衡点 + """ + if sum(seq) < 0: + return 1.0 + seq = np.cumsum(sorted(seq)) # type: ignore + return (np.sum(seq < 0) + 1) / len(seq) # type: ignore def subtract_fee(df, fee=1): @@ -44,7 +55,8 @@ def daily_performance(daily_returns): daily_returns = np.array(daily_returns, dtype=np.float64) if len(daily_returns) == 0 or np.std(daily_returns) == 0 or all(x == 0 for x in daily_returns): - return {"年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "日胜率": 0, "年化波动率": 0, "非零覆盖": 0} + return {"年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "日胜率": 0, + "年化波动率": 0, "非零覆盖": 0, "盈亏平衡点": 0} annual_returns = np.sum(daily_returns) / len(daily_returns) * 252 sharpe_ratio = np.mean(daily_returns) / np.std(daily_returns) * np.sqrt(252) @@ -63,6 +75,7 @@ def daily_performance(daily_returns): "日胜率": round(win_pct, 4), "年化波动率": round(annual_volatility, 4), "非零覆盖": round(none_zero_cover, 4), + "盈亏平衡点": round(cal_break_even_point(daily_returns), 4), } return sta @@ -133,18 +146,6 @@ def net_value_stats(nv: pd.DataFrame, exclude_zero: bool = False, sub_cost=True) return res -def cal_break_even_point(seq: List[float]) -> float: - """计算单笔收益序列的盈亏平衡点 - - :param seq: 单笔收益序列 - :return: 盈亏平衡点 - """ - if sum(seq) < 0: - return 1.0 - seq = np.cumsum(sorted(seq)) # type: ignore - return (np.sum(seq < 0) + 1) / len(seq) # type: ignore - - def evaluate_pairs(pairs: pd.DataFrame, trade_dir: str = "多空") -> dict: """评估开平交易记录的表现 diff --git a/docs/requirements.txt b/docs/requirements.txt index e8d2c4f4c..ec6a332c2 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -24,4 +24,5 @@ pytest tenacity>=8.1.0 requests-toolbelt>=0.10.1 plotly>=5.11.0 -parse>=1.19.0 \ No newline at end of file +parse>=1.19.0 +redis \ No newline at end of file diff --git "a/examples/TS\346\225\260\346\215\256\346\272\220\347\232\204\345\275\242\346\200\201\351\200\211\350\202\241.py" "b/examples/TS\346\225\260\346\215\256\346\272\220\347\232\204\345\275\242\346\200\201\351\200\211\350\202\241.py" new file mode 100644 index 000000000..7e1fba6a9 --- /dev/null +++ "b/examples/TS\346\225\260\346\215\256\346\272\220\347\232\204\345\275\242\346\200\201\351\200\211\350\202\241.py" @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2023/9/25 19:45 +describe: 使用 Tushare 数据源的形态选股 + +相关飞书文档: +1. https://s0cqcxuy3p.feishu.cn/wiki/UGvvw2bMiihciHkRnJecKzm6nnb +""" +import czsc +import shutil +import pandas as pd +import tushare as ts +from pathlib import Path +from czsc.connectors.ts_connector import get_raw_bars, get_symbols, dc + + +pro = ts.pro_api() + +# 首次使用,需要设置 Tushare 的 token,注意,每台电脑只需要执行一次即可,token 会保存在本地文件中 +# 没有 token 的用户,可以点击 https://tushare.pro/register?reg=7 进行注册 +# import tushare as ts +# ts.set_token("your token") + +# 如果是每天选股,需要执行以下代码先清空缓存,否则会导致选股结果不更新 +# dc.clear() + + +def get_events_matched(refresh=False): + """执行形态选股""" + + # 获取A股所有股票代码 + symbols = get_symbols("stock") + + # 定义形态事件, 支持多个事件 + events = [ + { + "name": "日线CCI多头", + "operate": "开多", + "signals_not": [ + "日线_D1_涨跌停V230331_跌停_任意_任意_0", + "日线_D1_涨跌停V230331_涨停_任意_任意_0" + ], + "factors": [{"name": "CCI看多", "signals_all": ["日线_D1CCI14#3#10_BS辅助V230402_多头_任意_任意_0"]}], + }, + + {"name": "日线CCI多头;不过滤涨跌停", + "operate": "开多", + "factors": [{"name": "CCI看多", "signals_all": ["日线_D1CCI14#3#10_BS辅助V230402_多头_任意_任意_0"]}]}, + ] + + ems_params = { + "events": events, + "symbols": symbols, + "read_bars": get_raw_bars, + "bar_sdt": "2017-01-01", + "sdt": "2022-01-01", + "edt": "2023-01-01", + "max_workers": 10, + "results_path": r"D:\量化投研\EMS测试A", + } + path = Path(ems_params['results_path']) + if path.exists() and refresh: + shutil.rmtree(path) + + ems = czsc.EventMatchSensor(**ems_params) + + # 获取截面匹配结果 + df = ems.data.copy() + results = {} + for name in ems.events_name: + results[name] = df[df[name] == 1][['symbol', 'dt']].copy().reset_index(drop=True) + return results + + +if __name__ == '__main__': + results = get_events_matched(refresh=True) + + # 获取最新的选股结果 + max_dt = max([df.dt.max() for df in results.values()]) + print(f"最新选股日期:{max_dt}") + latest = {k: df[df.dt == max_dt]['symbol'].to_list() for k, df in results.items()} + + # 过滤掉ST股票 + dfb = dc.daily_basic_new(max_dt)[['ts_code', 'name', 'industry', 'area']] + st_stocks = dfb[dfb.name.str.contains('ST').fillna(True)]['ts_code'].to_list() + latest = {k: [s for s in v if s not in st_stocks] for k, v in latest.items()} + latest = {k: dfb[dfb['ts_code'].isin(v)] for k, v in latest.items() if len(v) > 0} + + # 保存选股结果到 excel + file_xlsx = Path.cwd() / f"选股结果_{pd.to_datetime(max_dt).strftime('%Y%m%d')}.xlsx" + with pd.ExcelWriter(file_xlsx) as writer: + for k, v in latest.items(): + v.to_excel(writer, sheet_name=k, index=False) + print(f"选股结果保存到:{file_xlsx}") diff --git a/examples/run_dummy_backtest.py b/examples/run_dummy_backtest.py index 31f954465..41c5ecb71 100644 --- a/examples/run_dummy_backtest.py +++ b/examples/run_dummy_backtest.py @@ -32,7 +32,3 @@ if __name__ == '__main__': # 这里仅回测前10个品种,作为执行示例 dummy.execute(symbols[:10], n_jobs=4) - - - - diff --git a/examples/test_offline/test_rwc.py b/examples/test_offline/test_rwc.py new file mode 100644 index 000000000..68d64e39c --- /dev/null +++ b/examples/test_offline/test_rwc.py @@ -0,0 +1,41 @@ +import sys +sys.path.insert(0, ".") +sys.path.insert(0, "..") + +import czsc +import pandas as pd + +assert czsc.RedisWeightsClient.version == "V231005" + +redis_url = 'redis://20.205.5.**:9103/1' +rwc = czsc.RedisWeightsClient('test', redis_url, key_prefix='WeightsA') + +# 如果需要清理 redis 中的数据,执行 +# rwc.clear_all() + +# 首次写入,建议设置一些策略元数据 +rwc.set_metadata(description='测试策略:仅用于读写redis测试', base_freq='1分钟', author='ZB', outsample_sdt='20220101') +print(rwc.metadata) + +rwc.set_metadata(description='测试策略:仅用于读写redis测试', base_freq='1分钟', author='ZB', + outsample_sdt='20220101', overwrite=True) +print(rwc.metadata) + +# 写入策略持仓权重,样例数据下载:https://s0cqcxuy3p.feishu.cn/wiki/Pf1fw1woQi4iJikbKJmcYToznxb +weights = pd.read_feather(r"C:\Users\zengb\Downloads\weight_example.feather") + +# 写入单条数据 +rwc.publish(**weights.iloc[0].to_dict()) + +# 批量写入整个dataframe;样例超300万行,写入耗时约5分钟 +rwc.publish_dataframe(weights, overwrite=False, batch_size=1000000) + +# 获取redis中该策略有持仓权重的品种列表 +symbols = rwc.get_symbols() +print(symbols) + +# 获取指定品种在某个时间段的持仓权重数据 +dfw = rwc.get_hist_weights('ZZSF9001', '20210101', '20230101') + +# 获取所有品种最近一个时间的持仓权重 +dfr = rwc.get_last_weights(symbols=symbols) diff --git a/examples/test_offline/test_weight_backtest.py b/examples/test_offline/test_weight_backtest.py new file mode 100644 index 000000000..3ac6c4af9 --- /dev/null +++ b/examples/test_offline/test_weight_backtest.py @@ -0,0 +1,40 @@ +import sys +sys.path.insert(0, ".") +sys.path.insert(0, "..") +import czsc +import pandas as pd + +assert czsc.WeightBacktest.version == "V231005" + + +def run_by_weights(): + """从持仓权重样例数据中回测""" + dfw = pd.read_feather(r"C:\Users\zengb\Desktop\231005\weight_example.feather") + wb = czsc.WeightBacktest(dfw, digits=1, fee_rate=0.0002) + + # ------------------------------------------------------------------------------------ + # 查看绩效评价 + # ------------------------------------------------------------------------------------ + print(wb.results['绩效评价']) + # {'开始日期': '20170103', + # '结束日期': '20230731', + # '年化': 0.093, # 品种等权之后的年化收益率 + # '夏普': 1.19, # 品种等权之后的夏普比率 + # '最大回撤': 0.1397, # 品种等权之后的最大回撤 + # '卡玛': 0.67, + # '日胜率': 0.5228, # 品种等权之后的日胜率 + # '年化波动率': 0.0782, + # '非零覆盖': 1.0, + # '盈亏平衡点': 0.9782, # 品种等权之后的盈亏平衡点,这个值越小越好,正常策略的范围应该在 0.85~0.98 之间 + # '单笔收益': 25.6, # 将所有品种的单笔汇总之后的平均收益,单位是 BP,即 0.01% + # '交易胜率': 0.3717, # 将所有品种的单笔汇总之后的交易胜率 + # '持仓天数': 3.69, # 将所有品种的单笔汇总之后的平均持仓天数 + # '持仓K线数': 971.66} # 将所有品种的单笔汇总之后的平均持仓 K 线数 + + # ------------------------------------------------------------------------------------ + # 获取指定品种的回测结果 + # ------------------------------------------------------------------------------------ + symbol_res = wb.results[wb.symbols[0]] + print(symbol_res) + + wb.report(res_path=r"C:\Users\zengb\Desktop\231005\weight_example") diff --git a/test/test_bar_generator.py b/test/test_bar_generator.py index 714b42f02..ab0d10a97 100644 --- a/test/test_bar_generator.py +++ b/test/test_bar_generator.py @@ -338,3 +338,33 @@ def test_bg_on_d(): assert len(bg.bars['月线']) == 165 assert len(bg.bars['年线']) == 15 assert bg.bars['月线'][-2].id > bg.bars['月线'][-3].id + + +def test_is_trading_time(): + from datetime import datetime + from czsc.utils.bar_generator import is_trading_time + + # Test for A股 market + assert not is_trading_time(datetime(2022, 1, 3, 9, 30), market="A股") + assert is_trading_time(datetime(2022, 1, 3, 9, 31), market="A股") + assert is_trading_time(datetime(2022, 1, 3, 11, 30), market="A股") + assert not is_trading_time(datetime(2022, 1, 3, 12, 59), market="A股") + assert is_trading_time(datetime(2022, 1, 3, 15, 0), market="A股") + assert not is_trading_time(datetime(2022, 1, 3, 20, 0), market="A股") + + # Test for other markets + assert is_trading_time(datetime(2022, 1, 3, 9, 30), market="期货") + assert not is_trading_time(datetime(2022, 1, 3, 10, 25), market="期货") + assert not is_trading_time(datetime(2022, 1, 3, 12, 59), market="期货") + assert is_trading_time(datetime(2022, 1, 3, 15, 0), market="期货") + assert not is_trading_time(datetime(2022, 1, 3, 20, 0), market="期货") + + +def test_get_intraday_times(): + from czsc.utils.bar_generator import get_intraday_times + + assert get_intraday_times(freq='60分钟', market='A股') == ['10:30', '11:30', '14:00', '15:00'] + assert get_intraday_times(freq='120分钟', market='A股') == ['11:30', '15:00'] + assert get_intraday_times(freq='120分钟', market='期货') == ['11:00', '15:00', '23:00', '01:00', '02:30'] + x = ['02:00', '04:00', '06:00', '08:00', '10:00', '12:00', '14:00', '16:00', '18:00', '20:00', '22:00', '00:00'] + assert get_intraday_times(freq='120分钟', market='默认') == x diff --git a/test/test_utils.py b/test/test_utils.py index 72d3a40ce..0539e1f6d 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -7,6 +7,7 @@ """ import pytest import pandas as pd +import numpy as np from czsc import utils @@ -87,3 +88,27 @@ def test_ranker(): assert dfp['rank'].max() == len(symbols) assert dfp['rank'].min() == 1 assert dfp['rank'].mean() == 2.5 + + +def test_daily_performance(): + from czsc.utils.stats import daily_performance + + # Test case 1: empty daily returns + result = daily_performance([]) + assert result == {"年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "日胜率": 0, "年化波动率": 0, "非零覆盖": 0, "盈亏平衡点": 0} + + # Test case 2: daily returns with zero standard deviation + result = daily_performance([1, 1, 1, 1, 1]) + assert result == {"年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "日胜率": 0, "年化波动率": 0, "非零覆盖": 0, "盈亏平衡点": 0} + + # Test case 3: daily returns with all zeros + result = daily_performance([0, 0, 0, 0, 0]) + assert result == {"年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "日胜率": 0, "年化波动率": 0, "非零覆盖": 0, "盈亏平衡点": 0} + + # Test case 4: normal daily returns + daily_returns = np.array([0.01, 0.02, -0.01, 0.03, 0.02, -0.02, 0.01, -0.01, 0.02, 0.01]) + result = daily_performance(daily_returns) + assert result == {'年化': 2.016, '夏普': 8.27, '最大回撤': 0.02, '卡玛': 100.8, '日胜率': 0.7, '年化波动率': 0.2439, '非零覆盖': 1.0, '盈亏平衡点': 0.7} + + result = daily_performance([0.01, 0.02, -0.01, 0.03, 0.02, -0.02, 0.01, -0.01, 0.02, 0.01]) + assert result == {'年化': 2.016, '夏普': 8.27, '最大回撤': 0.02, '卡玛': 100.8, '日胜率': 0.7, '年化波动率': 0.2439, '非零覆盖': 1.0, '盈亏平衡点': 0.7}