Skip to content

Commit

Permalink
0.9.53 新增几个优质信号函数
Browse files Browse the repository at this point in the history
  • Loading branch information
zengbin93 committed Jun 13, 2024
1 parent 3d583d5 commit 4250050
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 19 deletions.
3 changes: 3 additions & 0 deletions czsc/signals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
cxt_decision_V240526,
cxt_bs_V240526,
cxt_bs_V240527,
cxt_overlap_V240612,
)


Expand Down Expand Up @@ -206,6 +207,7 @@
tas_slope_V231019,
tas_macd_bc_V240307,
tas_dma_bs_V240608,
tas_dif_zero_V240612,
)

from czsc.signals.pos import (
Expand All @@ -222,6 +224,7 @@
pos_stop_V240428,
pos_take_V240428,
pos_stop_V240331,
pos_stop_V240608,
)


Expand Down
86 changes: 78 additions & 8 deletions czsc/signals/cxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2485,9 +2485,11 @@ def cxt_bs_V240526(c: CZSC, **kwargs) -> OrderedDict:
slope_seq = [abs(x.slope) for x in bis]

# 如果倒数第二笔的 SNR 小于 0.7,或者倒数第二笔的价格、量比最大值小于前 7 笔的最大值,不考虑信号
if b2.SNR < 0.7 or (b2.power_price < np.max(power_price_seq)
and b2.power_volume < np.max(power_volume_seq)
and b2.slope < np.max(slope_seq)):
if b2.SNR < 0.7 or (
b2.power_price < np.max(power_price_seq)
and b2.power_volume < np.max(power_volume_seq)
and b2.slope < np.max(slope_seq)
):
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

if b2.direction == Direction.Up and b1.direction == Direction.Down:
Expand All @@ -2502,6 +2504,7 @@ def cxt_bs_V240526(c: CZSC, **kwargs) -> OrderedDict:

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)


