/**
* @file ParallelAggregate.cpp
*
* @brief This file implements an algorithm for a parallel aggregator.
*
* @author Shrey Tiwari
* Contact: [email protected]
*/
/*
- Standard Linear Traversal Aggregate:
- Time Complexity = O(N)
- Space Complexity = O(1)
- Parallel Multiple Linear Traversal Aggregate:
- Time Complexity = O(N/K) + O(K)
- Space Complexity = O(K)
- Where 'K' is the factor of parallelization
COMMANDS:
- g++ ParallelAggregate.cpp -std=c++11 -pthread
- ./a.out
NOTE:
1. Currently the program assumes equal chunk sizes for all partitions. Hence, 'SIZE' must be divisible by 'CORES'.
2. Time measurements work on Linux only.
*/
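/*
 AGGREGATES (as computed by linear_aggregate below):
 - X = sum of all elements:            X = a[0] + a[1] + ... + a[N-1]
 - Y = sum of the running prefix sums: Y = a[0] + (a[0] + a[1]) + ... + (a[0] + ... + a[N-1])
 Each thread computes a local (x, y) over its chunk of N/K elements; merging the
 K partial results takes O(K), which gives the O(N/K) + O(K) bound above.
*/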
// Importing the required header files
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <vector>
#include <thread>
// Defining the input size for the array
#define SIZE 1024
// Defining total number of CPU cores
#define CORES 16
// Defining the range for array elements
#define LOWER 0
#define UPPER 10
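// With these defaults each thread processes SIZE/CORES = 1024/16 = 64 elements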
using namespace std;
// Struct to hold aggregation results
typedef struct result
{
int x;
int y;
} Result;
// Helper function to measure the execution time (works on Linux only)
double time_elapsed(struct timespec *start, struct timespec *end)
{
double t;
t = (end->tv_sec - start->tv_sec); // diff in seconds
t += (end->tv_nsec - start->tv_nsec) * 0.000000001; //diff in nanoseconds
return t;
}
// Helper Function to compare the results from two approaches
int is_equal(Result a, Result b)
{
if((a.x == b.x) && (a.y == b.y))
return 1;
return 0;
}
// Helper function to fill the array with random integers in the specified range
void fill_array_random(short int *array)
{
srand(time(0));
// printf("The array is: \n")
for(int i = 0; i < SIZE; i++)
{
array[i] = (rand() % (UPPER - LOWER + 1)) + LOWER;
// printf("%d ", array[i]);
}
// printf("\n");
}
// Function to calculate the array aggregates in a linear way
Result linear_aggregate(short int *array, int start, int end)
{
int x = 0, y = 0;
// printf("\nI X Y\n");
for(int i = start; i <= end; i++)
{
x += array[i];
y += x;
// printf("%d %d %d\n", i, x, y);
}
// printf("\n");
Result linear_result = {x, y};
return linear_result;
}
// Function executed by each thread
void thread_sum(short int *array, int start, int end, Result *res, int index)
{
res[index] = linear_aggregate(array, start, end);
}
// Function to calculate the array aggregates in parallel
Result parallel_aggregate(short int *array)
{
// Meta data for the threads to execute
Result res[CORES];
int chunk_size = SIZE/CORES;
// Launch the parallel threads
vector<thread> threads;
for(int i = 0; i < CORES; i++)
threads.push_back(thread(thread_sum, array, (i * chunk_size), ((i+1) * chunk_size - 1), res, i));
for (auto &th : threads)
th.join();
// Linearly sum the resulting array
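// Merge sketch: each partition computed its y as if its prefix sums started at 0,
// so the elements already accumulated in x (from partitions 0..i-1) contribute an
// extra x * chunk_size to partition i's y.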
int x = 0, y = 0;
for(int i = 0; i < CORES; i++)
{
y += res[i].y + (x*chunk_size); // Adding the constant offset of x to y (from previous partitions)
x += res[i].x;
}
Result parallel_result = {x, y};
return parallel_result;
}
// Code execution begins here
int main ()
{
// Object of timespec to calculate execution time
struct timespec l_start, l_end, p_start, p_end;
short int arr[SIZE];
fill_array_random(arr);
int start = 0, end = SIZE - 1;
clock_gettime(CLOCK_REALTIME, &l_start);
Result linear_result1 = linear_aggregate(arr, start, end);
Result linear_result2 = linear_aggregate(arr, start, end);
Result linear_result3 = linear_aggregate(arr, start, end);
clock_gettime(CLOCK_REALTIME, &l_end);
// Results vary widely when the two approaches are timed back to back, possibly due to CPU load spikes.
// Adding a cool-down period gives much more consistent measurements.
usleep(1000000);
clock_gettime(CLOCK_REALTIME, &p_start);
Result parallel_result1 = parallel_aggregate(arr);
Result parallel_result2 = parallel_aggregate(arr);
Result parallel_result3 = parallel_aggregate(arr);
clock_gettime(CLOCK_REALTIME, &p_end);
printf("-------------------------------------------------------------------------------\n");
if (!is_equal(linear_result1, linear_result2) || !is_equal(linear_result2, linear_result3))
{
printf("ERROR: The computed totals did not match (Linear Computation Case).\n");
}
if (!is_equal(parallel_result1, parallel_result2) || !is_equal(parallel_result2, parallel_result3))
{
printf("ERROR: The computed totals did not match (Parallel Computation Case).\n");
}
if (!is_equal(linear_result1, parallel_result1))
{
printf("ERROR: The computed totals did not match.\n");
}
else
{
printf("The array size is: %d\nThe value of K is: %d\nThe range is: %d to %d\n", SIZE, CORES, LOWER, UPPER);
printf("The array sum (X) is: %d\n", parallel_result1.x);
printf("The array aggregate (Y) is: %d\n", parallel_result1.y);
// We are averaging the time over the three runs
printf("Time taken using linear approach: %lf microseconds\n", (time_elapsed(&l_start, &l_end) * 1e6)/3);
printf("Time taken using parallel approach: %lf microseconds\n", (time_elapsed(&p_start, &p_end) * 1e6)/3);
}
printf("-------------------------------------------------------------------------------\n");
return 0;
}