add FileStore & fix passing kwargs to sqlalchemy (#89)
* add file store
- add FileStore
- add with_lock
* update notes, bump version
* add warning for lifecycle tasks
- fixes py3.9
- add more documentation regarding lifecycle tasks
devkral authored Jan 7, 2025
1 parent 1cb909f commit bfd9409
Showing 16 changed files with 616 additions and 26 deletions.
2 changes: 1 addition & 1 deletion asyncz/__init__.py
@@ -1 +1 @@
__version__ = "0.13.0"
__version__ = "0.13.1"
11 changes: 11 additions & 0 deletions asyncz/file_locking.py
@@ -17,7 +17,11 @@
... f.write('Edgy')
"""

from __future__ import annotations

import os
from collections.abc import Generator
from contextlib import contextmanager
from typing import IO, Any

__all__ = ("LOCK_EX", "LOCK_SH", "LOCK_NB", "lock", "unlock")
@@ -114,3 +118,10 @@ def lock(f: IO, flags: int) -> bool:
def unlock(f: IO) -> bool:
    fcntl.flock(_fd(f), fcntl.LOCK_UN)
    return True


@contextmanager
def with_lock(f: IO, flags: int) -> Generator[IO, None, None]:
    lock(f, flags)
    try:
        yield f
    finally:
        unlock(f)
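
For orientation, a minimal usage sketch of the new helper (the file path is hypothetical; `LOCK_SH` and `with_lock` come from `asyncz.file_locking` as added above, and this is the same pattern the new `FileStore` uses when reading task files):

```python
from asyncz.file_locking import LOCK_SH, with_lock

# Hold a shared (read) lock for the duration of the block;
# the lock is released again when the context manager exits.
with open("/tmp/example.task", "rb") as f, with_lock(f, LOCK_SH):
    data = f.read()
```
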
2 changes: 1 addition & 1 deletion asyncz/schedulers/base.py
@@ -1138,7 +1138,7 @@ def _process_tasks_of_store(
            due_tasks: list[TaskType] = store.get_due_tasks(now)
        except Exception as e:
            self.loggers[self.logger_name].warning(
                f"Error getting due tasks from the store {store_alias}: {e}."
                f'Error getting due tasks from the store "{store_alias}": {e}.'
            )
            retry_wakeup_time = now + timedelta(seconds=self.store_retry_interval)
            if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
1 change: 1 addition & 0 deletions asyncz/schedulers/defaults.py
@@ -19,6 +19,7 @@

stores: dict[str, str] = {
    "memory": "asyncz.stores.memory:MemoryStore",
    "file": "asyncz.stores.file:FileStore",
    "mongodb": "asyncz.stores.mongo:MongoDBStore",
    "redis": "asyncz.stores.redis:RedisStore",
    "sqlalchemy": "asyncz.stores.sqlalchemy:SQLAlchemyStore",
178 changes: 178 additions & 0 deletions asyncz/stores/file.py
@@ -0,0 +1,178 @@
from __future__ import annotations

import glob
import os
import pickle
import shutil
from contextlib import suppress
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union, cast

from asyncz.exceptions import ConflictIdError, TaskLookupError
from asyncz.file_locking import LOCK_EX, LOCK_SH, with_lock
from asyncz.stores.base import BaseStore
from asyncz.tasks import Task
from asyncz.tasks.types import TaskType

if TYPE_CHECKING:
    from asyncz.schedulers.types import SchedulerType


class FileStore(BaseStore):
    """
    Stores tasks as pickled task files in a directory on the filesystem.

    Args:
        directory - The directory to store the tasks. String or Path.
        suffix - The suffix of task files.
        mode - The mode for the created directory.
        cleanup_directory - Whether to delete the directory (and all tasks) on shutdown.
        pickle_protocol - Pickle protocol level to use (for serialization), defaults to the
            highest available.
    """

    forbidden_characters: set[str] = {"/", "\\", "\0", ":"}

    def __init__(
        self,
        directory: Union[str, os.PathLike],
        suffix: str = ".task",
        mode: int = 0o700,
        cleanup_directory: bool = False,
        pickle_protocol: Optional[int] = pickle.HIGHEST_PROTOCOL,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.pickle_protocol = pickle_protocol
        self.directory = Path(directory)
        self.mode = mode
        self.cleanup_directory = cleanup_directory
        self.suffix = suffix

    def check_task_id(self, task_id: str | None) -> None:
        if task_id is None:
            raise RuntimeError("Task id is None")
        if task_id.startswith("."):
            raise RuntimeError(f'Invalid character in task id: "{task_id}".')
        for char in task_id:
            if char in self.forbidden_characters:
                raise RuntimeError(f'Invalid character in task id: "{task_id}".')

    def start(self, scheduler: Any, alias: str) -> None:
        """
        Creates the task directory (with the configured mode) if it does not exist yet.
        """
        super().start(scheduler, alias)
        self.directory.mkdir(self.mode, parents=True, exist_ok=True)
        if not self.directory.is_dir():
            raise RuntimeError("Not a directory.")

    def shutdown(self) -> None:
        if self.cleanup_directory:
            shutil.rmtree(self.directory, ignore_errors=True)
        super().shutdown()

    def lookup_task(self, task_id: str) -> Optional[TaskType]:
        self.check_task_id(task_id)
        task_path = self.directory / f"{task_id}{self.suffix}"
        try:
            with open(task_path, "rb") as read_ob, with_lock(read_ob, LOCK_SH):
                task = self.rebuild_task(read_ob.read())
        except Exception:
            task_path.unlink(missing_ok=True)
            task = None
        return task

    def rebuild_task(self, state: Any) -> TaskType:
        state = pickle.loads(self.conditional_decrypt(state))
        task = Task.__new__(Task)
        task.__setstate__(state)
        task.scheduler = cast("SchedulerType", self.scheduler)
        task.store_alias = self.alias
        return task

    def get_due_tasks(self, now: datetime) -> list[TaskType]:
        return [
            task
            for task in self.get_all_tasks()
            if task.next_run_time is not None and task.next_run_time <= now
        ]

    def get_tasks(self) -> list[TaskType]:
        tasks: list[tuple[TaskType, os.stat_result]] = []
        with os.scandir(self.directory) as scanner:
            for entry in scanner:
                if not entry.name.endswith(self.suffix) or not entry.is_file():
                    continue
                try:
                    with open(entry.path, "rb") as read_ob, with_lock(read_ob, LOCK_SH):
                        task = self.rebuild_task(read_ob.read())
                    tasks.append((task, entry.stat()))
                except Exception:
                    with suppress(FileNotFoundError):
                        os.unlink(entry.path)
        return [
            task
            for task, _ in sorted(
                tasks,
                key=lambda task_stat: (
                    int(task_stat[0].next_run_time is None),
                    task_stat[0].next_run_time,
                    # sort by task creation time, not last update
                    task_stat[1].st_ctime,
                ),
            )
        ]

    def get_next_run_time(self) -> Optional[datetime]:
        next_run_time: datetime | None = None
        for task in self.get_all_tasks():
            if task.next_run_time is None:
                break
            if next_run_time is None or next_run_time >= task.next_run_time:
                next_run_time = task.next_run_time
        return next_run_time

    def get_all_tasks(self) -> list[TaskType]:
        return self.get_tasks()

    def add_task(self, task: TaskType) -> None:
        self.check_task_id(task.id)
        task_path = self.directory / f"{task.id}{self.suffix}"
        try:
            with task_path.open("xb") as write_ob, with_lock(write_ob, LOCK_EX):
                write_ob.write(
                    self.conditional_encrypt(
                        pickle.dumps(task.__getstate__(), self.pickle_protocol)
                    )
                )
        except FileExistsError:
            raise ConflictIdError(task.id) from None

    def update_task(self, task: TaskType) -> None:
        self.check_task_id(task.id)
        task_path = self.directory / f"{task.id}{self.suffix}"
        try:
            with task_path.open("r+b") as write_ob, with_lock(write_ob, LOCK_EX):
                write_ob.truncate()
                write_ob.write(
                    self.conditional_encrypt(
                        pickle.dumps(task.__getstate__(), self.pickle_protocol)
                    )
                )
        except FileNotFoundError:
            raise TaskLookupError(task.id) from None

    def delete_task(self, task_id: str) -> None:
        self.check_task_id(task_id)
        task_path = self.directory / f"{task_id}{self.suffix}"
        try:
            task_path.unlink(missing_ok=False)
        except FileNotFoundError:
            raise TaskLookupError(task_id) from None

    def remove_all_tasks(self) -> None:
        for task_path in self.directory.glob(f"*{glob.escape(self.suffix)}"):
            task_path.unlink(missing_ok=True)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} (directory={self.directory})>"
16 changes: 7 additions & 9 deletions asyncz/stores/sqlalchemy.py
@@ -41,7 +41,7 @@ def __init__(
        super().__init__(**kwargs)
        self.pickle_protocol = pickle_protocol
        if isinstance(database, str):
            database = sqlalchemy.create_engine(database)
            database = sqlalchemy.create_engine(database, **kwargs)
        if not database:
            raise ValueError("database must not be empty or None")
        self.engine: sqlalchemy.Engine = database
@@ -63,6 +63,10 @@ def start(self, scheduler: Any, alias: str) -> None:
        super().start(scheduler, alias)
        self.metadata.create_all(self.engine)

    def shutdown(self) -> None:
        self.engine.dispose()
        super().shutdown()

    def lookup_task(self, task_id: str) -> Optional[TaskType]:
        tasks = self.get_tasks(self.table.c.id == task_id, limit=1)
        return tasks[0] if tasks else None
@@ -82,7 +86,7 @@ def get_due_tasks(self, now: datetime) -> list[TaskType]:
    def get_tasks(self, conditions: Any = None, limit: int = 0) -> list[TaskType]:
        tasks: list[TaskType] = []
        failed_task_ids = []
        stmt = self.table.select().order_by(self.table.c.next_run_time.asc())
        stmt = self.table.select().order_by(self.table.c.next_run_time.asc().nullslast())
        if conditions is not None:
            stmt = stmt.where(conditions)

@@ -119,9 +123,7 @@ def get_next_run_time(self) -> Optional[datetime]:
        return utc_timestamp_to_datetime(row.next_run_time) if row else None

    def get_all_tasks(self) -> list[TaskType]:
        tasks = self.get_tasks()
        self.fix_paused_tasks(tasks)
        return tasks
        return self.get_tasks()

    def add_task(self, task: TaskType) -> None:
        data = {
@@ -168,9 +170,5 @@ def remove_all_tasks(self) -> None:
        with self.engine.begin() as conn:
            conn.execute(self.table.delete())

    def shutdown(self) -> None:
        self.engine.dispose()
        super().shutdown()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} (database={self.engine.url})>"
12 changes: 12 additions & 0 deletions docs/release-notes.md
@@ -1,5 +1,17 @@
# Release Notes


## 0.13.1

### Added

- `FileStore` was added (simple synchronization via files in a directory).
- `with_lock` was added to `asyncz.file_locking`.

### Fixed

- `SQLAlchemyStore` didn't pass extra keyword arguments to `create_engine`.

## 0.13.0

### Added
2 changes: 1 addition & 1 deletion docs/schedulers.md
@@ -94,7 +94,7 @@ The third option is by starting the scheduler and use the `setup` method.
### Multi-Processing mode

Asyncz schedulers have an optional multiprocessing mode. It can be activated by setting the
`lock_path` option to e.g. `"/tmp/asyncz_{store}_{ppid}.pid"`
`lock_path` option to e.g. `"/tmp/asyncz_{store}_{pgrp}.lock"`

This defines a per-store process lock via a file.
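
For illustration, a hypothetical setup could look like this (the scheduler class and import path are assumptions, not part of this change; the `lock_path` template is expanded per store):

```python
from asyncz.schedulers import AsyncIOScheduler  # import path assumed

# One lock file per store and process group: only the process that
# holds the lock processes that store's due tasks.
scheduler = AsyncIOScheduler(lock_path="/tmp/asyncz_{store}_{pgrp}.lock")
scheduler.start()
```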

35 changes: 35 additions & 0 deletions docs/stores.md
@@ -37,6 +37,38 @@ stateless tasks or tasks that do not require some sort of cache.
from asyncz.stores.memory import MemoryStore
```

## FileStore

This is another basic store which is always available. It supports task synchronization
between multiple processes via task files in a directory.
It is not fast, and its security relies solely on file permissions and optional encryption.

!!! Warning
    Anyone who can place a file in the store directory can inject code,
    unless encryption is used.

**Store Alias** - `file`

### Parameters

- **directory** - The directory to use. Should be well protected.
- **suffix** - The suffix of task files. Files with other suffixes are ignored.

<sup>Default: `".task"`</sup>

- **mode** - The mode of the directory.

<sup>Default: `0o700`</sup>

- **cleanup_directory** - Whether the directory should be deleted on shutdown. This cleans up old tasks.

<sup>Default: `False`</sup>

- **pickle_protocol** - Pickle protocol level to use (for serialization), defaults to the
  highest available.

<sup>Default: `pickle.HIGHEST_PROTOCOL`</sup>
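
Putting the parameters together, a minimal sketch (the scheduler class and its `stores` option are assumptions here; only `FileStore` and its parameters are defined by this change):

```python
from asyncz.schedulers import AsyncIOScheduler  # import path assumed
from asyncz.stores.file import FileStore

# Tasks are pickled to <directory>/<task_id>.task; the directory is the
# synchronization point shared between processes.
store = FileStore(directory="/var/lib/myapp/tasks", suffix=".task", mode=0o700)
scheduler = AsyncIOScheduler(stores={"default": store})
```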

## RedisStore

**Store Alias** - `redis`
@@ -146,6 +178,9 @@

<sup>Default: `pickle.HIGHEST_PROTOCOL`</sup>

Any other keyword arguments are passed through to `sqlalchemy.create_engine`.
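
For example, connection-pool options can now be forwarded (a sketch; `pool_size` and `pool_pre_ping` are standard `sqlalchemy.create_engine` arguments, and the database URL is a placeholder):

```python
from asyncz.stores.sqlalchemy import SQLAlchemyStore

# Extra keyword arguments are forwarded to sqlalchemy.create_engine
# when the database is given as a URL string.
store = SQLAlchemyStore(
    database="postgresql+psycopg://user:pass@localhost/scheduler",
    pool_size=5,
    pool_pre_ping=True,
)
```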


## Custom store

```python