def cxt_bs_V240527(c: CZSC, **kwargs) -> OrderedDict:
"""快速走势之后的减速反弹,形成第反弹买点
Expand Down Expand Up @@ -2538,18 +2541,20 @@ def cxt_bs_V240527(c: CZSC, **kwargs) -> OrderedDict:
slope_seq = [abs(x.slope) for x in bis]

# 如果倒数第二笔的 SNR 小于 0.7,或者倒数第二笔的力度小于前 7 笔的最大值,不考虑信号
if b1.SNR < 0.7 or (b1.power_price < np.max(power_price_seq)
and b1.power_volume < np.max(power_volume_seq)
and b1.slope < np.max(slope_seq)):
if b1.SNR < 0.7 or (
b1.power_price < np.max(power_price_seq)
and b1.power_volume < np.max(power_volume_seq)
and b1.slope < np.max(slope_seq)
):
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

ubi = c.ubi
if not ubi:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
if len(ubi['raw_bars']) < 7:
if len(ubi["raw_bars"]) < 7:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

ubi_power_price = ubi['high_bar'].high - ubi['low_bar'].low
ubi_power_price = ubi["high_bar"].high - ubi["low_bar"].low
if b1.direction == Direction.Up:
# 买点:倒数第二笔是向上笔,倒数第一笔是向下笔,且倒数第一笔的力度在倒数第二笔的 10% ~ 70% 之间
if 0.1 * b1.power_price < ubi_power_price < 0.7 * b1.power_price:
Expand All @@ -2562,3 +2567,68 @@ def cxt_bs_V240527(c: CZSC, **kwargs) -> OrderedDict:

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)


def cxt_overlap_V240612(c: CZSC, **kwargs) -> OrderedDict:
"""顺畅笔的顶底分型构建支撑压力位
参数模板:"{freq}_SNR顺畅N{n}_支撑压力V240612"
**信号逻辑:**
前面N笔中走势最顺畅的笔,顶底分型是重要支撑压力位,可以作为决策点。
1. 取最近 N 笔,找出SNR最大的笔 B1;
2. 如果当前笔是向下笔,且向下笔的底分型区间与B1的顶/底分型区间有重合,那么认为当前位置是支撑位;
**信号列表:**
:param c: CZSC对象
:param kwargs: 无
:return: 信号识别结果
"""
n = int(kwargs.get("n", 9))
freq = c.freq.value
k1, k2, k3 = f"{freq}_SNR顺畅N{n}_支撑压力V240612".split("_")
v1 = "其他"
if len(c.bi_list) < n + 2 or len(c.bars_ubi) > 7:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

bis = get_sub_elements(c.bi_list, di=3, n=9)
bis = [x for x in bis if len(x.raw_bars) >= 9]
if len(bis) == 0:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

max_snr_bi = max(bis, key=lambda x: x.SNR)
last_bi = c.bi_list[-1]

if max_snr_bi.direction == Direction.Down:
fxg = max_snr_bi.fx_a
fxd = max_snr_bi.fx_b
else:
fxg = max_snr_bi.fx_b
fxd = max_snr_bi.fx_a

def is_price_overlap(h1, l1, h2, l2):
"""判断两个价格区间是否有重合"""
return True if max(l1, l2) < min(h1, h2) else False

v2 = "任意"
if last_bi.direction == Direction.Down:
if is_price_overlap(fxg.high, fxg.low, last_bi.fx_b.high, last_bi.fx_b.low):
v1 = "支撑"
v2 = "顺畅笔顶分型"
if is_price_overlap(fxd.high, fxd.low, last_bi.fx_b.high, last_bi.fx_b.low):
v1 = "支撑"
v2 = "顺畅笔底分型"

if last_bi.direction == Direction.Up:
if is_price_overlap(fxg.high, fxg.low, last_bi.fx_b.high, last_bi.fx_b.low):
v1 = "压力"
v2 = "顺畅笔顶分型"

if is_price_overlap(fxd.high, fxd.low, last_bi.fx_b.high, last_bi.fx_b.low):
v1 = "压力"
v2 = "顺畅笔底分型"

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
82 changes: 78 additions & 4 deletions czsc/signals/pos.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,26 +575,26 @@ def pos_holds_V240608(cat: CzscTrader, **kwargs) -> OrderedDict:
op = pos.operates[-1]

# 开仓前W根K线
w_bars = [x for x in c.bars_raw[-200:] if x.dt < op["dt"]][-w:]
w_bars = [x for x in c.bars_raw[-200:] if x.dt <= op["dt"]][-w:]
# 开仓后的K线
a_bars = [x for x in c.bars_raw[-100:] if x.dt > op["dt"]]
unique_prices = [p for x in w_bars for p in [x.high, x.low, x.close, x.open]]
unique_prices = [p for x in c.bars_raw[-200:] for p in [x.high, x.low, x.close, x.open]]
unique_prices = sorted(list(set(unique_prices)))

if op["op"] == Operate.LO:
w_low = min([x.low for x in w_bars]) # 开仓前最低价
a_low = min([x.low for x in a_bars]) # 开仓后最低价
up_prices = [x for x in unique_prices if x > op["price"]] # 成本价上方的价位
# 如果开仓后的最低价低于开仓前的最低价,且当前价比开仓价高 N 个价位,则平仓保本
if len(up_prices) > n and a_low > w_low and cat.latest_price > up_prices[n]:
if len(up_prices) > n and a_low < w_low and cat.latest_price > up_prices[n]:
v1 = "多头保本"

if op["op"] == Operate.SO:
w_high = max([x.high for x in w_bars]) # 开仓前最高价
a_high = max([x.high for x in a_bars]) # 开仓后最高价
down_prices = [x for x in unique_prices if x < op["price"]] # 成本价下方的价位
# 如果开仓后的最高价高于开仓前的最高价,且当前价比开仓价低 N 个价位,则平仓保本
if len(down_prices) > n and a_high < w_high and cat.latest_price < down_prices[-n]:
if len(down_prices) > n and a_high > w_high and cat.latest_price < down_prices[-n]:
v1 = "空头保本"

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
Expand Down Expand Up @@ -808,3 +808,77 @@ def pos_stop_V240331(cat: CzscTrader, **kwargs) -> OrderedDict:
v1 = "空头止损"

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)


def pos_stop_V240608(cat: CzscTrader, **kwargs) -> OrderedDict:
"""止损:多头开仓后,最低价跌破前W根K线最低价N个价位,提示止损;空头反之。
参数模板:"{pos_name}_{freq1}W{w}N{n}_止损V240608"
**信号逻辑:**
以多头止损为例,计算过程如下:
1. 从多头开仓点开始,在给定的K线周期 freq1 上计算开仓前 W 个K线的最低价,记为 L1;
2. 计算开仓后的最低价,记为 L2;
3. 如果 L2 < L1 - N 个价位,则提示多头止损信号。
空头止损逻辑同理。
**信号列表:**
- Signal('SMA5多头_15分钟W20N10_止损V240608_多头止损_任意_任意_0')
- Signal('SMA5空头_15分钟W20N10_止损V240608_空头止损_任意_任意_0')
:param cat: CzscTrader对象
:param kwargs: 参数字典
- pos_name: str,开仓信号的名称
- freq1: str,给定的K线周期
- w: int,开仓前W根K线,默认为 20
- n: int,最低价下方N个价位,默认为 10
:return: OrderedDict
"""
pos_name = kwargs["pos_name"]
freq1 = kwargs["freq1"]
w = int(kwargs.get("w", 20)) # 开仓前W根K线
n = int(kwargs.get("n", 10)) # N个价位

k1, k2, k3 = f"{pos_name}_{freq1}W{w}N{n}_止损V240608".split("_")
v1 = "其他"

# 如果没有持仓策略,则不产生信号
if not cat.kas or not hasattr(cat, "positions"):
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

pos = [x for x in cat.positions if x.name == pos_name][0]
if len(pos.operates) == 0 or pos.operates[-1]["op"] in [Operate.SE, Operate.LE]:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

c = cat.kas[freq1]
op = pos.operates[-1]

# 开仓前W根K线
w_bars = [x for x in c.bars_raw[-200:] if x.dt < op["dt"]][-w:]
# 开仓后的K线
a_bars = [x for x in c.bars_raw[-100:] if x.dt > op["dt"]]
unique_prices = [p for x in c.bars_raw[-200:] for p in [x.high, x.low, x.close, x.open]]
unique_prices = sorted(list(set(unique_prices))) # 去重并按升序排列

if op["op"] == Operate.LO:
w_low = min([x.low for x in w_bars]) # 开仓前最低价
a_low = min([x.low for x in a_bars]) # 开仓后最低价
w_low_prices = [x for x in unique_prices if x < w_low] # 开仓前最低价下方的价位,升序排列
# 如果开仓后的最低价低于开仓前的最低价向下的 N 个价位,则提示多头止损
if len(w_low_prices) > n and a_low < w_low_prices[-n]:
v1 = "多头止损"

if op["op"] == Operate.SO:
w_high = max([x.high for x in w_bars])
a_high = max([x.high for x in a_bars])
w_high_prices = [x for x in unique_prices if x > w_high]
if len(w_high_prices) > n and a_high > w_high_prices[n]:
v1 = "空头止损"

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
63 changes: 56 additions & 7 deletions czsc/signals/tas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3582,15 +3582,15 @@ def tas_dma_bs_V240608(c: CZSC, **kwargs) -> OrderedDict:
- t2: int, 默认10,均线2的周期
:return: 信号识别结果
"""
n = int(kwargs.get('n', 5))
t1 = int(kwargs.get('t1', 5))
t2 = int(kwargs.get('t2', 10))
n = int(kwargs.get("n", 5))
t1 = int(kwargs.get("t1", 5))
t2 = int(kwargs.get("t2", 10))

