Skip to content

Commit

Permalink
Finate State Machine implementation to solve SW concurrency issue
Browse files Browse the repository at this point in the history
  • Loading branch information
vrbanecd committed Jan 14, 2025
1 parent 0ce259b commit 28e7914
Showing 1 changed file with 132 additions and 113 deletions.
245 changes: 132 additions & 113 deletions teapot.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def lifespan(app: FastAPI):
# everything after the yield should be executed after shutdown
handles = app.state.session_state.keys()
for k in list(handles):
await _stop_webdav_instance(k)
await _stop_webdav_instance(k, sw_state, sw_state_lock)
session_store_path = Path(SESSION_STORE_PATH)
if session_store_path.exists():
session_store_path.unlink()
Expand Down Expand Up @@ -135,6 +135,10 @@ async def lifespan(app: FastAPI):
# lock for the state. any write operation on the state should only be done
# in an "async with app.state_lock:" environment.
app.state.state_lock = anyio.Lock()
# state of the storm webdav servers
sw_state: dict[str, str] = {}
# lock for the state of the storm webdav servers
sw_state_lock = anyio.Lock()

context = ssl.create_default_context()
context.load_verify_locations(cafile=config["Teapot"]["Teapot_CA"])
Expand Down Expand Up @@ -466,8 +470,11 @@ async def _get_proc(cmd):
raise RuntimeError("process with full command ", +cmd + "does not exist.")


async def _stop_webdav_instance(username):
async def _stop_webdav_instance(username, state, state_lock):
logger.info("Stopping webdav instance for user %s.", username)
async with state_lock:
state[username] = "STOPPING"
state_lock.notify()

