-
Notifications
You must be signed in to change notification settings - Fork 3
/
dscompress.cc
372 lines (349 loc) · 13.7 KB
/
dscompress.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
#include "dyscodistribution.h"
#include "dysconormalization.h"
#include "dyscostman.h"
#include "stmanmodifier.h"
#include "stochasticencoder.h"
#include "stopwatch.h"
#include "weightencoder.h"
#include <casacore/tables/Tables/ArrColDesc.h>
#include <casacore/tables/Tables/ArrayColumn.h>
#include <casacore/tables/Tables/ScalarColumn.h>
using namespace dyscostman;
template <typename T>
void createDyscoStManColumn(casacore::Table &ms, const std::string &name,
const casacore::IPosition &shape,
unsigned bitsPerComplex, unsigned bitsPerWeight,
Normalization normalization,
DyscoDistribution distribution, double studentsTNu,
double distributionTruncation, bool staticSeed) {
std::cout << "Constructing new column '" << name << "'...\n";
casacore::ArrayColumnDesc<T> columnDesc(name, "", "DyscoStMan", "DyscoStMan",
shape);
columnDesc.setOptions(casacore::ColumnDesc::Direct |
casacore::ColumnDesc::FixedShape);
std::cout << "Querying storage manager...\n";
bool isAlreadyUsed;
try {
ms.findDataManager("DyscoStMan");
isAlreadyUsed = true;
} catch (std::exception &e) {
std::cout << "Constructing storage manager...\n";
DyscoStMan dataManager(bitsPerComplex, bitsPerWeight);
switch (distribution) {
case GaussianDistribution:
dataManager.SetGaussianDistribution();
break;
case UniformDistribution:
dataManager.SetUniformDistribution();
break;
case StudentsTDistribution:
dataManager.SetStudentsTDistribution(studentsTNu);
break;
case TruncatedGaussianDistribution:
dataManager.SetTruncatedGaussianDistribution(distributionTruncation);
break;
}
dataManager.SetNormalization(normalization);
if (staticSeed) {
std::cout << "Setting static seed...\n";
dataManager.SetStaticSeed(true);
}
std::cout << "Adding column...\n";
ms.addColumn(columnDesc, dataManager);
isAlreadyUsed = false;
}
// We do not want to do this within the try-catch
if (isAlreadyUsed) {
std::cout << "Adding column with existing datamanager...\n";
ms.addColumn(columnDesc, "DyscoStMan", false);
}
}
/**
* Compress a given measurement set.
* @param argc Command line parameter count
* @param argv Command line parameters.
*/
int main(int argc, char *argv[]) {
register_dyscostman();
if (argc < 2) {
std::cout
<< "Usage: dscompress [options] [-column <name> [-column...]] <ms>\n"
"\n"
"This tool replaces one or multiple columns of a measurement set by "
"compressed columns, using the\n"
"Dysco compression storage manager. This tools is mainly aimed at "
"testing the technique.\n"
"Better efficiency can be achieved by integrating the Dysco storage "
"manager directly into\n"
"a preprocessing pipeline or correlator output.\n"
"\n"
"The Dysco compression technique is explained in "
"http://arxiv.org/abs/1609.02019.\n"
"\n"
"Options:\n"
"-rfnormalization / -afnormalization / -rownormalization\n"
"\tSelect normalization method. Default is AF normalization. For "
"high bitrates, RF normalization\n"
"\tis recommended. The use of row normalization is discouraged "
"because it can be unstable.\n"
"-data-bit-rate <n>\n"
"\tSets the number of bits per float for visibility data. Because a "
"visibility is a complex number,\n"
"\tthe total nr bits per visibility will be twice this number. The "
"compression rate is n/32.\n"
"-weight-bit-rate <n>\n"
"\tSets the number of bits per float for the data weights. The "
"storage manager will use a single\n"
"\tweight for all polarizations, hence with four polarizations the "
"compression of weight is\n"
"\t1/4 * n/32.\n"
"-reorder\n"
"\tWill rewrite the measurement set after replacing the column. "
"This makes sure that the space\n"
"\tof the old column is freed. It is for testing only, because the "
"compression error is applied\n"
"\ttwice to the data.\n"
"-uniform / -gaussian / -truncgaus <sigma> / -studentt\n"
"\tSelect the distribution used for the quantization of the data. "
"The truncated gaussian and\n"
"\tuniform distributions generally produce the most accurate "
"results. The default is truncgaus\n"
"\twith sigma=2.5, which is approximately optimal for bitrates "
"4-8.\n"
"-static-seed\n"
"\tInitialize the random number generator with a static seed. This "
"causes correlated\n"
"\tnoise between different measurement sets and should therefore "
"only be used for\n"
"\texperimentation.\n"
"\n"
"Defaults: \n"
"\tbits per data val = 8\n"
"\tbits per weight = 12\n"
"\tdistribution = TruncGaus with sigma=2.5\n"
"\tnormalization = AF\n";
return 0;
}
DyscoDistribution distribution = TruncatedGaussianDistribution;
Normalization normalization = Normalization::kAF;
bool reorder = false, doCheckMSFormat = true;
unsigned bitsPerFloat = 8, bitsPerWeight = 12;
double distributionTruncation = 2.5;
bool staticSeed = false;
std::vector<std::string> columnNames;
int argi = 1;
while (argi < argc && argv[argi][0] == '-') {
std::string p(argv[argi] + 1);
if (p == "data-bit-rate") {
++argi;
bitsPerFloat = atoi(argv[argi]);
} else if (p == "weight-bit-rate") {
++argi;
bitsPerWeight = atoi(argv[argi]);
} else if (p == "reorder") {
reorder = true;
} else if (p == "gaussian") {
distribution = GaussianDistribution;
} else if (p == "uniform") {
distribution = UniformDistribution;
} else if (p == "studentt") {
distribution = StudentsTDistribution;
} else if (p == "truncgaus") {
++argi;
distribution = TruncatedGaussianDistribution;
distributionTruncation = atof(argv[argi]);
} else if (p == "column") {
++argi;
columnNames.push_back(argv[argi]);
} else if (p == "rfnormalization") {
normalization = Normalization::kRF;
} else if (p == "afnormalization") {
normalization = Normalization::kAF;
} else if (p == "rownormalization") {
normalization = Normalization::kRow;
} else if (p == "static-seed") {
staticSeed = true;
} else
throw std::runtime_error(std::string("Invalid parameter: ") + argv[argi]);
++argi;
}
if (columnNames.empty()) columnNames.push_back("DATA");
std::string msPath = argv[argi];
std::cout << "\tbits per data val = " << bitsPerFloat
<< "\n"
"\tbits per weight = "
<< bitsPerWeight
<< "\n"
"\tdistribution = ";
switch (distribution) {
case UniformDistribution:
std::cout << "Uniform";
break;
case GaussianDistribution:
std::cout << "Gaussian";
break;
case TruncatedGaussianDistribution:
std::cout << "Truncated Gaussian with sigma=" << distributionTruncation;
break;
case StudentsTDistribution:
std::cout << "Student T";
break;
default:
std::cout << "?";
break;
}
std::cout << "\n"
"\tnormalization = ";
switch (normalization) {
case Normalization::kAF:
std::cout << "AF";
break;
case Normalization::kRF:
std::cout << "RF";
break;
case Normalization::kRow:
std::cout << "Row";
break;
default:
std::cout << "?";
break;
}
std::cout << "\n\n";
std::cout << "Opening ms...\n";
std::unique_ptr<casacore::Table> ms(
new casacore::Table(msPath, casacore::Table::Update));
Stopwatch watch(true);
std::cout << "Replacing flagged values by NaNs...\n";
for (const std::string &columnName : columnNames) {
if (columnName != "WEIGHT_SPECTRUM") {
casacore::ArrayColumn<std::complex<float>> dataCol(*ms, columnName);
casacore::ArrayColumn<bool> flagCol(*ms, "FLAG");
casacore::Array<std::complex<float>> dataArr(dataCol.shape(0));
casacore::Array<bool> flagArr(flagCol.shape(0));
for (size_t row = 0; row != ms->nrow(); ++row) {
dataCol.get(row, dataArr);
flagCol.get(row, flagArr);
casacore::Array<bool>::contiter f = flagArr.cbegin();
bool isChanged = false;
for (casacore::Array<std::complex<float>>::contiter d =
dataArr.cbegin();
d != dataArr.cend(); ++d) {
if (*f) {
*d = std::complex<float>(std::numeric_limits<float>::quiet_NaN(),
std::numeric_limits<float>::quiet_NaN());
isChanged = true;
}
++f;
}
if (isChanged) dataCol.put(row, dataArr);
}
}
}
std::cout << "Time taken: " << watch.ToString() << '\n';
if (doCheckMSFormat) {
std::cout << "Validating MS ordering...\n";
watch.Reset();
watch.Start();
casacore::ScalarColumn<int> antenna1Col(*ms, "ANTENNA1");
casacore::ScalarColumn<int> antenna2Col(*ms, "ANTENNA2");
casacore::ScalarColumn<int> fieldIdCol(*ms, "FIELD_ID");
casacore::ScalarColumn<int> dataDescIdCol(*ms, "DATA_DESC_ID");
casacore::ScalarColumn<double> timeCol(*ms, "TIME");
int lastFieldId = fieldIdCol(0), lastDataDescId = dataDescIdCol(0);
double lastTime = timeCol(0);
std::vector<std::pair<int, int>> antennasInBlock;
size_t blockOffset = 0, blockNumber = 0;
for (size_t row = 0; row != ms->nrow(); ++row) {
int antenna1 = antenna1Col(row), antenna2 = antenna2Col(row),
fieldId = fieldIdCol(row), dataDescId = dataDescIdCol(row);
double time = timeCol(row);
if (time != lastTime || fieldId != lastFieldId ||
dataDescId != lastDataDescId) {
if (blockOffset != antennasInBlock.size()) {
std::ostringstream msg;
msg << "This measurement set is not 'regular'; at table row " << row
<< ", timeblock index " << blockNumber << ", timeblock offset "
<< blockOffset << " the number of baselines in the timeblock ("
<< blockOffset
<< ") is not equal to the number of baselines in previous "
"timeblocks ("
<< antennasInBlock.size()
<< "). In other words, not all timesteps had the same baselines. "
"This is required to be able to compress with Dysco.";
throw std::runtime_error(msg.str());
}
blockOffset = 0;
++blockNumber;
}
if (blockNumber == 0) {
antennasInBlock.push_back(std::make_pair(antenna1, antenna2));
} else {
if (antennasInBlock[blockOffset].first != antenna1 ||
antennasInBlock[blockOffset].second != antenna2) {
std::string antstr;
if (antennasInBlock[blockOffset].first != antenna1)
antstr = "antenna1";
else
antstr = "antenna2";
std::ostringstream msg;
msg << "This measurement set is not 'regular'; at table row " << row
<< ", timeblock index " << blockNumber << ", timeblock offset "
<< blockOffset << " the index for " << antstr
<< " is not the same as for previous timesteps. In other words, "
"not all timesteps had the same baselines. This is required "
"to be able to compress with Dysco.";
throw std::runtime_error(msg.str());
}
}
++blockOffset;
lastTime = time;
lastDataDescId = dataDescId;
lastFieldId = fieldId;
}
if (blockOffset != antennasInBlock.size())
throw std::runtime_error(
"The final timeblock did not have the same number of timesteps as "
"the previous timeblocks. In other words, not all timesteps had the "
"same baselines. This is required to be able to compress with "
"Dysco.");
std::cout << "Time taken: " << watch.ToString() << '\n';
}
watch.Reset();
watch.Start();
StManModifier modifier(*ms);
casacore::IPosition shape;
bool isDataReplaced = false;
for (std::string columnName : columnNames) {
bool replaced;
if (columnName == "WEIGHT_SPECTRUM")
replaced = modifier.PrepareReplacingColumn<float>(
columnName, "DyscoStMan", bitsPerFloat, bitsPerWeight, shape);
else
replaced = modifier.PrepareReplacingColumn<casacore::Complex>(
columnName, "DyscoStMan", bitsPerFloat, bitsPerWeight, shape);
isDataReplaced = replaced || isDataReplaced;
}
if (isDataReplaced) {
for (std::string columnName : columnNames) {
if (columnName == "WEIGHT_SPECTRUM")
createDyscoStManColumn<float>(
*ms, columnName, shape, bitsPerFloat, bitsPerWeight, normalization,
distribution, 1.0, distributionTruncation, staticSeed);
else
createDyscoStManColumn<casacore::Complex>(
*ms, columnName, shape, bitsPerFloat, bitsPerWeight, normalization,
distribution, 1.0, distributionTruncation, staticSeed);
}
for (std::string columnName : columnNames) {
if (columnName == "WEIGHT_SPECTRUM")
modifier.MoveColumnData<float>(columnName);
else
modifier.MoveColumnData<casacore::Complex>(columnName);
}
if (reorder) {
StManModifier::Reorder(ms, msPath);
}
}
std::cout << "Finished. Compression time: " << watch.ToString() << "\n";
return 0;
}