assert t1 < t2, "均线1的周期必须小于均线2的周期"

freq = c.freq.value
k1, k2, k3 = f"{freq}_N{n}双均线{t1}#{t2}顺势_BS辅助V240608".split('_')
v1 = '其他'
k1, k2, k3 = f"{freq}_N{n}双均线{t1}#{t2}顺势_BS辅助V240608".split("_")
v1 = "其他"
ma1 = update_ma_cache(c, timeperiod=t1)
ma2 = update_ma_cache(c, timeperiod=t2)
if len(c.bars_raw) < 110:
Expand All @@ -3610,14 +3610,63 @@ def tas_dma_bs_V240608(c: CZSC, **kwargs) -> OrderedDict:
ma2_round_high = upper_prices[n] if len(upper_prices) > n else upper_prices[-1]
# 买点:1)上一根K线的最低价小于 ma2_round_high;2)当前K线的最高价大于 ma2_round_high,且收盘价小于 ma2_round_high
if bar1.low < ma2_round_high < bar2.high and bar2.close < ma2_round_high:
v1 = '买点'
v1 = "买点"

elif lower_prices and ma1_value < ma2_value and bar2.cache[ma2] < bar1.cache[ma2]:
# ma2_round_low 是 ma2_value 下方的第 n 个价格
ma2_round_low = lower_prices[-n] if len(lower_prices) > n else lower_prices[0]
# 卖点:1)上一根K线的最高价大于 ma2_round_low;2)当前K线的收盘价大于 ma2_round_low,且收盘价大于 ma2_round_low
if bar1.high > ma2_round_low > bar2.low and bar2.close > ma2_round_low:
v1 = '卖点'
v1 = "卖点"

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)


def tas_dif_zero_V240612(c: CZSC, **kwargs) -> OrderedDict:
"""DIFF 远离零轴后靠近零轴,形成买卖点
参数模板:"{freq}_DIF靠近零轴T{t}_BS辅助V240612"
**信号逻辑:**
买点的定位以DIF为主,要求如下。
1,取最近一个向下笔的底分型中的DIFF的最小值
2. 如果这个最小值在零轴的一个0.5倍标准差范围,那么就认为这个最小值是一个有效的买点
飞书文档:https://s0cqcxuy3p.feishu.cn/wiki/R9Y5w1w3Qi1jsHkzSyLcjoVWnld
**信号列表:**
- Signal('60分钟_DIF靠近零轴T50_BS辅助V240612_买点_任意_任意_0')
- Signal('60分钟_DIF靠近零轴T50_BS辅助V240612_卖点_任意_任意_0')
:param c: CZSC对象
:param kwargs: 无
- t: DIF波动率的倍数,除以100,默认为50
:return: 信号识别结果
"""
t = int(kwargs.get("t", 50)) # 波动率的倍数,除以100

freq = c.freq.value
k1, k2, k3 = f"{freq}_DIF靠近零轴T{t}_BS辅助V240612".split("_")
v1 = "其他"
key = update_macd_cache(c)
if len(c.bars_raw) < 110 or len(c.bars_ubi) > 7:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

bi = c.bi_list[-1]
if len(bi.raw_bars) < 7:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

diffs = [x.cache[key]["dif"] for x in bi.raw_bars]
delta = np.std(diffs) * t / 100

if bi.direction == Direction.Down and delta > np.min(diffs) > -delta:
v1 = "买点"

if bi.direction == Direction.Up and -delta < np.max(diffs) < delta:
v1 = "卖点"

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
Loading

0 comments on commit 4250050

Please sign in to comment.