Skip to content

Commit

Permalink
support strong query
Browse files Browse the repository at this point in the history
Signed-off-by: Xianhui.Lin <[email protected]>
  • Loading branch information
JsDove committed Feb 14, 2025
1 parent c8d9474 commit f735b44
Show file tree
Hide file tree
Showing 42 changed files with 627 additions and 463 deletions.
11 changes: 10 additions & 1 deletion internal/core/src/exec/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class QueryContext : public Context {
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
milvus::Timestamp timestamp,
int32_t consistency_level = 0,
std::shared_ptr<QueryConfig> query_config =
std::make_shared<QueryConfig>(),
folly::Executor* executor = nullptr,
Expand All @@ -187,7 +188,8 @@ class QueryContext : public Context {
active_count_(active_count),
query_timestamp_(timestamp),
query_config_(query_config),
executor_(executor) {
executor_(executor),
consistency_level_(consistency_level) {
}

folly::Executor*
Expand Down Expand Up @@ -270,6 +272,11 @@ class QueryContext : public Context {
return std::move(retrieve_result_);
}

int32_t
get_consistency_level() {
return consistency_level_;
}

private:
folly::Executor* executor_;
//folly::Executor::KeepAlive<> executor_keepalive_;
Expand All @@ -291,6 +298,8 @@ class QueryContext : public Context {
// used for store segment search/retrieve result
milvus::SearchResult search_result_;
milvus::RetrieveResult retrieve_result_;

int32_t consistency_level_ = 0;
};

// Represent the state of one thread of query execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,15 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr {
const std::string& name,
const segcore::SegmentInternalInterface* segment,
int64_t active_count,
int64_t batch_size)
int64_t batch_size,
int32_t consistency_level)
: SegmentExpr(std::move(input),
name,
segment,
expr->column_.field_id_,
active_count,
batch_size),
batch_size,
consistency_level),
expr_(expr) {
}

Expand Down
11 changes: 8 additions & 3 deletions internal/core/src/exec/expression/BinaryRangeExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,14 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
}
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/exec/expression/BinaryRangeExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,15 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr {
const std::string& name,
const segcore::SegmentInternalInterface* segment,
int64_t active_count,
int64_t batch_size)
int64_t batch_size,
int32_t consistency_level)
: SegmentExpr(std::move(input),
name,
segment,
expr->column_.field_id_,
active_count,
batch_size),
batch_size,
consistency_level),
expr_(expr) {
}

Expand Down
11 changes: 8 additions & 3 deletions internal/core/src/exec/expression/ExistsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,14 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
return true;
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/exec/expression/ExistsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ class PhyExistsFilterExpr : public SegmentExpr {
const std::string& name,
const segcore::SegmentInternalInterface* segment,
int64_t active_count,
int64_t batch_size)
int64_t batch_size,
int32_t consistency_level)
: SegmentExpr(std::move(input),
name,
segment,
expr->column_.field_id_,
active_count,
batch_size),
batch_size,
consistency_level),
expr_(expr) {
}

