diff --git a/docs/api.rst b/docs/api.rst index 17347d93..86137e63 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -4,3 +4,7 @@ API .. automodule:: filelock :members: :show-inheritance: + +.. automodule:: filelock.read_write + :members: + :show-inheritance: diff --git a/docs/index.rst b/docs/index.rst index 7995f760..27d410f3 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -191,8 +191,19 @@ cases. Asyncio support --------------- -This library currently does not support asyncio. We'd recommend adding an asyncio variant though if someone can make a -pull request for it, `see here `_. +This library supports asyncio. See :class:`AsyncFileLock `. + +Read/write FileLock +------------------- + +An implementation of a read/write FileLock is also available: and :class:`ReadWriteFileLockWrapper ` and +:class:`AsyncReadWriteFileLockWrapper `. + +Multiple readers can hold the lock at the same time, but a writer is guaranteed to hold the lock exclusively across both readers and writers. + +The lock is writer-preferring on a best effort basis (there are no guarantees). + +Currently, this FileLock type is implemented only for Unix. 
# Default interval, in seconds, between successive attempts to grab the lock file.
DEFAULT_POLL_INTERVAL = 0.05


class LockProtocol(Protocol):
    """Protocol for objects implementing ``acquire`` and ``release`` methods."""

    # Try to acquire the lock, polling every ``poll_interval`` seconds until
    # ``timeout`` elapses. ``poll_intervall`` is the deprecated spelling kept for
    # backwards compatibility; ``blocking=False`` means a single attempt only.
    @abstractmethod
    def acquire(
        self,
        timeout: float | None = None,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        *,
        poll_intervall: float | None = None,
        blocking: bool | None = None,
    ) -> AcquireReturnProxy: ...

    # Release one level of the (reentrant) lock; ``force=True`` ignores the
    # internal counter and releases unconditionally.
    @abstractmethod
    def release(self, force: bool = False) -> None:  # noqa: FBT001, FBT002
        ...
class AsyncLockProtocol(Protocol):
    """Protocol for async objects implementing ``acquire`` and ``release`` methods."""

    # Async counterpart of LockProtocol.acquire. Note: unlike the sync protocol,
    # there is no deprecated ``poll_intervall`` alias here.
    @abstractmethod
    async def acquire(
        self,
        timeout: float | None = None,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        *,
        blocking: bool | None = None,
    ) -> AsyncAcquireReturnProxy: ...

    # Release one level of the lock; ``force=True`` bypasses the lock counter.
    @abstractmethod
    async def release(self, force: bool = False) -> None:  # noqa: FBT001, FBT002
        ...
# Resolved to the platform implementation (or a disabled placeholder) below.
ReadWriteFileLock: type[BaseReadWriteFileLock]
ReadWriteFileLockWrapper: type[BaseReadWriteFileLockWrapper]


class UnixReadWriteFileLock(BaseReadWriteFileLock):
    """Unix implementation of a read/write FileLock."""

    # Readers take the underlying flock() in shared mode (LOCK_SH); writers take
    # it exclusively (LOCK_EX).
    _shared_file_lock_cls: type[BaseFileLock] = NonExclusiveUnixFileLock
    _exclusive_file_lock_cls: type[BaseFileLock] = UnixFileLock


class UnixReadWriteFileLockWrapper(BaseReadWriteFileLockWrapper):
    """Wrapper for a Unix implementation of a read/write FileLock."""

    _read_write_file_lock_cls = UnixReadWriteFileLock


if has_fcntl:  # pragma: win32 no cover
    ReadWriteFileLock = UnixReadWriteFileLock
    ReadWriteFileLockWrapper = UnixReadWriteFileLockWrapper
    has_read_write_file_lock = True
else:  # pragma: win32 cover
    # fcntl is unavailable (e.g. Windows): expose placeholders that raise
    # NotImplementedError on construction.
    ReadWriteFileLock = _DisabledReadWriteFileLock
    ReadWriteFileLockWrapper = _DisabledReadWriteFileLockWrapper
    has_read_write_file_lock = False


__all__ = [
    "AsyncReadWriteFileLock",
    "AsyncReadWriteFileLockWrapper",
    "BaseAsyncReadWriteFileLock",
    "BaseAsyncReadWriteFileLockWrapper",
    "BaseReadWriteFileLock",
    "BaseReadWriteFileLockWrapper",
    "ReadWriteFileLock",
    "ReadWriteFileLockWrapper",
    "ReadWriteMode",
    "UnixAsyncReadWriteFileLock",
    "UnixAsyncReadWriteFileLockWrapper",
    "UnixReadWriteFileLock",
    "UnixReadWriteFileLockWrapper",
    "has_read_write_file_lock",
]
class ReadWriteMode(Enum):
    """Mode of a read/write lock handle: shared READer or exclusive WRITEr."""

    # NOTE(review): the member names are used throughout as READ/WRITE; the
    # concrete values were not visible in the source — confirm against the package.
    READ = "read"
    WRITE = "write"


