forked from MasterCash/ant-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
72 lines (59 loc) · 2.15 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from numpy import maximum
from bot import handleState, run
from time import sleep, time
from datamanager import DataManager, collectData
from windowManager import CaptureData, findWindow, getWindowInfo
from multiprocessing import Lock, Process, SimpleQueue, Value, Array
import signal
def splitLocations(num: int, max: tuple[int, int] = (600, 600)) -> list[tuple[int, int, int, int]]:
    """Split the area from (0, 0) to ``max`` into ``num`` side-by-side strips.

    Every strip is an ``(x1, y1, x2, y2)`` tuple with inclusive bounds. All
    strips run from ``y = 0`` to ``y = max[1]`` and share the same width,
    except the last one, whose right edge is moved to exactly ``max[0]`` so
    the remainder of the division is not lost.

    Args:
        num: Number of strips wanted. Zero or a negative value yields a single
            strip covering the whole area.
        max: Largest x and y coordinate of the area to split.

    Returns:
        A list of ``num`` strips ordered left to right (one strip when
        ``num <= 0``).

    NOTE: when ``num`` is larger than ``max[0] + 1`` the strips are one unit
    wide and the trailing strips start beyond ``max[0]``; this is unchanged
    from the previous behaviour.
    """
    max_x, max_y = max[0], max[1]
    if num <= 0:
        # Nothing to divide between: hand back the full area.
        return [(0, 0, max_x, max_y)]

    # The parameter ``max`` shadows the builtin, so clamp by hand.
    # Floor division keeps the arithmetic in integers.
    chunk = (max_x + 1) // num
    if chunk < 1:
        chunk = 1

    locs = [(chunk * i, 0, chunk * (i + 1) - 1, max_y) for i in range(num)]

    # Stretch the final strip so the strips end exactly on max_x.
    x1, y1, _, y2 = locs[-1]
    locs[-1] = (x1, y1, max_x, y2)
    return locs
# Module-level multiprocessing lock; main() passes it to every runner process.
# NOTE(review): presumably serialises screen capture between runners —
# confirm against bot.run.
captureLock = Lock()
def getMax(x1, x2) -> int:
    """Return the larger of ``x1`` and ``x2``; ``x2`` is returned on a tie."""
    if x1 > x2:
        return x1
    return x2
def main():
    """Start one runner process per BlueStacks window found and wait for them.

    Steps:
      1. Look up each known window title with ``findWindow`` and keep the
         ones that return a handle.
      2. Split the default area into one strip per window via
         ``splitLocations``.
      3. Start a ``collectData`` process (named "database") and one ``run``
         process per window, all sharing ``dataQueue`` / ``killSwitch``.
      4. Poll every 5 seconds until every runner has set its ``stopped``
         flag, the kill switch is set (e.g. by Ctrl+C), or no runner process
         is alive any more.
      5. Set the kill switch, report the elapsed time and join all processes.
    """
    def handleInterrupt(_signal, _frame):
        # SIGINT handler: flip the shared flag so the wait loop below exits.
        print("set killswitch")
        killSwitch.value = True

    dataQueue: SimpleQueue = SimpleQueue()
    # (window title, window handle) for every window that was found.
    captures: list[tuple[str, int]] = []
    # add list of window names
    # NOTE(review): "BlueStacks 4" is not in this list — confirm intentional.
    for window in ["BlueStacks", "BlueStacks 1", "BlueStacks 2", "BlueStacks 3", "BlueStacks 5"]:
        hwnd = findWindow(window, "Qt5154QWindowOwnDCIcon")
        # ignore windows we don't find
        if hwnd is not None:
            captures.append((window, hwnd))

    numCaptures = len(captures)
    locs = splitLocations(numCaptures)
    print(locs)

    # One shared byte flag per runner, handed to each runner process.
    stopped = Array('b', [False] * numCaptures)
    startTime = time()
    killSwitch = Value('b', False)
    # Installed only after killSwitch exists, since the handler closes over it.
    signal.signal(signal.SIGINT, handleInterrupt)

    db = Process(target=collectData, args=(killSwitch, dataQueue), name="database")
    db.start()
    procs: list[Process] = [db]

    runners: list[Process] = []
    # zip() yields nothing when no window was found (locs then holds a single
    # whole-area entry that has no matching capture).
    for i, (capture, loc) in enumerate(zip(captures, locs)):
        p = Process(
            target=run,
            args=(capture, captureLock, loc, dataQueue, i, stopped, killSwitch),
            name=f'runner-{capture[0]}',
        )
        p.start()
        runners.append(p)
    procs.extend(runners)

    while not (all(stopped) or killSwitch.value):
        if not any(p.is_alive() for p in runners):
            # Every runner has exited without all flags being set (e.g. a
            # crash); nothing is left that could set them, so stop waiting.
            break
        sleep(5)

    runTime = time() - startTime
    # Tell the remaining processes to shut down.
    killSwitch.value = True
    print(f"time: {runTime} sec with {numCaptures} instances")

    for p in procs:
        print(f'process: {p.name} joining...')
        if p.is_alive():
            p.join()
        print(f'process: {p.name} joined')
# Run main() only when this file is executed as a script, not when imported.
# With multiprocessing's "spawn" start method the child processes re-import
# this module, so the guard also keeps them from calling main() again.
if __name__ == "__main__":
    main()