Skip to content

Commit

Permalink
更新回测部分代码
Browse files Browse the repository at this point in the history
  • Loading branch information
woldy committed Jan 30, 2024
1 parent 48f8c1e commit f69c343
Show file tree
Hide file tree
Showing 18 changed files with 272 additions and 121 deletions.
43 changes: 37 additions & 6 deletions examples/demo-project/backtest/default/default_backtest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
class DefaultTestmodule():
#finhack testmodule run
import os
import multiprocessing
from finhack.trader.default.function import load_preds_data
from finhack.library.mydb import mydb
from finhack.trader.default.data import Data

class DefaultBacktest():
def __init__(self):
pass


def run_command_with_semaphore(self, cmd, semaphore):
with semaphore:
try:
os.system(cmd)
except Exception as e:
print(f'An error occurred: {e}')

def run(self):
print(backtest)
pass

Data.init_data(cache=True)
cash_list = self.args.cash.split(',')
model_list = mydb.selectToDf('select * from auto_train order by score desc', 'finhack')

semaphore = multiprocessing.Semaphore(int(self.args.p)) # 创建一个信号量,最大允许3个进程同时运行
processes = []

for row in model_list.itertuples():
model_hash = getattr(row, 'hash')
load_preds_data(model_hash, True)
for cash in cash_list:
cmd = 'finhack trader run --strategy=AITopNStrategy --log_level=ERROR --model_id=' + model_hash + ' --cash=' + cash
print(cmd)
# 创建Process对象,传入函数和需要的参数,包括信号量
p = multiprocessing.Process(target=self.run_command_with_semaphore, args=(cmd, semaphore))
processes.append(p)
p.start()

# 等待所有进程完成
for p in processes:
p.join()

6 changes: 4 additions & 2 deletions examples/demo-project/strategy/AITopNStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ def initialize(context):
run_daily(trade, time="09:30")
# run_daily(trade, time="8:05")

args=json.loads(context.args)
model_id=args['model_id']
# args=json.loads(context.args)
# model_id=args['model_id']
model_id=context.trade.model_id

