-
Notifications
You must be signed in to change notification settings - Fork 4
/
carthage-resource-agent
executable file
·144 lines (127 loc) · 5.4 KB
/
carthage-resource-agent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python3
# Copyright (C) 2022, Hadron Industries, Inc.
# Carthage is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation. It is distributed
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the file
# LICENSE for details.
import sys, os
# Optional resource parameter: a colon-separated list of directories that is
# placed ahead of the existing module search path (empty entries are dropped).
_OCF_RESKEY_PYTHONPATH = os.environ.get('OCF_RESKEY_PYTHONPATH')
if _OCF_RESKEY_PYTHONPATH:
    sys.path = [
        entry for entry in _OCF_RESKEY_PYTHONPATH.split(':') if entry
    ] + sys.path
# Appended last so the `ocf` helper module imported below can be found.
sys.path.append("/usr/lib/ocf/lib/heartbeat")
import asyncio
import contextlib
import functools
import os
import sys
# Reset the program name before ocf is imported so that ocf doesn't overwrite
# the carthage logger.
# NOTE(review): presumably ocf derives its logger name from sys.argv[0] at
# import time, which is why this sits above `import ocf` — confirm.
sys.argv[0] = 'carthage-agent'
from pathlib import Path
import ocf
from carthage import *
from carthage.modeling import *
from carthage.utils import file_locked
import carthage, carthage.container, carthage.vm
# Point the module-level loggers of these carthage modules at the OCF logger.
for _module in (
    carthage.sh,
    carthage.container,
    carthage.vm,
    carthage.network,
    carthage.setup_tasks,
):
    _module.logger = ocf.logger
async def get_layout(config_file):
    """Load *config_file* and return the layout instance it selects.

    The YAML file is loaded into the ``ConfigLayout`` obtained from the
    module-level ``ainjector``, and ``persist_local_networking`` is switched
    on. If the configuration names a layout, that ``CarthageLayout`` is
    looked up by ``layout_name``; otherwise the unnamed ``CarthageLayout``
    is requested.

    Raises:
        FileNotFoundError: propagated from ``open`` when *config_file* does
            not exist.
    """
    with open(config_file, "rt") as stream:
        settings = await ainjector(ConfigLayout)
        settings.load_yaml(stream)
    name = settings.layout_name
    settings.persist_local_networking = True
    # A named layout is addressed through a constrained key; an unnamed one
    # through the bare class.
    key = InjectionKey(CarthageLayout, layout_name=name) if name else CarthageLayout
    return await ainjector.get_instance_async(key)
def async_handler(f):
    """Adapt coroutine function *f* into a plain synchronous callable.

    The returned wrapper forwards its arguments to *f*, drives the resulting
    coroutine to completion on the current event loop and returns its result.
    Exceptions raised by the coroutine propagate to the caller.

    The current loop is reused (rather than starting a fresh one per call) so
    that objects created earlier against that loop remain usable. When no
    current loop exists, one is created and installed.
    """
    @functools.wraps(f)
    def run_async(*args, **kwargs):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # Python 3.14+ no longer creates a loop implicitly when none is
            # set; do it explicitly so the handler still runs.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(f(*args, **kwargs))
    return run_async
async def get_machines(layout, machines):
    """Resolve the machine models named in *machines* from *layout*.

    *machines* is a whitespace-separated string of host names. Each name is
    looked up through ``layout.ainjector`` as a ``MachineModel`` keyed by
    ``host`` with ``_ready=False``. The lookups are awaited together and the
    results are returned in the same order as the names.
    """
    injector = layout.ainjector
    lookups = [
        injector.get_instance_async(
            InjectionKey(MachineModel, host=hostname, _ready=False))
        for hostname in machines.split()
    ]
    return await asyncio.gather(*lookups)
@async_handler
async def monitor(config, machines):
    """OCF ``monitor`` action: report whether the managed machines run.

    Returns:
        ``ocf.OCF_ERR_UNIMPLEMENTED`` if the config file does not exist;
        ``ocf.OCF_SUCCESS`` if every machine is running, or if this is a
        probe and at least one machine is running;
        ``ocf.OCF_ERR_GENERIC`` (with an exit reason) if only some machines
        are running;
        ``ocf.OCF_NOT_RUNNING`` if none are running.
    """
    try:
        layout = await get_layout(config)
    except FileNotFoundError:
        return ocf.OCF_ERR_UNIMPLEMENTED
    models = await get_machines(layout, machines)
    probe = ocf.is_probe()

    # Query the machines one at a time, in order, recording each state.
    states = []
    for model in models:
        model.force_locally_hosted = True
        machine = await model.ainjector.get_instance_async(
            InjectionKey(Machine, _ready=False))
        states.append(bool(await machine.is_machine_running()))

    some_running = any(states)
    # all() is vacuously true for an empty list, so "all running" additionally
    # requires that at least one machine is running.
    all_running = some_running and all(states)

    if some_running and (probe or all_running):
        return ocf.OCF_SUCCESS
    if some_running:
        ocf.ocf_exit_reason("Some, but not all machines running")
        return ocf.OCF_ERR_GENERIC
    return ocf.OCF_NOT_RUNNING
