diff --git a/czsc/__init__.py b/czsc/__init__.py index 6e4b5be93..699ce4378 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -79,6 +79,7 @@ SignalAnalyzer, SignalPerformance, daily_performance, + rolling_daily_performance, weekly_performance, holds_performance, net_value_stats, @@ -130,6 +131,7 @@ show_out_in_compare, show_optuna_study, show_drawdowns, + show_rolling_daily_performance, ) from czsc.utils.bi_info import ( diff --git a/czsc/utils/__init__.py b/czsc/utils/__init__.py index 3acf6a572..b367b84df 100644 --- a/czsc/utils/__init__.py +++ b/czsc/utils/__init__.py @@ -18,7 +18,15 @@ from .plotly_plot import KlineChart from .trade import cal_trade_price, update_nbars, update_bbars, update_tbars, risk_free_returns, resample_to_daily from .cross import CrossSectionalPerformance, cross_sectional_ranker -from .stats import daily_performance, net_value_stats, subtract_fee, weekly_performance, holds_performance, top_drawdowns +from .stats import ( + daily_performance, + net_value_stats, + subtract_fee, + weekly_performance, + holds_performance, + top_drawdowns, + rolling_daily_performance, +) from .signal_analyzer import SignalAnalyzer, SignalPerformance from .cache import home_path, get_dir_size, empty_cache_path, DiskCache, disk_cache, clear_cache from .index_composition import index_composition @@ -27,8 +35,27 @@ from .optuna import optuna_study, optuna_good_params -sorted_freqs = ['Tick', '1分钟', '2分钟', '3分钟', '4分钟', '5分钟', '6分钟', '10分钟', '12分钟', - '15分钟', '20分钟', '30分钟', '60分钟', '120分钟', '日线', '周线', '月线', '季线', '年线'] +sorted_freqs = [ + "Tick", + "1分钟", + "2分钟", + "3分钟", + "4分钟", + "5分钟", + "6分钟", + "10分钟", + "12分钟", + "15分钟", + "20分钟", + "30分钟", + "60分钟", + "120分钟", + "日线", + "周线", + "月线", + "季线", + "年线", +] def x_round(x: Union[float, int], digit: int = 4) -> Union[float, int]: @@ -56,9 +83,9 @@ def get_py_namespace(file_py: str, keys: list = []) -> dict: :param keys: 指定需要的对象名称 :return: namespace """ - text = open(file_py, 'r', encoding='utf-8').read() - code = compile(text, file_py, 'exec') - namespace = {"file_py": file_py, 'file_name': os.path.basename(file_py).split('.')[0]} + text = open(file_py, "r", encoding="utf-8").read() + code = compile(text, file_py, "exec") + namespace = {"file_py": file_py, "file_name": os.path.basename(file_py).split(".")[0]} exec(code, namespace) if keys: namespace = {k: v for k, v in namespace.items() if k in keys} @@ -82,11 +109,11 @@ def import_by_name(name): :param name: 模块名,如:'czsc.objects.Factor' :return: 模块对象 """ - if '.' not in name: + if "." not in name: return __import__(name) # 从右边开始分割,分割成模块名和函数名 - module_name, function_name = name.rsplit('.', 1) + module_name, function_name = name.rsplit(".", 1) module = __import__(module_name, globals(), locals(), [function_name]) return vars(module)[function_name] @@ -143,11 +170,12 @@ def create_grid_params(prefix: str = "", multiply=3, **kwargs) -> dict: else: key = str(i).zfill(multiply) - row['version'] = f"{prefix}{key}" + row["version"] = f"{prefix}{key}" params[f"{prefix}@{key}"] = row return params def print_df_sample(df, n=5): from tabulate import tabulate - print(tabulate(df.head(n).values, headers=df.columns, tablefmt='rst')) + + print(tabulate(df.head(n).values, headers=df.columns, tablefmt="rst")) diff --git a/czsc/utils/st_components.py b/czsc/utils/st_components.py index 0f5d67d73..6a01c6c50 100644 --- a/czsc/utils/st_components.py +++ b/czsc/utils/st_components.py @@ -10,7 +10,7 @@ from sklearn.linear_model import LinearRegression -def show_daily_return(df, **kwargs): +def show_daily_return(df: pd.DataFrame, **kwargs): """用 streamlit 展示日收益 :param df: pd.DataFrame,数据源 @@ -23,51 +23,51 @@ def show_daily_return(df, **kwargs): - plot_cumsum: bool,是否展示日收益累计曲线,默认为 True """ - if not df.index.dtype == 'datetime64[ns]': - df['dt'] = pd.to_datetime(df['dt']) - df.set_index('dt', inplace=True) + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" - assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" df = df.copy().fillna(0) df.sort_index(inplace=True, ascending=True) - def _stats(df_, type_='持有日'): + def _stats(df_, type_="持有日"): df_ = df_.copy() stats = [] for col in df_.columns: - if type_ == '持有日': + if type_ == "持有日": col_stats = czsc.daily_performance([x for x in df_[col] if x != 0]) else: - assert type_ == '交易日', "type_ 参数必须是 持有日 或 交易日" + assert type_ == "交易日", "type_ 参数必须是 持有日 或 交易日" col_stats = czsc.daily_performance(df_[col]) - col_stats['日收益名称'] = col + col_stats["日收益名称"] = col stats.append(col_stats) - stats = pd.DataFrame(stats).set_index('日收益名称') - stats = stats.style.background_gradient(cmap='RdYlGn_r', axis=None, subset=['年化']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['绝对收益']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['夏普']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['最大回撤']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['卡玛']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['年化波动率']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['盈亏平衡点']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['日胜率']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['非零覆盖']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['新高间隔']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['新高占比']) + stats = pd.DataFrame(stats).set_index("日收益名称") + stats = stats.style.background_gradient(cmap="RdYlGn_r", axis=None, subset=["年化"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["绝对收益"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["夏普"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["最大回撤"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["卡玛"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["年化波动率"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["盈亏平衡点"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["日胜率"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["非零覆盖"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["新高间隔"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["新高占比"]) stats = stats.format( { - '盈亏平衡点': '{:.2f}', - '年化波动率': '{:.2%}', - '最大回撤': '{:.2%}', - '卡玛': '{:.2f}', - '年化': '{:.2%}', - '夏普': '{:.2f}', - '非零覆盖': '{:.2%}', - '绝对收益': '{:.2%}', - '日胜率': '{:.2%}', - '新高间隔': '{:.2f}', - '新高占比': '{:.2%}', + "盈亏平衡点": "{:.2f}", + "年化波动率": "{:.2%}", + "最大回撤": "{:.2%}", + "卡玛": "{:.2f}", + "年化": "{:.2%}", + "夏普": "{:.2f}", + "非零覆盖": "{:.2%}", + "绝对收益": "{:.2%}", + "日胜率": "{:.2%}", + "新高间隔": "{:.2f}", + "新高占比": "{:.2%}", } ) return stats @@ -81,23 +81,23 @@ def _stats(df_, type_='持有日'): with st.expander("交易日绩效指标", expanded=True): if use_st_table: - st.table(_stats(df, type_='交易日')) + st.table(_stats(df, type_="交易日")) else: - st.dataframe(_stats(df, type_='交易日'), use_container_width=True) + st.dataframe(_stats(df, type_="交易日"), use_container_width=True) if kwargs.get("stat_hold_days", True): with st.expander("持有日绩效指标", expanded=False): - st.dataframe(_stats(df, type_='持有日'), use_container_width=True) + st.dataframe(_stats(df, type_="持有日"), use_container_width=True) if kwargs.get("plot_cumsum", True): df = df.cumsum() fig = px.line(df, y=df.columns.to_list(), title="日收益累计曲线") - fig.update_xaxes(title='') + fig.update_xaxes(title="") # 添加每年的开始第一个日期的竖线 for year in range(df.index.year.min(), df.index.year.max() + 1): first_date = df[df.index.year == year].index.min() - fig.add_vline(x=first_date, line_dash='dash', line_color='red') + fig.add_vline(x=first_date, line_dash="dash", line_color="red") for col in kwargs.get("legend_only_cols", []): fig.update_traces(visible="legendonly", selector=dict(name=col)) @@ -106,7 +106,7 @@ def _stats(df_, type_='持有日'): st.plotly_chart(fig, use_container_width=True) -def show_monthly_return(df, ret_col='total', sub_title="月度累计收益", **kwargs): +def show_monthly_return(df, ret_col="total", sub_title="月度累计收益", **kwargs): """展示指定列的月度累计收益 :param df: pd.DataFrame,数据源 @@ -115,33 +115,33 @@ def show_monthly_return(df, ret_col='total', sub_title="月度累计收益", **k :param kwargs: """ assert isinstance(df, pd.DataFrame), "df 必须是 pd.DataFrame 类型" - if not df.index.dtype == 'datetime64[ns]': - df['dt'] = pd.to_datetime(df['dt']) - df.set_index('dt', inplace=True) + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) - assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" df = df.copy().fillna(0) df.sort_index(inplace=True, ascending=True) if sub_title: st.subheader(sub_title, divider="rainbow") - monthly = df[[ret_col]].resample('ME').sum() - monthly['year'] = monthly.index.year - monthly['month'] = monthly.index.month - monthly = monthly.pivot_table(index='year', columns='month', values=ret_col) + monthly = df[[ret_col]].resample("ME").sum() + monthly["year"] = monthly.index.year + monthly["month"] = monthly.index.month + monthly = monthly.pivot_table(index="year", columns="month", values=ret_col) month_cols = [f"{x}月" for x in monthly.columns] monthly.columns = month_cols - monthly['年收益'] = monthly.sum(axis=1) + monthly["年收益"] = monthly.sum(axis=1) - monthly = monthly.style.background_gradient(cmap='RdYlGn_r', axis=None, subset=month_cols) - monthly = monthly.background_gradient(cmap='RdYlGn_r', axis=None, subset=['年收益']) - monthly = monthly.format('{:.2%}', na_rep='-') + monthly = monthly.style.background_gradient(cmap="RdYlGn_r", axis=None, subset=month_cols) + monthly = monthly.background_gradient(cmap="RdYlGn_r", axis=None, subset=["年收益"]) + monthly = monthly.format("{:.2%}", na_rep="-") st.dataframe(monthly, use_container_width=True) -def show_correlation(df, cols=None, method='pearson', **kwargs): +def show_correlation(df, cols=None, method="pearson", **kwargs): """用 streamlit 展示相关性 :param df: pd.DataFrame,数据源 @@ -155,8 +155,8 @@ def show_correlation(df, cols=None, method='pearson', **kwargs): """ cols = cols or df.columns.to_list() dfr = df[cols].corr(method=method) - dfr['average'] = (dfr.sum(axis=1) - 1) / (len(cols) - 1) - dfr = dfr.style.background_gradient(cmap='RdYlGn_r', axis=None).format('{:.4f}', na_rep='MISS') + dfr["average"] = (dfr.sum(axis=1) - 1) / (len(cols) - 1) + dfr = dfr.style.background_gradient(cmap="RdYlGn_r", axis=None).format("{:.4f}", na_rep="MISS") if kwargs.get("use_st_table", False): st.table(dfr) @@ -164,7 +164,7 @@ def show_correlation(df, cols=None, method='pearson', **kwargs): st.dataframe(dfr, use_container_width=kwargs.get("use_container_width", True)) -def show_sectional_ic(df, x_col, y_col, method='pearson', **kwargs): +def show_sectional_ic(df, x_col, y_col, method="pearson", **kwargs): """使用 streamlit 展示截面IC :param df: pd.DataFrame,数据源 @@ -172,23 +172,23 @@ def show_sectional_ic(df, x_col, y_col, method='pearson', **kwargs): :param y_col: str,收益列名 :param method: str,计算IC的方法,可选 pearson 和 spearman """ - dfc, res = czsc.cross_sectional_ic(df, x_col=x_col, y_col=y_col, dt_col='dt', method=method) + dfc, res = czsc.cross_sectional_ic(df, x_col=x_col, y_col=y_col, dt_col="dt", method=method) col1, col2, col3, col4 = st.columns([1, 1, 1, 5]) - col1.metric("IC均值", res['IC均值']) - col1.metric("IC标准差", res['IC标准差']) - col2.metric("ICIR", res['ICIR']) - col2.metric("IC胜率", res['IC胜率']) - col3.metric("IC绝对值>2%占比", res['IC绝对值>2%占比']) - col3.metric("品种数量", df['symbol'].nunique()) + col1.metric("IC均值", res["IC均值"]) + col1.metric("IC标准差", res["IC标准差"]) + col2.metric("ICIR", res["ICIR"]) + col2.metric("IC胜率", res["IC胜率"]) + col3.metric("IC绝对值>2%占比", res["IC绝对值>2%占比"]) + col3.metric("品种数量", df["symbol"].nunique()) - dfc[['year', 'month']] = dfc.dt.apply(lambda x: pd.Series([x.year, x.month])) - dfm = dfc.groupby(['year', 'month']).agg({'ic': 'mean'}).reset_index() - dfm = pd.pivot_table(dfm, index='year', columns='month', values='ic') + dfc[["year", "month"]] = dfc.dt.apply(lambda x: pd.Series([x.year, x.month])) + dfm = dfc.groupby(["year", "month"]).agg({"ic": "mean"}).reset_index() + dfm = pd.pivot_table(dfm, index="year", columns="month", values="ic") col4.write("月度IC分析结果:") col4.dataframe( - dfm.style.background_gradient(cmap='RdYlGn_r', axis=None).format('{:.4f}', na_rep='MISS'), + dfm.style.background_gradient(cmap="RdYlGn_r", axis=None).format("{:.4f}", na_rep="MISS"), use_container_width=True, ) @@ -204,7 +204,7 @@ def show_factor_returns(df, x_col, y_col): :param x_col: str,因子列名 :param y_col: str,收益列名 """ - assert 'dt' in df.columns, "时间列必须为 dt" + assert "dt" in df.columns, "时间列必须为 dt" res = [] for dt, dfg in df.groupby("dt"): @@ -215,18 +215,18 @@ def show_factor_returns(df, x_col, y_col): res.append([dt, model.coef_[0][0]]) res = pd.DataFrame(res, columns=["dt", "因子收益率"]) - res['dt'] = pd.to_datetime(res['dt']) + res["dt"] = pd.to_datetime(res["dt"]) col1, col2 = st.columns(2) - fig = px.bar(res, x='dt', y="因子收益率", title="因子逐K收益率") + fig = px.bar(res, x="dt", y="因子收益率", title="因子逐K收益率") col1.plotly_chart(fig, use_container_width=True) res["因子累计收益率"] = res["因子收益率"].cumsum() - fig = px.line(res, x='dt', y="因子累计收益率", title="因子累计收益率") + fig = px.line(res, x="dt", y="因子累计收益率", title="因子累计收益率") col2.plotly_chart(fig, use_container_width=True) -def show_factor_layering(df, x_col, y_col='n1b', **kwargs): +def show_factor_layering(df, x_col, y_col="n1b", **kwargs): """使用 streamlit 绘制因子截面分层收益率图 :param df: 因子数据 @@ -245,8 +245,8 @@ def show_factor_layering(df, x_col, y_col='n1b', **kwargs): df = czsc.feture_cross_layering(df, x_col, n=n) - mr = df.groupby(["dt", f'{x_col}分层'])[y_col].mean().reset_index() - mrr = mr.pivot(index='dt', columns=f'{x_col}分层', values=y_col).fillna(0) + mr = df.groupby(["dt", f"{x_col}分层"])[y_col].mean().reset_index() + mrr = mr.pivot(index="dt", columns=f"{x_col}分层", values=y_col).fillna(0) tabs = st.tabs(["分层收益率", "多空组合"]) with tabs[0]: @@ -265,13 +265,13 @@ def show_factor_layering(df, x_col, y_col='n1b', **kwargs): st.stop() dfr = mrr.copy() - dfr['多头'] = dfr[long].mean(axis=1) - dfr['空头'] = -dfr[short].mean(axis=1) - dfr['多空'] = (dfr['多头'] + dfr['空头']) / 2 - czsc.show_daily_return(dfr[['多头', '空头', '多空']]) + dfr["多头"] = dfr[long].mean(axis=1) + dfr["空头"] = -dfr[short].mean(axis=1) + dfr["多空"] = (dfr["多头"] + dfr["空头"]) / 2 + czsc.show_daily_return(dfr[["多头", "空头", "多空"]]) -def show_symbol_factor_layering(df, x_col, y_col='n1b', **kwargs): +def show_symbol_factor_layering(df, x_col, y_col="n1b", **kwargs): """使用 streamlit 绘制单个标的上的因子分层收益率图 :param df: 因子数据,必须包含 dt, x_col, y_col 列,其中 dt 为日期,x_col 为因子值,y_col 为收益率,数据样例: @@ -300,15 +300,15 @@ def show_symbol_factor_layering(df, x_col, y_col='n1b', **kwargs): if df[x_col].nunique() < n * 2: st.error(f"因子值数量小于{n*2},无法进行分层") - if f'{x_col}分层' not in df.columns: + if f"{x_col}分层" not in df.columns: czsc.normalize_ts_feature(df, x_col, n=n) for i in range(n): - df[f'第{str(i+1).zfill(2)}层'] = np.where(df[f'{x_col}分层'] == f'第{str(i+1).zfill(2)}层', df[y_col], 0) + df[f"第{str(i+1).zfill(2)}层"] = np.where(df[f"{x_col}分层"] == f"第{str(i+1).zfill(2)}层", df[y_col], 0) - layering_cols = [f'第{str(i).zfill(2)}层' for i in range(1, n + 1)] - mrr = df[['dt'] + layering_cols].copy() - mrr.set_index('dt', inplace=True) + layering_cols = [f"第{str(i).zfill(2)}层" for i in range(1, n + 1)] + mrr = df[["dt"] + layering_cols].copy() + mrr.set_index("dt", inplace=True) tabs = st.tabs(["分层收益率", "多空组合"]) @@ -320,10 +320,10 @@ def show_symbol_factor_layering(df, x_col, y_col='n1b', **kwargs): long = col1.multiselect("多头组合", layering_cols, default=["第02层"], key="symbol_factor_long") short = col2.multiselect("空头组合", layering_cols, default=["第01层"], key="symbol_factor_short") dfr = mrr.copy() - dfr['多头'] = dfr[long].sum(axis=1) - dfr['空头'] = -dfr[short].sum(axis=1) - dfr['多空'] = dfr['多头'] + dfr['空头'] - show_daily_return(dfr[['多头', '空头', '多空']]) + dfr["多头"] = dfr[long].sum(axis=1) + dfr["空头"] = -dfr[short].sum(axis=1) + dfr["多空"] = dfr["多头"] + dfr["空头"] + show_daily_return(dfr[["多头", "空头", "多空"]]) def show_weight_backtest(dfw, **kwargs): @@ -361,7 +361,7 @@ def show_weight_backtest(dfw, **kwargs): st.stop() wb = czsc.WeightBacktest(dfw, fee_rate=fee / 10000, digits=digits) - stat = wb.results['绩效评价'] + stat = wb.results["绩效评价"] st.divider() c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 = st.columns([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) @@ -377,34 +377,34 @@ def show_weight_backtest(dfw, **kwargs): c10.metric("多头占比", f"{stat['多头占比']:.2%}") st.divider() - dret = wb.results['品种等权日收益'].copy() + dret = wb.results["品种等权日收益"].copy() dret.index = pd.to_datetime(dret.index) - show_daily_return(dret, legend_only_cols=dfw['symbol'].unique().tolist(), **kwargs) + show_daily_return(dret, legend_only_cols=dfw["symbol"].unique().tolist(), **kwargs) if kwargs.get("show_drawdowns", False): - show_drawdowns(dret, ret_col='total', sub_title="") + show_drawdowns(dret, ret_col="total", sub_title="") if kwargs.get("show_backtest_detail", False): c1, c2 = st.columns([1, 1]) with c1.expander("品种等权日收益", expanded=False): - df_ = wb.results['品种等权日收益'].copy() - st.dataframe(df_.style.background_gradient(cmap='RdYlGn_r').format("{:.2%}"), use_container_width=True) + df_ = wb.results["品种等权日收益"].copy() + st.dataframe(df_.style.background_gradient(cmap="RdYlGn_r").format("{:.2%}"), use_container_width=True) with c2.expander("查看开平交易对", expanded=False): - dfp = pd.concat([v['pairs'] for k, v in wb.results.items() if k in wb.symbols], ignore_index=True) + dfp = pd.concat([v["pairs"] for k, v in wb.results.items() if k in wb.symbols], ignore_index=True) st.dataframe(dfp, use_container_width=True) if kwargs.get("show_splited_daily", False): with st.expander("品种等权日收益分段表现", expanded=False): - show_splited_daily(dret[['total']].copy(), ret_col='total') + show_splited_daily(dret[["total"]].copy(), ret_col="total") if kwargs.get("show_yearly_stats", False): with st.expander("年度绩效指标", expanded=False): - show_yearly_stats(dret, ret_col='total') + show_yearly_stats(dret, ret_col="total") if kwargs.get("show_monthly_return", False): with st.expander("月度累计收益", expanded=False): - show_monthly_return(dret, ret_col='total', sub_title="") + show_monthly_return(dret, ret_col="total", sub_title="") return wb @@ -419,11 +419,11 @@ def show_splited_daily(df, ret_col, **kwargs): sub_title: str, 子标题 """ - if not df.index.dtype == 'datetime64[ns]': - df['dt'] = pd.to_datetime(df['dt']) - df.set_index('dt', inplace=True) + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) - assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" df = df.copy().fillna(0) df.sort_index(inplace=True, ascending=True) @@ -447,35 +447,47 @@ def show_splited_daily(df, ret_col, **kwargs): for name, sdt in sdt_map.items(): df1 = df.loc[sdt:last_dt].copy() row = czsc.daily_performance(df1[ret_col]) - row['开始日期'] = sdt.strftime('%Y-%m-%d') - row['结束日期'] = last_dt.strftime('%Y-%m-%d') - row['收益名称'] = name + row["开始日期"] = sdt.strftime("%Y-%m-%d") + row["结束日期"] = last_dt.strftime("%Y-%m-%d") + row["收益名称"] = name # row['绝对收益'] = df1[ret_col].sum() rows.append(row) - dfv = pd.DataFrame(rows).set_index('收益名称') - cols = ['开始日期', '结束日期', '绝对收益', '年化', '夏普', '最大回撤', '卡玛', '年化波动率', '非零覆盖', '日胜率', '盈亏平衡点'] + dfv = pd.DataFrame(rows).set_index("收益名称") + cols = [ + "开始日期", + "结束日期", + "绝对收益", + "年化", + "夏普", + "最大回撤", + "卡玛", + "年化波动率", + "非零覆盖", + "日胜率", + "盈亏平衡点", + ] dfv = dfv[cols].copy() - dfv = dfv.style.background_gradient(cmap='RdYlGn_r', subset=['绝对收益']) - dfv = dfv.background_gradient(cmap='RdYlGn_r', subset=['年化']) - dfv = dfv.background_gradient(cmap='RdYlGn_r', subset=['夏普']) - dfv = dfv.background_gradient(cmap='RdYlGn', subset=['最大回撤']) - dfv = dfv.background_gradient(cmap='RdYlGn_r', subset=['卡玛']) - dfv = dfv.background_gradient(cmap='RdYlGn', subset=['年化波动率']) - dfv = dfv.background_gradient(cmap='RdYlGn', subset=['盈亏平衡点']) - dfv = dfv.background_gradient(cmap='RdYlGn_r', subset=['日胜率']) - dfv = dfv.background_gradient(cmap='RdYlGn_r', subset=['非零覆盖']) + dfv = dfv.style.background_gradient(cmap="RdYlGn_r", subset=["绝对收益"]) + dfv = dfv.background_gradient(cmap="RdYlGn_r", subset=["年化"]) + dfv = dfv.background_gradient(cmap="RdYlGn_r", subset=["夏普"]) + dfv = dfv.background_gradient(cmap="RdYlGn", subset=["最大回撤"]) + dfv = dfv.background_gradient(cmap="RdYlGn_r", subset=["卡玛"]) + dfv = dfv.background_gradient(cmap="RdYlGn", subset=["年化波动率"]) + dfv = dfv.background_gradient(cmap="RdYlGn", subset=["盈亏平衡点"]) + dfv = dfv.background_gradient(cmap="RdYlGn_r", subset=["日胜率"]) + dfv = dfv.background_gradient(cmap="RdYlGn_r", subset=["非零覆盖"]) dfv = dfv.format( { - '盈亏平衡点': '{:.2f}', - '年化波动率': '{:.2%}', - '最大回撤': '{:.2%}', - '卡玛': '{:.2f}', - '年化': '{:.2%}', - '夏普': '{:.2f}', - '非零覆盖': '{:.2%}', - '日胜率': '{:.2%}', - '绝对收益': '{:.2%}', + "盈亏平衡点": "{:.2f}", + "年化波动率": "{:.2%}", + "最大回撤": "{:.2%}", + "卡玛": "{:.2f}", + "年化": "{:.2%}", + "夏普": "{:.2f}", + "非零覆盖": "{:.2%}", + "日胜率": "{:.2%}", + "绝对收益": "{:.2%}", } ) st.dataframe(dfv, use_container_width=True) @@ -490,54 +502,54 @@ def show_yearly_stats(df, ret_col, **kwargs): - sub_title: str, 子标题 """ - if not df.index.dtype == 'datetime64[ns]': - df['dt'] = pd.to_datetime(df['dt']) - df.set_index('dt', inplace=True) + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) - assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" df = df.copy().fillna(0) df.sort_index(inplace=True, ascending=True) - df['年份'] = df.index.year + df["年份"] = df.index.year _stats = [] - for year, df_ in df.groupby('年份'): + for year, df_ in df.groupby("年份"): _yst = czsc.daily_performance(df_[ret_col].to_list()) - _yst['年份'] = year + _yst["年份"] = year _stats.append(_yst) - stats = pd.DataFrame(_stats).set_index('年份') + stats = pd.DataFrame(_stats).set_index("年份") - stats = stats.style.background_gradient(cmap='RdYlGn_r', axis=None, subset=['年化']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['夏普']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['绝对收益']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['最大回撤']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['卡玛']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['年化波动率']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['盈亏平衡点']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['日胜率']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['非零覆盖']) - stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['新高间隔']) - stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['新高占比']) + stats = stats.style.background_gradient(cmap="RdYlGn_r", axis=None, subset=["年化"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["夏普"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["绝对收益"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["最大回撤"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["卡玛"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["年化波动率"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["盈亏平衡点"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["日胜率"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["非零覆盖"]) + stats = stats.background_gradient(cmap="RdYlGn", axis=None, subset=["新高间隔"]) + stats = stats.background_gradient(cmap="RdYlGn_r", axis=None, subset=["新高占比"]) stats = stats.format( { - '盈亏平衡点': '{:.2f}', - '年化波动率': '{:.2%}', - '最大回撤': '{:.2%}', - '卡玛': '{:.2f}', - '年化': '{:.2%}', - '夏普': '{:.2f}', - '非零覆盖': '{:.2%}', - '绝对收益': '{:.2%}', - '日胜率': '{:.2%}', - '新高间隔': '{:.2f}', - '新高占比': '{:.2%}', + "盈亏平衡点": "{:.2f}", + "年化波动率": "{:.2%}", + "最大回撤": "{:.2%}", + "卡玛": "{:.2f}", + "年化": "{:.2%}", + "夏普": "{:.2f}", + "非零覆盖": "{:.2%}", + "绝对收益": "{:.2%}", + "日胜率": "{:.2%}", + "新高间隔": "{:.2f}", + "新高占比": "{:.2%}", } ) - if kwargs.get('sub_title'): - st.subheader(kwargs.get('sub_title'), divider="rainbow") + if kwargs.get("sub_title"): + st.subheader(kwargs.get("sub_title"), divider="rainbow") st.dataframe(stats, use_container_width=True) @@ -559,8 +571,8 @@ def show_ts_rolling_corr(df, col1, col2, **kwargs): return if not isinstance(df.index, pd.DatetimeIndex): - df['dt'] = pd.to_datetime(df['dt']) - df = df.set_index('dt') + df["dt"] = pd.to_datetime(df["dt"]) + df = df.set_index("dt") df = df[[col1, col2]].copy() if df.isnull().sum().sum() > 0: @@ -568,22 +580,22 @@ def show_ts_rolling_corr(df, col1, col2, **kwargs): st.error(f"列 {col1} 或 {col2} 中存在缺失值,请先处理缺失值") return - sub_title = kwargs.get('sub_title', None) + sub_title = kwargs.get("sub_title", None) if sub_title: - st.subheader(sub_title, divider="rainbow", anchor=hashlib.md5(sub_title.encode('utf-8')).hexdigest()[:8]) + st.subheader(sub_title, divider="rainbow", anchor=hashlib.md5(sub_title.encode("utf-8")).hexdigest()[:8]) - min_periods = kwargs.get('min_periods', 300) - window = kwargs.get('window', 2000) - corr_method = kwargs.get('corr_method', 'pearson') + min_periods = kwargs.get("min_periods", 300) + window = kwargs.get("window", 2000) + corr_method = kwargs.get("corr_method", "pearson") corr_result = df[col1].rolling(window=window, min_periods=min_periods).corr(df[col2], pairwise=True) corr_result = corr_result.dropna() - corr_result = corr_result.rename('corr') - line = go.Scatter(x=corr_result.index, y=corr_result, mode='lines', name='corr') + corr_result = corr_result.rename("corr") + line = go.Scatter(x=corr_result.index, y=corr_result, mode="lines", name="corr") layout = go.Layout( - title='滑动相关系数', - xaxis=dict(title=''), - yaxis=dict(title='corr'), + title="滑动相关系数", + xaxis=dict(title=""), + yaxis=dict(title="corr"), annotations=[ dict( x=0.0, @@ -607,8 +619,8 @@ def show_ts_self_corr(df, col, **kwargs): :param col: str, df 中的列名 """ if not isinstance(df.index, pd.DatetimeIndex): - df['dt'] = pd.to_datetime(df['dt']) - df = df.set_index('dt') + df["dt"] = pd.to_datetime(df["dt"]) + df = df.set_index("dt") df = df.sort_index(ascending=True) if df[col].isnull().sum() > 0: @@ -620,31 +632,31 @@ def show_ts_self_corr(df, col, **kwargs): with col1: sub_title = f"自相关系数分析({col})" - st.subheader(sub_title, divider="rainbow", anchor=hashlib.md5(sub_title.encode('utf-8')).hexdigest()[:8]) + st.subheader(sub_title, divider="rainbow", anchor=hashlib.md5(sub_title.encode("utf-8")).hexdigest()[:8]) c1, c2, c3 = st.columns([2, 2, 1]) - nlags = int(c1.number_input('最大滞后阶数', value=20, min_value=1, max_value=100, step=1)) - method = c2.selectbox('选择分析方法', ['acf', 'pacf'], index=0) + nlags = int(c1.number_input("最大滞后阶数", value=20, min_value=1, max_value=100, step=1)) + method = c2.selectbox("选择分析方法", ["acf", "pacf"], index=0) - if method == 'acf': - acf_result, conf_int = sm.tsa.acf(df[[col]].copy(), nlags=nlags, alpha=0.05, missing='raise') + if method == "acf": + acf_result, conf_int = sm.tsa.acf(df[[col]].copy(), nlags=nlags, alpha=0.05, missing="raise") else: acf_result, conf_int = sm.tsa.pacf(df[[col]].copy(), nlags=nlags, alpha=0.05) - bar = go.Bar(x=list(range(len(acf_result))), y=acf_result, name='自相关系数') - upper = go.Scatter(x=list(range(len(acf_result))), y=conf_int[:, 1], mode='lines', name='95%置信区间上界') - lower = go.Scatter(x=list(range(len(acf_result))), y=conf_int[:, 0], mode='lines', name='95%置信区间下界') - layout = go.Layout(title=method.upper(), xaxis=dict(title='滞后阶数'), yaxis=dict(title='自相关系数')) + bar = go.Bar(x=list(range(len(acf_result))), y=acf_result, name="自相关系数") + upper = go.Scatter(x=list(range(len(acf_result))), y=conf_int[:, 1], mode="lines", name="95%置信区间上界") + lower = go.Scatter(x=list(range(len(acf_result))), y=conf_int[:, 0], mode="lines", name="95%置信区间下界") + layout = go.Layout(title=method.upper(), xaxis=dict(title="滞后阶数"), yaxis=dict(title="自相关系数")) fig = go.Figure(data=[bar, upper, lower], layout=layout) st.plotly_chart(fig, use_container_width=True) with col2: sub_title = f"滞后N阶滑动相关性({col})" - st.subheader(sub_title, divider="rainbow", anchor=hashlib.md5(sub_title.encode('utf-8')).hexdigest()[:8]) + st.subheader(sub_title, divider="rainbow", anchor=hashlib.md5(sub_title.encode("utf-8")).hexdigest()[:8]) c1, c2, c3, c4 = st.columns(4) - min_periods = int(c1.number_input('最小滑动窗口长度', value=20, min_value=0, step=1)) - window = int(c2.number_input('滑动窗口长度', value=200, step=1)) - corr_method = c3.selectbox('相关系数计算方法', ['pearson', 'kendall', 'spearman']) - n = int(c4.number_input('自相关滞后阶数', value=1, min_value=1, step=1)) + min_periods = int(c1.number_input("最小滑动窗口长度", value=20, min_value=0, step=1)) + window = int(c2.number_input("滑动窗口长度", value=200, step=1)) + corr_method = c3.selectbox("相关系数计算方法", ["pearson", "kendall", "spearman"]) + n = int(c4.number_input("自相关滞后阶数", value=1, min_value=1, step=1)) df[f"{col}_lag{n}"] = df[col].shift(-n) df.dropna(subset=[f"{col}_lag{n}"], inplace=True) @@ -666,23 +678,23 @@ def show_stoploss_by_direction(dfw, **kwargs): :return: None """ dfw = dfw.copy() - stoploss = kwargs.pop('stoploss', 0.08) + stoploss = kwargs.pop("stoploss", 0.08) dfw1 = czsc.stoploss_by_direction(dfw, stoploss=stoploss) # 找出逐笔止损点 rows = [] - for symbol, dfg in dfw1.groupby('symbol'): - for order_id, dfg1 in dfg.groupby('order_id'): - if dfg1['is_stop'].any(): + for symbol, dfg in dfw1.groupby("symbol"): + for order_id, dfg1 in dfg.groupby("order_id"): + if dfg1["is_stop"].any(): row = { - 'symbol': symbol, - 'order_id': order_id, - '交易方向': '多头' if dfg1['weight'].iloc[0] > 0 else '空头', - '开仓时间': dfg1['dt'].iloc[0], - '平仓时间': dfg1['dt'].iloc[-1], - '平仓收益': dfg1['hold_returns'].iloc[-1], - '止损时间': dfg1[dfg1['is_stop']]['dt'].iloc[0], - '止损收益': dfg1[dfg1['is_stop']]['hold_returns'].iloc[0], + "symbol": symbol, + "order_id": order_id, + "交易方向": "多头" if dfg1["weight"].iloc[0] > 0 else "空头", + "开仓时间": dfg1["dt"].iloc[0], + "平仓时间": dfg1["dt"].iloc[-1], + "平仓收益": dfg1["hold_returns"].iloc[-1], + "止损时间": dfg1[dfg1["is_stop"]]["dt"].iloc[0], + "止损收益": dfg1[dfg1["is_stop"]]["hold_returns"].iloc[0], } rows.append(row) dfr = pd.DataFrame(rows) @@ -690,12 +702,23 @@ def show_stoploss_by_direction(dfw, **kwargs): st.dataframe(dfr, use_container_width=True) if kwargs.pop("show_detail", False): - cols = ['dt', 'symbol', 'raw_weight', 'weight', 'price', 'hold_returns', 'min_hold_returns', 'returns', 'order_id', 'is_stop'] - dfs = dfw1[dfw1['is_stop']][cols].copy() + cols = [ + "dt", + "symbol", + "raw_weight", + "weight", + "price", + "hold_returns", + "min_hold_returns", + "returns", + "order_id", + "is_stop", + ] + dfs = dfw1[dfw1["is_stop"]][cols].copy() with st.expander("止损点详情", expanded=False): st.dataframe(dfs, use_container_width=True) - czsc.show_weight_backtest(dfw1[['dt', 'symbol', 'weight', 'price']].copy(), **kwargs) + czsc.show_weight_backtest(dfw1[["dt", "symbol", "weight", "price"]].copy(), **kwargs) def show_cointegration(df, col1, col2, **kwargs): @@ -716,8 +739,8 @@ def show_cointegration(df, col1, col2, **kwargs): return if not isinstance(df.index, pd.DatetimeIndex): - df['dt'] = pd.to_datetime(df['dt']) - df = df.set_index('dt') + df["dt"] = pd.to_datetime(df["dt"]) + df = df.set_index("dt") df = df[[col1, col2]].copy() if df.isnull().sum().sum() > 0: @@ -725,13 +748,14 @@ def show_cointegration(df, col1, col2, **kwargs): st.dataframe(df[df.isnull().sum(axis=1) > 0], use_container_width=True) return - sub_header = kwargs.get('sub_header', '') + sub_header = kwargs.get("sub_header", "") if sub_header: - st.subheader(sub_header, divider='rainbow') + st.subheader(sub_header, divider="rainbow") - if kwargs.get('docs', False): - with st.expander('协整检验原理与使用说明', expanded=False): - st.markdown(""" + if kwargs.get("docs", False): + with st.expander("协整检验原理与使用说明", expanded=False): + st.markdown( + """ ##### 协整检验原理 简而言之:两个不平稳的时间序列,如果它们的线性组合是平稳的,那么它们就是协整的。 平稳的时间序列是指均值和方差不随时间变化的时间序列。而平稳的时间序列便可以用来进行统计分析。 @@ -741,25 +765,30 @@ def show_cointegration(df, col1, col2, **kwargs): 教条式地解释:协整检验p值的含义是两个时间序列**不协整**的概率。一般取临界值5%来判断是否协整,低于5%则可以认为两个时间序列协整。 协整检验原理与使用说明参考链接:[Cointegration](https://en.wikipedia.org/wiki/Cointegration) - """) + """ + ) l1, l2, l3 = st.columns(3) coint_t, pvalue, crit_value = coint(df[col1], df[col2]) l1.metric("协整检验统计量", str(round(coint_t, 3)), help="单位根检验的T统计量。") - l2.metric("协整检验P值(不协整的概率)", f"{pvalue:.2%}", help="两个时间序列不协整的概率,低于5%则可以认为两个时间序列协整。") + l2.metric( + "协整检验P值(不协整的概率)", + f"{pvalue:.2%}", + help="两个时间序列不协整的概率,低于5%则可以认为两个时间序列协整。", + ) fig = px.line(df, x=df.index, y=[col1, col2]) - fig.update_layout(title=f'{col1} 与 {col2} 的曲线图对比', xaxis_title='', yaxis_title='value') + fig.update_layout(title=f"{col1} 与 {col2} 的曲线图对比", xaxis_title="", yaxis_title="value") st.plotly_chart(fig, use_container_width=True) def show_out_in_compare(df, ret_col, mid_dt, **kwargs): """展示样本内外表现对比""" assert isinstance(df, pd.DataFrame), "df 必须是 pd.DataFrame 类型" - if not df.index.dtype == 'datetime64[ns]': - df['dt'] = pd.to_datetime(df['dt']) - df.set_index('dt', inplace=True) + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) - assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" df = df[[ret_col]].copy().fillna(0) df.sort_index(inplace=True, ascending=True) @@ -768,45 +797,60 @@ def show_out_in_compare(df, ret_col, mid_dt, **kwargs): dfo = df[df.index >= mid_dt].copy() stats_i = czsc.daily_performance(dfi[ret_col].to_list()) - stats_i['标记'] = '样本内' - stats_i['开始日期'] = dfi.index[0].strftime("%Y-%m-%d") - stats_i['结束日期'] = dfi.index[-1].strftime("%Y-%m-%d") + stats_i["标记"] = "样本内" + stats_i["开始日期"] = dfi.index[0].strftime("%Y-%m-%d") + stats_i["结束日期"] = dfi.index[-1].strftime("%Y-%m-%d") stats_o = czsc.daily_performance(dfo[ret_col].to_list()) - stats_o['标记'] = '样本外' - stats_o['开始日期'] = dfo.index[0].strftime("%Y-%m-%d") - stats_o['结束日期'] = dfo.index[-1].strftime("%Y-%m-%d") + stats_o["标记"] = "样本外" + stats_o["开始日期"] = dfo.index[0].strftime("%Y-%m-%d") + stats_o["结束日期"] = dfo.index[-1].strftime("%Y-%m-%d") df_stats = pd.DataFrame([stats_i, stats_o]) - df_stats = df_stats[['标记', '开始日期', '结束日期', '年化', '最大回撤', '夏普', '卡玛', '日胜率', - '年化波动率', '非零覆盖', '盈亏平衡点', '新高间隔', '新高占比']] + df_stats = df_stats[ + [ + "标记", + "开始日期", + "结束日期", + "年化", + "最大回撤", + "夏普", + "卡玛", + "日胜率", + "年化波动率", + "非零覆盖", + "盈亏平衡点", + "新高间隔", + "新高占比", + ] + ] sub_title = kwargs.get("sub_title", "样本内外表现对比") if sub_title: - st.subheader(sub_title, divider='rainbow') - - df_stats = df_stats.style.background_gradient(cmap='RdYlGn_r', subset=['年化']) - df_stats = df_stats.background_gradient(cmap='RdYlGn_r', subset=['夏普']) - df_stats = df_stats.background_gradient(cmap='RdYlGn', subset=['最大回撤']) - df_stats = df_stats.background_gradient(cmap='RdYlGn_r', subset=['卡玛']) - df_stats = df_stats.background_gradient(cmap='RdYlGn', subset=['年化波动率']) - df_stats = df_stats.background_gradient(cmap='RdYlGn', subset=['盈亏平衡点']) - df_stats = df_stats.background_gradient(cmap='RdYlGn_r', subset=['日胜率']) - df_stats = df_stats.background_gradient(cmap='RdYlGn_r', subset=['非零覆盖']) - df_stats = df_stats.background_gradient(cmap='RdYlGn', subset=['新高间隔']) - df_stats = df_stats.background_gradient(cmap='RdYlGn_r', subset=['新高占比']) + st.subheader(sub_title, divider="rainbow") + + df_stats = df_stats.style.background_gradient(cmap="RdYlGn_r", subset=["年化"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn_r", subset=["夏普"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn", subset=["最大回撤"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn_r", subset=["卡玛"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn", subset=["年化波动率"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn", subset=["盈亏平衡点"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn_r", subset=["日胜率"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn_r", subset=["非零覆盖"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn", subset=["新高间隔"]) + df_stats = df_stats.background_gradient(cmap="RdYlGn_r", subset=["新高占比"]) df_stats = df_stats.format( { - '盈亏平衡点': '{:.2f}', - '年化波动率': '{:.2%}', - '最大回撤': '{:.2%}', - '卡玛': '{:.2f}', - '年化': '{:.2%}', - '夏普': '{:.2f}', - '非零覆盖': '{:.2%}', - '日胜率': '{:.2%}', - '新高间隔': '{:.2f}', - '新高占比': '{:.2%}', + "盈亏平衡点": "{:.2f}", + "年化波动率": "{:.2%}", + "最大回撤": "{:.2%}", + "卡玛": "{:.2f}", + "年化": "{:.2%}", + "夏普": "{:.2f}", + "非零覆盖": "{:.2%}", + "日胜率": "{:.2%}", + "新高间隔": "{:.2f}", + "新高占比": "{:.2%}", } ) st.dataframe(df_stats, use_container_width=True, hide_index=True) @@ -834,7 +878,7 @@ def show_optuna_study(study: optuna.Study, **kwargs): return study -def show_drawdowns(df, ret_col, **kwargs): +def show_drawdowns(df: pd.DataFrame, ret_col, **kwargs): """展示最大回撤分析 :param df: pd.DataFrame, columns: cells, index: dates @@ -845,34 +889,41 @@ def show_drawdowns(df, ret_col, **kwargs): - top: int, optional, 默认10, 返回最大回撤的数量 """ - assert isinstance(df, pd.DataFrame), "df 必须是 pd.DataFrame 类型" - if not df.index.dtype == 'datetime64[ns]': - df['dt'] = pd.to_datetime(df['dt']) - df.set_index('dt', inplace=True) - assert df.index.dtype == 'datetime64[ns]', "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + df = df[[ret_col]].copy().fillna(0) df.sort_index(inplace=True, ascending=True) - df['cum_ret'] = df[ret_col].cumsum() - df['cum_max'] = df['cum_ret'].cummax() - df['drawdown'] = df['cum_ret'] - df['cum_max'] + df["cum_ret"] = df[ret_col].cumsum() + df["cum_max"] = df["cum_ret"].cummax() + df["drawdown"] = df["cum_ret"] - df["cum_max"] - sub_title = kwargs.get('sub_title', "最大回撤分析") + sub_title = kwargs.get("sub_title", "最大回撤分析") if sub_title: st.subheader(sub_title, divider="rainbow") - top = kwargs.get('top', 10) + top = kwargs.get("top", 10) if top is not None: with st.expander(f"TOP{top} 最大回撤详情", expanded=False): dft = czsc.top_drawdowns(df[ret_col].copy(), top=10) - dft = dft.style.background_gradient(cmap='RdYlGn_r', subset=['净值回撤']) - dft = dft.background_gradient(cmap='RdYlGn', subset=['回撤天数', '恢复天数']) - dft = dft.format({'净值回撤': '{:.2%}', '回撤天数': '{:.0f}', '恢复天数': '{:.0f}'}) + dft = dft.style.background_gradient(cmap="RdYlGn_r", subset=["净值回撤"]) + dft = dft.background_gradient(cmap="RdYlGn", subset=["回撤天数", "恢复天数", "新高间隔"]) + dft = dft.format({"净值回撤": "{:.2%}", "回撤天数": "{:.0f}", "恢复天数": "{:.0f}", "新高间隔": "{:.0f}"}) st.dataframe(dft, use_container_width=True) # 画图: 净值回撤 # 颜色表:https://www.codeeeee.com/color/rgb.html - drawdown = go.Scatter(x=df.index, y=df["drawdown"], fillcolor="salmon", line=dict(color="salmon"), - fill='tozeroy', mode="lines", name="回测曲线") + drawdown = go.Scatter( + x=df.index, + y=df["drawdown"], + fillcolor="salmon", + line=dict(color="salmon"), + fill="tozeroy", + mode="lines", + name="回测曲线", + ) fig = go.Figure(drawdown) # 增加 10% 分位数线,30% 分位数线,50% 分位数线,同时增加文本标记 @@ -886,3 +937,36 @@ def show_drawdowns(df, ret_col, **kwargs): # 限制 绘制高度 fig.update_layout(height=300) st.plotly_chart(fig, use_container_width=True) + + +@st.experimental_fragment +def show_rolling_daily_performance(df, ret_col, **kwargs): + """展示滚动统计数据 + + :param df: pd.DataFrame, 日收益数据,columns=['dt', ret_col] + :param ret_col: str, 收益列名 + :param kwargs: + """ + assert isinstance(df, pd.DataFrame), "df 必须是 pd.DataFrame 类型" + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) + + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + df = df[[ret_col]].copy().fillna(0) + df.sort_index(inplace=True, ascending=True) + + sub_title = kwargs.get("sub_title", "滚动日收益绩效") + if sub_title: + st.subheader(sub_title, divider="rainbow") + + c1, c2, c3 = st.columns(3) + window = c1.number_input("滚动窗口(自然日)", value=365 * 3, min_value=365, max_value=3650) + min_periods = c2.number_input("最小样本数", value=365, min_value=100, max_value=3650) + + dfr = czsc.rolling_daily_performance(df, ret_col, window=window, min_periods=min_periods) + dfr["年化波动率/最大回撤"] = dfr["年化波动率"] / dfr["最大回撤"] + cols = [x for x in dfr.columns if x not in ["sdt", "edt"]] + col = c3.selectbox("选择指标", cols, index=cols.index("夏普")) + fig = px.area(dfr, x="edt", y=col, labels={"edt": "", col: col}) + st.plotly_chart(fig, use_container_width=True) diff --git a/czsc/utils/stats.py b/czsc/utils/stats.py index e683af713..90b199152 100644 --- a/czsc/utils/stats.py +++ b/czsc/utils/stats.py @@ -18,8 +18,8 @@ def cal_break_even_point(seq) -> float: """ if sum(seq) < 0: return 1.0 - seq = np.cumsum(sorted(seq)) # type: ignore - return (np.sum(seq < 0) + 1) / len(seq) # type: ignore + seq = np.cumsum(sorted(seq)) # type: ignore + return (np.sum(seq < 0) + 1) / len(seq) # type: ignore def subtract_fee(df, fee=1): @@ -42,23 +42,23 @@ def subtract_fee(df, fee=1): :param fee: 手续费,单位:BP :return: 修改后的DataFrame """ - assert 'dt' in df.columns, 'dt 列必须存在' - assert 'pos' in df.columns, 'pos 列必须存在' - assert all(x in [0, 1, -1] for x in df['pos'].unique()), "pos 列的值必须是 0, 1, -1 中的一个" + assert "dt" in df.columns, "dt 列必须存在" + assert "pos" in df.columns, "pos 列必须存在" + assert all(x in [0, 1, -1] for x in df["pos"].unique()), "pos 列的值必须是 0, 1, -1 中的一个" - if 'n1b' not in df.columns: - assert 'price' in df.columns, '当n1b列不存在时,price 列必须存在' - df['n1b'] = (df['price'].shift(-1) / df['price'] - 1) * 10000 + if "n1b" not in df.columns: + assert "price" in df.columns, "当n1b列不存在时,price 列必须存在" + df["n1b"] = (df["price"].shift(-1) / df["price"] - 1) * 10000 - df['date'] = df['dt'].dt.date - df['edge_pre_fee'] = df['pos'] * df['n1b'] - df['edge_post_fee'] = df['pos'] * df['n1b'] + df["date"] = df["dt"].dt.date + df["edge_pre_fee"] = df["pos"] * df["n1b"] + df["edge_post_fee"] = df["pos"] * df["n1b"] # 扣费规则, 开仓扣费在第一个持仓K线上,平仓扣费在最后一个持仓K线上 - open_pos = (df['pos'].shift() != df['pos']) & (df['pos'] != 0) - exit_pos = (df['pos'].shift(-1) != df['pos']) & (df['pos'] != 0) - df.loc[open_pos, 'edge_post_fee'] = df.loc[open_pos, 'edge_post_fee'] - fee - df.loc[exit_pos, 'edge_post_fee'] = df.loc[exit_pos, 'edge_post_fee'] - fee + open_pos = (df["pos"].shift() != df["pos"]) & (df["pos"] != 0) + exit_pos = (df["pos"].shift(-1) != df["pos"]) & (df["pos"] != 0) + df.loc[open_pos, "edge_post_fee"] = df.loc[open_pos, "edge_post_fee"] - fee + df.loc[exit_pos, "edge_post_fee"] = df.loc[exit_pos, "edge_post_fee"] - fee return df @@ -88,8 +88,19 @@ def daily_performance(daily_returns): daily_returns = np.array(daily_returns, dtype=np.float64) if len(daily_returns) == 0 or np.std(daily_returns) == 0 or all(x == 0 for x in daily_returns): - return {"绝对收益": 0, "年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "日胜率": 0, - "年化波动率": 0, "非零覆盖": 0, "盈亏平衡点": 0, "新高间隔": 0, "新高占比": 0} + return { + "绝对收益": 0, + "年化": 0, + "夏普": 0, + "最大回撤": 0, + "卡玛": 0, + "日胜率": 0, + "年化波动率": 0, + "非零覆盖": 0, + "盈亏平衡点": 0, + "新高间隔": 0, + "新高占比": 0, + } annual_returns = np.sum(daily_returns) / len(daily_returns) * 252 sharpe_ratio = np.mean(daily_returns) / np.std(daily_returns) * np.sqrt(252) @@ -132,6 +143,36 @@ def __min_max(x, min_val, max_val, digits=4): return sta +def rolling_daily_performance(df: pd.DataFrame, ret_col, window=252, min_periods=100, **kwargs): + """计算滚动日收益 + + :param df: pd.DataFrame, 日收益数据,columns=['dt', ret_col] + :param ret_col: str, 收益列名 + :param window: int, 滚动窗口, 自然天数 + :param min_periods: int, 最小样本数 + :param kwargs: + """ + if not df.index.dtype == "datetime64[ns]": + df["dt"] = pd.to_datetime(df["dt"]) + df.set_index("dt", inplace=True) + assert df.index.dtype == "datetime64[ns]", "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换" + + df = df[[ret_col]].copy().fillna(0) + df.sort_index(inplace=True, ascending=True) + dts = sorted(df.index.to_list()) + res = [] + for edt in dts[min_periods:]: + sdt = edt - pd.Timedelta(days=window) + dfg = df[(df.index >= sdt) & (df.index <= edt)].copy() + s = daily_performance(dfg[ret_col].to_list()) + s["sdt"] = sdt + s["edt"] = edt + res.append(s) + + dfr = pd.DataFrame(res) + return dfr + + def weekly_performance(weekly_returns): """采用单利计算周收益数据的各项指标 @@ -142,8 +183,18 @@ def weekly_performance(weekly_returns): weekly_returns = np.array(weekly_returns, dtype=np.float64) if len(weekly_returns) == 0 or np.std(weekly_returns) == 0 or all(x == 0 for x in weekly_returns): - return {"年化": 0, "夏普": 0, "最大回撤": 0, "卡玛": 0, "周胜率": 0, - "年化波动率": 0, "非零覆盖": 0, "盈亏平衡点": 0, "新高间隔": 0, "新高占比": 0} + return { + "年化": 0, + "夏普": 0, + "最大回撤": 0, + "卡玛": 0, + "周胜率": 0, + "年化波动率": 0, + "非零覆盖": 0, + "盈亏平衡点": 0, + "新高间隔": 0, + "新高占比": 0, + } annual_returns = np.sum(weekly_returns) / len(weekly_returns) * 52 sharpe_ratio = np.mean(weekly_returns) / np.std(weekly_returns) * np.sqrt(52) @@ -210,24 +261,24 @@ def net_value_stats(nv: pd.DataFrame, exclude_zero: bool = False, sub_cost=True) :return: """ nv = nv.copy(deep=True) - nv['dt'] = pd.to_datetime(nv['dt']) + nv["dt"] = pd.to_datetime(nv["dt"]) if sub_cost: - assert 'cost' in nv.columns, "成本列cost不存在" - nv['edge'] = nv['edge'] - nv['cost'] + assert "cost" in nv.columns, "成本列cost不存在" + nv["edge"] = nv["edge"] - nv["cost"] else: - if 'cost' not in nv.columns: - nv['cost'] = 0 + if "cost" not in nv.columns: + nv["cost"] = 0 if exclude_zero: - nv = nv[(nv['edge'] != 0) | (nv['cost'] != 0)] + nv = nv[(nv["edge"] != 0) | (nv["cost"] != 0)] # 按日期聚合 - nv['date'] = nv['dt'].apply(lambda x: x.date()) - df_nav = nv.groupby('date')['edge'].sum() / 10000 + nv["date"] = nv["dt"].apply(lambda x: x.date()) + df_nav = nv.groupby("date")["edge"].sum() / 10000 df_nav = df_nav.cumsum() - if all(x == 0 for x in nv['edge']): + if all(x == 0 for x in nv["edge"]): # 处理没有持仓记录的情况 sharp = 0 y_ret = 0 @@ -246,11 +297,16 @@ def net_value_stats(nv: pd.DataFrame, exclude_zero: bool = False, sub_cost=True) calmar = y_ret / mdd if mdd != 0 else 1 prefix = "有持仓时间" if exclude_zero else "" - res = {"夏普": round(sharp, 2), "卡玛": round(calmar, 2), "年化": round(y_ret, 4), "最大回撤": round(mdd, 4)} + res = { + "夏普": round(sharp, 2), + "卡玛": round(calmar, 2), + "年化": round(y_ret, 4), + "最大回撤": round(mdd, 4), + } res = {f"{prefix}{k}": v for k, v in res.items()} if not exclude_zero: - res['持仓覆盖'] = round(len(nv[(nv['edge'] != 0) | (nv['cost'] != 0)]) / len(nv), 4) if len(nv) > 0 else 0 + res["持仓覆盖"] = round(len(nv[(nv["edge"] != 0) | (nv["cost"] != 0)]) / len(nv), 4) if len(nv) > 0 else 0 return res @@ -273,7 +329,12 @@ def evaluate_pairs(pairs: pd.DataFrame, trade_dir: str = "多空") -> dict: :return: 交易表现 """ from czsc.objects import cal_break_even_point - assert trade_dir in ["多头", "空头", "多空"], "trade_dir 参数错误,可选值 ['多头', '空头', '多空']" + + assert trade_dir in [ + "多头", + "空头", + "多空", + ], "trade_dir 参数错误,可选值 ['多头', '空头', '多空']" pairs = pairs.copy() @@ -304,9 +365,9 @@ def evaluate_pairs(pairs: pd.DataFrame, trade_dir: str = "多空") -> dict: if len(pairs) == 0: return p - pairs = pairs.to_dict(orient='records') - p['交易次数'] = len(pairs) - p["盈亏平衡点"] = round(cal_break_even_point([x['盈亏比例'] for x in pairs]), 4) + pairs = pairs.to_dict(orient="records") + p["交易次数"] = len(pairs) + p["盈亏平衡点"] = round(cal_break_even_point([x["盈亏比例"] for x in pairs]), 4) p["累计收益"] = round(sum([x["盈亏比例"] for x in pairs]), 2) p["单笔收益"] = round(p["累计收益"] / p["交易次数"], 2) p["持仓天数"] = round(sum([x["持仓天数"] for x in pairs]) / len(pairs), 2) @@ -344,24 +405,24 @@ def holds_performance(df, **kwargs): :return: pd.DataFrame, columns=['date', 'change', 'edge_pre_fee', 'cost', 'edge_post_fee'] """ - fee = kwargs.get('fee', 15) - digits = kwargs.get('digits', 2) # 保留小数位数 + fee = kwargs.get("fee", 15) + digits = kwargs.get("digits", 2) # 保留小数位数 df = df.copy() - df['weight'] = df['weight'].round(digits) - df = df.sort_values(['dt', 'symbol']).reset_index(drop=True) + df["weight"] = df["weight"].round(digits) + df = df.sort_values(["dt", "symbol"]).reset_index(drop=True) - dft = pd.pivot_table(df, index='dt', columns='symbol', values='weight', aggfunc='sum').fillna(0) + dft = pd.pivot_table(df, index="dt", columns="symbol", values="weight", aggfunc="sum").fillna(0) df_turns = dft.diff().abs().sum(axis=1).reset_index() - df_turns.columns = ['date', 'change'] - sdt = df['dt'].min() - df_turns.loc[(df_turns['date'] == sdt), 'change'] = df[df['dt'] == sdt]['weight'].sum() - - df_edge = df.groupby('dt').apply(lambda x: (x['weight'] * x['n1b']).sum()).reset_index() - df_edge.columns = ['date', 'edge_pre_fee'] - dfr = pd.merge(df_turns, df_edge, on='date', how='left') - dfr['cost'] = dfr['change'] * fee / 10000 # 换手成本 - dfr['edge_post_fee'] = dfr['edge_pre_fee'] - dfr['cost'] # 净收益 + df_turns.columns = ["date", "change"] + sdt = df["dt"].min() + df_turns.loc[(df_turns["date"] == sdt), "change"] = df[df["dt"] == sdt]["weight"].sum() + + df_edge = df.groupby("dt").apply(lambda x: (x["weight"] * x["n1b"]).sum()).reset_index() + df_edge.columns = ["date", "edge_pre_fee"] + dfr = pd.merge(df_turns, df_edge, on="date", how="left") + dfr["cost"] = dfr["change"] * fee / 10000 # 换手成本 + dfr["edge_post_fee"] = dfr["edge_pre_fee"] - dfr["cost"] # 净收益 return dfr @@ -399,6 +460,7 @@ def top_drawdowns(returns: pd.Series, top: int = 10) -> pd.DataFrame: break df_drawdowns = pd.DataFrame(drawdowns, columns=["回撤开始", "回撤结束", "回撤修复", "净值回撤"]) - df_drawdowns['回撤天数'] = (df_drawdowns['回撤结束'] - df_drawdowns['回撤开始']).dt.days - df_drawdowns['恢复天数'] = (df_drawdowns['回撤修复'] - df_drawdowns['回撤结束']).dt.days + df_drawdowns["回撤天数"] = (df_drawdowns["回撤结束"] - df_drawdowns["回撤开始"]).dt.days + df_drawdowns["恢复天数"] = (df_drawdowns["回撤修复"] - df_drawdowns["回撤结束"]).dt.days + df_drawdowns["新高间隔"] = df_drawdowns["回撤天数"] + df_drawdowns["恢复天数"] return df_drawdowns diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..cd436e9b0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[tool.black] +line-length = 120 +exclude = ''' +/( + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist + | setup.py +)/ +''' \ No newline at end of file