preds_data=load_preds_data(model_id)
clsLgbTrainer=LightgbmTrainer()
preds=clsLgbTrainer.pred(preds_data,md5=model_id,save=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def run3(self):
factorAnalyzer.alphalens("pe_0")

def run4(self):
factors=factorManager.getFactors(['ADOSC_0','AD_0','APO_0','AROONDOWN_0','ARRONUP_0','pe_0','alpha101_012','alpha101_013'])
factors=factorManager.getFactors(['ADOSC_0','AD_0','APO_0','AROONDOWN_0','ARRONUP_0','pe_0','alpha101_012','alpha101_013'],start_date='20150101',end_date="20230101")
print(factors)


Expand All @@ -53,4 +53,8 @@ def run5(self):
df = df[df.trade_date=='20240118']


print(df)
print(df)

def run6(self):
print('test')
pass
22 changes: 16 additions & 6 deletions finhack/core/command/finhack
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@ import os
from finhack.core.core import Core
import multiprocessing
import atexit

def child_process_action(core):
# 注册子进程退出时的清理函数,如果有的话
# atexit.register(Utils.child_cleanup)
core.do_action()
# 子进程完成任务后退出
os._exit(0)

if __name__ == '__main__':
core=Core()
core = Core()
core.load_module()

from finhack.library.utils import Utils
Expand All @@ -14,13 +22,15 @@ if __name__ == '__main__':
print("创建后台进程失败!")
exit(1)
elif pid == 0: # 子进程中执行
core.do_action()
exit(0)
child_process_action(core)
else: # 父进程中继续执行
Utils.write_pids(pid)
print("启动后台任务!")
exit(0)
# 父进程可以在这里继续执行其他任务,或者退出
# 如果父进程需要等待子进程结束,可以使用 os.waitpid() 方法
# _, status = os.waitpid(pid, 0)
# print("后台任务已结束,状态码:", status)
else:
atexit.register(Utils.auto_exit) # 注册退出函数
# 注册父进程退出时的清理函数
#atexit.register(Utils.auto_exit)
core.do_action()

98 changes: 42 additions & 56 deletions finhack/factor/default/factorManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,83 +92,69 @@ def getFactorsList(valid=True,ignore=True):
result.sort()
return result


#获取因子数据
def getFactors(factor_list,stock_list=[],start_date='',end_date='',cache=False):
df_factor=pd.DataFrame()



# 获取因子数据
def getFactors(factor_list, stock_list=[], start_date='', end_date='', cache=False):
df_factor = pd.DataFrame()
single_factors_pkl_dir = SINGLE_FACTORS_PKL_DIR
index_pkl_path = SINGLE_FACTORS_PKL_DIR+'index.pkl'

# for factor in factor_list:
# factor=factor.replace('$','')
# if os.path.isfile(SINGLE_FACTORS_DIR+factor+'.csv'):
# df=pd.read_csv(SINGLE_FACTORS_DIR+factor+'.csv',names=['ts_code','trade_date',factor], dtype={'ts_code': str,'trade_date': str, factor: np.float64},low_memory=False)
# df.set_index(['ts_code', 'trade_date'], inplace=True)
# if df_factor.empty:
# df_factor = df
# else:
# df_factor = df_factor.join(df, how='outer')
# else:
# Log.logger.warning(SINGLE_FACTORS_DIR+factor+'.csv not found')


# if df_factor.empty:
# return df_factor


# 加载索引
index_pkl_path = SINGLE_FACTORS_PKL_DIR + 'index.pkl'
index_df = pd.read_pickle(index_pkl_path)
index_df['trade_date'] = index_df['trade_date'].astype(str)

if start_date != "":
index_df = index_df[index_df['trade_date'] >= start_date]

if end_date != "":
index_df = index_df[index_df['trade_date'] <= end_date]

if index_df.empty:
return index_df

# 获取日期范围的位置
start_index = index_df.index[0]
end_index = index_df.index[-1]

factor_dfs = []

# 逐个读取因子 .pkl 文件
for factor in factor_list:
factor=factor.replace('$','')
factor = factor.replace('$', '')
factor_file = os.path.join(single_factors_pkl_dir, f'{factor}.pkl')
if os.path.isfile(factor_file):
# 加载因子数据
factor_data = pd.read_pickle(factor_file)
factor_data[factor] = pd.to_numeric(factor_data[factor], errors='coerce')


# 将因子数据添加到列表中
# 筛选对应日期的因子数据
factor_data = factor_data.iloc[start_index:end_index+1]
factor_dfs.append(factor_data)
else:
print(f"Warning: {factor_file} not found")

# 将所有因子数据合并为一个 DataFrame
combined_factors_df = pd.concat(factor_dfs, axis=1)

# 加载索引
index_df = pd.read_pickle(index_pkl_path)

# 将因子数据与索引进行对齐
df_factor = index_df.join(combined_factors_df, how='left')
df_factor['trade_date']=df_factor['trade_date'].astype(str)
df_factor = df_factor.set_index(['ts_code', 'trade_date'])
df_factor = df_factor.sort_index()

if df_factor.empty:
return df_factor

if stock_list!=[] or start_date!="" or end_date!="":
df_factor=df_factor.reset_index()
if stock_list!=[]:
df_list=[]
for ts_code in stock_list:
df_tmp=df_factor[df_factor.ts_code==ts_code]
df_list.append(df_tmp)
df_factor=pd.concat(df_list)

if start_date!="":
df_factor=df_factor[df_factor.trade_date>=start_date]

if end_date!="":
df_factor=df_factor[df_factor.trade_date<=end_date]
df_factor=df_factor.set_index(['ts_code','trade_date'])

df_factor=df_factor.sort_index()
# if cache:
# df_factor.to_pickle(cache_file)
if stock_list != []:
df_list = []
for ts_code in stock_list:
df_tmp = combined_factors_df[combined_factors_df['ts_code'] == ts_code]
df_list.append(df_tmp)

df_factor = pd.concat(df_list)
else:


return df_factor
pass


return df_factor




#获取alpha列表的列表
Expand Down
4 changes: 0 additions & 4 deletions finhack/factor/default/taskRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

class taskRunner:
def runTask(taskName='all'):

factorPkl.save()
exit()

c_list=preCheck.checkAllFactors() #chenged factor,代码发生变化
if taskName=='all':
task_list=Config.get_section_list('task')
Expand Down
1 change: 1 addition & 0 deletions finhack/trader/default/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
'args':None,
'trade':DictObj({
'market':'',
'model_id':'',
'start_time':'',
'end_time':'',
'benchmark':'000001',
Expand Down
56 changes: 43 additions & 13 deletions finhack/trader/default/default_trader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from runtime.constant import LOGS_DIR
from .context import context,g
from datetime import datetime
from finhack.library.mydb import mydb
class DefaultTrader:
def load_strategy(self,strategy_name):
if os.path.exists(f"{BASE_DIR}/strategy/{strategy_name}.py"):
Expand All @@ -35,6 +36,14 @@ def run(self):
Log.tlogger=Log.tLog(context.id,logs_dir=LOGS_DIR,background=self.args.background,level=self.args.log_level).logger
log("正在初始化交易上下文")

hassql='select * from backtest where instance_id="%s"' % (context.id)
has=mydb.selectToDf(hassql,'finhack')
if(not has.empty):
log("存在相同回测记录,本次回测结束!")
return




start_time=context['trade']['start_time']
end_time=context['trade']['end_time']
Expand Down Expand Up @@ -91,26 +100,44 @@ def run(self):
Performance.analyse(context)


# hassql='select * from auto_train where hash="%s"' % (md5)
# has=mydb.selectToDf(hassql,'finhack')
# if(has.empty):
# mydb.exec(insert_sql,'finhack')



returns_float_list = (context.performance.returns.values).tolist()
returns_string_list = ['{:.8f}'.format(item) for item in returns_float_list] # 保留8位小数
returns_string = ','.join(returns_string_list)

bench_float_list = (context.performance.bench_returns.values).tolist()
bench_string_list = ['{:.8f}'.format(item) for item in bench_float_list] # 保留8位小数
bench_string = ','.join(bench_string_list)


from finhack.library.mydb import mydb
#from finhack.library.mydb import mydb



features_list=''
train=''

if context.trade.model_id!='':
model=mydb.selectToDf('select * from auto_train where hash="'+context.trade.model_id+'"','finhack')
if(not model.empty):
model=model.iloc[0]
features_list=model['features']
train=model['algorithm']+"_"+model['loss']

sql="INSERT INTO `finhack`.`backtest`(`instance_id`,`features_list`, `train`, `model`, `strategy`, `start_date`, `end_date`, `init_cash`, `args`, `history`, `returns`, `logs`, `total_value`, `alpha`, `beta`, `annual_return`, `cagr`, `annual_volatility`, `info_ratio`, `downside_risk`, `R2`, `sharpe`, `sortino`, `calmar`, `omega`, `max_down`, `SQN`,filter,win,server,trade_num,runtime,starttime,endtime,benchReturns,roto,benchmark) VALUES ( '%s','%s', '%s', '%s', '%s', '%s', '%s', %s, '%s', '%s', '%s', '%s', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,'%s',%s,'%s',%s,'%s','%s','%s','%s','%s','%s')" % \
sql="INSERT INTO `finhack`.`backtest`(`instance_id`,`features_list`, `train`, `model`, `strategy`, `start_date`, `end_date`, `init_cash`, `args`, `history`, `returns`, `logs`, `total_value`, `alpha`, `beta`, `annual_return`, `cagr`, `annual_volatility`, `info_ratio`, `downside_risk`, `R2`, `sharpe`, `sortino`, `calmar`, `omega`, `max_down`, `SQN`,filter,win,server,trade_num,runtime,starttime,endtime,benchReturns,roto,benchmark,strategy_code) VALUES ( '%s','%s', '%s', '%s', '%s', '%s', '%s', %s, '%s', '%s', '%s', '%s', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,'%s',%s,'%s',%s,'%s','%s','%s','%s','%s','%s','%s')" % \
(context.id, \
'features_list', \
'train', \
'model', \
features_list, \
train, \
context.trade.model_id, \
context.trade.strategy, \
context.trade.start_time, \
context.trade.end_time, \
context.portfolio.starting_cash, \
str(context.args).replace("'",'"'), \
'history', \
context.performance.returns, \
returns_string, \
'logs', \
context.portfolio.total_value, \
str(context.performance.indicators.alpha), \
Expand All @@ -134,10 +161,13 @@ def run(self):
str(time.time()-t1), \
str(starttime), \
str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")), \
context.performance.bench_returns, \
bench_string, \
str(context.performance.indicators.roto), \
context.trade.benchmark)
print(sql)
context.trade.benchmark, \
context.trade.strategy_code.replace("'", "\\'") \

)
#print(sql)
mydb.exec('delete from backtest where instance_id="%s"' % (context.id),'finhack')
mydb.exec(sql,'finhack')

Expand Down
Loading

0 comments on commit f69c343

Please sign in to comment.