-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlauncher.py
118 lines (100 loc) · 3.31 KB
/
launcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from argparse import ArgumentParser
import json
import os
import platform
import shutil
import subprocess
from typing import Any, TypeVar
import venv
# Packages that are always installed into the venv; the dependencies declared
# by each sensor package manifest are appended to a copy of this list.
DEFAULT_DEPS = ['requests']
# Type parameter linking the ``to`` argument of _cast to its return type.
_T = TypeVar('_T')


def _cast(name: str, value: Any, to: type[_T]) -> _T:
    """Check that ``value`` is an instance of ``to`` and hand it back.

    Args:
        name: Label for the value, used only in the error message.
        value: Object to validate.
        to: Class the value must be an instance of.

    Returns:
        ``value`` itself, unchanged.

    Raises:
        TypeError: If ``value`` is not an instance of ``to``.
    """
    if not isinstance(value, to):
        raise TypeError(f'expected {name} to be a {to.__name__}')
    return value
def setup_venv(path: str) -> None:
    """Create a virtual environment with pip at ``path``.

    Args:
        path: Directory in which the virtual environment is created.
    """
    print('Creating venv in', path)
    # Use the caller-supplied location; the previous hard-coded './run/venv'
    # silently ignored the argument (and contradicted the message above).
    venv.create(path, with_pip=True, upgrade_deps=True)
    print('venv created')
def install_dependencies(python: str, deps: list[str]) -> bool:
    """Install ``deps`` with pip, using the interpreter at ``python``.

    Args:
        python: Path of the Python executable whose ``pip`` module is run.
        deps: Requirement specifiers passed to ``pip install``.

    Returns:
        True if pip exits with status 0 or there is nothing to install,
        False otherwise.
    """
    if not deps:
        # ``pip install`` without any requirement exits with an error, which
        # would be reported as a failed installation although nothing is wrong.
        print('No dependencies to install')
        return True

    print('Installing dependencies')
    proc = subprocess.run(
        (python, '-m', 'pip', 'install', *deps),
        stderr=subprocess.PIPE,
        text=True,
        errors='replace',
    )
    if proc.returncode:
        print('Failed to install dependencies')
        # stderr is captured to keep pip's warnings quiet on success; on
        # failure it is the only diagnostic available, so surface it.
        if proc.stderr:
            print(proc.stderr.rstrip())
        return False
    print('Dependencies installed')
    return True
# Command line: an optional settings file path and a debug switch.
parser = ArgumentParser(
    prog='Simplic Insights Collector',
    description='Reads and sends sensor data to OxS',
)
parser.add_argument('-s', '--settings', default='./settings.json')
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()

# Read the file as bytes so json detects the encoding itself (UTF-8, with or
# without BOM, UTF-16 or UTF-32) instead of decoding with the platform locale,
# which mis-reads UTF-8 files on Windows.
with open(args.settings, 'rb') as f:
    settings = json.load(f)
# Distinct sensor packages referenced by the settings, and the full list of
# pip requirements (defaults first, then each package's own dependencies).
pkgs: set[str] = set()
deps = list(DEFAULT_DEPS)

# Every sensor 'type' has the form '<package>:<sensor>'; only the package part
# is needed here. A value without exactly one ':' raises ValueError.
for sensor in settings['sensors']:
    sensor_type = _cast('type', sensor['type'], str)
    pkg, _sensor_name = sensor_type.split(':')
    pkgs.add(pkg)

# Validate each package and gather its dependencies. Iterating in sorted order
# keeps the resulting requirement list stable between runs (set order is not).
for pkg in sorted(pkgs):
    folder = os.path.join('sensors', pkg)
    manifest_path = os.path.join(folder, 'manifest.json')
    # Binary mode lets json detect the encoding instead of relying on the
    # platform locale.
    with open(manifest_path, 'rb') as f:
        config = json.load(f)
    if _cast('pkg', config['pkg'], str) != pkg:
        raise ValueError('package name mismatch')
    if not os.path.exists(os.path.join(folder, '__init__.py')):
        raise FileNotFoundError('Missing __init__.py')
    dependencies = _cast('dependencies', config['dependencies'], list)
    deps.extend(_cast('dependency', dep, str) for dep in dependencies)
dir_run = './run'
dir_venv = './run/venv'
dir_core = './run/core'
dir_sensors = './run/sensors'
match platform.system():
case 'Linux':
python = './run/venv/bin/python'
case 'Windows':
python = './run/venv/Scripts/python.exe'
case _:
raise RuntimeError('Unsupported OS')
if not os.path.isdir(dir_run):
os.mkdir(dir_run)
if not os.path.isdir(dir_venv):
setup_venv(dir_venv)
if not install_dependencies(python, deps):
print('Trying clean venv')
shutil.rmtree(dir_venv)
if not install_dependencies(python, deps):
print('There is a dependency resolution issue')
print('Installing sensor scripts')

# Replace any previously installed copy of the core package.
if os.path.exists(dir_core):
    shutil.rmtree(dir_core)
shutil.copytree('./core', dir_core)

# The entry script is placed directly inside the run directory.
shutil.copyfile('./main.py', os.path.join(dir_run, 'main.py'))

# Rebuild the sensors directory from scratch so that only the packages
# collected in `pkgs` end up in it.
if os.path.exists(dir_sensors):
    shutil.rmtree(dir_sensors)
os.mkdir(dir_sensors)
for package in pkgs:
    shutil.copytree(
        os.path.join('./sensors', package),
        os.path.join(dir_sensors, package),
    )

print('Setup complete')
# Start the collector with the venv interpreter, using the run directory as
# its working directory.
#
# The interpreter is passed as an absolute path: how a *relative* executable
# path combines with `cwd` differs between POSIX (resolved against `cwd`) and
# Windows (resolved against the launcher's own working directory), which
# previously required a second, differently-rooted per-platform path table.
# `python` has already been validated for the current platform above.
python_run = os.path.abspath(python)
result = subprocess.run(
    (
        python_run, './main.py',
        # Absolute, because the child runs with a different working directory.
        os.path.abspath(args.settings),
        'debug' if args.debug else 'normal',
    ),
    cwd=dir_run,
)
# Propagate a failing exit status instead of always exiting with 0.
if result.returncode:
    raise SystemExit(result.returncode)