logger.debug(
"_stop_webdav_instance: trying to acquire lock at %s",
Expand Down Expand Up @@ -515,6 +522,11 @@ async def _stop_webdav_instance(username):
else:
logger.info("Successfully killed process with PID %d.", pid)
exit_code = 0
async with state_lock:
if state[username] == "STOPPING":
state[username] = "NOT RUNNING"
state_lock.notify()

except subprocess.CalledProcessError as e:
logger.error(
"Exception occurred while trying to kill process \
Expand Down Expand Up @@ -548,10 +560,6 @@ async def stop_expired_instances():
while True:
await asyncio.sleep(CHECK_INTERVAL_SEC)
logger.debug("checking for expired instances")
logger.debug(
"stop_expired_instances: trying to acquire 'users' lock at %s",
{datetime.datetime.now().isoformat()},
)
async with app.state.state_lock:
logger.debug(
"stop_expired_instances: acquired 'users' lock at %s",
Expand All @@ -566,17 +574,13 @@ async def stop_expired_instances():
{datetime.datetime.now().isoformat()},
)
async with app.state.state_lock:
logger.debug(
"stop_expired_instances: acquired 'user_dict' lock at %s",
{datetime.datetime.now().isoformat()},
)
user_dict = app.state.session_state.get(user, None)
if user_dict is not None:
last_accessed = user_dict.get("last_accessed", None)
if last_accessed is not None:
diff = now - datetime.datetime.fromisoformat(last_accessed)
if diff.seconds >= INSTANCE_TIMEOUT_SEC:
res = await _stop_webdav_instance(user)
res = await _stop_webdav_instance(user, sw_state, sw_state_lock)
# TO DO: remove instance from session_state
if res != 0:
logger.error(
Expand Down Expand Up @@ -737,6 +741,78 @@ async def _map_fed_to_local(sub):
return None


async def storm_webdav_state(state, state_lock, user):
"""
This function manages the state of the storm-webdav instance for a particular user.
There are four possible states for a storm-webdav instance: STARTING, RUNNING,
STOPPING, NOT_RUNNING. The default state is NOT_RUNNING. Transition between
different states is triggered by an incomming request or by storm-webdav instance
reaching the inactivity treshold.
"""
should_start_sw = False
async with state_lock:
if user not in state:
state[user] = "NOT_RUNNING"
while not (state[user] == "NOT RUNNING" or state[user] == "RUNNING"):
state_lock.wait()

if state[user] == "NOT_RUNNING":
state[user] = "STARTING"
state_lock.notify()
logger.info(
"No instance running for user %s yet, starting now.", user)
should_start_sw = True

if state[user] == "RUNNING":
async with app.state.state_lock:
app.state.session_state[user]["last_accessed"] = str(
datetime.datetime.now()
)
should_start_sw = False

if should_start_sw:
port = await _find_usable_port_no()
pid = await _start_webdav_instance(user, port)

if not pid:
async with state_lock:
if state[user] == "STARTING":
state[user] = "NOT RUNNING"
state_lock.notify()
async with app.state.state_lock:
app.state.session_state[user] = {
"pid": None,
"port": -1,
"created_at": None,
"last_accessed": str(datetime.datetime.now()),
}
logger.error(
"something went wrong while starting instance for user %s.",
user,
)
return -1

async with state_lock:
if state[user] == "STARTING":
state[user] = "RUNNING"
state_lock.notify()
async with app.state.state_lock:
app.state.session_state[user] = {
"pid": pid,
"port": port,
"created_at": datetime.datetime.now(),
"last_accessed": str(datetime.datetime.now()),
}
return port

else:
async with app.state.state_lock:
port = app.state.session_state[user].get("port", None)
logger.info(
"StoRM-WebDAV instance for %s is running on port %d", user, port)
return port


async def _return_or_create_storm_instance(sub):
# returns redirect_host and redirect port for sub.

Expand All @@ -746,117 +822,60 @@ async def _return_or_create_storm_instance(sub):
# local user is unknown, we cannot start or check anything.
return None, None, None

# now check if an instance is running by checking the global state
if local_user in app.state.session_state.keys():
logger.debug(
"_return_or_create_storm_instance: trying to acquire 'get' lock \
at %s",
datetime.datetime.now().isoformat(),
)
async with app.state.state_lock:
port = await storm_webdav_state(sw_state, sw_state_lock, local_user)

running = False
loops = 0
while not running:
await anyio.sleep(1)
if loops >= STARTUP_TIMEOUT:
logger.debug(
"_return_or_create_storm_instance: acquired 'get' lock at \
%s",
datetime.datetime.now().isoformat(),
)
port = app.state.session_state[local_user].get("port", None)
app.state.session_state[local_user]["last_accessed"] = str(
datetime.datetime.now()
)
logger.info(
"StoRM-WebDAV instance for %s is running on port %d",
local_user,
port,
)
else:
# if no instance is running, start it. but first, it has to be checked
# if the directories exist, if not, they need to be created.
# also, we need to write the env vars into a .bash_profile for the
# user, so they are there when the webdav-instance is started.
# the port, pid, storage_area and directory will be managed within
# an sqlite database here in teapot. no external scripts anymore
# to keep the state and its management in one place.
logger.debug(
"no instance running for user %s yet, starting \
now.",
local_user,
)
port = await _find_usable_port_no()
pid = await _start_webdav_instance(local_user, port)
if not pid:
logger.error(
"something went wrong while starting instance for user %s.",
"instance for user %s not reachable after %d tries... \
stop trying.",
local_user,
STARTUP_TIMEOUT,
)
return None, -1, local_user
logger.debug(
"_return_or_create_storm_instance: trying to acquire 'set' lock \
at %s",
datetime.datetime.now().isoformat(),
)
async with app.state.state_lock:
logger.debug(
"_return_or_create_storm_instance: acquired 'set' lock at %s",
"_return_or_create_storm_instance: trying to acquire \
'pop' lock at %s",
datetime.datetime.now().isoformat(),
)
app.state.session_state[local_user] = {
"pid": pid,
"port": port,
"created_at": datetime.datetime.now(),
"last_accessed": str(datetime.datetime.now()),
}
running = False
loops = 0
while not running:
await anyio.sleep(1)
if loops >= STARTUP_TIMEOUT:
logger.debug(
"instance for user %s not reachable after %d tries... \
stop trying.",
local_user,
STARTUP_TIMEOUT,
)
async with app.state.state_lock:
logger.debug(
"_return_or_create_storm_instance: trying to acquire \
'pop' lock at %s",
"_return_or_create_storm_instance: acquired 'pop'\
lock at %s",
datetime.datetime.now().isoformat(),
)
async with app.state.state_lock:
logger.debug(
"_return_or_create_storm_instance: acquired 'pop'\
lock at %s",
datetime.datetime.now().isoformat(),
)
app.state.session_state.pop(local_user)
return None, -1, local_user
try:
logger.debug(
"checking if instance for user {local_user} is listening \
on port %d.",
port,
)
context1 = ssl.create_default_context()
context1.load_verify_locations(
cafile=config["Storm-webdav"]["Storm-webdav_CA"]
)
resp = httpx.get(
"https://"
+ config["Storm-webdav"]["SERVER_ADDRESS"]
+ ":"
+ str(port)
+ "/",
verify=context1,
)
if resp.status_code >= 200:
running = True
except httpx.ConnectError:
loops += 1
logger.debug(
"_return_or_create: trying to reach instance, try \
%d/%d...",
loops,
STARTUP_TIMEOUT,
)
app.state.session_state.pop(local_user)
return None, -1, local_user
try:
logger.debug(
"checking if instance for user {local_user} is listening \
on port %d.",
port,
)
context1 = ssl.create_default_context()
context1.load_verify_locations(
cafile=config["Storm-webdav"]["Storm-webdav_CA"]
)
resp = httpx.get(
"https://"
+ config["Storm-webdav"]["SERVER_ADDRESS"]
+ ":"
+ str(port)
+ "/",
verify=context1,
)
if resp.status_code >= 200:
running = True
except httpx.ConnectError:
loops += 1
logger.debug(
"_return_or_create: trying to reach instance, try \
%d/%d...",
loops,
STARTUP_TIMEOUT,
)

logger.info(
"Storm-WebDAV instance for %s started on port %d.",
Expand Down

1 comment on commit 28e7914

@vrbanecd
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
16 16 0 32 50.00 781.182ms

Passed Tests

Name ⏱️ Duration Suite
GET NO TOKEN 0.013 s Teapot-Tests
GET INVALID TOKEN 0.048 s Teapot-Tests
PUT REQUEST INVALID TOKEN 0.014 s Teapot-Tests
PUT REQUEST NO TOKEN 0.013 s Teapot-Tests
GET FILE NO TOKEN 0.010 s Teapot-Tests
GET FILE INVALID TOKEN 0.011 s Teapot-Tests
DELETE REQUEST INVALID TOKEN 0.011 s Teapot-Tests
DELETE REQUEST NO TOKEN 0.010 s Teapot-Tests
GET NO TOKEN EXTRA_AREA 0.010 s Teapot-Tests
GET INVALID TOKEN EXTRA_AREA 0.010 s Teapot-Tests
PUT REQUEST INVALID TOKEN EXTRA_AREA 0.010 s Teapot-Tests
PUT REQUEST NO TOKEN EXTRA_AREA 0.012 s Teapot-Tests
GET FILE NO TOKEN EXTRA_AREA 0.011 s Teapot-Tests
GET FILE INVALID TOKEN EXTRA_AREA 0.011 s Teapot-Tests
DELETE REQUEST INVALID TOKEN EXTRA_AREA 0.010 s Teapot-Tests
DELETE REQUEST NO TOKEN EXTRA_AREA 0.010 s Teapot-Tests

Failed Tests

Name Message ⏱️ Duration Suite
GET USER1 Url: https://teapot:8081/default_area Expected status: 500 != 200 0.179 s Teapot-Tests
GET USER2 Url: https://teapot:8081/default_area Expected status: 500 != 200 0.102 s Teapot-Tests
PUT REQUEST USER1 Url: https://teapot:8081/default_area/TestFile1 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/default_area/TestFile1 Expected status: 500 != 204 0.027 s Teapot-Tests
PUT REQUEST USER2 Url: https://teapot:8081/default_area/TestFile1 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/default_area/TestFile1 Expected status: 500 != 204 0.025 s Teapot-Tests
GET FILE USER1 Setup failed: Url: https://teapot:8081/default_area/TestFile2 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/default_area/TestFile2 Expected status: 500 != 204 0.026 s Teapot-Tests
GET FILE USER2 Setup failed: Url: https://teapot:8081/default_area/TestFile2 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/default_area/TestFile2 Expected status: 500 != 204 0.025 s Teapot-Tests
DELETE REQUEST USER1 Setup failed: Url: https://teapot:8081/default_area/TestFile2 Expected status: 500 != 201 0.013 s Teapot-Tests
DELETE REQUEST USER2 Setup failed: Url: https://teapot:8081/default_area/TestFile2 Expected status: 500 != 201 0.013 s Teapot-Tests
GET USER1 EXTRA_AREA Url: https://teapot:8081/extra_area Expected status: 500 != 200 0.011 s Teapot-Tests
GET USER2 EXTRA_AREA Url: https://teapot:8081/extra_area Expected status: 500 != 200 0.011 s Teapot-Tests
PUT REQUEST USER1 EXTRA_AREA Url: https://teapot:8081/extra_area/TestFile1 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/extra_area/TestFile1 Expected status: 500 != 204 0.030 s Teapot-Tests
PUT REQUEST USER2 EXTRA_AREA Url: https://teapot:8081/extra_area/TestFile1 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/extra_area/TestFile1 Expected status: 500 != 204 0.027 s Teapot-Tests
GET FILE USER1 EXTRA_AREA Setup failed: Url: https://teapot:8081/extra_area/TestFile2 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/extra_area/TestFile2 Expected status: 500 != 204 0.027 s Teapot-Tests
GET FILE USER2 EXTRA_AREA Setup failed: Url: https://teapot:8081/extra_area/TestFile2 Expected status: 500 != 201 Also teardown failed: Url: https://teapot:8081/extra_area/TestFile2 Expected status: 500 != 204 0.028 s Teapot-Tests
DELETE REQUEST USER1 EXTRA_AREA Setup failed: Url: https://teapot:8081/extra_area/TestFile2 Expected status: 500 != 201 0.012 s Teapot-Tests
DELETE REQUEST USER2 EXTRA_AREA Setup failed: Url: https://teapot:8081/extra_area/TestFile2 Expected status: 500 != 201 0.012 s Teapot-Tests

Please sign in to comment.