@async_handler
async def start(config, machines):
    """OCF ``start`` action: generate the layout and start the machines.

    Loads the layout from *config*, runs ``layout.generate()`` while holding
    the ``generation_lock`` file lock in the configured output directory,
    brings every requested machine model to ready, then starts each machine
    in the order given by *machines*.

    Returns ``ocf.OCF_SUCCESS`` once all machines have been started. Unlike
    ``monitor`` and ``stop``, a missing config file is not caught here, so
    ``FileNotFoundError`` propagates.
    """
    layout = await get_layout(config)
    ainjector = layout.ainjector
    config_layout = await ainjector(ConfigLayout)
    # NOTE(review): only generate() is placed under the lock here — confirm
    # this matches the intended scope of the generation lock.
    async with file_locked(Path(config_layout.output_dir)/"generation_lock"):
        await layout.generate()
    models = await get_machines(layout, machines)
    # Imported here rather than at module level, so carthage_base is only
    # required when starting.
    import carthage_base.hosted
    # First pass: prepare every model before any machine is started.
    for m in models:
        m.force_locally_hosted = True
        carthage_base.hosted.clear_hosted(m)
        await m.async_become_ready()
    # Second pass: start the machines sequentially.
    for m in models:
        machine = await m.ainjector.get_instance_async(Machine)
        await machine.start_machine()
        # Sanity check only; note that asserts are skipped under `python -O`.
        assert machine.running
    # There is some sort of race condition with closing the read side
    # of the container's stdout. If that happens too early, the
    # container will shut down. Not entirely sure what's going on, but
    # adding an os.close(machine.process.process._stdout_read_fd) just
    # after start_machine returns definitely triggers the problem.
    # The sleep keeps this process (and so that descriptor) alive a little
    # longer before the agent exits.
    await asyncio.sleep(6)
    return ocf.OCF_SUCCESS
@async_handler
async def stop(config, machines):
    """OCF ``stop`` action: stop the managed machines in reverse order.

    Returns ``ocf.OCF_ERR_UNIMPLEMENTED`` if the config file does not exist,
    otherwise ``ocf.OCF_SUCCESS`` after every machine has been stopped.
    """
    try:
        layout = await get_layout(config)
    except FileNotFoundError:
        return ocf.OCF_ERR_UNIMPLEMENTED

    layout_injector = layout.ainjector
    settings = await layout_injector(ConfigLayout)
    lock_path = Path(settings.output_dir) / "generation_lock"
    # NOTE(review): only resolve_networking() is placed under the lock here —
    # confirm this matches the intended scope of the generation lock.
    async with file_locked(lock_path):
        await layout.resolve_networking()

    models = await get_machines(layout, machines)
    # Machines are shut down in the opposite order to the one they are listed in.
    for model in models[::-1]:
        model.force_locally_hosted = True
        machine = await model.ainjector.get_instance_async(
            InjectionKey(Machine, _ready=False))
        # The result is unused; the state is queried before asking the
        # machine to stop.
        await machine.is_machine_running()
        await machine.stop_machine()
        assert not machine.running
    return ocf.OCF_SUCCESS
# Root async injector; get_layout() resolves the configuration and the layout
# through this module-level name.
ainjector = base_injector(AsyncInjector)

# Declare the resource agent, its parameters and its actions, then hand
# control to the OCF helper.
agent = ocf.Agent(
    name="carthage",
    shortdesc="Manage a set of machines from a Carthage layout",
    longdesc="",
)

for _param, _help, _required in (
    ("config", "Path to the config.yml for the layout", True),
    ("machines", "Space separated list of machines to manage", True),
    ("PYTHONPATH", "PYTHONPATH to be used for carthage-resource-agent", False),
):
    agent.add_parameter(_param, _help, required=_required)

agent.add_action('start', timeout="180s", handler=start)
agent.add_action('stop', timeout='220s', handler=stop)
agent.add_action('monitor', interval='30s', timeout='90s', handler=monitor)

agent.run()