From 8938c15b3ac6e3e90b65df535e102c0d6378be65 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Mon, 24 Feb 2025 13:48:18 +0700 Subject: [PATCH] Reduce timeout from _transaction_lock.acquire() wait --- src/filelock/_read_write.py | 49 +++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py index 4557e61..f0c06b0 100644 --- a/src/filelock/_read_write.py +++ b/src/filelock/_read_write.py @@ -2,6 +2,7 @@ import sqlite3 import threading import logging +import time from _error import Timeout from filelock._api import AcquireReturnProxy, BaseFileLock from typing import Literal, Any @@ -15,15 +16,23 @@ # systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days. _MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1 -def timeout_for_sqlite(timeout: float = -1, blocking: bool = True) -> int: +def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> int: if blocking is False: return 0 + if timeout == -1: return _MAX_SQLITE_TIMEOUT_MS + if timeout < 0: raise ValueError("timeout must be a non-negative number or -1") + if timeout > 0: + timeout = timeout - already_waited + if timeout < 0: + timeout = 0 + assert timeout >= 0 + timeout_ms = int(timeout * 1000) if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0: _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS) @@ -97,16 +106,22 @@ def __init__( def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: """Acquire a read lock. If a lock is already held, it must be a read lock. Upgrading from read to write is prohibited.""" + + # Attempt to re-enter already held lock. with self._internal_lock: if self._lock_level > 0: # Must already be in read mode. if self._current_mode != "read": - raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a write lock (downgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) timeout_ms = timeout_for_sqlite(timeout, blocking) + start_time = time.perf_counter() # Acquire the transaction lock so that the (possibly blocking) SQLite work # happens without conflicting with other threads' transaction work. if not self._transaction_lock.acquire(blocking, timeout): @@ -115,11 +130,16 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet # Double-check: another thread might have completed acquisition meanwhile. with self._internal_lock: if self._lock_level > 0: - # Must already be in read mode. if self._current_mode != "read": - raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a write lock (downgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) + + waited = time.perf_counter() - start_time + timeout_ms = timeout_for_sqlite(timeout, blocking, waited) self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,)) self.con.execute('BEGIN TRANSACTION;') @@ -143,14 +163,21 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: """Acquire a write lock. If a lock is already held, it must be a write lock. Upgrading from read to write is prohibited.""" + + # Attempt to re-enter already held lock. with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a read lock (upgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) - timeout_ms = timeout_for_sqlite(timeout, blocking) + start_time = time.perf_counter() + # Acquire the transaction lock so that the (possibly blocking) SQLite work + # happens without conflicting with other threads' transaction work. if not self._transaction_lock.acquire(blocking, timeout): raise Timeout(self.lock_file) try: @@ -158,10 +185,16 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a read lock (upgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) + waited = time.perf_counter() - start_time + timeout_ms = timeout_for_sqlite(timeout, blocking, waited) + self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,)) self.con.execute('BEGIN EXCLUSIVE TRANSACTION;') @@ -183,7 +216,7 @@ def release(self, force: bool = False) -> None: if self._lock_level == 0: if force: return - raise RuntimeError("Cannot release a lock that is not held") + raise RuntimeError(f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held") if force: self._lock_level = 0 else: