HAWQ-1616. Fix the wrong result of hash join when the Bloom filter is enabled,
because the projection information of the join keys hasn't been pushed down to the parquet scan correctly.

Wen Lin committed May 28, 2018
1 parent 3c69520 commit 2ba6e91
Showing 5 changed files with 34 additions and 14 deletions.
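The root cause, in brief: when the outer scan projects only some of the table's columns, the join keys' varattno values count positions in the scan's projected output, while the parquet scan slot is laid out by the table's attribute numbers, so probing the Bloom filter with the untranslated number can read the wrong column. A minimal standalone sketch of that mapping (names like proj_var_numbers and join_key_attno are illustrative stand-ins, not HAWQ structures):

#include <stdio.h>

int main(void)
{
    /* table fact(c1, c2, c3); the scan projects only c2 and c3 */
    int proj_var_numbers[] = {2, 3};   /* projected position -> table attribute number */
    int join_key_attno = 1;            /* varattno of the join key in the projected tuple */

    /* wrong: treat the projected position as a table attribute number (reads c1) */
    int wrong_attno = join_key_attno;
    /* right: translate through the projection map first (reads c2) */
    int right_attno = proj_var_numbers[join_key_attno - 1];

    printf("wrong attno: %d, right attno: %d\n", wrong_attno, right_attno);
    return 0;
}

The hunks below move this translation into CreateRuntimeFilterState, so the parquet reader can index its slot by the table attribute number directly.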
9 changes: 2 additions & 7 deletions src/backend/cdb/cdbparquetrowgroup.c
@@ -226,16 +226,13 @@ ParquetRowGroupReader_ScanNextTuple(
 	bool *nulls = slot_get_isnull(slot);
 
 	int colReaderIndex = 0;
-	int16 proj[natts];
-	for (int i = 0, j = 0; i < natts; i++)
+	for (int i = 0; i < natts; i++)
 	{
 		if (projs[i] == false)
 		{
 			nulls[i] = true;
 			continue;
 		}
-		proj[j] = i;
-		j++;
 		ParquetColumnReader *nextReader =
 				&rowGroupReader->columnReaders[colReaderIndex];
 		int hawqTypeID = tupDesc->attrs[i]->atttypid;
@@ -290,7 +287,6 @@ ParquetRowGroupReader_ScanNextTuple(
 			&& !rfState->stopRuntimeFilter)
 	{
 		Assert(rfState->bloomfilter != NULL);
-		rfState->bloomfilter->nTested++;
 		uint32_t hashkey = 0;
 		ListCell *hk;
 		int i = 0;
@@ -302,7 +298,7 @@ ParquetRowGroupReader_ScanNextTuple(
 
 			/* rotate hashkey left 1 bit at each step */
 			hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
-			keyval = values[proj[attrno - 1]];
+			keyval = values[attrno - 1];
 
 			/* Evaluate expression */
 			hkey = DatumGetUInt32(
@@ -315,7 +311,6 @@ ParquetRowGroupReader_ScanNextTuple(
 		{
 			continue;
 		}
-		rfState->bloomfilter->nMatched++;
 	}
 
 	/*construct tuple, and return back*/
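With the translation done at filter-creation time, the reader above can index the slot by attrno - 1 directly and drop the local proj[] map; the per-key hashes are still combined with a rotate-left-then-XOR step. A standalone sketch of that probe path, where hash_datum(), the slot array, and the key list are stand-ins rather than HAWQ code:

#include <stdint.h>
#include <stdio.h>

static uint32_t hash_datum(uint32_t v) { return v * 2654435761u; }   /* toy hash, not HAWQ's */

int main(void)
{
    uint32_t values[3] = {10, 20, 30};   /* scan slot values, indexed by table attno - 1 */
    int      joinkeys[2] = {1, 3};       /* pushed-down join-key attribute numbers */
    uint32_t hashkey = 0;

    for (int i = 0; i < 2; i++)
    {
        /* rotate the accumulated hash key left by one bit */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
        /* read the key value straight from the slot: no local proj[] indirection */
        uint32_t keyval = values[joinkeys[i] - 1];
        hashkey ^= hash_datum(keyval);
    }
    printf("hash key to probe the Bloom filter with: %u\n", (unsigned) hashkey);
    return 0;
}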
8 changes: 5 additions & 3 deletions src/backend/executor/nodeHash.c
@@ -1482,10 +1482,12 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
 	}
 
 	/* Report Bloom filter statistics. */
-	if (hashtable->bloomfilter != NULL)
+	if (hjstate->js.ps.lefttree->type == T_TableScanState &&
+		((ScanState*)hjstate->js.ps.lefttree)->runtimeFilter != NULL &&
+		((ScanState*)hjstate->js.ps.lefttree)->runtimeFilter->bloomfilter != NULL)
 	{
-		BloomFilter bf = hashtable->bloomfilter;
-		appendStringInfo(buf,"Bloom filter, inner table row number:%d, "
+		BloomFilter bf = ((ScanState*)hjstate->js.ps.lefttree)->runtimeFilter->bloomfilter;
+		appendStringInfo(buf, "Bloom filter, inner table row number:%d, "
 				"outer table checked row number:%d, "
 				"outer table matched row number:%d, "
 				"outer table filtered row number:%d, filtered rate:%.3f",
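The EXPLAIN ANALYZE line above now reads its counters from the outer scan's runtime filter rather than from the hash table. Assuming the filtered row number is checked minus matched and the rate is filtered over checked (the rest of the argument list is collapsed above, so this is an assumption, not taken from the diff), the arithmetic is:

#include <stdio.h>

int main(void)
{
    int inner_rows   = 3;    /* rows inserted into the Bloom filter from the inner table */
    int checked_rows = 10;   /* outer rows probed (nTested) */
    int matched_rows = 7;    /* outer rows that passed the filter (nMatched) */

    int filtered_rows = checked_rows - matched_rows;
    double filtered_rate = checked_rows > 0 ? (double) filtered_rows / checked_rows : 0.0;

    printf("inner:%d checked:%d matched:%d filtered:%d rate:%.3f\n",
           inner_rows, checked_rows, matched_rows, filtered_rows, filtered_rate);
    return 0;
}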
25 changes: 21 additions & 4 deletions src/backend/executor/nodeHashjoin.c
@@ -807,18 +807,35 @@ ExecEndHashJoin(HashJoinState *node)
  * TODO: how to pass it across motion
  */
 static RuntimeFilterState*
-CreateRuntimeFilterState(HashJoinState *hjstate)
+CreateRuntimeFilterState(HashJoinState *hjstate, ProjectionInfo* projInfo)
 {
+	/* record projection info */
 	ListCell *hk;
 	int i = 0;
-	RuntimeFilterState* rf = (RuntimeFilterState*)palloc0(sizeof(RuntimeFilterState));
 	Assert(hjstate != NULL);
 
+	if (projInfo != NULL && !projInfo->pi_isVarList)
+	{
+		/* Create bloom filter for simple-Var-list case */
+		return NULL;
+	}
+
+	/* push down join key projection information */
+	RuntimeFilterState* rf = (RuntimeFilterState*)palloc0(sizeof(RuntimeFilterState));
 	foreach(hk, hjstate->hj_OuterHashKeys)
 	{
 		ExprState *keyexpr = (ExprState *) lfirst(hk);
 		Var *variable = (Var *) keyexpr->expr;
-		rf->joinkeys = lappend_int(rf->joinkeys, variable->varattno);
+		if (projInfo != NULL)
+		{
+			Assert(projInfo->pi_varNumbers != NULL);
+			rf->joinkeys = lappend_int(rf->joinkeys, projInfo->pi_varNumbers[variable->varattno-1]);
+		}
+		else
+		{
+			/* select * from ... */
+			rf->joinkeys = lappend_int(rf->joinkeys, variable->varattno);
+		}
 		i++;
 	}
 	rf->hashfunctions = (FmgrInfo *) palloc(i * sizeof(FmgrInfo));
@@ -862,7 +879,7 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 		((ScanState*)outerNode)->runtimeFilter == NULL)
 	{
 		Assert(hashtable->bloomfilter->isCreated);
-		((ScanState*)outerNode)->runtimeFilter = CreateRuntimeFilterState(hjstate);
+		((ScanState*) outerNode)->runtimeFilter = CreateRuntimeFilterState(hjstate, ((ScanState*) outerNode)->ps.ps_ProjInfo);
 	}
 	RuntimeFilterState* rf = ((ScanState*)outerNode)->runtimeFilter;
 
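A condensed sketch of the pushdown decision made in CreateRuntimeFilterState above, using a simplified stand-in for ProjectionInfo (is_var_list and var_numbers are illustrative field names, not the real struct): the filter is only built when the outer scan either has no projection or projects a plain Var list; with a projection, each join key's varattno is remapped to a scan attribute number, otherwise it is used as-is.

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct ProjInfoSketch
{
    int  is_var_list;     /* projection is a simple list of columns */
    int *var_numbers;     /* projected position -> scan attribute number */
} ProjInfoSketch;

/* Returns the join keys as scan attribute numbers, or NULL when the
 * filter cannot be pushed down (projection contains expressions). */
static int *push_down_join_keys(const ProjInfoSketch *proj,
                                const int *varattnos, int nkeys)
{
    if (proj != NULL && !proj->is_var_list)
        return NULL;

    int *keys = malloc(nkeys * sizeof(int));
    for (int i = 0; i < nkeys; i++)
        keys[i] = proj ? proj->var_numbers[varattnos[i] - 1]   /* remap via projection */
                       : varattnos[i];                          /* select *: use varattno */
    return keys;
}

int main(void)
{
    int varnums[] = {2, 3};                 /* scan projects table columns 2 and 3 */
    ProjInfoSketch proj = {1, varnums};
    int varattnos[] = {1};                  /* join key is the first projected column */

    int *keys = push_down_join_keys(&proj, varattnos, 1);
    printf("pushed-down scan attno: %d\n", keys ? keys[0] : -1);
    free(keys);
    return 0;
}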
2 changes: 2 additions & 0 deletions src/backend/utils/hash/bloomfilter.c
@@ -90,6 +90,7 @@ void InsertBloomFilter(BloomFilter bf, uint32_t value)
  */
 bool FindBloomFilter(BloomFilter bf, uint32_t value)
 {
+	bf->nTested++;
 	uint32_t bucket_idx = getBucketIdx(value, bf->data_mask);
 	for (int i = 0; i < NUM_BUCKET_WORDS; ++i)
 	{
@@ -100,6 +101,7 @@ bool FindBloomFilter(BloomFilter bf, uint32_t value)
 			return false;
 		}
 	}
+	bf->nMatched++;
 	return true;
 }
 
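Moving nTested and nMatched into FindBloomFilter means every probe is counted in one place, whichever caller does the probing. A minimal single-hash sketch of that shape (one word of bits and a toy hash, deliberately much simpler than HAWQ's filter layout):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct SimpleBloom
{
    uint64_t bits;      /* a single 64-bit word of filter bits */
    int      nTested;   /* values probed */
    int      nMatched;  /* probes that passed the filter */
} SimpleBloom;

static uint64_t bit_for(uint32_t value)
{
    return 1ull << ((value * 2654435761u) & 63);   /* toy hash -> bit position */
}

static void bloom_insert(SimpleBloom *bf, uint32_t value)
{
    bf->bits |= bit_for(value);
}

static bool bloom_find(SimpleBloom *bf, uint32_t value)
{
    bf->nTested++;                                 /* count every probe */
    if ((bf->bits & bit_for(value)) == 0)
        return false;                              /* definitely not in the set */
    bf->nMatched++;                                /* count probes that pass */
    return true;
}

int main(void)
{
    SimpleBloom bf = {0, 0, 0};
    bloom_insert(&bf, 1);
    bloom_insert(&bf, 2);
    bloom_find(&bf, 1);
    bloom_find(&bf, 99);
    printf("tested:%d matched:%d\n", bf.nTested, bf.nMatched);
    return 0;
}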
4 changes: 4 additions & 0 deletions src/test/feature/query/test_hashjoin_bloomfilter.cpp
@@ -45,6 +45,10 @@ TEST_F(TestHashJoinBloomFilter, BasicTest)
 	util.query("select * from dim;", 10);
 	util.query("select * from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7);
 	util.query("set hawq_hashjoin_bloomfilter=true; select * from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7);
+	util.query("set hawq_hashjoin_bloomfilter=true; select fact.c1 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7);
+	util.query("set hawq_hashjoin_bloomfilter=true; select fact.c1, dim.c1 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7);
+	util.query("set hawq_hashjoin_bloomfilter=true; select fact.c1, dim.c1, dim.c2 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7);
+	util.query("set hawq_hashjoin_bloomfilter=true; select dim.c1, dim.c2 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7);
 	util.execute("set hawq_hashjoin_bloomfilter=true; explain analyze select * from fact, dim where fact.c1 = dim.c1 and dim.c2<4");
 	util.execute("drop table dim;");
 	util.execute("drop table fact;");