diff --git a/czsc/signals/__init__.py b/czsc/signals/__init__.py index a99e74cf7..28417dbde 100644 --- a/czsc/signals/__init__.py +++ b/czsc/signals/__init__.py @@ -45,6 +45,7 @@ cxt_bi_end_V230815, cxt_bi_stop_V230815, cxt_bi_trend_V230824, + cxt_bi_trend_V230913, ) diff --git a/czsc/signals/cxt.py b/czsc/signals/cxt.py index 5ee8a56fd..32e138bb1 100644 --- a/czsc/signals/cxt.py +++ b/czsc/signals/cxt.py @@ -16,7 +16,7 @@ from czsc.signals.tas import update_ma_cache, update_macd_cache from collections import OrderedDict from deprecated import deprecated - +from sklearn.linear_model import LinearRegression def cxt_bi_base_V230228(c: CZSC, **kwargs) -> OrderedDict: @@ -258,7 +258,7 @@ def cxt_zhong_shu_gong_zhen_V221221(cat: CzscSignals, freq1='日线', freq2='60 if not cat.kas or freq1 not in cat.kas or freq2 not in cat.kas: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="其他") - + max_freq: CZSC = cat.kas[freq1] min_freq: CZSC = cat.kas[freq2] symbol = cat.symbol @@ -1166,7 +1166,7 @@ def cxt_bi_end_V230618(c: CZSC, **kwargs) -> OrderedDict: 类似 cxt_third_bs_V230318 信号,但增加了笔内有无小级别中枢的判断。用k线重叠来近似小级别中枢的判断 :param c: CZSC对象 - :param kwargs: + :param kwargs: - di: int, 默认1,表示取倒数第几笔 - max_overlap: int, 默认3,表示笔内最多允许有几个信号重叠 @@ -1268,7 +1268,7 @@ def cxt_three_bi_V230618(c: CZSC, **kwargs) -> OrderedDict: :param kwargs: - di: 倒数第几笔 - + :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) @@ -1335,7 +1335,7 @@ def cxt_five_bi_V230619(c: CZSC, **kwargs) -> OrderedDict: :param kwargs: - di: 倒数第几笔 - + :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) @@ -1424,7 +1424,7 @@ def cxt_seven_bi_V230620(c: CZSC, **kwargs) -> OrderedDict: :param kwargs: - di: 倒数第几笔 - + :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) @@ -1533,7 +1533,7 @@ def cxt_nine_bi_V230621(c: CZSC, **kwargs) -> OrderedDict: :param kwargs: - di: 倒数第几笔 - + :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) @@ -1595,7 +1595,7 @@ def cxt_nine_bi_V230621(c: CZSC, **kwargs) -> OrderedDict: > min([x.high for x in [bi3, bi5, bi7]]) \ > max([x.low for x in [bi3, bi5, bi7]]) > bi1.low == min_low: return create_single_signal(k1=k1, k2=k2, k3=k3, v1='类三买B') - + if min_low == bi5.low and max_high == bi1.high and bi4.high < bi2.low: # 前五笔构成向下类趋势 zd = max([x.low for x in [bi5, bi7]]) zg = min([x.high for x in [bi5, bi7]]) @@ -1626,18 +1626,18 @@ def cxt_nine_bi_V230621(c: CZSC, **kwargs) -> OrderedDict: if bi8.low > min(bi2.high, bi4.high, bi6.high) > max(bi2.low, bi4.low, bi6.low) \ and bi9.power < bi7.power: return create_single_signal(k1=k1, k2=k2, k3=k3, v1='aAbcd式类一卖') - + # ABC式类一卖 if bi3.high > bi1.high and bi7.low < bi9.low \ and min(bi4.high, bi6.high) > max(bi4.low, bi6.low) \ and (bi3.high - bi1.low) > (bi9.high - bi7.low): return create_single_signal(k1=k1, k2=k2, k3=k3, v1='ABC式类一卖') - + # 类趋势一卖 if bi8.low > bi6.high > bi6.low > bi4.high > bi4.low > bi2.high \ and bi9.power < max([bi1.power, bi3.power, bi5.power, bi7.power]): return create_single_signal(k1=k1, k2=k2, k3=k3, v1='类趋势一卖') - + # 九笔三卖 if max_high == bi1.high and min_low == bi9.low \ and bi9.high < max([x.low for x in [bi3, bi5, bi7]]) < min([x.high for x in [bi3, bi5, bi7]]): @@ -1678,7 +1678,7 @@ def cxt_eleven_bi_V230622(c: CZSC, **kwargs) -> OrderedDict: :param kwargs: - di: 倒数第几笔 - + :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) @@ -1758,12 +1758,12 @@ def cxt_eleven_bi_V230622(c: CZSC, **kwargs) -> OrderedDict: if bi1.high < bi3.high and min(bi4.high, bi6.high, bi8.high) > max(bi4.low, bi6.low, bi8.low) \ and bi9.low < bi11.low and bi3.high - bi1.low > bi11.high - bi9.low: return create_single_signal(k1=k1, k2=k2, k3=k3, v1='A3B5C3式类一卖') - + # 类二卖:1~9构成类趋势,11不创新高 if max_high == bi9.high > bi8.low > bi6.high > bi6.low > bi4.high > bi4.low > bi2.high > bi1.low == min_low \ and bi11.high < bi9.high: return create_single_signal(k1=k1, k2=k2, k3=k3, v1='类二卖') - + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) @@ -1865,7 +1865,7 @@ def cxt_intraday_V230701(cat: CzscSignals, **kwargs) -> OrderedDict: v1 = "其他" if not cat.kas or freq1 not in cat.kas.keys() or freq2 not in cat.kas.keys(): return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - + c1, c2 = cat.kas[freq1], cat.kas[freq2] day = c2.bars_raw[-di].dt.date() bars = [x for x in c1.bars_raw if x.dt.date() == day] @@ -1877,13 +1877,13 @@ def cxt_intraday_V230701(cat: CzscSignals, **kwargs) -> OrderedDict: for b1, b2, b3 in zip(bars[:-2], bars[1:-1], bars[2:]): if min(b1.high, b2.high, b3.high) >= max(b1.low, b2.low, b3.low): zs_list.append([b1, b2, b3]) - + _dir = "上涨" if bars[-1].close > bars[0].open else "下跌" if not zs_list: v1 = f"无中枢{_dir}" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - + # 双中枢的情况,有一根K线的 high low 与前后两个中枢没有重叠 if len(zs_list) >= 2: zs1, zs2 = zs_list[0], zs_list[-1] @@ -1892,11 +1892,11 @@ def cxt_intraday_V230701(cat: CzscSignals, **kwargs) -> OrderedDict: if _dir == "上涨" and zs1_high < zs2_low: # type: ignore v1 = f"双中枢{_dir}" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - + if _dir == "下跌" and zs1_low > zs2_high: # type: ignore v1 = f"双中枢{_dir}" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - + # 单中枢的情况,前三根K线出现高点:弱平衡市,前三根K线出现低点:强平衡市,否则:转折平衡市 high_first = max(bars[0].high, bars[1].high, bars[2].high) == max([x.high for x in bars]) low_first = min(bars[0].low, bars[1].low, bars[2].low) == min([x.low for x in bars]) @@ -1906,7 +1906,7 @@ def cxt_intraday_V230701(cat: CzscSignals, **kwargs) -> OrderedDict: v1 = "强平衡市" else: v1 = "转折平衡市" - + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) @@ -1957,7 +1957,7 @@ def cxt_ubi_end_V230816(c: CZSC, **kwargs) -> OrderedDict: if ubi['raw_bars'][-1].high > cur_hfx.high: v1 = '新高' v2 = f"第{cnt + 1}次" - + if ubi['direction'] == Direction.Down: fxs = [x for x in fxs if x.mark == Mark.D] cnt = 1 @@ -1997,7 +1997,7 @@ def cxt_bi_end_V230815(c: CZSC, **kwargs) -> OrderedDict: v1 = '其他' if len(c.bi_list) < 5 or len(c.bars_ubi) >= 5: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - + bi, last_bar = c.bi_list[-1], c.bars_ubi[-1] if bi.direction == Direction.Up and last_bar.low < bi.low: v1 = '向下' @@ -2023,7 +2023,7 @@ def cxt_bi_stop_V230815(c: CZSC, **kwargs) -> OrderedDict: - Signal('15分钟_距离50BP_止损V230815_向上_阈值外_任意_0') :param c: CZSC对象 - :param kwargs: + :param kwargs: - th: 止损距离阈值,单位为BP, 默认为50BP, 即0.5% @@ -2035,7 +2035,7 @@ def cxt_bi_stop_V230815(c: CZSC, **kwargs) -> OrderedDict: v1, v2 = '其他', '其他' if len(c.bi_list) < 5: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - + bi, last_bar = c.bi_list[-1], c.bars_ubi[-1] if bi.direction == Direction.Up: v1 = '向下' @@ -2096,3 +2096,85 @@ def cxt_bi_trend_V230824(c: CZSC, **kwargs) -> OrderedDict: v1 = "横盘" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + +def cxt_bi_trend_V230913(c: CZSC, **kwargs) -> OrderedDict: + """辅助判断股票通道信号,贡献者:马鸣 + + 参数模板:"{freq}_D{di}N{n}笔趋势_高低点辅助判断V230913" + + **信号逻辑:** + + 1. 倒数di笔之间的高低点形成趋势线,根据股价的当前位置,推断趋势强弱 + + **信号列表:** + + - Signal('日线_D3N1笔趋势_高低点辅助判断V230913_下降趋势_超强_任意_0') + - Signal('日线_D3N1笔趋势_高低点辅助判断V230913_观望_末笔延伸_任意_0') + - Signal('日线_D3N1笔趋势_高低点辅助判断V230913_上升趋势_强_任意_0') + - Signal('日线_D3N1笔趋势_高低点辅助判断V230913_观望_趋势线交叉_任意_0') + - Signal('日线_D3N1笔趋势_高低点辅助判断V230913_下降趋势_强_任意_0') + - Signal('日线_D3N1笔趋势_高低点辅助判断V230913_上升趋势_超强_任意_0') + + :param c: CZSC对象 + :param kwargs: 参数字典 + + -:param di: 倒数di笔 + -:param n: 倒数第n根K线 + + :return: 信号识别结果 + """ + di = int(kwargs.get("di", 4)) + n = int(kwargs.get("n", 1)) + freq = c.freq.value + k1, k2, k3 = f"{freq}_D{di}N{n}笔趋势_高低点辅助判断V230913".split('_') + v1 = "其他" + if len(c.bi_list) <= di + 2 or len(c.bars_ubi) <= n + 1: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + up_trend_price = np.array([x.high for x in c.bi_list if x.direction == Direction.Up][-di:]) + up_trend_time = np.array([x.edt.timestamp() for x in c.bi_list if x.direction == Direction.Up][-di:]).reshape(-1, 1) + + down_trend_price = np.array([x.low for x in c.bi_list if x.direction == Direction.Down][-di:]) + down_trend_time = np.array([x.edt.timestamp() for x in c.bi_list if x.direction == Direction.Down][-di:]).reshape(-1, 1) + + model_up = LinearRegression() + model_down = LinearRegression() + model_up.fit(up_trend_time, up_trend_price) + model_down.fit(down_trend_time, down_trend_price) + + new_bar_data = np.array([c.bars_ubi[-n].dt.timestamp()]).reshape(-1, 1) + pre_up_price = model_up.predict(new_bar_data) + pre_down_price = model_down.predict(new_bar_data) + pre_mid_price = (pre_up_price + pre_down_price) / 2 + + if pre_up_price <= pre_down_price: + v1 = "观望" + v2 = '趋势线交叉' + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + if len(c.bars_ubi) >= 5: + v1 = "观望" + v2 = '末笔延伸' + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + if c.bars_raw[-n].close >= pre_up_price: + v1 = '上升趋势' + v2 = '超强' + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + elif pre_mid_price < c.bars_raw[-n].close < pre_up_price: + v1 = '上升趋势' + v2 = '强' + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + elif pre_down_price < c.bars_raw[-n].close < pre_mid_price: + v1 = '下降趋势' + v2 = '强' + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + elif c.bars_raw[-n].close <= pre_down_price: + v1 = '下降趋势' + v2 = '超强' + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) diff --git a/examples/signals_dev/signal_match.py b/examples/signals_dev/signal_match.py index 0d595f4db..9132efa6d 100644 --- a/examples/signals_dev/signal_match.py +++ b/examples/signals_dev/signal_match.py @@ -3,7 +3,7 @@ author: zengbin93 email: zeng_bin8888@163.com create_dt: 2023/3/30 12:08 -describe: +describe: """ import os import sys @@ -44,8 +44,8 @@ conf = sp.parse(signals_seq) parsed_name = {x['name'] for x in conf} print(f"total signal functions: {len(sp.sig_name_map)}; parsed: {len(parsed_name)}") - # total signal functions: 182; parsed: 182 - + # total signal functions: 197; parsed: 197 + # 测试信号配置生成信号 from czsc import generate_czsc_signals, get_signals_freqs, get_signals_config from test.test_analyze import read_1min