-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathThread.cpp
258 lines (220 loc) · 7.79 KB
/
Thread.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// Thread.cpp
// DNS Proxy Filter Server
// Copyright (c) 2010-2019 Untangle, Inc.
// All Rights Reserved
// Written by Michael A. Hotz
#include "common.h"
/*
ThreadLogic is the base class for all objects that need to spawn
a separate thread of execution. Includes logic to start the
thread in suspended mode and terminate the thread when the
destructor is called. The ThreadSignal semaphore is initially
clear, causing the static ThreadMaster function to wait until
the BeginExecution function is called, after which it will call
the member ThreadWorker function. ScramExecution sets the same
semaphore again, which tells the running thread to terminate.
Derived classes should override the ThreadWorker function and
must watch the ThreadSignal semaphore which will be set when
the destructor is called.
ThreadPool is the base class for all objects that need to execute
within a dynamic thread pool. It spawns the requested number
of threads as instances of ThreadItem, which is derived from
and expands upon ThreadLogic to support the pool logic. The
threads all monitor a shared message queue, and will call the
ThreadCallback which derived classes should override to process
the messages from the queue.
*/
/*--------------------------------------------------------------------------*/
ThreadLogic::ThreadLogic(void)
{
    // The control semaphore is created process-private (pshared = 0) with
    // an initial count of zero, so the thread launched below parks on it
    // inside ThreadMaster until somebody posts it.
    sem_init(&ThreadSignal, 0, 0);

    // Launch the thread immediately with default attributes, handing it
    // this object as its argument.
    // NOTE(review): the results of sem_init and pthread_create are not
    // checked, so a failed launch goes unnoticed here.
    pthread_create(&ThreadHandle, NULL, ThreadMaster, this);
}
/*--------------------------------------------------------------------------*/
ThreadLogic::~ThreadLogic(void)
{
    // Raise the control semaphore: a thread still parked in ThreadMaster
    // is released, and a worker that polls the semaphore (as
    // ThreadItem::ThreadWorker does) will see it set.
    sem_post(&ThreadSignal);

    // Deliver SIGWINCH to the thread; ThreadMaster leaves that signal
    // unblocked.
    // NOTE(review): presumably this is meant to interrupt a blocking call
    // in the worker so it re-checks the semaphore promptly - confirm.
    pthread_kill(ThreadHandle, SIGWINCH);

    // Block until the thread function has returned ...
    pthread_join(ThreadHandle, NULL);

    // ... and only then tear down the semaphore it was using.
    sem_destroy(&ThreadSignal);
}
/*--------------------------------------------------------------------------*/
// Releases the thread that is parked on ThreadSignal inside ThreadMaster.
//
// argWait - zero or negative: return as soon as the signal is posted.
//           positive: polling interval in microseconds; the call returns
//           once the semaphore count shows the post has been consumed
//           (or sem_getvalue fails).
void ThreadLogic::BeginExecution(int argWait)
{
    int value = 0;

    // signal the thread function to begin execution
    sem_post(&ThreadSignal);

    // usleep takes an unsigned interval, so a negative argWait would be
    // converted to an enormous delay; treat anything that is not a
    // positive interval as "do not wait"
    if (argWait <= 0) return;

    // Normally argWait will be zero but I added this stupidity for
    // the sole purpose of allowing the main thread console notice
    // message to appear after all the other threads have finished
    // spewing their startup messages. Without it the console notice
    // was appearing first! Go figure.
    for(;;)
    {
        if (sem_getvalue(&ThreadSignal,&value) != 0) break;

        // POSIX lets sem_getvalue report a locked semaphore as zero or as
        // a negative waiter count, so both mean the post was consumed
        if (value <= 0) break;

        usleep(argWait);
    }
}
/*--------------------------------------------------------------------------*/
// Asks an already running worker to stop by raising ThreadSignal again;
// workers are expected to watch that semaphore (see the file header).
void ThreadLogic::ScramExecution(void)
{
    // post the control semaphore so the worker sees the stop request
    sem_post(&ThreadSignal);
}
/*--------------------------------------------------------------------------*/
void* ThreadLogic::ThreadMaster(void *aObject)
{
ThreadLogic *mypointer = (ThreadLogic *)aObject;
sigset_t sigset;
// first we store our object pointer in thread local storage
pthread_setspecific(g_threadkey,aObject);
// start by blocking all signals
sigfillset(&sigset);
pthread_sigmask(SIG_BLOCK,&sigset,NULL);
// now we allow only the signals we care about
sigemptyset(&sigset);
sigaddset(&sigset,SIGWINCH);
sigaddset(&sigset,SIGPROF);
pthread_sigmask(SIG_UNBLOCK,&sigset,NULL);
// set the itimer value of the main thread which is required
// for gprof to work properly with multithreaded applications
setitimer(ITIMER_PROF,&g_itimer,NULL);
// wait for the control semaphore
sem_wait(&mypointer->ThreadSignal);
// call to our member worker function
return(mypointer->ThreadWorker());
}
/*--------------------------------------------------------------------------*/
/****************************************************************************/
/*--------------------------------------------------------------------------*/
// Builds a pool and creates its initial worker threads.
//
// aCount  - number of threads to create up front
// aLimit  - thread count at which InsertThread stops adding threads
// argName - pool name; a copy made with newstr is kept in PoolName
ThreadPool::ThreadPool(int aCount,int aLimit,const char *argName)
{
    // save our pool name and clear member variables
    PoolName = newstr(argName);
    ThreadList = NULL;
    ThreadTotal = 0;
    ThreadLimit = aLimit;

    // These two are incremented by InsertThread and EnterCallback, so
    // they need a defined starting value before any thread exists.
    // NOTE(review): assumes both are ordinary per-pool counters - confirm
    // against the class declaration.
    ThreadCounter = 0;
    BusyCounter = 0;

    // spawn the requested number of threads; they stay parked until the
    // pool BeginExecution member is called
    for(int x = 0;x < aCount;x++) InsertThread();
}
/*--------------------------------------------------------------------------*/
ThreadPool::~ThreadPool(void)
{
    // First pass: post every worker's control semaphore up front so the
    // whole pool is told to stop before we start waiting on any one thread.
    ThreadItem *item = ThreadList;
    while (item != NULL)
    {
        sem_post(&item->ThreadSignal);
        item = item->next;
    }

    // Second pass: unlink and delete the items one at a time until the
    // list is empty.
    while (ThreadList != NULL)
    {
        RemoveThread();
    }

    // finally release the copy of the pool name
    freestr(PoolName);
}
/*--------------------------------------------------------------------------*/
void ThreadPool::BeginExecution(int argWait)
{
ThreadItem *local;
// signal all the threads to start execution
for(local = ThreadList;local != NULL;local = local->next) local->BeginExecution(argWait);
}
/*--------------------------------------------------------------------------*/
void ThreadPool::InsertThread(int argStart)
{
ThreadItem *local;
// don't exceed our limit
if (ThreadTotal == ThreadLimit) return;
// allocate a new thread item
local = new ThreadItem(this,ThreadCounter++);
// insert into the double linked list
local->next = ThreadList;
if (ThreadList != NULL) ThreadList->last = local;
ThreadList = local;
// increment the thread count
ThreadTotal++;
// normally the start flag will not be set as the pool BeginExecution
// member takes care of spinning everything up. However when new threads
// are added later the flag will be set to signal startup is required
if (argStart != 0) local->BeginExecution();
}
/*--------------------------------------------------------------------------*/
void ThreadPool::RemoveThread(void)
{
ThreadItem *local;
if (ThreadList == NULL) return;
// remove the first item from the linked list
local = ThreadList;
ThreadList = ThreadList->next;
if (local->next != NULL) local->next->last = NULL;
// delete the thread item
delete(local);
// decrement the thread count
ThreadTotal--;
}
/*--------------------------------------------------------------------------*/
// Called by a worker just before it runs ThreadCallback on a message.
void ThreadPool::EnterCallback(void)
{
    // one more thread is now busy
    ++BusyCounter;

    // when the busy count reaches the pool size, report saturation through
    // the virtual notification
    if (BusyCounter == ThreadTotal)
    {
        ThreadSaturation(ThreadTotal);
    }
}
/*--------------------------------------------------------------------------*/
// Called by a worker right after ThreadCallback returns.
void ThreadPool::LeaveCallback(void)
{
    // one less thread is busy
    --BusyCounter;
}
/*--------------------------------------------------------------------------*/
/****************************************************************************/
/*--------------------------------------------------------------------------*/
// argParent - the pool that owns this thread
// argIndex  - thread number, used in the worker's log messages
ThreadItem::ThreadItem(ThreadPool *argParent,int argIndex)
{
    // remember who owns us and which thread number we were given
    Parent = argParent;
    ThreadNumber = argIndex;

    // start out unlinked; the pool wires us into its list
    last = NULL;
    next = NULL;
}
/*--------------------------------------------------------------------------*/
// ThreadItem adds no cleanup of its own. Per the file header it derives
// from ThreadLogic, whose destructor signals and joins the thread.
ThreadItem::~ThreadItem(void)
{
}
/*--------------------------------------------------------------------------*/
void* ThreadItem::ThreadWorker(void)
{
struct timespec ts;
MessageFrame *local;
int check;
int ret;
g_log->LogMessage(LOG_INFO,"Thread pool %s starting thread %d\n",Parent->PoolName,ThreadNumber);
for(;;)
{
// watch the thread signal for termination
check = 0;
ret = sem_getvalue(&ThreadSignal,&check);
if (ret != 0) break;
if (check != 0) break;
// wait for an object in the work queue
clock_gettime(CLOCK_REALTIME,&ts);
ts.tv_sec++;
ret = sem_timedwait(&Parent->MessageSignal,&ts);
if (ret != 0) continue;
// grab, process, and delete the message
local = Parent->GrabMessage();
if (local == NULL) continue;
Parent->EnterCallback();
Parent->ThreadCallback(local);
Parent->LeaveCallback();
delete(local);
}
g_log->LogMessage(LOG_INFO,"Thread pool %s stopping thread %d\n",Parent->PoolName,ThreadNumber);
return(NULL);
}
/*--------------------------------------------------------------------------*/