From 0d1db73ac2e6b32866f8a8fddf0338c45085f0a7 Mon Sep 17 00:00:00 2001 From: zengbin93 Date: Fri, 6 Oct 2023 23:44:59 +0800 Subject: [PATCH] =?UTF-8?q?0.9.30=20RedisWeightsClient=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=9D=83=E9=99=90=E7=AE=A1=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- czsc/traders/rwc.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/czsc/traders/rwc.py b/czsc/traders/rwc.py index a1092cb8f..3221f71e3 100644 --- a/czsc/traders/rwc.py +++ b/czsc/traders/rwc.py @@ -17,7 +17,7 @@ class RedisWeightsClient: """策略持仓权重收发客户端""" - version = "V231005" + version = "V231006" def __init__(self, strategy_name, redis_url, **kwargs): """ @@ -41,6 +41,7 @@ def __init__(self, strategy_name, redis_url, **kwargs): """ self.strategy_name = strategy_name self.redis_url = redis_url + self.key_prefix = kwargs.get("key_prefix", "Weights") self.heartbeat_client = redis.from_url(redis_url, decode_responses=True) self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat") @@ -49,13 +50,13 @@ def __init__(self, strategy_name, redis_url, **kwargs): self.r = redis.Redis(connection_pool=thread_safe_pool) self.lua_publish = RedisWeightsClient.register_lua_publish(self.r) - self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True) - self.heartbeat_thread.start() - self.key_prefix = kwargs.get("key_prefix", "Weights") + if kwargs.get('send_heartbeat', True): + self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True) + self.heartbeat_thread.start() def set_metadata(self, base_freq, description, author, outsample_sdt, **kwargs): """设置策略元数据""" - key = f'{self.strategy_name}:meta' + key = f'{self.key_prefix}:META:{self.strategy_name}' if self.r.exists(key): if not kwargs.pop('overwrite', False): logger.warning(f'已存在 {self.strategy_name} 的元数据,如需覆盖请设置 overwrite=True') @@ -74,7 +75,8 @@ def set_metadata(self, base_freq, description, author, outsample_sdt, **kwargs): @property def metadata(self): """获取策略元数据""" - return self.r.hgetall(f'{self.strategy_name}:meta') + key = f'{self.key_prefix}:META:{self.strategy_name}' + return self.r.hgetall(key) def publish(self, symbol, dt, weight, price=0, ref=None, overwrite=False): """发布单个策略持仓权重 @@ -142,7 +144,7 @@ def publish_dataframe(self, df, overwrite=False, batch_size=10000): def __heartbeat(self): while True: - key = f'{self.heartbeat_prefix}:{self.strategy_name}' + key = f'{self.key_prefix}:{self.heartbeat_prefix}:{self.strategy_name}' try: self.heartbeat_client.set(key, datetime.now().strftime('%Y-%m-%d %H:%M:%S')) except Exception: @@ -150,12 +152,12 @@ def __heartbeat(self): time.sleep(15) def get_keys(self, pattern): - """使用 lua 获取 redis 中指定 pattern 的 keys""" - return self.r.eval('''local pattern = ARGV[1]\nreturn redis.call('KEYS', pattern)''', 0, pattern) + """获取 redis 中指定 pattern 的 keys""" + return self.r.keys(pattern) def clear_all(self): """删除该策略所有记录""" - self.r.delete(f"{self.strategy_name}:meta") + self.r.delete(f'{self.key_prefix}:META:{self.strategy_name}') keys = self.get_keys(f'{self.key_prefix}:{self.strategy_name}*') if keys is not None and len(keys) > 0: self.r.delete(*keys)