From 195fdc818613f9a4b3847a64255633e8f82335cf Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 25 Oct 2024 11:43:09 -0700 Subject: [PATCH 01/14] Add ftp support --- src/toil/jobStores/abstractJobStore.py | 111 +++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 6 deletions(-) diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 17a602e65f..230430d702 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -11,7 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import ftplib import logging +import netrc import os import pickle import re @@ -35,7 +37,7 @@ ) from urllib.error import HTTPError from urllib.parse import ParseResult, urlparse -from urllib.request import urlopen +from urllib.request import urlopen, Request from uuid import uuid4 from toil.common import Config, getNodeID, safeUnpickleFromStream @@ -1870,6 +1872,8 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): stores. """ + ftp = None + @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: return url.scheme.lower() in ("http", "https", "ftp") and not export @@ -1894,11 +1898,14 @@ def _url_exists(cls, url: ParseResult) -> bool: ) def _get_size(cls, url: ParseResult) -> Optional[int]: if url.scheme.lower() == "ftp": - return None - with closing(urlopen(url.geturl())) as readable: - # just read the header for content length - size = readable.info().get("content-length") - return int(size) if size is not None else None + if cls.ftp is None: + cls.ftp = FtpFsAccess() + cls.ftp.size(url.geturl()) + + # just read the header for content length + resp = urlopen(Request(url.geturl(), method="HEAD")) + size = resp.info().get("content-length") + return int(size) if size is not None else None @classmethod def _read_from_url( @@ -1958,3 +1965,95 @@ def _get_is_directory(cls, url: ParseResult) -> bool: def _list_url(cls, url: ParseResult) -> list[str]: # TODO: Implement HTTP index parsing and FTP directory listing raise NotImplementedError("HTTP and FTP URLs cannot yet be listed") + + +class FtpFsAccess: + """ + FTP access with upload. + + Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L8 + """ + + def __init__( + self, cache: Optional[dict[Any, ftplib.FTP]] = None, insecure: bool = False + ): + self.cache = cache or {} + self.netrc = None + self.insecure = insecure + try: + if "HOME" in os.environ: + if os.path.exists(os.path.join(os.environ["HOME"], ".netrc")): + self.netrc = netrc.netrc(os.path.join(os.environ["HOME"], ".netrc")) + elif os.path.exists(os.path.join(os.curdir, ".netrc")): + self.netrc = netrc.netrc(os.path.join(os.curdir, ".netrc")) + except netrc.NetrcParseError as err: + logger.debug(err) + + def _parse_url( + self, url: str + ) -> tuple[Optional[str], Optional[str], Optional[str], str]: + parse = urlparse(url) + user = parse.username + passwd = parse.password + host = parse.hostname + path = parse.path + if parse.scheme == "ftp": + if not user and self.netrc: + if host is not None: + creds = self.netrc.authenticators(host) + if creds: + user, _, passwd = creds + if not user: + if host is not None: + user, passwd = self._recall_credentials(host) + if passwd is None: + passwd = "anonymous@" + if user is None: + user = "anonymous" + + return host, user, passwd, path + + def _connect(self, url: str) -> Optional[ftplib.FTP]: + parse = urlparse(url) + if parse.scheme == "ftp": + host, user, passwd, _ = self._parse_url(url) + if host is None: + # there has to be a host + return None + if (host, user, passwd) in self.cache: + if self.cache[(host, user, passwd)].pwd(): + return self.cache[(host, user, passwd)] + ftp = ftplib.FTP_TLS() + ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) + ftp.connect(host) + ftp.login(user or "", passwd or "", secure=not self.insecure) + self.cache[(host, user, passwd)] = ftp + return ftp + return None + + def _recall_credentials( + self, desired_host: str + ) -> tuple[Optional[str], Optional[str]]: + for host, user, passwd in self.cache: + if desired_host == host: + return user, passwd + return None, None + + def size(self, fn: str) -> Optional[int]: + ftp = self._connect(fn) + if ftp: + host, user, passwd, path = self._parse_url(fn) + try: + return ftp.size(path) + except ftplib.all_errors: + if host is None: + # no host + return None + handle = urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path)) + info = handle.info() + handle.close() + if "Content-length" in info: + return int(info["Content-length"]) + return None + + return None From 176381c564857de127a24dc49c7d6c63de999d8b Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 25 Oct 2024 12:34:01 -0700 Subject: [PATCH 02/14] FTP support --- docs/appendices/environment_vars.rst | 11 ++++ src/toil/jobStores/abstractJobStore.py | 80 ++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 6 deletions(-) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index b6b2889789..948665bbe3 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -217,6 +217,17 @@ There are several environment variables that affect the way Toil runs. | | to S3 (``True`` by default). | | | Example: ``TOIL_S3_USE_SSL=False`` | +----------------------------------+----------------------------------------------------+ +| TOIL_FTP_USER | The FTP username to override all FTP logins with | +| | Example: ``TOIL_FTP_USER=ftp_user`` | ++----------------------------------+----------------------------------------------------+ +| TOIL_FTP_PASSWORD | The FTP password to override all FTP logins with | +| | Example: ``TOIL_FTP_PASSWORD=ftp_password`` | ++----------------------------------+----------------------------------------------------+ +| TOIL_FTP_INSECURE | Enable or disable connecting to all FTP | +| | connections in insecure mode | +| | to S3 (``False`` by default). | +| | Example: ``TOIL_FTP_INSECURE=True`` | ++----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | | | between the WES server and Celery task queue. If | | | unset, ``amqp://guest:guest@localhost:5672//`` is | diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 230430d702..b0813f566c 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -49,6 +49,7 @@ ServiceJobDescription, ) from toil.lib.compatibility import deprecated +from toil.lib.conversions import strtobool from toil.lib.io import WriteWatchingStream from toil.lib.memoize import memoize from toil.lib.retry import ErrorCondition, retry @@ -1874,15 +1875,26 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): ftp = None + @classmethod + def _setup_ftp(cls) -> None: + if cls.ftp is None: + cls.ftp = FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_INSECURE', 'False')) is True) + @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: return url.scheme.lower() in ("http", "https", "ftp") and not export @classmethod def _url_exists(cls, url: ParseResult) -> bool: + # Deal with FTP first to support user/password auth + if url.scheme.lower() == "ftp": + cls._setup_ftp() + # mypy is unable to understand that ftp must exist by this point + assert cls.ftp is not None + return cls.ftp.exists(url.geturl()) + try: - # TODO: Figure out how to HEAD instead of this. - with cls._open_url(url): + with closing(urlopen(Request(url.geturl(), method="HEAD"))): return True except FileNotFoundError: return False @@ -1898,9 +1910,10 @@ def _url_exists(cls, url: ParseResult) -> bool: ) def _get_size(cls, url: ParseResult) -> Optional[int]: if url.scheme.lower() == "ftp": - if cls.ftp is None: - cls.ftp = FtpFsAccess() - cls.ftp.size(url.geturl()) + cls._setup_ftp() + # mypy is unable to understand that ftp must exist by this point + assert cls.ftp is not None + return cls.ftp.size(url.geturl()) # just read the header for content length resp = urlopen(Request(url.geturl(), method="HEAD")) @@ -1938,6 +1951,14 @@ def count(l: int) -> None: ] ) def _open_url(cls, url: ParseResult) -> IO[bytes]: + # Deal with FTP first so we support user/password auth + if url.scheme.lower() == "ftp": + cls._setup_ftp() + # mypy is unable to understand that ftp must exist by this point + assert cls.ftp is not None + # we open in read mode as write mode is not supported + return cls.ftp.open(url.geturl(), mode="r") + try: return cast(IO[bytes], closing(urlopen(url.geturl()))) except HTTPError as e: @@ -1989,6 +2010,42 @@ def __init__( except netrc.NetrcParseError as err: logger.debug(err) + def exists(self, fn: str) -> bool: + return self.isfile(fn) or self.isdir(fn) + + def isfile(self, fn: str) -> bool: + ftp = self._connect(fn) + if ftp: + try: + if not self.size(fn) is None: + return True + else: + return False + except ftplib.all_errors: + return False + return False + + def isdir(self, fn: str) -> bool: + ftp = self._connect(fn) + if ftp: + try: + cwd = ftp.pwd() + ftp.cwd(urlparse(fn).path) + ftp.cwd(cwd) + return True + except ftplib.all_errors: + return False + return False + + def open(self, fn: str, mode: str) -> IO[bytes]: + if 'r' in mode: + host, user, passwd, path = self._parse_url(fn) + handle = urlopen( + "ftp://{}:{}@{}/{}".format(user, passwd, host, path)) + return cast(IO[bytes], closing(handle)) + # TODO: support write mode + raise Exception('Write mode FTP not implemented') + def _parse_url( self, url: str ) -> tuple[Optional[str], Optional[str], Optional[str], str]: @@ -2026,6 +2083,12 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: ftp = ftplib.FTP_TLS() ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) ftp.connect(host) + env_user = os.getenv("TOIL_FTP_USER") + env_passwd = os.getenv("TOIL_FTP_PASSWORD") + if env_user: + user = env_user + if env_passwd: + passwd = env_passwd ftp.login(user or "", passwd or "", secure=not self.insecure) self.cache[(host, user, passwd)] = ftp return ftp @@ -2045,7 +2108,12 @@ def size(self, fn: str) -> Optional[int]: host, user, passwd, path = self._parse_url(fn) try: return ftp.size(path) - except ftplib.all_errors: + except ftplib.all_errors as e: + if str(e) == "550 SIZE not allowed in ASCII mode": + # some servers don't allow grabbing size in ascii mode + # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848 + ftp.voidcmd("TYPE I") + return ftp.size(path) if host is None: # no host return None From af46642b9c1230049bb3e8bf3184b40f7571e051 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 25 Oct 2024 12:34:09 -0700 Subject: [PATCH 03/14] strtobool for hardcoded strings for AWS env vars --- src/toil/lib/aws/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/toil/lib/aws/utils.py b/src/toil/lib/aws/utils.py index a742a9e8ae..a6b2a2e0a3 100644 --- a/src/toil/lib/aws/utils.py +++ b/src/toil/lib/aws/utils.py @@ -20,6 +20,7 @@ from urllib.parse import ParseResult from toil.lib.aws import AWSRegionName, AWSServerErrors, session +from toil.lib.conversions import strtobool from toil.lib.misc import printq from toil.lib.retry import ( DEFAULT_DELAYS, @@ -363,7 +364,7 @@ def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "S3 host = os.environ.get("TOIL_S3_HOST", None) port = os.environ.get("TOIL_S3_PORT", None) protocol = "https" - if os.environ.get("TOIL_S3_USE_SSL", True) == "False": + if strtobool(os.environ.get("TOIL_S3_USE_SSL", 'True')) is False: protocol = "http" if host: endpoint_url = f"{protocol}://{host}" + f":{port}" if port else "" @@ -425,7 +426,7 @@ def list_objects_for_url(url: ParseResult) -> list[str]: host = os.environ.get("TOIL_S3_HOST", None) port = os.environ.get("TOIL_S3_PORT", None) protocol = "https" - if os.environ.get("TOIL_S3_USE_SSL", True) == "False": + if strtobool(os.environ.get("TOIL_S3_USE_SSL", 'True')) is False: protocol = "http" if host: endpoint_url = f"{protocol}://{host}" + f":{port}" if port else "" From 1ea4e3d2e8551a9fd6621352f17857e011f54579 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 25 Oct 2024 12:38:09 -0700 Subject: [PATCH 04/14] typo --- docs/appendices/environment_vars.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index 948665bbe3..8f16e45489 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -225,7 +225,7 @@ There are several environment variables that affect the way Toil runs. +----------------------------------+----------------------------------------------------+ | TOIL_FTP_INSECURE | Enable or disable connecting to all FTP | | | connections in insecure mode | -| | to S3 (``False`` by default). | +| | (``False`` by default). | | | Example: ``TOIL_FTP_INSECURE=True`` | +----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | From 754b43c3039c26c0a0d0204625d0284dedc97df8 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 31 Oct 2024 16:56:36 -0700 Subject: [PATCH 05/14] Change envvar to USE_SSL and move to separate file to put original copyright license in --- docs/appendices/environment_vars.rst | 8 +- src/toil/jobStores/abstractJobStore.py | 144 +------------------------ 2 files changed, 6 insertions(+), 146 deletions(-) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index 8f16e45489..d70ac02a53 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -223,10 +223,10 @@ There are several environment variables that affect the way Toil runs. | TOIL_FTP_PASSWORD | The FTP password to override all FTP logins with | | | Example: ``TOIL_FTP_PASSWORD=ftp_password`` | +----------------------------------+----------------------------------------------------+ -| TOIL_FTP_INSECURE | Enable or disable connecting to all FTP | -| | connections in insecure mode | -| | (``False`` by default). | -| | Example: ``TOIL_FTP_INSECURE=True`` | +| TOIL_FTP_USE_SSL | Enable or disable usage of SSL for connecting to | +| | FTP servers | +| | (``True`` by default). | +| | Example: ``TOIL_FTP_USE_SSL=False`` | +----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | | | between the WES server and Celery task queue. If | diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 3cf2e416fd..e5e8ee6d84 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -11,9 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import ftplib import logging -import netrc import os import pickle import re @@ -48,6 +46,7 @@ JobException, ServiceJobDescription, ) +from toil.jobStores.ftp_utils import FtpFsAccess from toil.lib.compatibility import deprecated from toil.lib.conversions import strtobool from toil.lib.io import WriteWatchingStream @@ -1880,7 +1879,7 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): @classmethod def _setup_ftp(cls) -> None: if cls.ftp is None: - cls.ftp = FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_INSECURE', 'False')) is True) + cls.ftp = FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_USE_SSL', 'True')) is False) @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: @@ -1988,142 +1987,3 @@ def _get_is_directory(cls, url: ParseResult) -> bool: def _list_url(cls, url: ParseResult) -> list[str]: # TODO: Implement HTTP index parsing and FTP directory listing raise NotImplementedError("HTTP and FTP URLs cannot yet be listed") - - -class FtpFsAccess: - """ - FTP access with upload. - - Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L8 - """ - - def __init__( - self, cache: Optional[dict[Any, ftplib.FTP]] = None, insecure: bool = False - ): - self.cache = cache or {} - self.netrc = None - self.insecure = insecure - try: - if "HOME" in os.environ: - if os.path.exists(os.path.join(os.environ["HOME"], ".netrc")): - self.netrc = netrc.netrc(os.path.join(os.environ["HOME"], ".netrc")) - elif os.path.exists(os.path.join(os.curdir, ".netrc")): - self.netrc = netrc.netrc(os.path.join(os.curdir, ".netrc")) - except netrc.NetrcParseError as err: - logger.debug(err) - - def exists(self, fn: str) -> bool: - return self.isfile(fn) or self.isdir(fn) - - def isfile(self, fn: str) -> bool: - ftp = self._connect(fn) - if ftp: - try: - if not self.size(fn) is None: - return True - else: - return False - except ftplib.all_errors: - return False - return False - - def isdir(self, fn: str) -> bool: - ftp = self._connect(fn) - if ftp: - try: - cwd = ftp.pwd() - ftp.cwd(urlparse(fn).path) - ftp.cwd(cwd) - return True - except ftplib.all_errors: - return False - return False - - def open(self, fn: str, mode: str) -> IO[bytes]: - if 'r' in mode: - host, user, passwd, path = self._parse_url(fn) - handle = urlopen( - "ftp://{}:{}@{}/{}".format(user, passwd, host, path)) - return cast(IO[bytes], closing(handle)) - # TODO: support write mode - raise Exception('Write mode FTP not implemented') - - def _parse_url( - self, url: str - ) -> tuple[Optional[str], Optional[str], Optional[str], str]: - parse = urlparse(url) - user = parse.username - passwd = parse.password - host = parse.hostname - path = parse.path - if parse.scheme == "ftp": - if not user and self.netrc: - if host is not None: - creds = self.netrc.authenticators(host) - if creds: - user, _, passwd = creds - if not user: - if host is not None: - user, passwd = self._recall_credentials(host) - if passwd is None: - passwd = "anonymous@" - if user is None: - user = "anonymous" - - return host, user, passwd, path - - def _connect(self, url: str) -> Optional[ftplib.FTP]: - parse = urlparse(url) - if parse.scheme == "ftp": - host, user, passwd, _ = self._parse_url(url) - if host is None: - # there has to be a host - return None - if (host, user, passwd) in self.cache: - if self.cache[(host, user, passwd)].pwd(): - return self.cache[(host, user, passwd)] - ftp = ftplib.FTP_TLS() - ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) - ftp.connect(host) - env_user = os.getenv("TOIL_FTP_USER") - env_passwd = os.getenv("TOIL_FTP_PASSWORD") - if env_user: - user = env_user - if env_passwd: - passwd = env_passwd - ftp.login(user or "", passwd or "", secure=not self.insecure) - self.cache[(host, user, passwd)] = ftp - return ftp - return None - - def _recall_credentials( - self, desired_host: str - ) -> tuple[Optional[str], Optional[str]]: - for host, user, passwd in self.cache: - if desired_host == host: - return user, passwd - return None, None - - def size(self, fn: str) -> Optional[int]: - ftp = self._connect(fn) - if ftp: - host, user, passwd, path = self._parse_url(fn) - try: - return ftp.size(path) - except ftplib.all_errors as e: - if str(e) == "550 SIZE not allowed in ASCII mode": - # some servers don't allow grabbing size in ascii mode - # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848 - ftp.voidcmd("TYPE I") - return ftp.size(path) - if host is None: - # no host - return None - handle = urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path)) - info = handle.info() - handle.close() - if "Content-length" in info: - return int(info["Content-length"]) - return None - - return None From 5ed71cf95a7456ba3c995e67286bf4413e7a2256 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 31 Oct 2024 17:27:11 -0700 Subject: [PATCH 06/14] Forgot to add file --- src/toil/jobStores/ftp_utils.py | 214 ++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 src/toil/jobStores/ftp_utils.py diff --git a/src/toil/jobStores/ftp_utils.py b/src/toil/jobStores/ftp_utils.py new file mode 100644 index 0000000000..894fbff3e8 --- /dev/null +++ b/src/toil/jobStores/ftp_utils.py @@ -0,0 +1,214 @@ +# Copyright 2017 Oregon Health and Science University +# +# Copyright (C) 2015-2021 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import ftplib +import logging +import netrc +import os +from contextlib import closing +from typing import Optional, Any, cast, IO +from urllib.parse import urlparse +from urllib.request import urlopen + +logger = logging.getLogger(__name__) + + +class FtpFsAccess: + """ + FTP access with upload. + + Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L37-L251 + """ + + def __init__( + self, cache: Optional[dict[Any, ftplib.FTP]] = None, insecure: bool = False + ): + """ + FTP object to handle FTP connections. By default, connect over FTP with TLS. + + :param cache: cache of generated FTP objects + :param insecure: Whether to connect over FTP with TLS + """ + self.cache = cache or {} + self.netrc = None + self.insecure = insecure + try: + if "HOME" in os.environ: + if os.path.exists(os.path.join(os.environ["HOME"], ".netrc")): + self.netrc = netrc.netrc(os.path.join(os.environ["HOME"], ".netrc")) + elif os.path.exists(os.path.join(os.curdir, ".netrc")): + self.netrc = netrc.netrc(os.path.join(os.curdir, ".netrc")) + except netrc.NetrcParseError as err: + logger.debug(err) + + def exists(self, fn: str) -> bool: + """ + Check if a file/directory exists over an FTP server + :param fn: FTP url + :return: True or false depending on whether the object exists on the server + """ + return self.isfile(fn) or self.isdir(fn) + + def isfile(self, fn: str) -> bool: + """ + Check if the FTP url points to a file + :param fn: FTP url + :return: True if url is file, else false + """ + ftp = self._connect(fn) + if ftp: + try: + if not self.size(fn) is None: + return True + else: + return False + except ftplib.all_errors: + return False + return False + + def isdir(self, fn: str) -> bool: + """ + Check if the FTP url points to a directory + :param fn: FTP url + :return: True if url is directory, else false + """ + ftp = self._connect(fn) + if ftp: + try: + cwd = ftp.pwd() + ftp.cwd(urlparse(fn).path) + ftp.cwd(cwd) + return True + except ftplib.all_errors: + return False + return False + + def open(self, fn: str, mode: str) -> IO[bytes]: + """ + Open an FTP url. + + Only supports reading, no write support. + :param fn: FTP url + :param mode: Mode to open FTP url in + :return: + """ + if "r" in mode: + host, user, passwd, path = self._parse_url(fn) + handle = urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path)) + return cast(IO[bytes], closing(handle)) + # TODO: support write mode + raise Exception("Write mode FTP not implemented") + + def _parse_url( + self, url: str + ) -> tuple[Optional[str], Optional[str], Optional[str], str]: + """ + Parse an FTP url into hostname, username, password, and path + :param url: + :return: hostname, username, password, path + """ + parse = urlparse(url) + user = parse.username + passwd = parse.password + host = parse.hostname + path = parse.path + if parse.scheme == "ftp": + if not user and self.netrc: + if host is not None: + creds = self.netrc.authenticators(host) + if creds: + user, _, passwd = creds + if not user: + if host is not None: + user, passwd = self._recall_credentials(host) + if passwd is None: + passwd = "anonymous@" + if user is None: + user = "anonymous" + + return host, user, passwd, path + + def _connect(self, url: str) -> Optional[ftplib.FTP]: + """ + Connect to an FTP server. Handles authentication. + :param url: FTP url + :return: FTP object + """ + parse = urlparse(url) + if parse.scheme == "ftp": + host, user, passwd, _ = self._parse_url(url) + if host is None: + # there has to be a host + return None + if (host, user, passwd) in self.cache: + if self.cache[(host, user, passwd)].pwd(): + return self.cache[(host, user, passwd)] + ftp = ftplib.FTP_TLS() + ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) + ftp.connect(host) + env_user = os.getenv("TOIL_FTP_USER") + env_passwd = os.getenv("TOIL_FTP_PASSWORD") + if env_user: + user = env_user + if env_passwd: + passwd = env_passwd + ftp.login(user or "", passwd or "", secure=not self.insecure) + self.cache[(host, user, passwd)] = ftp + return ftp + return None + + def _recall_credentials( + self, desired_host: str + ) -> tuple[Optional[str], Optional[str]]: + """ + Grab the cached credentials + :param desired_host: FTP hostname + :return: username, password + """ + for host, user, passwd in self.cache: + if desired_host == host: + return user, passwd + return None, None + + def size(self, fn: str) -> Optional[int]: + """ + Get the size of an FTP object + :param fn: FTP url + :return: Size of object + """ + ftp = self._connect(fn) + if ftp: + host, user, passwd, path = self._parse_url(fn) + try: + return ftp.size(path) + except ftplib.all_errors as e: + if str(e) == "550 SIZE not allowed in ASCII mode": + # some servers don't allow grabbing size in ascii mode + # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848 + ftp.voidcmd("TYPE I") + return ftp.size(path) + if host is None: + # no host + return None + handle = urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path)) + info = handle.info() + handle.close() + if "Content-length" in info: + return int(info["Content-length"]) + return None + + return None From 17536ba4241d1eafa849797adebb04ae013c755f Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 31 Oct 2024 17:41:23 -0700 Subject: [PATCH 07/14] missing space in rst --- docs/appendices/environment_vars.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index d70ac02a53..561ff4cb5a 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -225,7 +225,7 @@ There are several environment variables that affect the way Toil runs. +----------------------------------+----------------------------------------------------+ | TOIL_FTP_USE_SSL | Enable or disable usage of SSL for connecting to | | | FTP servers | -| | (``True`` by default). | +| | (``True`` by default). | | | Example: ``TOIL_FTP_USE_SSL=False`` | +----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | From 0f38d5858e0aa0b53dbdc2df896fbedda9543835 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 8 Nov 2024 11:21:14 -0800 Subject: [PATCH 08/14] Don't keep FTP objects around forever and disable FTP SSL by default --- docs/appendices/environment_vars.rst | 4 ++-- src/toil/jobStores/abstractJobStore.py | 27 ++++++++---------------- src/toil/{jobStores => lib}/ftp_utils.py | 0 3 files changed, 11 insertions(+), 20 deletions(-) rename src/toil/{jobStores => lib}/ftp_utils.py (100%) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index 561ff4cb5a..f1d700d49a 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -225,8 +225,8 @@ There are several environment variables that affect the way Toil runs. +----------------------------------+----------------------------------------------------+ | TOIL_FTP_USE_SSL | Enable or disable usage of SSL for connecting to | | | FTP servers | -| | (``True`` by default). | -| | Example: ``TOIL_FTP_USE_SSL=False`` | +| | (``False`` by default). | +| | Example: ``TOIL_FTP_USE_SSL=True`` | +----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | | | between the WES server and Celery task queue. If | diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index e5e8ee6d84..37b9b23a09 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -46,7 +46,7 @@ JobException, ServiceJobDescription, ) -from toil.jobStores.ftp_utils import FtpFsAccess +from toil.lib.ftp_utils import FtpFsAccess from toil.lib.compatibility import deprecated from toil.lib.conversions import strtobool from toil.lib.io import WriteWatchingStream @@ -1874,12 +1874,9 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): stores. """ - ftp = None - @classmethod - def _setup_ftp(cls) -> None: - if cls.ftp is None: - cls.ftp = FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_USE_SSL', 'True')) is False) + def _setup_ftp(cls) -> FtpFsAccess: + return FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_USE_SSL', 'False')) is False) @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: @@ -1889,10 +1886,8 @@ def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: def _url_exists(cls, url: ParseResult) -> bool: # Deal with FTP first to support user/password auth if url.scheme.lower() == "ftp": - cls._setup_ftp() - # mypy is unable to understand that ftp must exist by this point - assert cls.ftp is not None - return cls.ftp.exists(url.geturl()) + ftp = cls._setup_ftp() + return ftp.exists(url.geturl()) try: with closing(urlopen(Request(url.geturl(), method="HEAD"))): @@ -1911,10 +1906,8 @@ def _url_exists(cls, url: ParseResult) -> bool: ) def _get_size(cls, url: ParseResult) -> Optional[int]: if url.scheme.lower() == "ftp": - cls._setup_ftp() - # mypy is unable to understand that ftp must exist by this point - assert cls.ftp is not None - return cls.ftp.size(url.geturl()) + ftp = cls._setup_ftp() + return ftp.size(url.geturl()) # just read the header for content length resp = urlopen(Request(url.geturl(), method="HEAD")) @@ -1954,11 +1947,9 @@ def count(l: int) -> None: def _open_url(cls, url: ParseResult) -> IO[bytes]: # Deal with FTP first so we support user/password auth if url.scheme.lower() == "ftp": - cls._setup_ftp() - # mypy is unable to understand that ftp must exist by this point - assert cls.ftp is not None + ftp = cls._setup_ftp() # we open in read mode as write mode is not supported - return cls.ftp.open(url.geturl(), mode="r") + return ftp.open(url.geturl(), mode="r") try: return cast(IO[bytes], closing(urlopen(url.geturl()))) diff --git a/src/toil/jobStores/ftp_utils.py b/src/toil/lib/ftp_utils.py similarity index 100% rename from src/toil/jobStores/ftp_utils.py rename to src/toil/lib/ftp_utils.py From a2105af76c086678e854637311d8d24b4be53ee3 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 14 Nov 2024 12:22:19 -0800 Subject: [PATCH 09/14] Fix HTTP errors + add prot_p in secure mode --- src/toil/jobStores/abstractJobStore.py | 7 +++++-- src/toil/lib/ftp_utils.py | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 37b9b23a09..0ebc5aa97c 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -1892,8 +1892,11 @@ def _url_exists(cls, url: ParseResult) -> bool: try: with closing(urlopen(Request(url.geturl(), method="HEAD"))): return True - except FileNotFoundError: - return False + except HTTPError as e: + if e.code in (404, 410): + return False + else: + raise # Any other errors we should pass through because something really went # wrong (e.g. server is broken today but file may usually exist) diff --git a/src/toil/lib/ftp_utils.py b/src/toil/lib/ftp_utils.py index 894fbff3e8..3d7e4d3b2d 100644 --- a/src/toil/lib/ftp_utils.py +++ b/src/toil/lib/ftp_utils.py @@ -125,6 +125,9 @@ def _parse_url( user = parse.username passwd = parse.password host = parse.hostname + if parse.port is not None: + # Don't forget the port + host += f":{parse.port}" path = parse.path if parse.scheme == "ftp": if not user and self.netrc: @@ -167,6 +170,8 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: if env_passwd: passwd = env_passwd ftp.login(user or "", passwd or "", secure=not self.insecure) + if self.insecure is False: + ftp.prot_p() self.cache[(host, user, passwd)] = ftp return ftp return None From 9f862770213298d3f5944948cc24c4359ec03820 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 14 Nov 2024 13:08:14 -0800 Subject: [PATCH 10/14] Always try to use SSL before falling back, raising if user enforces SSL --- src/toil/lib/ftp_utils.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/toil/lib/ftp_utils.py b/src/toil/lib/ftp_utils.py index 3d7e4d3b2d..ac7c5edb84 100644 --- a/src/toil/lib/ftp_utils.py +++ b/src/toil/lib/ftp_utils.py @@ -161,6 +161,7 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: if self.cache[(host, user, passwd)].pwd(): return self.cache[(host, user, passwd)] ftp = ftplib.FTP_TLS() + # Note: the FTP lib logger handles logging itself and doesn't go through our logging implementation ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) ftp.connect(host) env_user = os.getenv("TOIL_FTP_USER") @@ -169,9 +170,19 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: user = env_user if env_passwd: passwd = env_passwd - ftp.login(user or "", passwd or "", secure=not self.insecure) - if self.insecure is False: - ftp.prot_p() + try: + # Always try a SSL connection first + ftp.login(user or "", passwd or "", secure=True) + if self.insecure is False: + ftp.prot_p() + except ftplib.error_perm as e: + # SSL failed, consult the insecure flag + if self.insecure: + # If the user has not forced toil to always use SSL, fallback to insecure + ftp.login(user or "", passwd or "", secure=False) + else: + # Else raise an error + raise self.cache[(host, user, passwd)] = ftp return ftp return None From 91f1cbd6b36843f1c44fa4e62082e2ed8c89ff07 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 14 Nov 2024 13:09:21 -0800 Subject: [PATCH 11/14] Change environment_vars.rst to reflect enforcement --- docs/appendices/environment_vars.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index f1d700d49a..b8945e7666 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -223,7 +223,7 @@ There are several environment variables that affect the way Toil runs. | TOIL_FTP_PASSWORD | The FTP password to override all FTP logins with | | | Example: ``TOIL_FTP_PASSWORD=ftp_password`` | +----------------------------------+----------------------------------------------------+ -| TOIL_FTP_USE_SSL | Enable or disable usage of SSL for connecting to | +| TOIL_FTP_USE_SSL | Enforce usage of SSL for connecting to | | | FTP servers | | | (``False`` by default). | | | Example: ``TOIL_FTP_USE_SSL=True`` | From 5884417c1e7bc7741f2f54fcae7508fd651a0558 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 14 Nov 2024 16:35:53 -0800 Subject: [PATCH 12/14] add proper port suport --- src/toil/lib/ftp_utils.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/toil/lib/ftp_utils.py b/src/toil/lib/ftp_utils.py index ac7c5edb84..ca56e12dee 100644 --- a/src/toil/lib/ftp_utils.py +++ b/src/toil/lib/ftp_utils.py @@ -107,15 +107,15 @@ def open(self, fn: str, mode: str) -> IO[bytes]: :return: """ if "r" in mode: - host, user, passwd, path = self._parse_url(fn) - handle = urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path)) + host, port, user, passwd, path = self._parse_url(fn) + handle = urlopen("ftp://{}:{}@{}:{}/{}".format(user, passwd, host, port, path)) return cast(IO[bytes], closing(handle)) # TODO: support write mode raise Exception("Write mode FTP not implemented") def _parse_url( self, url: str - ) -> tuple[Optional[str], Optional[str], Optional[str], str]: + ) -> tuple[str, int, Optional[str], Optional[str], str]: """ Parse an FTP url into hostname, username, password, and path :param url: @@ -125,10 +125,14 @@ def _parse_url( user = parse.username passwd = parse.password host = parse.hostname - if parse.port is not None: - # Don't forget the port - host += f":{parse.port}" + port = parse.port path = parse.path + if host is None: + # The URL we connect to must have a host + raise RuntimeError(f"FTP URL does not contain a host: {url}") + # default port is 21 + if port is None: + port = 21 if parse.scheme == "ftp": if not user and self.netrc: if host is not None: @@ -142,8 +146,7 @@ def _parse_url( passwd = "anonymous@" if user is None: user = "anonymous" - - return host, user, passwd, path + return host, port, user, passwd, path def _connect(self, url: str) -> Optional[ftplib.FTP]: """ @@ -153,7 +156,7 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: """ parse = urlparse(url) if parse.scheme == "ftp": - host, user, passwd, _ = self._parse_url(url) + host, port, user, passwd, _ = self._parse_url(url) if host is None: # there has to be a host return None @@ -163,7 +166,7 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: ftp = ftplib.FTP_TLS() # Note: the FTP lib logger handles logging itself and doesn't go through our logging implementation ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0) - ftp.connect(host) + ftp.connect(host, port) env_user = os.getenv("TOIL_FTP_USER") env_passwd = os.getenv("TOIL_FTP_PASSWORD") if env_user: @@ -208,7 +211,7 @@ def size(self, fn: str) -> Optional[int]: """ ftp = self._connect(fn) if ftp: - host, user, passwd, path = self._parse_url(fn) + host, port, user, passwd, path = self._parse_url(fn) try: return ftp.size(path) except ftplib.all_errors as e: @@ -217,10 +220,7 @@ def size(self, fn: str) -> Optional[int]: # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848 ftp.voidcmd("TYPE I") return ftp.size(path) - if host is None: - # no host - return None - handle = urlopen("ftp://{}:{}@{}/{}".format(user, passwd, host, path)) + handle = urlopen("ftp://{}:{}@{}:{}/{}".format(user, passwd, host, port, path)) info = handle.info() handle.close() if "Content-length" in info: From 8fe81ccc4d0a115bd7e126ebfc823c99085bccea Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 20 Nov 2024 14:30:03 -0800 Subject: [PATCH 13/14] Remove FTP SSL support and add comments --- src/toil/jobStores/abstractJobStore.py | 4 +++- src/toil/lib/ftp_utils.py | 19 +++---------------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 0ebc5aa97c..dc3bdb2cf7 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -1876,7 +1876,9 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): @classmethod def _setup_ftp(cls) -> FtpFsAccess: - return FtpFsAccess(insecure=strtobool(os.environ.get('TOIL_FTP_USE_SSL', 'False')) is False) + # FTP connections are not reused. Ideally, a thread should watch any reused FTP connections + # and close them when necessary + return FtpFsAccess() @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: diff --git a/src/toil/lib/ftp_utils.py b/src/toil/lib/ftp_utils.py index ca56e12dee..50dbffc584 100644 --- a/src/toil/lib/ftp_utils.py +++ b/src/toil/lib/ftp_utils.py @@ -33,19 +33,18 @@ class FtpFsAccess: Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L37-L251 """ + # TODO: Properly support FTP over SSL def __init__( - self, cache: Optional[dict[Any, ftplib.FTP]] = None, insecure: bool = False + self, cache: Optional[dict[Any, ftplib.FTP]] = None ): """ FTP object to handle FTP connections. By default, connect over FTP with TLS. :param cache: cache of generated FTP objects - :param insecure: Whether to connect over FTP with TLS """ self.cache = cache or {} self.netrc = None - self.insecure = insecure try: if "HOME" in os.environ: if os.path.exists(os.path.join(os.environ["HOME"], ".netrc")): @@ -173,19 +172,7 @@ def _connect(self, url: str) -> Optional[ftplib.FTP]: user = env_user if env_passwd: passwd = env_passwd - try: - # Always try a SSL connection first - ftp.login(user or "", passwd or "", secure=True) - if self.insecure is False: - ftp.prot_p() - except ftplib.error_perm as e: - # SSL failed, consult the insecure flag - if self.insecure: - # If the user has not forced toil to always use SSL, fallback to insecure - ftp.login(user or "", passwd or "", secure=False) - else: - # Else raise an error - raise + ftp.login(user or "", passwd or "", secure=False) self.cache[(host, user, passwd)] = ftp return ftp return None From f499cafa49a9a7ecfc905fa05fd87888257be645 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 20 Nov 2024 14:48:48 -0800 Subject: [PATCH 14/14] Remove envvar --- docs/appendices/environment_vars.rst | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index b8945e7666..6d7c9246c8 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -223,11 +223,6 @@ There are several environment variables that affect the way Toil runs. | TOIL_FTP_PASSWORD | The FTP password to override all FTP logins with | | | Example: ``TOIL_FTP_PASSWORD=ftp_password`` | +----------------------------------+----------------------------------------------------+ -| TOIL_FTP_USE_SSL | Enforce usage of SSL for connecting to | -| | FTP servers | -| | (``False`` by default). | -| | Example: ``TOIL_FTP_USE_SSL=True`` | -+----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | | | between the WES server and Celery task queue. If | | | unset, ``amqp://guest:guest@localhost:5672//`` is |