-
Notifications
You must be signed in to change notification settings - Fork 0
/
Pipe.cc
148 lines (104 loc) · 3.26 KB
/
Pipe.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#include "Pipe.h"
#include <iostream>
#include <stdlib.h>
// Creates an empty, open pipe that can buffer up to bufferSize records.
//
// bufferSize: capacity of the ring buffer, in records; must be positive.
//
// Terminates the process with exit(1) if bufferSize is not positive or if
// the record buffer cannot be allocated.
Pipe :: Pipe (int bufferSize) {

	// a pipe without any slot can never accept a record: Insert would see
	// it as permanently full and block forever, so reject it up front
	if (bufferSize <= 0) {
		std::cerr << "ERROR : Pipe buffer size must be positive. EXIT !!!\n";
		exit (1);
	}

	// set up the mutex associated with the pipe
	pthread_mutex_init (&pipeMutex, NULL);

	// set up the condition variables associated with the pipe:
	// producers wait on producerVar for free space, consumers wait on
	// consumerVar for data
	pthread_cond_init (&producerVar, NULL);
	pthread_cond_init (&consumerVar, NULL);

	// set up the pipe's buffer; nothrow is used so that an allocation
	// failure shows up as a NULL pointer that can be reported here
	buffered = new (std::nothrow) Record[bufferSize];
	if (buffered == NULL) {
		std::cerr << "ERROR : Not enough memory. EXIT !!!\n";
		exit (1);
	}

	totSpace = bufferSize;

	// firstSlot counts records removed so far and lastSlot counts records
	// inserted so far; the pipe is empty whenever the two are equal
	firstSlot = lastSlot = 0;

	// note that the pipe has not yet been turned off
	done = 0;
}
// Releases the record buffer and the pthread primitives owned by the pipe.
//
// NOTE(review): this assumes the Pipe is never copied by value (a shallow
// copy would free the same buffer twice) and that no thread is still using
// the pipe when it is destroyed -- confirm against Pipe.h and the callers.
Pipe :: ~Pipe () {

	// give back the ring buffer allocated by the constructor; without this
	// every destroyed pipe leaks its whole record array
	delete [] buffered;
	buffered = NULL;

	// tear down the condition variables before the mutex they are used with
	pthread_cond_destroy (&producerVar);
	pthread_cond_destroy (&consumerVar);
	pthread_mutex_destroy (&pipeMutex);
}
// Adds one record to the pipe, blocking while the pipe is full.
//
// insertMe: record handed to the next free slot through Record::Consume
//           (presumably this moves the contents out of insertMe -- see
//           Record for the exact semantics).
void Pipe :: Insert (Record *insertMe) {

	// first, get a mutex on the pipeline
	pthread_mutex_lock (&pipeMutex);

	// wait until the consumer has freed up some space.  The condition is
	// re-tested after every wakeup: pthread_cond_wait may return spuriously,
	// and another producer may have taken the freed slot before this thread
	// got the mutex back.  Writing without re-checking would overwrite a
	// record that has not been removed yet.
	while (lastSlot - firstSlot >= totSpace) {
		pthread_cond_wait (&producerVar, &pipeMutex);
	}

	// there is room now, so do the insertion
	buffered [lastSlot % totSpace].Consume (insertMe);

	// note that we have added a new record
	lastSlot++;

	// signal the consumer who might now want to pick up the new
	// record that has been added to the pipeline
	pthread_cond_signal (&consumerVar);

	// done!
	pthread_mutex_unlock (&pipeMutex);
}
// Takes the oldest record out of the pipe, blocking while the pipe is empty
// and still open.
//
// removeMe: record that receives the oldest buffered record through
//           Record::Consume.
//
// Returns 1 if a record was delivered into removeMe, or 0 if the pipe is
// empty and has been shut down (removeMe is left untouched in that case).
int Pipe :: Remove (Record *removeMe) {

	// first, get a mutex on the pipeline
	pthread_mutex_lock (&pipeMutex);

	// while there is nothing to read, either give up (pipe turned off) or
	// wait for the producer.  The loop re-tests both conditions after every
	// wakeup: pthread_cond_wait may return spuriously, and another consumer
	// may already have taken the record that triggered the signal.  Reading
	// without re-checking would hand out a stale slot and push firstSlot
	// past lastSlot.
	while (lastSlot == firstSlot) {

		// the pipeline is empty, so see if this is because it was
		// turned off; records inserted before the shutdown are still
		// drained because this is only checked when the pipe is empty
		if (done) {
			pthread_mutex_unlock (&pipeMutex);
			return 0;
		}

		// wait until there is something there (or the pipe is shut down)
		pthread_cond_wait (&consumerVar, &pipeMutex);
	}

	// there is at least one record, so do the removal
	removeMe->Consume (&buffered [firstSlot % totSpace]);

	// note that we have deleted a record
	firstSlot++;

	// signal the producer who might now want to take the slot
	// that has been freed up by the deletion
	pthread_cond_signal (&producerVar);

	// done!
	pthread_mutex_unlock (&pipeMutex);
	return 1;
}
// Marks the pipe as turned off so that Remove returns 0 once the pipe has
// been drained.  Records already in the buffer are not discarded here.
void Pipe :: ShutDown () {

	// first, get a mutex on the pipeline
	pthread_mutex_lock (&pipeMutex);

	// note that we are now done with the pipeline
	done = 1;

	// wake every consumer that may be waiting, not just one: no further
	// signal is sent from here after shutdown, so a consumer left asleep
	// would stay blocked in Remove
	pthread_cond_broadcast (&consumerVar);

	// unlock the mutex
	pthread_mutex_unlock (&pipeMutex);
}
// returns true if pipe is done
bool Pipe :: isDone(){
return done == 1;
}
bool Pipe:: isFull(){
return (lastSlot - firstSlot >= totSpace);
}