-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathpipe.c
263 lines (242 loc) · 7.88 KB
/
pipe.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
/*
* pipe.c
*
* Simple demonstration of a pipeline. main() is a loop that
* feeds the pipeline with integer values. Each stage of the
* pipeline increases the integer by one before passing it along
* to the next. Entering the command "=" reads the pipeline
* result. (Notice that too many '=' commands will hang.)
*/
#include <pthread.h>
#include "errors.h"
/*
* Internal structure describing a "stage" in the
* pipeline. One for each thread, plus a "result
* stage" where the final thread can stash the value.
*/
typedef struct stage_tag {
pthread_mutex_t mutex; /* Protect data */
pthread_cond_t avail; /* Data available */
pthread_cond_t ready; /* Ready for data */
int data_ready; /* Data present */
long data; /* Data to process */
pthread_t thread; /* Thread for stage */
struct stage_tag *next; /* Next stage */
} stage_t;
/*
* External structure representing the entire
* pipeline.
*/
typedef struct pipe_tag {
pthread_mutex_t mutex; /* Mutex to protect pipe */
stage_t *head; /* First stage */
stage_t *tail; /* Final stage */
int stages; /* Number of stages */
int active; /* Active data elements */
} pipe_t;
/*
* Internal function to send a "message" to the
* specified pipe stage. Threads use this to pass
* along the modified data item.
*/
int pipe_send (stage_t *stage, long data)
{
int status;
status = pthread_mutex_lock (&stage->mutex);
if (status != 0)
return status;
/*
* If there's data in the pipe stage, wait for it
* to be consumed.
*/
while (stage->data_ready) {
status = pthread_cond_wait (&stage->ready, &stage->mutex);
if (status != 0) {
pthread_mutex_unlock (&stage->mutex);
return status;
}
}
/*
* Send the new data
*/
stage->data = data;
stage->data_ready = 1;
status = pthread_cond_signal (&stage->avail);
if (status != 0) {
pthread_mutex_unlock (&stage->mutex);
return status;
}
status = pthread_mutex_unlock (&stage->mutex);
return status;
}
/*
* The thread start routine for pipe stage threads.
* Each will wait for a data item passed from the
* caller or the previous stage, modify the data
* and pass it along to the next (or final) stage.
*/
void *pipe_stage (void *arg)
{
stage_t *stage = (stage_t*)arg;
stage_t *next_stage = stage->next;
int status;
status = pthread_mutex_lock (&stage->mutex);
if (status != 0)
err_abort (status, "Lock pipe stage");
while (1) {
while (stage->data_ready != 1) {
status = pthread_cond_wait (&stage->avail, &stage->mutex);
if (status != 0)
err_abort (status, "Wait for previous stage");
}
pipe_send (next_stage, stage->data + 1);
stage->data_ready = 0;
status = pthread_cond_signal (&stage->ready);
if (status != 0)
err_abort (status, "Wake next stage");
}
/*
* Notice that the routine never unlocks the stage->mutex.
* The call to pthread_cond_wait implicitly unlocks the
* mutex while the thread is waiting, allowing other threads
* to make progress. Because the loop never terminates, this
* function has no need to unlock the mutex explicitly.
*/
}
/*
* External interface to create a pipeline. All the
* data is initialized and the threads created. They'll
* wait for data.
*/
int pipe_create (pipe_t *pipe, int stages)
{
int pipe_index;
stage_t **link = &pipe->head, *new_stage, *stage;
int status;
status = pthread_mutex_init (&pipe->mutex, NULL);
if (status != 0)
err_abort (status, "Init pipe mutex");
pipe->stages = stages;
pipe->active = 0;
for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
new_stage = (stage_t*)malloc (sizeof (stage_t));
if (new_stage == NULL)
errno_abort ("Allocate stage");
status = pthread_mutex_init (&new_stage->mutex, NULL);
if (status != 0)
err_abort (status, "Init stage mutex");
status = pthread_cond_init (&new_stage->avail, NULL);
if (status != 0)
err_abort (status, "Init avail condition");
status = pthread_cond_init (&new_stage->ready, NULL);
if (status != 0)
err_abort (status, "Init ready condition");
new_stage->data_ready = 0;
*link = new_stage;
link = &new_stage->next;
}
*link = (stage_t*)NULL; /* Terminate list */
pipe->tail = new_stage; /* Record the tail */
/*
* Create the threads for the pipe stages only after all
* the data is initialized (including all links). Note
* that the last stage doesn't get a thread, it's just
* a receptacle for the final pipeline value.
*
* At this point, proper cleanup on an error would take up
* more space than worthwhile in a "simple example", so
* instead of cancelling and detaching all the threads
* already created, plus the synchronization object and
* memory cleanup done for earlier errors, it will simply
* abort.
*/
for ( stage = pipe->head;
stage->next != NULL;
stage = stage->next) {
status = pthread_create (
&stage->thread, NULL, pipe_stage, (void*)stage);
if (status != 0)
err_abort (status, "Create pipe stage");
}
return 0;
}
/*
* External interface to start a pipeline by passing
* data to the first stage. The routine returns while
* the pipeline processes in parallel. Call the
* pipe_result return to collect the final stage values
* (note that the pipe will stall when each stage fills,
* until the result is collected).
*/
int pipe_start (pipe_t *pipe, long value)
{
int status;
status = pthread_mutex_lock (&pipe->mutex);
if (status != 0)
err_abort (status, "Lock pipe mutex");
pipe->active++;
status = pthread_mutex_unlock (&pipe->mutex);
if (status != 0)
err_abort (status, "Unlock pipe mutex");
pipe_send (pipe->head, value);
return 0;
}
/*
* Collect the result of the pipeline. Wait for a
* result if the pipeline hasn't produced one.
*/
int pipe_result (pipe_t *pipe, long *result)
{
stage_t *tail = pipe->tail;
long value;
int empty = 0;
int status;
status = pthread_mutex_lock (&pipe->mutex);
if (status != 0)
err_abort (status, "Lock pipe mutex");
if (pipe->active <= 0)
empty = 1;
else
pipe->active--;
status = pthread_mutex_unlock (&pipe->mutex);
if (status != 0)
err_abort (status, "Unlock pipe mutex");
if (empty)
return 0;
pthread_mutex_lock (&tail->mutex);
while (!tail->data_ready)
pthread_cond_wait (&tail->avail, &tail->mutex);
*result = tail->data;
tail->data_ready = 0;
pthread_cond_signal (&tail->ready);
pthread_mutex_unlock (&tail->mutex);
return 1;
}
/*
* The main program to "drive" the pipeline...
*/
int main (int argc, char *argv[])
{
pipe_t my_pipe;
long value, result;
int status;
char line[128];
pipe_create (&my_pipe, 10);
printf ("Enter integer values, or \"=\" for next result\n");
while (1) {
printf ("Data> ");
if (fgets (line, sizeof (line), stdin) == NULL) exit (0);
if (strlen (line) <= 1) continue;
if (strlen (line) <= 2 && line[0] == '=') {
if (pipe_result (&my_pipe, &result))
printf ("Result is %ld\n", result);
else
printf ("Pipe is empty\n");
} else {
if (sscanf (line, "%ld", &value) < 1)
fprintf (stderr, "Enter an integer value\n");
else
pipe_start (&my_pipe, value);
}
}
}