From 2ba6e91035c291522440a4a55deef84ed97e489c Mon Sep 17 00:00:00 2001 From: Wen Lin Date: Mon, 28 May 2018 09:40:44 +0800 Subject: [PATCH] HAWQ-1616. Fix the wrong result of hash join when Bloom filter is enabled, because the projection information of join keys hasn't been pushed down to parquet scan correctly. --- src/backend/cdb/cdbparquetrowgroup.c | 9 ++----- src/backend/executor/nodeHash.c | 8 +++--- src/backend/executor/nodeHashjoin.c | 25 ++++++++++++++++--- src/backend/utils/hash/bloomfilter.c | 2 ++ .../query/test_hashjoin_bloomfilter.cpp | 4 +++ 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/backend/cdb/cdbparquetrowgroup.c b/src/backend/cdb/cdbparquetrowgroup.c index c9779b8b53..743815d831 100644 --- a/src/backend/cdb/cdbparquetrowgroup.c +++ b/src/backend/cdb/cdbparquetrowgroup.c @@ -226,16 +226,13 @@ ParquetRowGroupReader_ScanNextTuple( bool *nulls = slot_get_isnull(slot); int colReaderIndex = 0; - int16 proj[natts]; - for (int i = 0, j = 0; i < natts; i++) + for (int i = 0; i < natts; i++) { if (projs[i] == false) { nulls[i] = true; continue; } - proj[j] = i; - j++; ParquetColumnReader *nextReader = &rowGroupReader->columnReaders[colReaderIndex]; int hawqTypeID = tupDesc->attrs[i]->atttypid; @@ -290,7 +287,6 @@ ParquetRowGroupReader_ScanNextTuple( && !rfState->stopRuntimeFilter) { Assert(rfState->bloomfilter != NULL); - rfState->bloomfilter->nTested++; uint32_t hashkey = 0; ListCell *hk; int i = 0; @@ -302,7 +298,7 @@ ParquetRowGroupReader_ScanNextTuple( /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 
1 : 0); - keyval = values[proj[attrno - 1]]; + keyval = values[attrno - 1]; /* Evaluate expression */ hkey = DatumGetUInt32( @@ -315,7 +311,6 @@ ParquetRowGroupReader_ScanNextTuple( { continue; } - rfState->bloomfilter->nMatched++; } /*construct tuple, and return back*/ diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 96a31e7297..e7463d723f 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1482,10 +1482,12 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf) } /* Report Bloom filter statistics. */ - if (hashtable->bloomfilter != NULL) + if (hjstate->js.ps.lefttree->type == T_TableScanState && + ((ScanState*)hjstate->js.ps.lefttree)->runtimeFilter != NULL && + ((ScanState*)hjstate->js.ps.lefttree)->runtimeFilter->bloomfilter != NULL) { - BloomFilter bf = hashtable->bloomfilter; - appendStringInfo(buf,"Bloom filter, inner table row number:%d, " + BloomFilter bf = ((ScanState*)hjstate->js.ps.lefttree)->runtimeFilter->bloomfilter; + appendStringInfo(buf, "Bloom filter, inner table row number:%d, " "outer table checked row number:%d, " "outer table matched row number:%d, " "outer table filtered row number:%d, filtered rate:%.3f", diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 66dba83454..09d851569e 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -807,18 +807,35 @@ ExecEndHashJoin(HashJoinState *node) * TODO: how to pass it across motion */ static RuntimeFilterState* -CreateRuntimeFilterState(HashJoinState *hjstate) +CreateRuntimeFilterState(HashJoinState *hjstate, ProjectionInfo* projInfo) { /* record projection info */ ListCell *hk; int i = 0; - RuntimeFilterState* rf = (RuntimeFilterState*)palloc0(sizeof(RuntimeFilterState)); + Assert(hjstate != NULL); + + if (projInfo != NULL && !projInfo->pi_isVarList) + { + /* The Bloom filter is only created for the simple-Var-list case; skip it otherwise */ + return NULL; + } + 
/* push down join key projection information */ + RuntimeFilterState* rf = (RuntimeFilterState*)palloc0(sizeof(RuntimeFilterState)); foreach(hk, hjstate->hj_OuterHashKeys) { ExprState *keyexpr = (ExprState *) lfirst(hk); Var *variable = (Var *) keyexpr->expr; - rf->joinkeys = lappend_int(rf->joinkeys, variable->varattno); + if (projInfo != NULL) + { + Assert(projInfo->pi_varNumbers != NULL); + rf->joinkeys = lappend_int(rf->joinkeys, projInfo->pi_varNumbers[variable->varattno-1]); + } + else + { + /* select * from ... */ + rf->joinkeys = lappend_int(rf->joinkeys, variable->varattno); + } i++; } rf->hashfunctions = (FmgrInfo *) palloc(i * sizeof(FmgrInfo)); @@ -862,7 +879,7 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, ((ScanState*)outerNode)->runtimeFilter == NULL) { Assert(hashtable->bloomfilter->isCreated); - ((ScanState*)outerNode)->runtimeFilter = CreateRuntimeFilterState(hjstate); + ((ScanState*) outerNode)->runtimeFilter = CreateRuntimeFilterState(hjstate, ((ScanState*) outerNode)->ps.ps_ProjInfo); } RuntimeFilterState* rf = ((ScanState*)outerNode)->runtimeFilter; diff --git a/src/backend/utils/hash/bloomfilter.c b/src/backend/utils/hash/bloomfilter.c index 7d71ad55a2..628c7b8faf 100644 --- a/src/backend/utils/hash/bloomfilter.c +++ b/src/backend/utils/hash/bloomfilter.c @@ -90,6 +90,7 @@ void InsertBloomFilter(BloomFilter bf, uint32_t value) */ bool FindBloomFilter(BloomFilter bf, uint32_t value) { + bf->nTested++; uint32_t bucket_idx = getBucketIdx(value, bf->data_mask); for (int i = 0; i < NUM_BUCKET_WORDS; ++i) { @@ -100,6 +101,7 @@ bool FindBloomFilter(BloomFilter bf, uint32_t value) return false; } } + bf->nMatched++; return true; } diff --git a/src/test/feature/query/test_hashjoin_bloomfilter.cpp b/src/test/feature/query/test_hashjoin_bloomfilter.cpp index 2adb24b960..f43a042e09 100644 --- a/src/test/feature/query/test_hashjoin_bloomfilter.cpp +++ b/src/test/feature/query/test_hashjoin_bloomfilter.cpp @@ -45,6 +45,10 @@ 
TEST_F(TestHashJoinBloomFilter, BasicTest) util.query("select * from dim;", 10); util.query("select * from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7); util.query("set hawq_hashjoin_bloomfilter=true; select * from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7); + util.query("set hawq_hashjoin_bloomfilter=true; select fact.c1 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7); + util.query("set hawq_hashjoin_bloomfilter=true; select fact.c1, dim.c1 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7); + util.query("set hawq_hashjoin_bloomfilter=true; select fact.c1, dim.c1, dim.c2 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7); + util.query("set hawq_hashjoin_bloomfilter=true; select dim.c1, dim.c2 from fact, dim where fact.c1 = dim.c1 and dim.c2<4", 7); util.execute("set hawq_hashjoin_bloomfilter=true; explain analyze select * from fact, dim where fact.c1 = dim.c1 and dim.c2<4"); util.execute("drop table dim;"); util.execute("drop table fact;");