Skip to content

Commit

Permalink
Reduce timeout from _transaction_lock.acquire() wait
Browse files Browse the repository at this point in the history
  • Loading branch information
leventov committed Feb 24, 2025
1 parent 71f4501 commit 8938c15
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions src/filelock/_read_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sqlite3
import threading
import logging
import time
from _error import Timeout
from filelock._api import AcquireReturnProxy, BaseFileLock
from typing import Literal, Any
Expand All @@ -15,15 +16,23 @@
# systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days.
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1

def timeout_for_sqlite(timeout: float = -1, blocking: bool = True) -> int:
def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> int:
if blocking is False:
return 0

if timeout == -1:
return _MAX_SQLITE_TIMEOUT_MS

if timeout < 0:
raise ValueError("timeout must be a non-negative number or -1")

if timeout > 0:
timeout = timeout - already_waited
if timeout < 0:
timeout = 0

assert timeout >= 0

timeout_ms = int(timeout * 1000)
if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0:
_LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
Expand Down Expand Up @@ -97,16 +106,22 @@ def __init__(
def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy:
"""Acquire a read lock. If a lock is already held, it must be a read lock.
Upgrading from read to write is prohibited."""

# Attempt to re-enter already held lock.
with self._internal_lock:
if self._lock_level > 0:
# Must already be in read mode.
if self._current_mode != "read":
raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)")
raise RuntimeError(
f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): "
"already holding a write lock (downgrade not allowed)"
)
self._lock_level += 1
return AcquireReturnProxy(lock=self)

timeout_ms = timeout_for_sqlite(timeout, blocking)

start_time = time.perf_counter()
# Acquire the transaction lock so that the (possibly blocking) SQLite work
# happens without conflicting with other threads' transaction work.
if not self._transaction_lock.acquire(blocking, timeout):
Expand All @@ -115,11 +130,16 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
# Double-check: another thread might have completed acquisition meanwhile.
with self._internal_lock:
if self._lock_level > 0:
# Must already be in read mode.
if self._current_mode != "read":
raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)")
raise RuntimeError(
f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): "
"already holding a write lock (downgrade not allowed)"
)
self._lock_level += 1
return AcquireReturnProxy(lock=self)

waited = time.perf_counter() - start_time
timeout_ms = timeout_for_sqlite(timeout, blocking, waited)

self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,))
self.con.execute('BEGIN TRANSACTION;')
Expand All @@ -143,25 +163,38 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy:
"""Acquire a write lock. If a lock is already held, it must be a write lock.
Upgrading from read to write is prohibited."""

# Attempt to re-enter already held lock.
with self._internal_lock:
if self._lock_level > 0:
if self._current_mode != "write":
raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)")
raise RuntimeError(
f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): "
"already holding a read lock (upgrade not allowed)"
)
self._lock_level += 1
return AcquireReturnProxy(lock=self)

timeout_ms = timeout_for_sqlite(timeout, blocking)
start_time = time.perf_counter()
# Acquire the transaction lock so that the (possibly blocking) SQLite work
# happens without conflicting with other threads' transaction work.
if not self._transaction_lock.acquire(blocking, timeout):
raise Timeout(self.lock_file)
try:
# Double-check: another thread might have completed acquisition meanwhile.
with self._internal_lock:
if self._lock_level > 0:
if self._current_mode != "write":
raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)")
raise RuntimeError(
f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): "
"already holding a read lock (upgrade not allowed)"
)
self._lock_level += 1
return AcquireReturnProxy(lock=self)

waited = time.perf_counter() - start_time
timeout_ms = timeout_for_sqlite(timeout, blocking, waited)

self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,))
self.con.execute('BEGIN EXCLUSIVE TRANSACTION;')

Expand All @@ -183,7 +216,7 @@ def release(self, force: bool = False) -> None:
if self._lock_level == 0:
if force:
return
raise RuntimeError("Cannot release a lock that is not held")
raise RuntimeError(f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held")
if force:
self._lock_level = 0
else:
Expand Down

0 comments on commit 8938c15

Please sign in to comment.