Skip to content

Commit

Permalink
0.9.30 RedisWeightsClient 支持权限管控
Browse files Browse the repository at this point in the history
  • Loading branch information
zengbin93 committed Oct 6, 2023
1 parent dab091f commit 0d1db73
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions czsc/traders/rwc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
class RedisWeightsClient:
"""策略持仓权重收发客户端"""

version = "V231005"
version = "V231006"

def __init__(self, strategy_name, redis_url, **kwargs):
"""
Expand All @@ -41,6 +41,7 @@ def __init__(self, strategy_name, redis_url, **kwargs):
"""
self.strategy_name = strategy_name
self.redis_url = redis_url
self.key_prefix = kwargs.get("key_prefix", "Weights")

self.heartbeat_client = redis.from_url(redis_url, decode_responses=True)
self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat")
Expand All @@ -49,13 +50,13 @@ def __init__(self, strategy_name, redis_url, **kwargs):
self.r = redis.Redis(connection_pool=thread_safe_pool)
self.lua_publish = RedisWeightsClient.register_lua_publish(self.r)

self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True)
self.heartbeat_thread.start()
self.key_prefix = kwargs.get("key_prefix", "Weights")
if kwargs.get('send_heartbeat', True):
self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True)
self.heartbeat_thread.start()

def set_metadata(self, base_freq, description, author, outsample_sdt, **kwargs):
"""设置策略元数据"""
key = f'{self.strategy_name}:meta'
key = f'{self.key_prefix}:META:{self.strategy_name}'
if self.r.exists(key):
if not kwargs.pop('overwrite', False):
logger.warning(f'已存在 {self.strategy_name} 的元数据,如需覆盖请设置 overwrite=True')
Expand All @@ -74,7 +75,8 @@ def set_metadata(self, base_freq, description, author, outsample_sdt, **kwargs):
@property
def metadata(self):
"""获取策略元数据"""
return self.r.hgetall(f'{self.strategy_name}:meta')
key = f'{self.key_prefix}:META:{self.strategy_name}'
return self.r.hgetall(key)

def publish(self, symbol, dt, weight, price=0, ref=None, overwrite=False):
"""发布单个策略持仓权重
Expand Down Expand Up @@ -142,20 +144,20 @@ def publish_dataframe(self, df, overwrite=False, batch_size=10000):

def __heartbeat(self):
while True:
key = f'{self.heartbeat_prefix}:{self.strategy_name}'
key = f'{self.key_prefix}:{self.heartbeat_prefix}:{self.strategy_name}'
try:
self.heartbeat_client.set(key, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
except Exception:
continue
time.sleep(15)

def get_keys(self, pattern):
"""使用 lua 获取 redis 中指定 pattern 的 keys"""
return self.r.eval('''local pattern = ARGV[1]\nreturn redis.call('KEYS', pattern)''', 0, pattern)
"""获取 redis 中指定 pattern 的 keys"""
return self.r.keys(pattern)

def clear_all(self):
"""删除该策略所有记录"""
self.r.delete(f"{self.strategy_name}:meta")
self.r.delete(f'{self.key_prefix}:META:{self.strategy_name}')
keys = self.get_keys(f'{self.key_prefix}:{self.strategy_name}*')
if keys is not None and len(keys) > 0:
self.r.delete(*keys)
Expand Down

0 comments on commit 0d1db73

Please sign in to comment.