feat(flow): mirror insert req to flow node (#3858)
* feat: mirror insert req to flow node

* refactor: group_requests_by_peer

* chore: rename `nodes` to `flows` to be more apt

* docs: add TODO

* refactor: split flow&data node grouping to two func

* refactor: mirror_flow_node_request

* chore: add some TODOs

* refactor: use Option in value

* feat: skip non-src table quickly

* docs: add TODO for `Peer.address`

* fix: dedup
discord9 authored May 6, 2024
1 parent f3b6825 commit 573c19b
Showing 8 changed files with 135 additions and 15 deletions.
10 changes: 5 additions & 5 deletions src/common/meta/src/key/flow.rs
@@ -26,10 +26,10 @@ use snafu::{ensure, OptionExt};
use self::flow_info::FlowInfoValue;
use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoManager;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
use crate::key::flow::table_flow::TableFlowManager;
use crate::key::flow::flow_info::{FlowInfoManager, FlowInfoManagerRef};
use crate::key::flow::flow_name::{FlowNameManager, FlowNameManagerRef};
use crate::key::flow::flownode_flow::{FlownodeFlowManager, FlownodeFlowManagerRef};
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowId, MetaKey};
use crate::kv_backend::txn::Txn;
@@ -306,7 +306,7 @@ mod tests {
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.nodes(table_id)
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
3 changes: 3 additions & 0 deletions src/common/meta/src/key/flow/flow_info.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use lazy_static::lazy_static;
use regex::Regex;
@@ -143,6 +144,8 @@ impl FlowInfoValue {
}
}

pub type FlowInfoManagerRef = Arc<FlowInfoManager>;

/// The manager of [FlowInfoKey].
pub struct FlowInfoManager {
kv_backend: KvBackendRef,
4 changes: 4 additions & 0 deletions src/common/meta/src/key/flow/flow_name.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::flow::flow_server::Flow;
use lazy_static::lazy_static;
use regex::Regex;
@@ -141,6 +143,8 @@ impl FlowNameValue {
}
}

pub type FlowNameManagerRef = Arc<FlowNameManager>;

/// The manager of [FlowNameKey].
pub struct FlowNameManager {
kv_backend: KvBackendRef,
2 changes: 2 additions & 0 deletions src/common/meta/src/key/flow/flownode_flow.rs
@@ -156,6 +156,8 @@ impl<'a> MetaKey<'a, FlownodeFlowKeyInner> for FlownodeFlowKeyInner {
}
}

pub type FlownodeFlowManagerRef = Arc<FlownodeFlowManager>;

/// The manager of [FlownodeFlowKey].
pub struct FlownodeFlowManager {
kv_backend: KvBackendRef,
6 changes: 5 additions & 1 deletion src/common/meta/src/key/flow/table_flow.rs
@@ -176,6 +176,8 @@ pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
TableFlowKey::from_bytes(&kv.key)
}

pub type TableFlowManagerRef = Arc<TableFlowManager>;

/// The manager of [TableFlowKey].
pub struct TableFlowManager {
kv_backend: KvBackendRef,
@@ -188,7 +190,9 @@ impl TableFlowManager {
}

/// Retrieves all [TableFlowKey]s of the specified `table_id`.
pub fn nodes(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
///
/// TODO(discord9): add cache for it since range request does not support cache.
pub fn flows(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
let start_key = TableFlowKey::range_start_key(table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
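The renamed `flows` method still returns a `BoxStream` of `TableFlowKey`s, so callers drive it with the usual `futures` stream combinators. A minimal caller-side sketch, assuming a `table_flow_manager: TableFlowManagerRef` in scope and that `FlownodeId` is the `u64` alias used elsewhere in `common_meta`:

use futures::TryStreamExt;

// Collect the id of every flownode hosting a flow that reads from `table_id`.
// `flows` streams `TableFlowKey`s; keep only the flownode id of each key.
let flownode_ids: Vec<u64> = table_flow_manager
    .flows(table_id)
    .map_ok(|key| key.flownode_id())
    .try_collect()
    .await?;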
6 changes: 3 additions & 3 deletions src/common/meta/src/node_manager.rs
@@ -15,8 +15,8 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest};
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

@@ -40,7 +40,7 @@ pub type DatanodeRef = Arc<dyn Datanode>;
pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;

async fn handle_insert(&self, request: InsertRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
}

pub type FlownodeRef = Arc<dyn Flownode>;
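For illustration, a no-op implementation of the updated trait might look like the sketch below. It is hypothetical: it assumes `FlowResponse` is the usual prost message with an `affected_rows` field and a `Default` impl (as the call site in `insert.rs` suggests), and that each region `InsertRequest` carries its payload in an optional `rows` field.

use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;

struct NoopFlownode;

#[async_trait::async_trait]
impl Flownode for NoopFlownode {
    async fn handle(&self, _request: FlowRequest) -> Result<FlowResponse> {
        Ok(FlowResponse::default())
    }

    // The whole batch now arrives in a single call instead of one call per
    // request, which is what lets the frontend mirror a grouped batch to
    // each flownode peer.
    async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
        let affected_rows = requests
            .requests
            .iter()
            .filter_map(|req| req.rows.as_ref())
            .map(|rows| rows.rows.len() as u64)
            .sum();
        Ok(FlowResponse {
            affected_rows,
            ..Default::default()
        })
    }
}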
4 changes: 4 additions & 0 deletions src/frontend/src/instance/builder.rs
@@ -18,6 +18,7 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::TableFlowManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
@@ -101,10 +102,13 @@ impl FrontendBuilder {
let region_query_handler =
FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone());

let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend.clone()));

let inserter = Arc::new(Inserter::new(
self.catalog_manager.clone(),
partition_manager.clone(),
node_manager.clone(),
table_flow_manager,
));
let deleter = Arc::new(Deleter::new(
self.catalog_manager.clone(),
115 changes: 109 additions & 6 deletions src/operator/src/insert.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::alter_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
@@ -25,22 +26,25 @@ use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::key::flow::TableFlowManagerRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use datatypes::schema::Schema;
use futures_util::future;
use futures_util::{future, TryStreamExt};
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use snafu::ResultExt;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::storage::{RegionId, TableId};
use table::requests::InsertRequest as TableInsertRequest;
use table::table_reference::TableReference;
use table::TableRef;
@@ -58,6 +62,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flow_manager: TableFlowManagerRef,
}

pub type InserterRef = Arc<Inserter>;
@@ -67,11 +72,13 @@ impl Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flow_manager: TableFlowManagerRef,
) -> Self {
Self {
catalog_manager,
partition_manager,
node_manager,
table_flow_manager,
}
}

@@ -199,13 +206,34 @@
..Default::default()
});

// spawn all the tasks that mirror the insert requests to the flownodes
let flow_tasks = self
.mirror_flow_node_requests(&requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
let node_manager = self.node_manager.clone();
common_runtime::spawn_write(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.map(|flow_response| RegionResponse {
affected_rows: flow_response.affected_rows as AffectedRows,
extension: flow_response.extension,
})
.context(RequestInsertsSnafu)
})
});

let tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
let request = request_factory.build_insert(inserts);
let node_manager = self.node_manager.clone();
let request = request_factory.build_insert(inserts);
common_runtime::spawn_write(async move {
node_manager
.datanode(&peer)
Expand All @@ -214,7 +242,8 @@ impl Inserter {
.await
.context(RequestInsertsSnafu)
})
});
})
.chain(flow_tasks);
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

let affected_rows = results
@@ -228,19 +257,93 @@
))
}

/// Mirror requests for source tables to the flownodes.
async fn mirror_flow_node_requests(
&self,
requests: &RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
// Requests on flow source tables, keyed by table id; only what the flownodes
// need is stored. `None` marks a table already known to have no flows.
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
HashMap::new();
for req in &requests.requests {
match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
// already known to not be a source table; skip it quickly
Some(None) => continue,
_ => {
let table_id = RegionId::from_u64(req.region_id).table_id();
let peers = self
.table_flow_manager
.flows(table_id)
// TODO(discord9): determine where to store the flow node address in distributed mode
.map_ok(|key| Peer::new(key.flownode_id(), ""))
.try_collect::<Vec<_>>()
.await
.map(|mut v| {
v.dedup();
v
})
.context(RequestInsertsSnafu)?;

if !peers.is_empty() {
let mut reqs = RegionInsertRequests::default();
reqs.requests.push(req.clone());
src_table_reqs.insert(table_id, Some((peers, reqs)));
} else {
// insert an empty entry to avoid repeating the query
src_table_reqs.insert(table_id, None);
}
}
}
}

let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

for (_table_id, (peers, reqs)) in src_table_reqs
.into_iter()
.filter_map(|(k, v)| v.map(|v| (k, v)))
{
for flownode in peers {
inserts
.entry(flownode.clone())
.or_default()
.requests
.extend(reqs.requests.clone());
}
}
Ok(inserts)
}

async fn group_requests_by_peer(
&self,
requests: RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
// group by region ids first to reduce repeated calls to `find_region_leader`
// TODO(discord9): determine if the additional clone is worth it
let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();

for req in requests.requests {
let region_id = RegionId::from_u64(req.region_id);
requests_per_region
.entry(region_id)
.or_default()
.requests
.push(req);
}

let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

for (region_id, reqs) in requests_per_region {
let peer = self
.partition_manager
.find_region_leader(req.region_id.into())
.find_region_leader(region_id)
.await
.context(FindRegionLeaderSnafu)?;
inserts.entry(peer).or_default().requests.push(req);
inserts
.entry(peer)
.or_default()
.requests
.extend(reqs.requests);
}

Ok(inserts)
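One subtlety behind the final `fix: dedup` commit: `Vec::dedup` removes only consecutive duplicates, so deduplicating the peers in `mirror_flow_node_requests` is complete only if the range scan yields `TableFlowKey`s in key order, which keeps equal flownode ids adjacent (a flownode hosting several flows or partitions of the same table appears once per key). A standalone sketch of that behavior, assuming `Peer` derives `PartialEq` and `Debug`:

use common_meta::peer::Peer;

// `dedup` collapses runs of equal elements; input grouped by flownode id
// is what makes it a full deduplication here.
let mut peers = vec![
    Peer::new(1, ""), // flow A, partition 0
    Peer::new(1, ""), // flow A, partition 1: adjacent duplicate, removed
    Peer::new(2, ""),
];
peers.dedup();
assert_eq!(peers, vec![Peer::new(1, ""), Peer::new(2, "")]);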