Expand Down
24 changes: 16 additions & 8 deletions internal/core/src/exec/expression/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
const std::unordered_set<std::string>& flatten_candidates,
bool enable_constant_folding) {
ExprPtr result;

std::cout << "CompileExpression context" << context->get_consistency_level()
<< std::endl;
auto compiled_inputs = CompileInputs(expr, context, flatten_candidates);

auto GetTypes = [](const std::vector<ExprPtr>& exprs) {
Expand Down Expand Up @@ -181,7 +182,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
"PhyUnaryRangeFilterExpr",
context->get_segment(),
context->get_active_count(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else if (auto casted_expr = std::dynamic_pointer_cast<
const milvus::expr::LogicalUnaryExpr>(expr)) {
result = std::make_shared<PhyLogicalUnaryExpr>(
Expand All @@ -195,7 +197,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
context->get_segment(),
context->get_active_count(),
context->get_query_timestamp(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else if (auto casted_expr = std::dynamic_pointer_cast<
const milvus::expr::LogicalBinaryExpr>(expr)) {
if (casted_expr->op_type_ ==
Expand All @@ -218,7 +221,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
"PhyBinaryRangeFilterExpr",
context->get_segment(),
context->get_active_count(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else if (auto casted_expr = std::dynamic_pointer_cast<
const milvus::expr::AlwaysTrueExpr>(expr)) {
result = std::make_shared<PhyAlwaysTrueExpr>(
Expand All @@ -236,7 +240,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
"PhyBinaryArithOpEvalRangeExpr",
context->get_segment(),
context->get_active_count(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else if (auto casted_expr =
std::dynamic_pointer_cast<const milvus::expr::CompareExpr>(
expr)) {
Expand All @@ -256,7 +261,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
"PhyExistsFilterExpr",
context->get_segment(),
context->get_active_count(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else if (auto casted_expr = std::dynamic_pointer_cast<
const milvus::expr::JsonContainsExpr>(expr)) {
result = std::make_shared<PhyJsonContainsFilterExpr>(
Expand All @@ -265,7 +271,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
"PhyJsonContainsFilterExpr",
context->get_segment(),
context->get_active_count(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else if (auto value_expr =
std::dynamic_pointer_cast<const milvus::expr::ValueExpr>(
expr)) {
Expand Down Expand Up @@ -296,7 +303,8 @@ CompileExpression(const expr::TypedExprPtr& expr,
"PhyNullExpr",
context->get_segment(),
context->get_active_count(),
context->query_config()->get_expr_batch_size());
context->query_config()->get_expr_batch_size(),
context->get_consistency_level());
} else {
PanicInfo(ExprInvalid, "unsupport expr: ", expr->ToString());
}
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ class SegmentExpr : public Expr {
const segcore::SegmentInternalInterface* segment,
const FieldId& field_id,
int64_t active_count,
int64_t batch_size)
int64_t batch_size,
int32_t consistency_level)
: Expr(DataType::BOOL, std::move(input), name),
segment_(segment),
field_id_(field_id),
active_count_(active_count),
batch_size_(batch_size) {
batch_size_(batch_size),
consistency_level_(consistency_level) {
size_per_chunk_ = segment_->size_per_chunk();
AssertInfo(
batch_size_ > 0,
Expand Down Expand Up @@ -1114,6 +1116,7 @@ class SegmentExpr : public Expr {

// Cache for text match.
std::shared_ptr<TargetBitmap> cached_match_res_{nullptr};
int32_t consistency_level_{0};
};

void
Expand Down
66 changes: 48 additions & 18 deletions internal/core/src/exec/expression/JsonContainsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
return false;
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down Expand Up @@ -556,9 +561,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
return false;
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down Expand Up @@ -806,9 +816,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
return tmp_elements.empty();
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down Expand Up @@ -1092,9 +1107,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
return tmp_elements_index.size() == 0;
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down Expand Up @@ -1265,9 +1285,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
return exist_elements_index.size() == elements.size();
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down Expand Up @@ -1525,9 +1550,14 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
return false;
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/exec/expression/JsonContainsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
const std::string& name,
const segcore::SegmentInternalInterface* segment,
int64_t active_count,
int64_t batch_size)
int64_t batch_size,
int32_t consistency_level)
: SegmentExpr(std::move(input),
name,
segment,
expr->column_.field_id_,
active_count,
batch_size),
batch_size,
consistency_level),
expr_(expr) {
}

Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/exec/expression/NullExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ class PhyNullExpr : public SegmentExpr {
const std::string& name,
const segcore::SegmentInternalInterface* segment,
int64_t active_count,
int64_t batch_size)
int64_t batch_size,
int32_t consistency_level)
: SegmentExpr(std::move(input),
name,
segment,
expr->column_.field_id_,
active_count,
batch_size),
batch_size,
consistency_level),
expr_(expr) {
}

Expand Down
11 changes: 8 additions & 3 deletions internal/core/src/exec/expression/TermExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,14 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
return term_set.find(ValueType(val.value())) != term_set.end();
};
bool is_growing = segment_->type() == SegmentType::Growing;
cached_index_chunk_res_ =
index->FilterByPath(pointer, active_count_, is_growing, filter_func)
.clone();
bool is_strong_consistency = consistency_level_ == 0;
cached_index_chunk_res_ = index
->FilterByPath(pointer,
active_count_,
is_growing,
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}

Expand Down
Loading

0 comments on commit f735b44

Please sign in to comment.