From a76c0ebda5a6578ec3df9617cf80b18ffe8c8a8a Mon Sep 17 00:00:00 2001 From: Will Toohey Date: Tue, 15 Oct 2024 09:43:49 +1000 Subject: [PATCH] Add `smbclient.liststreams` to enumerate ADS streams --- src/smbclient/__init__.py | 1 + src/smbclient/_os.py | 46 ++++++++++++++++++++++++++++++++++++++ tests/test_smbclient_os.py | 5 +---- 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/smbclient/__init__.py b/src/smbclient/__init__.py index 0429024..a6d61d2 100644 --- a/src/smbclient/__init__.py +++ b/src/smbclient/__init__.py @@ -14,6 +14,7 @@ getxattr, link, listdir, + liststreams, listxattr, lstat, makedirs, diff --git a/src/smbclient/_os.py b/src/smbclient/_os.py index 88917c6..bcdece3 100644 --- a/src/smbclient/_os.py +++ b/src/smbclient/_os.py @@ -1,6 +1,8 @@ # Copyright: (c) 2019, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) +from __future__ import annotations + import collections import datetime import errno @@ -37,6 +39,7 @@ FileLinkInformation, FileRenameInformation, FileStandardInformation, + FileStreamInformation, ) from smbprotocol.header import NtStatus from smbprotocol.ioctl import ( @@ -102,6 +105,12 @@ ) +class SMBFileStreamInformation(t.NamedTuple): + name: str + size: int + allocation_size: int + + def is_remote_path(path: str) -> bool: """ Returns True iff the given path is a remote SMB path (rather than a local path). @@ -1019,6 +1028,43 @@ def setxattr(path, attribute, value, flags=0, follow_symlinks=True, **kwargs): set_info(transaction, ea_info) +def liststreams(path: str, follow_symlinks=True, **kwargs: t.Any) -> list[SMBFileStreamInformation]: + """ + Return a list of the alternative data streams on a path. Listed streams can + be opened by appending their name to the original path. An example call for + a file with a single extra stream may return: + + ``` + [ + SMBFileStreamInformation(name=':extra_stream:$DATA', size=8, allocation_size=8), + SMBFileStreamInformation(name='::$DATA', size=103472, allocation_size=131072), + ] + ``` + + :param path: The full UNC path to the file to get the list of streams for. + :param follow_symlinks: Whether to follow the symlink at path if encountered. + :param kwargs: Common SMB Session arguments for smbclient. + :return: List of streams on the file with each entry being a string. + """ + + raw = SMBRawIO( + path, + desired_access=FilePipePrinterAccessMask.FILE_READ_ATTRIBUTES, + create_options=0 if follow_symlinks else CreateOptions.FILE_OPEN_REPARSE_POINT, + **kwargs, + ) + + with SMBFileTransaction(raw) as transaction: + query_info(transaction, FileStreamInformation, output_buffer_length=MAX_PAYLOAD_SIZE) + + return [ + SMBFileStreamInformation( + s["stream_name"].get_value(), s["stream_size"].get_value(), s["stream_allocation_size"].get_value() + ) + for s in transaction.results[0] + ] + + def _delete(raw_type, path, **kwargs): # Ensures we delete the symlink (if present) and don't follow it down. co = CreateOptions.FILE_OPEN_REPARSE_POINT diff --git a/tests/test_smbclient_os.py b/tests/test_smbclient_os.py index fa661c7..2fc04b8 100644 --- a/tests/test_smbclient_os.py +++ b/tests/test_smbclient_os.py @@ -834,10 +834,7 @@ def test_open_file_with_ads(smb_share): assert smbclient.listdir(smb_share) == ["file.txt"] - with smbclient.open_file(filename, buffering=0, mode="rb") as fd, SMBFileTransaction(fd) as trans: - query_info(trans, FileStreamInformation, output_buffer_length=1024) - - actual = sorted([s["stream_name"].get_value() for s in trans.results[0]]) + actual = sorted([s.name for s in smbclient.liststreams(filename)]) assert actual == ["::$DATA", ":ads:$DATA"]