class BaseReadWriteFileLock(ABC):
    """
    Writer-preferring read/write file lock built from two underlying file locks.

    Multiple READers can hold the lock at the same time, but a WRITEr is
    guaranteed to hold the lock exclusively across both readers and writers.
    """

    # Lock class used by readers (shared / non-exclusive acquisition).
    _shared_file_lock_cls: type[BaseFileLock]
    # Lock class used by writers (exclusive acquisition).
    _exclusive_file_lock_cls: type[BaseFileLock]

    def __init__(  # noqa: PLR0913
        self,
        read_write_mode: ReadWriteMode,
        lock_file: str | os.PathLike[str] | None = None,
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = True,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        lock_file_inner: str | os.PathLike[str] | None = None,
        lock_file_outer: str | os.PathLike[str] | None = None,
    ) -> None:
        """
        Create a new writer-preferring read/write lock object. Multiple READers can hold the lock
        at the same time, but a WRITEr is guaranteed to hold the lock exclusively across both
        readers and writers.

        This object will use two lock files to ensure writers have priority over readers.

        :param read_write_mode: whether this object should be in WRITE mode or READ mode.
        :param lock_file: path used to derive the two lock files. Note that ``Path.with_suffix`` \
            *replaces* any existing suffix, so e.g. ``foo.lock`` produces ``foo.inner`` and \
            ``foo.outer``. If not specified, ``lock_file_inner`` and ``lock_file_outer`` must \
            both be specified.
        :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \
            the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \
            to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock.
        :param mode: file permissions for the lockfile
        :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \
            ``False`` then the lock will be reentrant across threads. Note that misuse of the lock while this argument \
            is set to ``False`` may result in deadlocks due to the non-exclusive nature of the read/write lock.
        :param blocking: whether the lock should be blocking or not
        :param lock_file_inner: path to the inner lock file. Can be left unspecified if ``lock_file`` is specified.
        :param lock_file_outer: path to the outer lock file. Can be left unspecified if ``lock_file`` is specified.
        :raises ValueError: if neither ``lock_file`` nor both explicit lock file paths are given
        """
        if read_write_mode == ReadWriteMode.READ:
            file_lock_cls = self._shared_file_lock_cls
        else:
            file_lock_cls = self._exclusive_file_lock_cls
        self.read_write_mode = read_write_mode

        if not lock_file_inner:
            if not lock_file:
                msg = "If lock_file is unspecified, both lock_file_inner and lock_file_outer must be specified."
                raise ValueError(msg)
            # NOTE: with_suffix() replaces an existing suffix ("a.lock" -> "a.inner");
            # it does not append to the path.
            lock_file_inner = Path(lock_file).with_suffix(".inner")
        if not lock_file_outer:
            if not lock_file:
                msg = "If lock_file is unspecified, both lock_file_inner and lock_file_outer must be specified."
                raise ValueError(msg)
            lock_file_outer = Path(lock_file).with_suffix(".outer")

        # is_singleton is always disabled: per-path singleton instances would
        # conflate separate READ and WRITE handles pointing at the same lock files.
        self._inner_lock = file_lock_cls(
            lock_file_inner,
            timeout=timeout,
            mode=mode,
            thread_local=thread_local,
            blocking=blocking,
            is_singleton=False,
        )
        self._outer_lock = file_lock_cls(
            lock_file_outer,
            timeout=timeout,
            mode=mode,
            thread_local=thread_local,
            blocking=blocking,
            is_singleton=False,
        )

    def is_thread_local(self) -> bool:
        """:return: a flag indicating if this lock is thread local or not"""
        return self._inner_lock.is_thread_local()

    @property
    def is_singleton(self) -> bool:
        """:return: a flag indicating if this lock is singleton or not"""
        return self._inner_lock.is_singleton

    @property
    def lock_file_inner(self) -> str:
        """:return: path to the inner lock file"""
        return self._inner_lock.lock_file

    @property
    def lock_file_outer(self) -> str:
        """:return: path to the outer lock file"""
        return self._outer_lock.lock_file

    @property
    def timeout(self) -> float:
        """:return: the default timeout value, in seconds"""
        return self._inner_lock.timeout

    @timeout.setter
    def timeout(self, value: float | str) -> None:
        """
        Change the default timeout value.

        :param value: the new value, in seconds

        """
        # Keep both underlying locks in sync.
        self._inner_lock.timeout = float(value)
        self._outer_lock.timeout = float(value)

    @property
    def blocking(self) -> bool:
        """:return: whether the locking is blocking or not"""
        return self._inner_lock.blocking

    @blocking.setter
    def blocking(self, value: bool) -> None:
        """
        Change the default blocking value.

        :param value: the new value as bool

        """
        self._inner_lock.blocking = value
        self._outer_lock.blocking = value

    @property
    def mode(self) -> int:
        """:return: the file permissions for the lockfile"""
        return self._inner_lock.mode

    @property
    def is_locked(self) -> bool:
        """:return: A boolean indicating if the lock file is holding the lock currently."""
        return self._inner_lock.is_locked

    @property
    def lock_counter(self) -> int:
        """:return: The number of times this lock has been acquired (but not yet released)."""
        return self._inner_lock.lock_counter + self._outer_lock.lock_counter

    def acquire(
        self,
        timeout: float | None = None,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        *,
        poll_intervall: float | None = None,
        blocking: bool | None = None,
    ) -> AcquireReturnProxy:
        """
        Try to acquire the file lock.

        :param timeout: maximum wait time for acquiring the lock, ``None`` means use the default :attr:`~timeout` and
            if ``timeout < 0``, there is no timeout and this method will block until the lock could be acquired
        :param poll_interval: interval of trying to acquire the lock file
        :param poll_intervall: deprecated, kept for backwards compatibility, use ``poll_interval`` instead
        :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the
            first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired.
        :raises Timeout: if fails to acquire lock within the timeout period
        :return: a context object that will unlock the file when the context is exited

        .. code-block:: python

            # You can use this method in the context manager (recommended)
            with lock.acquire():
                pass

            # Or use an equivalent try-finally construct:
            lock.acquire()
            try:
                pass
            finally:
                lock.release()

        """
        start_time = time.monotonic()
        # Both readers and writers must first pass through the outer lock; a waiting
        # writer holds it, which is what gives writers priority over new readers.
        self._outer_lock.acquire(
            timeout=timeout, poll_interval=poll_interval, poll_intervall=poll_intervall, blocking=blocking
        )
        if timeout is not None and timeout >= 0:
            # Charge the time spent on the outer lock against the caller's budget.
            # Clamp at 0: a negative value means "no timeout" and could block forever
            # even though the caller asked for a finite timeout.
            timeout = max(timeout - (time.monotonic() - start_time), 0)
        if self.read_write_mode == ReadWriteMode.READ:
            try:
                self._inner_lock.acquire(
                    timeout=timeout, poll_interval=poll_interval, poll_intervall=poll_intervall, blocking=blocking
                )
            finally:
                # A reader only needs the shared inner lock; drop the outer lock so
                # other readers - or a waiting writer - can proceed.
                self._outer_lock.release()
        else:
            try:
                self._inner_lock.acquire(
                    timeout=timeout, poll_interval=poll_interval, poll_intervall=poll_intervall, blocking=blocking
                )
            except BaseException:
                # If the writer fails to take the inner lock (e.g. Timeout), the outer
                # lock must not stay held, or every future reader and writer deadlocks.
                self._outer_lock.release()
                raise
        return AcquireReturnProxy(lock=self)

    def release(self, force: bool = False) -> None:  # noqa: FBT001, FBT002
        """
        Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0.
        Also note, that the lock file itself is not automatically deleted.

        :param force: If true, the lock counter is ignored and the lock is released in every case.

        """
        self._inner_lock.release(force=force)
        if self.read_write_mode == ReadWriteMode.WRITE:
            # Writers hold both locks; readers already released the outer lock in acquire().
            self._outer_lock.release(force=force)

    def __enter__(self) -> Self:
        """
        Acquire the lock.

        :return: the lock object

        """
        self.acquire()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Release the lock.

        :param exc_type: the exception type if raised
        :param exc_value: the exception value if raised
        :param traceback: the exception traceback if raised

        """
        self.release()

    def __del__(self) -> None:
        """Best-effort release when the lock object is garbage collected."""
        # Guard against partially-constructed instances: __init__ may have raised
        # before the underlying locks were assigned.
        if hasattr(self, "_inner_lock") and hasattr(self, "_outer_lock"):
            self.release(force=True)


class _DisabledReadWriteFileLock(BaseReadWriteFileLock):
    """Placeholder used on platforms without fcntl; raises on construction."""

    def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def] # noqa: ANN002, ANN003
        msg = "ReadWriteFileLock is unavailable."
        raise NotImplementedError(msg)


__all__ = [
    "BaseReadWriteFileLock",
    "ReadWriteMode",
]
class BaseReadWriteFileLockWrapper(ABC):
    """
    Convenience wrapper for read/write locks.

    Provides `.read()` and `.write()` methods to easily access a read or write lock.

    .. code-block:: python

        # Acquire a non-exclusive reader lock
        with lock.read():
            pass

        # Acquire an exclusive writer lock
        with lock.write():
            pass
    """

    _read_write_file_lock_cls: type[BaseReadWriteFileLock]

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str] | None = None,
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = True,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        lock_file_inner: str | os.PathLike[str] | None = None,
        lock_file_outer: str | os.PathLike[str] | None = None,
    ) -> None:
        """
        Convenience wrapper for read/write locks.

        See ReadWriteFileLock for description of the parameters.
        """
        # Everything except the read/write mode is shared between the two handles.
        shared_kwargs = {
            "lock_file": lock_file,
            "lock_file_inner": lock_file_inner,
            "lock_file_outer": lock_file_outer,
            "timeout": timeout,
            "mode": mode,
            "thread_local": thread_local,
            "blocking": blocking,
        }
        self.read_lock = self._read_write_file_lock_cls(read_write_mode=ReadWriteMode.READ, **shared_kwargs)
        self.write_lock = self._read_write_file_lock_cls(read_write_mode=ReadWriteMode.WRITE, **shared_kwargs)

    def __call__(self, read_write_mode: ReadWriteMode) -> BaseReadWriteFileLock:
        """
        Get read/write lock object with the specified ``read_write_mode``.

        :param read_write_mode: whether this object should be in WRITE mode or READ mode.
        :return: a lock object in specified ``read_write_mode``.
        """
        return self.read_lock if read_write_mode == ReadWriteMode.READ else self.write_lock

    def read(self) -> BaseReadWriteFileLock:
        """
        Get read/write lock object in READ mode.

        :return: a lock object in READ mode.
        """
        return self(ReadWriteMode.READ)

    def write(self) -> BaseReadWriteFileLock:
        """
        Get read/write lock object in WRITE mode.

        :return: a lock object in WRITE mode.
        """
        return self(ReadWriteMode.WRITE)


class _DisabledReadWriteFileLockWrapper(BaseReadWriteFileLockWrapper):
    """Placeholder wrapper used on platforms without fcntl; raises on construction."""

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str] | None = None,
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = True,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        lock_file_inner: str | os.PathLike[str] | None = None,
        lock_file_outer: str | os.PathLike[str] | None = None,
    ) -> None:
        msg = "ReadWriteFileLock is unavailable."
        raise NotImplementedError(msg)


__all__ = [
    "BaseReadWriteFileLockWrapper",
]
# Resolved to the platform implementation (or a disabled placeholder) below.
AsyncReadWriteFileLock: type[BaseAsyncReadWriteFileLock]
AsyncReadWriteFileLockWrapper: type[BaseAsyncReadWriteFileLockWrapper]


class UnixAsyncReadWriteFileLock(BaseAsyncReadWriteFileLock):
    """Unix implementation of an async read/write FileLock."""

    # Readers take the underlying flock() in shared mode; writers take it exclusively.
    _shared_file_lock_cls: type[BaseAsyncFileLock] = AsyncNonExclusiveUnixFileLock
    _exclusive_file_lock_cls: type[BaseAsyncFileLock] = AsyncUnixFileLock


class UnixAsyncReadWriteFileLockWrapper(BaseAsyncReadWriteFileLockWrapper):
    """Wrapper for a Unix implementation of an async read/write FileLock."""

    _read_write_file_lock_cls = UnixAsyncReadWriteFileLock


if has_fcntl:  # pragma: win32 no cover
    AsyncReadWriteFileLock = UnixAsyncReadWriteFileLock
    AsyncReadWriteFileLockWrapper = UnixAsyncReadWriteFileLockWrapper
else:  # pragma: win32 cover
    # fcntl is unavailable (e.g. Windows): expose placeholders that raise
    # NotImplementedError on construction.
    AsyncReadWriteFileLock = _DisabledAsyncReadWriteFileLock
    AsyncReadWriteFileLockWrapper = _DisabledAsyncReadWriteFileLockWrapper


__all__ = [
    "AsyncReadWriteFileLock",
    "AsyncReadWriteFileLockWrapper",
    "BaseAsyncReadWriteFileLock",
    "BaseAsyncReadWriteFileLockWrapper",
    "UnixAsyncReadWriteFileLock",
    "UnixAsyncReadWriteFileLockWrapper",
]
"UnixAsyncReadWriteFileLockWrapper", +] diff --git a/src/filelock/read_write/asyncio/_api.py b/src/filelock/read_write/asyncio/_api.py new file mode 100644 index 00000000..1148d965 --- /dev/null +++ b/src/filelock/read_write/asyncio/_api.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +import asyncio +import contextlib +import time +from pathlib import Path +from typing import TYPE_CHECKING, NoReturn, cast + +from filelock._api import DEFAULT_POLL_INTERVAL +from filelock.asyncio import AsyncAcquireReturnProxy, AsyncLockProtocol, BaseAsyncFileLock +from filelock.read_write._api import BaseReadWriteFileLock, ReadWriteMode + +if TYPE_CHECKING: + import os + import sys + from concurrent import futures + from types import TracebackType + + if sys.version_info >= (3, 11): # pragma: no cover (py311+) + from typing import Self + else: # pragma: no cover ( None: + """ + Create a new async writer-preferring read/write lock object. Multiple READers can hold the lock + at the same time, but a WRITEr is guaranteed to hold the lock exclusively across both + readers and writers. + + This object will use two lock files to ensure writers have priority over readers. + + Note that this lock is always thread-local, to allow for non-exclusive access. + + :param read_write_mode: whether this object should be in WRITE mode or READ mode. + :param lock_file: path to the file. Note that two files will be created: \ + ``{lock_file}.inner`` and ``{lock_file}.outer``. \ + If not specified, ``lock_file_inner`` and ``lock_file_outer`` must both be specified. + :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \ + the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \ + to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock. 
+ :param mode: file permissions for the lockfile + :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \ + ``False`` then the lock will be reentrant across threads. Note that misuse of the lock while this argument \ + is set to ``False`` and run_in_executor is ``False`` may result in deadlocks due to the non-exclusive \ + nature of the read/write lock. + :param blocking: whether the lock should be blocking or not + :param lock_file_inner: path to the inner lock file. Can be left unspecified if ``lock_file`` is specified. + :param lock_file_outer: path to the outer lock file Can be left unspecified if ``lock_file`` is specified. + :param loop: The event loop to use. If not specified, the running event loop will be used. + :param run_in_executor: If this is set to ``True`` then the lock will be acquired in an executor. + :param executor: The executor to use. If not specified, the default executor will be used. + """ + if read_write_mode == ReadWriteMode.READ: + file_lock_cls = self._shared_file_lock_cls + else: + file_lock_cls = self._exclusive_file_lock_cls + self.read_write_mode = read_write_mode + + if not lock_file_inner: + if not lock_file: + msg = "If lock_file is unspecified, both lock_file_inner and lock_file_outer must be specified." + raise ValueError(msg) + lock_file_inner = Path(lock_file).with_suffix(".inner") + if not lock_file_outer: + if not lock_file: + msg = "If lock_file is unspecified, both lock_file_inner and lock_file_outer must be specified." 
+ raise ValueError(msg) + lock_file_outer = Path(lock_file).with_suffix(".outer") + + self._inner_lock = file_lock_cls( + lock_file_inner, + timeout=timeout, + mode=mode, + thread_local=thread_local, + blocking=blocking, + is_singleton=False, + loop=loop, + run_in_executor=run_in_executor, + executor=executor, + ) + + self._outer_lock = file_lock_cls( + lock_file_outer, + timeout=timeout, + mode=mode, + thread_local=thread_local, + blocking=blocking, + is_singleton=False, + loop=loop, + run_in_executor=run_in_executor, + executor=executor, + ) + + async def acquire( # type: ignore[override] + self, + timeout: float | None = None, + poll_interval: float = DEFAULT_POLL_INTERVAL, + *, + blocking: bool | None = None, + ) -> AsyncAcquireReturnProxy: + """ + Try to acquire the file lock. + + :param timeout: maximum wait time for acquiring the lock, ``None`` means use the default + :attr:`filelock.BaseFileLock.timeout` is and if ``timeout < 0``, there is no timeout and + this method will block until the lock could be acquired + :param poll_interval: interval of trying to acquire the lock file + :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the + first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired. + :raises Timeout: if fails to acquire lock within the timeout period + :return: a context object that will unlock the file when the context is exited + + .. code-block:: python + + # You can use this method in the context manager (recommended) + with lock.acquire(): + pass + + # Or use an equivalent try-finally construct: + lock.acquire() + try: + pass + finally: + lock.release() + + """ + start_time = time.monotonic() + inner_lock = cast("AsyncLockProtocol", self._inner_lock) + outer_lock = cast("AsyncLockProtocol", self._outer_lock) + + # Writers or readers must first acquire the outer lock to verify no writer is active or pending. 
+ await outer_lock.acquire(timeout=timeout, poll_interval=poll_interval, blocking=blocking) + dur = time.monotonic() - start_time + if timeout is not None: + timeout -= dur + + if self.read_write_mode == ReadWriteMode.READ: + try: + # Acquire the inner lock for reading. + await inner_lock.acquire(timeout=timeout, poll_interval=poll_interval, blocking=blocking) + finally: + # Release outer lock once the inner lock is acquired, allowing other readers in. + await outer_lock.release() + else: + # In write mode, hold both locks: + # - Outer lock prevents new readers from starting. + # - Inner lock ensures exclusive write access. + await inner_lock.acquire(timeout=timeout, poll_interval=poll_interval, blocking=blocking) + return AsyncAcquireReturnProxy(lock=self) + + async def release(self, force: bool = False) -> None: # type: ignore[override] # noqa: FBT001, FBT002 + """ + Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0. + Also note, that the lock file itself is not automatically deleted. + + :param force: If true, the lock counter is ignored and the lock is released in every case/ + + """ + inner_lock = cast("AsyncLockProtocol", self._inner_lock) + outer_lock = cast("AsyncLockProtocol", self._outer_lock) + + await inner_lock.release(force=force) + if self.read_write_mode == ReadWriteMode.WRITE: + await outer_lock.release(force=force) + + async def __aenter__(self) -> Self: + """ + Acquire the lock. + + :return: the lock object + + """ + await self.acquire() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + """ + Release the lock. 
class BaseAsyncReadWriteFileLockWrapper(BaseReadWriteFileLockWrapper):
    """
    Convenience wrapper class for async read/write locks.

    Provides `.read()` and `.write()` methods to easily access a read or write lock.

    .. code-block:: python

        # Acquire a non-exclusive reader lock
        async with lock.read():
            pass

        # Acquire an exclusive writer lock
        async with lock.write():
            pass
    """

    _read_write_file_lock_cls: type[BaseAsyncReadWriteFileLock]

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str] | None = None,
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = False,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        lock_file_inner: str | os.PathLike[str] | None = None,
        lock_file_outer: str | os.PathLike[str] | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        run_in_executor: bool = True,
        executor: futures.Executor | None = None,
    ) -> None:
        """See documentation of BaseAsyncReadWriteFileLock for parameter descriptions."""
        # Everything except the read/write mode is shared between the two handles.
        shared_kwargs = {
            "lock_file": lock_file,
            "lock_file_inner": lock_file_inner,
            "lock_file_outer": lock_file_outer,
            "timeout": timeout,
            "mode": mode,
            "thread_local": thread_local,
            "blocking": blocking,
            "loop": loop,
            "run_in_executor": run_in_executor,
            "executor": executor,
        }
        self.read_lock = self._read_write_file_lock_cls(read_write_mode=ReadWriteMode.READ, **shared_kwargs)
        self.write_lock = self._read_write_file_lock_cls(read_write_mode=ReadWriteMode.WRITE, **shared_kwargs)


