diff --git a/czsc/__init__.py b/czsc/__init__.py index 89a234871..2c63d06fb 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -215,6 +215,9 @@ cal_symbols_factor, weights_simple_ensemble, unify_weights, + sma_long_bear, + dif_long_bear, + tsf_type, ) diff --git a/czsc/eda.py b/czsc/eda.py index 2c544113b..32264ccac 100644 --- a/czsc/eda.py +++ b/czsc/eda.py @@ -416,3 +416,92 @@ def unify_weights(dfw: pd.DataFrame, **kwargs): return dfw +def sma_long_bear(df: pd.DataFrame, **kwargs): + """均线牛熊分界指标过滤持仓,close 在长期均线上方为牛市,下方为熊市 + + 牛市只做多,熊市只做空。 + + :param df: DataFrame, 必须包含 dt, close, symbol, weight 列 + :return: DataFrame + """ + assert df["symbol"].nunique() == 1, "数据中包含多个品种,必须单品种" + assert df["dt"].is_monotonic_increasing, "数据未按日期排序,必须升序排列" + assert df["dt"].is_unique, "数据中存在重复dt,必须唯一" + + window = kwargs.get("window", 200) + + if kwargs.get("copy", True): + df = df.copy() + + df['SMA_LB'] = df['close'].rolling(window).mean() + df['raw_weight'] = df['weight'] + df['weight'] = np.where(np.sign(df['close'] - df['SMA_LB']) == np.sign(df['weight']), df['weight'], 0) + return df + + +def dif_long_bear(df: pd.DataFrame, **kwargs): + """DIF牛熊分界指标过滤持仓,DIF 在0上方为牛市,下方为熊市 + + 牛市只做多,熊市只做空。 + + :param df: DataFrame, 必须包含 dt, close, symbol, weight 列 + :return: DataFrame + """ + from czsc.utils.ta import MACD + + assert df["symbol"].nunique() == 1, "数据中包含多个品种,必须单品种" + assert df["dt"].is_monotonic_increasing, "数据未按日期排序,必须升序排列" + assert df["dt"].is_unique, "数据中存在重复dt,必须唯一" + + if kwargs.get("copy", True): + df = df.copy() + + df['DIF_LB'], _, _ = MACD(df['close']) + df['raw_weight'] = df['weight'] + df['weight'] = np.where(np.sign(df['DIF_LB']) == np.sign(df['weight']), df['weight'], 0) + return df + + +def tsf_type(df: pd.DataFrame, factor, n=5, **kwargs): + """时序因子的类型定性分析 + + tsf 是 time series factor 的缩写,时序因子的类型定性分析,是指对某个时序因子进行分层,然后计算每个分层的平均收益, + + :param df: pd.DataFrame, 必须包含 dt, symbol, factor, price 列,其中 dt 为日期,symbol 为标的代码,factor 为因子值 + :param factor: str, 因子列名 + :param n: int, 分层数量 + :param kwargs: + + - window: int, 窗口大小,默认为600 + - min_periods: int, 最小样本数量,默认为300 + + :return: str, 返回分层收益排序(从大到小)结果,例如:第01层->第02层->第03层->第04层->第05层 + """ + window = kwargs.get("window", 600) + min_periods = kwargs.get("min_periods", 300) + target = kwargs.get("target", "n1b") + assert target in df.columns, f"数据中不存在 {target} 列" + assert factor in df.columns, f"数据中不存在 {factor} 列" + + if target == 'n1b' and 'n1b' not in df.columns: + from czsc.utils.trade import update_nxb + df = update_nxb(df, nseq=(1,)) + + rows = [] + for symbol, dfg in df.groupby("symbol"): + dfg = dfg.copy().reset_index(drop=True) + dfg = rolling_layers(dfg, factor, n=n, window=window, min_periods=min_periods) + rows.append(dfg) + + df = pd.concat(rows, ignore_index=True) + layers = [x for x in df[f"{factor}分层"].unique() if x != "第00层" and str(x).endswith("层")] + + # 计算每个分层的平均收益 + layer_returns = {} + for layer in layers: + dfg = df[df[f"{factor}分层"] == layer].copy() + dfg = dfg.groupby("dt")[target].mean().reset_index() + layer_returns[layer] = dfg[target].sum() + + sorted_layers = sorted(layer_returns.items(), key=lambda x: x[1], reverse=True) + return "->".join([f"{x[0]}" for x in sorted_layers])