该项目基于cflow主体调度系统,做适当修改:
- kafka->Redis
- etl_day时间格式yyyy-mm-dd->yyyymmdd
- 加入web管理调度任务模块,降低使用难度
用python实现的高可用可横向扩展的分布式调度系统,并具有可视化操作的功能
- 定时调度(类似linux crontab)
- 依赖调度(满足依赖关系后才会启动任务)
- 任务格式: 任意命令行可执行程序
- 高可用,可横向扩展(调度器和执行器均可同时运行多个,完善的重试机制)
- 可视化操作(基于flask)
调度器获取任务,通过Redis进行任务分发,执行器消费Redis的任务并执行
- 删除
- 强制执行
- 重做当前
- 重做当前及后续
- 强制通过
- 查看单个任务依赖关系
- 优化任务触发方式
- 修复未知bug
- 用户权限
- 任务路由
- 任务优先级
- work发现
goflow命令位于工程目录下
- 安装mysql和etcd,执行 goflow init_db -c goflow_conf.json
- 运行调度器: goflow scheduler -c goflow_conf.json --ha
- 运行执行器(可启多个): goflow executer -c goflow_conf.json
配置文件为json格式
{
"Common":{
"MysqlConn":"mysql://test:test@localhost:3306/goflow", # 数据库地址
"MysqlConn":"mysql://test:test@localhost:3306/cflow", # 数据库地址
"Broker":"localhost:9092" # kafka地址 也可基于redis
},
"Scheduler":{
"LogDir":"/var/log/go_log", # 日志路径
"LogFile":"scheduler.log", # 日志文件
"FetchInterval":10, # 扫描cron_conf间隔(秒)
"WorkerTimeOut":20, # woker 心跳超时时间
"RetryQueueimes":4 # 超时任务,重新发起调度的次数
},
"Worker": {
"LogDir":"/var/log/go_log",
"LogFile":"executer.log",
"Parallelism": 32, # 每个woker,并行执行任务的个数
"HearteatInterval": 10, # worker 心跳间隔(秒)
"Retry": 3, # 单个任务重试次数
"TaskTimeOut": 60 # 执行单个任务时的超时时间(秒), -1 不设置超时
}
}
- cron_conf crontab 配置表,存储定时任务配置
- loader_result 抽取结果表,存抽取结果,以及worker直接文件同步
- stat_result 每天的执行结果
- task_define 普通任务定义
- task_dependency 任务依赖关系
- task_instance 所有的任务实例, 记录任务的运行状态信息等
{run,dep,do_all_job,migrate,version,kill,executer,scheduler,init_db} ...
查看参数列表和使用方法
查看参数列表和使用方法
goflow run -j task_id -d YYYY-MM-DD
goflow run -j task_id -d YYYY-MM-DD --force
goflow run -j task_id -d YYYY-MM-DD -down
goflow run -j task_id -d YYYY-MM-DD -up
任务操作: 开始 强制执行 重做当前 重做当前及后续 强制通过
1、查看依赖 2、优化任务触发方式 3、修复未知bug