ConnectorSourceWorker {
- /// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
- async fn refresh(&mut self) -> MetaResult<()> {
- let enumerator = P::SplitEnumerator::new(
- self.connector_properties.clone(),
- Arc::new(SourceEnumeratorContext {
- metrics: self.metrics.source_enumerator_metrics.clone(),
- info: SourceEnumeratorInfo {
- source_id: self.source_id as u32,
- },
- }),
- )
- .await
- .context("failed to create SplitEnumerator")?;
- self.enumerator = enumerator;
- self.fail_cnt = 0;
- tracing::info!("refreshed source enumerator: {}", self.source_name);
- Ok(())
- }
- /// On creation, connection to the external source service will be established, but `splits`
- /// will not be updated until `tick` is called.
- pub async fn create(
- source: &Source,
- connector_properties: P,
- period: Duration,
- splits: Arc>,
- metrics: Arc,
- ) -> MetaResult {
- let enumerator = P::SplitEnumerator::new(
- connector_properties.clone(),
- Arc::new(SourceEnumeratorContext {
- metrics: metrics.source_enumerator_metrics.clone(),
- info: SourceEnumeratorInfo {
- source_id:,
- },
- }),
- )
- .await
- .context("failed to create SplitEnumerator")?;
- let source_is_up = metrics
- .source_is_up
- .with_guarded_label_values(&[, &]);
- Ok(Self {
- source_id: as SourceId,
- source_name:,
- current_splits: splits,
- enumerator,
- period,
- metrics,
- connector_properties,
- fail_cnt: 0,
- source_is_up,
- })
- }
- pub async fn run(
- &mut self,
- mut sync_call_rx: UnboundedReceiver>>,
- ) {
- let mut interval = time::interval(self.period);
- interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
- loop {
- select! {
- biased;
- tx = sync_call_rx.borrow_mut().recv() => {
- if let Some(tx) = tx {
- let _ = tx.send(self.tick().await);
- }
- }
- _ = interval.tick() => {
- if self.fail_cnt > MAX_FAIL_CNT {
- if let Err(e) = self.refresh().await {
- tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
- }
- }
- if let Err(e) = self.tick().await {
- tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
- }
- }
- }
- }
- }
- /// Uses [`SplitEnumerator`] to fetch the latest split metadata from the external source service.
- async fn tick(&mut self) -> MetaResult<()> {
- let source_is_up = |res: i64| {
- self.source_is_up.set(res);
- };
- let splits = self.enumerator.list_splits().await.inspect_err(|_| {
- source_is_up(0);
- self.fail_cnt += 1;
- })?;
- source_is_up(1);
- self.fail_cnt = 0;
- let mut current_splits = self.current_splits.lock().await;
- current_splits.splits.replace(
- splits
- .into_iter()
- .map(|split| (, P::Split::into(split)))
- .collect(),
- );
- Ok(())
- }
-/// Handle for a running [`ConnectorSourceWorker`].
-pub struct ConnectorSourceWorkerHandle {
- handle: JoinHandle<()>,
- sync_call_tx: UnboundedSender>>,
- splits: SharedSplitMapRef,
- enable_drop_split: bool,
- enable_adaptive_splits: bool,
-impl ConnectorSourceWorkerHandle {
- async fn discovered_splits(&self) -> Option> {
- self.splits.lock().await.splits.clone()
- }
- pub fn get_enable_adaptive_splits(&self) -> bool {
- self.enable_adaptive_splits
- }
pub struct SourceManagerCore {
metadata_manager: MetadataManager,
@@ -300,36 +84,6 @@ pub struct SourceManagerRunningInfo {
pub actor_splits: HashMap>,
-async fn handle_discover_splits(
- handle: &ConnectorSourceWorkerHandle,
- source_id: SourceId,
- actors: &HashSet,
-) -> MetaResult, SplitImpl>> {
- let Some(mut discovered_splits) = handle.discovered_splits().await else {
- tracing::info!(
- "The discover loop for source {} is not ready yet; we'll wait for the next run",
- source_id
- );
- return Ok(BTreeMap::new());
- };
- if discovered_splits.is_empty() {
- tracing::warn!("No splits discovered for source {}", source_id);
- }
- if handle.enable_adaptive_splits {
- // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment.
- // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id.
- // And prev splits record should be dropped via CN.
- debug_assert!(handle.enable_drop_split);
- debug_assert!(discovered_splits.len() == 1);
- discovered_splits =
- fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
- }
- Ok(discovered_splits)
impl SourceManagerCore {
fn new(
metadata_manager: MetadataManager,
@@ -347,117 +101,6 @@ impl SourceManagerCore {
- /// Checks whether the external source metadata has changed,
- /// and re-assigns splits if there's a diff.
- ///
- /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`,
- /// after the mutation barrier has been collected.
- async fn reassign_splits(&self) -> MetaResult> {
- let mut split_assignment: SplitAssignment = HashMap::new();
- 'loop_source: for (source_id, handle) in &self.managed_sources {
- let source_fragment_ids = match self.source_fragments.get(source_id) {
- Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
- _ => {
- continue;
- }
- };
- let backfill_fragment_ids = self.backfill_fragments.get(source_id);
- 'loop_fragment: for &fragment_id in source_fragment_ids {
- let actors = match self
- .metadata_manager
- .get_running_actors_of_fragment(fragment_id)
- .await
- {
- Ok(actors) => {
- if actors.is_empty() {
- tracing::warn!("No actors found for fragment {}", fragment_id);
- continue 'loop_fragment;
- }
- actors
- }
- Err(err) => {
- tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
- continue 'loop_fragment;
- }
- };
- let discovered_splits = handle_discover_splits(handle, *source_id, &actors).await?;
- if discovered_splits.is_empty() {
- // The discover loop for this source is not ready yet; we'll wait for the next run
- continue 'loop_source;
- }
- let prev_actor_splits: HashMap<_, _> = actors
- .into_iter()
- .map(|actor_id| {
- (
- actor_id,
- self.actor_splits
- .get(&actor_id)
- .cloned()
- .unwrap_or_default(),
- )
- })
- .collect();
- if let Some(new_assignment) = reassign_splits(
- fragment_id,
- prev_actor_splits,
- &discovered_splits,
- SplitDiffOptions {
- enable_scale_in: handle.enable_drop_split,
- enable_adaptive: handle.enable_adaptive_splits,
- },
- ) {
- split_assignment.insert(fragment_id, new_assignment);
- }
- }
- if let Some(backfill_fragment_ids) = backfill_fragment_ids {
- // align splits for backfill fragments with its upstream source fragment
- for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
- let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id)
- else {
- // upstream fragment unchanged, do not update backfill fragment too
- continue;
- };
- let actors = match self
- .metadata_manager
- .get_running_actors_for_source_backfill(*fragment_id)
- .await
- {
- Ok(actors) => {
- if actors.is_empty() {
- tracing::warn!("No actors found for fragment {}", fragment_id);
- continue;
- }
- actors
- }
- Err(err) => {
- tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
- continue;
- }
- };
- split_assignment.insert(
- *fragment_id,
- align_splits(
- actors,
- upstream_assignment,
- *fragment_id,
- *upstream_fragment_id,
- )?,
- );
- }
- }
- }
- self.metadata_manager
- .split_fragment_map_by_database(split_assignment)
- .await
- }
/// Updates states after all kinds of source change.
pub fn apply_source_change(&mut self, source_change: SourceChange) {
let mut added_source_fragments = Default::default();
@@ -593,203 +236,6 @@ impl SourceManagerCore {
-/// Note: the `PartialEq` and `Ord` impl just compares the number of splits.
-struct ActorSplitsAssignment {
- actor_id: ActorId,
- splits: Vec,
-impl Eq for ActorSplitsAssignment {}
-impl PartialEq for ActorSplitsAssignment {
- fn eq(&self, other: &Self) -> bool {
- self.splits.len() == other.splits.len()
- }
-impl PartialOrd for ActorSplitsAssignment {
- fn partial_cmp(&self, other: &Self) -> Option {
- Some(self.cmp(other))
- }
-impl Ord for ActorSplitsAssignment {
- fn cmp(&self, other: &Self) -> Ordering {
- // Note: this is reversed order, to make BinaryHeap a min heap.
- other.splits.len().cmp(&self.splits.len())
- }
-struct SplitDiffOptions {
- enable_scale_in: bool,
- /// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
- enable_adaptive: bool,
-impl Default for SplitDiffOptions {
- fn default() -> Self {
- SplitDiffOptions {
- enable_scale_in: false,
- enable_adaptive: false,
- }
- }
-/// Reassigns splits if there are new splits or dropped splits,
-/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
-/// The existing splits will remain unmoved in their currently assigned actor.
-/// If an actor has an upstream actor, it should be a backfill executor,
-/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
-/// Use [`align_splits`] instead.**
-/// - `fragment_id`: just for logging
-/// ## Different connectors' behavior of split change
-/// ### Kafka and Pulsar
-/// They only support increasing the number of splits via adding new empty splits.
-/// Old data is not moved.
-/// ### Kinesis
-/// It supports *pairwise* shard split and merge.
-/// In both cases, old data remain in the old shard(s) and the old shard is still available.
-/// New data are routed to the new shard(s).
-/// After the retention period has expired, the old shard will become `EXPIRED` and isn't
-/// listed any more. In other words, the total number of shards will first increase and then decrease.
-/// See also:
-/// - [Kinesis resharding doc](
-/// - An example of how the shards can be like:
-fn reassign_splits(
- fragment_id: FragmentId,
- actor_splits: HashMap>,
- discovered_splits: &BTreeMap,
- opts: SplitDiffOptions,
-) -> Option>>
- T: SplitMetaData + Clone,
- // if no actors, return
- if actor_splits.is_empty() {
- return None;
- }
- let prev_split_ids: HashSet<_> = actor_splits
- .values()
- .flat_map(|splits| splits.iter().map(SplitMetaData::id))
- .collect();
- tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
- tracing::trace!(fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
- let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
- let dropped_splits: HashSet<_> = prev_split_ids
- .difference(&discovered_split_ids)
- .cloned()
- .collect();
- if !dropped_splits.is_empty() {
- if opts.enable_scale_in {
- tracing::info!(fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
- } else {
- tracing::warn!(fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
- }
- }
- let new_discovered_splits: BTreeSet<_> = discovered_split_ids
- .into_iter()
- .filter(|split_id| !prev_split_ids.contains(split_id))
- .collect();
- if opts.enable_scale_in || opts.enable_adaptive {
- // if we support scale in, no more splits are discovered, and no splits are dropped, return
- // we need to check if discovered_split_ids is empty, because if it is empty, we need to
- // handle the case of scale in to zero (like deleting all objects from s3)
- if dropped_splits.is_empty()
- && new_discovered_splits.is_empty()
- && !discovered_splits.is_empty()
- {
- return None;
- }
- } else {
- // if we do not support scale in, and no more splits are discovered, return
- if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
- return None;
- }
- }
- tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
- let mut heap = BinaryHeap::with_capacity(actor_splits.len());
- for (actor_id, mut splits) in actor_splits {
- if opts.enable_scale_in || opts.enable_adaptive {
- splits.retain(|split| !dropped_splits.contains(&;
- }
- heap.push(ActorSplitsAssignment { actor_id, splits })
- }
- for split_id in new_discovered_splits {
- // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
- // we get the assignment with the least splits here.
- // Note: If multiple actors have the same number of splits, it will be randomly picked.
- // When the number of source actors is larger than the number of splits,
- // It's possible that the assignment is uneven.
- // e.g.,
- // TODO: We should make the assignment rack-aware to make sure it's even.
- let mut peek_ref = heap.peek_mut().unwrap();
- peek_ref
- .splits
- .push(discovered_splits.get(&split_id).cloned().unwrap());
- }
- Some(
- heap.into_iter()
- .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits))
- .collect(),
- )
-/// Assign splits to a new set of actors, according to existing assignment.
-/// illustration:
-/// ```text
-/// upstream new
-/// actor x1 [split 1, split2] -> actor y1 [split 1, split2]
-/// actor x2 [split 3] -> actor y2 [split 3]
-/// ...
-/// ```
-fn align_splits(
- // (actor_id, upstream_actor_id)
- aligned_actors: impl IntoIterator- ,
- existing_assignment: &HashMap>,
- fragment_id: FragmentId,
- upstream_source_fragment_id: FragmentId,
-) -> anyhow::Result>> {
- aligned_actors
- .into_iter()
- .map(|(actor_id, upstream_actor_id)| {
- let Some(splits) = existing_assignment.get(&upstream_actor_id) else {
- return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {existing_assignment:?}, upstream_actor_id: {upstream_actor_id:?}"));
- };
- Ok((
- actor_id,
- splits.clone(),
- ))
- })
- .collect()
impl SourceManager {
const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
@@ -802,7 +248,7 @@ impl SourceManager {
let sources = metadata_manager.list_sources().await?;
for source in sources {
- Self::create_source_worker_async(source, &mut managed_sources, metrics.clone())?
+ create_source_worker_async(source, &mut managed_sources, metrics.clone())?
@@ -904,283 +350,12 @@ impl SourceManager {
- /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
- ///
- /// Very occasionally split removal may happen during scaling, in which case we need to
- /// use the old splits for reallocation instead of the latest splits (which may be missing),
- /// so that we can resolve the split removal in the next command.
- pub async fn migrate_splits_for_source_actors(
- &self,
- fragment_id: FragmentId,
- prev_actor_ids: &[ActorId],
- curr_actor_ids: &[ActorId],
- ) -> MetaResult>> {
- let core = self.core.lock().await;
- let prev_splits = prev_actor_ids
- .iter()
- .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap())
- .map(|split| (, split.clone()))
- .collect();
- let empty_actor_splits = curr_actor_ids
- .iter()
- .map(|actor_id| (*actor_id, vec![]))
- .collect();
- let diff = reassign_splits(
- fragment_id,
- empty_actor_splits,
- &prev_splits,
- // pre-allocate splits is the first time getting splits and it does not have scale-in scene
- SplitDiffOptions::default(),
- )
- .unwrap_or_default();
- Ok(diff)
- }
- /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
- pub fn migrate_splits_for_backfill_actors(
- &self,
- fragment_id: FragmentId,
- upstream_source_fragment_id: FragmentId,
- curr_actor_ids: &[ActorId],
- fragment_actor_splits: &HashMap>>,
- no_shuffle_upstream_actor_map: &HashMap>,
- ) -> MetaResult>> {
- // align splits for backfill fragments with its upstream source fragment
- let actors = no_shuffle_upstream_actor_map
- .iter()
- .filter(|(id, _)| curr_actor_ids.contains(id))
- .map(|(id, upstream_fragment_actors)| {
- (
- *id,
- *upstream_fragment_actors
- .get(&upstream_source_fragment_id)
- .unwrap(),
- )
- });
- let upstream_assignment = fragment_actor_splits
- .get(&upstream_source_fragment_id)
- .unwrap();
- tracing::info!(
- fragment_id,
- upstream_source_fragment_id,
- ?upstream_assignment,
- "migrate_splits_for_backfill_actors"
- );
- Ok(align_splits(
- actors,
- upstream_assignment,
- fragment_id,
- upstream_source_fragment_id,
- )?)
- }
- /// Allocates splits to actors for a newly created source executor.
- pub async fn allocate_splits(&self, job_id: &TableId) -> MetaResult {
- let core = self.core.lock().await;
- let table_fragments = core
- .metadata_manager
- .get_job_fragments_by_id(job_id)
- .await?;
- let source_fragments = table_fragments.stream_source_fragments();
- let mut assigned = HashMap::new();
- 'loop_source: for (source_id, fragments) in source_fragments {
- let handle = core
- .managed_sources
- .get(&source_id)
- .with_context(|| format!("could not find source {}", source_id))?;
- if handle.splits.lock().await.splits.is_none() {
- // force refresh source
- let (tx, rx) = oneshot::channel();
- handle
- .sync_call_tx
- .send(tx)
- .ok()
- .context("failed to send sync call")?;
- rx.await
- .ok()
- .context("failed to receive sync call response")??;
- }
- for fragment_id in fragments {
- let empty_actor_splits: HashMap> = table_fragments
- .fragments
- .get(&fragment_id)
- .unwrap()
- .actors
- .iter()
- .map(|actor| (actor.actor_id, vec![]))
- .collect();
- let actor_hashset: HashSet = empty_actor_splits.keys().cloned().collect();
- let splits = handle_discover_splits(handle, source_id, &actor_hashset).await?;
- if splits.is_empty() {
- tracing::warn!("no splits detected for source {}", source_id);
- continue 'loop_source;
- }
- if let Some(diff) = reassign_splits(
- fragment_id,
- empty_actor_splits,
- &splits,
- SplitDiffOptions::default(),
- ) {
- assigned.insert(fragment_id, diff);
- }
- }
- }
- Ok(assigned)
- }
- /// Allocates splits to actors for replace source job.
- pub async fn allocate_splits_for_replace_source(
- &self,
- job_id: &TableId,
- merge_updates: &Vec,
- ) -> MetaResult {
- tracing::debug!(?merge_updates, "allocate_splits_for_replace_source");
- if merge_updates.is_empty() {
- // no existing downstream. We can just re-allocate splits arbitrarily.
- return self.allocate_splits(job_id).await;
- }
- let core = self.core.lock().await;
- let table_fragments = core
- .metadata_manager
- .get_job_fragments_by_id(job_id)
- .await?;
- let source_fragments = table_fragments.stream_source_fragments();
- assert_eq!(
- source_fragments.len(),
- 1,
- "replace source job should only have one source"
- );
- let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
- assert_eq!(
- fragments.len(),
- 1,
- "replace source job should only have one fragment"
- );
- let fragment_id = fragments.into_iter().next().unwrap();
- debug_assert!(
- !merge_updates.is_empty()
- && merge_updates.iter().all(|merge_update| {
- merge_update.upstream_fragment_id == merge_updates[0].upstream_fragment_id
- && merge_update.new_upstream_fragment_id == Some(fragment_id)
- }),
- "merge update should only replace one fragment: {:?}",
- merge_updates
- );
- let prev_fragment_id = merge_updates[0].upstream_fragment_id;
- // Here we align the new source executor to backfill executors
- //
- // old_source => new_source backfill_1
- // actor_x1 => actor_y1 -----┬------>actor_a1
- // actor_x2 => actor_y2 -----┼-┬---->actor_a2
- // │ │
- // │ │ backfill_2
- // └─┼---->actor_b1
- // â””---->actor_b2
- //
- // Note: we can choose any backfill actor to align here.
- // We use `HashMap` to dedup.
- let aligned_actors: HashMap = merge_updates
- .iter()
- .map(|merge_update| {
- assert_eq!(merge_update.added_upstream_actor_id.len(), 1);
- // Note: removed_upstream_actor_id is not set for replace job, so we can't use it.
- assert_eq!(merge_update.removed_upstream_actor_id.len(), 0);
- (
- merge_update.added_upstream_actor_id[0],
- merge_update.actor_id,
- )
- })
- .collect();
- let assignment = align_splits(
- aligned_actors.into_iter(),
- &core.actor_splits,
- fragment_id,
- prev_fragment_id,
- )?;
- Ok(HashMap::from([(fragment_id, assignment)]))
- }
- /// Allocates splits to actors for a newly created `SourceBackfill` executor.
- ///
- /// Unlike [`Self::allocate_splits`], which creates a new assignment,
- /// this method aligns the splits for backfill fragments with its upstream source fragment ([`align_splits`]).
- pub async fn allocate_splits_for_backfill(
- &self,
- table_id: &TableId,
- // dispatchers from SourceExecutor to SourceBackfillExecutor
- dispatchers: &HashMap>,
- ) -> MetaResult {
- let core = self.core.lock().await;
- let table_fragments = core
- .metadata_manager
- .get_job_fragments_by_id(table_id)
- .await?;
- let source_backfill_fragments = table_fragments.source_backfill_fragments()?;
- let mut assigned = HashMap::new();
- for (_source_id, fragments) in source_backfill_fragments {
- for (fragment_id, upstream_source_fragment_id) in fragments {
- let upstream_actors = core
- .metadata_manager
- .get_running_actors_of_fragment(upstream_source_fragment_id)
- .await?;
- let mut backfill_actors = vec![];
- for upstream_actor in upstream_actors {
- if let Some(dispatchers) = dispatchers.get(&upstream_actor) {
- let err = || {
- anyhow::anyhow!(
- "source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}",
- fragment_id = fragment_id,
- upstream_source_fragment_id = upstream_source_fragment_id,
- upstream_actor = upstream_actor,
- dispatchers = dispatchers
- )
- };
- if dispatchers.len() != 1 || dispatchers[0].downstream_actor_id.len() != 1 {
- return Err(err().into());
- }
- backfill_actors
- .push((dispatchers[0].downstream_actor_id[0], upstream_actor));
- }
- }
- assigned.insert(
- fragment_id,
- align_splits(
- backfill_actors,
- &core.actor_splits,
- fragment_id,
- upstream_source_fragment_id,
- )?,
- );
- }
- }
- Ok(assigned)
- }
/// create and register connector worker for source.
pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
tracing::debug!("register_source: {}", source.get_id());
let mut core = self.core.lock().await;
if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
- let handle = create_source_worker_handle(source, self.metrics.clone())
+ let handle = create_source_worker(source, self.metrics.clone())
.context("failed to create source worker")?;
@@ -1204,66 +379,6 @@ impl SourceManager {
- /// Used on startup ([`Self::new`]). Failed sources will not block meta startup.
- fn create_source_worker_async(
- source: Source,
- managed_sources: &mut HashMap,
- metrics: Arc,
- ) -> MetaResult<()> {
- tracing::info!("spawning new watcher for source {}",;
- let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
- let current_splits_ref = splits.clone();
- let source_id =;
- let connector_properties = extract_prop_from_existing_source(&source)?;
- let enable_drop_split = connector_properties.enable_drop_split();
- let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
- let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
- let handle = tokio::spawn(async move {
- let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
- ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
- dispatch_source_prop!(connector_properties, prop, {
- let mut worker = loop {
- ticker.tick().await;
- match ConnectorSourceWorker::create(
- &source,
- prop.deref().clone(),
- current_splits_ref.clone(),
- metrics.clone(),
- )
- .await
- {
- Ok(worker) => {
- break worker;
- }
- Err(e) => {
- tracing::warn!(error = %e.as_report(), "failed to create source worker");
- }
- }
- };
- });
- });
- managed_sources.insert(
- source_id as SourceId,
- ConnectorSourceWorkerHandle {
- handle,
- sync_call_tx,
- splits,
- enable_drop_split,
- enable_adaptive_splits,
- },
- );
- Ok(())
- }
pub async fn list_assignments(&self) -> HashMap> {
let core = self.core.lock().await;
@@ -1384,235 +499,3 @@ pub fn build_actor_split_impls(
-mod tests {
- use std::collections::{BTreeMap, HashMap, HashSet};
- use risingwave_common::types::JsonbVal;
- use risingwave_connector::error::ConnectorResult;
- use risingwave_connector::source::{SplitId, SplitMetaData};
- use serde::{Deserialize, Serialize};
- use crate::model::{ActorId, FragmentId};
- use crate::stream::source_manager::{reassign_splits, SplitDiffOptions};
- #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
- struct TestSplit {
- id: u32,
- }
- impl SplitMetaData for TestSplit {
- fn id(&self) -> SplitId {
- format!("{}",
- }
- fn encode_to_json(&self) -> JsonbVal {
- serde_json::to_value(*self).unwrap().into()
- }
- fn restore_from_json(value: JsonbVal) -> ConnectorResult {
- serde_json::from_value(value.take()).map_err(Into::into)
- }
- fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
- Ok(())
- }
- }
- fn check_all_splits(
- discovered_splits: &BTreeMap,
- diff: &HashMap>,
- ) {
- let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
- for splits in diff.values() {
- for split in splits {
- assert!(split_ids.remove(&
- }
- }
- assert!(split_ids.is_empty());
- }
- #[test]
- fn test_drop_splits() {
- let mut actor_splits: HashMap = HashMap::new();
- actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
- actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
- actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);
- let mut prev_split_to_actor = HashMap::new();
- for (actor_id, splits) in &actor_splits {
- for split in splits {
- prev_split_to_actor.insert(, *actor_id);
- }
- }
- let discovered_splits: BTreeMap = (1..5)
- .map(|i| {
- let split = TestSplit { id: i };
- (, split)
- })
- .collect();
- let opts = SplitDiffOptions {
- enable_scale_in: true,
- enable_adaptive: false,
- };
- let prev_split_ids: HashSet<_> = actor_splits
- .values()
- .flat_map(|splits| splits.iter().map(|split|
- .collect();
- let diff = reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- opts,
- )
- .unwrap();
- check_all_splits(&discovered_splits, &diff);
- let mut after_split_to_actor = HashMap::new();
- for (actor_id, splits) in &diff {
- for split in splits {
- after_split_to_actor.insert(, *actor_id);
- }
- }
- let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
- let retained_split_ids: HashSet<_> =
- prev_split_ids.intersection(&discovered_split_ids).collect();
- for retained_split_id in retained_split_ids {
- assert_eq!(
- prev_split_to_actor.get(retained_split_id),
- after_split_to_actor.get(retained_split_id)
- )
- }
- }
- #[test]
- fn test_drop_splits_to_empty() {
- let mut actor_splits: HashMap = HashMap::new();
- actor_splits.insert(0, vec![TestSplit { id: 0 }]);
- let discovered_splits: BTreeMap = BTreeMap::new();
- let opts = SplitDiffOptions {
- enable_scale_in: true,
- enable_adaptive: false,
- };
- let diff = reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- opts,
- )
- .unwrap();
- assert!(!diff.is_empty())
- }
- #[test]
- fn test_reassign_splits() {
- let actor_splits = HashMap::new();
- let discovered_splits: BTreeMap = BTreeMap::new();
- assert!(reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- Default::default()
- )
- .is_none());
- let actor_splits = (0..3).map(|i| (i, vec![])).collect();
- let discovered_splits: BTreeMap = BTreeMap::new();
- let diff = reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- Default::default(),
- )
- .unwrap();
- assert_eq!(diff.len(), 3);
- for splits in diff.values() {
- assert!(splits.is_empty())
- }
- let actor_splits = (0..3).map(|i| (i, vec![])).collect();
- let discovered_splits: BTreeMap = (0..3)
- .map(|i| {
- let split = TestSplit { id: i };
- (, split)
- })
- .collect();
- let diff = reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- Default::default(),
- )
- .unwrap();
- assert_eq!(diff.len(), 3);
- for splits in diff.values() {
- assert_eq!(splits.len(), 1);
- }
- check_all_splits(&discovered_splits, &diff);
- let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
- let discovered_splits: BTreeMap = (0..5)
- .map(|i| {
- let split = TestSplit { id: i };
- (, split)
- })
- .collect();
- let diff = reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- Default::default(),
- )
- .unwrap();
- assert_eq!(diff.len(), 3);
- for splits in diff.values() {
- let len = splits.len();
- assert!(len == 1 || len == 2);
- }
- check_all_splits(&discovered_splits, &diff);
- let mut actor_splits: HashMap> =
- (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
- actor_splits.insert(3, vec![]);
- actor_splits.insert(4, vec![]);
- let discovered_splits: BTreeMap = (0..5)
- .map(|i| {
- let split = TestSplit { id: i };
- (, split)
- })
- .collect();
- let diff = reassign_splits(
- FragmentId::default(),
- actor_splits,
- &discovered_splits,
- Default::default(),
- )
- .unwrap();
- assert_eq!(diff.len(), 5);
- for splits in diff.values() {
- assert_eq!(splits.len(), 1);
- }
- check_all_splits(&discovered_splits, &diff);
- }
diff --git a/src/meta/src/stream/source_manager/ b/src/meta/src/stream/source_manager/
new file mode 100644
index 0000000000000..e36c1198bfc96
--- /dev/null
+++ b/src/meta/src/stream/source_manager/
@@ -0,0 +1,830 @@
+// Copyright 2025 RisingWave Labs
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+use super::*;
+impl SourceManager {
+ /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
+ ///
+ /// Very occasionally split removal may happen during scaling, in which case we need to
+ /// use the old splits for reallocation instead of the latest splits (which may be missing),
+ /// so that we can resolve the split removal in the next command.
+ pub async fn migrate_splits_for_source_actors(
+ &self,
+ fragment_id: FragmentId,
+ prev_actor_ids: &[ActorId],
+ curr_actor_ids: &[ActorId],
+ ) -> MetaResult>> {
+ let core = self.core.lock().await;
+ let prev_splits = prev_actor_ids
+ .iter()
+ .flat_map(|actor_id| core.actor_splits.get(actor_id).unwrap())
+ .map(|split| (, split.clone()))
+ .collect();
+ let empty_actor_splits = curr_actor_ids
+ .iter()
+ .map(|actor_id| (*actor_id, vec![]))
+ .collect();
+ let diff = reassign_splits(
+ fragment_id,
+ empty_actor_splits,
+ &prev_splits,
+ // pre-allocate splits is the first time getting splits and it does not have scale-in scene
+ SplitDiffOptions::default(),
+ )
+ .unwrap_or_default();
+ Ok(diff)
+ }
+ /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
+ pub fn migrate_splits_for_backfill_actors(
+ &self,
+ fragment_id: FragmentId,
+ upstream_source_fragment_id: FragmentId,
+ curr_actor_ids: &[ActorId],
+ fragment_actor_splits: &HashMap>>,
+ no_shuffle_upstream_actor_map: &HashMap>,
+ ) -> MetaResult>> {
+ // align splits for backfill fragments with its upstream source fragment
+ let actors = no_shuffle_upstream_actor_map
+ .iter()
+ .filter(|(id, _)| curr_actor_ids.contains(id))
+ .map(|(id, upstream_fragment_actors)| {
+ (
+ *id,
+ *upstream_fragment_actors
+ .get(&upstream_source_fragment_id)
+ .unwrap(),
+ )
+ });
+ let upstream_assignment = fragment_actor_splits
+ .get(&upstream_source_fragment_id)
+ .unwrap();
+ tracing::info!(
+ fragment_id,
+ upstream_source_fragment_id,
+ ?upstream_assignment,
+ "migrate_splits_for_backfill_actors"
+ );
+ Ok(align_splits(
+ actors,
+ upstream_assignment,
+ fragment_id,
+ upstream_source_fragment_id,
+ )?)
+ }
+ /// Allocates splits to actors for a newly created source executor.
+ pub async fn allocate_splits(&self, job_id: &TableId) -> MetaResult {
+ let core = self.core.lock().await;
+ let table_fragments = core
+ .metadata_manager
+ .get_job_fragments_by_id(job_id)
+ .await?;
+ let source_fragments = table_fragments.stream_source_fragments();
+ let mut assigned = HashMap::new();
+ 'loop_source: for (source_id, fragments) in source_fragments {
+ let handle = core
+ .managed_sources
+ .get(&source_id)
+ .with_context(|| format!("could not find source {}", source_id))?;
+ if handle.splits.lock().await.splits.is_none() {
+ // force refresh source
+ let (tx, rx) = oneshot::channel();
+ handle
+ .sync_call_tx
+ .send(tx)
+ .ok()
+ .context("failed to send sync call")?;
+ rx.await
+ .ok()
+ .context("failed to receive sync call response")??;
+ }
+ for fragment_id in fragments {
+ let empty_actor_splits: HashMap> = table_fragments
+ .fragments
+ .get(&fragment_id)
+ .unwrap()
+ .actors
+ .iter()
+ .map(|actor| (actor.actor_id, vec![]))
+ .collect();
+ let actor_hashset: HashSet = empty_actor_splits.keys().cloned().collect();
+ let splits = handle.discovered_splits(source_id, &actor_hashset).await?;
+ if splits.is_empty() {
+ tracing::warn!("no splits detected for source {}", source_id);
+ continue 'loop_source;
+ }
+ if let Some(diff) = reassign_splits(
+ fragment_id,
+ empty_actor_splits,
+ &splits,
+ SplitDiffOptions::default(),
+ ) {
+ assigned.insert(fragment_id, diff);
+ }
+ }
+ }
+ Ok(assigned)
+ }
+ /// Allocates splits to actors for replace source job.
+ pub async fn allocate_splits_for_replace_source(
+ &self,
+ job_id: &TableId,
+ merge_updates: &Vec,
+ ) -> MetaResult {
+ tracing::debug!(?merge_updates, "allocate_splits_for_replace_source");
+ if merge_updates.is_empty() {
+ // no existing downstream. We can just re-allocate splits arbitrarily.
+ return self.allocate_splits(job_id).await;
+ }
+ let core = self.core.lock().await;
+ let table_fragments = core
+ .metadata_manager
+ .get_job_fragments_by_id(job_id)
+ .await?;
+ let source_fragments = table_fragments.stream_source_fragments();
+ assert_eq!(
+ source_fragments.len(),
+ 1,
+ "replace source job should only have one source"
+ );
+ let (_source_id, fragments) = source_fragments.into_iter().next().unwrap();
+ assert_eq!(
+ fragments.len(),
+ 1,
+ "replace source job should only have one fragment"
+ );
+ let fragment_id = fragments.into_iter().next().unwrap();
+ debug_assert!(
+ !merge_updates.is_empty()
+ && merge_updates.iter().all(|merge_update| {
+ merge_update.upstream_fragment_id == merge_updates[0].upstream_fragment_id
+ && merge_update.new_upstream_fragment_id == Some(fragment_id)
+ }),
+ "merge update should only replace one fragment: {:?}",
+ merge_updates
+ );
+ let prev_fragment_id = merge_updates[0].upstream_fragment_id;
+ // Here we align the new source executor to backfill executors
+ //
+ // old_source => new_source backfill_1
+ // actor_x1 => actor_y1 -----┬------>actor_a1
+ // actor_x2 => actor_y2 -----┼-┬---->actor_a2
+ // │ │
+ // │ │ backfill_2
+ // └─┼---->actor_b1
+ // â””---->actor_b2
+ //
+ // Note: we can choose any backfill actor to align here.
+ // We use `HashMap` to dedup.
+ let aligned_actors: HashMap = merge_updates
+ .iter()
+ .map(|merge_update| {
+ assert_eq!(merge_update.added_upstream_actor_id.len(), 1);
+ // Note: removed_upstream_actor_id is not set for replace job, so we can't use it.
+ assert_eq!(merge_update.removed_upstream_actor_id.len(), 0);
+ (
+ merge_update.added_upstream_actor_id[0],
+ merge_update.actor_id,
+ )
+ })
+ .collect();
+ let assignment = align_splits(
+ aligned_actors.into_iter(),
+ &core.actor_splits,
+ fragment_id,
+ prev_fragment_id,
+ )?;
+ Ok(HashMap::from([(fragment_id, assignment)]))
+ }
+ /// Allocates splits to actors for a newly created `SourceBackfill` executor.
+ ///
+ /// Unlike [`Self::allocate_splits`], which creates a new assignment,
+ /// this method aligns the splits for backfill fragments with its upstream source fragment ([`align_splits`]).
+ pub async fn allocate_splits_for_backfill(
+ &self,
+ table_id: &TableId,
+ // dispatchers from SourceExecutor to SourceBackfillExecutor
+ dispatchers: &HashMap>,
+ ) -> MetaResult {
+ let core = self.core.lock().await;
+ let table_fragments = core
+ .metadata_manager
+ .get_job_fragments_by_id(table_id)
+ .await?;
+ let source_backfill_fragments = table_fragments.source_backfill_fragments()?;
+ let mut assigned = HashMap::new();
+ for (_source_id, fragments) in source_backfill_fragments {
+ for (fragment_id, upstream_source_fragment_id) in fragments {
+ let upstream_actors = core
+ .metadata_manager
+ .get_running_actors_of_fragment(upstream_source_fragment_id)
+ .await?;
+ let mut backfill_actors = vec![];
+ for upstream_actor in upstream_actors {
+ if let Some(dispatchers) = dispatchers.get(&upstream_actor) {
+ let err = || {
+ anyhow::anyhow!(
+ "source backfill fragment's upstream fragment should have one dispatcher, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, upstream_actor: {upstream_actor}, dispatchers: {dispatchers:?}",
+ fragment_id = fragment_id,
+ upstream_source_fragment_id = upstream_source_fragment_id,
+ upstream_actor = upstream_actor,
+ dispatchers = dispatchers
+ )
+ };
+ if dispatchers.len() != 1 || dispatchers[0].downstream_actor_id.len() != 1 {
+ return Err(err().into());
+ }
+ backfill_actors
+ .push((dispatchers[0].downstream_actor_id[0], upstream_actor));
+ }
+ }
+ assigned.insert(
+ fragment_id,
+ align_splits(
+ backfill_actors,
+ &core.actor_splits,
+ fragment_id,
+ upstream_source_fragment_id,
+ )?,
+ );
+ }
+ }
+ Ok(assigned)
+ }
+impl SourceManagerCore {
+ /// Checks whether the external source metadata has changed,
+ /// and re-assigns splits if there's a diff.
+ ///
+ /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`,
+ /// after the mutation barrier has been collected.
+ pub async fn reassign_splits(&self) -> MetaResult> {
+ let mut split_assignment: SplitAssignment = HashMap::new();
+ 'loop_source: for (source_id, handle) in &self.managed_sources {
+ let source_fragment_ids = match self.source_fragments.get(source_id) {
+ Some(fragment_ids) if !fragment_ids.is_empty() => fragment_ids,
+ _ => {
+ continue;
+ }
+ };
+ let backfill_fragment_ids = self.backfill_fragments.get(source_id);
+ 'loop_fragment: for &fragment_id in source_fragment_ids {
+ let actors = match self
+ .metadata_manager
+ .get_running_actors_of_fragment(fragment_id)
+ .await
+ {
+ Ok(actors) => {
+ if actors.is_empty() {
+ tracing::warn!("No actors found for fragment {}", fragment_id);
+ continue 'loop_fragment;
+ }
+ actors
+ }
+ Err(err) => {
+ tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
+ continue 'loop_fragment;
+ }
+ };
+ let discovered_splits = handle.discovered_splits(*source_id, &actors).await?;
+ if discovered_splits.is_empty() {
+ // The discover loop for this source is not ready yet; we'll wait for the next run
+ continue 'loop_source;
+ }
+ let prev_actor_splits: HashMap<_, _> = actors
+ .into_iter()
+ .map(|actor_id| {
+ (
+ actor_id,
+ self.actor_splits
+ .get(&actor_id)
+ .cloned()
+ .unwrap_or_default(),
+ )
+ })
+ .collect();
+ if let Some(new_assignment) = reassign_splits(
+ fragment_id,
+ prev_actor_splits,
+ &discovered_splits,
+ SplitDiffOptions {
+ enable_scale_in: handle.enable_drop_split,
+ enable_adaptive: handle.enable_adaptive_splits,
+ },
+ ) {
+ split_assignment.insert(fragment_id, new_assignment);
+ }
+ }
+ if let Some(backfill_fragment_ids) = backfill_fragment_ids {
+ // align splits for backfill fragments with its upstream source fragment
+ for (fragment_id, upstream_fragment_id) in backfill_fragment_ids {
+ let Some(upstream_assignment) = split_assignment.get(upstream_fragment_id)
+ else {
+ // upstream fragment unchanged, do not update backfill fragment too
+ continue;
+ };
+ let actors = match self
+ .metadata_manager
+ .get_running_actors_for_source_backfill(*fragment_id)
+ .await
+ {
+ Ok(actors) => {
+ if actors.is_empty() {
+ tracing::warn!("No actors found for fragment {}", fragment_id);
+ continue;
+ }
+ actors
+ }
+ Err(err) => {
+ tracing::warn!(error = %err.as_report(),"Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore");
+ continue;
+ }
+ };
+ split_assignment.insert(
+ *fragment_id,
+ align_splits(
+ actors,
+ upstream_assignment,
+ *fragment_id,
+ *upstream_fragment_id,
+ )?,
+ );
+ }
+ }
+ }
+ self.metadata_manager
+ .split_fragment_map_by_database(split_assignment)
+ .await
+ }
+/// Reassigns splits if there are new splits or dropped splits,
+/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
+/// The existing splits will remain unmoved in their currently assigned actor.
+/// If an actor has an upstream actor, it should be a backfill executor,
+/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
+/// Use [`align_splits`] instead.**
+/// - `fragment_id`: just for logging
+/// ## Different connectors' behavior of split change
+/// ### Kafka and Pulsar
+/// They only support increasing the number of splits via adding new empty splits.
+/// Old data is not moved.
+/// ### Kinesis
+/// It supports *pairwise* shard split and merge.
+/// In both cases, old data remain in the old shard(s) and the old shard is still available.
+/// New data are routed to the new shard(s).
+/// After the retention period has expired, the old shard will become `EXPIRED` and isn't
+/// listed any more. In other words, the total number of shards will first increase and then decrease.
+/// See also:
+/// - [Kinesis resharding doc](
+/// - An example of how the shards can be like:
+fn reassign_splits(
+ fragment_id: FragmentId,
+ actor_splits: HashMap>,
+ discovered_splits: &BTreeMap,
+ opts: SplitDiffOptions,
+) -> Option>>
+ T: SplitMetaData + Clone,
+ // if no actors, return
+ if actor_splits.is_empty() {
+ return None;
+ }
+ let prev_split_ids: HashSet<_> = actor_splits
+ .values()
+ .flat_map(|splits| splits.iter().map(SplitMetaData::id))
+ .collect();
+ tracing::trace!(fragment_id, prev_split_ids = ?prev_split_ids, "previous splits");
+ tracing::trace!(fragment_id, prev_split_ids = ?discovered_splits.keys(), "discovered splits");
+ let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
+ let dropped_splits: HashSet<_> = prev_split_ids
+ .difference(&discovered_split_ids)
+ .cloned()
+ .collect();
+ if !dropped_splits.is_empty() {
+ if opts.enable_scale_in {
+ tracing::info!(fragment_id, dropped_spltis = ?dropped_splits, "new dropped splits");
+ } else {
+ tracing::warn!(fragment_id, dropped_spltis = ?dropped_splits, "split dropping happened, but it is not allowed");
+ }
+ }
+ let new_discovered_splits: BTreeSet<_> = discovered_split_ids
+ .into_iter()
+ .filter(|split_id| !prev_split_ids.contains(split_id))
+ .collect();
+ if opts.enable_scale_in || opts.enable_adaptive {
+ // if we support scale in, no more splits are discovered, and no splits are dropped, return
+ // we need to check if discovered_split_ids is empty, because if it is empty, we need to
+ // handle the case of scale in to zero (like deleting all objects from s3)
+ if dropped_splits.is_empty()
+ && new_discovered_splits.is_empty()
+ && !discovered_splits.is_empty()
+ {
+ return None;
+ }
+ } else {
+ // if we do not support scale in, and no more splits are discovered, return
+ if new_discovered_splits.is_empty() && !discovered_splits.is_empty() {
+ return None;
+ }
+ }
+ tracing::info!(fragment_id, new_discovered_splits = ?new_discovered_splits, "new discovered splits");
+ let mut heap = BinaryHeap::with_capacity(actor_splits.len());
+ for (actor_id, mut splits) in actor_splits {
+ if opts.enable_scale_in || opts.enable_adaptive {
+ splits.retain(|split| !dropped_splits.contains(&;
+ }
+ heap.push(ActorSplitsAssignment { actor_id, splits })
+ }
+ for split_id in new_discovered_splits {
+ // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
+ // we get the assignment with the least splits here.
+ // Note: If multiple actors have the same number of splits, it will be randomly picked.
+ // When the number of source actors is larger than the number of splits,
+ // It's possible that the assignment is uneven.
+ // e.g.,
+ // TODO: We should make the assignment rack-aware to make sure it's even.
+ let mut peek_ref = heap.peek_mut().unwrap();
+ peek_ref
+ .splits
+ .push(discovered_splits.get(&split_id).cloned().unwrap());
+ }
+ Some(
+ heap.into_iter()
+ .map(|ActorSplitsAssignment { actor_id, splits }| (actor_id, splits))
+ .collect(),
+ )
+/// Assign splits to a new set of actors, according to existing assignment.
+/// illustration:
+/// ```text
+/// upstream new
+/// actor x1 [split 1, split2] -> actor y1 [split 1, split2]
+/// actor x2 [split 3] -> actor y2 [split 3]
+/// ...
+/// ```
+fn align_splits(
+ // (actor_id, upstream_actor_id)
+ aligned_actors: impl IntoIterator
- ,
+ existing_assignment: &HashMap>,
+ fragment_id: FragmentId,
+ upstream_source_fragment_id: FragmentId,
+) -> anyhow::Result>> {
+ aligned_actors
+ .into_iter()
+ .map(|(actor_id, upstream_actor_id)| {
+ let Some(splits) = existing_assignment.get(&upstream_actor_id) else {
+ return Err(anyhow::anyhow!("upstream assignment not found, fragment_id: {fragment_id}, upstream_fragment_id: {upstream_source_fragment_id}, actor_id: {actor_id}, upstream_assignment: {existing_assignment:?}, upstream_actor_id: {upstream_actor_id:?}"));
+ };
+ Ok((
+ actor_id,
+ splits.clone(),
+ ))
+ })
+ .collect()
+/// Note: the `PartialEq` and `Ord` impl just compares the number of splits.
+struct ActorSplitsAssignment {
+ actor_id: ActorId,
+ splits: Vec,
+impl Eq for ActorSplitsAssignment {}
+impl PartialEq for ActorSplitsAssignment {
+ fn eq(&self, other: &Self) -> bool {
+ self.splits.len() == other.splits.len()
+ }
+impl PartialOrd for ActorSplitsAssignment {
+ fn partial_cmp(&self, other: &Self) -> Option {
+ Some(self.cmp(other))
+ }
+impl Ord for ActorSplitsAssignment {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // Note: this is reversed order, to make BinaryHeap a min heap.
+ other.splits.len().cmp(&self.splits.len())
+ }
+pub struct SplitDiffOptions {
+ pub enable_scale_in: bool,
+ /// For most connectors, this should be false. When enabled, RisingWave will not track any progress.
+ pub enable_adaptive: bool,
+impl Default for SplitDiffOptions {
+ fn default() -> Self {
+ SplitDiffOptions {
+ enable_scale_in: false,
+ enable_adaptive: false,
+ }
+ }
+mod tests {
+ use std::collections::{BTreeMap, HashMap, HashSet};
+ use risingwave_common::types::JsonbVal;
+ use risingwave_connector::error::ConnectorResult;
+ use risingwave_connector::source::{SplitId, SplitMetaData};
+ use serde::{Deserialize, Serialize};
+ use super::*;
+ use crate::model::{ActorId, FragmentId};
+ #[derive(Debug, Copy, Clone, Serialize, Deserialize)]
+ struct TestSplit {
+ id: u32,
+ }
+ impl SplitMetaData for TestSplit {
+ fn id(&self) -> SplitId {
+ format!("{}",
+ }
+ fn encode_to_json(&self) -> JsonbVal {
+ serde_json::to_value(*self).unwrap().into()
+ }
+ fn restore_from_json(value: JsonbVal) -> ConnectorResult {
+ serde_json::from_value(value.take()).map_err(Into::into)
+ }
+ fn update_offset(&mut self, _last_read_offset: String) -> ConnectorResult<()> {
+ Ok(())
+ }
+ }
+ fn check_all_splits(
+ discovered_splits: &BTreeMap,
+ diff: &HashMap>,
+ ) {
+ let mut split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
+ for splits in diff.values() {
+ for split in splits {
+ assert!(split_ids.remove(&
+ }
+ }
+ assert!(split_ids.is_empty());
+ }
+ #[test]
+ fn test_drop_splits() {
+ let mut actor_splits: HashMap = HashMap::new();
+ actor_splits.insert(0, vec![TestSplit { id: 0 }, TestSplit { id: 1 }]);
+ actor_splits.insert(1, vec![TestSplit { id: 2 }, TestSplit { id: 3 }]);
+ actor_splits.insert(2, vec![TestSplit { id: 4 }, TestSplit { id: 5 }]);
+ let mut prev_split_to_actor = HashMap::new();
+ for (actor_id, splits) in &actor_splits {
+ for split in splits {
+ prev_split_to_actor.insert(, *actor_id);
+ }
+ }
+ let discovered_splits: BTreeMap = (1..5)
+ .map(|i| {
+ let split = TestSplit { id: i };
+ (, split)
+ })
+ .collect();
+ let opts = SplitDiffOptions {
+ enable_scale_in: true,
+ enable_adaptive: false,
+ };
+ let prev_split_ids: HashSet<_> = actor_splits
+ .values()
+ .flat_map(|splits| splits.iter().map(|split|
+ .collect();
+ let diff = reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ opts,
+ )
+ .unwrap();
+ check_all_splits(&discovered_splits, &diff);
+ let mut after_split_to_actor = HashMap::new();
+ for (actor_id, splits) in &diff {
+ for split in splits {
+ after_split_to_actor.insert(, *actor_id);
+ }
+ }
+ let discovered_split_ids: HashSet<_> = discovered_splits.keys().cloned().collect();
+ let retained_split_ids: HashSet<_> =
+ prev_split_ids.intersection(&discovered_split_ids).collect();
+ for retained_split_id in retained_split_ids {
+ assert_eq!(
+ prev_split_to_actor.get(retained_split_id),
+ after_split_to_actor.get(retained_split_id)
+ )
+ }
+ }
+ #[test]
+ fn test_drop_splits_to_empty() {
+ let mut actor_splits: HashMap = HashMap::new();
+ actor_splits.insert(0, vec![TestSplit { id: 0 }]);
+ let discovered_splits: BTreeMap = BTreeMap::new();
+ let opts = SplitDiffOptions {
+ enable_scale_in: true,
+ enable_adaptive: false,
+ };
+ let diff = reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ opts,
+ )
+ .unwrap();
+ assert!(!diff.is_empty())
+ }
+ #[test]
+ fn test_reassign_splits() {
+ let actor_splits = HashMap::new();
+ let discovered_splits: BTreeMap = BTreeMap::new();
+ assert!(reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ Default::default()
+ )
+ .is_none());
+ let actor_splits = (0..3).map(|i| (i, vec![])).collect();
+ let discovered_splits: BTreeMap = BTreeMap::new();
+ let diff = reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ Default::default(),
+ )
+ .unwrap();
+ assert_eq!(diff.len(), 3);
+ for splits in diff.values() {
+ assert!(splits.is_empty())
+ }
+ let actor_splits = (0..3).map(|i| (i, vec![])).collect();
+ let discovered_splits: BTreeMap = (0..3)
+ .map(|i| {
+ let split = TestSplit { id: i };
+ (, split)
+ })
+ .collect();
+ let diff = reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ Default::default(),
+ )
+ .unwrap();
+ assert_eq!(diff.len(), 3);
+ for splits in diff.values() {
+ assert_eq!(splits.len(), 1);
+ }
+ check_all_splits(&discovered_splits, &diff);
+ let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
+ let discovered_splits: BTreeMap = (0..5)
+ .map(|i| {
+ let split = TestSplit { id: i };
+ (, split)
+ })
+ .collect();
+ let diff = reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ Default::default(),
+ )
+ .unwrap();
+ assert_eq!(diff.len(), 3);
+ for splits in diff.values() {
+ let len = splits.len();
+ assert!(len == 1 || len == 2);
+ }
+ check_all_splits(&discovered_splits, &diff);
+ let mut actor_splits: HashMap> =
+ (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
+ actor_splits.insert(3, vec![]);
+ actor_splits.insert(4, vec![]);
+ let discovered_splits: BTreeMap = (0..5)
+ .map(|i| {
+ let split = TestSplit { id: i };
+ (, split)
+ })
+ .collect();
+ let diff = reassign_splits(
+ FragmentId::default(),
+ actor_splits,
+ &discovered_splits,
+ Default::default(),
+ )
+ .unwrap();
+ assert_eq!(diff.len(), 5);
+ for splits in diff.values() {
+ assert_eq!(splits.len(), 1);
+ }
+ check_all_splits(&discovered_splits, &diff);
+ }
diff --git a/src/meta/src/stream/source_manager/ b/src/meta/src/stream/source_manager/
new file mode 100644
index 0000000000000..6f6d647ffb78f
--- /dev/null
+++ b/src/meta/src/stream/source_manager/
@@ -0,0 +1,320 @@
+// Copyright 2025 RisingWave Labs
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+use super::*;
+const MAX_FAIL_CNT: u32 = 10;
+const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
+pub struct SharedSplitMap {
+ pub splits: Option>,
+type SharedSplitMapRef = Arc>;
+/// `ConnectorSourceWorker` keeps fetching the latest split metadata from the external source service ([`Self::tick`]),
+/// and maintains it in `current_splits`.
+pub struct ConnectorSourceWorker {
+ source_id: SourceId,
+ source_name: String,
+ current_splits: SharedSplitMapRef,
+ enumerator: P::SplitEnumerator,
+ period: Duration,
+ metrics: Arc,
+ connector_properties: P,
+ fail_cnt: u32,
+ source_is_up: LabelGuardedIntGauge<2>,
+fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult {
+ let options_with_secret =
+ WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
+ let mut properties = ConnectorProperties::extract(options_with_secret, false)?;
+ properties.init_from_pb_source(source);
+ Ok(properties)
+fn extract_prop_from_new_source(source: &Source) -> ConnectorResult {
+ let options_with_secret =
+ WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
+ let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
+ properties.init_from_pb_source(source);
+ Ok(properties)
+/// Used to create a new [`ConnectorSourceWorkerHandle`] for a new source.
+/// It will call [`ConnectorSourceWorker::tick()`] to fetch split metadata once before returning.
+pub async fn create_source_worker(
+ source: &Source,
+ metrics: Arc,
+) -> MetaResult {
+ tracing::info!("spawning new watcher for source {}",;
+ let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
+ let current_splits_ref = splits.clone();
+ let connector_properties = extract_prop_from_new_source(source)?;
+ let enable_scale_in = connector_properties.enable_drop_split();
+ let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
+ let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
+ let handle = dispatch_source_prop!(connector_properties, prop, {
+ let mut worker = ConnectorSourceWorker::create(
+ source,
+ *prop,
+ current_splits_ref.clone(),
+ metrics,
+ )
+ .await?;
+ // if fail to fetch meta info, will refuse to create source
+ // todo: make the timeout configurable, longer than ``
+ // in kafka
+ tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
+ .await
+ .ok()
+ .with_context(|| {
+ format!(
+ "failed to fetch meta info for source {}, timeout {:?}",
+ )
+ })??;
+ tokio::spawn(async move { })
+ });
+ Ok(ConnectorSourceWorkerHandle {
+ handle,
+ sync_call_tx,
+ splits,
+ enable_drop_split: enable_scale_in,
+ enable_adaptive_splits,
+ })
+/// Used on startup ([`SourceManager::new`]). Failed sources will not block meta startup.
+pub fn create_source_worker_async(
+ source: Source,
+ managed_sources: &mut HashMap,
+ metrics: Arc,
+) -> MetaResult<()> {
+ tracing::info!("spawning new watcher for source {}",;
+ let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
+ let current_splits_ref = splits.clone();
+ let source_id =;
+ let connector_properties = extract_prop_from_existing_source(&source)?;
+ let enable_drop_split = connector_properties.enable_drop_split();
+ let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
+ let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
+ let handle = tokio::spawn(async move {
+ let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
+ ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ dispatch_source_prop!(connector_properties, prop, {
+ let mut worker = loop {
+ ticker.tick().await;
+ match ConnectorSourceWorker::create(
+ &source,
+ prop.deref().clone(),
+ current_splits_ref.clone(),
+ metrics.clone(),
+ )
+ .await
+ {
+ Ok(worker) => {
+ break worker;
+ }
+ Err(e) => {
+ tracing::warn!(error = %e.as_report(), "failed to create source worker");
+ }
+ }
+ };
+ });
+ });
+ managed_sources.insert(
+ source_id as SourceId,
+ ConnectorSourceWorkerHandle {
+ handle,
+ sync_call_tx,
+ splits,
+ enable_drop_split,
+ enable_adaptive_splits,
+ },
+ );
+ Ok(())
+const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
+impl ConnectorSourceWorker
+ /// Recreate the `SplitEnumerator` to establish a new connection to the external source service.
+ async fn refresh(&mut self) -> MetaResult<()> {
+ let enumerator = P::SplitEnumerator::new(
+ self.connector_properties.clone(),
+ Arc::new(SourceEnumeratorContext {
+ metrics: self.metrics.source_enumerator_metrics.clone(),
+ info: SourceEnumeratorInfo {
+ source_id: self.source_id as u32,
+ },
+ }),
+ )
+ .await
+ .context("failed to create SplitEnumerator")?;
+ self.enumerator = enumerator;
+ self.fail_cnt = 0;
+ tracing::info!("refreshed source enumerator: {}", self.source_name);
+ Ok(())
+ }
+ /// On creation, connection to the external source service will be established, but `splits`
+ /// will not be updated until `tick` is called.
+ pub async fn create(
+ source: &Source,
+ connector_properties: P,
+ period: Duration,
+ splits: Arc>,
+ metrics: Arc,
+ ) -> MetaResult {
+ let enumerator = P::SplitEnumerator::new(
+ connector_properties.clone(),
+ Arc::new(SourceEnumeratorContext {
+ metrics: metrics.source_enumerator_metrics.clone(),
+ info: SourceEnumeratorInfo {
+ source_id:,
+ },
+ }),
+ )
+ .await
+ .context("failed to create SplitEnumerator")?;
+ let source_is_up = metrics
+ .source_is_up
+ .with_guarded_label_values(&[, &]);
+ Ok(Self {
+ source_id: as SourceId,
+ source_name:,
+ current_splits: splits,
+ enumerator,
+ period,
+ metrics,
+ connector_properties,
+ fail_cnt: 0,
+ source_is_up,
+ })
+ }
+ pub async fn run(
+ &mut self,
+ mut sync_call_rx: UnboundedReceiver>>,
+ ) {
+ let mut interval = time::interval(self.period);
+ interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ loop {
+ select! {
+ biased;
+ tx = sync_call_rx.borrow_mut().recv() => {
+ if let Some(tx) = tx {
+ let _ = tx.send(self.tick().await);
+ }
+ }
+ _ = interval.tick() => {
+ if self.fail_cnt > MAX_FAIL_CNT {
+ if let Err(e) = self.refresh().await {
+ tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
+ }
+ }
+ if let Err(e) = self.tick().await {
+ tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
+ }
+ }
+ }
+ }
+ }
+ /// Uses [`SplitEnumerator`] to fetch the latest split metadata from the external source service.
+ async fn tick(&mut self) -> MetaResult<()> {
+ let source_is_up = |res: i64| {
+ self.source_is_up.set(res);
+ };
+ let splits = self.enumerator.list_splits().await.inspect_err(|_| {
+ source_is_up(0);
+ self.fail_cnt += 1;
+ })?;
+ source_is_up(1);
+ self.fail_cnt = 0;
+ let mut current_splits = self.current_splits.lock().await;
+ current_splits.splits.replace(
+ splits
+ .into_iter()
+ .map(|split| (, P::Split::into(split)))
+ .collect(),
+ );
+ Ok(())
+ }
+/// Handle for a running [`ConnectorSourceWorker`].
+pub struct ConnectorSourceWorkerHandle {
+ pub handle: JoinHandle<()>,
+ pub sync_call_tx: UnboundedSender>>,
+ pub splits: SharedSplitMapRef,
+ pub enable_drop_split: bool,
+ pub enable_adaptive_splits: bool,
+impl ConnectorSourceWorkerHandle {
+ pub fn get_enable_adaptive_splits(&self) -> bool {
+ self.enable_adaptive_splits
+ }
+ pub async fn discovered_splits(
+ &self,
+ source_id: SourceId,
+ actors: &HashSet,
+ ) -> MetaResult, SplitImpl>> {
+ let Some(mut discovered_splits) = self.splits.lock().await.splits.clone() else {
+ tracing::info!(
+ "The discover loop for source {} is not ready yet; we'll wait for the next run",
+ source_id
+ );
+ return Ok(BTreeMap::new());
+ };
+ if discovered_splits.is_empty() {
+ tracing::warn!("No splits discovered for source {}", source_id);
+ }
+ if self.enable_adaptive_splits {
+ // Connector supporting adaptive splits returns just one split, and we need to make the number of splits equal to the number of actors in this fragment.
+ // Because we Risingwave consume the splits statelessly and we do not need to keep the id internally, we always use actor_id as split_id.
+ // And prev splits record should be dropped via CN.
+ debug_assert!(self.enable_drop_split);
+ debug_assert!(discovered_splits.len() == 1);
+ discovered_splits =
+ fill_adaptive_split(discovered_splits.values().next().unwrap(), actors)?;
+ }
+ Ok(discovered_splits)
+ }
diff --git a/src/sqlparser/src/ast/ b/src/sqlparser/src/ast/
index 16496b71c97eb..98f8599ccf89a 100644
--- a/src/sqlparser/src/ast/
+++ b/src/sqlparser/src/ast/
@@ -1349,6 +1349,7 @@ pub enum Statement {
CreateFunction {
or_replace: bool,
temporary: bool,
+ if_not_exists: bool,
name: ObjectName,
args: Option>,
returns: Option,
@@ -1361,6 +1362,7 @@ pub enum Statement {
/// Postgres:
CreateAggregate {
or_replace: bool,
+ if_not_exists: bool,
name: ObjectName,
args: Vec,
returns: DataType,
@@ -1768,6 +1770,7 @@ impl fmt::Display for Statement {
Statement::CreateFunction {
+ if_not_exists,
@@ -1776,9 +1779,10 @@ impl fmt::Display for Statement {
} => {
- "CREATE {or_replace}{temp}FUNCTION {name}",
+ "CREATE {or_replace}{temp}FUNCTION {if_not_exists}{name}",
temp = if *temporary { "TEMPORARY " } else { "" },
or_replace = if *or_replace { "OR REPLACE " } else { "" },
+ if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" },
if let Some(args) = args {
write!(f, "({})", display_comma_separated(args))?;
@@ -1792,6 +1796,7 @@ impl fmt::Display for Statement {
Statement::CreateAggregate {
+ if_not_exists,
@@ -1800,8 +1805,9 @@ impl fmt::Display for Statement {
} => {
- "CREATE {or_replace}AGGREGATE {name}",
+ "CREATE {or_replace}AGGREGATE {if_not_exists}{name}",
or_replace = if *or_replace { "OR REPLACE " } else { "" },
+ if_not_exists = if *if_not_exists { "IF NOT EXISTS " } else { "" },
write!(f, "({})", display_comma_separated(args))?;
write!(f, " RETURNS {}", returns)?;
@@ -3551,8 +3557,9 @@ mod tests {
fn test_create_function_display() {
let create_function = Statement::CreateFunction {
- temporary: false,
or_replace: false,
+ temporary: false,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("foo")]),
args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]),
returns: Some(CreateFunctionReturns::Value(DataType::Int)),
@@ -3573,8 +3580,9 @@ mod tests {
format!("{}", create_function)
let create_function = Statement::CreateFunction {
- temporary: false,
or_replace: false,
+ temporary: false,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("foo")]),
args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]),
returns: Some(CreateFunctionReturns::Value(DataType::Int)),
diff --git a/src/sqlparser/src/ b/src/sqlparser/src/
index 28dff1c076a90..273d038f97556 100644
--- a/src/sqlparser/src/
+++ b/src/sqlparser/src/
@@ -2210,6 +2210,8 @@ impl Parser<'_> {
or_replace: bool,
temporary: bool,
) -> PResult {
+ impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self);
let name = self.parse_object_name()?;
let args = if self.peek_token().token == Token::RParen {
@@ -2248,6 +2250,7 @@ impl Parser<'_> {
Ok(Statement::CreateFunction {
+ if_not_exists,
returns: return_type,
@@ -2257,6 +2260,8 @@ impl Parser<'_> {
fn parse_create_aggregate(&mut self, or_replace: bool) -> PResult {
+ impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], self);
let name = self.parse_object_name()?;
let args = self.parse_comma_separated(Parser::parse_function_arg)?;
@@ -2270,6 +2275,7 @@ impl Parser<'_> {
Ok(Statement::CreateAggregate {
+ if_not_exists,
@@ -4250,17 +4256,17 @@ impl Parser<'_> {
- let limit = if self.parse_keyword(Keyword::LIMIT) {
- self.parse_limit()?
- } else {
- None
- };
+ let mut limit = None;
+ let mut offset = None;
+ for _x in 0..2 {
+ if limit.is_none() && self.parse_keyword(Keyword::LIMIT) {
+ limit = self.parse_limit()?
+ }
- let offset = if self.parse_keyword(Keyword::OFFSET) {
- Some(self.parse_offset()?)
- } else {
- None
- };
+ if offset.is_none() && self.parse_keyword(Keyword::OFFSET) {
+ offset = Some(self.parse_offset()?)
+ }
+ }
let fetch = if self.parse_keyword(Keyword::FETCH) {
if limit.is_some() {
diff --git a/src/sqlparser/tests/ b/src/sqlparser/tests/
index 7acf6d29b4444..1466a9024a6d5 100644
--- a/src/sqlparser/tests/
+++ b/src/sqlparser/tests/
@@ -753,6 +753,7 @@ fn parse_create_function() {
Statement::CreateFunction {
or_replace: false,
temporary: false,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("add")]),
args: Some(vec![
@@ -777,6 +778,7 @@ fn parse_create_function() {
Statement::CreateFunction {
or_replace: false,
temporary: false,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("sub")]),
args: Some(vec![
@@ -801,6 +803,7 @@ fn parse_create_function() {
Statement::CreateFunction {
or_replace: false,
temporary: false,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("return_test")]),
args: Some(vec![
@@ -826,6 +829,7 @@ fn parse_create_function() {
Statement::CreateFunction {
or_replace: true,
temporary: false,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("add")]),
args: Some(vec![
OperateFunctionArg::with_name("a", DataType::Int),
@@ -851,12 +855,14 @@ fn parse_create_function() {
+ let sql =
Statement::CreateFunction {
or_replace: false,
- temporary: false,
+ temporary: true,
+ if_not_exists: false,
name: ObjectName(vec![Ident::new_unchecked("unnest")]),
args: Some(vec![OperateFunctionArg::with_name(
@@ -874,6 +880,32 @@ fn parse_create_function() {
with_options: Default::default(),
+ let sql =
+ assert_eq!(
+ verified_stmt(sql),
+ Statement::CreateFunction {
+ or_replace: false,
+ temporary: false,
+ if_not_exists: true,
+ name: ObjectName(vec![Ident::new_unchecked("add")]),
+ args: Some(vec![
+ OperateFunctionArg::unnamed(DataType::Int),
+ OperateFunctionArg::unnamed(DataType::Int),
+ ]),
+ returns: Some(CreateFunctionReturns::Value(DataType::Int)),
+ params: CreateFunctionBody {
+ language: Some("SQL".into()),
+ behavior: Some(FunctionBehavior::Immutable),
+ as_: Some(FunctionDefinition::SingleQuotedDef(
+ "select $1 + $2;".into()
+ )),
+ ..Default::default()
+ },
+ with_options: Default::default(),
+ }
+ );
@@ -884,6 +916,27 @@ fn parse_create_aggregate() {
Statement::CreateAggregate {
or_replace: true,
+ if_not_exists: false,
+ name: ObjectName(vec![Ident::new_unchecked("sum")]),
+ args: vec![OperateFunctionArg::unnamed(DataType::Int)],
+ returns: DataType::BigInt,
+ append_only: true,
+ params: CreateFunctionBody {
+ language: Some("python".into()),
+ as_: Some(FunctionDefinition::SingleQuotedDef("sum".into())),
+ using: Some(CreateFunctionUsing::Link("xxx".into())),
+ ..Default::default()
+ },
+ }
+ );
+ let sql =
+ assert_eq!(
+ verified_stmt(sql),
+ Statement::CreateAggregate {
+ or_replace: false,
+ if_not_exists: true,
name: ObjectName(vec![Ident::new_unchecked("sum")]),
args: vec![OperateFunctionArg::unnamed(DataType::Int)],
returns: DataType::BigInt,
diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml
index 72de88bd1dfb4..78f7c6a001dbd 100644
--- a/src/sqlparser/tests/testdata/select.yaml
+++ b/src/sqlparser/tests/testdata/select.yaml
@@ -237,3 +237,25 @@
SELECT a0 + b0 FROM a_log, "B_log"
formatted_sql: WITH a_log AS changelog from a, "B_log" AS changelog from public."B" SELECT a0 + b0 FROM a_log, "B_log"
formatted_ast: 'Query(Query { with: Some(With { recursive: false, cte_tables: [Cte { alias: TableAlias { name: Ident { value: "a_log", quote_style: None }, columns: [] }, cte_inner: ChangeLog(ObjectName([Ident { value: "a", quote_style: None }])) }, Cte { alias: TableAlias { name: Ident { value: "B_log", quote_style: Some(''"'') }, columns: [] }, cte_inner: ChangeLog(ObjectName([Ident { value: "public", quote_style: None }, Ident { value: "B", quote_style: Some(''"'') }])) }] }), body: Select(Select { distinct: All, projection: [UnnamedExpr(BinaryOp { left: Identifier(Ident { value: "a0", quote_style: None }), op: Plus, right: Identifier(Ident { value: "b0", quote_style: None }) })], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "a_log", quote_style: None }]), alias: None, as_of: None }, joins: [] }, TableWithJoins { relation: Table { name: ObjectName([Ident { value: "B_log", quote_style: Some(''"'') }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })'
+- input: SELECT * FROM t LIMIT 1
+ formatted_sql: SELECT * FROM t LIMIT 1
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "t", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: Some(Value(Number("1"))), offset: None, fetch: None })'
+- input: SELECT * FROM t OFFSET 1
+ formatted_sql: SELECT * FROM t OFFSET 1
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "t", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: Some("1"), fetch: None })'
+- input: SELECT * FROM t LIMIT 1 OFFSET 2
+ formatted_sql: SELECT * FROM t LIMIT 1 OFFSET 2
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "t", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: Some(Value(Number("1"))), offset: Some("2"), fetch: None })'
+- input: SELECT * FROM t OFFSET 2 LIMIT 1
+ formatted_sql: SELECT * FROM t LIMIT 1 OFFSET 2
+ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(None)], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "t", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: Some(Value(Number("1"))), offset: Some("2"), fetch: None })'
+- input: SELECT * FROM t LIMIT 1 LIMIT 2
+ error_msg: |-
+ sql parser error: expected end of statement, found: LIMIT
+ ^
+ error_msg: |-
+ sql parser error: expected end of statement, found: OFFSET
+ ^
diff --git a/src/storage/hummock_sdk/src/ b/src/storage/hummock_sdk/src/
index 5a7bf0143c764..5a5c4a647ecbe 100644
--- a/src/storage/hummock_sdk/src/
+++ b/src/storage/hummock_sdk/src/
@@ -190,11 +190,11 @@ pub fn build_table_change_log_delta<'a>(
ChangeLogDelta {
- new_log: Some(EpochNewChangeLog {
+ new_log: EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
epochs: epochs.clone(),
- }),
+ },
@@ -203,7 +203,7 @@ pub fn build_table_change_log_delta<'a>(
for table_id in &sst.table_ids {
match table_change_log.get_mut(&TableId::new(*table_id)) {
Some(log) => {
- log.new_log.as_mut().unwrap().old_value.push(sst.clone());
+ log.new_log.old_value.push(sst.clone());
None => {
warn!(table_id, ?sst, "old value sst contains non-log-store table");
@@ -214,7 +214,7 @@ pub fn build_table_change_log_delta<'a>(
for sst in new_value_ssts {
for table_id in &sst.table_ids {
if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) {
- log.new_log.as_mut().unwrap().new_value.push(sst.clone());
+ log.new_log.new_value.push(sst.clone());
@@ -224,7 +224,7 @@ pub fn build_table_change_log_delta<'a>(
#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDeltaCommon {
pub truncate_epoch: u64,
- pub new_log: Option>,
+ pub new_log: EpochNewChangeLogCommon,
pub type ChangeLogDelta = ChangeLogDeltaCommon;
@@ -236,7 +236,7 @@ where
fn from(val: &ChangeLogDeltaCommon) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
- new_log: val.new_log.as_ref().map(|a| a.into()),
+ new_log: Some((&val.new_log).into()),
@@ -248,7 +248,7 @@ where
fn from(val: &PbChangeLogDelta) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
- new_log: val.new_log.as_ref().map(|a| a.into()),
+ new_log: val.new_log.as_ref().unwrap().into(),
@@ -260,7 +260,7 @@ where
fn from(val: ChangeLogDeltaCommon) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
- new_log:|a| a.into()),
+ new_log: Some(val.new_log.into()),
@@ -272,7 +272,7 @@ where
fn from(val: PbChangeLogDelta) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
- new_log:|a| a.into()),
+ new_log: val.new_log.unwrap().into(),
diff --git a/src/storage/hummock_sdk/src/compaction_group/ b/src/storage/hummock_sdk/src/compaction_group/
index ee316f75ffd65..eb4bb30e69dc3 100644
--- a/src/storage/hummock_sdk/src/compaction_group/
+++ b/src/storage/hummock_sdk/src/compaction_group/
@@ -693,7 +693,7 @@ impl HummockVersion {
changed_table_info: &HashMap>,
) {
for (table_id, change_log_delta) in change_log_delta {
- let new_change_log = change_log_delta.new_log.as_ref().unwrap();
+ let new_change_log = &change_log_delta.new_log;
match table_change_log.entry(*table_id) {
Entry::Occupied(entry) => {
let change_log = entry.into_mut();
diff --git a/src/storage/hummock_sdk/src/ b/src/storage/hummock_sdk/src/
index 4840b402a292b..11eb045efbe22 100644
--- a/src/storage/hummock_sdk/src/
+++ b/src/storage/hummock_sdk/src/
@@ -151,14 +151,12 @@ impl FrontendHummockVersionDelta {
ChangeLogDeltaCommon {
truncate_epoch: change_log_delta.truncate_epoch,
- new_log: change_log_delta.new_log.as_ref().map(|new_log| {
- EpochNewChangeLogCommon {
- // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
- new_value: vec![(); new_log.new_value.len()],
- old_value: vec![(); new_log.old_value.len()],
- epochs: new_log.epochs.clone(),
- }
- }),
+ new_log: EpochNewChangeLogCommon {
+ // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
+ new_value: vec![(); change_log_delta.new_log.new_value.len()],
+ old_value: vec![(); change_log_delta.new_log.old_value.len()],
+ epochs: change_log_delta.new_log.epochs.clone(),
+ },
@@ -187,11 +185,17 @@ impl FrontendHummockVersionDelta {
PbChangeLogDelta {
- new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog {
+ new_log: Some(PbEpochNewChangeLog {
// Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()`
- old_value: vec![PbSstableInfo::default(); new_log.old_value.len()],
- new_value: vec![PbSstableInfo::default(); new_log.new_value.len()],
- epochs: new_log.epochs.clone(),
+ old_value: vec![
+ PbSstableInfo::default();
+ delta.new_log.old_value.len()
+ ],
+ new_value: vec![
+ PbSstableInfo::default();
+ delta.new_log.new_value.len()
+ ],
+ epochs: delta.new_log.epochs.clone(),
truncate_epoch: delta.truncate_epoch,
@@ -228,14 +232,18 @@ impl FrontendHummockVersionDelta {
ChangeLogDeltaCommon {
truncate_epoch: change_log_delta.truncate_epoch,
- new_log: change_log_delta.new_log.as_ref().map(|new_log| {
- EpochNewChangeLogCommon {
- // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
- new_value: vec![(); new_log.new_value.len()],
- old_value: vec![(); new_log.old_value.len()],
- epochs: new_log.epochs.clone(),
- }
- }),
+ new_log: change_log_delta
+ .new_log
+ .as_ref()
+ .map(|new_log| {
+ EpochNewChangeLogCommon {
+ // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
+ new_value: vec![(); new_log.new_value.len()],
+ old_value: vec![(); new_log.old_value.len()],
+ epochs: new_log.epochs.clone(),
+ }
+ })
+ .unwrap(),
diff --git a/src/storage/hummock_sdk/src/ b/src/storage/hummock_sdk/src/
index 0a651260b3789..9f628808909ff 100644
--- a/src/storage/hummock_sdk/src/
+++ b/src/storage/hummock_sdk/src/
@@ -191,15 +191,15 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockV
- .as_ref()
- .map(|d| {
- d.new_value.iter().chain(d.old_value.iter()).all(|s| {
- s.table_ids
- .iter()
- .any(|tid| time_travel_table_ids.contains(tid))
- })
- })
- .unwrap_or(true));
+ .new_value
+ .iter()
+ .chain(log_delta.new_log.old_value.iter())
+ .all(|s| {
+ s.table_ids
+ .iter()
+ .any(|tid| time_travel_table_ids.contains(tid))
+ }));
Some((*table_id, PbChangeLogDelta::from(log_delta).into()))
diff --git a/src/storage/hummock_sdk/src/ b/src/storage/hummock_sdk/src/
index 09e96860cc839..4c90e6cae47f1 100644
--- a/src/storage/hummock_sdk/src/
+++ b/src/storage/hummock_sdk/src/
@@ -570,7 +570,7 @@ where
.chain(self.change_log_delta.values().flat_map(|delta| {
// TODO: optimization: strip table change log
- let new_log = delta.new_log.as_ref().unwrap();
+ let new_log = &delta.new_log;
@@ -623,8 +623,8 @@ where
ChangeLogDeltaCommon {
- new_log: log_delta.new_log.as_ref().map(Into::into),
truncate_epoch: log_delta.truncate_epoch,
+ new_log: log_delta.new_log.as_ref().unwrap().into(),
@@ -752,7 +752,7 @@ where
ChangeLogDeltaCommon {
- new_log: log_delta.new_log.clone().map(Into::into),
+ new_log: log_delta.new_log.clone().unwrap().into(),
truncate_epoch: log_delta.truncate_epoch,
diff --git a/src/storage/src/hummock/iterator/ b/src/storage/src/hummock/iterator/
index a10981c35fe16..0fd227020a527 100644
--- a/src/storage/src/hummock/iterator/
+++ b/src/storage/src/hummock/iterator/
@@ -545,10 +545,10 @@ mod tests {
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore};
use crate::memory::MemoryStateStore;
use crate::store::{
- ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreIter,
- StateStoreRead, CHECK_BYTES_EQUAL,
+ ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions, StateStoreReadLog,
- use crate::StateStore;
+ use crate::{StateStore, StateStoreIter};
async fn test_empty() {
diff --git a/src/storage/src/hummock/store/ b/src/storage/src/hummock/store/
index 0caae17c6c204..07aff03b4e804 100644
--- a/src/storage/src/hummock/store/
+++ b/src/storage/src/hummock/store/
@@ -79,7 +79,7 @@ impl Drop for HummockStorageShutdownGuard {
/// `HummockStorage` is the entry point of the Hummock state store backend.
-/// It implements the `StateStore` and `StateStoreRead` traits but not the `StateStoreWrite` trait
+/// It implements the `StateStore` and `StateStoreRead` traits but without any write method
/// since all writes should be done via `LocalHummockStorage` to ensure the single writer property
/// of hummock. `LocalHummockStorage` instance can be created via `new_local` call.
/// Hummock is the state store backend.
@@ -589,7 +589,6 @@ impl HummockStorage {
impl StateStoreRead for HummockStorage {
- type ChangeLogIter = ChangeLogIterator;
type Iter = HummockStorageIterator;
type RevIter = HummockStorageRevIterator;
@@ -635,6 +634,10 @@ impl StateStoreRead for HummockStorage {
self.rev_iter_inner(key_range, epoch, read_options)
+impl StateStoreReadLog for HummockStorage {
+ type ChangeLogIter = ChangeLogIterator;
async fn iter_log(
diff --git a/src/storage/src/hummock/store/ b/src/storage/src/hummock/store/
index 989eb878b0798..69952838e9829 100644
--- a/src/storage/src/hummock/store/
+++ b/src/storage/src/hummock/store/
@@ -32,7 +32,6 @@ use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
-use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
IteratorFactory, MergeIterator, UserIterator,
@@ -244,7 +243,6 @@ impl LocalHummockStorage {
impl StateStoreRead for LocalHummockStorage {
- type ChangeLogIter = ChangeLogIterator;
type Iter = HummockStorageIterator;
type RevIter = HummockStorageRevIterator;
@@ -279,20 +277,6 @@ impl StateStoreRead for LocalHummockStorage {
self.rev_iter_flushed(key_range, epoch, read_options)
- async fn iter_log(
- &self,
- epoch_range: (u64, u64),
- key_range: TableKeyRange,
- options: ReadLogOptions,
- ) -> StorageResult {
- let version =;
- let iter = self
- .hummock_version_reader
- .iter_log(version, epoch_range, key_range, options)
- .await?;
- Ok(iter)
- }
impl LocalStateStore for LocalHummockStorage {
diff --git a/src/storage/src/hummock/ b/src/storage/src/hummock/
index 6f42260bcc9c1..7f30f63d83e19 100644
--- a/src/storage/src/hummock/
+++ b/src/storage/src/hummock/
@@ -23,9 +23,11 @@ use std::time::{Duration, Instant};
use bytes::Bytes;
use foyer::CacheHint;
+use futures::{pin_mut, Stream, StreamExt};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
+use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
@@ -35,12 +37,12 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use super::{HummockError, HummockResult, SstableStoreRef};
-use crate::error::StorageResult;
+use crate::error::{StorageError, StorageResult};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::CachePolicy;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
-use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead};
+use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreKeyedRow, StateStoreRead};
pub fn range_overlap(
search_key_range: &R,
@@ -716,6 +718,91 @@ impl MemoryCollector for HummockMemoryCollector {
+#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
+pub(crate) async fn merge_stream<'a>(
+ mem_table_iter: impl Iterator- , &'a KeyOp)> + 'a,
+ inner_stream: impl Stream
- > + 'static,
+ table_id: TableId,
+ epoch: u64,
+ rev: bool,
+) {
+ let inner_stream = inner_stream.peekable();
+ pin_mut!(inner_stream);
+ let mut mem_table_iter = mem_table_iter.fuse().peekable();
+ loop {
+ match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
+ (None, None) => break,
+ // The mem table side has come to an end, return data from the shared storage.
+ (Some(_), None) => {
+ let (key, value) =;
+ yield (key, value)
+ }
+ // The stream side has come to an end, return data from the mem table.
+ (None, Some(_)) => {
+ let (key, key_op) =;
+ match key_op {
+ KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
+ yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
+ }
+ _ => {}
+ }
+ }
+ (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
+ debug_assert_eq!(inner_key.user_key.table_id, table_id);
+ let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
+ if rev {
+ ret = ret.reverse();
+ }
+ match ret {
+ Ordering::Less => {
+ // yield data from storage
+ let (key, value) =;
+ yield (key, value);
+ }
+ Ordering::Equal => {
+ // both memtable and storage contain the key, so we advance both
+ // iterators and return the data in memory.
+ let (_, key_op) =;
+ let (key, old_value_in_inner) =;
+ match key_op {
+ KeyOp::Insert(value) => {
+ yield (key.clone(), value.clone());
+ }
+ KeyOp::Delete(_) => {}
+ KeyOp::Update((old_value, new_value)) => {
+ debug_assert!(old_value == &old_value_in_inner);
+ yield (key, new_value.clone());
+ }
+ }
+ }
+ Ordering::Greater => {
+ // yield data from mem table
+ let (key, key_op) =;
+ match key_op {
+ KeyOp::Insert(value) => {
+ yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
+ }
+ KeyOp::Delete(_) => {}
+ KeyOp::Update(_) => unreachable!(
+ "memtable update should always be paired with a storage key"
+ ),
+ }
+ }
+ }
+ }
+ (Some(Err(_)), Some(_)) => {
+ // Throw the error.
+ return Err(;
+ }
+ }
+ }
mod tests {
use std::future::{poll_fn, Future};
diff --git a/src/storage/src/ b/src/storage/src/
index 2694f1433c0fa..61807517a20a4 100644
--- a/src/storage/src/
+++ b/src/storage/src/
@@ -12,37 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
-use std::future::Future;
-use std::ops::Bound::{Excluded, Included, Unbounded};
+use std::ops::Bound::{Included, Unbounded};
use std::ops::RangeBounds;
-use std::sync::Arc;
use bytes::Bytes;
-use futures::{pin_mut, Stream, StreamExt};
-use futures_async_stream::try_stream;
-use itertools::Itertools;
-use risingwave_common::bitmap::Bitmap;
-use risingwave_common::catalog::{TableId, TableOption};
-use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common_estimate_size::{EstimateSize, KvSize};
-use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange};
-use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
+use risingwave_hummock_sdk::key::TableKey;
use thiserror::Error;
use thiserror_ext::AsReport;
use tracing::error;
-use crate::error::{StorageError, StorageResult};
use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
-use crate::hummock::utils::{
- do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
+use crate::hummock::utils::sanity_check_enabled;
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
-use crate::storage_value::StorageValue;
use crate::store::*;
pub type ImmutableMemtable = SharedBufferBatch;
@@ -427,347 +413,6 @@ impl KeyOp {
-#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
-pub(crate) async fn merge_stream<'a>(
- mem_table_iter: impl Iterator
- , &'a KeyOp)> + 'a,
- inner_stream: impl Stream
- > + 'static,
- table_id: TableId,
- epoch: u64,
- rev: bool,
-) {
- let inner_stream = inner_stream.peekable();
- pin_mut!(inner_stream);
- let mut mem_table_iter = mem_table_iter.fuse().peekable();
- loop {
- match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
- (None, None) => break,
- // The mem table side has come to an end, return data from the shared storage.
- (Some(_), None) => {
- let (key, value) =;
- yield (key, value)
- }
- // The stream side has come to an end, return data from the mem table.
- (None, Some(_)) => {
- let (key, key_op) =;
- match key_op {
- KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
- yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
- }
- _ => {}
- }
- }
- (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
- debug_assert_eq!(inner_key.user_key.table_id, table_id);
- let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
- if rev {
- ret = ret.reverse();
- }
- match ret {
- Ordering::Less => {
- // yield data from storage
- let (key, value) =;
- yield (key, value);
- }
- Ordering::Equal => {
- // both memtable and storage contain the key, so we advance both
- // iterators and return the data in memory.
- let (_, key_op) =;
- let (key, old_value_in_inner) =;
- match key_op {
- KeyOp::Insert(value) => {
- yield (key.clone(), value.clone());
- }
- KeyOp::Delete(_) => {}
- KeyOp::Update((old_value, new_value)) => {
- debug_assert!(old_value == &old_value_in_inner);
- yield (key, new_value.clone());
- }
- }
- }
- Ordering::Greater => {
- // yield data from mem table
- let (key, key_op) =;
- match key_op {
- KeyOp::Insert(value) => {
- yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
- }
- KeyOp::Delete(_) => {}
- KeyOp::Update(_) => unreachable!(
- "memtable update should always be paired with a storage key"
- ),
- }
- }
- }
- }
- (Some(Err(_)), Some(_)) => {
- // Throw the error.
- return Err(;
- }
- }
- }
-pub struct MemtableLocalStateStore {
- mem_table: MemTable,
- inner: S,
- epoch: Option,
- table_id: TableId,
- op_consistency_level: OpConsistencyLevel,
- table_option: TableOption,
- vnodes: Arc,
-impl MemtableLocalStateStore
- pub fn new(inner: S, option: NewLocalOptions) -> Self {
- Self {
- inner,
- mem_table: MemTable::new(option.op_consistency_level.clone()),
- epoch: None,
- table_id: option.table_id,
- op_consistency_level: option.op_consistency_level,
- table_option: option.table_option,
- vnodes: option.vnodes,
- }
- }
- pub fn inner(&self) -> &S {
- &self.inner
- }
-impl LocalStateStore for MemtableLocalStateStore {
- type Iter<'a> = impl StateStoreIter + 'a;
- type RevIter<'a> = impl StateStoreIter + 'a;
- async fn get(
- &self,
- key: TableKey,
- read_options: ReadOptions,
- ) -> StorageResult