From 33f5db028ea30ded8f652d51f32e631494bc5c41 Mon Sep 17 00:00:00 2001 From: "Caleb St. John" <30729806+yocalebo@users.noreply.github.com> Date: Sat, 14 Dec 2024 11:35:25 -0500 Subject: [PATCH] NAS-133082 / 25.04 / Move FileApplication into its own module (#15211) * move FileApplication to own module * format the file * move ShellApplication to apps dir * fix comment formatting --- src/middlewared/middlewared/apps/__init__.py | 2 + src/middlewared/middlewared/apps/file_app.py | 227 ++++++++++++++++++ .../webshell.py => apps/webshell_app.py} | 0 src/middlewared/middlewared/main.py | 185 +------------- .../middlewared/webshellapp/__init__.py | 0 5 files changed, 232 insertions(+), 182 deletions(-) create mode 100644 src/middlewared/middlewared/apps/__init__.py create mode 100644 src/middlewared/middlewared/apps/file_app.py rename src/middlewared/middlewared/{webshellapp/webshell.py => apps/webshell_app.py} (100%) delete mode 100644 src/middlewared/middlewared/webshellapp/__init__.py diff --git a/src/middlewared/middlewared/apps/__init__.py b/src/middlewared/middlewared/apps/__init__.py new file mode 100644 index 0000000000000..503f15db87e9b --- /dev/null +++ b/src/middlewared/middlewared/apps/__init__.py @@ -0,0 +1,2 @@ +from .file_app import FileApplication # noqa +from .webshell_app import ShellApplication # noqa diff --git a/src/middlewared/middlewared/apps/file_app.py b/src/middlewared/middlewared/apps/file_app.py new file mode 100644 index 0000000000000..43200ef272154 --- /dev/null +++ b/src/middlewared/middlewared/apps/file_app.py @@ -0,0 +1,227 @@ +from asyncio import run_coroutine_threadsafe +from json import dumps, loads +from urllib.parse import parse_qs + +from aiohttp import web + +from middlewared.pipe import Pipes +from middlewared.restful import ( + parse_credentials, + authenticate, + create_application, + copy_multipart_to_pipe, +) +from middlewared.service_exception import CallError + +__all__ = ("FileApplication",) + + +class FileApplication: + def __init__(self, middleware, loop): + self.middleware = middleware + self.loop = loop + self.jobs = {} + + def register_job(self, job_id, buffered): + # FIXME: Allow the job to run for infinite time + give 300 seconds to begin + # download instead of waiting 3600 seconds for the whole operation + self.jobs[job_id] = self.middleware.loop.call_later( + 3600 if buffered else 60, + lambda: self.middleware.create_task(self._cleanup_job(job_id)), + ) + + async def _cleanup_cancel(self, job_id): + job_cleanup = self.jobs.pop(job_id, None) + if job_cleanup: + job_cleanup.cancel() + + async def _cleanup_job(self, job_id): + if job_id not in self.jobs: + return + self.jobs[job_id].cancel() + del self.jobs[job_id] + + job = self.middleware.jobs[job_id] + await job.pipes.close() + + async def download(self, request): + path = request.path.split("/") + if not request.path[-1].isdigit(): + resp = web.Response() + resp.set_status(404) + return resp + + job_id = int(path[-1]) + + qs = parse_qs(request.query_string) + denied = False + filename = None + if "auth_token" not in qs: + denied = True + else: + auth_token = qs.get("auth_token")[0] + token = await self.middleware.call("auth.get_token", auth_token) + if not token: + denied = True + else: + if token["attributes"].get("job") != job_id: + denied = True + else: + filename = token["attributes"].get("filename") + if denied: + resp = web.Response() + resp.set_status(401) + return resp + + job = self.middleware.jobs.get(job_id) + if not job: + resp = web.Response() + resp.set_status(404) + return resp + + if job_id not in self.jobs: + resp = web.Response() + resp.set_status(410) + return resp + + resp = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": "application/octet-stream", + "Content-Disposition": f'attachment; filename="{filename}"', + "Transfer-Encoding": "chunked", + }, + ) + await resp.prepare(request) + + def do_copy(): + while True: + read = job.pipes.output.r.read(1048576) + if read == b"": + break + run_coroutine_threadsafe(resp.write(read), loop=self.loop).result() + + try: + await self._cleanup_cancel(job_id) + await self.middleware.run_in_thread(do_copy) + finally: + await job.pipes.close() + + await resp.drain() + return resp + + async def upload(self, request): + reader = await request.multipart() + + part = await reader.next() + if not part: + resp = web.Response(status=405, body="No part found on payload") + resp.set_status(405) + return resp + + if part.name != "data": + resp = web.Response( + status=405, body='"data" part must be the first on payload' + ) + resp.set_status(405) + return resp + + try: + data = loads(await part.read()) + except Exception as e: + return web.Response(status=400, body=str(e)) + + if "method" not in data: + return web.Response(status=422) + + try: + credentials = parse_credentials(request) + if credentials is None: + raise web.HTTPUnauthorized() + except web.HTTPException as e: + return web.Response(status=e.status_code, body=e.text) + app = await create_application(request) + try: + authenticated_credentials = await authenticate( + self.middleware, request, credentials, "CALL", data["method"] + ) + if authenticated_credentials is None: + raise web.HTTPUnauthorized() + except web.HTTPException as e: + credentials["credentials_data"].pop("password", None) + await self.middleware.log_audit_message( + app, + "AUTHENTICATION", + { + "credentials": credentials, + "error": e.text, + }, + False, + ) + return web.Response(status=e.status_code, body=e.text) + app = await create_application(request, authenticated_credentials) + credentials["credentials_data"].pop("password", None) + await self.middleware.log_audit_message( + app, + "AUTHENTICATION", + { + "credentials": credentials, + "error": None, + }, + True, + ) + + filepart = await reader.next() + + if not filepart or filepart.name != "file": + resp = web.Response( + status=405, body='"file" not found as second part on payload' + ) + resp.set_status(405) + return resp + + try: + serviceobj, methodobj = self.middleware.get_method(data["method"]) + if authenticated_credentials.authorize("CALL", data["method"]): + job = await self.middleware.call_with_audit( + data["method"], + serviceobj, + methodobj, + data.get("params") or [], + app, + pipes=Pipes(input_=self.middleware.pipe()), + ) + else: + await self.middleware.log_audit_message_for_method( + data["method"], + methodobj, + data.get("params") or [], + app, + True, + False, + False, + ) + raise web.HTTPForbidden() + await self.middleware.run_in_thread( + copy_multipart_to_pipe, self.loop, filepart, job.pipes.input + ) + except CallError as e: + if e.errno == CallError.ENOMETHOD: + status_code = 422 + else: + status_code = 412 + return web.Response(status=status_code, body=str(e)) + except web.HTTPException as e: + return web.Response(status=e.status_code, body=e.text) + except Exception as e: + return web.Response(status=500, body=str(e)) + + resp = web.Response( + status=200, + headers={ + "Content-Type": "application/json", + }, + body=dumps({"job_id": job.id}).encode(), + ) + return resp diff --git a/src/middlewared/middlewared/webshellapp/webshell.py b/src/middlewared/middlewared/apps/webshell_app.py similarity index 100% rename from src/middlewared/middlewared/webshellapp/webshell.py rename to src/middlewared/middlewared/apps/webshell_app.py diff --git a/src/middlewared/middlewared/main.py b/src/middlewared/middlewared/main.py index 30881d7f9a2eb..bece0924bffc0 100644 --- a/src/middlewared/middlewared/main.py +++ b/src/middlewared/middlewared/main.py @@ -8,14 +8,14 @@ from .api.base.server.ws_handler.base import BaseWebSocketHandler from .api.base.server.ws_handler.rpc import RpcWebSocketApp, RpcWebSocketAppEvent from .api.base.server.ws_handler.rpc import RpcWebSocketHandler +from .apps import FileApplication, ShellApplication from .common.event_source.manager import EventSourceManager from .event import Events from .job import Job, JobsQueue, State -from .pipe import Pipes, Pipe -from .restful import parse_credentials, authenticate, create_application, copy_multipart_to_pipe, RESTfulAPI +from .pipe import Pipe +from .restful import RESTfulAPI from .role import ROLES, RoleManager from .schema import Error as SchemaError, OROperator -from .webshellapp.webshell import ShellApplication import middlewared.service from .service_exception import ( adapt_exception, CallError, CallException, ErrnoMixin, ValidationError, ValidationErrors, @@ -71,7 +71,6 @@ import traceback import types import typing -import urllib.parse import uuid import tracemalloc @@ -365,184 +364,6 @@ def __setstate__(self, newstate): pass -class FileApplication(object): - - def __init__(self, middleware, loop): - self.middleware = middleware - self.loop = loop - self.jobs = {} - - def register_job(self, job_id, buffered): - self.jobs[job_id] = self.middleware.loop.call_later( - 3600 if buffered else 60, # FIXME: Allow the job to run for infinite time + give 300 seconds to begin - # download instead of waiting 3600 seconds for the whole operation - lambda: self.middleware.create_task(self._cleanup_job(job_id)), - ) - - async def _cleanup_cancel(self, job_id): - job_cleanup = self.jobs.pop(job_id, None) - if job_cleanup: - job_cleanup.cancel() - - async def _cleanup_job(self, job_id): - if job_id not in self.jobs: - return - self.jobs[job_id].cancel() - del self.jobs[job_id] - - job = self.middleware.jobs[job_id] - await job.pipes.close() - - async def download(self, request): - path = request.path.split('/') - if not request.path[-1].isdigit(): - resp = web.Response() - resp.set_status(404) - return resp - - job_id = int(path[-1]) - - qs = urllib.parse.parse_qs(request.query_string) - denied = False - filename = None - if 'auth_token' not in qs: - denied = True - else: - auth_token = qs.get('auth_token')[0] - token = await self.middleware.call('auth.get_token', auth_token) - if not token: - denied = True - else: - if token['attributes'].get('job') != job_id: - denied = True - else: - filename = token['attributes'].get('filename') - if denied: - resp = web.Response() - resp.set_status(401) - return resp - - job = self.middleware.jobs.get(job_id) - if not job: - resp = web.Response() - resp.set_status(404) - return resp - - if job_id not in self.jobs: - resp = web.Response() - resp.set_status(410) - return resp - - resp = web.StreamResponse(status=200, reason='OK', headers={ - 'Content-Type': 'application/octet-stream', - 'Content-Disposition': f'attachment; filename="{filename}"', - 'Transfer-Encoding': 'chunked', - }) - await resp.prepare(request) - - def do_copy(): - while True: - read = job.pipes.output.r.read(1048576) - if read == b'': - break - asyncio.run_coroutine_threadsafe(resp.write(read), loop=self.loop).result() - - try: - await self._cleanup_cancel(job_id) - await self.middleware.run_in_thread(do_copy) - finally: - await job.pipes.close() - - await resp.drain() - return resp - - async def upload(self, request): - reader = await request.multipart() - - part = await reader.next() - if not part: - resp = web.Response(status=405, body='No part found on payload') - resp.set_status(405) - return resp - - if part.name != 'data': - resp = web.Response(status=405, body='"data" part must be the first on payload') - resp.set_status(405) - return resp - - try: - data = json.loads(await part.read()) - except Exception as e: - return web.Response(status=400, body=str(e)) - - if 'method' not in data: - return web.Response(status=422) - - try: - credentials = parse_credentials(request) - if credentials is None: - raise web.HTTPUnauthorized() - except web.HTTPException as e: - return web.Response(status=e.status_code, body=e.text) - app = await create_application(request) - try: - authenticated_credentials = await authenticate(self.middleware, request, credentials, 'CALL', - data['method']) - if authenticated_credentials is None: - raise web.HTTPUnauthorized() - except web.HTTPException as e: - credentials['credentials_data'].pop('password', None) - await self.middleware.log_audit_message(app, 'AUTHENTICATION', { - 'credentials': credentials, - 'error': e.text, - }, False) - return web.Response(status=e.status_code, body=e.text) - app = await create_application(request, authenticated_credentials) - credentials['credentials_data'].pop('password', None) - await self.middleware.log_audit_message(app, 'AUTHENTICATION', { - 'credentials': credentials, - 'error': None, - }, True) - - filepart = await reader.next() - - if not filepart or filepart.name != 'file': - resp = web.Response(status=405, body='"file" not found as second part on payload') - resp.set_status(405) - return resp - - try: - serviceobj, methodobj = self.middleware.get_method(data['method']) - if authenticated_credentials.authorize('CALL', data['method']): - job = await self.middleware.call_with_audit(data['method'], serviceobj, methodobj, - data.get('params') or [], app, - pipes=Pipes(input_=self.middleware.pipe())) - else: - await self.middleware.log_audit_message_for_method(data['method'], methodobj, data.get('params') or [], - app, True, False, False) - raise web.HTTPForbidden() - await self.middleware.run_in_thread(copy_multipart_to_pipe, self.loop, filepart, job.pipes.input) - except CallError as e: - if e.errno == CallError.ENOMETHOD: - status_code = 422 - else: - status_code = 412 - return web.Response(status=status_code, body=str(e)) - except web.HTTPException as e: - return web.Response(status=e.status_code, body=e.text) - except Exception as e: - return web.Response(status=500, body=str(e)) - - resp = web.Response( - status=200, - headers={ - 'Content-Type': 'application/json', - }, - body=json.dumps({'job_id': job.id}).encode(), - ) - return resp - - class PreparedCall(typing.NamedTuple): args: list[typing.Any] | None = None executor: typing.Any | None = None diff --git a/src/middlewared/middlewared/webshellapp/__init__.py b/src/middlewared/middlewared/webshellapp/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000