-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.py
151 lines (127 loc) · 4.35 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import argparse
import subprocess
import time
import re
import sys
import random
from bandit import EpsilonGreedyBandit, EpsilonDecreasingBandit, deploy_bandit
def parse_arguments():
    """
    Parse the command-line options that control the scraping manager.

    Returns:
        argparse.Namespace with attributes ``threshold_error_rate``,
        ``default_wait_time``, ``extra_wait_time``, ``waiting_num_tasks``
        and ``num_processes``.
    """
    parser = argparse.ArgumentParser(
        description="Manage scraping tasks based on failure rates."
    )
    # Threshold for error rate (e.g., 0.10 for 10%)
    parser.add_argument(
        "--threshold-error-rate",
        type=float,
        default=0.10,
        # NOTE: argparse %-formats help strings, so a literal percent sign
        # must be written as "%%" — a bare "%" makes --help raise ValueError.
        help="Threshold for error rate (default: 0.10 for 10%%)",
    )
    # Default wait time in seconds
    parser.add_argument(
        "--default-wait-time",
        type=int,
        default=5,
        help="Default wait time in seconds before retrying (default: 5)",
    )
    # Extra wait time if last try also had error
    parser.add_argument(
        "--extra-wait-time",
        type=int,
        default=0,
        help="Extra wait time in seconds if the last retry also had errors (default: 0)",
    )
    # Number of tasks on error
    parser.add_argument(
        "--waiting-num-tasks",
        type=int,
        default=10,
        help="Number of tasks to run when retrying due to high error rate (default: 10)",
    )
    # Number of processes
    parser.add_argument(
        "--num-processes",
        type=int,
        default=30,
        help="Number of processes to use (default: 30)",
    )
    return parser.parse_args()
def run_scraping(num_processes, num_tasks):
    """
    Run the scraping.py script as a child process and echo its output live.

    Args:
        num_processes: Passed through as ``--num-processes``.
        num_tasks: Passed through as ``--num-tasks``.

    Returns:
        A ``(succeeded, failed)`` tuple, where ``failed`` is parsed from the
        child's "Total failed results: N" summary line and ``succeeded`` is
        ``num_tasks - failed``.  If the summary line is never seen, or the
        child cannot be launched, returns ``(0, num_tasks)``.
    """
    cmd = [
        sys.executable,
        "scraping.py",
        "--num-processes",
        str(num_processes),
        "--num-tasks",
        str(num_tasks),
    ]
    try:
        # Use Popen instead of run so the child's output can be echoed live.
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
        # Sentinel: None means the summary line was never observed.  (The
        # previous code initialized this to 0, which made the warning
        # branch below unreachable dead code.)
        failed = None
        while True:
            # Poll process for new output until finished.
            # NOTE(review): alternating blocking readline() calls on stdout
            # and stderr can stall if one stream is quiet while the other is
            # chatty — acceptable only if scraping.py writes both line-wise;
            # confirm, or merge stderr into stdout.
            output = process.stdout.readline()
            error_output = process.stderr.readline()
            if output:
                print("[STDOUT]:", output.strip())
                # Check if the output contains "Total failed results: X"
                match = re.search(r"Total failed results:\s+(\d+)", output)
                if match:
                    failed = int(match.group(1))
            if error_output:
                print("[STDERR]:", error_output.strip())
            # Both pipes drained and the child has exited -> done.
            if output == "" and error_output == "" and process.poll() is not None:
                break
        if failed is not None:
            print(
                f"[INFO] Completed scraping with {failed} failed tasks out of {num_tasks}."
            )
            return num_tasks - failed, failed
        print("[WARNING] Could not find 'Total failed results' in the output.")
        return 0, num_tasks
    except OSError as e:
        # Popen raises OSError (e.g. FileNotFoundError) when the child can't
        # be launched.  The old handler caught CalledProcessError, which is
        # only raised by subprocess.run(check=True) and could never fire here.
        print(f"[ERROR] Scraping script failed with error:\n{e}")
        return 0, num_tasks
def main():
    """Entry point: wire CLI options into a bandit-driven scraping loop."""
    args = parse_arguments()

    # Candidate --num-tasks batch sizes the bandit chooses between (its arms).
    task_size_arms = [5000, 10000, 25000, 50000]

    # Exploration rate starts fully random (1.0) and decays toward 0.05,
    # halving every 300 steps.
    bandit = EpsilonDecreasingBandit(
        arms=task_size_arms,
        initial_epsilon=1.0,
        limit_epsilon=0.05,
        half_decay_steps=300,
    )

    deploy_bandit(
        bandit,
        lambda num_tasks: run_scraping(args.num_processes, num_tasks),
        failure_threshold=args.threshold_error_rate,
        default_wait_time=args.default_wait_time,
        extra_wait_time=args.extra_wait_time,
        waiting_args=args.waiting_num_tasks,
        max_steps=2000,
        reward_factor=1.0,
        verbose=True,
    )
# Run the manager only when executed as a script, not when imported.
if __name__ == "__main__":
    main()