-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_all_scripts.py
229 lines (194 loc) · 7.16 KB
/
run_all_scripts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""Script to run all the different python scripts that listens to websocket,
grpc stream and restarts them if they fail for any reason
Usage: python run_all_scripts.py
"""
import json
import logging
import signal
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from logging.handlers import RotatingFileHandler
from typing import Dict
from google.cloud import bigquery
# Set up the BigQuery client (credentials come from the environment /
# application-default credentials).
client = bigquery.Client()
# config.json must sit in the current working directory; it supplies the
# BigQuery project id plus the full-node and indexer addresses used below.
with open("config.json", "r") as config_file:
    config = json.load(config_file)
# Constants
PROJECT_ID = config["bigquery_project_id"]  # project owning all monitored tables
CHECK_INTERVAL = 250  # Seconds between health checks of each child script
# Each entry describes one child script to supervise:
#   script_name      - python file to launch
#   table_id         - dataset.table the script writes to (freshness probe)
#   timestamp_column - column holding the row timestamp used for staleness
#   filter           - extra SQL predicate narrowing the freshness query
#   args             - extra CLI arguments passed to the script
#   time_threshold   - max staleness before the script is restarted
SCRIPT_CONFIGS = {
    # "websocket": {
    #     "script_name": "listen_to_websocket.py",
    #     "table_id": "indexer_stream.responses",
    #     "timestamp_column": "received_at",
    #     "filter": "",
    #     "args": [],
    #     "time_threshold": timedelta(seconds=90),
    # },
    "place_orders": {
        "script_name": "place_orders.py",
        "table_id": "latency_experiments.long_running_two_sided_orders",
        "timestamp_column": "sent_at",
        "filter": "",
        "args": [],
        "time_threshold": timedelta(seconds=90),
    },
    # "place_taker_orders": {
    #     "script_name": "place_taker_orders.py",
    #     "table_id": "latency_experiments.long_running_taker_orders",
    #     "timestamp_column": "sent_at",
    #     "filter": "",
    #     "args": [],
    #     "time_threshold": timedelta(seconds=180),
    # },
    "place_stateful_orders": {
        "script_name": "place_stateful_orders.py",
        "table_id": "latency_experiments.long_running_stateful_orders",
        "timestamp_column": "sent_at",
        "filter": "",
        "args": [],
        "time_threshold": timedelta(seconds=90),
    },
    # Add more scripts with their corresponding table IDs, timestamp columns, and filters here
}
# Register one gRPC-stream listener per configured full node.
SCRIPT_CONFIGS.update({
    f"grpc_stream {addr}": {
        "script_name": "listen_to_grpc_stream.py",
        "table_id": "full_node_stream.responses",
        "timestamp_column": "received_at",
        "filter": f'server_address = "{addr}"',
        "args": ["--server_address", addr],
        "time_threshold": timedelta(seconds=90),
    }
    for addr in config["full_node_addresses"]
})
# Register one websocket listener per configured indexer endpoint.
SCRIPT_CONFIGS.update({
    f"indexer {addr}": {
        "script_name": "listen_to_websocket.py",
        "table_id": "indexer_stream_new.responses",
        "timestamp_column": "received_at",
        "filter": f'server_address = "{addr}"',
        "args": ["--indexer_url", addr],
        "time_threshold": timedelta(seconds=90),
    }
    for addr in config["indexer_addresses"]
})
def get_latest_timestamp(table_id, timestamp_column, filter_condition):
    """Return MAX(timestamp_column) from `PROJECT_ID.table_id` as a naive UTC datetime.

    When *filter_condition* is supplied, the scan is restricted to today's
    partition plus that predicate; with no filter the whole table is queried.
    Returns None when no row matches or the column is NULL.
    """
    if filter_condition:
        filter_clause = (
            f"WHERE TIMESTAMP_TRUNC({timestamp_column}, DAY) = "
            f"TIMESTAMP(CURRENT_DATE()) AND {filter_condition}"
        )
    else:
        filter_clause = ""
    query = f"""
    SELECT MAX({timestamp_column}) as latest_timestamp
    FROM `{PROJECT_ID}.{table_id}`
    {filter_clause}
    """
    for row in client.query(query).result():
        latest = row["latest_timestamp"]
        if latest:
            # Normalize to naive UTC so callers can compare against
            # utcnow-style naive datetimes.
            return latest.astimezone(timezone.utc).replace(tzinfo=None)
    return None
def start_script(script_name, args):
    """Launch *script_name* with *args* as a child process and return its Popen.

    Uses sys.executable rather than the bare "python" on PATH so the child is
    guaranteed to run under the same interpreter (and virtualenv) as this
    supervisor.
    """
    # Lazy %-formatting: the message is only built if INFO is enabled.
    logging.info("running %s %s", script_name, " ".join(args))
    return subprocess.Popen([sys.executable, script_name] + args)
def terminate_process(process: subprocess.Popen, pname: str):
    """Politely ask *process* (logged as *pname*) to exit; does not wait.

    Sends SIGTERM (or TerminateProcess on Windows); pair with
    force_kill_process after a grace period for a guaranteed stop.
    """
    logging.info(f"Terminating process {pname}...")
    process.terminate()
def force_kill_process(process: subprocess.Popen, pname: str):
    """Ensure *process* (logged as *pname*) is dead, killing it if needed."""
    still_running = process.poll() is None
    if still_running:
        # Did not exit after the earlier terminate — escalate to SIGKILL
        # and reap it so no zombie is left behind.
        logging.info(f"Forcefully killing process {pname}...")
        process.kill()
        process.wait()
    logging.info(f"Process {pname} has finished.")
def check_and_restart_script(
    process,
    config_name,
    script_name,
    table_id,
    timestamp_column,
    filter_condition,
    args,
    time_threshold,
):
    """Probe a script's output table for freshness and restart it if stale.

    Args:
        process: The script's current subprocess.Popen handle.
        config_name: Human-readable key from SCRIPT_CONFIGS (for logging).
        script_name: Python file the process runs (used to relaunch).
        table_id: "dataset.table" the script writes to.
        timestamp_column: Column checked for the latest write.
        filter_condition: Extra SQL predicate for the freshness query.
        args: CLI args to pass when relaunching.
        time_threshold: Max allowed staleness (timedelta) before restarting.

    Returns:
        The (possibly new) subprocess.Popen handle for this script.
    """
    latest_timestamp = get_latest_timestamp(
        table_id, timestamp_column, filter_condition
    )
    should_restart = False
    if latest_timestamp:
        # datetime.utcnow() is deprecated since Python 3.12; build the same
        # naive-UTC "now" from an aware datetime instead. get_latest_timestamp
        # returns naive UTC, so the subtraction below compares like with like.
        current_time = datetime.now(timezone.utc).replace(tzinfo=None)
        if current_time - latest_timestamp > time_threshold:
            logging.info(
                f"Latest timestamp for table {table_id} for script {script_name} "
                f"is {latest_timestamp}, restarting {config_name}..."
            )
            should_restart = True
        else:
            logging.info(
                f"Latest timestamp for table {table_id} for script {script_name} "
                f"is {latest_timestamp}, {config_name} is working fine."
            )
    else:
        # No rows found (or the query returned NULL): treat as unhealthy.
        logging.info(f"Failed to retrieve the latest timestamp for table "
                     f"{table_id}, restarting {config_name}...")
        should_restart = True
    if should_restart:
        terminate_process(process, script_name)
        time.sleep(1)  # Wait for the process to terminate
        force_kill_process(process, script_name)
        return start_script(script_name, args)
    else:
        return process
def main():
    """Launch every configured script, then supervise them forever.

    Dead processes are relaunched immediately; live ones are restarted when
    their output table stops receiving fresh rows. Runs until SIGINT/SIGTERM.
    """
    processes: Dict[str, subprocess.Popen] = {}
    for name, info in SCRIPT_CONFIGS.items():
        processes[name] = start_script(info["script_name"], info["args"])

    # Gracefully handle Ctrl+C
    def signal_handler(sig, frame):
        logging.info("Received termination signal, shutting down...")
        for pname, p in processes.items():
            terminate_process(p, pname)
        time.sleep(3)  # Wait for the processes to terminate
        for pname, p in processes.items():
            force_kill_process(p, pname)
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Give the children one full interval to start writing before probing.
    time.sleep(CHECK_INTERVAL)
    while True:
        for name, info in SCRIPT_CONFIGS.items():
            proc = processes[name]
            if proc.poll() is not None:
                # Process exited on its own; relaunch without a table probe.
                logging.info(f"{info['script_name']} ({name}) has stopped, restarting...")
                processes[name] = start_script(info["script_name"], info["args"])
            else:
                # Still running: verify it is actually producing data.
                processes[name] = check_and_restart_script(
                    proc,
                    name,
                    info["script_name"],
                    info["table_id"],
                    info["timestamp_column"],
                    info.get("filter", ""),
                    info["args"],
                    info["time_threshold"],
                )
        time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
handler = RotatingFileHandler(
"run_all_scripts.log",
maxBytes=5 * 1024 * 1024, # 5 MB
backupCount=5
)
logging.basicConfig(
handlers=[handler],
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
main()