From 6f34235417364a50bf9d56595d3a69b1dee467ef Mon Sep 17 00:00:00 2001 From: Krisjanis Veinbahs Date: Tue, 15 Mar 2022 01:58:10 +0200 Subject: [PATCH] Add auth to CLI commands --- .../main/scala/diptestbed/domain/User.scala | 4 +- .../web/control/ApiHardwareController.scala | 24 +++--- .../web/control/ApiSoftwareController.scala | 6 +- .../web/control/AppHardwareController.scala | 3 +- .../web/control/AppSoftwareController.scala | 2 +- .../web/views/hardwareList.scala.html | 6 ++ client/src/service/backend.py | 21 +++-- client/src/service/cli.py | 78 +++++++++++++------ client/src/service/click.py | 48 +++++++++--- client/src/service/managed_url.py | 42 ++++++++-- 10 files changed, 163 insertions(+), 71 deletions(-) diff --git a/backend/domain/src/main/scala/diptestbed/domain/User.scala b/backend/domain/src/main/scala/diptestbed/domain/User.scala index b91ba78..1ca4b47 100644 --- a/backend/domain/src/main/scala/diptestbed/domain/User.scala +++ b/backend/domain/src/main/scala/diptestbed/domain/User.scala @@ -7,6 +7,6 @@ case class User( isLabOwner: Boolean, // Lab owners can create and manage hardware isDeveloper: Boolean // Developers can upload their own software ) { - def canAccessHardware: Boolean = isLabOwner || isDeveloper - def canAccessSoftware: Boolean = isLabOwner || isDeveloper + def canInteractHardware: Boolean = isManager || isLabOwner || isDeveloper + def canCreateSoftware: Boolean = isManager || isLabOwner || isDeveloper } diff --git a/backend/web/app/diptestbed/web/control/ApiHardwareController.scala b/backend/web/app/diptestbed/web/control/ApiHardwareController.scala index 3503788..7948a69 100644 --- a/backend/web/app/diptestbed/web/control/ApiHardwareController.scala +++ b/backend/web/app/diptestbed/web/control/ApiHardwareController.scala @@ -65,8 +65,6 @@ class ApiHardwareController( def getHardwares: Action[AnyContent] = IOActionAny(withRequestAuthnOrFail(_)((_, user) => for { - _ <- EitherT.fromEither[IO](Either.cond( - user.canAccessHardware, (), permissionErrorResult("Hardware access"))) hardwares <- EitherT(hardwareService.getHardwares(Some(user), write = false)).leftMap(databaseErrorResult) result = Success(hardwares).withHttpStatus(OK) } yield result @@ -75,8 +73,6 @@ class ApiHardwareController( def getHardware(hardwareId: HardwareId): Action[AnyContent] = IOActionAny(withRequestAuthnOrFail(_)((_, user) => for { - _ <- EitherT.fromEither[IO](Either.cond( - user.canAccessHardware, (), permissionErrorResult("Hardware access"))) hardware <- EitherT(hardwareService.getHardware(Some(user), hardwareId, write = false)).leftMap(databaseErrorResult) _ <- EitherT.fromEither[IO](hardware.toRight(unknownIdErrorResult)) result = Success(hardware).withHttpStatus(OK) @@ -101,7 +97,7 @@ class ApiHardwareController( hardware <- EitherT(hardwareService.getHardware(Some(user), hardwareId, write = false)).leftMap(databaseErrorResult) _ <- EitherT.fromEither[IO](hardware.toRight(unknownIdErrorResult)) _ <- EitherT.fromEither[IO](Either.cond( - user.canAccessHardware, (), permissionErrorResult("Hardware access"))) + user.canInteractHardware, (), permissionErrorResult("Hardware access"))) uploadResult <- HardwareControlActor.requestSoftwareUpload(hardwareId, softwareId).bimap( errorMessage => Failure(errorMessage).withHttpStatus(BAD_REQUEST), result => Success(result.toString).withHttpStatus(OK), @@ -146,14 +142,16 @@ class ApiHardwareController( hardware <- EitherT(hardwareService.getHardware(Some(user), hardwareId, write = false)).leftMap(databaseErrorResult) _ <- EitherT.fromEither[IO](hardware.toRight(unknownIdErrorResult)) _ <- EitherT.fromEither[IO](Either.cond( - user.canAccessHardware, (), permissionErrorResult("Hardware access"))) - source <- HardwareCameraListenerActor.spawnCameraSource(pubSubMediator, hardwareId) - .leftMap(Failure(_).withHttpStatus(BAD_REQUEST)) - content = request.headers.get("Range") match { - case None => Ok("") - case Some(_) => Ok.chunked(source) - } - result <- EitherT.fromEither[IO](Right(withStreamHeaders(content))) + user.canInteractHardware, (), permissionErrorResult("Hardware access"))) + result <- + request.headers.get("Range") match { + case None => EitherT.fromEither[IO](Right[Result, Result](withStreamHeaders(Ok("")))) + case Some(_) => HardwareCameraListenerActor.spawnCameraSource(pubSubMediator, hardwareId).bimap[Result, Result]( + errorMessage => Failure(errorMessage).withHttpStatus(BAD_REQUEST), + source => + withStreamHeaders(Ok.chunked(source)), + ) + } } yield result })) diff --git a/backend/web/app/diptestbed/web/control/ApiSoftwareController.scala b/backend/web/app/diptestbed/web/control/ApiSoftwareController.scala index 096e6c7..5db84c0 100644 --- a/backend/web/app/diptestbed/web/control/ApiSoftwareController.scala +++ b/backend/web/app/diptestbed/web/control/ApiSoftwareController.scala @@ -41,6 +41,8 @@ class ApiSoftwareController( maybeUser .leftMap(databaseErrorResult) .flatMap(_.toRight(authorizationErrorResult))) + _ <- EitherT.fromEither[IO](Either.cond( + user.canCreateSoftware, (), permissionErrorResult("Software access"))) software <- EitherT.fromEither[IO]( (request.body.file("software"), request.body.dataParts.get("name").flatMap(_.headOption)) .tupled @@ -66,8 +68,6 @@ class ApiSoftwareController( def getSoftwareMetas: Action[AnyContent] = IOActionAny(withRequestAuthnOrFail(_)((_, user) => for { - _ <- EitherT.fromEither[IO](Either.cond( - user.canAccessSoftware, (), permissionErrorResult("Software access"))) result <- EitherT(softwareService.getSoftwareMetas(Some(user), write = false)) .leftMap(databaseErrorResult) .map(softwareMetas => Success(softwareMetas).withHttpStatus(OK)) @@ -77,8 +77,6 @@ class ApiSoftwareController( def getSoftware(softwareId: SoftwareId): Action[AnyContent] = IOActionAny(withRequestAuthnOrFail(_)((_, user) => for { - _ <- EitherT.fromEither[IO](Either.cond( - user.canAccessSoftware, (), permissionErrorResult("Software access"))) software <- EitherT(softwareService.getSoftware(Some(user), softwareId, write = false)).leftMap(databaseErrorResult) existingSoftware <- EitherT.fromEither[IO](software.toRight(unknownIdErrorResult)) result = { diff --git a/backend/web/app/diptestbed/web/control/AppHardwareController.scala b/backend/web/app/diptestbed/web/control/AppHardwareController.scala index 80f72d6..113f099 100644 --- a/backend/web/app/diptestbed/web/control/AppHardwareController.scala +++ b/backend/web/app/diptestbed/web/control/AppHardwareController.scala @@ -48,7 +48,7 @@ class AppHardwareController( def dbHardware(sessionUser: User): IO[List[Hardware]] = hardwareService.getHardwares(Some(sessionUser), write = false) - .map(_.toOption.sequence.flatten.toList.filter(_ => sessionUser.canAccessHardware)) + .map(_.toOption.sequence.flatten.toList) def list: Action[AnyContent] = Action { implicit request => @@ -58,7 +58,6 @@ class AppHardwareController( }.unsafeRunSync() } - def publicRequest: Action[AnyContent] = Action { implicit request => withRequestAuthnOrLoginRedirect[AnyContent] { case (_, user) => diff --git a/backend/web/app/diptestbed/web/control/AppSoftwareController.scala b/backend/web/app/diptestbed/web/control/AppSoftwareController.scala index d6e862c..f749b05 100644 --- a/backend/web/app/diptestbed/web/control/AppSoftwareController.scala +++ b/backend/web/app/diptestbed/web/control/AppSoftwareController.scala @@ -49,7 +49,7 @@ class AppSoftwareController( def dbSoftware(sessionUser: User): IO[List[SoftwareMeta]] = softwareService.getSoftwareMetas(Some(sessionUser), write = false) - .map(_.toOption.sequence.flatten.toList.filter(_ => sessionUser.canAccessSoftware)) + .map(_.toOption.sequence.flatten.toList) def list: Action[AnyContent] = Action { implicit r => diff --git a/backend/web/app/diptestbed/web/views/hardwareList.scala.html b/backend/web/app/diptestbed/web/views/hardwareList.scala.html index 3b4ac92..5ebb5eb 100644 --- a/backend/web/app/diptestbed/web/views/hardwareList.scala.html +++ b/backend/web/app/diptestbed/web/views/hardwareList.scala.html @@ -39,6 +39,12 @@ @if(user.map(_.id).contains(hardware.ownerId) || user.exists(_.isManager)) { } + + + } diff --git a/client/src/service/backend.py b/client/src/service/backend.py index 982fc81..a9d6dd2 100644 --- a/client/src/service/backend.py +++ b/client/src/service/backend.py @@ -1,5 +1,6 @@ """Module for backend management service definitions""" import os +import urllib.request from typing import List, TypeVar, Dict, Optional from dataclasses import dataclass import base64 @@ -134,13 +135,15 @@ def hardware_video_source_url(self, hardware_id: ManagedUUID) -> Result[ManagedU return self.control_url(f"{self.config.api_prefix}/hardware/video/source?hardware={hardware_id.value}") def hardware_video_sink_url(self, hardware_id: ManagedUUID) -> Result[ManagedURL, BackendManagementError]: - return self.static_url(f"{self.config.api_prefix}/hardware/video/sink/{hardware_id.value}.ogg") + url_result = self.static_url(f"{self.config.api_prefix}/hardware/video/sink/{hardware_id.value}.ogg") + if isinstance(url_result, Err): + return Err(BackendManagementError("Control server URL build failed", exception=url_result.value)) + return url_result.value.with_basic_auth(self.config.auth.username, self.config.auth.password) # Bad hack def hardware_serial_monitor_url(self, hardware_id: ManagedUUID) -> Result[ManagedURL, BackendManagementError]: """Build hardware serial monitor URL""" return self.control_url(f"{self.config.api_prefix}/hardware/{hardware_id.value}/monitor/serial") - @staticmethod def response_to_result( response: Response, @@ -236,7 +239,7 @@ def user_list(self) -> Result[List[User], BackendManagementError]: """Fetch a list of users""" path = f"{self.config.api_prefix}/user" decoder = s11n_json.list_decoder_json(s11n_json.USER_DECODER_JSON) - return self.static_get_json_result(path, decoder) + return self.static_get_json_result(path, decoder, headers=self.config.auth.auth_headers()) def user_create(self, username: str, password: str) -> Result[User, str]: """Create a new user""" @@ -251,7 +254,7 @@ def hardware_list(self) -> Result[List[Hardware], BackendManagementError]: """Fetch a list of hardware""" path = f"{self.config.api_prefix}/hardware" decoder = s11n_json.list_decoder_json(s11n_json.HARDWARE_DECODER_JSON) - return self.static_get_json_result(path, decoder) + return self.static_get_json_result(path, decoder, headers=self.config.auth.auth_headers()) def hardware_create( self, @@ -273,7 +276,7 @@ def hardware_software_upload( ) -> Optional[BackendManagementError]: """Upload a given software to a given hardware""" path = f"{self.config.api_prefix}/hardware/{hardware_id.value}/upload/software/{software_id.value}" - result = self.static_post_json_result(path) + result = self.static_post_json_result(path, headers=self.config.auth.auth_headers()) if isinstance(result, Err): return result.value return None @@ -283,7 +286,7 @@ def software_list(self) -> Result[List[Software], str]: """Fetch a list of software""" path = f"{self.config.api_prefix}/software" decoder = s11n_json.list_decoder_json(s11n_json.SOFTWARE_DECODER_JSON) - return self.static_get_json_result(path, decoder) + return self.static_get_json_result(path, decoder, headers=self.config.auth.auth_headers()) def software_upload( self, @@ -309,12 +312,14 @@ def software_download( # Build URL url_result = self.static_url(f"{self.config.api_prefix}/software/{software_id.value}/download") if isinstance(url_result, Err): return Err(url_result.value) + url_with_auth_result = url_result.value.with_basic_auth(self.config.auth.username, self.config.auth.password) # Bad hack + if isinstance(url_with_auth_result, Err): return Err(url_with_auth_result.value) # Download file if file_path is None: - file_result = url_result.value.downloaded_file_in_temp() + file_result = url_with_auth_result.value.downloaded_file_in_temp() else: - file_result = url_result.value.downloaded_file_in_path(file_path) + file_result = url_with_auth_result.value.downloaded_file_in_path(file_path) # Handle file download if isinstance(file_result, Err): diff --git a/client/src/service/cli.py b/client/src/service/cli.py index 53886ad..0436a5b 100755 --- a/client/src/service/cli.py +++ b/client/src/service/cli.py @@ -150,7 +150,9 @@ async def agent_fake( @staticmethod def user_list( config_path_str: Optional[str], - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str] ) -> Result[List[User], DIPClientError]: pass @@ -166,7 +168,9 @@ def user_create( @staticmethod def hardware_list( config_path_str: Optional[str], - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str] ) -> Result[List[Hardware], DIPClientError]: pass @@ -180,10 +184,22 @@ def hardware_create( ) -> Result[Hardware, DIPClientError]: pass + @staticmethod + def hardware_stream_open( + config_path_str: Optional[str], + static_server_str: Optional[str], + hardware_id_str: str, + username_str: Optional[str], + password_str: Optional[str], + ) -> Optional[DIPClientError]: + pass + @staticmethod def software_list( config_path_str: Optional[str], - static_server_str: str + static_server_str: str, + username_str: Optional[str], + password_str: Optional[str], ) -> Result[List[Software], DIPClientError]: pass @@ -203,7 +219,9 @@ def software_download( config_path_str: Optional[str], static_server_str: Optional[str], software_id_str: str, - file_path: str + file_path: str, + username_str: Optional[str], + password_str: Optional[str], ) -> Result[ExistingFilePath, DIPClientError]: pass @@ -212,7 +230,9 @@ def hardware_software_upload( config_path_str: Optional[str], static_server_str: Optional[str], hardware_id_str: str, - software_id_str: str + software_id_str: str, + username_str: Optional[str], + password_str: Optional[str], ) -> Optional[DIPClientError]: pass @@ -623,9 +643,11 @@ async def agent_fake( @staticmethod def user_list( config_path_str: Optional[str], - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str] ) -> Result[List[User], DIPClientError]: - backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, None, None) + backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, username_str, password_str) if isinstance(backend_result, Err): return Err(backend_result.value) return backend_result.value.user_list() @@ -643,9 +665,11 @@ def user_create( @staticmethod def hardware_list( config_path_str: Optional[str], - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str] ) -> Result[List[Hardware], DIPClientError]: - backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, None, None) + backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, username_str, password_str) if isinstance(backend_result, Err): return Err(backend_result.value) return backend_result.value.hardware_list() @@ -665,9 +689,11 @@ def hardware_create( def hardware_stream_open( config_path_str: Optional[str], static_server_str: Optional[str], - hardware_id_str: str + hardware_id_str: str, + username_str: Optional[str], + password_str: Optional[str], ) -> Optional[DIPClientError]: - backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, None, None) + backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, username_str, password_str) if isinstance(backend_result, Err): return backend_result.value hardware_id_result = ManagedUUID.build(hardware_id_str) if isinstance(hardware_id_result, Err): return hardware_id_result.value.of_type("hardware") @@ -682,9 +708,11 @@ def hardware_stream_open( @staticmethod def software_list( config_path_str: Optional[str], - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str], ) -> Result[List[Software], DIPClientError]: - backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, None, None) + backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, username_str, password_str) if isinstance(backend_result, Err): return Err(backend_result.value) return backend_result.value.software_list() @@ -708,22 +736,28 @@ def software_download( config_path_str: Optional[str], static_server_str: Optional[str], software_id_str: str, - file_path: str - ) -> Result[ExistingFilePath, DIPClientError]: - backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, None, None) - if isinstance(backend_result, Err): return Err(backend_result.value) + file_path: str, + username_str: Optional[str], + password_str: Optional[str], + ) -> Optional[DIPClientError]: + backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, username_str, password_str) + if isinstance(backend_result, Err): return backend_result.value software_id_result = ManagedUUID.build(software_id_str) - if isinstance(software_id_result, Err): return Err(software_id_result.value.of_type("software")) - return backend_result.value.software_download(software_id_result.value, file_path) + if isinstance(software_id_result, Err): return software_id_result.value.of_type("software") + download_result = backend_result.value.software_download(software_id_result.value, file_path) + if isinstance(download_result, Err): return download_result.value + return None @staticmethod def hardware_software_upload( config_path_str: Optional[str], static_server_str: Optional[str], hardware_id_str: str, - software_id_str: str + software_id_str: str, + username_str: Optional[str], + password_str: Optional[str], ) -> Optional[DIPClientError]: - backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, None, None) + backend_result = CLI.parsed_backend(config_path_str, None, static_server_str, username_str, password_str) if isinstance(backend_result, Err): return backend_result.value software_id_result = ManagedUUID.build(software_id_str) if isinstance(software_id_result, Err): return software_id_result.value.of_type("software") @@ -790,7 +824,7 @@ async def quick_run( # Forward software to board LOGGER.info("Forwarding software to board") forward_error = CLI.hardware_software_upload( - config_path_str, static_server_str, hardware_id_str, str(software.id.value)) + config_path_str, static_server_str, hardware_id_str, str(software.id.value), username_str, password_str) if forward_error is not None: return Err(forward_error) # Create serial monitor connection to board LOGGER.info("Configuring serial connection monitor with board") diff --git a/client/src/service/click.py b/client/src/service/click.py index 7c3e054..79acabd 100755 --- a/client/src/service/click.py +++ b/client/src/service/click.py @@ -390,15 +390,19 @@ async def exec(): @CONFIG_PATH_OPTION @JSON_OUTPUT_OPTION @STATIC_SERVER_OPTION +@USERNAME_OPTION +@PASSWORD_OPTION def user_list( config_path_str: Optional[str], json_output: bool, - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str] ): """Fetch list of users""" CLI.execute_table_result( json_output, - CLI.user_list(config_path_str, static_server_str), + CLI.user_list(config_path_str, static_server_str, username_str, password_str), s11n_json.list_encoder_json(s11n_json.USER_ENCODER_JSON), s11n_rich.RichUserEncoder() ) @@ -430,15 +434,19 @@ def user_create( @CONFIG_PATH_OPTION @JSON_OUTPUT_OPTION @STATIC_SERVER_OPTION +@USERNAME_OPTION +@PASSWORD_OPTION def hardware_list( config_path_str: Optional[str], json_output: bool, - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str] ): """Fetch list of hardware""" CLI.execute_table_result( json_output, - CLI.hardware_list(config_path_str, static_server_str), + CLI.hardware_list(config_path_str, static_server_str, username_str, password_str), s11n_json.list_encoder_json(s11n_json.HARDWARE_ENCODER_JSON), s11n_rich.RichHardwareEncoder() ) @@ -473,15 +481,19 @@ def hardware_create( @CONFIG_PATH_OPTION @JSON_OUTPUT_OPTION @STATIC_SERVER_OPTION +@USERNAME_OPTION +@PASSWORD_OPTION def software_list( config_path_str: Optional[str], json_output: bool, - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str], ): """Fetch list of software""" CLI.execute_table_result( json_output, - CLI.software_list(config_path_str, static_server_str), + CLI.software_list(config_path_str, static_server_str, username_str, password_str), s11n_json.list_encoder_json(s11n_json.SOFTWARE_ENCODER_JSON), s11n_rich.RichSoftwareEncoder() ) @@ -519,16 +531,20 @@ def software_upload( @STATIC_SERVER_OPTION @SOFTWARE_ID_OPTION @SOFTWARE_FILE_PATH_OPTION +@USERNAME_OPTION +@PASSWORD_OPTION def software_download( config_path_str: Optional[str], static_server_str: Optional[str], software_id_str: str, - software_file_path: str + software_file_path: str, + username_str: Optional[str], + password_str: Optional[str], ): """Download existing software""" CLI.execute_optional_result( False, - CLI.software_download(config_path_str, static_server_str, software_id_str, software_file_path), + CLI.software_download(config_path_str, static_server_str, software_id_str, software_file_path, username_str, password_str), f"Downloaded software at '{software_file_path}'" ) @@ -538,16 +554,20 @@ def software_download( @STATIC_SERVER_OPTION @HARDWARE_ID_OPTION @SOFTWARE_ID_OPTION +@USERNAME_OPTION +@PASSWORD_OPTION def hardware_software_upload( config_path_str: Optional[str], static_server_str: Optional[str], hardware_id_str: str, - software_id_str: str + software_id_str: str, + username_str: Optional[str], + password_str: Optional[str], ): """Upload software to hardware""" CLI.execute_optional_result( False, - CLI.hardware_software_upload(config_path_str, static_server_str, hardware_id_str, software_id_str), + CLI.hardware_software_upload(config_path_str, static_server_str, hardware_id_str, software_id_str, username_str, password_str), f"Uploaded software '{software_id_str}' to hardware '{hardware_id_str}'" ) @@ -669,14 +689,18 @@ async def exec(): @CONFIG_PATH_OPTION @HARDWARE_ID_OPTION @STATIC_SERVER_OPTION +@USERNAME_OPTION +@PASSWORD_OPTION def hardware_stream_open( config_path_str: Optional[str], hardware_id_str: str, - static_server_str: Optional[str] + static_server_str: Optional[str], + username_str: Optional[str], + password_str: Optional[str], ): """Open hardware video stream in a browser""" CLI.execute_optional_result( False, - CLI.hardware_stream_open(config_path_str, static_server_str, hardware_id_str), + CLI.hardware_stream_open(config_path_str, static_server_str, hardware_id_str, username_str, password_str), f"Opening hardware stream for '{hardware_id_str}'" ) diff --git a/client/src/service/managed_url.py b/client/src/service/managed_url.py index 9804d6b..b26fa64 100644 --- a/client/src/service/managed_url.py +++ b/client/src/service/managed_url.py @@ -1,5 +1,6 @@ #!/usr/bin/env python """Module for managing URLs""" +import base64 import dataclasses from dataclasses import dataclass import tempfile @@ -7,6 +8,7 @@ import urllib.parse from typing import Optional from urllib.parse import ParseResult +from urllib.request import Request from result import Result, Ok, Err from requests import Response from src.domain.dip_client_error import DIPClientError @@ -68,16 +70,42 @@ def with_absolute_path(self, absolute_path: str) -> Result['ManagedURL', Excepti except Exception as e: return Err(e) + def with_basic_auth(self, username: str, password: str) -> Result['ManagedURL', Exception]: + """Append an additional string path to a parsed URL""" + try: + copy = dataclasses.replace(self) + result = copy.value._replace(netloc=f"{username}:{password}@{copy.value.netloc}") + return Ok(ManagedURL(result)) + except Exception as e: + return Err(e) + def downloaded_file_in_path(self, path: str) -> Result[ExistingFilePath, str]: """Download file and return file path""" try: - url_result = self.text() - if isinstance(url_result, Err): - return url_result - url_text: str = url_result.value - LOGGER.debug(f"HTTP download. URL: {url_text}, file: {path}") - urllib.request.urlretrieve(url_text, path) - return Ok(ExistingFilePath(path)) + # Don't even comment, I know this is bad, it makes me cry too + basic_auth_index = self.value.netloc.find("@") + has_basic_auth = basic_auth_index != -1 + if has_basic_auth: + basic_auth = self.value.netloc[0:basic_auth_index] + basic_auth_b64 = base64.b64encode(basic_auth.encode()).decode("utf-8") + url_without_basic_auth = ManagedURL(self.value._replace(netloc=self.value.netloc[basic_auth_index + 1:])) + + req = Request(url_without_basic_auth.text().value) + req.add_header("Authorization", f"Basic {basic_auth_b64}") + with urllib.request.urlopen(req) as url: + contents = url.read() + f = open(path, "wb") + f.write(contents) + f.close() + return Ok(ExistingFilePath(path)) + else: + url_result = self.text() + if isinstance(url_result, Err): + return url_result + url_text: str = url_result.value + LOGGER.debug(f"HTTP download. URL: {url_text}, file: {path}") + urllib.request.urlretrieve(url_text, path) + return Ok(ExistingFilePath(path)) except Exception as e: return Err(e)