diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index dcdb1b1eb01a..a62799164d56 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -45,10 +45,8 @@ use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, watch, Mutex, RwLock}; pub(crate) use crate::adapter::node_context::FlownodeContext; -use crate::adapter::table_source::TableSource; -use crate::adapter::util::{ - relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc, -}; +use crate::adapter::table_source::ManagedTableSource; +use crate::adapter::util::relation_desc_to_column_schemas_with_fallback; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; @@ -69,7 +67,7 @@ mod util; mod worker; pub(crate) mod node_context; -mod table_source; +pub(crate) mod table_source; use crate::error::Error; use crate::utils::StateReportHandler; @@ -129,7 +127,7 @@ pub struct FlowWorkerManager { /// The query engine that will be used to parse the query and convert it to a dataflow plan pub query_engine: Arc, /// Getting table name and table schema from table info manager - table_info_source: TableSource, + table_info_source: ManagedTableSource, frontend_invoker: RwLock>, /// contains mapping from table name to global id, and table schema node_context: RwLock, @@ -158,11 +156,11 @@ impl FlowWorkerManager { query_engine: Arc, table_meta: TableMetadataManagerRef, ) -> Self { - let srv_map = TableSource::new( + let srv_map = ManagedTableSource::new( table_meta.table_info_manager().clone(), table_meta.table_name_manager().clone(), ); - let node_context = FlownodeContext::default(); + let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _); let tick_manager = FlowTickManager::new(); let worker_handles = Vec::new(); FlowWorkerManager { @@ -409,7 +407,7 @@ impl FlowWorkerManager { ) -> Result, Option, Vec)>, Error> { if let Some(table_id) = self .table_info_source - .get_table_id_from_name(table_name) + .get_opt_table_id_from_name(table_name) .await? { let table_info = self @@ -828,27 +826,9 @@ impl FlowWorkerManager { .fail()?, } } - - let table_id = self - .table_info_source - .get_table_id_from_name(&sink_table_name) - .await? - .context(UnexpectedSnafu { - reason: format!("Can't get table id for table name {:?}", sink_table_name), - })?; - let table_info_value = self - .table_info_source - .get_table_info_value(&table_id) - .await? - .context(UnexpectedSnafu { - reason: format!("Can't get table info value for table id {:?}", table_id), - })?; - let real_schema = table_info_value_to_relation_desc(table_info_value)?; - node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?; } else { // assign inferred schema to sink table // create sink table - node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?; let did_create = self .create_table_from_relation( &format!("flow-id={flow_id}"), diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 1f431ff92f64..fd858ca5dd26 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -34,11 +34,16 @@ use crate::error::InternalSnafu; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; -fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error { - // TODO(discord9): refactor this - Err::<(), _>(BoxedError::new(err)) - .with_context(|_| ExternalSnafu) - .unwrap_err() +/// return a function to convert `crate::error::Error` to `common_meta::error::Error` +fn to_meta_err( + location: snafu::Location, +) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error { + move |err: crate::error::Error| -> common_meta::error::Error { + common_meta::error::Error::External { + location, + source: BoxedError::new(err), + } + } } #[async_trait::async_trait] @@ -79,7 +84,10 @@ impl Flownode for FlowWorkerManager { flow_options, query_ctx, }; - let ret = self.create_flow(args).await.map_err(to_meta_err)?; + let ret = self + .create_flow(args) + .await + .map_err(to_meta_err(snafu::location!()))?; METRIC_FLOW_TASK_COUNT.inc(); Ok(FlowResponse { affected_flows: ret @@ -94,7 +102,7 @@ impl Flownode for FlowWorkerManager { })) => { self.remove_flow(flow_id.id as u64) .await - .map_err(to_meta_err)?; + .map_err(to_meta_err(snafu::location!()))?; METRIC_FLOW_TASK_COUNT.dec(); Ok(Default::default()) } @@ -112,9 +120,15 @@ impl Flownode for FlowWorkerManager { .await .flush_all_sender() .await - .map_err(to_meta_err)?; - let rows_send = self.run_available(true).await.map_err(to_meta_err)?; - let row = self.send_writeback_requests().await.map_err(to_meta_err)?; + .map_err(to_meta_err(snafu::location!()))?; + let rows_send = self + .run_available(true) + .await + .map_err(to_meta_err(snafu::location!()))?; + let row = self + .send_writeback_requests() + .await + .map_err(to_meta_err(snafu::location!()))?; debug!( "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed", @@ -156,15 +170,14 @@ impl Flownode for FlowWorkerManager { let fetch_order = { let ctx = self.node_context.read().await; - let table_col_names = ctx - .table_repr - .get_by_table_id(&table_id) - .map(|r| r.1) - .and_then(|id| ctx.schema.get(&id)) - .map(|desc| &desc.names) - .context(UnexpectedSnafu { - err_msg: format!("Table not found: {}", table_id), - })?; + + // TODO(discord9): also check schema version so that altered table can be reported + let table_schema = ctx + .table_source + .table_from_id(&table_id) + .await + .map_err(to_meta_err(snafu::location!()))?; + let table_col_names = table_schema.names; let table_col_names = table_col_names .iter().enumerate() .map(|(idx,name)| match name { @@ -211,12 +224,12 @@ impl Flownode for FlowWorkerManager { .iter() .map(from_proto_to_data_type) .collect::, _>>() - .map_err(to_meta_err)?; + .map_err(to_meta_err(snafu::location!()))?; self.handle_write_request(region_id.into(), rows, &batch_datatypes) .await .map_err(|err| { common_telemetry::error!(err;"Failed to handle write request"); - to_meta_err(err) + to_meta_err(snafu::location!())(err) })?; } Ok(Default::default()) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index efa3796fd287..02612b6f5a2d 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -25,7 +25,8 @@ use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; use tokio::sync::{broadcast, mpsc, RwLock}; -use crate::adapter::{FlowId, TableName, TableSource}; +use crate::adapter::table_source::FlowTableSource; +use crate::adapter::{FlowId, ManagedTableSource, TableName}; use crate::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::expr::error::InternalSnafu; use crate::expr::{Batch, GlobalId}; @@ -33,7 +34,7 @@ use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE; use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP}; /// A context that holds the information of the dataflow -#[derive(Default, Debug)] +#[derive(Debug)] pub struct FlownodeContext { /// mapping from source table to tasks, useful for schedule which task to run when a source table is updated pub source_to_tasks: BTreeMap>, @@ -50,13 +51,28 @@ pub struct FlownodeContext { /// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here pub sink_receiver: BTreeMap, mpsc::UnboundedReceiver)>, - /// the schema of the table, query from metasrv or inferred from TypedPlan - pub schema: HashMap, + /// can query the schema of the table source, from metasrv with local cache + pub table_source: Box, /// All the tables that have been registered in the worker pub table_repr: IdToNameMap, pub query_context: Option>, } +impl FlownodeContext { + pub fn new(table_source: Box) -> Self { + Self { + source_to_tasks: Default::default(), + flow_to_sink: Default::default(), + sink_to_flow: Default::default(), + source_sender: Default::default(), + sink_receiver: Default::default(), + table_source, + table_repr: Default::default(), + query_context: Default::default(), + } + } +} + /// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full /// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow /// @@ -284,7 +300,7 @@ impl FlownodeContext { /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. /// /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> { + pub async fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> { let id = self .table_repr .get_by_name(name) @@ -292,13 +308,7 @@ impl FlownodeContext { .with_context(|| TableNotFoundSnafu { name: name.join("."), })?; - let schema = self - .schema - .get(&id) - .cloned() - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; + let schema = self.table_source.table(name).await?; Ok((id, schema)) } @@ -312,7 +322,7 @@ impl FlownodeContext { /// merely creating a mapping from table id to global id pub async fn assign_global_id_to_table( &mut self, - srv_map: &TableSource, + srv_map: &ManagedTableSource, mut table_name: Option, table_id: Option, ) -> Result { @@ -333,9 +343,8 @@ impl FlownodeContext { // table id is Some meaning db must have created the table if let Some(table_id) = table_id { - let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?; + let known_table_name = srv_map.get_table_name(&table_id).await?; table_name = table_name.or(Some(known_table_name)); - self.schema.insert(global_id, schema); } // if we don't have table id, it means database haven't assign one yet or we don't need it // still update the mapping with new global id @@ -344,26 +353,6 @@ impl FlownodeContext { } } - /// Assign a schema to a table - /// - pub fn assign_table_schema( - &mut self, - table_name: &TableName, - schema: RelationDesc, - ) -> Result<(), Error> { - let gid = self - .table_repr - .get_by_name(table_name) - .map(|(_, gid)| gid) - .context(TableNotFoundSnafu { - name: format!("Table not found: {:?} in flownode cache", table_name), - })?; - - self.schema.insert(gid, schema); - - Ok(()) - } - /// Get a new global id pub fn new_global_id(&self) -> GlobalId { GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64) diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index e067b273ea25..bc2e9ec2d5ef 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -27,16 +27,56 @@ use crate::error::{ }; use crate::repr::RelationDesc; -/// mapping of table name <-> table id should be query from tableinfo manager -pub struct TableSource { +/// Table source but for flow, provide table schema by table name/id +#[async_trait::async_trait] +pub trait FlowTableSource: Send + Sync + std::fmt::Debug { + async fn table_name_from_id(&self, table_id: &TableId) -> Result; + async fn table_id_from_name(&self, name: &TableName) -> Result; + + /// Get the table schema by table name + async fn table(&self, name: &TableName) -> Result { + let id = self.table_id_from_name(name).await?; + self.table_from_id(&id).await + } + async fn table_from_id(&self, table_id: &TableId) -> Result; +} + +/// managed table source information, query from table info manager and table name manager +#[derive(Clone)] +pub struct ManagedTableSource { /// for query `TableId -> TableName` mapping table_info_manager: TableInfoManager, table_name_manager: TableNameManager, } -impl TableSource { +#[async_trait::async_trait] +impl FlowTableSource for ManagedTableSource { + async fn table_from_id(&self, table_id: &TableId) -> Result { + let table_info_value = self + .get_table_info_value(table_id) + .await? + .with_context(|| TableNotFoundSnafu { + name: format!("TableId = {:?}, Can't found table info", table_id), + })?; + let desc = table_info_value_to_relation_desc(table_info_value)?; + + Ok(desc) + } + async fn table_name_from_id(&self, table_id: &TableId) -> Result { + self.get_table_name(table_id).await + } + async fn table_id_from_name(&self, name: &TableName) -> Result { + self.get_opt_table_id_from_name(name) + .await? + .with_context(|| TableNotFoundSnafu { + name: name.join("."), + }) + } +} + +impl ManagedTableSource { pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self { - TableSource { + ManagedTableSource { table_info_manager, table_name_manager, } @@ -63,7 +103,10 @@ impl TableSource { } /// If the table haven't been created in database, the tableId returned would be null - pub async fn get_table_id_from_name(&self, name: &TableName) -> Result, Error> { + pub async fn get_opt_table_id_from_name( + &self, + name: &TableName, + ) -> Result, Error> { let ret = self .table_name_manager .get(TableNameKey::new(&name[0], &name[1], &name[2])) @@ -126,3 +169,117 @@ impl TableSource { Ok((table_name, desc)) } } + +impl std::fmt::Debug for ManagedTableSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KvBackendTableSource").finish() + } +} + +#[cfg(test)] +pub(crate) mod test { + use std::collections::HashMap; + + use datatypes::data_type::ConcreteDataType as CDT; + + use super::*; + use crate::repr::{ColumnType, RelationType}; + + pub struct FlowDummyTableSource { + pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>, + id_to_idx: HashMap, + name_to_idx: HashMap, + } + + impl Default for FlowDummyTableSource { + fn default() -> Self { + let id_names_to_desc = vec![ + ( + 1024, + [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ], + RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]) + .into_named(vec![Some("number".to_string())]), + ), + ( + 1025, + [ + "greptime".to_string(), + "public".to_string(), + "numbers_with_ts".to_string(), + ], + RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::timestamp_millisecond_datatype(), false), + ]) + .into_named(vec![Some("number".to_string()), Some("ts".to_string())]), + ), + ]; + let id_to_idx = id_names_to_desc + .iter() + .enumerate() + .map(|(idx, (id, _name, _desc))| (*id, idx)) + .collect(); + let name_to_idx = id_names_to_desc + .iter() + .enumerate() + .map(|(idx, (_id, name, _desc))| (name.clone(), idx)) + .collect(); + Self { + id_names_to_desc, + id_to_idx, + name_to_idx, + } + } + } + + #[async_trait::async_trait] + impl FlowTableSource for FlowDummyTableSource { + async fn table_from_id(&self, table_id: &TableId) -> Result { + let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu { + name: format!("Table id = {:?}, couldn't found table desc", table_id), + })?; + let desc = self + .id_names_to_desc + .get(*idx) + .map(|x| x.2.clone()) + .context(TableNotFoundSnafu { + name: format!("Table id = {:?}, couldn't found table desc", table_id), + })?; + Ok(desc) + } + + async fn table_name_from_id(&self, table_id: &TableId) -> Result { + let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu { + name: format!("Table id = {:?}, couldn't found table desc", table_id), + })?; + self.id_names_to_desc + .get(*idx) + .map(|x| x.1.clone()) + .context(TableNotFoundSnafu { + name: format!("Table id = {:?}, couldn't found table desc", table_id), + }) + } + + async fn table_id_from_name(&self, name: &TableName) -> Result { + for (id, table_name, _desc) in &self.id_names_to_desc { + if name == table_name { + return Ok(*id); + } + } + TableNotFoundSnafu { + name: format!("Table name = {:?}, couldn't found table id", name), + } + .fail()? + } + } + + impl std::fmt::Debug for FlowDummyTableSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DummyTableSource").finish() + } + } +} diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 94878115cf8b..d6f30d3c88ad 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -173,12 +173,11 @@ mod test { use super::*; use crate::adapter::node_context::IdToNameMap; + use crate::adapter::table_source::test::FlowDummyTableSource; use crate::df_optimizer::apply_df_optimizer; use crate::expr::GlobalId; - use crate::repr::{ColumnType, RelationType}; pub fn create_test_ctx() -> FlownodeContext { - let mut schemas = HashMap::new(); let mut tri_map = IdToNameMap::new(); { let gid = GlobalId::User(0); @@ -187,10 +186,7 @@ mod test { "public".to_string(), "numbers".to_string(), ]; - let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); - tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema.into_named(vec![Some("number".to_string())])); } { @@ -200,23 +196,16 @@ mod test { "public".to_string(), "numbers_with_ts".to_string(), ]; - let schema = RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), - ColumnType::new(CDT::timestamp_millisecond_datatype(), false), - ]); - schemas.insert( - gid, - schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]), - ); tri_map.insert(Some(name.clone()), Some(1025), gid); } - FlownodeContext { - schema: schemas, - table_repr: tri_map, - query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), - ..Default::default() - } + let dummy_source = FlowDummyTableSource::default(); + + let mut ctx = FlownodeContext::new(Box::new(dummy_source)); + ctx.table_repr = tri_map; + ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public"))); + + ctx } pub fn create_test_query_engine() -> Arc { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index ad5fc2f58dc2..5e8599f3681e 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -176,7 +176,7 @@ impl TypedPlan { } .fail()?, }; - let table = ctx.table(&table_reference)?; + let table = ctx.table(&table_reference).await?; let get_table = Plan::Get { id: crate::expr::Id::Global(table.0), }; diff --git a/tests/cases/standalone/common/flow/flow_aft_alter.result b/tests/cases/standalone/common/flow/flow_aft_alter.result new file mode 100644 index 000000000000..9ee6080e478e --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_aft_alter.result @@ -0,0 +1,107 @@ +-- test if flow can get table schema correctly after table have been altered +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE TABLE approx_rate ( + rate DOUBLE, + time_window TIMESTAMP, + update_at TIMESTAMP, + TIME INDEX(time_window) +); + +Affected Rows: 0 + +-- make both src&sink table in cache of flownode by using them +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; + +Affected Rows: 0 + +SHOW CREATE FLOW find_approx_rate; + ++------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ +| find_approx_rate | CREATE FLOW IF NOT EXISTS find_approx_rate | +| | SINK TO approx_rate | +| | AS SELECT (max(byte) - min(byte)) / 30.0 AS rate, date_bin(INTERVAL '30 second', ts) AS time_window FROM bytes_log GROUP BY time_window | ++------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ + +DROP FLOW find_approx_rate; + +Affected Rows: 0 + +ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte; + +Affected Rows: 0 + +ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate; + +Affected Rows: 0 + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + count(byte) as sample_cnt, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO + bytes_log +VALUES + (0, 200, '2023-01-01 00:00:01'), + (300,200, '2023-01-01 00:00:29'); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('find_approx_rate') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT + rate, + sample_cnt, + time_window +FROM + approx_rate; + ++------+------------+---------------------+ +| rate | sample_cnt | time_window | ++------+------------+---------------------+ +| 10.0 | 2 | 2023-01-01T00:00:00 | ++------+------------+---------------------+ + +DROP TABLE bytes_log; + +Affected Rows: 0 + +DROP FLOW find_approx_rate; + +Affected Rows: 0 + +DROP TABLE approx_rate; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_aft_alter.sql b/tests/cases/standalone/common/flow/flow_aft_alter.sql new file mode 100644 index 000000000000..6b25c7ab544e --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_aft_alter.sql @@ -0,0 +1,64 @@ +-- test if flow can get table schema correctly after table have been altered + +CREATE TABLE bytes_log ( + byte INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + -- event time + TIME INDEX(ts) +); + +CREATE TABLE approx_rate ( + rate DOUBLE, + time_window TIMESTAMP, + update_at TIMESTAMP, + TIME INDEX(time_window) +); + +-- make both src&sink table in cache of flownode by using them +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; + +SHOW CREATE FLOW find_approx_rate; + +DROP FLOW find_approx_rate; + +ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte; +ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate; + +CREATE FLOW find_approx_rate SINK TO approx_rate AS +SELECT + (max(byte) - min(byte)) / 30.0 as rate, + count(byte) as sample_cnt, + date_bin(INTERVAL '30 second', ts) as time_window +from + bytes_log +GROUP BY + time_window; + +INSERT INTO + bytes_log +VALUES + (0, 200, '2023-01-01 00:00:01'), + (300,200, '2023-01-01 00:00:29'); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); + +SELECT + rate, + sample_cnt, + time_window +FROM + approx_rate; + +DROP TABLE bytes_log; + +DROP FLOW find_approx_rate; + +DROP TABLE approx_rate;