Easily create multithreaded or asynchronous code without manually touching threads or event loops. FastTask handles the complexity of task management, allowing you to focus on your task logic.
- âś… Simple API to create and run multithreaded tasks
- âś… Supports both synchronous and asynchronous execution
- âś… Run tasks concurrently or sequentially
- âś… Built-in retry mechanism
- âś… Comprehensive error handling
- âś… Colorful logging
# Setup a virtual environment
uv venv
source .venv/bin/activate
# Install FastTask
uv add fast-task
pip install fast-task
Create a task by subclassing FastTask
and override the main
method to implement your task logic.
from fast_task import FastTask
import time
class SimpleTask(FastTask):
TASK_NAME = "Simple Task"
def main(self, message: str) -> bool:
self.log.info(f"Processing message: {message}")
self.wait()
return True
def wait(self) -> None:
self.log.info("Waiting for task to complete...")
time.sleep(2)
self.log.success("Task completed successfully!")
# Run the task
# `task_data` is the input data for the task that will be passed to the `main` method
task = SimpleTask(task_data={"message": "Hello, World!"})
task.start(workers_count=3) # Start 3 workers executing this task
from fast_task import FastTask
import asyncio
class SimpleAsyncTask(FastTask):
TASK_NAME = "Simple Async Task"
async def main(self, message: str) -> bool:
self.log.info(f"Processing message: {message}")
await self.wait()
return True
async def wait(self) -> None:
self.log.info("Waiting for task to complete...")
await asyncio.sleep(2)
self.log.success("Async task completed successfully!")
# Run the async task
# `task_data` is the input data for the task that will be passed to the `main` method
task = SimpleAsyncTask(task_data={"message": "Hello, Async World!"})
task.start(workers_count=3) # Start 3 workers executing this task
from fast_task import FastTask
# Create tasks
task1 = SimpleTask(task_data={"message": "Task 1"})
task2 = SimpleAsyncTask(task_data={"message": "Task 2"})
# Start both tasks concurrently (non-blocking)
task1.start_async(2)
task2.start_async(2)
print("Tasks are running in the background...")
# Do other work here
# Wait for both tasks to complete when needed
FastTask.wait_for_tasks([task1, task2])
print("All tasks have completed!")
task = SimpleTask(task_data={"message": "Status Example"})
task.start_async(2)
# Check if the task is still running
if task.is_running():
print("Task is still running...")
else:
print("Task has completed")
# Get detailed task metadata
metadata = task.metadata()
print(f"Task Name: {metadata['name']}")
print(f"Status: {metadata['status']}")
print(f"Workers: {metadata['workers']}")
from random import randint
from fast_task import FastTask, RetryTaskException
class RetryTask(FastTask):
TASK_NAME = "Retry Task"
def main(self, data: dict) -> bool:
if randint(0, 1) == 0:
# Force a retry
raise RetryTaskException()
self.log.success("Task succeeded")
return True
# Run with 5 retry attempts
task = RetryTask(retries=5)
task.start()
from fast_task import FastTask
def validate_user_input(params: dict) -> list[str] | None:
errors = []
if "username" not in params:
errors.append("Username is required")
elif len(params["username"]) < 3:
errors.append("Username must be at least 3 characters")
if "email" not in params:
errors.append("Email is required")
return errors if errors else None
class UserTask(FastTask):
TASK_NAME = "User Task"
def main(self, username: str, email: str) -> bool:
self.log.info(f"Processing user: {username} ({email})")
return True
# Run with validation
task = UserTask(
task_data={"username": "john", "email": "[email protected]"},
validator=validate_user_input
)
task.start()
FastTask provides several exception types to control task flow:
RetryTaskException
: Triggers a retry of the current functionTaskFailedException
: Marks the task as failedStopTaskException
: Stops the task executionFinishTaskEarlyException
: Completes the task earlyLoopFailedException
: Signals a loop failure in looping tasksTooManyRetriesException
: Thrown when retry limit is exceededTaskInputError
: Raised when input validation fails
Example:
from fast_task import FastTask, exceptions
class ControlFlowTask(FastTask):
TASK_NAME = "Control Flow Task"
def main(self, condition: str) -> bool:
if condition == "fail":
raise exceptions.TaskFailedException("Task failed due to condition")
elif condition == "stop":
raise exceptions.StopTaskException()
elif condition == "finish":
raise exceptions.FinishTaskEarlyException()
return True
Base class for all tasks.
FastTask(
task_data: dict, # Input data for the task
validator: Optional[Callable] = None, # Input validator function
retries: int = 1, # Number of retry attempts
event_loop: Optional[asyncio.AbstractEventLoop] = None # Custom event loop
)
start(workers_count: int = 1, blocking: bool = True) -> Self
: Start task executionstart_async(workers_count: int = 1) -> Self
: Start non-blocking task executionjoin() -> Self
: Wait for task completionis_running() -> bool
: Check if task is still running
main(**kwargs) -> bool
: Main task logic (override this)initialize() -> None
: Setup before executionon_task_finish() -> None
: Cleanup after executioncreate_variables() -> dict
: Add custom variables for main
wait_for_tasks(tasks: List[FastTask]) -> None
: Wait for multiple tasks
from fast_task import TaskStatus
# Available statuses
TaskStatus.WAITING # Task is waiting to start
TaskStatus.ERROR # Task encountered an error
TaskStatus.FAILED # Task failed to complete
TaskStatus.IN_PROGRESS # Task is currently running
TaskStatus.COMPLETED # Task completed successfully
TaskStatus.STOPPING # Task is in the process of stopping
TaskStatus.STOPPED # Task was stopped
FastTask is licensed under either of
- MIT License (LICENSE-MIT or http://opensource.org/licenses/MIT)
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in FastTask by you, as defined in the Apache-2.0 license, shall be dually licensed as above, without any additional terms or conditions.