Skip to content

A lightweight, flexible task execution library for easily creating multithreaded and asynchronous code

License

Apache-2.0, MIT licenses found

Licenses found

Apache-2.0
LICENSE-APACHE
MIT
LICENSE-MIT
Notifications You must be signed in to change notification settings

FastestMolasses/FastTask

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

FastTask

Python 3.13+ License: MIT License: Apache 2.0 PyPI

Easily create multithreaded or asynchronous code without manually touching threads or event loops. FastTask handles the complexity of task management, allowing you to focus on your task logic.

Features

  • âś… Simple API to create and run multithreaded tasks
  • âś… Supports both synchronous and asynchronous execution
  • âś… Run tasks concurrently or sequentially
  • âś… Built-in retry mechanism
  • âś… Comprehensive error handling
  • âś… Colorful logging

Installation

Preferred Method (using UV)

# Setup a virtual environment
uv venv
source .venv/bin/activate

# Install FastTask
uv add fast-task

Alternative Method (using pip)

pip install fast-task

Basic Usage

Create a Simple Task

Create a task by subclassing FastTask and override the main method to implement your task logic.

from fast_task import FastTask
import time


class SimpleTask(FastTask):
    TASK_NAME = "Simple Task"

    def main(self, message: str) -> bool:
        self.log.info(f"Processing message: {message}")
        self.wait()
        return True

    def wait(self) -> None:
        self.log.info("Waiting for task to complete...")
        time.sleep(2)
        self.log.success("Task completed successfully!")


# Run the task
# `task_data` is the input data for the task that will be passed to the `main` method
task = SimpleTask(task_data={"message": "Hello, World!"})
task.start(workers_count=3)  # Start 3 workers executing this task

Create an Async Task

from fast_task import FastTask
import asyncio


class SimpleAsyncTask(FastTask):
    TASK_NAME = "Simple Async Task"

    async def main(self, message: str) -> bool:
        self.log.info(f"Processing message: {message}")
        await self.wait()
        return True

    async def wait(self) -> None:
        self.log.info("Waiting for task to complete...")
        await asyncio.sleep(2)
        self.log.success("Async task completed successfully!")


# Run the async task
# `task_data` is the input data for the task that will be passed to the `main` method
task = SimpleAsyncTask(task_data={"message": "Hello, Async World!"})
task.start(workers_count=3)  # Start 3 workers executing this task

Advanced Usage

Running Tasks Concurrently

from fast_task import FastTask

# Create tasks
task1 = SimpleTask(task_data={"message": "Task 1"})
task2 = SimpleAsyncTask(task_data={"message": "Task 2"})

# Start both tasks concurrently (non-blocking)
task1.start_async(2)
task2.start_async(2)

print("Tasks are running in the background...")

# Do other work here

# Wait for both tasks to complete when needed
FastTask.wait_for_tasks([task1, task2])
print("All tasks have completed!")

Checking Task Status

task = SimpleTask(task_data={"message": "Status Example"})
task.start_async(2)

# Check if the task is still running
if task.is_running():
    print("Task is still running...")
else:
    print("Task has completed")

# Get detailed task metadata
metadata = task.metadata()
print(f"Task Name: {metadata['name']}")
print(f"Status: {metadata['status']}")
print(f"Workers: {metadata['workers']}")

Retrying Tasks on Failure

from random import randint
from fast_task import FastTask, RetryTaskException


class RetryTask(FastTask):
    TASK_NAME = "Retry Task"

    def main(self, data: dict) -> bool:
        if randint(0, 1) == 0:
            # Force a retry
            raise RetryTaskException()

        self.log.success("Task succeeded")
        return True


# Run with 5 retry attempts
task = RetryTask(retries=5)
task.start()

Task Input Validation

from fast_task import FastTask


def validate_user_input(params: dict) -> list[str] | None:
    errors = []

    if "username" not in params:
        errors.append("Username is required")
    elif len(params["username"]) < 3:
        errors.append("Username must be at least 3 characters")

    if "email" not in params:
        errors.append("Email is required")

    return errors if errors else None


class UserTask(FastTask):
    TASK_NAME = "User Task"

    def main(self, username: str, email: str) -> bool:
        self.log.info(f"Processing user: {username} ({email})")
        return True


# Run with validation
task = UserTask(
    task_data={"username": "john", "email": "[email protected]"},
    validator=validate_user_input
)
task.start()

Exception Handling

FastTask provides several exception types to control task flow:

  • RetryTaskException: Triggers a retry of the current function
  • TaskFailedException: Marks the task as failed
  • StopTaskException: Stops the task execution
  • FinishTaskEarlyException: Completes the task early
  • LoopFailedException: Signals a loop failure in looping tasks
  • TooManyRetriesException: Thrown when retry limit is exceeded
  • TaskInputError: Raised when input validation fails

Example:

from fast_task import FastTask, exceptions


class ControlFlowTask(FastTask):
    TASK_NAME = "Control Flow Task"

    def main(self, condition: str) -> bool:
        if condition == "fail":
            raise exceptions.TaskFailedException("Task failed due to condition")
        elif condition == "stop":
            raise exceptions.StopTaskException()
        elif condition == "finish":
            raise exceptions.FinishTaskEarlyException()

        return True

API Reference

FastTask Class

Base class for all tasks.

Constructor

FastTask(
    task_data: dict,                               # Input data for the task
    validator: Optional[Callable] = None,          # Input validator function
    retries: int = 1,                              # Number of retry attempts
    event_loop: Optional[asyncio.AbstractEventLoop] = None  # Custom event loop
)

Main Methods

  • start(workers_count: int = 1, blocking: bool = True) -> Self: Start task execution
  • start_async(workers_count: int = 1) -> Self: Start non-blocking task execution
  • join() -> Self: Wait for task completion
  • is_running() -> bool: Check if task is still running

Overridable Methods

  • main(**kwargs) -> bool: Main task logic (override this)
  • initialize() -> None: Setup before execution
  • on_task_finish() -> None: Cleanup after execution
  • create_variables() -> dict: Add custom variables for main

Static Methods

  • wait_for_tasks(tasks: List[FastTask]) -> None: Wait for multiple tasks

Task Status

from fast_task import TaskStatus

# Available statuses
TaskStatus.WAITING       # Task is waiting to start
TaskStatus.ERROR         # Task encountered an error
TaskStatus.FAILED        # Task failed to complete
TaskStatus.IN_PROGRESS   # Task is currently running
TaskStatus.COMPLETED     # Task completed successfully
TaskStatus.STOPPING      # Task is in the process of stopping
TaskStatus.STOPPED       # Task was stopped

License

FastTask is licensed under either of

at your option.

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in FastTask by you, as defined in the Apache-2.0 license, shall be dually licensed as above, without any additional terms or conditions.

About

A lightweight, flexible task execution library for easily creating multithreaded and asynchronous code

Topics

Resources

License

Apache-2.0, MIT licenses found

Licenses found

Apache-2.0
LICENSE-APACHE
MIT
LICENSE-MIT

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages