The `ThreadPool.ProducerConsumer` unit implements a thread pool using the producer-consumer pattern with advanced backpressure handling. It provides a thread-safe queue for work items and manages a pool of worker threads that process these items.
- **Backpressure Management**
  - Configurable load thresholds and delays
  - Default thresholds:
    - Low Load: 50% (0.5)
    - Medium Load: 70% (0.7)
    - High Load: 90% (0.9)
  - Default delays:
    - Low Load: 10ms
    - Medium Load: 50ms
    - High Load: 100ms
  - Maximum retry attempts: 5
- **Debug Logging**
  - Enabled by default (`DEBUG_LOG = True`)
  - Includes timestamps and thread IDs
  - Logs queue operations and thread activities
  - Helps in monitoring and debugging
- **Enhanced Error Handling**
  - New `EQueueFullException` for queue saturation
  - Detailed error messages with queue state
  - Thread-safe error reporting
  - Proper exception propagation
The implementation uses a fixed-size circular buffer with backpressure:
- Bounded Queue: Fixed capacity of 1024 items (configurable) prevents memory exhaustion
- Load Monitoring: Continuous tracking of queue load factor
- Adaptive Delays: Response times adjust based on queue load
- Fail-Fast Policy: Throws EQueueFullException after max attempts
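The load-to-delay mapping implied by the default thresholds above can be pictured with a small helper. This is a sketch with illustrative constant and function names, not identifiers taken from the unit:

```pascal
const
  // Default thresholds and delays listed above (illustrative names)
  LOW_LOAD_THRESHOLD    = 0.5;   // 50% full
  MEDIUM_LOAD_THRESHOLD = 0.7;   // 70% full
  HIGH_LOAD_THRESHOLD   = 0.9;   // 90% full
  LOW_LOAD_DELAY_MS     = 10;
  MEDIUM_LOAD_DELAY_MS  = 50;
  HIGH_LOAD_DELAY_MS    = 100;

// Map the current queue load factor (Count / Capacity) to a backoff delay.
function GetBackpressureDelay(LoadFactor: Double): Integer;
begin
  if LoadFactor >= HIGH_LOAD_THRESHOLD then
    Result := HIGH_LOAD_DELAY_MS
  else if LoadFactor >= MEDIUM_LOAD_THRESHOLD then
    Result := MEDIUM_LOAD_DELAY_MS
  else if LoadFactor >= LOW_LOAD_THRESHOLD then
    Result := LOW_LOAD_DELAY_MS
  else
    Result := 0;   // below the low-load threshold: enqueue without delay
end;
```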
The Producer-Consumer Thread Pool combines a fixed-size circular buffer with backpressure and retry mechanisms:

- **Fixed-Size Circular Buffer**
  - Capacity: The task queue holds 1024 items by default (configurable), giving predictable memory usage.
  - Circular Nature: Buffer slots are reused efficiently without dynamic resizing.
- **Built-in Retry Mechanism**
  - Automatic Retries: When the queue is full, the system automatically retries up to 5 times (configurable).
  - Backpressure Delays: Each retry attempt applies an adaptive delay based on the current queue load.
  - Exception Handling: `EQueueFullException` is raised after the maximum number of attempts is exhausted (a retry sketch follows this list).
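Put together, the enqueue path retries a bounded number of times with load-based backoff before failing fast. The following is an illustrative sketch of the described behaviour (the wrapper name and `MAX_ATTEMPTS` constant are not taken from the unit), not its literal source:

```pascal
procedure EnqueueWithBackpressure(Queue: TThreadSafeQueue;
  const Item: IWorkItem; Capacity: Integer);
const
  MAX_ATTEMPTS = 5;                  // default maximum retry attempts
var
  Attempt: Integer;
begin
  for Attempt := 1 to MAX_ATTEMPTS do
  begin
    if Queue.TryEnqueue(Item) then
      Exit;                          // enqueued successfully
    // Queue is full: back off using the load-based delay sketched earlier
    Sleep(GetBackpressureDelay(Queue.GetCount / Capacity));
  end;
  // All attempts exhausted: fail fast so the caller can apply its own policy
  raise EQueueFullException.Create('Queue full after maximum retry attempts');
end;
```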
**Warning**: While the system includes automatic retry mechanisms, it's recommended that users implement their own error handling strategies for scenarios where the queue remains full after all retry attempts.
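For example, a producer can catch `EQueueFullException` around `Queue` and apply its own policy. A minimal, hedged usage sketch (the task and wrapper procedures are placeholders):

```pascal
procedure DoWork;
begin
  // ... the actual task ...
end;

procedure SubmitWork(Pool: TProducerConsumerThreadPool);
begin
  try
    Pool.Queue(@DoWork);   // the @ may be unnecessary depending on compiler mode
  except
    on E: EQueueFullException do
    begin
      // Your own strategy: drop the task, pause and retry later,
      // persist it for reprocessing, or report the failure upstream.
      WriteLn('Task rejected, queue saturated: ', E.Message);
    end;
  end;
end;
```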
```mermaid
classDiagram
class TThreadPoolBase {
<<abstract>>
+Create(threadCount: Integer)
+Queue(procedure)
+Queue(method)
+Queue(procedureIndex, index)
+Queue(methodIndex, index)
+WaitForAll()
+GetLastError()
+ClearLastError()
}
class TProducerConsumerThreadPool {
-FThreads: TThreadList
-FWorkQueue: TThreadSafeQueue
-FCompletionEvent: TEvent
-FWorkItemCount: Integer
-FErrorLock: TCriticalSection
-FWorkItemLock: TCriticalSection
-FLocalThreadCount: Integer
+Create(threadCount: Integer)
+Queue(procedure)
+Queue(method)
+Queue(procedureIndex, index)
+Queue(methodIndex, index)
+WaitForAll()
+GetLastError()
+ClearLastError()
}
class TThreadSafeQueue {
-FItems: array of IWorkItem
-FHead: Integer
-FTail: Integer
-FCount: Integer
-FCapacity: Integer
-FLock: TCriticalSection
+Create(capacity: Integer)
+TryEnqueue(item: IWorkItem): Boolean
+TryDequeue(out item: IWorkItem): Boolean
+GetCount(): Integer
+Clear()
}
class TProducerConsumerWorkerThread {
-FThreadPool: TObject
+Create(threadPool: TObject)
#Execute()
}
TThreadPoolBase <|-- TProducerConsumerThreadPool
TProducerConsumerThreadPool *-- TThreadSafeQueue
TProducerConsumerThreadPool *-- TProducerConsumerWorkerThread
```

```mermaid
sequenceDiagram
participant App as Application
participant Pool as ThreadPool
participant Queue as WorkQueue
participant Worker as WorkerThread
App->>Pool: Queue(Task)
Pool->>Queue: TryEnqueue
alt Queue Full After Max Attempts
Queue-->>App: Raise EQueueFullException
else Queue Space Available
Queue-->>Pool: true
Pool-->>App: Return
Worker->>Queue: TryDequeue
Queue-->>Worker: Work Item
Worker->>Worker: Execute Task
Worker->>Pool: Update Count
opt Last Task
Pool->>App: Signal Completion
end
end
```

```mermaid
stateDiagram-v2
[*] --> Created: Create()
Created --> Ready: Initialize Threads
Ready --> Processing: Queue Task
Processing --> Ready: Task Complete
Processing --> QueueFull: Queue Full
QueueFull --> Processing: Queue Space Available
Ready --> Shutdown: Destroy
Processing --> Shutdown: Destroy
QueueFull --> Shutdown: Destroy
Shutdown --> [*]: Cleanup Complete
```

```mermaid
flowchart LR
A[Task] -->|Create| B[Work Item]
B -->|Queue| C[Thread Pool]
C -->|Enqueue| D[Work Queue]
D -->|Dequeue| E[Worker Thread]
E -->|Execute| F[Complete]
E -->|Error| G[Error State]
G -->|Report| C
```
**TProducerConsumerThreadPool**

- Main thread pool implementation
- Manages worker threads and work queue
- Handles task queueing and completion tracking
- Thread count defaults to CPU count if not specified
- Thread-safe operation using critical sections
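A minimal usage sketch against the public interface above; the program name, worker procedure, and thread count are illustrative, and the `@` operator assumes `{$mode objfpc}`:

```pascal
program PoolDemo;

uses
  ThreadPool.ProducerConsumer;

procedure PrintHello;
begin
  WriteLn('Hello from a worker thread');
end;

var
  Pool: TProducerConsumerThreadPool;
begin
  // Four workers requested explicitly; per the notes above the pool
  // can also default to the CPU count when no count is specified.
  Pool := TProducerConsumerThreadPool.Create(4);
  try
    Pool.Queue(@PrintHello);   // queue a parameterless procedure
    Pool.WaitForAll;           // block until all queued items complete
  finally
    Pool.Free;
  end;
end.
```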
**TThreadSafeQueue**

- Thread-safe circular queue implementation
- Fixed capacity (1024 items by default, configurable)
- Provides TryEnqueue and TryDequeue operations
- Handles queue full/empty conditions
- Uses critical section for thread safety
- Implements backpressure mechanism
- Monitors queue load factor
**TProducerConsumerWorkerThread**

- Worker thread implementation
- Continuously processes items from queue
- Handles work item execution and error reporting
- Sleeps when queue is empty (100ms intervals)
- Created suspended, started explicitly
- Uses critical sections for queue operations (FLock)
- Uses critical sections for work item count (FWorkItemLock)
- Uses critical sections for error handling (FErrorLock)
- Uses event object for completion signaling (FCompletionEvent)
- Thread-safe backpressure application
- Queue full conditions trigger retry mechanism
- Maximum retry attempts (default: 5)
- EQueueFullException raised after max retries
- Work item execution errors are captured and stored
- Thread termination is handled gracefully
- Last error accessible via LastError property
- Fixed queue size (1024 items by default, configurable)
- Adaptive delays based on queue load
- Worker threads sleep 100ms when queue empty
- Thread count optimized for CPU count by default
- Thread-safe operations with minimal locking
- Backpressure helps prevent system overload
- Threads created in suspended state
- Started explicitly after creation
- Thread count rules (a clamping sketch follows this list):
  - Minimum: 4 threads
  - Maximum: 2 × `ProcessorCount`
  - Default: `ProcessorCount` when not specified
- No dynamic thread creation/destruction
- Graceful shutdown via Terminate flag
- Waits for threads to finish current task
- Proper cleanup of thread resources
- Thread-safe removal from thread list
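A sketch of the clamping rule from the thread-count list above; the helper is illustrative, and `ProcessorCount` is passed in rather than queried to keep the sketch self-contained:

```pascal
// Clamp a requested thread count to the documented bounds:
// minimum 4, maximum 2 x ProcessorCount, default ProcessorCount.
function EffectiveThreadCount(Requested, ProcessorCount: Integer): Integer;
begin
  if Requested <= 0 then
    Result := ProcessorCount            // default when no count is specified
  else
    Result := Requested;
  if Result < 4 then
    Result := 4;                        // documented minimum
  if Result > 2 * ProcessorCount then
    Result := 2 * ProcessorCount;       // documented maximum
end;
```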
```pascal
procedure TProducerConsumerWorkerThread.Execute;
begin
  while not Terminated do
  begin
    if TryGetWorkItem then
      ProcessWorkItem
    else
      Sleep(100); // Prevent busy waiting
  end;
end;
```
```pascal
// Inside the worker's work-item processing (simplified): execution errors
// are caught, then stored as the pool's last error under FErrorLock.
try
  WorkItem.Execute;
except
  on E: Exception do
  begin
    Pool.FErrorLock.Enter;
    try
      Pool.SetLastError(E.Message);
    finally
      Pool.FErrorLock.Leave;
    end;
  end;
end;
```
- Queue full detection via TryEnqueue
- Exception propagation to caller
- Work item count adjustment
- Completion event management
- Thread-safe error storage
- Last error overwrite policy
- Error clearing mechanism
- Error retrieval synchronization
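A usage sketch of the error API, assuming `GetLastError` returns an empty string when no error has been stored; the surrounding flow is illustrative:

```pascal
// After a batch completes, inspect and then clear the last stored error.
Pool.WaitForAll;
if Pool.GetLastError <> '' then
begin
  WriteLn('A work item failed: ', Pool.GetLastError);
  Pool.ClearLastError;   // reset before queueing the next batch
end;
```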
The implementation uses a fixed-size circular buffer with backpressure:
- Bounded Queue: Fixed capacity of 1024 items (by default, configurable) prevents memory exhaustion
- Thread Safety: All operations protected by FLock critical section
- Backpressure Policy: Adaptive delays based on queue load factor
```pascal
function TThreadSafeQueue.TryEnqueue(AItem: IWorkItem): boolean;
begin
  Result := False;
  FLock.Enter;
  try
    if FCount < FCapacity then
    begin
      FItems[FTail] := AItem;
      FTail := (FTail + 1) mod FCapacity;
      Inc(FCount);
      Result := True;
    end;
  finally
    FLock.Leave;
  end;
end;
```
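`TryDequeue` mirrors the same shape on the consumer side. The following is a sketch consistent with the fields shown in the class diagram, not necessarily the unit's literal code:

```pascal
function TThreadSafeQueue.TryDequeue(out AItem: IWorkItem): Boolean;
begin
  Result := False;
  AItem := nil;
  FLock.Enter;
  try
    if FCount > 0 then
    begin
      AItem := FItems[FHead];             // take the oldest queued item
      FItems[FHead] := nil;               // drop the interface reference
      FHead := (FHead + 1) mod FCapacity;
      Dec(FCount);
      Result := True;
    end;
  finally
    FLock.Leave;
  end;
end;
```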
- **Memory Safety**
  - Predictable memory usage
  - No risk of unbounded growth
  - Protected against memory exhaustion
- **Performance**
  - O(1) enqueue/dequeue operations
  - Minimal lock contention
  - No memory allocation during operation
  - Quick failure detection
- **Reliability**
  - Clear failure semantics
  - Thread-safe operations
  - No hidden blocking
  - Predictable behavior under load
```pascal
type
  TThreadSafeQueue = class
  private
    FLock: TCriticalSection;
    // ...
  end;

function TThreadSafeQueue.TryEnqueue(AItem: IWorkItem): Boolean;
begin
  FLock.Enter;
  try
    // Minimal critical section scope
  finally
    FLock.Leave;
  end;
end;
```
- Pre-allocated queue buffer
- No dynamic resizing
- Work item reference counting
- Proper interface cleanup
- Event-based completion signaling
- Sleep-based idle management
- Multiple critical sections for different concerns
- Minimal lock scope
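For instance, event-based completion signalling can be pictured as below; the helper name is hypothetical, while the fields come from the class diagram:

```pascal
// Sketch: a worker finishes an item, decrements the pending-work counter
// under FWorkItemLock, and signals FCompletionEvent when nothing is left,
// which is what WaitForAll is assumed to block on.
procedure TProducerConsumerThreadPool.WorkItemFinished;  // hypothetical helper
begin
  FWorkItemLock.Enter;
  try
    Dec(FWorkItemCount);
    if FWorkItemCount = 0 then
      FCompletionEvent.SetEvent;   // wake any thread waiting in WaitForAll
  finally
    FWorkItemLock.Leave;
  end;
end;
```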
- **Fixed Capacity**
  - 1024 items maximum (by default, configurable)
  - No dynamic growth
  - Blocking on full
  - No priority support
- **Performance Impact**
  - Memory pre-allocation
  - Potential queue saturation
  - Sleep delay overhead
  - Lock contention possible
- **Static Threading**
  - Fixed thread count
  - No dynamic scaling
  - No thread pool resizing
  - No thread priority control
- **Resource Usage**
  - Memory for queue buffer
  - Thread stack allocation
  - Critical section overhead
  - Event object overhead
- **Error Storage**
  - Single error message
  - Last error overwrites
  - No error history
  - No error categorization
- **Exception Management**
  - Limited error propagation
  - No error event system
  - No error recovery mechanism
  - No error filtering
```pascal
destructor TProducerConsumerThreadPool.Destroy;
begin
  ClearThreads;          // Stop and free threads
  FWorkQueue.Free;       // Free queue and items
  FCompletionEvent.Free;
  FErrorLock.Free;
  FWorkItemLock.Free;
  FThreads.Free;
  inherited;
end;
```
- Thread termination before cleanup
- Work item completion handling
- Critical section disposal
- Event object cleanup
- Queue buffer allocation
- Thread stack allocation
- Work item interface references
- Synchronization object overhead