class _DisabledAsyncReadWriteFileLockWrapper(BaseAsyncReadWriteFileLockWrapper):
    """Placeholder wrapper used on platforms without fcntl; raises on construction."""

    def __init__(  # noqa: PLR0913
        self,
        lock_file: str | os.PathLike[str] | None = None,
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = False,  # noqa: FBT001, FBT002
        *,
        blocking: bool = True,
        lock_file_inner: str | os.PathLike[str] | None = None,
        lock_file_outer: str | os.PathLike[str] | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        run_in_executor: bool = True,
        executor: futures.Executor | None = None,
    ) -> None:
        msg = "AsyncReadWriteFileLock is unavailable."
        raise NotImplementedError(msg)


__all__ = [
    "BaseAsyncReadWriteFileLockWrapper",
]
asyncio.AbstractEventLoop | None = None, + run_in_executor: bool = True, + executor: futures.Executor | None = None, + ) -> None: + msg = "AsyncReadWriteFileLock is unavailable." + raise NotImplementedError(msg) + + +__all__ = [ + "BaseAsyncReadWriteFileLockWrapper", +] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..0e07bcc7 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import sys +import threading +from types import TracebackType +from typing import Callable, Tuple, Type, Union + +import pytest + +_ExcInfoType = Union[Tuple[Type[BaseException], BaseException, TracebackType], Tuple[None, None, None]] + + +class ExThread(threading.Thread): + def __init__(self, target: Callable[[], None], name: str) -> None: + super().__init__(target=target, name=name) + self.ex: _ExcInfoType | None = None + + def run(self) -> None: + try: + super().run() + except Exception: # noqa: BLE001 # pragma: no cover + self.ex = sys.exc_info() # pragma: no cover + + def join(self, timeout: float | None = None) -> None: + super().join(timeout=timeout) + if self.ex is not None: + raise RuntimeError from self.ex[1] # pragma: no cover + + +@pytest.fixture +def ex_thread_cls() -> type[ExThread]: + return ExThread diff --git a/tests/test_filelock.py b/tests/test_filelock.py index 8808bf6f..9a611bcc 100644 --- a/tests/test_filelock.py +++ b/tests/test_filelock.py @@ -11,8 +11,7 @@ from inspect import getframeinfo, stack from pathlib import Path, PurePath from stat import S_IWGRP, S_IWOTH, S_IWUSR, filemode -from types import TracebackType -from typing import TYPE_CHECKING, Any, Callable, Iterator, Tuple, Type, Union +from typing import TYPE_CHECKING, Any, Iterator from uuid import uuid4 from weakref import WeakValueDictionary @@ -218,28 +217,10 @@ def test_nested_contruct(lock_type: type[BaseFileLock], tmp_path: Path) -> None: assert not lock_1.is_locked -_ExcInfoType = Union[Tuple[Type[BaseException], 
BaseException, TracebackType], Tuple[None, None, None]]
-
-
-class ExThread(threading.Thread):
-    def __init__(self, target: Callable[[], None], name: str) -> None:
-        super().__init__(target=target, name=name)
-        self.ex: _ExcInfoType | None = None
-
-    def run(self) -> None:
-        try:
-            super().run()
-        except Exception:  # noqa: BLE001  # pragma: no cover
-            self.ex = sys.exc_info()  # pragma: no cover
-
-    def join(self, timeout: float | None = None) -> None:
-        super().join(timeout=timeout)
-        if self.ex is not None:
-            raise RuntimeError from self.ex[1]  # pragma: no cover
-
-
 @pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock])
