Skip to content

Commit

Permalink
feat: impl drop view (GreptimeTeam#4231)
Browse files Browse the repository at this point in the history
* feat: impl drop view

* fix: metric name

* fix: comments

* test: add DropViewProcedure test

* test: drop view meets a table

* test: update sqlness tests by drop view

* feat: apply suggestion from AI

* chore: apply suggestion

Co-authored-by: Jeremyhi <[email protected]>

* chore: apply suggestion

Co-authored-by: Jeremyhi <[email protected]>

* chore: apply suggestion

Co-authored-by: Jeremyhi <[email protected]>

* fix: TYPE_NAME for DropFlowProcedure

---------

Co-authored-by: Jeremyhi <[email protected]>
  • Loading branch information
killme2008 and fengjiachun authored Jul 11, 2024
1 parent 7ad248d commit ab22bba
Show file tree
Hide file tree
Showing 25 changed files with 830 additions and 137 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod create_view;
pub mod drop_database;
pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
mod physical_table_metadata;
pub mod table_meta;
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct DropFlowProcedure {
}

impl DropFlowProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure:DropFlow";
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";

pub fn new(cluster_id: ClusterId, task: DropFlowTask, context: DdlContext) -> Self {
Self {
Expand Down
242 changes: 242 additions & 0 deletions src/common/meta/src/ddl/drop_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableType};
use table::table_reference::TableReference;

use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::DropViewTask;
use crate::{metrics, ClusterId};

/// The procedure for dropping a view.
pub struct DropViewProcedure {
/// The context of procedure runtime.
pub(crate) context: DdlContext,
/// The serializable data.
pub(crate) data: DropViewData,
}

impl DropViewProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropView";

pub fn new(cluster_id: ClusterId, task: DropViewTask, context: DdlContext) -> Self {
Self {
context,
data: DropViewData {
state: DropViewState::Prepare,
cluster_id,
task,
},
}
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropViewData = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self { context, data })
}

#[cfg(test)]
pub(crate) fn state(&self) -> DropViewState {
self.data.state
}

/// Checks whether view exists.
/// - Early returns if view not exists and `drop_if_exists` is `true`.
/// - Throws an error if view not exists and `drop_if_exists` is `false`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let table_ref = self.data.table_ref();

let exists = self
.context
.table_metadata_manager
.table_name_manager()
.exists(TableNameKey::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
))
.await?;

if !exists && self.data.task.drop_if_exists {
return Ok(Status::done());
}

ensure!(
exists,
error::ViewNotFoundSnafu {
view_name: table_ref.to_string(),
}
);

self.check_view_metadata().await?;
self.data.state = DropViewState::DeleteMetadata;

Ok(Status::executing(true))
}

async fn check_view_metadata(&mut self) -> Result<()> {
let view_id = self.data.view_id();
let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(view_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: self.data.table_ref().to_string(),
})?;

self.ensure_is_view(&table_info_value.table_info)?;
self.ensure_view_info_exists(view_id).await?;

Ok(())
}

fn ensure_is_view(&self, table_info: &RawTableInfo) -> Result<()> {
ensure!(
table_info.table_type == TableType::View,
error::InvalidViewInfoSnafu {
err_msg: format!("{} is not a view", self.data.table_ref()),
}
);
Ok(())
}

async fn ensure_view_info_exists(&self, view_id: TableId) -> Result<()> {
self.context
.table_metadata_manager
.view_info_manager()
.get(view_id)
.await?
.with_context(|| error::ViewNotFoundSnafu {
view_name: self.data.table_ref().to_string(),
})?;
Ok(())
}

async fn on_delete_metadata(&mut self) -> Result<Status> {
let view_id = self.data.view_id();
self.context
.table_metadata_manager
.destroy_view_info(view_id, &self.data.table_ref().into())
.await?;

info!("Deleted view metadata for view {view_id}");

self.data.state = DropViewState::InvalidateViewCache;
Ok(Status::executing(true))
}

async fn on_broadcast(&mut self) -> Result<Status> {
let view_id = self.data.view_id();
let ctx = Context {
subject: Some("Invalidate view cache by dropping view".to_string()),
};

self.context
.cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::TableId(view_id),
CacheIdent::TableName(self.data.table_ref().into()),
],
)
.await?;

Ok(Status::done())
}
}

#[async_trait]
impl Procedure for DropViewProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_VIEW
.with_label_values(&[state.as_ref()])
.start_timer();

match self.data.state {
DropViewState::Prepare => self.on_prepare().await,
DropViewState::DeleteMetadata => self.on_delete_metadata().await,
DropViewState::InvalidateViewCache => self.on_broadcast().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
let view_id = self.data.view_id();
let lock_key = vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableLock::Write(view_id).into(),
];

LockKey::new(lock_key)
}
}

/// The serializable data
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropViewData {
state: DropViewState,
cluster_id: ClusterId,
task: DropViewTask,
}

impl DropViewData {
fn table_ref(&self) -> TableReference {
self.task.table_ref()
}

fn view_id(&self) -> TableId {
self.task.view_id
}
}

/// The state of drop view
#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq, Clone, Copy)]
pub(crate) enum DropViewState {
/// Prepares to drop the view
Prepare,
/// Deletes metadata
DeleteMetadata,
/// Invalidate view cache
InvalidateViewCache,
}
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ pub(crate) mod create_view;
mod drop_database;
mod drop_flow;
mod drop_table;
mod drop_view;
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/tests/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::error::Error;
use crate::rpc::ddl::CreateViewTask;
use crate::test_util::{new_ddl_context, MockDatanodeManager};

fn test_table_names() -> HashSet<table::table_name::TableName> {
pub(crate) fn test_table_names() -> HashSet<table::table_name::TableName> {
let mut set = HashSet::new();
set.insert(table::table_name::TableName {
catalog_name: "greptime".to_string(),
Expand Down
Loading

0 comments on commit ab22bba

Please sign in to comment.