Skip to content

Commit

Permalink
Pass cacheable flag in connector split (#24303)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #24303

Differential Revision: D67688385
  • Loading branch information
xiaoxmeng authored and tanjialiang committed Dec 29, 2024
1 parent 90f941d commit 3c1b9a3
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 16 deletions.
8 changes: 5 additions & 3 deletions presto-native-execution/presto_cpp/main/SystemConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,16 @@ std::optional<RowVectorPtr> SystemDataSource::next(
std::unique_ptr<velox::connector::ConnectorSplit>
SystemPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit) const {
const protocol::ConnectorSplit* const connectorSplit,
const protocol::SplitContext* splitContext) const {
auto systemSplit = dynamic_cast<const protocol::SystemSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
systemSplit, "Unexpected split type {}", connectorSplit->_type);
return std::make_unique<SystemSplit>(
catalogId,
systemSplit->tableHandle.schemaName,
systemSplit->tableHandle.tableName);
systemSplit->tableHandle.tableName,
splitContext->cacheable);
}

std::unique_ptr<velox::connector::ColumnHandle>
Expand Down Expand Up @@ -393,4 +395,4 @@ std::unique_ptr<protocol::ConnectorProtocol>
SystemPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::SystemConnectorProtocol>();
}
} // namespace facebook::presto
} // namespace facebook::presto
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/SystemConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ class SystemPrestoToVeloxConnector final : public PrestoToVeloxConnector {

std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit) const final;
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
Expand Down
5 changes: 3 additions & 2 deletions presto-native-execution/presto_cpp/main/SystemSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ struct SystemSplit : public velox::connector::ConnectorSplit {
explicit SystemSplit(
const std::string& connectorId,
const std::string& schemaName,
const std::string& tableName)
: ConnectorSplit(connectorId),
const std::string& tableName,
bool cacheable)
: ConnectorSplit(connectorId, /*splitWeight=*/0, cacheable),
schemaName_(schemaName),
tableName_(tableName) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,8 @@ velox::connector::hive::iceberg::FileContent toVeloxFileContent(
std::unique_ptr<velox::connector::ConnectorSplit>
HivePrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit) const {
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const {
auto hiveSplit =
dynamic_cast<const protocol::hive::HiveSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
Expand Down Expand Up @@ -1147,6 +1148,7 @@ HivePrestoToVeloxConnector::toVeloxSplit(
extraFileInfo,
serdeParameters,
hiveSplit->splitWeight,
splitContext->cacheable,
infoColumns);
if (hiveSplit->bucketConversion) {
VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber);
Expand Down Expand Up @@ -1331,7 +1333,8 @@ HivePrestoToVeloxConnector::createConnectorProtocol() const {
std::unique_ptr<velox::connector::ConnectorSplit>
IcebergPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit) const {
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const {
auto icebergSplit =
dynamic_cast<const protocol::iceberg::IcebergSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
Expand Down Expand Up @@ -1386,6 +1389,7 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
std::nullopt,
customSplitInfo,
nullptr,
splitContext->cacheable,
deletes,
infoColumns);
}
Expand Down Expand Up @@ -1482,13 +1486,17 @@ IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
std::unique_ptr<velox::connector::ConnectorSplit>
TpchPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* const connectorSplit) const {
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const {
auto tpchSplit =
dynamic_cast<const protocol::tpch::TpchSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
tpchSplit, "Unexpected split type {}", connectorSplit->_type);
return std::make_unique<connector::tpch::TpchConnectorSplit>(
catalogId, tpchSplit->totalParts, tpchSplit->partNumber);
catalogId,
splitContext->cacheable,
tpchSplit->totalParts,
tpchSplit->partNumber);
}

std::unique_ptr<velox::connector::ColumnHandle>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class PrestoToVeloxConnector {
[[nodiscard]] virtual std::unique_ptr<velox::connector::ConnectorSplit>
toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit) const = 0;
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const = 0;

[[nodiscard]] virtual std::unique_ptr<velox::connector::ColumnHandle>
toVeloxColumnHandle(
Expand Down Expand Up @@ -115,7 +116,8 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector {

std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit) const final;
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
Expand Down Expand Up @@ -166,7 +168,8 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {

std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit) const final;
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
Expand All @@ -192,7 +195,8 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {

std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit) const final;
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ velox::exec::Split toVeloxSplit(

auto& connector = getPrestoToVeloxConnector(connectorSplit->_type);
auto veloxSplit = connector.toVeloxSplit(
scheduledSplit.split.connectorId, connectorSplit.get());
scheduledSplit.split.connectorId,
connectorSplit.get(),
&scheduledSplit.split.splitContext);
return velox::exec::Split(std::move(veloxSplit), splitGroupId);
}

Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 137 files

0 comments on commit 3c1b9a3

Please sign in to comment.