-def test_threaded_shared_lock_obj(lock_type: type[BaseFileLock], tmp_path: Path) -> None:
+def test_threaded_shared_lock_obj(
+    lock_type: type[BaseFileLock], tmp_path: Path, ex_thread_cls: type[threading.Thread]
+) -> None:
     # Runs 100 threads, which need the filelock. The lock must be acquired if at least one thread required it and
     # released, as soon as all threads stopped.
     lock_path = tmp_path / "a"
@@ -250,7 +231,7 @@ def thread_work() -> None:
         with lock:
             assert lock.is_locked
 
-    threads = [ExThread(target=thread_work, name=f"t{i}") for i in range(100)]
+    threads = [ex_thread_cls(target=thread_work, name=f"t{i}") for i in range(100)]
     for thread in threads:
         thread.start()
     for thread in threads:
@@ -261,7 +242,9 @@ def thread_work() -> None:
 
 @pytest.mark.parametrize("lock_type", [FileLock, SoftFileLock])
 @pytest.mark.skipif(hasattr(sys, "pypy_version_info") and sys.platform == "win32", reason="deadlocks randomly")
-def test_threaded_lock_different_lock_obj(lock_type: type[BaseFileLock], tmp_path: Path) -> None:
+def test_threaded_lock_different_lock_obj(
+    lock_type: type[BaseFileLock], tmp_path: Path, ex_thread_cls: type[threading.Thread]
+) -> None:
     # Runs multiple threads, which acquire the same lock file with a different FileLock object. When thread group 1
     # acquired the lock, thread group 2 must not hold their lock.
