diff --git a/teapot.py b/teapot.py
index f94c42a..f4e7853 100644
--- a/teapot.py
+++ b/teapot.py
@@ -72,7 +72,7 @@ async def lifespan(app: FastAPI):
     # everything after the yield should be executed after shutdown
     handles = app.state.session_state.keys()
     for k in list(handles):
-        await _stop_webdav_instance(k)
+        await _stop_webdav_instance(k, sw_state, sw_state_lock)
     session_store_path = Path(SESSION_STORE_PATH)
     if session_store_path.exists():
         session_store_path.unlink()
@@ -135,6 +135,10 @@ async def lifespan(app: FastAPI):
 # lock for the state. any write operation on the state should only be done
 # in an "async with app.state_lock:" environment.
 app.state.state_lock = anyio.Lock()
+# state of the storm webdav servers
+sw_state: dict[str, str] = {}
+# condition (lock + wait/notify) for the state of the storm webdav servers
+sw_state_lock = anyio.Condition()
 
 context = ssl.create_default_context()
 context.load_verify_locations(cafile=config["Teapot"]["Teapot_CA"])
@@ -466,8 +470,11 @@ async def _get_proc(cmd):
     raise RuntimeError("process with full command ", +cmd + "does not exist.")
 
 
-async def _stop_webdav_instance(username):
+async def _stop_webdav_instance(username, state, state_lock):
     logger.info("Stopping webdav instance for user %s.", username)
