diff --git a/docs/configuration.rst b/docs/configuration.rst index 6b7d6aa2..64c4eb34 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -29,6 +29,19 @@ This is what the default configuration looks like: # Determines whether remote or local git branches/tags are preferred if their output dirs conflict smv_prefer_remote_refs = False + # Specify build targets and whether the resulting artefacts should be downloadable + smv_build_targets = { + "HTML" : { + "builder": "html", + "downloadable": False, + "download_format": "", + }, + } + + # Flag indicating whether the intermediate build directories should be removed after artefacts are produced + smv_clean_intermediate_files = True + + You can override all of these values inside your :file:`conf.py`. .. note:: @@ -105,6 +118,157 @@ Here are some examples: Have a look at `PyFormat `_ for information how to use new-stye Python formatting. +Specify Additional Build Targets +================================ + +In addition to generating static HTML documentation, it is also possible to specify additional build targets for each version of your documentation by providing a value for the ``smv_build_targets`` setting. This can be used to generate and package the documentation for download, or for post processing by an external program. The ``smv_build_targets`` setting has the following format: + +.. code-block:: python + + smv_build_targets = { + "build_target_name" : { + "builder": ``, + "downloadable": bool, + "download_format": str + }, + } + +These fields can be populated as follows: + +* ``build_target_name``: This is the name of the build target. It must be unique within the ``smv_build_targets`` dictionary, and is used as the display name of the download artefacts if ``downloadable == True``. +* ``builder``: This is the string identifying any valid `sphinx builder `_. +* ``downloadable``: Indicate whether an artefact for this build should be generated. 
All artefacts are placed within the ``build/version/artefacts`` directory and made available in the html context. +* ``download_format``: A string indicating the format of the final downloadable artefact. Only valid if ``downloadable == True``. Valid values for this include ``tar``, ``zip``, ``pdf``, ``epub``, or any other extension for build artefacts produced by the sphinx builder specified in ``builder``. + + .. note:: + + If ``tar`` or ``zip`` are specified, the entire build directory is archived. An example of this would be the ``html`` directory for a ``html`` sphinx builder, or the ``latex`` directory for a ``latex`` sphinx builder. + + .. note:: + + When the build artefact is an individual file, it is only matched according to the pattern . to avoid the ambiguity associated with multiple matches to a file extension. To illustrate this limitation, html files are always indexed with ``index.html``, which would not be identified as an individual build artefact. Thus, in order to make HTML available as a build artefact it must be archived using ``zip``, ``tar``, ``gztar``, ``bztar`` or ``xztar``. + +Some common examples may be as follows: + +.. code-block:: python + + smv_build_targets = { + "HTML" : { + "builder": "html", + "downloadable": True, + "download_format": "zip", + }, + "SingleHTML" : { + "builder": "singlehtml", + "downloadable": True, + "download_format": "tar", + }, + "PDF" : { + "builder": "latexpdf", # This will build a .pdf file after generating latex documents + "downloadable": True, + "download_format": "pdf", + }, + "LaTeX" : { + "builder": "latex", # This will only generate latex documents. + "downloadable": True, + "download_format": "gztar", + }, + "ePub" : { + "builder": "epub", + "downloadable": True, + "download_format": "epub", + }, + } + +Additionally, the user is able to configure whether intermediate build files are cleaned from the output directory using the ``smv_clean_intermediate_files`` setting: + +.. 
code-block:: python + + smv_clean_intermediate_files = True + +If this flag is ``True``, the resulting directory structure will resemble the following: + +.. code-block:: bash + + build + ├── develop + │   ├── artefacts + │   │   ├── example_docs-develop.epub + │   │   ├── example_docs-develop-HTML.zip + │   │   └── example_docs-develop.pdf + │   ├── index.html + │   └── ... + ├── master + │   ├── artefacts + │   │   ├── example_docs-master.epub + │   │   ├── example_docs-master-HTML.zip + │   │   └── example_docs-master.pdf + │   ├── index.html + │   └── ... + └── v0.1.0 + ├── artefacts + │   ├── example_docs-v0.1.0.epub + │   ├── example_docs-v0.1.0-HTML.zip + │   └── example_docs-v0.1.0.pdf + ├── index.html + └── ... + +However, if this flag is set to ``False``, the resulting directory will also include intermediate build directories: + +.. code-block:: bash + + build + ├── develop + │   ├── artefacts + │   │   ├── example_docs-develop.epub + │   │   ├── example_docs-develop-HTML.zip + │   │   └── example_docs-develop.pdf + │   ├── epub + │   │   ├── example.epub + │   │   ├── index.xhtml + │   │   └── ... + │   ├── html + │   │   ├── index.html + │   │   └── ... + │   ├── index.html + │   ├── latexpdf + │   │   └── latex + │   └── ... + ├── master + │   ├── artefacts + │   │   ├── example_docs-master.epub + │   │   ├── example_docs-master-HTML.zip + │   │   └── example_docs-master.pdf + │   ├── epub + │   │   ├── example.epub + │   │   ├── index.xhtml + │   │   └── ... + │   ├── html + │   │   ├── index.html + │   │   └── ... + │   ├── index.html + │   ├── latexpdf + │   │   └── latex + │   └── ... + └── v0.1.0 + ├── artefacts + │   ├── example_docs-v0.1.0.epub + │   ├── example_docs-v0.1.0-HTML.zip + │   └── example_docs-v0.1.0.pdf + ├── epub + │   ├── example.epub + │   ├── index.xhtml + │   └── ... + ├── html + │   ├── index.html + │   └── ... + ├── index.html + ├── latexpdf + │   └── latex + └── ... 
+ +This will be useful if you want to use an external program to interact with the build output. + Overriding Configuration Variables ================================== diff --git a/docs/templates.rst b/docs/templates.rst index 10191535..5a484646 100644 --- a/docs/templates.rst +++ b/docs/templates.rst @@ -80,6 +80,19 @@ List releases and development versions separately {% endif %} +List available downloads +------------------------ + +.. code-block:: html + + {% if current_version.artefacts %} +

+ <h3>{{ _('Downloads') }}</h3>
+ <ul>
+ {% for artefact in current_version.artefacts %}
+ <li><a href="{{ artefact.url }}">{{ artefact.name }}</a></li>
+ {% endfor %}
+ </ul>
+ + {% endif %} Version Banners =============== @@ -139,6 +152,14 @@ So instead of adding a custom template to ``html_sidebars``, you need to create {%- endfor %} {%- endif %} + {%- if current_version.artefacts %} +
+ <dl>
+ <dt>Downloads</dt>
+ {%- for artefact in current_version.artefacts %}
+ <dd><a href="{{ artefact.url }}">{{ artefact.name }}</a></dd>
+ {%- endfor %}
+ </dl>
+ {%- endif %} {%- endif %} diff --git a/setup.py index 799e04d1..3b2d3e18 100755 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ version="0.2.4", install_requires=["sphinx >= 2.1"], license="BSD", - packages=["sphinx_multiversion"], + packages=["sphinx_multiversion", "sphinx_multiversion.lib"], entry_points={ "console_scripts": [ "sphinx-multiversion=sphinx_multiversion:main", diff --git a/sphinx_multiversion/lib/__init__.py b/sphinx_multiversion/lib/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sphinx_multiversion/lib/shutil.py b/sphinx_multiversion/lib/shutil.py new file mode 100644 index 00000000..73e9931e --- /dev/null +++ b/sphinx_multiversion/lib/shutil.py @@ -0,0 +1,1584 @@ +"""This is a direct copy of shutil.py from the cpython library, available at +https://github.com/python/cpython/blob/main/Lib/shutil.py. It is included here +exclusively for backwards compatibility for python 3.6+. All authorship credit +for this file goes to PSF and its developers. + +Utility functions for copying and archiving files and directory trees. +XXX The functions here don't copy the resource fork or other metadata on Mac. 
+""" +import os +import sys +import stat +import fnmatch +import collections +import errno + +try: + import zlib + + del zlib + _ZLIB_SUPPORTED = True +except ImportError: + _ZLIB_SUPPORTED = False + +try: + import bz2 + + del bz2 + _BZ2_SUPPORTED = True +except ImportError: + _BZ2_SUPPORTED = False + +try: + import lzma + + del lzma + _LZMA_SUPPORTED = True +except ImportError: + _LZMA_SUPPORTED = False + +_WINDOWS = os.name == "nt" +posix = nt = None +if os.name == "posix": + import posix +elif _WINDOWS: + import nt + +COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 +# This should never be removed, see rationale in: +# https://bugs.python.org/issue43743#msg393429 +_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") +_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS + +# CMD defaults in Windows 10 +_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" + +__all__ = [ + "copyfileobj", + "copyfile", + "copymode", + "copystat", + "copy", + "copy2", + "copytree", + "move", + "rmtree", + "Error", + "SpecialFileError", + "ExecError", + "make_archive", + "get_archive_formats", + "register_archive_format", + "unregister_archive_format", + "get_unpack_formats", + "register_unpack_format", + "unregister_unpack_format", + "unpack_archive", + "ignore_patterns", + "chown", + "which", + "get_terminal_size", + "SameFileError", +] +# disk_usage is added later, if available on the platform + + +class Error(OSError): + pass + + +class SameFileError(Error): + """Raised when source and destination are the same file.""" + + +class SpecialFileError(OSError): + """Raised when trying to do a kind of operation (e.g. copying) which is + not supported on a special file (e.g. 
a named pipe)""" + + +class ExecError(OSError): + """Raised when a command could not be executed""" + + +class ReadError(OSError): + """Raised when an archive cannot be read""" + + +class RegistryError(Exception): + """Raised when a registry operation with the archiving + and unpacking registries fails""" + + +class _GiveupOnFastCopy(Exception): + """Raised as a signal to fallback on using raw read()/write() + file copy when fast-copy functions fail to do so. + """ + + +def _fastcopy_fcopyfile(fsrc, fdst, flags): + """Copy a regular file content or metadata by using high-performance + fcopyfile(3) syscall (macOS). + """ + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + try: + posix._fcopyfile(infd, outfd, flags) + except OSError as err: + err.filename = fsrc.name + err.filename2 = fdst.name + if err.errno in {errno.EINVAL, errno.ENOTSUP}: + raise _GiveupOnFastCopy(err) + else: + raise err from None + + +def _fastcopy_sendfile(fsrc, fdst): + """Copy data from one regular mmap-like fd to another by using + high-performance sendfile(2) syscall. + This should work on Linux >= 2.6.33 only. + """ + # Note: copyfileobj() is left alone in order to not introduce any + # unexpected breakage. Possible risks by using zero-copy calls + # in copyfileobj() are: + # - fdst cannot be open in "a"(ppend) mode + # - fsrc and fdst may be open in "t"(ext) mode + # - fsrc may be a BufferedReader (which hides unread data in a buffer), + # GzipFile (which decompresses data), HTTPResponse (which decodes + # chunks). + # - possibly others (e.g. encrypted fs/partition?) + global _USE_CP_SENDFILE + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + # Hopefully the whole file will be copied in a single call. 
+ # sendfile() is called in a loop 'till EOF is reached (0 return) + # so a bufsize smaller or bigger than the actual file size + # should not make any difference, also in case the file content + # changes while being copied. + try: + blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB + except OSError: + blocksize = 2 ** 27 # 128MiB + # On 32-bit architectures truncate to 1GiB to avoid OverflowError, + # see bpo-38319. + if sys.maxsize < 2 ** 32: + blocksize = min(blocksize, 2 ** 30) + + offset = 0 + while True: + try: + sent = os.sendfile(outfd, infd, offset, blocksize) + except OSError as err: + # ...in oder to have a more informative exception. + err.filename = fsrc.name + err.filename2 = fdst.name + + if err.errno == errno.ENOTSOCK: + # sendfile() on this platform (probably Linux < 2.6.33) + # does not support copies between regular files (only + # sockets). + _USE_CP_SENDFILE = False + raise _GiveupOnFastCopy(err) + + if err.errno == errno.ENOSPC: # filesystem is full + raise err from None + + # Give up on first call and if no data was copied. + if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: + raise _GiveupOnFastCopy(err) + + raise err + else: + if sent == 0: + break # EOF + offset += sent + + +def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): + """readinto()/memoryview() based variant of copyfileobj(). + *fsrc* must support readinto() method and both files must be + open in binary mode. + """ + # Localize variable access to minimize overhead. + fsrc_readinto = fsrc.readinto + fdst_write = fdst.write + with memoryview(bytearray(length)) as mv: + while True: + n = fsrc_readinto(mv) + if not n: + break + elif n < length: + with mv[:n] as smv: + fdst.write(smv) + else: + fdst_write(mv) + + +def copyfileobj(fsrc, fdst, length=0): + """copy data from file-like object fsrc to file-like object fdst""" + # Localize variable access to minimize overhead. 
+ if not length: + length = COPY_BUFSIZE + fsrc_read = fsrc.read + fdst_write = fdst.write + while True: + buf = fsrc_read(length) + if not buf: + break + fdst_write(buf) + + +def _samefile(src, dst): + # Macintosh, Unix. + if isinstance(src, os.DirEntry) and hasattr(os.path, "samestat"): + try: + return os.path.samestat(src.stat(), os.stat(dst)) + except OSError: + return False + + if hasattr(os.path, "samefile"): + try: + return os.path.samefile(src, dst) + except OSError: + return False + + # All other platforms: check for same pathname. + return os.path.normcase(os.path.abspath(src)) == os.path.normcase( + os.path.abspath(dst) + ) + + +def _stat(fn): + return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) + + +def _islink(fn): + return ( + fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) + ) + + +def copyfile(src, dst, *, follow_symlinks=True): + """Copy data from src to dst in the most efficient way possible. + If follow_symlinks is not set and src is a symbolic link, a new + symlink will be created instead of copying the file it points to. + """ + # sys.audit("shutil.copyfile", src, dst) + + if _samefile(src, dst): + raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) + + file_size = 0 + for i, fn in enumerate([src, dst]): + try: + st = _stat(fn) + except OSError: + # File most likely does not exist + pass + else: + # XXX What about other special files? (sockets, devices...) 
+ if stat.S_ISFIFO(st.st_mode): + fn = fn.path if isinstance(fn, os.DirEntry) else fn + raise SpecialFileError("`%s` is a named pipe" % fn) + if _WINDOWS and i == 0: + file_size = st.st_size + + if not follow_symlinks and _islink(src): + os.symlink(os.readlink(src), dst) + else: + with open(src, "rb") as fsrc, open(dst, "wb") as fdst: + # macOS + if _HAS_FCOPYFILE: + try: + _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) + return dst + except _GiveupOnFastCopy: + pass + # Linux + elif _USE_CP_SENDFILE: + try: + _fastcopy_sendfile(fsrc, fdst) + return dst + except _GiveupOnFastCopy: + pass + # Windows, see: + # https://github.com/python/cpython/pull/7160#discussion_r195405230 + elif _WINDOWS and file_size > 0: + _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) + return dst + + copyfileobj(fsrc, fdst) + + return dst + + +def copymode(src, dst, *, follow_symlinks=True): + """Copy mode bits from src to dst. + If follow_symlinks is not set, symlinks aren't followed if and only + if both `src` and `dst` are symlinks. If `lchmod` isn't available + (e.g. Linux) this method does nothing. + """ + # sys.audit("shutil.copymode", src, dst) + + if not follow_symlinks and _islink(src) and os.path.islink(dst): + if hasattr(os, "lchmod"): + stat_func, chmod_func = os.lstat, os.lchmod + else: + return + else: + stat_func, chmod_func = _stat, os.chmod + + st = stat_func(src) + chmod_func(dst, stat.S_IMODE(st.st_mode)) + + +if hasattr(os, "listxattr"): + + def _copyxattr(src, dst, *, follow_symlinks=True): + """Copy extended filesystem attributes from `src` to `dst`. + Overwrite existing attributes. + If `follow_symlinks` is false, symlinks won't be followed. 
+ """ + + try: + names = os.listxattr(src, follow_symlinks=follow_symlinks) + except OSError as e: + if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): + raise + return + for name in names: + try: + value = os.getxattr(src, name, follow_symlinks=follow_symlinks) + os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) + except OSError as e: + if e.errno not in ( + errno.EPERM, + errno.ENOTSUP, + errno.ENODATA, + errno.EINVAL, + ): + raise + + +else: + + def _copyxattr(*args, **kwargs): + pass + + +def copystat(src, dst, *, follow_symlinks=True): + """Copy file metadata + Copy the permission bits, last access time, last modification time, and + flags from `src` to `dst`. On Linux, copystat() also copies the "extended + attributes" where possible. The file contents, owner, and group are + unaffected. `src` and `dst` are path-like objects or path names given as + strings. + If the optional flag `follow_symlinks` is not set, symlinks aren't + followed if and only if both `src` and `dst` are symlinks. + """ + # sys.audit("shutil.copystat", src, dst) + + def _nop(*args, ns=None, follow_symlinks=None): + pass + + # follow symlinks (aka don't not follow symlinks) + follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) + if follow: + # use the real function if it exists + def lookup(name): + return getattr(os, name, _nop) + + else: + # use the real function only if it exists + # *and* it supports follow_symlinks + def lookup(name): + fn = getattr(os, name, _nop) + if fn in os.supports_follow_symlinks: + return fn + return _nop + + if isinstance(src, os.DirEntry): + st = src.stat(follow_symlinks=follow) + else: + st = lookup("stat")(src, follow_symlinks=follow) + mode = stat.S_IMODE(st.st_mode) + lookup("utime")( + dst, ns=(st.st_atime_ns, st.st_mtime_ns), follow_symlinks=follow + ) + # We must copy extended attributes before the file is (potentially) + # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. 
+ _copyxattr(src, dst, follow_symlinks=follow) + try: + lookup("chmod")(dst, mode, follow_symlinks=follow) + except NotImplementedError: + # if we got a NotImplementedError, it's because + # * follow_symlinks=False, + # * lchown() is unavailable, and + # * either + # * fchownat() is unavailable or + # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. + # (it returned ENOSUP.) + # therefore we're out of options--we simply cannot chown the + # symlink. give up, suppress the error. + # (which is what shutil always did in this circumstance.) + pass + if hasattr(st, "st_flags"): + try: + lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) + except OSError as why: + for err in "EOPNOTSUPP", "ENOTSUP": + if hasattr(errno, err) and why.errno == getattr(errno, err): + break + else: + raise + + +def copy(src, dst, *, follow_symlinks=True): + """Copy data and mode bits ("cp src dst"). Return the file's destination. + The destination may be a directory. + If follow_symlinks is false, symlinks won't be followed. This + resembles GNU's "cp -P src dst". + If source and destination are the same file, a SameFileError will be + raised. + """ + if os.path.isdir(dst): + dst = os.path.join(dst, os.path.basename(src)) + copyfile(src, dst, follow_symlinks=follow_symlinks) + copymode(src, dst, follow_symlinks=follow_symlinks) + return dst + + +def copy2(src, dst, *, follow_symlinks=True): + """Copy data and metadata. Return the file's destination. + Metadata is copied with copystat(). Please see the copystat function + for more information. + The destination may be a directory. + If follow_symlinks is false, symlinks won't be followed. This + resembles GNU's "cp -P src dst". + """ + if os.path.isdir(dst): + dst = os.path.join(dst, os.path.basename(src)) + copyfile(src, dst, follow_symlinks=follow_symlinks) + copystat(src, dst, follow_symlinks=follow_symlinks) + return dst + + +def ignore_patterns(*patterns): + """Function that can be used as copytree() ignore parameter. 
+ Patterns is a sequence of glob-style patterns + that are used to exclude files""" + + def _ignore_patterns(path, names): + ignored_names = [] + for pattern in patterns: + ignored_names.extend(fnmatch.filter(names, pattern)) + return set(ignored_names) + + return _ignore_patterns + + +def _copytree( + entries, + src, + dst, + symlinks, + ignore, + copy_function, + ignore_dangling_symlinks, + dirs_exist_ok=False, +): + if ignore is not None: + ignored_names = ignore(os.fspath(src), [x.name for x in entries]) + else: + ignored_names = set() + + os.makedirs(dst, exist_ok=dirs_exist_ok) + errors = [] + use_srcentry = copy_function is copy2 or copy_function is copy + + for srcentry in entries: + if srcentry.name in ignored_names: + continue + srcname = os.path.join(src, srcentry.name) + dstname = os.path.join(dst, srcentry.name) + srcobj = srcentry if use_srcentry else srcname + try: + is_symlink = srcentry.is_symlink() + if is_symlink and os.name == "nt": + # Special check for directory junctions, which appear as + # symlinks but we want to recurse. + lstat = srcentry.stat(follow_symlinks=False) + if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: + is_symlink = False + if is_symlink: + linkto = os.readlink(srcname) + if symlinks: + # We can't just leave it to `copy_function` because legacy + # code with a custom `copy_function` may rely on copytree + # doing the right thing. + os.symlink(linkto, dstname) + copystat(srcobj, dstname, follow_symlinks=not symlinks) + else: + # ignore dangling symlink if the flag is on + if not os.path.exists(linkto) and ignore_dangling_symlinks: + continue + # otherwise let the copy occur. 
copy2 will raise an error + if srcentry.is_dir(): + copytree( + srcobj, + dstname, + symlinks, + ignore, + copy_function, + dirs_exist_ok=dirs_exist_ok, + ) + else: + copy_function(srcobj, dstname) + elif srcentry.is_dir(): + copytree( + srcobj, + dstname, + symlinks, + ignore, + copy_function, + dirs_exist_ok=dirs_exist_ok, + ) + else: + # Will raise a SpecialFileError for unsupported file types + copy_function(srcobj, dstname) + # catch the Error from the recursive copytree so that we can + # continue with other files + except Error as err: + errors.extend(err.args[0]) + except OSError as why: + errors.append((srcname, dstname, str(why))) + try: + copystat(src, dst) + except OSError as why: + # Copying file access times may fail on Windows + if getattr(why, "winerror", None) is None: + errors.append((src, dst, str(why))) + if errors: + raise Error(errors) + return dst + + +def copytree( + src, + dst, + symlinks=False, + ignore=None, + copy_function=copy2, + ignore_dangling_symlinks=False, + dirs_exist_ok=False, +): + """Recursively copy a directory tree and return the destination directory. + dirs_exist_ok dictates whether to raise an exception in case dst or any + missing parent directory already exists. + If exception(s) occur, an Error is raised with a list of reasons. + If the optional symlinks flag is true, symbolic links in the + source tree result in symbolic links in the destination tree; if + it is false, the contents of the files pointed to by symbolic + links are copied. If the file pointed by the symlink doesn't + exist, an exception will be added in the list of errors raised in + an Error exception at the end of the copy process. + You can set the optional ignore_dangling_symlinks flag to true if you + want to silence this exception. Notice that this has no effect on + platforms that don't support os.symlink. + The optional ignore argument is a callable. 
If given, it + is called with the `src` parameter, which is the directory + being visited by copytree(), and `names` which is the list of + `src` contents, as returned by os.listdir(): + callable(src, names) -> ignored_names + Since copytree() is called recursively, the callable will be + called once for each directory that is copied. It returns a + list of names relative to the `src` directory that should + not be copied. + The optional copy_function argument is a callable that will be used + to copy each file. It will be called with the source path and the + destination path as arguments. By default, copy2() is used, but any + function that supports the same signature (like copy()) can be used. + """ + # sys.audit("shutil.copytree", src, dst) + with os.scandir(src) as itr: + entries = list(itr) + return _copytree( + entries=entries, + src=src, + dst=dst, + symlinks=symlinks, + ignore=ignore, + copy_function=copy_function, + ignore_dangling_symlinks=ignore_dangling_symlinks, + dirs_exist_ok=dirs_exist_ok, + ) + + +if hasattr(os.stat_result, "st_file_attributes"): + # Special handling for directory junctions to make them behave like + # symlinks for shutil.rmtree, since in general they do not appear as + # regular links. 
+ def _rmtree_isdir(entry): + try: + st = entry.stat(follow_symlinks=False) + return stat.S_ISDIR(st.st_mode) and not ( + st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT + and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT + ) + except OSError: + return False + + def _rmtree_islink(path): + try: + st = os.lstat(path) + return stat.S_ISLNK(st.st_mode) or ( + st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT + and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT + ) + except OSError: + return False + + +else: + + def _rmtree_isdir(entry): + try: + return entry.is_dir(follow_symlinks=False) + except OSError: + return False + + def _rmtree_islink(path): + return os.path.islink(path) + + +# version vulnerable to race conditions +def _rmtree_unsafe(path, onerror): + try: + with os.scandir(path) as scandir_it: + entries = list(scandir_it) + except OSError: + onerror(os.scandir, path, sys.exc_info()) + entries = [] + for entry in entries: + fullname = entry.path + if _rmtree_isdir(entry): + try: + if entry.is_symlink(): + # This can only happen if someone replaces + # a directory with a symlink after the call to + # os.scandir or entry.is_dir above. 
+ raise OSError("Cannot call rmtree on a symbolic link") + except OSError: + onerror(os.path.islink, fullname, sys.exc_info()) + continue + _rmtree_unsafe(fullname, onerror) + else: + try: + os.unlink(fullname) + except OSError: + onerror(os.unlink, fullname, sys.exc_info()) + try: + os.rmdir(path) + except OSError: + onerror(os.rmdir, path, sys.exc_info()) + + +# Version using fd-based APIs to protect against races +def _rmtree_safe_fd(topfd, path, onerror): + try: + with os.scandir(topfd) as scandir_it: + entries = list(scandir_it) + except OSError as err: + err.filename = path + onerror(os.scandir, path, sys.exc_info()) + return + for entry in entries: + fullname = os.path.join(path, entry.name) + try: + is_dir = entry.is_dir(follow_symlinks=False) + except OSError: + is_dir = False + else: + if is_dir: + try: + orig_st = entry.stat(follow_symlinks=False) + is_dir = stat.S_ISDIR(orig_st.st_mode) + except OSError: + onerror(os.lstat, fullname, sys.exc_info()) + continue + if is_dir: + try: + dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) + except OSError: + onerror(os.open, fullname, sys.exc_info()) + else: + try: + if os.path.samestat(orig_st, os.fstat(dirfd)): + _rmtree_safe_fd(dirfd, fullname, onerror) + try: + os.rmdir(entry.name, dir_fd=topfd) + except OSError: + onerror(os.rmdir, fullname, sys.exc_info()) + else: + try: + # This can only happen if someone replaces + # a directory with a symlink after the call to + # os.scandir or stat.S_ISDIR above. 
+ raise OSError( + "Cannot call rmtree on a symbolic " "link" + ) + except OSError: + onerror(os.path.islink, fullname, sys.exc_info()) + finally: + os.close(dirfd) + else: + try: + os.unlink(entry.name, dir_fd=topfd) + except OSError: + onerror(os.unlink, fullname, sys.exc_info()) + + +_use_fd_functions = ( + {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd + and os.scandir in os.supports_fd + and os.stat in os.supports_follow_symlinks +) + + +def rmtree(path, ignore_errors=False, onerror=None): + """Recursively delete a directory tree. + If ignore_errors is set, errors are ignored; otherwise, if onerror + is set, it is called to handle the error with arguments (func, + path, exc_info) where func is platform and implementation dependent; + path is the argument to that function that caused it to fail; and + exc_info is a tuple returned by sys.exc_info(). If ignore_errors + is false and onerror is None, an exception is raised. + """ + # sys.audit("shutil.rmtree", path) + if ignore_errors: + + def onerror(*args): + pass + + elif onerror is None: + + def onerror(*args): + raise + + if _use_fd_functions: + # While the unsafe rmtree works fine on bytes, the fd based does not. + if isinstance(path, bytes): + path = os.fsdecode(path) + # Note: To guard against symlink races, we use the standard + # lstat()/open()/fstat() trick. 
+ try: + orig_st = os.lstat(path) + except Exception: + onerror(os.lstat, path, sys.exc_info()) + return + try: + fd = os.open(path, os.O_RDONLY) + except Exception: + onerror(os.open, path, sys.exc_info()) + return + try: + if os.path.samestat(orig_st, os.fstat(fd)): + _rmtree_safe_fd(fd, path, onerror) + try: + os.rmdir(path) + except OSError: + onerror(os.rmdir, path, sys.exc_info()) + else: + try: + # symlinks to directories are forbidden, see bug #1669 + raise OSError("Cannot call rmtree on a symbolic link") + except OSError: + onerror(os.path.islink, path, sys.exc_info()) + finally: + os.close(fd) + else: + try: + if _rmtree_islink(path): + # symlinks to directories are forbidden, see bug #1669 + raise OSError("Cannot call rmtree on a symbolic link") + except OSError: + onerror(os.path.islink, path, sys.exc_info()) + # can't continue even if onerror hook returns + return + return _rmtree_unsafe(path, onerror) + + +# Allow introspection of whether or not the hardening against symlink +# attacks is supported on the current platform +rmtree.avoids_symlink_attacks = _use_fd_functions + + +def _basename(path): + """A basename() variant which first strips the trailing slash, if present. + Thus we always get the last component of the path, even for directories. + path: Union[PathLike, str] + e.g. + >>> os.path.basename('/bar/foo') + 'foo' + >>> os.path.basename('/bar/foo/') + '' + >>> _basename('/bar/foo/') + 'foo' + """ + path = os.fspath(path) + sep = os.path.sep + (os.path.altsep or "") + return os.path.basename(path.rstrip(sep)) + + +def move(src, dst, copy_function=copy2): + """Recursively move a file or directory to another location. This is + similar to the Unix "mv" command. Return the file or directory's + destination. + If the destination is a directory or a symlink to a directory, the source + is moved inside the directory. The destination path must not already + exist. 
+ If the destination already exists but is not a directory, it may be + overwritten depending on os.rename() semantics. + If the destination is on our current filesystem, then rename() is used. + Otherwise, src is copied to the destination and then removed. Symlinks are + recreated under the new name if os.rename() fails because of cross + filesystem renames. + The optional `copy_function` argument is a callable that will be used + to copy the source or it will be delegated to `copytree`. + By default, copy2() is used, but any function that supports the same + signature (like copy()) can be used. + A lot more could be done here... A look at a mv.c shows a lot of + the issues this implementation glosses over. + """ + # sys.audit("shutil.move", src, dst) + real_dst = dst + if os.path.isdir(dst): + if _samefile(src, dst): + # We might be on a case insensitive filesystem, + # perform the rename anyway. + os.rename(src, dst) + return + + # Using _basename instead of os.path.basename is important, as we must + # ignore any trailing slash to avoid the basename returning '' + real_dst = os.path.join(dst, _basename(src)) + + if os.path.exists(real_dst): + raise Error("Destination path '%s' already exists" % real_dst) + try: + os.rename(src, real_dst) + except OSError: + if os.path.islink(src): + linkto = os.readlink(src) + os.symlink(linkto, real_dst) + os.unlink(src) + elif os.path.isdir(src): + if _destinsrc(src, dst): + raise Error( + "Cannot move a directory '%s' into itself" + " '%s'." % (src, dst) + ) + if _is_immutable(src) or ( + not os.access(src, os.W_OK) + and os.listdir(src) + and sys.platform == "darwin" + ): + raise PermissionError( + "Cannot move the non-empty directory " + "'%s': Lacking write permission to '%s'." 
% (src, src) + ) + copytree(src, real_dst, copy_function=copy_function, symlinks=True) + rmtree(src) + else: + copy_function(src, real_dst) + os.unlink(src) + return real_dst + + +def _destinsrc(src, dst): + src = os.path.abspath(src) + dst = os.path.abspath(dst) + if not src.endswith(os.path.sep): + src += os.path.sep + if not dst.endswith(os.path.sep): + dst += os.path.sep + return dst.startswith(src) + + +def _is_immutable(src): + st = _stat(src) + immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] + return hasattr(st, "st_flags") and st.st_flags in immutable_states + + +def _get_gid(name): + """Returns a gid, given a group name.""" + if name is None: + return None + + try: + from grp import getgrnam + except ImportError: + return None + + try: + result = getgrnam(name) + except KeyError: + result = None + if result is not None: + return result[2] + return None + + +def _get_uid(name): + """Returns an uid, given a user name.""" + if name is None: + return None + + try: + from pwd import getpwnam + except ImportError: + return None + + try: + result = getpwnam(name) + except KeyError: + result = None + if result is not None: + return result[2] + return None + + +def _make_tarball( + base_name, + base_dir, + compress="gzip", + verbose=0, + dry_run=0, + owner=None, + group=None, + logger=None, +): + """Create a (possibly compressed) tar file from all the files under + 'base_dir'. + 'compress' must be "gzip" (the default), "bzip2", "xz", or None. + 'owner' and 'group' can be used to define an owner and a group for the + archive that is being built. If not provided, the current owner and group + will be used. + The output tar file will be named 'base_name' + ".tar", possibly plus + the appropriate compression extension (".gz", ".bz2", or ".xz"). + Returns the output filename. 
+ """ + if compress is None: + tar_compression = "" + elif _ZLIB_SUPPORTED and compress == "gzip": + tar_compression = "gz" + elif _BZ2_SUPPORTED and compress == "bzip2": + tar_compression = "bz2" + elif _LZMA_SUPPORTED and compress == "xz": + tar_compression = "xz" + else: + raise ValueError( + "bad value for 'compress', or compression format not " + "supported : {0}".format(compress) + ) + + import tarfile # late import for breaking circular dependency + + compress_ext = "." + tar_compression if compress else "" + archive_name = base_name + ".tar" + compress_ext + archive_dir = os.path.dirname(archive_name) + + if archive_dir and not os.path.exists(archive_dir): + if logger is not None: + logger.info("creating %s", archive_dir) + if not dry_run: + os.makedirs(archive_dir) + + # creating the tarball + if logger is not None: + logger.info("Creating tar archive") + + uid = _get_uid(owner) + gid = _get_gid(group) + + def _set_uid_gid(tarinfo): + if gid is not None: + tarinfo.gid = gid + tarinfo.gname = group + if uid is not None: + tarinfo.uid = uid + tarinfo.uname = owner + return tarinfo + + if not dry_run: + tar = tarfile.open(archive_name, "w|%s" % tar_compression) + try: + tar.add(base_dir, filter=_set_uid_gid) + finally: + tar.close() + + return archive_name + + +def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): + """Create a zip file from all the files under 'base_dir'. + The output zip file will be named 'base_name' + ".zip". Returns the + name of the output zip file. 
+ """ + import zipfile # late import for breaking circular dependency + + zip_filename = base_name + ".zip" + archive_dir = os.path.dirname(base_name) + + if archive_dir and not os.path.exists(archive_dir): + if logger is not None: + logger.info("creating %s", archive_dir) + if not dry_run: + os.makedirs(archive_dir) + + if logger is not None: + logger.info( + "creating '%s' and adding '%s' to it", zip_filename, base_dir + ) + + if not dry_run: + with zipfile.ZipFile( + zip_filename, "w", compression=zipfile.ZIP_DEFLATED + ) as zf: + path = os.path.normpath(base_dir) + if path != os.curdir: + zf.write(path, path) + if logger is not None: + logger.info("adding '%s'", path) + for dirpath, dirnames, filenames in os.walk(base_dir): + for name in sorted(dirnames): + path = os.path.normpath(os.path.join(dirpath, name)) + zf.write(path, path) + if logger is not None: + logger.info("adding '%s'", path) + for name in filenames: + path = os.path.normpath(os.path.join(dirpath, name)) + if os.path.isfile(path): + zf.write(path, path) + if logger is not None: + logger.info("adding '%s'", path) + + return zip_filename + + +_ARCHIVE_FORMATS = { + "tar": (_make_tarball, [("compress", None)], "uncompressed tar file"), +} + +if _ZLIB_SUPPORTED: + _ARCHIVE_FORMATS["gztar"] = ( + _make_tarball, + [("compress", "gzip")], + "gzip'ed tar-file", + ) + _ARCHIVE_FORMATS["zip"] = (_make_zipfile, [], "ZIP file") + +if _BZ2_SUPPORTED: + _ARCHIVE_FORMATS["bztar"] = ( + _make_tarball, + [("compress", "bzip2")], + "bzip2'ed tar-file", + ) + +if _LZMA_SUPPORTED: + _ARCHIVE_FORMATS["xztar"] = ( + _make_tarball, + [("compress", "xz")], + "xz'ed tar-file", + ) + + +def get_archive_formats(): + """Returns a list of supported formats for archiving and unarchiving. 
+ Each element of the returned sequence is a tuple (name, description) + """ + formats = [ + (name, registry[2]) for name, registry in _ARCHIVE_FORMATS.items() + ] + formats.sort() + return formats + + +def register_archive_format(name, function, extra_args=None, description=""): + """Registers an archive format. + name is the name of the format. function is the callable that will be + used to create archives. If provided, extra_args is a sequence of + (name, value) tuples that will be passed as arguments to the callable. + description can be provided to describe the format, and will be returned + by the get_archive_formats() function. + """ + if extra_args is None: + extra_args = [] + if not callable(function): + raise TypeError("The %s object is not callable" % function) + if not isinstance(extra_args, (tuple, list)): + raise TypeError("extra_args needs to be a sequence") + for element in extra_args: + if not isinstance(element, (tuple, list)) or len(element) != 2: + raise TypeError("extra_args elements are : (arg_name, value)") + + _ARCHIVE_FORMATS[name] = (function, extra_args, description) + + +def unregister_archive_format(name): + del _ARCHIVE_FORMATS[name] + + +def make_archive( + base_name, + format, + root_dir=None, + base_dir=None, + verbose=0, + dry_run=0, + owner=None, + group=None, + logger=None, +): + """Create an archive file (eg. zip or tar). + 'base_name' is the name of the file to create, minus any format-specific + extension; 'format' is the archive format: one of "zip", "tar", "gztar", + "bztar", or "xztar". Or any other registered format. + 'root_dir' is a directory that will be the root directory of the + archive; ie. we typically chdir into 'root_dir' before creating the + archive. 'base_dir' is the directory where we start archiving from; + ie. 'base_dir' will be the common prefix of all files and + directories in the archive. 'root_dir' and 'base_dir' both default + to the current directory. Returns the name of the archive file. 
+ 'owner' and 'group' are used when creating a tar archive. By default, + uses the current owner and group. + """ + # sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) + save_cwd = os.getcwd() + if root_dir is not None: + if logger is not None: + logger.debug("changing into '%s'", root_dir) + base_name = os.path.abspath(base_name) + if not dry_run: + os.chdir(root_dir) + + if base_dir is None: + base_dir = os.curdir + + kwargs = {"dry_run": dry_run, "logger": logger} + + try: + format_info = _ARCHIVE_FORMATS[format] + except KeyError: + raise ValueError("unknown archive format '%s'" % format) from None + + func = format_info[0] + for arg, val in format_info[1]: + kwargs[arg] = val + + if format != "zip": + kwargs["owner"] = owner + kwargs["group"] = group + + try: + filename = func(base_name, base_dir, **kwargs) + finally: + if root_dir is not None: + if logger is not None: + logger.debug("changing back to '%s'", save_cwd) + os.chdir(save_cwd) + + return filename + + +def get_unpack_formats(): + """Returns a list of supported formats for unpacking. 
+ Each element of the returned sequence is a tuple + (name, extensions, description) + """ + formats = [ + (name, info[0], info[3]) for name, info in _UNPACK_FORMATS.items() + ] + formats.sort() + return formats + + +def _check_unpack_options(extensions, function, extra_args): + """Checks what gets registered as an unpacker.""" + # first make sure no other unpacker is registered for this extension + existing_extensions = {} + for name, info in _UNPACK_FORMATS.items(): + for ext in info[0]: + existing_extensions[ext] = name + + for extension in extensions: + if extension in existing_extensions: + msg = '%s is already registered for "%s"' + raise RegistryError( + msg % (extension, existing_extensions[extension]) + ) + + if not callable(function): + raise TypeError("The registered function must be a callable") + + +def register_unpack_format( + name, extensions, function, extra_args=None, description="" +): + """Registers an unpack format. + `name` is the name of the format. `extensions` is a list of extensions + corresponding to the format. + `function` is the callable that will be + used to unpack archives. The callable will receive archives to unpack. + If it's unable to handle an archive, it needs to raise a ReadError + exception. + If provided, `extra_args` is a sequence of + (name, value) tuples that will be passed as arguments to the callable. + description can be provided to describe the format, and will be returned + by the get_unpack_formats() function. 
+ """ + if extra_args is None: + extra_args = [] + _check_unpack_options(extensions, function, extra_args) + _UNPACK_FORMATS[name] = extensions, function, extra_args, description + + +def unregister_unpack_format(name): + """Removes the pack format from the registry.""" + del _UNPACK_FORMATS[name] + + +def _ensure_directory(path): + """Ensure that the parent directory of `path` exists""" + dirname = os.path.dirname(path) + if not os.path.isdir(dirname): + os.makedirs(dirname) + + +def _unpack_zipfile(filename, extract_dir): + """Unpack zip `filename` to `extract_dir`""" + import zipfile # late import for breaking circular dependency + + if not zipfile.is_zipfile(filename): + raise ReadError("%s is not a zip file" % filename) + + zip = zipfile.ZipFile(filename) + try: + for info in zip.infolist(): + name = info.filename + + # don't extract absolute paths or ones with .. in them + if name.startswith("/") or ".." in name: + continue + + targetpath = os.path.join(extract_dir, *name.split("/")) + if not targetpath: + continue + + _ensure_directory(targetpath) + if not name.endswith("/"): + # file + with zip.open(name, "r") as source, open( + targetpath, "wb" + ) as target: + copyfileobj(source, target) + finally: + zip.close() + + +def _unpack_tarfile(filename, extract_dir): + """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`""" + import tarfile # late import for breaking circular dependency + + try: + tarobj = tarfile.open(filename) + except tarfile.TarError: + raise ReadError( + "%s is not a compressed or uncompressed tar file" % filename + ) + try: + tarobj.extractall(extract_dir) + finally: + tarobj.close() + + +_UNPACK_FORMATS = { + "tar": ([".tar"], _unpack_tarfile, [], "uncompressed tar file"), + "zip": ([".zip"], _unpack_zipfile, [], "ZIP file"), +} + +if _ZLIB_SUPPORTED: + _UNPACK_FORMATS["gztar"] = ( + [".tar.gz", ".tgz"], + _unpack_tarfile, + [], + "gzip'ed tar-file", + ) + +if _BZ2_SUPPORTED: + _UNPACK_FORMATS["bztar"] = ( + [".tar.bz2", 
".tbz2"], + _unpack_tarfile, + [], + "bzip2'ed tar-file", + ) + +if _LZMA_SUPPORTED: + _UNPACK_FORMATS["xztar"] = ( + [".tar.xz", ".txz"], + _unpack_tarfile, + [], + "xz'ed tar-file", + ) + + +def _find_unpack_format(filename): + for name, info in _UNPACK_FORMATS.items(): + for extension in info[0]: + if filename.endswith(extension): + return name + return None + + +def unpack_archive(filename, extract_dir=None, format=None): + """Unpack an archive. + `filename` is the name of the archive. + `extract_dir` is the name of the target directory, where the archive + is unpacked. If not provided, the current working directory is used. + `format` is the archive format: one of "zip", "tar", "gztar", "bztar", + or "xztar". Or any other registered format. If not provided, + unpack_archive will use the filename extension and see if an unpacker + was registered for that extension. + In case none is found, a ValueError is raised. + """ + # sys.audit("shutil.unpack_archive", filename, extract_dir, format) + + if extract_dir is None: + extract_dir = os.getcwd() + + extract_dir = os.fspath(extract_dir) + filename = os.fspath(filename) + + if format is not None: + try: + format_info = _UNPACK_FORMATS[format] + except KeyError: + raise ValueError( + "Unknown unpack format '{0}'".format(format) + ) from None + + func = format_info[1] + func(filename, extract_dir, **dict(format_info[2])) + else: + # we need to look at the registered unpackers supported extensions + format = _find_unpack_format(filename) + if format is None: + raise ReadError("Unknown archive format '{0}'".format(filename)) + + func = _UNPACK_FORMATS[format][1] + kwargs = dict(_UNPACK_FORMATS[format][2]) + func(filename, extract_dir, **kwargs) + + +if hasattr(os, "statvfs"): + + __all__.append("disk_usage") + _ntuple_diskusage = collections.namedtuple("usage", "total used free") + _ntuple_diskusage.total.__doc__ = "Total space in bytes" + _ntuple_diskusage.used.__doc__ = "Used space in bytes" + 
_ntuple_diskusage.free.__doc__ = "Free space in bytes" + + def disk_usage(path): + """Return disk usage statistics about the given path. + Returned value is a named tuple with attributes 'total', 'used' and + 'free', which are the amount of total, used and free space, in bytes. + """ + st = os.statvfs(path) + free = st.f_bavail * st.f_frsize + total = st.f_blocks * st.f_frsize + used = (st.f_blocks - st.f_bfree) * st.f_frsize + return _ntuple_diskusage(total, used, free) + + +elif _WINDOWS: + + __all__.append("disk_usage") + _ntuple_diskusage = collections.namedtuple("usage", "total used free") + + def disk_usage(path): + """Return disk usage statistics about the given path. + Returned values is a named tuple with attributes 'total', 'used' and + 'free', which are the amount of total, used and free space, in bytes. + """ + total, free = nt._getdiskusage(path) + used = total - free + return _ntuple_diskusage(total, used, free) + + +def chown(path, user=None, group=None): + """Change owner user and group of the given path. + user and group can be the uid/gid or the user/group names, and in that + case, they are converted to their respective uid/gid. + """ + # sys.audit("shutil.chown", path, user, group) + + if user is None and group is None: + raise ValueError("user and/or group must be set") + + _user = user + _group = group + + # -1 means don't change it + if user is None: + _user = -1 + # user can either be an int (the uid) or a string (the system username) + elif isinstance(user, str): + _user = _get_uid(user) + if _user is None: + raise LookupError("no such user: {!r}".format(user)) + + if group is None: + _group = -1 + elif not isinstance(group, int): + _group = _get_gid(group) + if _group is None: + raise LookupError("no such group: {!r}".format(group)) + + os.chown(path, _user, _group) + + +def get_terminal_size(fallback=(80, 24)): + """Get the size of the terminal window. 
+ For each of the two dimensions, the environment variable, COLUMNS + and LINES respectively, is checked. If the variable is defined and + the value is a positive integer, it is used. + When COLUMNS or LINES is not defined, which is the common case, + the terminal connected to sys.__stdout__ is queried + by invoking os.get_terminal_size. + If the terminal size cannot be successfully queried, either because + the system doesn't support querying, or because we are not + connected to a terminal, the value given in fallback parameter + is used. Fallback defaults to (80, 24) which is the default + size used by many terminal emulators. + The value returned is a named tuple of type os.terminal_size. + """ + # columns, lines are the working values + try: + columns = int(os.environ["COLUMNS"]) + except (KeyError, ValueError): + columns = 0 + + try: + lines = int(os.environ["LINES"]) + except (KeyError, ValueError): + lines = 0 + + # only query if necessary + if columns <= 0 or lines <= 0: + try: + size = os.get_terminal_size(sys.__stdout__.fileno()) + except (AttributeError, ValueError, OSError): + # stdout is None, closed, detached, or not a terminal, or + # os.get_terminal_size() is unsupported + size = os.terminal_size(fallback) + if columns <= 0: + columns = size.columns + if lines <= 0: + lines = size.lines + + return os.terminal_size((columns, lines)) + + +# Check that a given file can be accessed with the correct mode. +# Additionally check that `file` is not a directory, as on Windows +# directories pass the os.access check. +def _access_check(fn, mode): + return os.path.exists(fn) and os.access(fn, mode) and not os.path.isdir(fn) + + +def which(cmd, mode=os.F_OK | os.X_OK, path=None): + """Given a command, mode, and a PATH string, return the path which + conforms to the given mode on the PATH, or None if there is no such + file. + `mode` defaults to os.F_OK | os.X_OK. 
`path` defaults to the result + of os.environ.get("PATH"), or can be overridden with a custom search + path. + """ + # If we're given a path with a directory part, look it up directly rather + # than referring to PATH directories. This includes checking relative to + # the current directory, e.g. ./script + if os.path.dirname(cmd): + if _access_check(cmd, mode): + return cmd + return None + + use_bytes = isinstance(cmd, bytes) + + if path is None: + path = os.environ.get("PATH", None) + if path is None: + try: + path = os.confstr("CS_PATH") + except (AttributeError, ValueError): + # os.confstr() or CS_PATH is not available + path = os.defpath + # bpo-35755: Don't use os.defpath if the PATH environment variable is + # set to an empty string + + # PATH='' doesn't match, whereas PATH=':' looks in the current directory + if not path: + return None + + if use_bytes: + path = os.fsencode(path) + path = path.split(os.fsencode(os.pathsep)) + else: + path = os.fsdecode(path) + path = path.split(os.pathsep) + + if sys.platform == "win32": + # The current directory takes precedence on Windows. + curdir = os.curdir + if use_bytes: + curdir = os.fsencode(curdir) + if curdir not in path: + path.insert(0, curdir) + + # PATHEXT is necessary to check on Windows. + pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT + pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] + + if use_bytes: + pathext = [os.fsencode(ext) for ext in pathext] + # See if the given file matches any of the expected path extensions. + # This will allow us to short circuit when given "python.exe". + # If it does match, only test that one, otherwise we have to try + # others. + if any(cmd.lower().endswith(ext.lower()) for ext in pathext): + files = [cmd] + else: + files = [cmd + ext for ext in pathext] + else: + # On other platforms you don't have things like PATHEXT to tell you + # what file suffixes are executable, so just pass on cmd as-is. 
+ files = [cmd] + + seen = set() + for dir in path: + normdir = os.path.normcase(dir) + # if not normdir in seen: + if normdir not in seen: + seen.add(normdir) + for thefile in files: + name = os.path.join(dir, thefile) + if _access_check(name, mode): + return name + return None diff --git a/sphinx_multiversion/main.py b/sphinx_multiversion/main.py index 9870b314..d552b310 100644 --- a/sphinx_multiversion/main.py +++ b/sphinx_multiversion/main.py @@ -7,6 +7,7 @@ import logging import os import pathlib +import glob import re import string import subprocess @@ -18,6 +19,7 @@ from . import sphinx from . import git +from .lib import shutil @contextlib.contextmanager @@ -66,7 +68,19 @@ def load_sphinx_config_worker(q, confpath, confoverrides, add_defaults): "html", str, ) + current_config.add( + "smv_build_targets", + sphinx.DEFAULT_BUILD_TARGETS, + "html", + {str: {str: str}}, + ) current_config.add("smv_prefer_remote_refs", False, "html", bool) + current_config.add( + "smv_clean_intermediate_files", + sphinx.DEFAULT_CLEAN_INTERMEDIATE_FILES_FLAG, + "html", + bool, + ) current_config.pre_init_values() current_config.init_values() except Exception as err: @@ -274,6 +288,7 @@ def main(argv=None): ) metadata[gitref.name] = { "name": gitref.name, + "project": current_config.project, "version": current_config.version, "release": current_config.release, "rst_prolog": current_config.rst_prolog, @@ -289,6 +304,7 @@ def main(argv=None): ), "confdir": confpath, "docnames": list(project.discover()), + "build_targets": config.smv_build_targets, } if args.dump_metadata: @@ -324,31 +340,149 @@ def main(argv=None): "smv_current_version={}".format(version_name), "-c", confdir_absolute, - data["sourcedir"], - data["outputdir"], *args.filenames, ] ) - logger.debug("Running sphinx-build with args: %r", current_argv) - cmd = ( - sys.executable, - *get_python_flags(), - "-m", - "sphinx", - *current_argv, - ) - current_cwd = os.path.join(data["basedir"], cwd_relative) - env = 
os.environ.copy() - env.update( - { - "SPHINX_MULTIVERSION_NAME": data["name"], - "SPHINX_MULTIVERSION_VERSION": data["version"], - "SPHINX_MULTIVERSION_RELEASE": data["release"], - "SPHINX_MULTIVERSION_SOURCEDIR": data["sourcedir"], - "SPHINX_MULTIVERSION_OUTPUTDIR": data["outputdir"], - "SPHINX_MULTIVERSION_CONFDIR": data["confdir"], - } - ) - subprocess.check_call(cmd, cwd=current_cwd, env=env) + for build_target_name, build_target in data[ + "build_targets" + ].items(): + builder = build_target.get("builder", None) + if not builder: + raise AttributeError( + "Builder for build target {} not defined".format( + build_target_name + ) + ) + + downloadable = build_target.get("downloadable", False) + download_format = build_target.get("download_format", "") + + if downloadable and download_format == "": + raise ValueError( + ( + "Download format for build target {} not defined " + "but downloadable is True" + ).format(build_target_name) + ) + + flag = "-b" + builder_modifier = builder + if builder == "latexpdf" or builder == "info": + flag = "-M" + + build_args = [flag, builder] + target_build_dir = "{}/{}".format( + data["outputdir"], builder_modifier + ) + logger.debug( + "Running sphinx-build with args: %r", + build_args + current_argv, + ) + # Create sphinx-build command + cmd = ( + sys.executable, + *get_python_flags(), + "-m", + "sphinx", + *build_args, + data["sourcedir"], + target_build_dir, + *current_argv, + ) + current_cwd = os.path.join(data["basedir"], cwd_relative) + env = os.environ.copy() + env.update( + { + "SPHINX_MULTIVERSION_NAME": data["name"], + "SPHINX_MULTIVERSION_VERSION": data["version"], + "SPHINX_MULTIVERSION_RELEASE": data["release"], + "SPHINX_MULTIVERSION_SOURCEDIR": data["sourcedir"], + "SPHINX_MULTIVERSION_OUTPUTDIR": data["outputdir"], + "SPHINX_MULTIVERSION_CONFDIR": data["confdir"], + } + ) + # Run sphinx-build + subprocess.check_call(cmd, cwd=current_cwd, env=env) + + # Create artefacts if this build target should be downloadable 
+ if downloadable:
+ artefact_dir = "{}/artefacts".format(data["outputdir"])
+ os.makedirs(artefact_dir, exist_ok=True)
+ filename = "{project}_docs-{version}".format(
+ project=config.project.replace(" ", ""),
+ version=version_name.replace("/", "-"),
+ )
+
+ # Make an archive out of the build targets build directory
+ # Archive types supported by shutil.make_archive
+ if download_format in sphinx.ARCHIVE_TYPES:
+ shutil.make_archive(
+ "{}/{}-{}".format(
+ artefact_dir, filename, build_target_name
+ ),
+ download_format,
+ target_build_dir,
+ )
+ else:
+ # Find files matching project-name.extension, e.g.
+ # example.pdf in the target build directory
+ candidate_files = glob.glob(
+ "{build_dir}/**/*.{extension}".format(
+ build_dir=target_build_dir,
+ extension=download_format,
+ ),
+ recursive=True,
+ )
+ build_file_pattern = (
+ "{project}.{extension}".format(
+ project=config.project.replace(" ", ""),
+ extension=download_format,
+ )
+ )
+ if len(candidate_files) > 1:
+ build_artefacts = [
+ x
+ for x in candidate_files
+ if pathlib.Path(x.lower()).name
+ == build_file_pattern.lower()
+ ]
+ else:
+ build_artefacts = candidate_files
+ if len(build_artefacts) == 0:
+ logger.warning(
+ (
+ "Build artefact {project} " "not found."
+ ).format(
+ project=build_file_pattern.lower(),
+ )
+ )
+ elif len(build_artefacts) > 1:
+ logger.warning(
+ (
+ "Ambiguous build artefact results: {}. "
+ "Files not moved to artefact directory."
+ ).format(build_artefacts)
+ )
+ else:
+ # Found a single file with appropriate extension
+ shutil.copy(
+ build_artefacts[0],
+ os.path.join(
+ artefact_dir,
+ "{}.{}".format(filename, download_format),
+ ),
+ )
+
+ # Clean up build directory
+ # Move html target to root of outputdir, if it exists. 
+ if build_target_name == "HTML": + shutil.copytree( + os.path.join(target_build_dir), + os.path.join(data["outputdir"]), + dirs_exist_ok=True, + ) + # Remove build directories + if config.smv_clean_intermediate_files: + shutil.rmtree(target_build_dir) return 0 diff --git a/sphinx_multiversion/sphinx.py b/sphinx_multiversion/sphinx.py index 5a2edeb9..1d16b486 100644 --- a/sphinx_multiversion/sphinx.py +++ b/sphinx_multiversion/sphinx.py @@ -18,6 +18,15 @@ DEFAULT_REMOTE_WHITELIST = None DEFAULT_RELEASED_PATTERN = r"^tags/.*$" DEFAULT_OUTPUTDIR_FORMAT = r"{ref.name}" +DEFAULT_BUILD_TARGETS = { + "HTML": { + "builder": "html", + "downloadable": False, + "download_format": "", + }, +} +DEFAULT_CLEAN_INTERMEDIATE_FILES_FLAG = True +ARCHIVE_TYPES = ["zip", "tar", "gztar", "bztar", "xztar"] Version = collections.namedtuple( "Version", @@ -27,6 +36,7 @@ "version", "release", "is_released", + "artefacts", ], ) @@ -45,6 +55,12 @@ def _dict_to_versionobj(self, v): version=v["version"], release=v["release"], is_released=v["is_released"], + artefacts=[ + {"name": name, "url": self.apathto(name, target)} + for name, target in v["build_targets"].items() + if self.apathto(name, target) is not None + and target["downloadable"] + ], ) @property @@ -138,6 +154,38 @@ def vpathto(self, other_version_name): other_outputdir, "{}.html".format(self.context["pagename"]) ) + def apathto(self, build_target_name, build_target): + """Find the path to the artefact identified by build_target_name + and build_target. 
+ """ + current_version = self.metadata[self.current_version_name] + current_outputroot = os.path.abspath(current_version["outputdir"]) + artefact_dir = posixpath.join(current_outputroot, "artefacts") + current_outputdir = posixpath.dirname( + posixpath.join(current_outputroot, self.context["pagename"]) + ) + + filename = "{project}_docs-{version}".format( + project=self.app.config.project.replace(" ", ""), + version=self.current_version_name.replace("/", "-"), + ) + + if build_target["download_format"] in ARCHIVE_TYPES: + filename = "{f}-{build_name}.{extension}".format( + f=filename, + build_name=build_target_name, + extension=build_target["download_format"], + ) + else: + filename = "{f}.{extension}".format( + f=filename, + extension=build_target["download_format"], + ) + artefact_path = posixpath.relpath( + posixpath.join(artefact_dir, filename), start=current_outputdir + ) + return artefact_path + def html_page_context(app, pagename, templatename, context, doctree): versioninfo = VersionInfo( @@ -210,6 +258,12 @@ def setup(app): app.add_config_value( "smv_outputdir_format", DEFAULT_OUTPUTDIR_FORMAT, "html" ) + app.add_config_value("smv_build_targets", DEFAULT_BUILD_TARGETS, "html") + app.add_config_value( + "smv_clean_intermediate_files", + DEFAULT_CLEAN_INTERMEDIATE_FILES_FLAG, + "html", + ) app.connect("config-inited", config_inited) return { diff --git a/tests/test_sphinx.py b/tests/test_sphinx.py index db38cb07..cc2f151d 100644 --- a/tests/test_sphinx.py +++ b/tests/test_sphinx.py @@ -2,16 +2,21 @@ import posixpath import tempfile import unittest +from unittest.mock import Mock import sphinx_multiversion +mock = Mock() +myapp = mock.config +myapp.config.project = "example" + class VersionInfoTestCase(unittest.TestCase): def setUp(self): root = tempfile.gettempdir() self.versioninfo = sphinx_multiversion.sphinx.VersionInfo( - app=None, + app=myapp, context={"pagename": "testpage"}, metadata={ "master": { @@ -26,6 +31,18 @@ def setUp(self): "outputdir": 
os.path.join(root, "build", "html", "master"), "confdir": os.path.join(root, "master", "docs"), "docnames": ["testpage", "appendix/faq"], + "build_targets": { + "HTML": { + "builder": "html", + "downloadable": True, + "download_format": "zip", + }, + "PDF": { + "builder": "latexpdf", + "downloadable": False, + "download_format": "pdf", + }, + }, }, "v0.1.0": { "name": "v0.1.0", @@ -39,6 +56,13 @@ def setUp(self): "outputdir": os.path.join(root, "build", "html", "v0.1.0"), "confdir": os.path.join(root, "v0.1.0", "docs"), "docnames": ["old_testpage", "appendix/faq"], + "build_targets": { + "HTML": { + "builder": "html", + "downloadable": True, + "download_format": "zip", + }, + }, }, "branch-with/slash": { "name": "branch-with/slash", @@ -56,6 +80,13 @@ def setUp(self): ), "confdir": os.path.join(root, "branch-with/slash", "docs"), "docnames": ["testpage"], + "build_targets": { + "HTML": { + "builder": "html", + "downloadable": True, + "download_format": "zip", + }, + }, }, }, current_version_name="master", @@ -83,6 +114,16 @@ def test_in_development_property(self): ["master", "branch-with/slash"], ) + def test_artefacts_only_available_if_downloadable(self): + versions = self.versioninfo.branches + master_branch = [version for version in versions if version.name == "master"][0] + + # Only HTML should be in the artefact list because PDF has downloadable = False + self.assertEqual( + [artefact["name"] for artefact in master_branch.artefacts], + ["HTML"] + ) + def test_vhasdoc(self): self.assertTrue(self.versioninfo.vhasdoc("master")) self.assertFalse(self.versioninfo.vhasdoc("v0.1.0")) @@ -114,3 +155,63 @@ def test_vpathto(self): self.versioninfo.vpathto("branch-with/slash"), posixpath.join("..", "..", "branch-with/slash", "index.html"), ) + + def test_apathto(self): + build_targets = { + "HTML": { + "builder": "html", + "downloadable": True, + "download_format": "zip", + }, + "PDF": { + "builder": "latexpdf", + "downloadable": True, + "download_format": "pdf", + }, + 
} + self.assertEqual( + self.versioninfo.apathto("HTML", build_targets["HTML"]), + posixpath.join("artefacts", "example_docs-master-HTML.zip"), + ) + self.assertEqual( + self.versioninfo.apathto("PDF", build_targets["PDF"]), + posixpath.join("artefacts", "example_docs-master.pdf"), + ) + + self.versioninfo.context["pagename"] = "appendix/faq" + self.assertEqual( + self.versioninfo.apathto("PDF", build_targets["PDF"]), + posixpath.join("..", "artefacts", "example_docs-master.pdf"), + ) + + self.versioninfo.context["pagename"] = "testpage" + self.versioninfo.current_version_name = "branch-with/slash" + self.assertEqual( + self.versioninfo.apathto("PDF", build_targets["PDF"]), + posixpath.join("artefacts", "example_docs-branch-with-slash.pdf"), + ) + self.assertEqual( + self.versioninfo.apathto("HTML", build_targets["HTML"]), + posixpath.join( + "artefacts", "example_docs-branch-with-slash-HTML.zip" + ), + ) + + self.versioninfo.app.config.project = ( + "Project Name with Spaces and VaRiAbLe case" + ) + self.versioninfo.current_version_name = "master" + self.assertEqual( + self.versioninfo.apathto("HTML", build_targets["HTML"]), + posixpath.join( + "artefacts", + "ProjectNamewithSpacesandVaRiAbLecase_docs-master-HTML.zip", + ), + ) + self.assertEqual( + self.versioninfo.apathto("PDF", build_targets["PDF"]), + posixpath.join( + "artefacts", + "ProjectNamewithSpacesandVaRiAbLecase_docs-master.pdf", + ), + )