@@ -279,7 +262,7 @@ def t_2() -> None: lock_path = tmp_path / "a" lock_1, lock_2 = lock_type(str(lock_path)), lock_type(str(lock_path)) - threads = [(ExThread(t_1, f"t1_{i}"), ExThread(t_2, f"t2_{i}")) for i in range(10)] + threads = [(ex_thread_cls(t_1, f"t1_{i}"), ex_thread_cls(t_2, f"t2_{i}")) for i in range(10)] for thread_1, thread_2 in threads: thread_1.start() diff --git a/tests/test_read_write_filelock.py b/tests/test_read_write_filelock.py new file mode 100644 index 00000000..526379ec --- /dev/null +++ b/tests/test_read_write_filelock.py @@ -0,0 +1,635 @@ +from __future__ import annotations + +import asyncio +import random +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +from typing import TYPE_CHECKING + +import pytest + +from filelock import Timeout +from filelock.read_write import ( + AsyncReadWriteFileLockWrapper, + ReadWriteFileLockWrapper, + ReadWriteMode, + has_read_write_file_lock, +) + +if TYPE_CHECKING: + from pathlib import Path + +if not has_read_write_file_lock: + pytest.skip(reason="ReadWriteFileLock is not available", allow_module_level=True) + + +def test_basic_read_lock(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + with rw_wrapper.read() as lock: + assert lock.is_locked + assert lock.read_write_mode == ReadWriteMode.READ + assert not lock.is_locked + + +def test_basic_write_lock(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + with rw_wrapper.write() as lock: + assert lock.is_locked + assert lock.read_write_mode == ReadWriteMode.WRITE + assert not lock.is_locked + + +def test_reentrant_read_lock(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + lock = rw_wrapper.read() + with lock: + assert lock.is_locked + with lock: + assert lock.is_locked + assert lock.is_locked + assert not lock.is_locked + + +def test_reentrant_write_lock(tmp_path: Path) -> 
None:
+    rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw"))
+    lock = rw_wrapper.write()
+    with lock:
+        assert lock.is_locked
+        with lock:
+            assert lock.is_locked
+        assert lock.is_locked
+    assert not lock.is_locked
+
+
+def test_multiple_readers_shared_lock(tmp_path: Path) -> None:
+    rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw"))
+    lock1 = rw_wrapper.read()
+    lock2 = rw_wrapper.read()
+
+    with lock1:
+        assert lock1.is_locked
+        # Acquiring another read lock should not block
+        with lock2:
+            assert lock2.is_locked
+            assert lock1.is_locked
+    assert not lock1.is_locked
+    assert not lock2.is_locked
+
+
+def test_writer_excludes_readers(tmp_path: Path) -> None:
+    rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw"))
+    rlock = rw_wrapper.read()
+    wlock = rw_wrapper.write()
+
+    with wlock:
+        assert wlock.is_locked
+        # Attempting to acquire a read lock now should block or time out
+        start = time.perf_counter()
+        with pytest.raises((Timeout, Exception)):
+            rlock.acquire(timeout=0.1, blocking=False)
+        end = time.perf_counter()
+        assert (end - start) < 1.0  # ensure it didn't block too long
+    assert not wlock.is_locked
+
+
+def test_readers_blocked_by_writer_priority(tmp_path: Path, ex_thread_cls: type[threading.Thread]) -> None:
+    """
+    Once a writer is waiting for the lock, new readers should not enter.
+    This ensures writer preference.
+ """ + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + rlock = rw_wrapper.read() + wlock = rw_wrapper.write() + + # Acquire read lock first + with rlock: + # Start a writer in another thread and ensure it wants the lock + def writer() -> None: + with wlock: + pass + + t = ex_thread_cls(target=writer, name="writer") + t.start() + + # Give some time for writer to start and attempt acquire + time.sleep(0.1) + + # Now attempt to acquire another read lock - should block or timeout because writer preference + another_r = rw_wrapper.read() + with pytest.raises((Timeout, Exception)): + another_r.acquire(timeout=0.1, blocking=True) + + # Now that the read lock is released, the writer should get the lock and release it + t.join() + assert not rlock.is_locked + assert not wlock.is_locked + + +def test_non_blocking_read_when_write_held(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + wlock = rw_wrapper.write() + rlock = rw_wrapper.read() + + wlock.acquire() + assert wlock.is_locked + # Non-blocking read should fail immediately + with pytest.raises((Timeout, Exception)): + rlock.acquire(blocking=False) + wlock.release() + + +def test_timeout_read_lock(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + wlock = rw_wrapper.write() + rlock = rw_wrapper.read() + + wlock.acquire() + with pytest.raises((Timeout, Exception)): + # Attempt read lock with some timeout + rlock.acquire(timeout=0.1) + wlock.release() + + +def test_timeout_write_lock(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + rlock = rw_wrapper.read() + wlock = rw_wrapper.write() + + # Acquire a read lock first + rlock.acquire() + assert rlock.is_locked + # Attempt to acquire a write lock with a short timeout + with pytest.raises((Timeout, Exception)): + wlock.acquire(timeout=0.1) + rlock.release() + + +def test_forced_release(tmp_path: Path) -> None: + rw_wrapper = 
ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + rlock = rw_wrapper.read() + rlock.acquire() + assert rlock.is_locked + + # Force release + rlock.release(force=True) + assert not rlock.is_locked + + +def test_stress_multiple_threads_readers_and_writers(tmp_path: Path, ex_thread_cls: threading.Thread) -> None: + # Stress test: multiple readers and writers competing + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + num_readers = 50 + num_writers = 10 + + read_hold_time = 0.01 + write_hold_time = 0.02 + + # Shared resource + shared_data = [] + shared_data_lock = threading.Lock() + + def reader() -> None: + with rw_wrapper.read(): + # Multiple readers can enter + # Just check that no writes happen simultaneously + time.sleep(read_hold_time) + with shared_data_lock: + # Check consistency + pass + + def writer() -> None: + with rw_wrapper.write(): + # Exclusive access + old_len = len(shared_data) + time.sleep(write_hold_time) + with shared_data_lock: + shared_data.append(1) + assert len(shared_data) == old_len + 1 + + threads = [] + threads.extend(ex_thread_cls(target=reader, name=f"reader_{i}") for i in range(num_readers)) + threads.extend(ex_thread_cls(target=writer, name=f"writer_{i}") for i in range(num_writers)) + + # Shuffle threads to randomize execution + random.shuffle(threads) + + for t in threads: + t.start() + + for t in threads: + t.join() + + # We expect that all writers appended to the list + assert len(shared_data) == num_writers + + +def test_thrashing_with_thread_pool_readers_writers(tmp_path: Path) -> None: + rw_wrapper = ReadWriteFileLockWrapper(str(tmp_path / "test_rw")) + + txt_file = tmp_path / "data.txt" + txt_file.write_text("initial") + + def read_work() -> None: + with rw_wrapper.read(): + txt_file.read_text() + time.sleep(0.001) + + def write_work() -> None: + with rw_wrapper.write(): + current = txt_file.read_text() + txt_file.write_text(current + "x") + time.sleep(0.002) + + with ThreadPoolExecutor() as executor: + 
futures = [] + futures.extend(executor.submit(read_work) for _ in range(50)) + futures.extend(executor.submit(write_work) for _ in range(10)) + + # Add more mixed load + for _ in range(20): + futures.append(executor.submit(read_work)) # noqa: FURB113 + futures.append(executor.submit(write_work)) + + for f in futures: + f.result() + + # Ensure file got appended by writers + final_data = txt_file.read_text() + # At least writers appended something + assert len(final_data) > len("initial") + + +def test_threaded_read_write_lock(tmp_path: Path, ex_thread_cls: threading.Thread) -> None: + # Runs 100 reader and 10 writer threads. + # Ensure all readers can acquire the lock at the same time, while no writer can + # acquire the lock. + # Release all the readers and ensure only one writer can acquire the lock at the same time. + + # Note that we can do this because ReadWriteFileLock is thread local by default. + read_write_lock = ReadWriteFileLockWrapper(str(tmp_path / "rw")) + + num_readers = 0 + num_readers_lock = threading.Lock() + num_writers = 0 + num_writers_lock = threading.Lock() + is_ready_readers_queue = Queue() + is_ready_writers_queue = Queue() + should_proceed_event = threading.Event() + is_writer_ready_event = threading.Event() + + def read_thread_work() -> None: + nonlocal num_readers + with read_write_lock(ReadWriteMode.READ): + assert read_write_lock(ReadWriteMode.READ).is_locked + with num_readers_lock: + num_readers += 1 + is_ready_readers_queue.put_nowait(None) + should_proceed_event.wait() + with num_readers_lock: + num_readers -= 1 + + def write_thread_work() -> None: + nonlocal num_writers + is_writer_ready_event.set() + with read_write_lock(ReadWriteMode.WRITE): + assert read_write_lock(ReadWriteMode.WRITE).is_locked + with num_writers_lock: + num_writers += 1 + is_ready_writers_queue.put(1) + with num_writers_lock: + num_writers -= 1 + + read_threads = [ex_thread_cls(target=read_thread_work, name=f"rt{i}") for i in range(100)] + for thread in 
read_threads: + thread.start() + + for _ in read_threads: + is_ready_readers_queue.get() + + with num_readers_lock: + assert num_readers == len(read_threads) + + write_threads = [ex_thread_cls(target=write_thread_work, name=f"wt{i}") for i in range(10)] + for thread in write_threads: + thread.start() + + is_writer_ready_event.wait() + + # Sleeps are not ideal... + time.sleep(0.1) + with num_writers_lock: + assert num_writers == 0 + time.sleep(0.1) + + should_proceed_event.set() + + for _ in write_threads: + is_ready_writers_queue.get() + with num_writers_lock: + assert num_writers in {0, 1} + + for thread in write_threads: + thread.join() + + assert not read_write_lock(ReadWriteMode.READ).is_locked + assert not read_write_lock(ReadWriteMode.WRITE).is_locked + assert num_readers == 0 + assert num_writers == 0 + + +@pytest.mark.asyncio +async def test_async_basic_read_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + async with rw_wrapper.read() as lock: + assert lock.is_locked + assert lock.read_write_mode == ReadWriteMode.READ + assert not lock.is_locked + + +@pytest.mark.asyncio +async def test_async_basic_write_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + async with rw_wrapper.write() as lock: + assert lock.is_locked + assert lock.read_write_mode == ReadWriteMode.WRITE + assert not lock.is_locked + + +@pytest.mark.asyncio +async def test_async_reentrant_read_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + lock = rw_wrapper.read() + async with lock: + assert lock.is_locked + async with lock: + assert lock.is_locked + assert lock.is_locked + assert not lock.is_locked + + +@pytest.mark.asyncio +async def test_async_reentrant_write_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + lock = rw_wrapper.write() + 
async with lock: + assert lock.is_locked + async with lock: + assert lock.is_locked + assert lock.is_locked + assert not lock.is_locked + + +@pytest.mark.asyncio +async def test_async_multiple_readers_shared_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + lock1 = rw_wrapper.read() + lock2 = rw_wrapper.read() + + async with lock1: + assert lock1.is_locked + # Another read lock should also be acquirable without blocking + async with lock2: + assert lock2.is_locked + assert lock1.is_locked + assert not lock1.is_locked + assert not lock2.is_locked + + +@pytest.mark.asyncio +async def test_async_writer_excludes_readers(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + rlock = rw_wrapper.read() + wlock = rw_wrapper.write() + + async with wlock: + assert wlock.is_locked + # Attempting to acquire read lock should fail immediately if non-blocking + with pytest.raises(Timeout): + await rlock.acquire(timeout=0.1, blocking=False) + assert not wlock.is_locked + + +@pytest.mark.asyncio +async def test_async_readers_blocked_by_writer_priority(tmp_path: Path) -> None: + """ + Once a writer is waiting for the lock, new readers should not start. + This ensures writer preference. 
+ """ + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + rlock = rw_wrapper.read() + wlock = rw_wrapper.write() + + await rlock.acquire() + try: + # Start a writer attempt in another task + async def writer() -> None: + async with wlock: + pass + + writer_task = asyncio.create_task(writer()) + # Give the writer a moment to start and attempt acquire + await asyncio.sleep(0.1) + + # Now attempt another read lock - should fail due to writer preference + another_r = rw_wrapper.read() + with pytest.raises(Timeout): + await another_r.acquire(timeout=0.1, blocking=True) + + finally: + await rlock.release() + + # Once read lock is released, writer should proceed + await writer_task + + +@pytest.mark.asyncio +async def test_async_non_blocking_read_when_write_held(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + wlock = rw_wrapper.write() + rlock = rw_wrapper.read() + + await wlock.acquire() + try: + with pytest.raises(Timeout): + await rlock.acquire(blocking=False) + finally: + await wlock.release() + + +@pytest.mark.asyncio +async def test_async_timeout_read_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + wlock = rw_wrapper.write() + rlock = rw_wrapper.read() + + await wlock.acquire() + try: + with pytest.raises(Timeout): + await rlock.acquire(timeout=0.1) + finally: + await wlock.release() + + +@pytest.mark.asyncio +async def test_async_timeout_write_lock(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + rlock = rw_wrapper.read() + wlock = rw_wrapper.write() + + await rlock.acquire() + try: + with pytest.raises(Timeout): + await wlock.acquire(timeout=0.1) + finally: + await rlock.release() + + +@pytest.mark.asyncio +async def test_async_forced_release(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + 
rlock = rw_wrapper.read() + await rlock.acquire() + assert rlock.is_locked + await rlock.release(force=True) + assert not rlock.is_locked + + +@pytest.mark.asyncio +async def test_async_stress_multiple_tasks_readers_and_writers(tmp_path: Path) -> None: + num_readers = 50 + num_writers = 10 + + # Shared state + shared_data = [] + data_lock = asyncio.Lock() + + async def reader() -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + async with rw_wrapper.read(): + # Multiple readers allowed + await asyncio.sleep(0.01) + # Just check/inspect shared_data under lock + async with data_lock: + # read operation + pass + + async def writer() -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + async with rw_wrapper.write(): + async with data_lock: + old_len = len(shared_data) + await asyncio.sleep(0.02) + async with data_lock: + shared_data.append(1) + assert len(shared_data) == old_len + 1 + + tasks = [] + tasks.extend(asyncio.create_task(reader()) for _ in range(num_readers)) + tasks.extend(asyncio.create_task(writer()) for _ in range(num_writers)) + + # Shuffle tasks + random.shuffle(tasks) + await asyncio.gather(*tasks) + + # All writers must have appended to shared_data + assert len(shared_data) == num_writers + + +@pytest.mark.asyncio +async def test_async_asyncio_concurrent_readers_writers(tmp_path: Path) -> None: + # Similar stress test with mixed load + txt_file = tmp_path / "data.txt" + txt_file.write_text("initial") + + async def read_work() -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + async with rw_wrapper.read(): + _ = txt_file.read_text() + await asyncio.sleep(0.001) + + async def write_work() -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + async with rw_wrapper.write(): + current = txt_file.read_text() + txt_file.write_text(current + "x") + await asyncio.sleep(0.002) + + tasks = [] + 
tasks.extend(asyncio.create_task(read_work()) for _ in range(50)) + tasks.extend(asyncio.create_task(write_work()) for _ in range(10)) + + # Add more mixed load + for _ in range(20): + tasks.append(asyncio.create_task(read_work())) # noqa: FURB113 + tasks.append(asyncio.create_task(write_work())) + + await asyncio.gather(*tasks) + + final_data = txt_file.read_text() + # At least some writes have occurred + assert len(final_data) > len("initial") + + +def test_async_cannot_use_with_instead_of_async_with(tmp_path: Path) -> None: + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + # Trying to use sync `with` should raise NotImplementedError + lock = rw_wrapper.read() + with pytest.raises(NotImplementedError, match="Do not use `with` for asyncio locks"), lock: + pass + + lock = rw_wrapper.write() + with pytest.raises(NotImplementedError, match="Do not use `with` for asyncio locks"), lock: + pass + + +@pytest.mark.asyncio +async def test_async_writer_priority_race_condition(tmp_path: Path) -> None: + """ + Tests a scenario where multiple tasks attempt to read and write concurrently, + to ensure writer preference is maintained. 
+ """ + + # Track how many readers and writers are active + active_readers = 0 + active_writers = 0 + lock = asyncio.Lock() # to guard active_readers & active_writers + + async def reader() -> None: + nonlocal active_readers, active_writers + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + read_lock = rw_wrapper.read() + async with read_lock: + async with lock: + active_readers += 1 + # No writer should be active while reading + assert active_writers == 0 + await asyncio.sleep(0.001) + async with lock: + active_readers -= 1 + + async def writer() -> None: + nonlocal active_readers, active_writers + rw_wrapper = AsyncReadWriteFileLockWrapper(lock_file=str(tmp_path / "test_rw")) + write_lock = rw_wrapper.write() + async with write_lock: + async with lock: + active_writers += 1 + # No readers should be active while writing + assert active_readers == 0 + await asyncio.sleep(0.002) + async with lock: + active_writers -= 1 + + tasks = [] + tasks.extend(asyncio.create_task(reader()) for _ in range(5)) + tasks.extend(asyncio.create_task(writer()) for _ in range(5)) + + # Shuffle tasks + random.shuffle(tasks) + + await asyncio.gather(*tasks) + + assert active_readers == 0 + assert active_writers == 0