Skip to content

Commit

Permalink
feat(flow): make write path faster with shared lock (GreptimeTeam#4073)
Browse files Browse the repository at this point in the history
* feat(WIP): make write faster

* feat: read lock on fast path

* chore: per review
  • Loading branch information
discord9 authored May 31, 2024
1 parent 43afea1 commit ea49f8a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 30 deletions.
20 changes: 10 additions & 10 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub struct FlownodeManager {
table_info_source: TableSource,
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlownodeContext>,
node_context: RwLock<FlownodeContext>,
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
tick_manager: FlowTickManager,
Expand Down Expand Up @@ -194,7 +194,7 @@ impl FlownodeManager {
query_engine,
table_info_source: srv_map,
frontend_invoker: RwLock::new(None),
node_context: Mutex::new(node_context),
node_context: RwLock::new(node_context),
flow_err_collectors: Default::default(),
src_send_buf_lens: Default::default(),
tick_manager,
Expand Down Expand Up @@ -298,7 +298,7 @@ impl FlownodeManager {
} else {
// TODO(discord9): condiser remove buggy auto create by schema

let node_ctx = self.node_context.lock().await;
let node_ctx = self.node_context.read().await;
let gid: GlobalId = node_ctx
.table_repr
.get_by_name(&table_name)
Expand Down Expand Up @@ -462,7 +462,7 @@ impl FlownodeManager {
let mut output = BTreeMap::new();
for (name, sink_recv) in self
.node_context
.lock()
.write()
.await
.sink_receiver
.iter_mut()
Expand Down Expand Up @@ -542,11 +542,11 @@ impl FlownodeManager {
}
// first check how many inputs were sent
let (flush_res, buf_len) = if blocking {
let mut ctx = self.node_context.lock().await;
(ctx.flush_all_sender(), ctx.get_send_buf_size())
let ctx = self.node_context.read().await;
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
} else {
match self.node_context.try_lock() {
Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
match self.node_context.try_read() {
Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
Err(_) => return Ok(()),
}
};
Expand Down Expand Up @@ -580,7 +580,7 @@ impl FlownodeManager {
rows.len()
);
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
self.node_context.read().await.send(table_id, rows).await?;
// TODO(discord9): put it in a background task?
// self.run_available(false).await?;
Ok(())
Expand Down Expand Up @@ -628,7 +628,7 @@ impl FlownodeManager {
}
}

let mut node_ctx = self.node_context.lock().await;
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in source_table_ids {
node_ctx
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Flownode for FlownodeManager {
let now = self.tick_manager.tick();

let fetch_order = {
let ctx = self.node_context.lock().await;
let ctx = self.node_context.read().await;
let table_col_names = ctx
.table_repr
.get_by_table_id(&table_id)
Expand Down
48 changes: 29 additions & 19 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_telemetry::debug;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{broadcast, mpsc, RwLock};

use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct FlownodeContext {
#[derive(Debug)]
pub struct SourceSender {
sender: broadcast::Sender<DiffRow>,
send_buf: VecDeque<DiffRow>,
send_buf: RwLock<VecDeque<DiffRow>>,
}

impl Default for SourceSender {
Expand All @@ -78,22 +78,24 @@ impl Default for SourceSender {
}
}

// TODO: make all send operation immut
impl SourceSender {
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
}

/// send as many as possible rows from send buf
/// until send buf is empty or broadchannel is full
pub fn try_send_all(&mut self) -> Result<usize, Error> {
pub async fn try_send_all(&self) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
let mut send_buf = self.send_buf.write().await;
// if inner sender channel is empty or send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
break;
}
if let Some(row) = self.send_buf.pop_front() {
if let Some(row) = send_buf.pop_front() {
self.sender
.send(row)
.map_err(|err| {
Expand All @@ -108,17 +110,20 @@ impl SourceSender {
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
debug!("Remaining Send buf.len() = {}", self.send_buf.len());
debug!(
"Remaining Send buf.len() = {}",
self.send_buf.read().await.len()
);
}

Ok(row_cnt)
}

/// return number of rows it actual send(including what's in the buffer)
pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.extend(rows);
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.write().await.extend(rows);

let row_cnt = self.try_send_all()?;
let row_cnt = self.try_send_all().await?;

Ok(row_cnt)
}
Expand All @@ -128,30 +133,35 @@ impl FlownodeContext {
/// return number of rows it actual send(including what's in the buffer)
///
/// TODO(discord9): make this concurrent
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get_mut(&table_id)
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
// debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows)
sender.send_rows(rows).await
}

/// flush all sender's buf
///
/// return numbers being sent
pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
self.source_sender
.iter_mut()
.map(|(_table_id, src_sender)| src_sender.try_send_all())
.try_fold(0, |acc, x| x.map(|x| x + acc))
pub async fn flush_all_sender(&self) -> Result<usize, Error> {
let mut sum = 0;
for sender in self.source_sender.values() {
sender.try_send_all().await.inspect(|x| sum += x)?;
}
Ok(sum)
}

/// Return the sum number of rows in all send buf
pub fn get_send_buf_size(&self) -> usize {
self.source_sender.values().map(|v| v.send_buf.len()).sum()
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
sum += sender.send_buf.read().await.len();
}
sum
}
}

Expand Down

0 comments on commit ea49f8a

Please sign in to comment.