Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 164d7d5

Browse files
committed
[Join] InitHashTable optimisation
This commit reworks `fill_hash_join_buff_bucketized_cpu` to use TBB and utilize the CPU properly. Partially resolves: #574 Signed-off-by: Dmitrii Makarenko <[email protected]>
1 parent 6fd1eec commit 164d7d5

File tree

3 files changed

+302
-46
lines changed

3 files changed

+302
-46
lines changed

Diff for: omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h

+26-46
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,6 @@ class PerfectJoinHashTableBuilder {
166166
0);
167167

168168
auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
169-
const int thread_count = cpu_threads();
170-
std::vector<std::thread> init_cpu_buff_threads;
171169

172170
{
173171
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
@@ -176,54 +174,36 @@ class PerfectJoinHashTableBuilder {
176174
hash_join_invalid_val);
177175
}
178176
const bool for_semi_join = for_semi_anti_join(join_type);
179-
std::atomic<int> err{0};
180177
{
181178
auto timer_fill =
182-
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
183-
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
184-
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
185-
&join_column,
186-
str_proxy_translation_map,
187-
thread_idx,
188-
thread_count,
189-
type,
190-
&err,
191-
&col_range,
192-
&is_bitwise_eq,
193-
&for_semi_join,
194-
cpu_hash_table_buff,
195-
hash_entry_info] {
196-
int partial_err = fill_hash_join_buff_bucketized(
197-
cpu_hash_table_buff,
198-
hash_join_invalid_val,
199-
for_semi_join,
200-
join_column,
201-
{static_cast<size_t>(type->size()),
202-
col_range.getIntMin(),
203-
col_range.getIntMax(),
204-
inline_fixed_encoding_null_value(type),
205-
is_bitwise_eq,
206-
col_range.getIntMax() + 1,
207-
get_join_column_type_kind(type)},
208-
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
209-
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
210-
: 0, // 0 is dummy value
211-
thread_idx,
212-
thread_count,
213-
hash_entry_info.bucket_normalization);
214-
int zero{0};
215-
err.compare_exchange_strong(zero, partial_err);
216-
});
217-
}
218-
for (auto& t : init_cpu_buff_threads) {
219-
t.join();
179+
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu");
180+
181+
{
182+
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
183+
col_range.getIntMin(),
184+
col_range.getIntMax(),
185+
inline_fixed_encoding_null_value(type),
186+
is_bitwise_eq,
187+
col_range.getIntMax() + 1,
188+
get_join_column_type_kind(type)};
189+
190+
int error = fill_hash_join_buff_bucketized_cpu(
191+
cpu_hash_table_buff,
192+
hash_join_invalid_val,
193+
for_semi_join,
194+
join_column,
195+
type_info,
196+
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
197+
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
198+
: 0, // 0 is dummy value
199+
hash_entry_info.bucket_normalization);
200+
if (error) {
201+
// Too many hash entries, need to retry with a 1:many table
202+
hash_table_ = nullptr; // clear the hash table buffer
203+
throw NeedsOneToManyHash();
204+
}
220205
}
221206
}
222-
if (err) {
223-
// Too many hash entries, need to retry with a 1:many table
224-
hash_table_ = nullptr; // clear the hash table buffer
225-
throw NeedsOneToManyHash();
226-
}
227207
}
228208