+    async with state_lock:
+        state[username] = "STOPPING"
+        state_lock.notify_all()
 
     logger.debug(
         "_stop_webdav_instance: trying to acquire lock at %s",
@@ -515,6 +522,11 @@ async def _stop_webdav_instance(username):
         else:
             logger.info("Successfully killed process with PID %d.", pid)
             exit_code = 0
+        async with state_lock:
+            if state[username] == "STOPPING":
+                state[username] = "NOT_RUNNING"
+                state_lock.notify_all()
+
     except subprocess.CalledProcessError as e:
         logger.error(
             "Exception occurred while trying to kill process \
@@ -548,10 +560,6 @@ async def stop_expired_instances():
     while True:
         await asyncio.sleep(CHECK_INTERVAL_SEC)
         logger.debug("checking for expired instances")
-        logger.debug(
-            "stop_expired_instances: trying to acquire 'users' lock at %s",
-            {datetime.datetime.now().isoformat()},
-        )
         async with app.state.state_lock:
             logger.debug(
                 "stop_expired_instances: acquired 'users' lock at %s",
@@ -566,17 +574,13 @@ async def stop_expired_instances():
                 {datetime.datetime.now().isoformat()},
             )
             async with app.state.state_lock:
-                logger.debug(
-                    "stop_expired_instances: acquired 'user_dict' lock at %s",
-                    {datetime.datetime.now().isoformat()},
-                )
                 user_dict = app.state.session_state.get(user, None)
                 if user_dict is not None:
                     last_accessed = user_dict.get("last_accessed", None)
                     if last_accessed is not None:
                         diff = now - datetime.datetime.fromisoformat(last_accessed)
                         if diff.seconds >= INSTANCE_TIMEOUT_SEC:
-                            res = await _stop_webdav_instance(user)
+                            res = await _stop_webdav_instance(user, sw_state, sw_state_lock)
                             # TO DO: remove instance from session_state
                             if res != 0:
                                 logger.error(
@@ -737,6 +741,90 @@ async def _map_fed_to_local(sub):
     return None
 
 
+async def storm_webdav_state(state, state_lock, user):
+    """
+    Manage the state of the storm-webdav instance of a particular user and
+    return the port the instance is listening on.
+
+    There are four possible states for a storm-webdav instance: STARTING,
+    RUNNING, STOPPING, NOT_RUNNING. The default state is NOT_RUNNING.
+    Transition between different states is triggered by an incoming request
+    or by the storm-webdav instance reaching the inactivity threshold.
+
+    state: dict mapping a user name to the state of that user's instance.
+    state_lock: anyio.Condition guarding `state`; it has to be a condition
+        (not a plain lock) because wait() and notify_all() are used on it.
+    user: local user name the instance belongs to.
+
+    Returns the port of the instance, or -1 if it could not be started.
+    """
+    should_start_sw = False
+    async with state_lock:
+        if user not in state:
+            state[user] = "NOT_RUNNING"
+        # wait until a start or stop that is in progress has finished.
+        while state[user] not in ("NOT_RUNNING", "RUNNING"):
+            await state_lock.wait()
+
+        if state[user] == "NOT_RUNNING":
+            state[user] = "STARTING"
+            state_lock.notify_all()
+            logger.info(
+                "No instance running for user %s yet, starting now.", user)
+            should_start_sw = True
+
+        if state[user] == "RUNNING":
+            # NOTE(review): app.state.state_lock is taken while state_lock is
+            # held; confirm no other task acquires the two in reverse order.
+            async with app.state.state_lock:
+                app.state.session_state[user]["last_accessed"] = str(
+                    datetime.datetime.now()
+                )
+            should_start_sw = False
+
+    if should_start_sw:
+        port = await _find_usable_port_no()
+        pid = await _start_webdav_instance(user, port)
+
+        if not pid:
+            async with state_lock:
+                if state[user] == "STARTING":
+                    state[user] = "NOT_RUNNING"
+                    state_lock.notify_all()
+            async with app.state.state_lock:
+                app.state.session_state[user] = {
+                    "pid": None,
+                    "port": -1,
+                    "created_at": None,
+                    "last_accessed": str(datetime.datetime.now()),
+                }
+            logger.error(
+                "something went wrong while starting instance for user %s.",
+                user,
+            )
+            return -1
+
+        async with state_lock:
+            if state[user] == "STARTING":
+                state[user] = "RUNNING"
+                state_lock.notify_all()
+        async with app.state.state_lock:
+            app.state.session_state[user] = {
+                "pid": pid,
+                "port": port,
+                "created_at": datetime.datetime.now(),
+                "last_accessed": str(datetime.datetime.now()),
+            }
+        return port
+
+    else:
+        async with app.state.state_lock:
+            port = app.state.session_state[user].get("port", None)
+        logger.info(
+            "StoRM-WebDAV instance for %s is running on port %d", user, port)
+        return port
+
+
 async def _return_or_create_storm_instance(sub):
     # returns redirect_host and redirect port for sub.
 
@@ -746,117 +834,60 @@ async def _return_or_create_storm_instance(sub):
         # local user is unknown, we cannot start or check anything.
return None, None, None - # now check if an instance is running by checking the global state - if local_user in app.state.session_state.keys(): - logger.debug( - "_return_or_create_storm_instance: trying to acquire 'get' lock \ - at %s", - datetime.datetime.now().isoformat(), - ) - async with app.state.state_lock: + port = await storm_webdav_state(sw_state, sw_state_lock, local_user) + + running = False + loops = 0 + while not running: + await anyio.sleep(1) + if loops >= STARTUP_TIMEOUT: logger.debug( - "_return_or_create_storm_instance: acquired 'get' lock at \ - %s", - datetime.datetime.now().isoformat(), - ) - port = app.state.session_state[local_user].get("port", None) - app.state.session_state[local_user]["last_accessed"] = str( - datetime.datetime.now() - ) - logger.info( - "StoRM-WebDAV instance for %s is running on port %d", - local_user, - port, - ) - else: - # if no instance is running, start it. but first, it has to be checked - # if the directories exist, if not, they need to be created. - # also, we need to write the env vars into a .bash_profile for the - # user, so they are there when the webdav-instance is started. - # the port, pid, storage_area and directory will be managed within - # an sqlite database here in teapot. no external scripts anymore - # to keep the state and its management in one place. - logger.debug( - "no instance running for user %s yet, starting \ - now.", - local_user, - ) - port = await _find_usable_port_no() - pid = await _start_webdav_instance(local_user, port) - if not pid: - logger.error( - "something went wrong while starting instance for user %s.", + "instance for user %s not reachable after %d tries... 
\ + stop trying.", local_user, + STARTUP_TIMEOUT, ) - return None, -1, local_user - logger.debug( - "_return_or_create_storm_instance: trying to acquire 'set' lock \ - at %s", - datetime.datetime.now().isoformat(), - ) - async with app.state.state_lock: logger.debug( - "_return_or_create_storm_instance: acquired 'set' lock at %s", + "_return_or_create_storm_instance: trying to acquire \ + 'pop' lock at %s", datetime.datetime.now().isoformat(), ) - app.state.session_state[local_user] = { - "pid": pid, - "port": port, - "created_at": datetime.datetime.now(), - "last_accessed": str(datetime.datetime.now()), - } - running = False - loops = 0 - while not running: - await anyio.sleep(1) - if loops >= STARTUP_TIMEOUT: - logger.debug( - "instance for user %s not reachable after %d tries... \ - stop trying.", - local_user, - STARTUP_TIMEOUT, - ) + async with app.state.state_lock: logger.debug( - "_return_or_create_storm_instance: trying to acquire \ - 'pop' lock at %s", + "_return_or_create_storm_instance: acquired 'pop'\ + lock at %s", datetime.datetime.now().isoformat(), ) - async with app.state.state_lock: - logger.debug( - "_return_or_create_storm_instance: acquired 'pop'\ - lock at %s", - datetime.datetime.now().isoformat(), - ) - app.state.session_state.pop(local_user) - return None, -1, local_user - try: - logger.debug( - "checking if instance for user {local_user} is listening \ - on port %d.", - port, - ) - context1 = ssl.create_default_context() - context1.load_verify_locations( - cafile=config["Storm-webdav"]["Storm-webdav_CA"] - ) - resp = httpx.get( - "https://" - + config["Storm-webdav"]["SERVER_ADDRESS"] - + ":" - + str(port) - + "/", - verify=context1, - ) - if resp.status_code >= 200: - running = True - except httpx.ConnectError: - loops += 1 - logger.debug( - "_return_or_create: trying to reach instance, try \ - %d/%d...", - loops, - STARTUP_TIMEOUT, - ) + app.state.session_state.pop(local_user) + return None, -1, local_user + try: + logger.debug( + 
"checking if instance for user {local_user} is listening \ + on port %d.", + port, + ) + context1 = ssl.create_default_context() + context1.load_verify_locations( + cafile=config["Storm-webdav"]["Storm-webdav_CA"] + ) + resp = httpx.get( + "https://" + + config["Storm-webdav"]["SERVER_ADDRESS"] + + ":" + + str(port) + + "/", + verify=context1, + ) + if resp.status_code >= 200: + running = True + except httpx.ConnectError: + loops += 1 + logger.debug( + "_return_or_create: trying to reach instance, try \ + %d/%d...", + loops, + STARTUP_TIMEOUT, + ) logger.info( "Storm-WebDAV instance for %s started on port %d.",