-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanage.py
executable file
·175 lines (148 loc) · 4.55 KB
/
manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# -*- coding: utf-8 -*-
import os
import importlib
from typing import Any
import click
import inject
from redis_lock import Lock
from celery import Celery
from template_migration import Migration
from template_apollo import ApolloClient
from app.dependencies import Config, CacheRedis
from app.constants.enum import SysCMD
from app.utils.common import generate_table, print_config
# 加载日志模块
template_logging: Any = importlib.import_module('template_logging')
# 初始化日志
template_logging.init_logger()
# 获取Logger
logger = template_logging.getLogger(__name__)
@click.group()
def cli():
logger.info('Init app...')
# 生成数据库表
generate_table()
print_config()
# 获取配置
config: Config = inject.instance(Config)
# 启动apollo后台监听线程
if config.CMD in (
SysCMD.RUN_API_SERVER, SysCMD.RUN_TEST_SERVER, SysCMD.RUN_BEAT,
SysCMD.RUN_BEAT_WORKER, SysCMD.RUN_CUSTOM_WORKER
):
apollo_client: ApolloClient = inject.instance(ApolloClient)
apollo_client.start()
# 加🔒, 执行migrate脚本,只执行一次
lock_key: str = f"migrate-lock:{config.PROJECT_NAME}-{config.RUNTIME_ENV}"
# 获取redis
redis_cache: CacheRedis = inject.instance(CacheRedis)
lock = Lock(redis_cache, lock_key)
if lock.acquire(blocking=True, timeout=1800):
try:
migration_instance: Migration = inject.instance(Migration)
migration_instance.do_migrate()
except Exception as e:
logger.error(f"failed to do migrate", exc_info=1)
raise e
finally:
# 释放🔒
lock.release()
@cli.command()
def run_print_config():
"""
打印配置
"""
click.echo('run_print_config ......')
print_config()
@cli.command()
def run_api_server():
"""
启动服务
"""
from app.gunicorn_app import run_http_server
from app.flask_app import flask_app
logger.info(f'Run api server...')
port: int = os.getenv('PORT', 8080)
logger.info(f"bind ip is http://0.0.0.0:{port}")
# 启动 gunicorn server
run_http_server(flask_app, {
'bind': f'0.0.0.0:{port}',
'accesslog': '-',
'errorlog': '-',
'max_requests': 8000,
'timeout': 60,
'worker_connections': 4000,
'capture_output': True,
'reload': False,
# 使用默认的worker类型
'worker_class': 'gthread',
'limit_request_line': 8000,
})
@cli.command()
def run_test_server():
"""
启动服务
"""
from app.flask_app import flask_app
logger.info(f"Run test server")
logger.info("bind ip is http://0.0.0.0:8080")
flask_app.run('0.0.0.0', 8080, debug=True)
@cli.command()
def run_beat():
"""
启动celery定时任务
"""
celery_app = inject.instance(Celery)
celery_app.start(
argv=['celery', 'beat', '-S', 'redbeat.RedBeatScheduler', '-l', 'INFO', '--pidfile=']
)
@cli.command()
def run_beat_worker():
"""
该worker仅消费默认定时任务队列消息
"""
# 获取配置
config: Config = inject.instance(Config)
celery_app = inject.instance(Celery)
celery_app.start(
argv=['celery', 'worker', '-l', 'INFO', '-n',
f'{config.PROJECT_NAME}-{config.RUNTIME_ENV}-run_beat_worker@%h',
'-c', f'{config.WORKER_NUM}', '-Q', f'{config.PROJECT_NAME}-{config.RUNTIME_ENV}-beat-queue',
'--max-tasks-per-child', f'{config.MAX_TASKS_PER_CHILD}',
]
)
@cli.command()
def run_custom_worker():
"""
执行业务中异步任务
该worker仅消费默认队列消息
"""
# 获取配置
config: Config = inject.instance(Config)
celery_app = inject.instance(Celery)
celery_app.start(
argv=['celery', 'worker', '-l', 'INFO', '-n',
f'{config.PROJECT_NAME}-{config.RUNTIME_ENV}-run_custom_worker@%h',
'-c', f'{config.WORKER_NUM}', '-Q', f'{config.PROJECT_NAME}-{config.RUNTIME_ENV}-queue',
'--max-tasks-per-child', f'{config.MAX_TASKS_PER_CHILD}',
]
)
@cli.command()
def test():
"""
单元测试
"""
import unittest
tests = unittest.TestLoader().discover('tests')
unittest.TextTestRunner(verbosity=2).run(tests)
@cli.command()
def migrate():
"""
数据库migrate
如果没有migrate记录则要初始化数据
"""
from template_migration import Migration
migration_instance: Migration = inject.instance(Migration)
migration_instance.do_migrate()
if __name__ == '__main__':
cli()