Skip to content

Commit

Permalink
refactor: move create database to procedure (#3626)
Browse files Browse the repository at this point in the history
* refactor: move create database to procedure

* feat: enable database creation of rpc

* chore: update the commit hash of greptime-proto
  • Loading branch information
CookiePieWw authored Apr 8, 2024
1 parent 2ede968 commit c4798d1
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1bd2398b686e5ac6c1eef6daf615867ce27f75c1" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu

pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
Expand Down
152 changes: 152 additions & 0 deletions src/common/meta/src/ddl/create_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::schema_name::{SchemaNameKey, SchemaNameValue};
use crate::lock_key::{CatalogLock, SchemaLock};

pub struct CreateDatabaseProcedure {
pub context: DdlContext,
pub data: CreateDatabaseData,
}

impl CreateDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateDatabase";

pub fn new(
catalog: String,
schema: String,
create_if_not_exists: bool,
options: Option<HashMap<String, String>>,
context: DdlContext,
) -> Self {
Self {
context,
data: CreateDatabaseData {
state: CreateDatabaseState::Prepare,
catalog,
schema,
create_if_not_exists,
options,
},
}
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self { context, data })
}

pub async fn on_prepare(&mut self) -> Result<Status> {
let exists = self
.context
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey::new(&self.data.catalog, &self.data.schema))
.await?;

if exists && self.data.create_if_not_exists {
return Ok(Status::done());
}

ensure!(
!exists,
error::SchemaAlreadyExistsSnafu {
catalog: &self.data.catalog,
schema: &self.data.schema,
}
);

self.data.state = CreateDatabaseState::CreateMetadata;
Ok(Status::executing(true))
}

pub async fn on_create_metadata(&mut self) -> Result<Status> {
let value: Option<SchemaNameValue> = self
.data
.options
.as_ref()
.map(|hash_map_ref| hash_map_ref.try_into())
.transpose()?;

self.context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(&self.data.catalog, &self.data.schema),
value,
self.data.create_if_not_exists,
)
.await?;

Ok(Status::done())
}
}

#[async_trait]
impl Procedure for CreateDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;

match state {
CreateDatabaseState::Prepare => self.on_prepare().await,
CreateDatabaseState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let lock_key = vec![
CatalogLock::Read(&self.data.catalog).into(),
SchemaLock::write(&self.data.catalog, &self.data.schema).into(),
];

LockKey::new(lock_key)
}
}

#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
pub enum CreateDatabaseState {
Prepare,
CreateMetadata,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateDatabaseData {
pub state: CreateDatabaseState,
pub catalog: String,
pub schema: String,
pub create_if_not_exists: bool,
pub options: Option<HashMap<String, String>>,
}
60 changes: 57 additions & 3 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
Expand All @@ -45,12 +46,12 @@ use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropDatabase,
AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase,
DropLogicalTables, DropTable, TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropDatabaseTask, DropTableTask, SubmitDdlTaskRequest,
SubmitDdlTaskResponse, TruncateTableTask,
AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -170,6 +171,15 @@ impl DdlManager {
})
},
),
(
CreateDatabaseProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
Box::new(move |json: &str| {
let context = context.clone();
CreateDatabaseProcedure::from_json(json, context).map(|p| Box::new(p) as _)
})
},
),
(
DropDatabaseProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
Expand Down Expand Up @@ -293,6 +303,26 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a create database task.
pub async fn submit_create_database(
&self,
_cluster_id: ClusterId,
CreateDatabaseTask {
catalog,
schema,
create_if_not_exists,
options,
}: CreateDatabaseTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure =
CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_database(
Expand Down Expand Up @@ -557,6 +587,27 @@ async fn handle_create_logical_table_tasks(
})
}

async fn handle_create_database_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
create_database_task: CreateDatabaseTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_create_database(cluster_id, create_database_task.clone())
.await?;

let procedure_id = id.to_string();
info!(
"Database {}.{} is created via procedure_id {id:?}",
create_database_task.catalog, create_database_task.schema
);

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}

async fn handle_drop_database_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
Expand Down Expand Up @@ -651,6 +702,9 @@ impl ProcedureExecutor for DdlManager {
handle_alter_logical_table_tasks(self, cluster_id, alter_table_tasks).await
}
DropLogicalTables(_) => todo!(),
CreateDatabase(create_database_task) => {
handle_create_database_task(self, cluster_id, create_database_task).await
}
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}
Expand Down
Loading

0 comments on commit c4798d1

Please sign in to comment.