-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpzip.cpp
344 lines (296 loc) · 6.88 KB
/
pzip.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#include <iostream>
#include <fstream>
#include <string>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <queue>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/sysinfo.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <mutex>
#include <vector>
#include <unistd.h>
using namespace std;
// Forward declare functions: main (and startThreadPool, which passes
// job_runner to pthread_create) use these before their definitions below.
vector<pthread_t> startThreadPool(int num_threads);
void stopThreadPool(vector<pthread_t> tids);
void *job_runner(void *);
void addJob(int num, bool kill, char *filepath, sem_t *prev_sem, sem_t *next_sem);
// Information passed to the job runners (pool threads) through the job queue.
// Every field has a default so a default-constructed job is never garbage.
struct job_t
{
    int num = 0;            // The job's ID (argv index of the file; -1 for kill requests)
    bool kill = false;      // If true, the runner thread exits (ignoring all other fields and jobs)
    char *filepath = NULL;  // File to compress; unused for kill requests
    sem_t *prev_sem = NULL; // Waited on before writing output; NULL when there is no predecessor
    sem_t *next_sem = NULL; // Posted after writing output; NULL when there is no successor
};
// Result of memory mapping a file. When success is true, mmap points to a
// read-only mapping of the whole file and f_size is its length in bytes.
// The defaults describe a failed/empty result, so any early-return path that
// only sets success = false still yields fully initialized fields.
struct mem_map_t
{
    bool success = false;
    char *mmap = NULL;
    off_t f_size = 0;
};
// A queue of jobs that the thread pool will run. The main thread pushes jobs
// (addJob) and the pool threads pop them (job_runner).
queue<job_t> jobs;
// Mutex Lock for critical sections: held around every push/pop of `jobs`.
mutex mtx;
// Counting semaphore of jobs available in the queue: addJob posts once per
// job, and each runner waits on it before popping, so runners block while
// the queue is empty.
sem_t full;
// Entry point: compresses every file named on the command line with
// run-length encoding and writes the records to stdout in argv order.
// Files are compressed concurrently by a thread pool; a chain of semaphores
// (one between each pair of consecutive files) serializes the writes.
int main(int argc, char *argv[])
{
    // Validate that at least one filepath was given
    if (argc <= 1)
    {
        cout << "pzip: file1 [file2 ...]" << endl;
        exit(1);
    }
    // Counts queued jobs; starts at 0 so runners block until addJob posts.
    sem_init(&full, 0, 0);
    // One thread per file up to the number of processors, but never fewer
    // than 5 threads (as per the instructions).
    int num_threads = max(min(argc - 1, get_nprocs()), 5);
    // Create the thread pool
    vector<pthread_t> tids = startThreadPool(num_threads);
    // Heap-allocated ordering semaphores, kept so they can be destroyed and
    // freed once every runner has exited.
    vector<sem_t *> sems;
    // Semaphore the current file must wait on (posted by the previous file).
    sem_t *prev_sem = NULL;
    for (int i = 1; i < argc; i++)
    {
        // Every file except the last signals its successor after writing;
        // the last file (or a lone file) needs no semaphore of its own.
        sem_t *next_sem = NULL;
        if (i < argc - 1)
        {
            next_sem = new sem_t;
            sem_init(next_sem, 0, 0);
            sems.push_back(next_sem);
        }
        addJob(i, false, argv[i], prev_sem, next_sem);
        prev_sem = next_sem;
    }
    // Queue kill requests behind the file jobs and wait for every thread.
    stopThreadPool(tids);
    // All threads are joined, so nothing can touch the semaphores any more:
    // destroy them and release their storage (previously leaked).
    for (size_t i = 0; i < sems.size(); i++)
    {
        sem_destroy(sems[i]);
        delete sems[i];
    }
    sem_destroy(&full);
    return 0;
}
// Opens filepath read-only and maps its entire contents into memory.
// Returns success == false (with mmap == NULL and f_size == 0) when the file
// cannot be opened or stat'd, or when it is empty: mmap rejects a zero-length
// mapping, and an empty file has nothing to compress anyway.
// Prints a diagnostic to stderr and exits the process with status 1 if mmap
// itself fails.
// On success the caller owns the mapping and must munmap(map.mmap, map.f_size).
mem_map_t mmapFile(const char *filepath)
{
    mem_map_t map;
    map.success = false;
    map.mmap = NULL;
    map.f_size = 0;

    int fd = open(filepath, O_RDONLY);
    if (fd < 0)
    {
        return map;
    }
    // Get size of file
    struct stat sb;
    if (fstat(fd, &sb) < 0 || sb.st_size <= 0)
    {
        close(fd);
        return map;
    }
    // Mapping file into virtual memory
    char *data = (char *)mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    // Closing the descriptor does not unmap the region, so close it right away
    // instead of leaking one descriptor per input file.
    close(fd);
    if (data == MAP_FAILED)
    {
        perror("pzip: mmap");
        exit(1);
    }
    map.success = true;
    map.mmap = data;
    map.f_size = sb.st_size;
    return map;
}
// Queues one kill request per thread in tids, then joins every thread.
// The job queue is FIFO, so the kill requests are dequeued only after every
// job queued before this call; the function returns once all threads quit.
// A failed join is reported on stderr (stdout carries the compressed data)
// and exits the process with status 1.
void stopThreadPool(vector<pthread_t> tids)
{
    // Add kill request jobs
    for (size_t i = 0; i < tids.size(); i++)
    {
        addJob(-1, true, NULL, NULL, NULL);
    }
    // Wait for all threads to finish
    for (size_t i = 0; i < tids.size(); i++)
    {
        if (pthread_join(tids[i], NULL) != 0)
        {
            cerr << "Failed to join thread." << endl;
            exit(1);
        }
    }
}
// Create the thread pool: starts num_threads threads running job_runner and
// returns their ids. A failed pthread_create is retried; up to num_threads
// retries are allowed in total (shared across all threads) before the
// program reports the failure on stderr and exits with status 1.
vector<pthread_t> startThreadPool(int num_threads)
{
    vector<pthread_t> tids;
    int retries_left = num_threads;
    while ((int)tids.size() < num_threads)
    {
        pthread_t tid;
        if (pthread_create(&tid, NULL, job_runner, NULL) == 0)
        {
            tids.push_back(tid);
        }
        else if (retries_left > 0)
        {
            // Going to retry to create this thread
            retries_left--;
        }
        else
        {
            // stderr, not stdout: stdout carries the compressed output.
            cerr << "Failed to create thread" << endl;
            exit(1);
        }
    }
    return tids;
}
void *job_runner(void *)
{
// This function serves as a job runner is in the thread pool, so we run until
// explicitly terminated.
while (1)
{
// Wait until there is at least one job in the queue
sem_wait(&full);
job_t job;
// Aquire lock for queue
mtx.lock();
// Get the next job
job = jobs.front();
jobs.pop();
// Release the lock
mtx.unlock();
// Check if the job is a kill request
if (job.kill)
{
pthread_exit(0);
}
// MEMORY MAP FILE
mem_map_t map = mmapFile(job.filepath);
if (!map.success)
{
// Wait and post semaphores then go back to thread pool
if (job.prev_sem)
{
sem_wait(job.prev_sem);
}
if (job.next_sem)
{
sem_post(job.next_sem);
}
// continue, don't return. Otherwise, this thread will leave the pool.
continue;
}
// Create buffer to hold compressed data
char *buff = (char *)malloc(5 * map.f_size);
int buffIndex = 0;
// Process the job (compress the file)
int count = 0;
char last;
for (off_t i = 0; i < map.f_size; i++)
{
// Compress the last "count" equivalent characters
if (count && map.mmap[i] != last)
{
buff[buffIndex++] = count & 0xff;
buff[buffIndex++] = (count >> 8) & 0xff;
buff[buffIndex++] = (count >> 16) & 0xff;
buff[buffIndex++] = (count >> 24) & 0xff;
buff[buffIndex++] = last;
count = 0;
}
last = map.mmap[i];
count++;
}
if (count)
{
buff[buffIndex++] = count & 0xff;
buff[buffIndex++] = (count >> 8) & 0xff;
buff[buffIndex++] = (count >> 16) & 0xff;
buff[buffIndex++] = (count >> 24) & 0xff;
buff[buffIndex++] = last;
count = 0;
}
// Wait for the previous job to finish printing
if (job.prev_sem != NULL)
{
sem_wait(job.prev_sem);
}
// Write the buffer to stdout
fwrite(buff, sizeof(char), (size_t)buffIndex, stdout);
// Signal the next thread
if (job.next_sem != NULL)
{
sem_post(job.next_sem);
}
// Deallocating memory for mmap
if (munmap(map.mmap, map.f_size) < 0)
{
cout << "munmap fail" << endl;
exit(1);
}
// DO NOT RETURN, otherwise, this thread will leave the thread pool
}
}
// Creates and adds a job to the job queue
void addJob(int num, bool kill, char *filepath, sem_t *prev_sem, sem_t *next_sem)
{
// Create struct instance
job_t job;
job.num = num;
job.kill = kill;
job.filepath = filepath;
job.prev_sem = prev_sem;
job.next_sem = next_sem;
// Aquire lock for queue
mtx.lock();
// Add the new job to the queue
jobs.push(job);
// Release lock
mtx.unlock();
// Make job runnable by posting to semaphore
sem_post(&full);
}