229209
void initOneToManyHashTableOnCpu(

Diff for: omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp

+264
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#endif
3939

4040
#include <tbb/parallel_for.h>
41+
#include <tbb/parallel_reduce.h>
4142

4243
#include <future>
4344
#endif
@@ -263,9 +264,11 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
263264
#endif
264265
JoinColumnTyped col{&join_column, &type_info};
265266
for (auto item : col.slice(start, step)) {
267+
// LOG(ERROR) << "items: " << item.index;
266268
const size_t index = item.index;
267269
int64_t elem = item.element;
268270
if (elem == type_info.null_val) {
271+
// LOG(ERROR) << "null val";
269272
if (type_info.uses_bw_eq) {
270273
elem = type_info.translated_null_val;
271274
} else {
@@ -323,6 +326,267 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(
323326
hashtable_filling_func);
324327
}
325328

329+
#ifndef __CUDACC__
330+
331+
namespace {
332+
333+
template <ColumnType T>
334+
inline int64_t getElem(const int8_t* chunk_mem_ptr, size_t elem_size, size_t elem_ind) {
335+
UNREACHABLE();
336+
return 0;
337+
};
338+
339+
template <>
340+
inline int64_t getElem<ColumnType::SmallDate>(const int8_t* chunk_mem_ptr,
341+
size_t elem_size,
342+
size_t elem_ind) {
343+
return fixed_width_small_date_decode_noinline(chunk_mem_ptr,
344+
elem_size,
345+
elem_size == 4 ? NULL_INT : NULL_SMALLINT,
346+
elem_size == 4 ? NULL_INT : NULL_SMALLINT,
347+
elem_ind);
348+
}
349+
350+
template <>
351+
inline int64_t getElem<ColumnType::Signed>(const int8_t* chunk_mem_ptr,
352+
size_t elem_size,
353+
size_t elem_ind) {
354+
return fixed_width_int_decode_noinline(chunk_mem_ptr, elem_size, elem_ind);
355+
}
356+
357+
template <>
358+
inline int64_t getElem<ColumnType::Unsigned>(const int8_t* chunk_mem_ptr,
359+
size_t elem_size,
360+
size_t elem_ind) {
361+
return fixed_width_unsigned_decode_noinline(chunk_mem_ptr, elem_size, elem_ind);
362+
}
363+
364+
template <>
365+
inline int64_t getElem<ColumnType::Double>(const int8_t* chunk_mem_ptr,
366+
size_t elem_size,
367+
size_t elem_ind) {
368+
return fixed_width_double_decode_noinline(chunk_mem_ptr, elem_ind);
369+
}
370+
371+
template <ColumnType T, size_t Elem>
372+
inline int64_t getElem(const int8_t* chunk_mem_ptr, size_t elem_ind) {
373+
return getElem<T>(chunk_mem_ptr, Elem, elem_ind);
374+
}
375+
376+
template <typename HASHTABLE_FILLING_FUNC, ColumnType T, size_t Elem>
377+
inline int raw_func_impl(const tbb::blocked_range<size_t>& elems_range,
378+
const int8_t* chunk_mem_ptr,
379+
size_t curr_chunk_row_offset,
380+
const JoinColumnTypeInfo& type_info,
381+
const int32_t* sd_inner_to_outer_translation_map,
382+
const int32_t min_inner_elem,
383+
HASHTABLE_FILLING_FUNC hashtable_filling_func) {
384+
// DEBUG_TIMER("fill_hash_join_buff_bucketized_cpu raw_func");
385+
// INJECT_TIMER(raw_func);
386+
// LOG(ERROR) << " num_elems threaded: " << elems_range.size();
387+
for (size_t elem_i = elems_range.begin(); elem_i != elems_range.end(); elem_i++) {
388+
int64_t elem = getElem<T, Elem>(chunk_mem_ptr, elem_i);
389+
390+
if (elem == type_info.null_val) {
391+
if (!type_info.uses_bw_eq) {
392+
continue;
393+
}
394+
elem = type_info.translated_null_val;
395+
}
396+
397+
if (sd_inner_to_outer_translation_map &&
398+
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
399+
const auto outer_id = map_str_id_to_outer_dict(elem,
400+
min_inner_elem,
401+
type_info.min_val,
402+
type_info.max_val,
403+
sd_inner_to_outer_translation_map);
404+
if (outer_id == StringDictionary::INVALID_STR_ID) {
405+
continue;
406+
}
407+
elem = outer_id;
408+
}
409+
410+
if (hashtable_filling_func(elem, curr_chunk_row_offset + elem_i)) {
411+
return -1;
412+
}
413+
}
414+
return 0;
415+
}
416+
417+
template <typename HASHTABLE_FILLING_FUNC, ColumnType T>
418+
inline int raw_func(const tbb::blocked_range<size_t>& elems_range,
419+
const int8_t* chunk_mem_ptr,
420+
size_t curr_chunk_row_offset,
421+
const JoinColumnTypeInfo& type_info,
422+
const int32_t* sd_inner_to_outer_translation_map,
423+
const int32_t min_inner_elem,
424+
HASHTABLE_FILLING_FUNC hashtable_filling_func) {
425+
switch (type_info.elem_sz) {
426+
case 1:
427+
return raw_func_impl<HASHTABLE_FILLING_FUNC, T, 1>(
428+
elems_range,
429+
chunk_mem_ptr,
430+
curr_chunk_row_offset,
431+
type_info,
432+
sd_inner_to_outer_translation_map,
433+
min_inner_elem,
434+
hashtable_filling_func);
435+
case 2:
436+
return raw_func_impl<HASHTABLE_FILLING_FUNC, T, 2>(
437+
elems_range,
438+
chunk_mem_ptr,
439+
curr_chunk_row_offset,
440+
type_info,
441+
sd_inner_to_outer_translation_map,
442+
min_inner_elem,
443+
hashtable_filling_func);
444+
case 4:
445+
return raw_func_impl<HASHTABLE_FILLING_FUNC, T, 4>(
446+
elems_range,
447+
chunk_mem_ptr,
448+
curr_chunk_row_offset,
449+
type_info,
450+
sd_inner_to_outer_translation_map,
451+
min_inner_elem,
452+
hashtable_filling_func);
453+
case 8:
454+
return raw_func_impl<HASHTABLE_FILLING_FUNC, T, 8>(
455+
elems_range,
456+
chunk_mem_ptr,
457+
curr_chunk_row_offset,
458+
type_info,
459+
sd_inner_to_outer_translation_map,
460+
min_inner_elem,
461+
hashtable_filling_func);
462+
default:
463+
break;
464+
}
465+
UNREACHABLE();
466+
return 0;
467+
}
468+
469+
template <typename HASHTABLE_FILLING_FUNC>
470+
inline int raw_func(const tbb::blocked_range<size_t>& elems_range,
471+
const int8_t* chunk_mem_ptr,
472+
size_t curr_chunk_row_offset,
473+
const JoinColumnTypeInfo& type_info,
474+
const int32_t* sd_inner_to_outer_translation_map,
475+
const int32_t min_inner_elem,
476+
HASHTABLE_FILLING_FUNC hashtable_filling_func) {
477+
switch (type_info.column_type) {
478+
case SmallDate:
479+
return raw_func<HASHTABLE_FILLING_FUNC, SmallDate>(
480+
elems_range,
481+
chunk_mem_ptr,
482+
curr_chunk_row_offset,
483+
type_info,
484+
sd_inner_to_outer_translation_map,
485+
min_inner_elem,
486+
hashtable_filling_func);
487+
case Signed:
488+
return raw_func<HASHTABLE_FILLING_FUNC, Signed>(elems_range,
489+
chunk_mem_ptr,
490+
curr_chunk_row_offset,
491+
type_info,
492+
sd_inner_to_outer_translation_map,
493+
min_inner_elem,
494+
hashtable_filling_func);
495+
case Unsigned:
496+
return raw_func<HASHTABLE_FILLING_FUNC, Unsigned>(elems_range,
497+
chunk_mem_ptr,
498+
curr_chunk_row_offset,
499+
type_info,
500+
sd_inner_to_outer_translation_map,
501+
min_inner_elem,
502+
hashtable_filling_func);
503+
case Double:
504+
return raw_func<HASHTABLE_FILLING_FUNC, Double>(elems_range,
505+
chunk_mem_ptr,
506+
curr_chunk_row_offset,
507+
type_info,
508+
sd_inner_to_outer_translation_map,
509+
min_inner_elem,
510+
hashtable_filling_func);
511+
default:
512+
break;
513+
}
514+
UNREACHABLE();
515+
return 0;
516+
}
517+
518+
} // namespace
519+
520+
// CPU-parallel (TBB) counterpart of fill_hash_join_buff_bucketized: fills the
// one-to-one bucketized perfect hash table from `join_column`, parallelizing
// over column chunks and, within each chunk, over element sub-ranges.
//
// Returns 0 on success, -1 if any slot insertion failed (e.g. a duplicate key
// in a one-to-one table) — the caller then retries with a one-to-many layout.
// (Cleanup: removed commented-out debug logging, fixed the misspelled range
// variable, made the grain-size constant constexpr.)
DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
    int32_t* cpu_hash_table_buff,
    const int32_t hash_join_invalid_val,
    const bool for_semi_join,
    const JoinColumn& join_column,
    const JoinColumnTypeInfo& type_info,
    const int32_t* sd_inner_to_outer_translation_map,
    const int32_t min_inner_elem,
    const int64_t bucket_normalization) {
  auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
                                    : SUFFIX(fill_one_to_one_hashtable);
  // Inserts one (elem, row index) pair into its bucketized slot; returns
  // non-zero on failure, mirroring the filling functions' contract.
  auto hashtable_filling_func = [&](int64_t elem, size_t index) {
    auto entry_ptr = SUFFIX(get_bucketized_hash_slot)(
        cpu_hash_table_buff, elem, type_info.min_val, bucket_normalization);
    return filling_func(index, entry_ptr, hash_join_invalid_val);
  };

  // NOTE: `col_chunks_buff` is typed int8_t* but actually holds an array of
  // `join_column.num_chunks` JoinChunk descriptors.
  auto join_chunk_array =
      reinterpret_cast<const struct JoinChunk*>(join_column.col_chunks_buff);

  // A column may report a chunk yet contain zero elements; nothing to fill.
  if (join_column.num_elems == 0) {
    return 0;
  }

  // Grain size tuned so each task covers ~512KB of column data, which keeps
  // the per-range work around 10ms according to timers.
  constexpr size_t data_to_handle_sz = 512 * 1024;
  const size_t granularity = data_to_handle_sz / type_info.elem_sz;

  std::atomic<int> err{0};
  tbb::parallel_for(
      tbb::blocked_range<size_t>(0, join_column.num_chunks),
      [&](const tbb::blocked_range<size_t>& join_chunks_range) {
        DEBUG_TIMER("fill_hash_join_buff_bucketized_cpu chunk");
        for (size_t chunk_i = join_chunks_range.begin();
             chunk_i != join_chunks_range.end();
             ++chunk_i) {
          auto curr_chunk = join_chunk_array[chunk_i];

          tbb::parallel_for(
              tbb::blocked_range<size_t>(0, curr_chunk.num_elems, granularity),
              [&](const tbb::blocked_range<size_t>& curr_chunk_elems_range) {
                auto ret = raw_func(curr_chunk_elems_range,
                                    curr_chunk.col_buff,
                                    curr_chunk.row_id,
                                    type_info,
                                    sd_inner_to_outer_translation_map,
                                    min_inner_elem,
                                    hashtable_filling_func);
                if (ret != 0) {
                  // Record only the first error; later failures are ignored.
                  int expected_no_error{0};
                  err.compare_exchange_strong(expected_no_error, ret);
                }
              });
        }
      });
  return err.load() ? -1 : 0;
}
588+
#endif
589+
326590
DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
327591
const int32_t invalid_slot_val,
328592
const bool for_semi_join,

0 commit comments

Comments
 (0)