diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index b6b2889789..6d7c9246c8 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -217,6 +217,12 @@ There are several environment variables that affect the way Toil runs. | | to S3 (``True`` by default). | | | Example: ``TOIL_S3_USE_SSL=False`` | +----------------------------------+----------------------------------------------------+ +| TOIL_FTP_USER | The FTP username to override all FTP logins with | +| | Example: ``TOIL_FTP_USER=ftp_user`` | ++----------------------------------+----------------------------------------------------+ +| TOIL_FTP_PASSWORD | The FTP password to override all FTP logins with | +| | Example: ``TOIL_FTP_PASSWORD=ftp_password`` | ++----------------------------------+----------------------------------------------------+ | TOIL_WES_BROKER_URL | An optional broker URL to use to communicate | | | between the WES server and Celery task queue. If | | | unset, ``amqp://guest:guest@localhost:5672//`` is | diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index d0ecca100f..dc3bdb2cf7 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -35,7 +35,7 @@ ) from urllib.error import HTTPError from urllib.parse import ParseResult, urlparse -from urllib.request import urlopen +from urllib.request import urlopen, Request from uuid import uuid4 from toil.common import Config, getNodeID, safeUnpickleFromStream @@ -46,7 +46,9 @@ JobException, ServiceJobDescription, ) +from toil.lib.ftp_utils import FtpFsAccess from toil.lib.compatibility import deprecated +from toil.lib.conversions import strtobool from toil.lib.io import WriteWatchingStream from toil.lib.memoize import memoize from toil.lib.retry import ErrorCondition, retry @@ -1872,18 +1874,31 @@ class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): stores. 
""" + @classmethod + def _setup_ftp(cls) -> FtpFsAccess: + # FTP connections are not reused. Ideally, a thread should watch any reused FTP connections + # and close them when necessary + return FtpFsAccess() + @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: return url.scheme.lower() in ("http", "https", "ftp") and not export @classmethod def _url_exists(cls, url: ParseResult) -> bool: + # Deal with FTP first to support user/password auth + if url.scheme.lower() == "ftp": + ftp = cls._setup_ftp() + return ftp.exists(url.geturl()) + try: - # TODO: Figure out how to HEAD instead of this. - with cls._open_url(url): + with closing(urlopen(Request(url.geturl(), method="HEAD"))): return True - except FileNotFoundError: - return False + except HTTPError as e: + if e.code in (404, 410): + return False + else: + raise # Any other errors we should pass through because something really went # wrong (e.g. server is broken today but file may usually exist) @@ -1896,11 +1911,13 @@ def _url_exists(cls, url: ParseResult) -> bool: ) def _get_size(cls, url: ParseResult) -> Optional[int]: if url.scheme.lower() == "ftp": - return None - with closing(urlopen(url.geturl())) as readable: - # just read the header for content length - size = readable.info().get("content-length") - return int(size) if size is not None else None + ftp = cls._setup_ftp() + return ftp.size(url.geturl()) + + # just read the header for content length + resp = urlopen(Request(url.geturl(), method="HEAD")) + size = resp.info().get("content-length") + return int(size) if size is not None else None @classmethod def _read_from_url( @@ -1933,6 +1950,12 @@ def count(l: int) -> None: ] ) def _open_url(cls, url: ParseResult) -> IO[bytes]: + # Deal with FTP first so we support user/password auth + if url.scheme.lower() == "ftp": + ftp = cls._setup_ftp() + # we open in read mode as write mode is not supported + return ftp.open(url.geturl(), mode="r") + try: return cast(IO[bytes], 
closing(urlopen(url.geturl()))) except HTTPError as e: diff --git a/src/toil/lib/aws/utils.py b/src/toil/lib/aws/utils.py index a742a9e8ae..a6b2a2e0a3 100644 --- a/src/toil/lib/aws/utils.py +++ b/src/toil/lib/aws/utils.py @@ -20,6 +20,7 @@ from urllib.parse import ParseResult from toil.lib.aws import AWSRegionName, AWSServerErrors, session +from toil.lib.conversions import strtobool from toil.lib.misc import printq from toil.lib.retry import ( DEFAULT_DELAYS, @@ -363,7 +364,7 @@ def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "S3 host = os.environ.get("TOIL_S3_HOST", None) port = os.environ.get("TOIL_S3_PORT", None) protocol = "https" - if os.environ.get("TOIL_S3_USE_SSL", True) == "False": + if strtobool(os.environ.get("TOIL_S3_USE_SSL", 'True')) is False: protocol = "http" if host: endpoint_url = f"{protocol}://{host}" + f":{port}" if port else "" @@ -425,7 +426,7 @@ def list_objects_for_url(url: ParseResult) -> list[str]: host = os.environ.get("TOIL_S3_HOST", None) port = os.environ.get("TOIL_S3_PORT", None) protocol = "https" - if os.environ.get("TOIL_S3_USE_SSL", True) == "False": + if strtobool(os.environ.get("TOIL_S3_USE_SSL", 'True')) is False: protocol = "http" if host: endpoint_url = f"{protocol}://{host}" + f":{port}" if port else "" diff --git a/src/toil/lib/ftp_utils.py b/src/toil/lib/ftp_utils.py new file mode 100644 index 0000000000..50dbffc584 --- /dev/null +++ b/src/toil/lib/ftp_utils.py @@ -0,0 +1,217 @@ +# Copyright 2017 Oregon Health and Science University +# +# Copyright (C) 2015-2021 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import ftplib
import logging
import netrc
import os
from contextlib import closing
from typing import Optional, Any, cast, IO
from urllib.parse import urlparse
from urllib.request import urlopen

logger = logging.getLogger(__name__)


class FtpFsAccess:
    """
    Read-only access to files and directories on FTP servers.

    Control connections (used for existence and size checks) are made with
    :mod:`ftplib` and cached per ``(host, user, password)``. File contents are
    read through :func:`urllib.request.urlopen`. Writing is not implemented.

    Taken and modified from https://github.com/ohsu-comp-bio/cwl-tes/blob/03f0096f9fae8acd527687d3460a726e09190c3a/cwl_tes/ftp.py#L37-L251
    """
    # TODO: Properly support FTP over SSL

    def __init__(
        self, cache: Optional[dict[Any, ftplib.FTP]] = None
    ):
        """
        Set up the connection cache and load ``.netrc`` credentials if present.

        The ``.netrc`` file is looked up in ``$HOME`` when ``HOME`` is set and
        in the current directory otherwise. A ``.netrc`` that fails to parse is
        logged at debug level and ignored.

        NOTE(review): connections are created with ``ftplib.FTP_TLS`` but
        logged in with ``secure=False``, so the login is presumably not
        protected by TLS; see the TODO on the class.

        :param cache: cache of generated FTP objects, keyed by
            ``(host, user, password)``. The dict is used as given (even when
            empty) so that several instances can share connections.
        """
        # Compare against None: ``cache or {}`` would silently replace a
        # caller-supplied empty dict and break sharing.
        self.cache = cache if cache is not None else {}
        self.netrc: Optional[netrc.netrc] = None
        try:
            if "HOME" in os.environ:
                home_netrc = os.path.join(os.environ["HOME"], ".netrc")
                if os.path.exists(home_netrc):
                    self.netrc = netrc.netrc(home_netrc)
            elif os.path.exists(os.path.join(os.curdir, ".netrc")):
                self.netrc = netrc.netrc(os.path.join(os.curdir, ".netrc"))
        except netrc.NetrcParseError as err:
            logger.debug(err)

    def exists(self, fn: str) -> bool:
        """
        Check if a file/directory exists over an FTP server

        :param fn: FTP url
        :return: True or false depending on whether the object exists on the server
        """
        return self.isfile(fn) or self.isdir(fn)

    def isfile(self, fn: str) -> bool:
        """
        Check if the FTP url points to a file

        A url counts as a file when a size can be determined for it.

        :param fn: FTP url
        :return: True if url is file, else false
        """
        # Connection failures are deliberately not caught here (they mean the
        # server is unreachable, not that the file is missing).
        if self._connect(fn) is None:
            return False
        try:
            return self.size(fn) is not None
        except ftplib.all_errors:
            return False

    def isdir(self, fn: str) -> bool:
        """
        Check if the FTP url points to a directory

        The check changes into the directory and then back to the previous
        working directory of the (cached) connection.

        :param fn: FTP url
        :return: True if url is directory, else false
        """
        ftp = self._connect(fn)
        if ftp is None:
            return False
        try:
            previous_dir = ftp.pwd()
            ftp.cwd(urlparse(fn).path)
            ftp.cwd(previous_dir)
            return True
        except ftplib.all_errors:
            return False

    def open(self, fn: str, mode: str) -> IO[bytes]:
        """
        Open an FTP url.

        Only supports reading, no write support.

        :param fn: FTP url
        :param mode: Mode to open FTP url in; must contain ``"r"``
        :return: the opened url wrapped in :func:`contextlib.closing`; use it
            in a ``with`` statement to obtain the readable stream
        :raises NotImplementedError: if ``mode`` does not request reading
        """
        if "r" in mode:
            host, port, user, passwd, path = self._parse_url(fn)
            handle = urlopen(self._build_url(host, port, user, passwd, path))
            return cast(IO[bytes], closing(handle))
        # TODO: support write mode
        raise NotImplementedError("Write mode FTP not implemented")

    @staticmethod
    def _build_url(
        host: str, port: int, user: Optional[str], passwd: Optional[str], path: str
    ) -> str:
        """
        Build an FTP url with embedded credentials for use with ``urlopen``.

        ``path`` is appended after a ``/`` exactly as parsed from the original
        url, so an absolute path yields a double slash after the port.

        :return: url of the form ``ftp://user:password@host:port/<path>``
        """
        # Leave the password out entirely when there is none, instead of
        # formatting None into the literal text "None".
        credentials = f"{user}" if passwd is None else f"{user}:{passwd}"
        return f"ftp://{credentials}@{host}:{port}/{path}"

    def _parse_url(
        self, url: str
    ) -> tuple[str, int, Optional[str], Optional[str], str]:
        """
        Parse an FTP url into hostname, port, username, password, and path

        Credentials are resolved in this order: ``TOIL_FTP_USER`` /
        ``TOIL_FTP_PASSWORD`` environment variables, the url itself, the
        ``.netrc`` entry for the host, credentials of a cached connection to
        the host, and finally anonymous login.

        :param url: FTP url
        :return: hostname, port, username, password, path
        :raises RuntimeError: if the url has no host
        """
        parse = urlparse(url)
        user = parse.username
        passwd = parse.password
        host = parse.hostname
        port = parse.port
        path = parse.path
        if host is None:
            # The URL we connect to must have a host
            raise RuntimeError(f"FTP URL does not contain a host: {url}")
        # default port is 21
        if port is None:
            port = 21
        if parse.scheme == "ftp" and not user and self.netrc:
            creds = self.netrc.authenticators(host)
            if creds:
                user, _, passwd = creds
        if not user:
            user, passwd = self._recall_credentials(host)
            if passwd is None:
                passwd = "anonymous@"
            if not user:
                user = "anonymous"
        # Apply the environment overrides here rather than only at login, so
        # that control connections, the connection cache key, and urlopen-based
        # reads all use the same credentials.
        env_user = os.getenv("TOIL_FTP_USER")
        env_passwd = os.getenv("TOIL_FTP_PASSWORD")
        if env_user:
            user = env_user
        if env_passwd:
            passwd = env_passwd
        return host, port, user, passwd, path

    def _connect(self, url: str) -> Optional[ftplib.FTP]:
        """
        Connect to an FTP server. Handles authentication.

        A cached connection for the same ``(host, user, password)`` is reused
        while it still answers ``PWD``; a dead one is dropped and replaced.

        :param url: FTP url
        :return: FTP object, or None if the url is not an FTP url
        :raises RuntimeError: if the url has no host
        """
        if urlparse(url).scheme != "ftp":
            return None
        host, port, user, passwd, _ = self._parse_url(url)
        key = (host, user, passwd)
        cached = self.cache.get(key)
        if cached is not None:
            try:
                if cached.pwd():
                    return cached
            except ftplib.all_errors as err:
                # A timed-out or dropped connection raises instead of
                # returning something falsy; reconnect below.
                logger.debug("Cached FTP connection to %s is stale: %s", host, err)
            self.cache.pop(key, None)
            try:
                cached.close()
            except ftplib.all_errors as err:
                logger.debug("Could not close stale FTP connection to %s: %s", host, err)
        ftp = ftplib.FTP_TLS()
        # Note: the FTP lib logger handles logging itself and doesn't go through our logging implementation
        ftp.set_debuglevel(1 if logger.isEnabledFor(logging.DEBUG) else 0)
        try:
            ftp.connect(host, port)
            ftp.login(user or "", passwd or "", secure=False)
        except ftplib.all_errors:
            # Do not leak the socket of a connection that never became usable
            ftp.close()
            raise
        self.cache[key] = ftp
        return ftp

    def _recall_credentials(
        self, desired_host: str
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Grab the cached credentials

        :param desired_host: FTP hostname
        :return: username, password of the first cached connection to the
            host, or ``(None, None)`` if there is none
        """
        for host, user, passwd in self.cache:
            if desired_host == host:
                return user, passwd
        return None, None

    def size(self, fn: str) -> Optional[int]:
        """
        Get the size of an FTP object

        Tries the FTP ``SIZE`` command first (switching to binary mode if the
        server demands it) and falls back to the ``Content-length`` reported
        by ``urlopen``.

        :param fn: FTP url
        :return: Size of object, or None if the url is not an FTP url or the
            size could not be determined
        """
        ftp = self._connect(fn)
        if ftp is None:
            return None
        host, port, user, passwd, path = self._parse_url(fn)
        try:
            return ftp.size(path)
        except ftplib.all_errors as e:
            if str(e) == "550 SIZE not allowed in ASCII mode":
                # some servers don't allow grabbing size in ascii mode
                # https://stackoverflow.com/questions/22090001/get-folder-size-using-ftplib/22093848#22093848
                ftp.voidcmd("TYPE I")
                return ftp.size(path)
            # closing() guarantees the handle is released even if info() fails
            with closing(urlopen(self._build_url(host, port, user, passwd, path))) as handle:
                info = handle.info()
            if "Content-length" in info:
                return int(info["Content-length"])
            return None