diff --git a/Cargo.lock b/Cargo.lock
index 11ff3881a9eb..e3d12bcd4247 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4597,7 +4597,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28#0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4"
 dependencies = [
  "prost 0.12.6",
  "serde",
@@ -11451,6 +11451,7 @@ dependencies = [
  "datafusion-sql",
  "datatypes",
  "hex",
+ "humantime",
  "iso8601",
  "itertools 0.10.5",
  "jsonb",
@@ -11460,6 +11461,7 @@ dependencies = [
  "snafu 0.8.5",
  "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
  "sqlparser_derive 0.1.1",
+ "store-api",
  "table",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index e2fec19aa6c9..d8ba27ffdca5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,7 +122,7 @@ etcd-client = "0.13"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
 hex = "0.4"
 humantime = "2.1"
 humantime-serde = "1.1"
diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index 2cd338e1774a..59068fba7616 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
     match request.expr {
         Some(Expr::CreateDatabase(_)) => "ddl.create_database",
         Some(Expr::CreateTable(_)) => "ddl.create_table",
-        Some(Expr::Alter(_)) => "ddl.alter",
+        Some(Expr::AlterTable(_)) => "ddl.alter_table",
         Some(Expr::DropTable(_)) => "ddl.drop_table",
         Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
         Some(Expr::CreateFlow(_)) => "ddl.create_flow",
         Some(Expr::DropFlow(_)) => "ddl.drop_flow",
         Some(Expr::CreateView(_)) => "ddl.create_view",
         Some(Expr::DropView(_)) => "ddl.drop_view",
+        Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
         None => "ddl.empty",
     }
 }
diff --git a/src/catalog/src/system_schema/information_schema/schemata.rs b/src/catalog/src/system_schema/information_schema/schemata.rs
index 02d6e606e797..d13e958131b6 100644
--- a/src/catalog/src/system_schema/information_schema/schemata.rs
+++ b/src/catalog/src/system_schema/information_schema/schemata.rs
@@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
                     .context(TableMetadataManagerSnafu)?
                     // information_schema is not available from this
                     // table_metadata_manager and we return None
-                    .map(|schema_opts| format!("{schema_opts}"))
+                    .map(|schema_opts| format!("{}", schema_opts.into_inner()))
             } else {
                 None
             };
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index a6223dc4cb5a..fe96b0a266ef 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
 use api::v1::{
-    AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
+    AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
     QueryRequest, RequestHeader,
 };
 use arrow_flight::Ticket;
@@ -211,9 +211,9 @@ impl Database {
         .await
     }
 
-    pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
+    pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
         self.do_get(Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::Alter(expr)),
+            expr: Some(DdlExpr::AlterTable(expr)),
         }))
         .await
     }
diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs
index 94cac67d6f91..4c4ad0905bb3 100644
--- a/src/common/grpc-expr/src/alter.rs
+++ b/src/common/grpc-expr/src/alter.rs
@@ -14,11 +14,11 @@
 
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::add_column_location::LocationType;
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::column_def::as_fulltext_option;
 use api::v1::{
-    column_def, AddColumnLocation as Location, AlterExpr, Analyzer, CreateTableExpr, DropColumns,
-    ModifyColumnTypes, RenameTable, SemanticType,
+    column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
+    DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
 };
 use common_query::AddColumnLocation;
 use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
@@ -36,8 +36,8 @@ use crate::error::{
 const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
 const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
 
-/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
-pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
+/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
+pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
     let catalog_name = expr.catalog_name;
     let schema_name = expr.schema_name;
     let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
@@ -203,7 +203,7 @@ mod tests {
 
     #[test]
     fn test_alter_expr_to_request() {
-        let expr = AlterExpr {
+        let expr = AlterTableExpr {
             catalog_name: String::default(),
             schema_name: String::default(),
             table_name: "monitor".to_string(),
@@ -244,7 +244,7 @@ mod tests {
 
     #[test]
     fn test_alter_expr_with_location_to_request() {
-        let expr = AlterExpr {
+        let expr = AlterTableExpr {
             catalog_name: String::default(),
             schema_name: String::default(),
             table_name: "monitor".to_string(),
@@ -321,7 +321,7 @@ mod tests {
 
     #[test]
     fn test_modify_column_type_expr() {
-        let expr = AlterExpr {
+        let expr = AlterTableExpr {
             catalog_name: "test_catalog".to_string(),
             schema_name: "test_schema".to_string(),
             table_name: "monitor".to_string(),
@@ -355,7 +355,7 @@ mod tests {
 
     #[test]
     fn test_drop_column_expr() {
-        let expr = AlterExpr {
+        let expr = AlterTableExpr {
             catalog_name: "test_catalog".to_string(),
             schema_name: "test_schema".to_string(),
             table_name: "monitor".to_string(),
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index 11654f04d6a3..0753ab51fccd 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
 use crate::{ClusterId, DatanodeId};
 
+pub mod alter_database;
 pub mod alter_logical_tables;
 pub mod alter_table;
 pub mod create_database;
diff --git a/src/common/meta/src/ddl/alter_database.rs b/src/common/meta/src/ddl/alter_database.rs
new file mode 100644
index 000000000000..5e992a7d4e81
--- /dev/null
+++ b/src/common/meta/src/ddl/alter_database.rs
@@ -0,0 +1,248 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
+use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
+use common_telemetry::tracing::info;
+use serde::{Deserialize, Serialize};
+use snafu::{ensure, ResultExt};
+use strum::AsRefStr;
+
+use super::utils::handle_retry_error;
+use crate::cache_invalidator::Context;
+use crate::ddl::DdlContext;
+use crate::error::{Result, SchemaNotFoundSnafu};
+use crate::instruction::CacheIdent;
+use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
+use crate::key::DeserializedValueWithBytes;
+use crate::lock_key::{CatalogLock, SchemaLock};
+use crate::rpc::ddl::UnsetDatabaseOption::{self};
+use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
+use crate::ClusterId;
+
+pub struct AlterDatabaseProcedure {
+    pub context: DdlContext,
+    pub data: AlterDatabaseData,
+}
+
+fn build_new_schema_value(
+    mut value: SchemaNameValue,
+    alter_kind: &AlterDatabaseKind,
+) -> Result<SchemaNameValue> {
+    match alter_kind {
+        AlterDatabaseKind::SetDatabaseOptions(options) => {
+            for option in options.0.iter() {
+                match option {
+                    SetDatabaseOption::Ttl(ttl) => {
+                        if ttl.is_zero() {
+                            value.ttl = None;
+                        } else {
+                            value.ttl = Some(*ttl);
+                        }
+                    }
+                }
+            }
+        }
+        AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
+            for key in keys.0.iter() {
+                match key {
+                    UnsetDatabaseOption::Ttl => value.ttl = None,
+                }
+            }
+        }
+    }
+    Ok(value)
+}
+
+impl AlterDatabaseProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
+
+    pub fn new(
+        cluster_id: ClusterId,
+        task: AlterDatabaseTask,
+        context: DdlContext,
+    ) -> Result<Self> {
+        Ok(Self {
+            context,
+            data: AlterDatabaseData::new(task, cluster_id)?,
+        })
+    }
+
+    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
+        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
+
+        Ok(Self { context, data })
+    }
+
+    pub async fn on_prepare(&mut self) -> Result<Status> {
+        let value = self
+            .context
+            .table_metadata_manager
+            .schema_manager()
+            .get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
+            .await?;
+
+        ensure!(
+            value.is_some(),
+            SchemaNotFoundSnafu {
+                table_schema: self.data.schema(),
+            }
+        );
+
+        self.data.schema_value = value;
+        self.data.state = AlterDatabaseState::UpdateMetadata;
+
+        Ok(Status::executing(true))
+    }
+
+    pub async fn on_update_metadata(&mut self) -> Result<Status> {
+        let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
+
+        // Safety: schema_value is not None.
+        let current_schema_value = self.data.schema_value.as_ref().unwrap();
+
+        let new_schema_value = build_new_schema_value(
+            current_schema_value.get_inner_ref().clone(),
+            &self.data.kind,
+        )?;
+
+        self.context
+            .table_metadata_manager
+            .schema_manager()
+            .update(schema_name, current_schema_value, &new_schema_value)
+            .await?;
+
+        info!("Updated database metadata for schema {schema_name}");
+        self.data.state = AlterDatabaseState::InvalidateSchemaCache;
+        Ok(Status::executing(true))
+    }
+
+    pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
+        let cache_invalidator = &self.context.cache_invalidator;
+        cache_invalidator
+            .invalidate(
+                &Context::default(),
+                &[CacheIdent::SchemaName(SchemaName {
+                    catalog_name: self.data.catalog().to_string(),
+                    schema_name: self.data.schema().to_string(),
+                })],
+            )
+            .await?;
+
+        Ok(Status::done())
+    }
+}
+
+#[async_trait]
+impl Procedure for AlterDatabaseProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        match self.data.state {
+            AlterDatabaseState::Prepare => self.on_prepare().await,
+            AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
+            AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
+        }
+        .map_err(handle_retry_error)
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        let catalog = self.data.catalog();
+        let schema = self.data.schema();
+
+        let lock_key = vec![
+            CatalogLock::Read(catalog).into(),
+            SchemaLock::write(catalog, schema).into(),
+        ];
+
+        LockKey::new(lock_key)
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, AsRefStr)]
+enum AlterDatabaseState {
+    Prepare,
+    UpdateMetadata,
+    InvalidateSchemaCache,
+}
+
+/// The data of alter database procedure.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct AlterDatabaseData {
+    cluster_id: ClusterId,
+    state: AlterDatabaseState,
+    kind: AlterDatabaseKind,
+    catalog_name: String,
+    schema_name: String,
+    schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
+}
+
+impl AlterDatabaseData {
+    pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Result<Self> {
+        Ok(Self {
+            cluster_id,
+            state: AlterDatabaseState::Prepare,
+            kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
+            catalog_name: task.alter_expr.catalog_name,
+            schema_name: task.alter_expr.schema_name,
+            schema_value: None,
+        })
+    }
+
+    pub fn catalog(&self) -> &str {
+        &self.catalog_name
+    }
+
+    pub fn schema(&self) -> &str {
+        &self.schema_name
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use crate::ddl::alter_database::build_new_schema_value;
+    use crate::key::schema_name::SchemaNameValue;
+    use crate::rpc::ddl::{
+        AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
+        UnsetDatabaseOptions,
+    };
+
+    #[test]
+    fn test_build_new_schema_value() {
+        let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
+            SetDatabaseOption::Ttl(Duration::from_secs(10)),
+        ]));
+        let current_schema_value = SchemaNameValue::default();
+        let new_schema_value =
+            build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
+        assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
+
+        let unset_ttl_alter_kind =
+            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
+                UnsetDatabaseOption::Ttl,
+            ]));
+        let new_schema_value =
+            build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
+        assert_eq!(new_schema_value.ttl, None);
+    }
+}
diff --git a/src/common/meta/src/ddl/alter_logical_tables/check.rs b/src/common/meta/src/ddl/alter_logical_tables/check.rs
index d0d4b8ef3945..a80ef3cd8cdb 100644
--- a/src/common/meta/src/ddl/alter_logical_tables/check.rs
+++ b/src/common/meta/src/ddl/alter_logical_tables/check.rs
@@ -14,7 +14,7 @@
 
 use std::collections::HashSet;
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use snafu::{ensure, OptionExt};
 
 use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
diff --git a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs
index 012cecf12d7a..f4e2dd7099a5 100644
--- a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs
+++ b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use api::v1;
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::region::{
     alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
     RegionColumnDef, RegionRequest, RegionRequestHeader,
diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs
index ae0d6dc6bd4f..e745af900f24 100644
--- a/src/common/meta/src/ddl/alter_table.rs
+++ b/src/common/meta/src/ddl/alter_table.rs
@@ -19,7 +19,7 @@ mod update_metadata;
 
 use std::vec;
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::RenameTable;
 use async_trait::async_trait;
 use common_error::ext::ErrorExt;
diff --git a/src/common/meta/src/ddl/alter_table/check.rs b/src/common/meta/src/ddl/alter_table/check.rs
index 2bf44f96d17e..5be40ac3e2e0 100644
--- a/src/common/meta/src/ddl/alter_table/check.rs
+++ b/src/common/meta/src/ddl/alter_table/check.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::RenameTable;
 use common_catalog::format_full_table_name;
 use snafu::ensure;
diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs
index 4583a78faaeb..fb700335aaf8 100644
--- a/src/common/meta/src/ddl/alter_table/region_request.rs
+++ b/src/common/meta/src/ddl/alter_table/region_request.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::region::region_request::Body;
 use api::v1::region::{
     alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
@@ -121,11 +121,11 @@ mod tests {
     use std::sync::Arc;
 
     use api::v1::add_column_location::LocationType;
-    use api::v1::alter_expr::Kind;
+    use api::v1::alter_table_expr::Kind;
     use api::v1::region::region_request::Body;
     use api::v1::region::RegionColumnDef;
     use api::v1::{
-        region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ColumnDataType,
+        region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
         ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
     };
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -215,7 +215,7 @@ mod tests {
             prepare_ddl_context().await;
 
         let task = AlterTableTask {
-            alter_table: AlterExpr {
+            alter_table: AlterTableExpr {
                 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                 table_name,
@@ -282,7 +282,7 @@ mod tests {
             prepare_ddl_context().await;
 
         let task = AlterTableTask {
-            alter_table: AlterExpr {
+            alter_table: AlterTableExpr {
                 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                 table_name,
diff --git a/src/common/meta/src/ddl/test_util/alter_table.rs b/src/common/meta/src/ddl/test_util/alter_table.rs
index f212a3cdffea..0274256d2da6 100644
--- a/src/common/meta/src/ddl/test_util/alter_table.rs
+++ b/src/common/meta/src/ddl/test_util/alter_table.rs
@@ -12,8 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::alter_expr::Kind;
-use api::v1::{AddColumn, AddColumns, AlterExpr, ColumnDef, RenameTable};
+use api::v1::alter_table_expr::Kind;
+use api::v1::{AddColumn, AddColumns, AlterTableExpr, ColumnDef, RenameTable};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use derive_builder::Builder;
 
@@ -32,7 +32,7 @@ pub struct TestAlterTableExpr {
     new_table_name: Option<String>,
 }
 
-impl From<TestAlterTableExpr> for AlterExpr {
+impl From<TestAlterTableExpr> for AlterTableExpr {
     fn from(value: TestAlterTableExpr) -> Self {
         if let Some(new_table_name) = value.new_table_name {
             Self {
diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs
index 7874c3e79835..b065f56d4529 100644
--- a/src/common/meta/src/ddl/tests/alter_table.rs
+++ b/src/common/meta/src/ddl/tests/alter_table.rs
@@ -16,11 +16,11 @@ use std::assert_matches::assert_matches;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::region::{region_request, RegionRequest};
 use api::v1::{
-    AddColumn, AddColumns, AlterExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
-    DropColumns, SemanticType, SetTableOptions, TableOption,
+    AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
+    DropColumns, SemanticType, SetTableOptions,
 };
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::ErrorExt;
@@ -133,7 +133,7 @@ async fn test_on_submit_alter_request() {
         .unwrap();
 
     let alter_table_task = AlterTableTask {
-        alter_table: AlterExpr {
+        alter_table: AlterTableExpr {
             catalog_name: DEFAULT_CATALOG_NAME.to_string(),
             schema_name: DEFAULT_SCHEMA_NAME.to_string(),
             table_name: table_name.to_string(),
@@ -219,7 +219,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
         .unwrap();
 
     let alter_table_task = AlterTableTask {
-        alter_table: AlterExpr {
+        alter_table: AlterTableExpr {
             catalog_name: DEFAULT_CATALOG_NAME.to_string(),
             schema_name: DEFAULT_SCHEMA_NAME.to_string(),
             table_name: table_name.to_string(),
@@ -316,7 +316,7 @@ async fn test_on_update_metadata_add_columns() {
         .unwrap();
 
     let task = AlterTableTask {
-        alter_table: AlterExpr {
+        alter_table: AlterTableExpr {
             catalog_name: DEFAULT_CATALOG_NAME.to_string(),
             schema_name: DEFAULT_SCHEMA_NAME.to_string(),
             table_name: table_name.to_string(),
@@ -385,12 +385,12 @@ async fn test_on_update_table_options() {
         .unwrap();
 
     let task = AlterTableTask {
-        alter_table: AlterExpr {
+        alter_table: AlterTableExpr {
             catalog_name: DEFAULT_CATALOG_NAME.to_string(),
             schema_name: DEFAULT_SCHEMA_NAME.to_string(),
             table_name: table_name.to_string(),
             kind: Some(Kind::SetTableOptions(SetTableOptions {
-                table_options: vec![TableOption {
+                table_options: vec![api::v1::Option {
                     key: TTL_KEY.to_string(),
                     value: "1d".to_string(),
                 }],
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 617a8c574d7e..bac640d401a6 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -24,6 +24,7 @@ use derive_builder::Builder;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::TableId;
 
+use crate::ddl::alter_database::AlterDatabaseProcedure;
 use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
 use crate::ddl::alter_table::AlterTableProcedure;
 use crate::ddl::create_database::CreateDatabaseProcedure;
@@ -47,12 +48,13 @@ use crate::key::table_info::TableInfoValue;
 use crate::key::table_name::TableNameKey;
 use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
 use crate::rpc::ddl::DdlTask::{
-    AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
-    CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, TruncateTable,
+    AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
+    CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
+    TruncateTable,
 };
 use crate::rpc::ddl::{
-    AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask,
-    DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
+    AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
+    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
     SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
 };
 use crate::rpc::procedure;
@@ -129,6 +131,7 @@ impl DdlManager {
             CreateFlowProcedure,
             AlterTableProcedure,
             AlterLogicalTablesProcedure,
+            AlterDatabaseProcedure,
             DropTableProcedure,
             DropFlowProcedure,
             TruncateTableProcedure,
@@ -294,6 +297,18 @@ impl DdlManager {
         self.submit_procedure(procedure_with_id).await
     }
 
+    pub async fn submit_alter_database(
+        &self,
+        cluster_id: ClusterId,
+        alter_database_task: AlterDatabaseTask,
+    ) -> Result<(ProcedureId, Option<Output>)> {
+        let context = self.create_context();
+        let procedure = AlterDatabaseProcedure::new(cluster_id, alter_database_task, context)?;
+        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+
+        self.submit_procedure(procedure_with_id).await
+    }
+
     /// Submits and executes a create flow task.
     #[tracing::instrument(skip_all)]
     pub async fn submit_create_flow_task(
@@ -593,6 +608,28 @@ async fn handle_drop_database_task(
     })
 }
 
+async fn handle_alter_database_task(
+    ddl_manager: &DdlManager,
+    cluster_id: ClusterId,
+    alter_database_task: AlterDatabaseTask,
+) -> Result<SubmitDdlTaskResponse> {
+    let (id, _) = ddl_manager
+        .submit_alter_database(cluster_id, alter_database_task.clone())
+        .await?;
+
+    let procedure_id = id.to_string();
+    info!(
+        "Database {}.{} is altered via procedure_id {id:?}",
+        alter_database_task.catalog(),
+        alter_database_task.schema()
+    );
+
+    Ok(SubmitDdlTaskResponse {
+        key: procedure_id.into(),
+        ..Default::default()
+    })
+}
+
 async fn handle_drop_flow_task(
     ddl_manager: &DdlManager,
     cluster_id: ClusterId,
@@ -779,6 +816,9 @@ impl ProcedureExecutor for DdlManager {
                 DropDatabase(drop_database_task) => {
                     handle_drop_database_task(self, cluster_id, drop_database_task).await
                 }
+                AlterDatabase(alter_database_task) => {
+                    handle_alter_database_task(self, cluster_id, alter_database_task).await
+                }
                 CreateFlow(create_flow_task) => {
                     handle_create_flow_task(
                         self,
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 51dc74e64614..855e112645de 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -593,6 +593,21 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
+    InvalidSetDatabaseOption {
+        key: String,
+        value: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Invalid unset database option, key: {}", key))]
+    InvalidUnsetDatabaseOption {
+        key: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
     MismatchPrefix {
         prefix: String,
@@ -730,7 +745,9 @@ impl ErrorExt for Error {
             | AlterLogicalTablesInvalidArguments { .. }
             | CreateLogicalTablesInvalidArguments { .. }
             | MismatchPrefix { .. }
-            | TlsConfig { .. } => StatusCode::InvalidArguments,
+            | TlsConfig { .. }
+            | InvalidSetDatabaseOption { .. }
+            | InvalidUnsetDatabaseOption { .. } => StatusCode::InvalidArguments,
 
             FlowNotFound { .. } => StatusCode::FlowNotFound,
             FlowRouteNotFound { .. } => StatusCode::Unexpected,
diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index aa88aa935ddf..b6bdf6189c79 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -565,13 +565,13 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_table_info = on_create_table_info_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table info during the create table metadata",
+                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                 })?
                 .into_inner();
 
             let remote_view_info = on_create_view_info_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty view info during the create view info",
+                    err_msg: "Reads the empty view info in comparing operation of creating view metadata",
                 })?
                 .into_inner();
 
@@ -644,13 +644,13 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_table_info = on_create_table_info_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table info during the create table metadata",
+                    err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                 })?
                 .into_inner();
 
             let remote_table_route = on_create_table_route_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table route during the create table metadata",
+                    err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                 })?
                 .into_inner();
 
@@ -731,13 +731,13 @@ impl TableMetadataManager {
             for on_failure in on_failures {
                 let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
                     .context(error::UnexpectedSnafu {
-                        err_msg: "Reads the empty table info during the create table metadata",
+                        err_msg: "Reads the empty table info in comparing operation of creating table metadata",
                     })?
                     .into_inner();
 
                 let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
                     .context(error::UnexpectedSnafu {
-                        err_msg: "Reads the empty table route during the create table metadata",
+                        err_msg: "Reads the empty table route in comparing operation of creating table metadata",
                     })?
                     .into_inner();
 
@@ -915,7 +915,7 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_table_info = on_update_table_info_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table info during the rename table metadata",
+                    err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
                 })?
                 .into_inner();
 
@@ -961,7 +961,7 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_table_info = on_update_table_info_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table info during the updating table info",
+                    err_msg: "Reads the empty table info in comparing operation of the updating table info",
                 })?
                 .into_inner();
 
@@ -1012,7 +1012,7 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_view_info = on_update_view_info_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty view info during the updating view info",
+                    err_msg: "Reads the empty view info in comparing operation of the updating view info",
                 })?
                 .into_inner();
 
@@ -1069,7 +1069,7 @@ impl TableMetadataManager {
             for on_failure in on_failures {
                 let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
                     .context(error::UnexpectedSnafu {
-                        err_msg: "Reads the empty table info during the updating table info",
+                        err_msg: "Reads the empty table info in comparing operation of the updating table info",
                     })?
                     .into_inner();
 
@@ -1121,7 +1121,7 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_table_route = on_update_table_route_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table route during the updating table route",
+                    err_msg: "Reads the empty table route in comparing operation of the updating table route",
                 })?
                 .into_inner();
 
@@ -1173,7 +1173,7 @@ impl TableMetadataManager {
             let mut set = TxnOpGetResponseSet::from(&mut r.responses);
             let remote_table_route = on_update_table_route_failure(&mut set)?
                 .context(error::UnexpectedSnafu {
-                    err_msg: "Reads the empty table route during the updating leader region status",
+                    err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
                 })?
                 .into_inner();
 
@@ -1261,7 +1261,8 @@ impl_metadata_value! {
     FlowNameValue,
     FlowRouteValue,
     TableFlowValue,
-    NodeAddressValue
+    NodeAddressValue,
+    SchemaNameValue
 }
 
 impl_optional_metadata_value! {
diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs
index 24fed99b8fc5..9715aab1fde1 100644
--- a/src/common/meta/src/key/flow.rs
+++ b/src/common/meta/src/key/flow.rs
@@ -197,7 +197,7 @@ impl FlowMetadataManager {
                 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
                     error::UnexpectedSnafu {
                         err_msg: format!(
-                    "Reads the empty flow name during the creating flow, flow_id: {flow_id}"
+                    "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
                 ),
                     }
                 })?;
@@ -220,7 +220,7 @@ impl FlowMetadataManager {
             let remote_flow =
                 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
                     err_msg: format!(
-                        "Reads the empty flow during the creating flow, flow_id: {flow_id}"
+                        "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
                     ),
                 })?;
             let op_name = "creating flow";
@@ -288,7 +288,7 @@ impl FlowMetadataManager {
                 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
                     error::UnexpectedSnafu {
                         err_msg: format!(
-                        "Reads the empty flow name during the updating flow, flow_id: {flow_id}"
+                        "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
                     ),
                     }
                 })?;
@@ -316,7 +316,7 @@ impl FlowMetadataManager {
             let remote_flow =
                 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
                     err_msg: format!(
-                        "Reads the empty flow during the updating flow, flow_id: {flow_id}"
+                        "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
                     ),
                 })?;
             let op_name = "updating flow";
diff --git a/src/common/meta/src/key/schema_metadata_manager.rs b/src/common/meta/src/key/schema_metadata_manager.rs
index 6ee3a3112d5d..23ad86d2ab45 100644
--- a/src/common/meta/src/key/schema_metadata_manager.rs
+++ b/src/common/meta/src/key/schema_metadata_manager.rs
@@ -75,7 +75,10 @@ impl SchemaMetadataManager {
             &table_info.table_info.catalog_name,
             &table_info.table_info.schema_name,
         );
-        self.schema_manager.get(key).await
+        self.schema_manager
+            .get(key)
+            .await
+            .map(|v| v.map(|v| v.into_inner()))
     }
 
     #[cfg(any(test, feature = "testing"))]
diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs
index 753b865b5362..15cb37c4d1a8 100644
--- a/src/common/meta/src/key/schema_name.rs
+++ b/src/common/meta/src/key/schema_name.rs
@@ -21,10 +21,14 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use futures::stream::BoxStream;
 use humantime_serde::re::humantime;
 use serde::{Deserialize, Serialize};
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 
+use super::txn_helper::TxnOpGetResponseSet;
+use super::DeserializedValueWithBytes;
+use crate::ensure_values;
 use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
 use crate::key::{MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
+use crate::kv_backend::txn::Txn;
 use crate::kv_backend::KvBackendRef;
 use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
 use crate::rpc::store::RangeRequest;
@@ -171,6 +175,8 @@ pub struct SchemaManager {
     kv_backend: KvBackendRef,
 }
 
+pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
+
 impl SchemaManager {
     pub fn new(kv_backend: KvBackendRef) -> Self {
         Self { kv_backend }
@@ -204,11 +210,15 @@ impl SchemaManager {
         self.kv_backend.exists(&raw_key).await
     }
 
-    pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
+    pub async fn get(
+        &self,
+        schema: SchemaNameKey<'_>,
+    ) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
         let raw_key = schema.to_bytes();
-        let value = self.kv_backend.get(&raw_key).await?;
-        value
-            .and_then(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()).transpose())
+        self.kv_backend
+            .get(&raw_key)
+            .await?
+            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
             .transpose()
     }
 
@@ -220,6 +230,54 @@ impl SchemaManager {
         Ok(())
     }
 
+    pub(crate) fn build_update_txn(
+        &self,
+        schema: SchemaNameKey<'_>,
+        current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
+        new_schema_value: &SchemaNameValue,
+    ) -> Result<(
+        Txn,
+        impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
+    )> {
+        let raw_key = schema.to_bytes();
+        let raw_value = current_schema_value.get_raw_bytes();
+        let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
+
+        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
+
+        Ok((
+            txn,
+            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
+        ))
+    }
+
+    /// Updates a [SchemaNameKey].
+    pub async fn update(
+        &self,
+        schema: SchemaNameKey<'_>,
+        current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
+        new_schema_value: &SchemaNameValue,
+    ) -> Result<()> {
+        let (txn, on_failure) =
+            self.build_update_txn(schema, current_schema_value, new_schema_value)?;
+        let mut r = self.kv_backend.txn(txn).await?;
+
+        if !r.succeeded {
+            let mut set = TxnOpGetResponseSet::from(&mut r.responses);
+            let remote_schema_value = on_failure(&mut set)?
+                .context(error::UnexpectedSnafu {
+                    err_msg:
+                        "Reads the empty schema name value in comparing operation of updating schema name value",
+                })?
+                .into_inner();
+
+            let op_name = "the updating schema name value";
+            ensure_values!(&remote_schema_value, new_schema_value, op_name);
+        }
+
+        Ok(())
+    }
+
     /// Returns a schema stream, it lists all schemas belong to the target `catalog`.
     pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
         let start_key = SchemaNameKey::range_start_key(catalog);
@@ -306,4 +364,42 @@ mod tests {
 
         assert!(!manager.exists(wrong_schema_key).await.unwrap());
     }
+
+    #[tokio::test]
+    async fn test_update_schema_value() {
+        let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
+        let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
+        manager.create(schema_key, None, false).await.unwrap();
+
+        let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
+        let new_schema_value = SchemaNameValue {
+            ttl: Some(Duration::from_secs(10)),
+        };
+        manager
+            .update(schema_key, &current_schema_value, &new_schema_value)
+            .await
+            .unwrap();
+
+        // Update with the same value, should be ok
+        manager
+            .update(schema_key, &current_schema_value, &new_schema_value)
+            .await
+            .unwrap();
+
+        let new_schema_value = SchemaNameValue {
+            ttl: Some(Duration::from_secs(40)),
+        };
+        let incorrect_schema_value = SchemaNameValue {
+            ttl: Some(Duration::from_secs(20)),
+        }
+        .try_as_raw_value()
+        .unwrap();
+        let incorrect_schema_value =
+            DeserializedValueWithBytes::from_inner_slice(&incorrect_schema_value).unwrap();
+
+        manager
+            .update(schema_key, &incorrect_schema_value, &new_schema_value)
+            .await
+            .unwrap_err();
+    }
 }
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index ea14a1a5af45..562ecb8ee660 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -14,25 +14,29 @@
 
 use std::collections::{HashMap, HashSet};
 use std::result;
+use std::time::Duration;
 
+use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
 use api::v1::meta::ddl_task_request::Task;
 use api::v1::meta::{
-    AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
-    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
-    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
-    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
-    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
-    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
-    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
+    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
+    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
+    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
+    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
+    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
+    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
+    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
+    DropViewTask as PbDropViewTask, Partition, ProcedureId,
     TruncateTableTask as PbTruncateTableTask,
 };
 use api::v1::{
-    AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
-    DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
-    QueryContext as PbQueryContext, TruncateTableExpr,
+    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
+    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
+    Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
 };
 use base64::engine::general_purpose;
 use base64::Engine as _;
+use humantime_serde::re::humantime;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DefaultOnNull};
@@ -42,7 +46,7 @@ use table::metadata::{RawTableInfo, TableId};
 use table::table_name::TableName;
 use table::table_reference::TableReference;
 
-use crate::error::{self, Result};
+use crate::error::{self, InvalidSetDatabaseOptionSnafu, InvalidUnsetDatabaseOptionSnafu, Result};
 use crate::key::FlowId;
 
 /// DDL tasks
@@ -57,6 +61,7 @@ pub enum DdlTask {
     AlterLogicalTables(Vec<AlterTableTask>),
     CreateDatabase(CreateDatabaseTask),
     DropDatabase(DropDatabaseTask),
+    AlterDatabase(AlterDatabaseTask),
     CreateFlow(CreateFlowTask),
     DropFlow(DropFlowTask),
     CreateView(CreateViewTask),
@@ -99,7 +104,7 @@ impl DdlTask {
     }
 
     /// Creates a [`DdlTask`] to alter several logical tables.
-    pub fn new_alter_logical_tables(table_data: Vec<AlterExpr>) -> Self {
+    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
         DdlTask::AlterLogicalTables(
             table_data
                 .into_iter()
@@ -149,8 +154,13 @@ impl DdlTask {
         })
     }
 
+    /// Creates a [`DdlTask`] to alter a database.
+    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
+        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
+    }
+
     /// Creates a [`DdlTask`] to alter a table.
-    pub fn new_alter_table(alter_table: AlterExpr) -> Self {
+    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
         DdlTask::AlterTable(AlterTableTask { alter_table })
     }
 
@@ -223,6 +233,9 @@ impl TryFrom<Task> for DdlTask {
             Task::DropDatabaseTask(drop_database) => {
                 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
             }
+            Task::AlterDatabaseTask(alter_database) => {
+                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
+            }
             Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
             Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
             Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
@@ -272,6 +285,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
             }
             DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
             DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
+            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
             DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
             DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
             DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
@@ -680,7 +694,8 @@ impl<'de> Deserialize<'de> for CreateTableTask {
 
 #[derive(Debug, PartialEq, Clone)]
 pub struct AlterTableTask {
-    pub alter_table: AlterExpr,
+    // TODO(CookiePieWw): Replace proto struct with user-defined struct
+    pub alter_table: AlterTableExpr,
 }
 
 impl AlterTableTask {
@@ -932,6 +947,125 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
     }
 }
 
+#[derive(Debug, PartialEq, Clone)]
+pub struct AlterDatabaseTask {
+    pub alter_expr: AlterDatabaseExpr,
+}
+
+impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
+    type Error = error::Error;
+
+    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
+        Ok(PbAlterDatabaseTask {
+            task: Some(task.alter_expr),
+        })
+    }
+}
+
+impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
+    type Error = error::Error;
+
+    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
+        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
+            err_msg: "expected alter database",
+        })?;
+
+        Ok(AlterDatabaseTask { alter_expr })
+    }
+}
+
+impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
+    type Error = error::Error;
+
+    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
+        match pb {
+            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
+                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
+                    options
+                        .set_database_options
+                        .into_iter()
+                        .map(SetDatabaseOption::try_from)
+                        .collect::<Result<Vec<_>>>()?,
+                )))
+            }
+            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
+                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
+                    options
+                        .keys
+                        .iter()
+                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
+                        .collect::<Result<Vec<_>>>()?,
+                )),
+            ),
+        }
+    }
+}
+
+const TTL_KEY: &str = "ttl";
+
+impl TryFrom<PbOption> for SetDatabaseOption {
+    type Error = error::Error;
+
+    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
+        match key.to_ascii_lowercase().as_str() {
+            TTL_KEY => {
+                let ttl = if value.is_empty() {
+                    Duration::from_secs(0)
+                } else {
+                    humantime::parse_duration(&value)
+                        .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?
+                };
+
+                Ok(SetDatabaseOption::Ttl(ttl))
+            }
+            _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
+pub enum SetDatabaseOption {
+    Ttl(Duration),
+}
+
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
+pub enum UnsetDatabaseOption {
+    Ttl,
+}
+
+impl TryFrom<&str> for UnsetDatabaseOption {
+    type Error = error::Error;
+
+    fn try_from(key: &str) -> Result<Self> {
+        match key.to_ascii_lowercase().as_str() {
+            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
+            _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
+pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
+
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
+pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
+
+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
+pub enum AlterDatabaseKind {
+    SetDatabaseOptions(SetDatabaseOptions),
+    UnsetDatabaseOptions(UnsetDatabaseOptions),
+}
+
+impl AlterDatabaseTask {
+    pub fn catalog(&self) -> &str {
+        &self.alter_expr.catalog_name
+    }
+
+    pub fn schema(&self) -> &str {
+        &self.alter_expr.catalog_name
+    }
+}
+
 /// Create flow
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CreateFlowTask {
@@ -1118,7 +1252,7 @@ impl From<QueryContext> for PbQueryContext {
 mod tests {
     use std::sync::Arc;
 
-    use api::v1::{AlterExpr, ColumnDef, CreateTableExpr, SemanticType};
+    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
     use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
     use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
     use store_api::storage::ConcreteDataType;
@@ -1146,7 +1280,7 @@ mod tests {
     #[test]
     fn test_basic_ser_de_alter_table_task() {
         let task = AlterTableTask {
-            alter_table: AlterExpr::default(),
+            alter_table: AlterTableExpr::default(),
         };
 
         let output = serde_json::to_vec(&task).unwrap();
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 17102b51cd7a..6ffab3c1f619 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -492,6 +492,7 @@ pub fn check_permission(
         Statement::CreateDatabase(_)
         | Statement::ShowDatabases(_)
         | Statement::DropDatabase(_)
+        | Statement::AlterDatabase(_)
         | Statement::DropFlow(_)
         | Statement::Use(_) => {}
         Statement::ShowCreateDatabase(stmt) => {
@@ -516,7 +517,7 @@ pub fn check_permission(
         Statement::CreateView(stmt) => {
             validate_param(&stmt.name, query_ctx)?;
         }
-        Statement::Alter(stmt) => {
+        Statement::AlterTable(stmt) => {
             validate_param(stmt.table_name(), query_ctx)?;
         }
         // set/show variable now only alter/show variable in session
diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index ca8ea462a83e..ad225bf30b4e 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -115,7 +115,14 @@ impl GrpcQueryHandler for Instance {
                             .await?;
                         Output::new_with_affected_rows(0)
                     }
-                    DdlExpr::Alter(expr) => {
+                    DdlExpr::AlterDatabase(expr) => {
+                        let _ = self
+                            .statement_executor
+                            .alter_database_inner(expr, ctx.clone())
+                            .await?;
+                        Output::new_with_affected_rows(0)
+                    }
+                    DdlExpr::AlterTable(expr) => {
                         self.statement_executor
                             .alter_table_inner(expr, ctx.clone())
                             .await?
@@ -195,11 +202,11 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
     }
 
     match ddl_expr {
-        Expr::CreateDatabase(_) => { /* do nothing*/ }
+        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
         Expr::CreateTable(expr) => {
             check_and_fill!(expr);
         }
-        Expr::Alter(expr) => {
+        Expr::AlterTable(expr) => {
             check_and_fill!(expr);
         }
         Expr::DropTable(expr) => {
diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs
index 451bd5d5e8e5..59fe87a66ec3 100644
--- a/src/operator/src/expr_factory.rs
+++ b/src/operator/src/expr_factory.rs
@@ -15,13 +15,15 @@
 use std::collections::{HashMap, HashSet};
 
 use api::helper::ColumnDataTypeWrapper;
-use api::v1::alter_expr::Kind;
+use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
+use api::v1::alter_table_expr::Kind as AlterTableKind;
 use api::v1::column_def::options_from_column_schema;
 use api::v1::{
-    AddColumn, AddColumns, AlterExpr, Analyzer, ColumnDataType, ColumnDataTypeExtension,
-    CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn, DropColumns, ExpireAfter,
-    ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType, SetColumnFulltext,
-    SetTableOptions, TableName, UnsetColumnFulltext, UnsetTableOptions,
+    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
+    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
+    DropColumns, ExpireAfter, ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType,
+    SetColumnFulltext, SetDatabaseOptions, SetTableOptions, TableName, UnsetColumnFulltext,
+    UnsetDatabaseOptions, UnsetTableOptions,
 };
 use common_error::ext::BoxedError;
 use common_grpc_expr::util::ColumnExpr;
@@ -37,7 +39,9 @@ use session::context::QueryContextRef;
 use session::table_name::table_idents_to_full_name;
 use snafu::{ensure, OptionExt, ResultExt};
 use sql::ast::ColumnOption;
-use sql::statements::alter::{AlterTable, AlterTableOperation};
+use sql::statements::alter::{
+    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
+};
 use sql::statements::create::{
     Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
 };
@@ -472,10 +476,10 @@ pub fn column_schemas_to_defs(
         .collect()
 }
 
-pub(crate) fn to_alter_expr(
+pub(crate) fn to_alter_table_expr(
     alter_table: AlterTable,
     query_ctx: &QueryContextRef,
-) -> Result<AlterExpr> {
+) -> Result<AlterTableExpr> {
     let (catalog_name, schema_name, table_name) =
         table_idents_to_full_name(alter_table.table_name(), query_ctx)
             .map_err(BoxedError::new)
@@ -491,7 +495,7 @@ pub(crate) fn to_alter_expr(
         AlterTableOperation::AddColumn {
             column_def,
             location,
-        } => Kind::AddColumns(AddColumns {
+        } => AlterTableKind::AddColumns(AddColumns {
             add_columns: vec![AddColumn {
                 column_def: Some(
                     sql_column_def_to_grpc_column_def(&column_def, Some(&query_ctx.timezone()))
@@ -510,7 +514,7 @@ pub(crate) fn to_alter_expr(
             let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
                 .map(|w| w.to_parts())
                 .context(ColumnDataTypeSnafu)?;
-            Kind::ModifyColumnTypes(ModifyColumnTypes {
+            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
                 modify_column_types: vec![ModifyColumnType {
                     column_name: column_name.value,
                     target_type: target_type as i32,
@@ -518,26 +522,28 @@ pub(crate) fn to_alter_expr(
                 }],
             })
         }
-        AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
+        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
             drop_columns: vec![DropColumn {
                 name: name.value.to_string(),
             }],
         }),
-        AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable {
-            new_table_name: new_table_name.to_string(),
-        }),
+        AlterTableOperation::RenameTable { new_table_name } => {
+            AlterTableKind::RenameTable(RenameTable {
+                new_table_name: new_table_name.to_string(),
+            })
+        }
         AlterTableOperation::SetTableOptions { options } => {
-            Kind::SetTableOptions(SetTableOptions {
+            AlterTableKind::SetTableOptions(SetTableOptions {
                 table_options: options.into_iter().map(Into::into).collect(),
             })
         }
         AlterTableOperation::UnsetTableOptions { keys } => {
-            Kind::UnsetTableOptions(UnsetTableOptions { keys })
+            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
         }
         AlterTableOperation::SetColumnFulltext {
             column_name,
             options,
-        } => Kind::SetColumnFulltext(SetColumnFulltext {
+        } => AlterTableKind::SetColumnFulltext(SetColumnFulltext {
             column_name: column_name.value,
             enable: options.enable,
             analyzer: match options.analyzer {
@@ -547,13 +553,13 @@ pub(crate) fn to_alter_expr(
             case_sensitive: options.case_sensitive,
         }),
         AlterTableOperation::UnsetColumnFulltext { column_name } => {
-            Kind::UnsetColumnFulltext(UnsetColumnFulltext {
+            AlterTableKind::UnsetColumnFulltext(UnsetColumnFulltext {
                 column_name: column_name.value,
             })
         }
     };
 
-    Ok(AlterExpr {
+    Ok(AlterTableExpr {
         catalog_name,
         schema_name,
         table_name,
@@ -561,6 +567,33 @@ pub(crate) fn to_alter_expr(
     })
 }
 
+/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
+pub fn to_alter_database_expr(
+    alter_database: AlterDatabase,
+    query_ctx: &QueryContextRef,
+) -> Result<AlterDatabaseExpr> {
+    let catalog = query_ctx.current_catalog();
+    let schema = alter_database.database_name;
+
+    let kind = match alter_database.alter_operation {
+        AlterDatabaseOperation::SetDatabaseOption { options } => {
+            let options = options.into_iter().map(Into::into).collect();
+            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
+                set_database_options: options,
+            })
+        }
+        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
+            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
+        }
+    };
+
+    Ok(AlterDatabaseExpr {
+        catalog_name: catalog.to_string(),
+        schema_name: schema.to_string(),
+        kind: Some(kind),
+    })
+}
+
 /// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
 pub fn to_create_view_expr(
     stmt: CreateView,
@@ -656,6 +689,7 @@ pub fn to_create_flow_task_expr(
 
 #[cfg(test)]
 mod tests {
+    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
     use datatypes::value::Value;
     use session::context::{QueryContext, QueryContextBuilder};
     use sql::dialect::GreptimeDbDialect;
@@ -761,6 +795,55 @@ mod tests {
 
     #[test]
     fn test_to_alter_expr() {
+        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
+        let stmt =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap()
+                .pop()
+                .unwrap();
+
+        let Statement::AlterDatabase(alter_database) = stmt else {
+            unreachable!()
+        };
+
+        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
+        let kind = expr.kind.unwrap();
+
+        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
+            set_database_options,
+        }) = kind
+        else {
+            unreachable!()
+        };
+
+        assert_eq!(2, set_database_options.len());
+        assert_eq!("key1", set_database_options[0].key);
+        assert_eq!("value1", set_database_options[0].value);
+        assert_eq!("key2", set_database_options[1].key);
+        assert_eq!("value2", set_database_options[1].value);
+
+        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
+        let stmt =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap()
+                .pop()
+                .unwrap();
+
+        let Statement::AlterDatabase(alter_database) = stmt else {
+            unreachable!()
+        };
+
+        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
+        let kind = expr.kind.unwrap();
+
+        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
+            unreachable!()
+        };
+
+        assert_eq!(2, keys.len());
+        assert!(keys.contains(&"key1".to_string()));
+        assert!(keys.contains(&"key2".to_string()));
+
         let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
         let stmt =
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
@@ -768,15 +851,15 @@ mod tests {
                 .pop()
                 .unwrap();
 
-        let Statement::Alter(alter_table) = stmt else {
+        let Statement::AlterTable(alter_table) = stmt else {
             unreachable!()
         };
 
         // query context with system timezone UTC.
-        let expr = to_alter_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
+        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
         let kind = expr.kind.unwrap();
 
-        let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
+        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
             unreachable!()
         };
 
@@ -794,10 +877,10 @@ mod tests {
             .timezone(Timezone::from_tz_string("+08:00").unwrap())
             .build()
             .into();
-        let expr = to_alter_expr(alter_table, &ctx).unwrap();
+        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
         let kind = expr.kind.unwrap();
 
-        let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
+        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
             unreachable!()
         };
 
@@ -819,15 +902,15 @@ mod tests {
                 .pop()
                 .unwrap();
 
-        let Statement::Alter(alter_table) = stmt else {
+        let Statement::AlterTable(alter_table) = stmt else {
             unreachable!()
         };
 
         // query context with system timezone UTC.
-        let expr = to_alter_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
+        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
         let kind = expr.kind.unwrap();
 
-        let Kind::ModifyColumnTypes(ModifyColumnTypes {
+        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
             modify_column_types,
         }) = kind
         else {
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index cae919f770f3..4637f7fd10bb 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -15,11 +15,11 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
 use api::v1::{
-    AlterExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
-    RowInsertRequests, SemanticType,
+    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
+    RowInsertRequest, RowInsertRequests, SemanticType,
 };
 use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
@@ -692,7 +692,7 @@ impl Inserter {
         req: &RowInsertRequest,
         table: &TableRef,
         ctx: &QueryContextRef,
-    ) -> Result<Option<AlterExpr>> {
+    ) -> Result<Option<AlterTableExpr>> {
         let catalog_name = ctx.current_catalog();
         let schema_name = ctx.current_schema();
         let table_name = table.table_info().name.clone();
@@ -705,7 +705,7 @@ impl Inserter {
             return Ok(None);
         };
 
-        Ok(Some(AlterExpr {
+        Ok(Some(AlterTableExpr {
             catalog_name: catalog_name.to_string(),
             schema_name: schema_name.to_string(),
             table_name: table_name.to_string(),
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 271e1b75e0fb..64417dbd6b0d 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -224,7 +224,12 @@ impl StatementExecutor {
                 )
                 .await
             }
-            Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
+            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
+
+            Statement::AlterDatabase(alter_database) => {
+                self.alter_database(alter_database, query_ctx).await
+            }
+
             Statement::DropTable(stmt) => {
                 let mut table_names = Vec::with_capacity(stmt.table_names().len());
                 for table_name_stmt in stmt.table_names() {
@@ -282,6 +287,7 @@ impl StatementExecutor {
                     .context(SchemaNotFoundSnafu {
                         schema_info: &database,
                     })?
+                    .into_inner()
                     .into();
 
                 self.show_create_database(&database, opts.into()).await
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 222126952066..ed96ca6f1833 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -17,7 +17,9 @@ use std::sync::Arc;
 
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
-use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr};
+use api::v1::{
+    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
+};
 use catalog::CatalogManagerRef;
 use chrono::Utc;
 use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -26,7 +28,7 @@ use common_error::ext::BoxedError;
 use common_meta::cache_invalidator::Context;
 use common_meta::ddl::ExecutorContext;
 use common_meta::instruction::CacheIdent;
-use common_meta::key::schema_name::SchemaNameKey;
+use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
 use common_meta::key::NAME_PATTERN;
 use common_meta::rpc::ddl::{
     CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
@@ -51,7 +53,7 @@ use regex::Regex;
 use session::context::QueryContextRef;
 use session::table_name::table_idents_to_full_name;
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::statements::alter::AlterTable;
+use sql::statements::alter::{AlterDatabase, AlterTable};
 use sql::statements::create::{
     CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
 };
@@ -125,7 +127,8 @@ impl StatementExecutor {
                 schema: &schema,
             })
             .await
-            .context(TableMetadataManagerSnafu)?;
+            .context(TableMetadataManagerSnafu)?
+            .map(|v| v.into_inner());
 
         let quote_style = ctx.quote_style();
         let mut create_stmt =
@@ -723,7 +726,7 @@ impl StatementExecutor {
     #[tracing::instrument(skip_all)]
     pub async fn alter_logical_tables(
         &self,
-        alter_table_exprs: Vec<AlterExpr>,
+        alter_table_exprs: Vec<AlterTableExpr>,
         query_context: QueryContextRef,
     ) -> Result<Output> {
         let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
@@ -885,7 +888,7 @@ impl StatementExecutor {
         &self,
         table_id: TableId,
         table_info: Arc<TableInfo>,
-        expr: AlterExpr,
+        expr: AlterTableExpr,
     ) -> Result<()> {
         let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
             .context(AlterExprToRequestSnafu)?;
@@ -921,14 +924,14 @@ impl StatementExecutor {
         alter_table: AlterTable,
         query_context: QueryContextRef,
     ) -> Result<Output> {
-        let expr = expr_factory::to_alter_expr(alter_table, &query_context)?;
+        let expr = expr_factory::to_alter_table_expr(alter_table, &query_context)?;
         self.alter_table_inner(expr, query_context).await
     }
 
     #[tracing::instrument(skip_all)]
     pub async fn alter_table_inner(
         &self,
-        expr: AlterExpr,
+        expr: AlterTableExpr,
         query_context: QueryContextRef,
     ) -> Result<Output> {
         ensure!(
@@ -1041,6 +1044,58 @@ impl StatementExecutor {
         Ok(Output::new_with_affected_rows(0))
     }
 
+    #[tracing::instrument(skip_all)]
+    pub async fn alter_database(
+        &self,
+        alter_expr: AlterDatabase,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
+        let alter_expr = expr_factory::to_alter_database_expr(alter_expr, &query_context)?;
+        self.alter_database_inner(alter_expr, query_context).await
+    }
+
+    #[tracing::instrument(skip_all)]
+    pub async fn alter_database_inner(
+        &self,
+        alter_expr: AlterDatabaseExpr,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
+        ensure!(
+            !is_readonly_schema(&alter_expr.schema_name),
+            SchemaReadOnlySnafu {
+                name: query_context.current_schema().clone()
+            }
+        );
+
+        let exists = self
+            .catalog_manager
+            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
+            .await
+            .context(CatalogSnafu)?;
+        ensure!(
+            exists,
+            SchemaNotFoundSnafu {
+                schema_info: alter_expr.schema_name,
+            }
+        );
+
+        let cache_ident = [CacheIdent::SchemaName(SchemaName {
+            catalog_name: alter_expr.catalog_name.clone(),
+            schema_name: alter_expr.schema_name.clone(),
+        })];
+
+        self.alter_database_procedure(alter_expr, query_context)
+            .await?;
+
+        // Invalidates local cache ASAP.
+        self.cache_invalidator
+            .invalidate(&Context::default(), &cache_ident)
+            .await
+            .context(error::InvalidateTableCacheSnafu)?;
+
+        Ok(Output::new_with_affected_rows(0))
+    }
+
     async fn create_table_procedure(
         &self,
         create_table: CreateTableExpr,
@@ -1079,7 +1134,7 @@ impl StatementExecutor {
 
     async fn alter_logical_tables_procedure(
         &self,
-        tables_data: Vec<AlterExpr>,
+        tables_data: Vec<AlterTableExpr>,
         query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
@@ -1135,6 +1190,22 @@ impl StatementExecutor {
             .context(error::ExecuteDdlSnafu)
     }
 
+    async fn alter_database_procedure(
+        &self,
+        alter_expr: AlterDatabaseExpr,
+        query_context: QueryContextRef,
+    ) -> Result<SubmitDdlTaskResponse> {
+        let request = SubmitDdlTaskRequest {
+            query_context,
+            task: DdlTask::new_alter_database(alter_expr),
+        };
+
+        self.procedure_executor
+            .submit_ddl_task(&ExecutorContext::default(), request)
+            .await
+            .context(error::ExecuteDdlSnafu)
+    }
+
     async fn truncate_table_procedure(
         &self,
         table_name: &TableName,
diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs
index 111a70ab194d..210ec4e7f28f 100644
--- a/src/operator/src/statement/show.rs
+++ b/src/operator/src/statement/show.rs
@@ -127,7 +127,8 @@ impl StatementExecutor {
                 schema: &table_name.schema_name,
             })
             .await
-            .context(TableMetadataManagerSnafu)?;
+            .context(TableMetadataManagerSnafu)?
+            .map(|v| v.into_inner());
 
         let partitions = self
             .partition_manager
diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml
index e459c1d01559..e3340a8f6c90 100644
--- a/src/sql/Cargo.toml
+++ b/src/sql/Cargo.toml
@@ -24,6 +24,7 @@ datafusion-physical-expr.workspace = true
 datafusion-sql.workspace = true
 datatypes.workspace = true
 hex = "0.4"
+humantime.workspace = true
 iso8601 = "0.6.1"
 itertools.workspace = true
 jsonb.workspace = true
@@ -33,6 +34,7 @@ serde_json.workspace = true
 snafu.workspace = true
 sqlparser.workspace = true
 sqlparser_derive = "0.1"
+store-api.workspace = true
 table.workspace = true
 
 [dev-dependencies]
diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs
index 4fa2a3f865bf..e64a734ebf99 100644
--- a/src/sql/src/parsers/alter_parser.rs
+++ b/src/sql/src/parsers/alter_parser.rs
@@ -25,19 +25,78 @@ use sqlparser::tokenizer::Token;
 use crate::error::{self, InvalidColumnOptionSnafu, Result, SetFulltextOptionSnafu};
 use crate::parser::ParserContext;
 use crate::parsers::utils::validate_column_fulltext_create_option;
-use crate::statements::alter::{AlterTable, AlterTableOperation, TableOption};
+use crate::statements::alter::{
+    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation, KeyValueOption,
+};
 use crate::statements::statement::Statement;
 use crate::util::parse_option_string;
 
 impl ParserContext<'_> {
     pub(crate) fn parse_alter(&mut self) -> Result<Statement> {
-        let alter_table = self.parse_alter_table()?;
-        Ok(Statement::Alter(alter_table))
+        let _ = self.parser.expect_keyword(Keyword::ALTER);
+        match self.parser.peek_token().token {
+            Token::Word(w) => match w.keyword {
+                Keyword::DATABASE => self.parse_alter_database().map(Statement::AlterDatabase),
+                Keyword::TABLE => self.parse_alter_table().map(Statement::AlterTable),
+                _ => self.expected("DATABASE or TABLE after ALTER", self.parser.peek_token()),
+            },
+            unexpected => self.unsupported(unexpected.to_string()),
+        }
+    }
+
+    fn parse_alter_database(&mut self) -> Result<AlterDatabase> {
+        self.parser
+            .expect_keyword(Keyword::DATABASE)
+            .context(error::SyntaxSnafu)?;
+
+        let database_name = self
+            .parser
+            .parse_object_name(false)
+            .context(error::SyntaxSnafu)?;
+        let database_name = Self::canonicalize_object_name(database_name);
+
+        match self.parser.peek_token().token {
+            Token::Word(w) => {
+                if w.value.eq_ignore_ascii_case("UNSET") {
+                    let _ = self.parser.next_token();
+                    let keys = self
+                        .parser
+                        .parse_comma_separated(parse_string_option_names)
+                        .context(error::SyntaxSnafu)?
+                        .into_iter()
+                        .map(|name| name.to_string())
+                        .collect();
+                    Ok(AlterDatabase::new(
+                        database_name,
+                        AlterDatabaseOperation::UnsetDatabaseOption { keys },
+                    ))
+                } else if w.keyword == Keyword::SET {
+                    let _ = self.parser.next_token();
+                    let options = self
+                        .parser
+                        .parse_comma_separated(parse_string_options)
+                        .context(error::SyntaxSnafu)?
+                        .into_iter()
+                        .map(|(key, value)| KeyValueOption { key, value })
+                        .collect();
+                    Ok(AlterDatabase::new(
+                        database_name,
+                        AlterDatabaseOperation::SetDatabaseOption { options },
+                    ))
+                } else {
+                    self.expected(
+                        "SET or UNSET after ALTER DATABASE",
+                        self.parser.peek_token(),
+                    )
+                }
+            }
+            unexpected => self.unsupported(unexpected.to_string()),
+        }
     }
 
     fn parse_alter_table(&mut self) -> Result<AlterTable> {
         self.parser
-            .expect_keywords(&[Keyword::ALTER, Keyword::TABLE])
+            .expect_keyword(Keyword::TABLE)
             .context(error::SyntaxSnafu)?;
 
         let raw_table_name = self
@@ -89,7 +148,7 @@ impl ParserContext<'_> {
                                 .parse_comma_separated(parse_string_options)
                                 .context(error::SyntaxSnafu)?
                                 .into_iter()
-                                .map(|(key, value)| TableOption { key, value })
+                                .map(|(key, value)| KeyValueOption { key, value })
                                 .collect();
                             AlterTableOperation::SetTableOptions { options }
                         }
@@ -261,6 +320,67 @@ mod tests {
     use super::*;
     use crate::dialect::GreptimeDbDialect;
     use crate::parser::ParseOptions;
+    use crate::statements::alter::AlterDatabaseOperation;
+
+    #[test]
+    fn test_parse_alter_database() {
+        let sql = "ALTER DATABASE test_db SET 'a'='A', 'b' = 'B'";
+        let mut result =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap();
+        assert_eq!(1, result.len());
+
+        let statement = result.remove(0);
+        assert_matches!(statement, Statement::AlterDatabase { .. });
+        match statement {
+            Statement::AlterDatabase(alter_database) => {
+                assert_eq!("test_db", alter_database.database_name().0[0].value);
+
+                let alter_operation = alter_database.alter_operation();
+                assert_matches!(
+                    alter_operation,
+                    AlterDatabaseOperation::SetDatabaseOption { .. }
+                );
+                match alter_operation {
+                    AlterDatabaseOperation::SetDatabaseOption { options } => {
+                        assert_eq!(2, options.len());
+                        assert_eq!("a", options[0].key);
+                        assert_eq!("A", options[0].value);
+                        assert_eq!("b", options[1].key);
+                        assert_eq!("B", options[1].value);
+                    }
+                    _ => unreachable!(),
+                }
+            }
+            _ => unreachable!(),
+        }
+        let sql = "ALTER DATABASE test_db UNSET 'a', 'b'";
+        let mut result =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap();
+        assert_eq!(1, result.len());
+        let statement = result.remove(0);
+        assert_matches!(statement, Statement::AlterDatabase { .. });
+        match statement {
+            Statement::AlterDatabase(alter_database) => {
+                assert_eq!("test_db", alter_database.database_name().0[0].value);
+                let alter_operation = alter_database.alter_operation();
+                assert_matches!(
+                    alter_operation,
+                    AlterDatabaseOperation::UnsetDatabaseOption { .. }
+                );
+                match alter_operation {
+                    AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
+                        assert_eq!(2, keys.len());
+                        assert_eq!("a", keys[0]);
+                        assert_eq!("b", keys[1]);
+                    }
+                    _ => unreachable!(),
+                }
+            }
+            _ => unreachable!(),
+        }
+    }
 
     #[test]
     fn test_parse_alter_add_column() {
@@ -271,9 +391,9 @@ mod tests {
         assert_eq!(1, result.len());
 
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -307,9 +427,9 @@ mod tests {
         assert_eq!(1, result.len());
 
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -343,9 +463,9 @@ mod tests {
         assert_eq!(1, result.len());
 
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -394,9 +514,9 @@ mod tests {
         assert_eq!(1, result.len());
 
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -433,9 +553,9 @@ mod tests {
         assert_eq!(1, result_2.len());
 
         let statement = result_2.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -469,7 +589,7 @@ mod tests {
         .unwrap();
 
         match result_1.remove(0) {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -500,7 +620,7 @@ mod tests {
         .unwrap();
 
         match result_2.remove(0) {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("my_metric_1", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -539,9 +659,9 @@ mod tests {
         assert_eq!(1, result.len());
 
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("test_table", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -562,7 +682,7 @@ mod tests {
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, result.len());
-        let Statement::Alter(alter) = &result[0] else {
+        let Statement::AlterTable(alter) = &result[0] else {
             unreachable!()
         };
         assert_eq!("test_table", alter.table_name.0[0].value);
@@ -583,7 +703,7 @@ mod tests {
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, result.len());
-        let Statement::Alter(alter) = &result[0] else {
+        let Statement::AlterTable(alter) = &result[0] else {
             unreachable!()
         };
         assert_eq!("test_table", alter.table_name.0[0].value);
@@ -638,9 +758,9 @@ mod tests {
 
         assert_eq!(1, result.len());
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("test_table", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
@@ -675,9 +795,9 @@ mod tests {
                 .unwrap();
         assert_eq!(1, result.len());
         let statement = result.remove(0);
-        assert_matches!(statement, Statement::Alter { .. });
+        assert_matches!(statement, Statement::AlterTable { .. });
         match statement {
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 assert_eq!("test_table", alter_table.table_name().0[0].value);
 
                 let alter_operation = alter_table.alter_operation();
diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs
index 543c6e827294..cf59257e8931 100644
--- a/src/sql/src/statements/alter.rs
+++ b/src/sql/src/statements/alter.rs
@@ -72,7 +72,7 @@ pub enum AlterTableOperation {
     },
     /// `SET <table attrs key> = <table attr value>`
     SetTableOptions {
-        options: Vec<TableOption>,
+        options: Vec<KeyValueOption>,
     },
     UnsetTableOptions {
         keys: Vec<String>,
@@ -123,7 +123,7 @@ impl Display for AlterTableOperation {
             AlterTableOperation::SetTableOptions { options } => {
                 let kvs = options
                     .iter()
-                    .map(|TableOption { key, value }| {
+                    .map(|KeyValueOption { key, value }| {
                         if !value.is_empty() {
                             format!("'{key}'='{value}'")
                         } else {
@@ -152,20 +152,86 @@ impl Display for AlterTableOperation {
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
-pub struct TableOption {
+pub struct KeyValueOption {
     pub key: String,
     pub value: String,
 }
 
-impl From<TableOption> for v1::TableOption {
-    fn from(c: TableOption) -> Self {
-        v1::TableOption {
+impl From<KeyValueOption> for v1::Option {
+    fn from(c: KeyValueOption) -> Self {
+        v1::Option {
             key: c.key,
             value: c.value,
         }
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
+pub struct AlterDatabase {
+    pub database_name: ObjectName,
+    pub alter_operation: AlterDatabaseOperation,
+}
+
+impl AlterDatabase {
+    pub(crate) fn new(database_name: ObjectName, alter_operation: AlterDatabaseOperation) -> Self {
+        Self {
+            database_name,
+            alter_operation,
+        }
+    }
+
+    pub fn database_name(&self) -> &ObjectName {
+        &self.database_name
+    }
+
+    pub fn alter_operation(&self) -> &AlterDatabaseOperation {
+        &self.alter_operation
+    }
+}
+
+impl Display for AlterDatabase {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let database_name = self.database_name();
+        let alter_operation = self.alter_operation();
+        write!(f, r#"ALTER DATABASE {database_name} {alter_operation}"#)
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
+pub enum AlterDatabaseOperation {
+    SetDatabaseOption { options: Vec<KeyValueOption> },
+    UnsetDatabaseOption { keys: Vec<String> },
+}
+
+impl Display for AlterDatabaseOperation {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            AlterDatabaseOperation::SetDatabaseOption { options } => {
+                let kvs = options
+                    .iter()
+                    .map(|KeyValueOption { key, value }| {
+                        if !value.is_empty() {
+                            format!("'{key}'='{value}'")
+                        } else {
+                            format!("'{key}'=NULL")
+                        }
+                    })
+                    .join(",");
+
+                write!(f, "SET {kvs}")?;
+
+                Ok(())
+            }
+            AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
+                let keys = keys.iter().map(|key| format!("'{key}'")).join(",");
+                write!(f, "UNSET {keys}")?;
+
+                Ok(())
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
@@ -176,15 +242,56 @@ mod tests {
 
     #[test]
     fn test_display_alter() {
+        let sql = r"ALTER DATABASE db SET 'a' = 'b', 'c' = 'd'";
+        let stmts =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap();
+        assert_eq!(1, stmts.len());
+        assert_matches!(&stmts[0], Statement::AlterDatabase { .. });
+
+        match &stmts[0] {
+            Statement::AlterDatabase(set) => {
+                let new_sql = format!("\n{}", set);
+                assert_eq!(
+                    r#"
+ALTER DATABASE db SET 'a'='b','c'='d'"#,
+                    &new_sql
+                );
+            }
+            _ => {
+                unreachable!();
+            }
+        }
+
+        let sql = r"ALTER DATABASE db UNSET 'a', 'c'";
+        let stmts =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap();
+        assert_eq!(1, stmts.len());
+
+        match &stmts[0] {
+            Statement::AlterDatabase(set) => {
+                let new_sql = format!("\n{}", set);
+                assert_eq!(
+                    r#"
+ALTER DATABASE db UNSET 'a','c'"#,
+                    &new_sql
+                );
+            }
+            _ => {
+                unreachable!();
+            }
+        }
+
         let sql = r"alter table monitor add column app string default 'shop' primary key;";
         let stmts =
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, stmts.len());
-        assert_matches!(&stmts[0], Statement::Alter { .. });
+        assert_matches!(&stmts[0], Statement::AlterTable { .. });
 
         match &stmts[0] {
-            Statement::Alter(set) => {
+            Statement::AlterTable(set) => {
                 let new_sql = format!("\n{}", set);
                 assert_eq!(
                     r#"
@@ -202,10 +309,10 @@ ALTER TABLE monitor ADD COLUMN app STRING DEFAULT 'shop' PRIMARY KEY"#,
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, stmts.len());
-        assert_matches!(&stmts[0], Statement::Alter { .. });
+        assert_matches!(&stmts[0], Statement::AlterTable { .. });
 
         match &stmts[0] {
-            Statement::Alter(set) => {
+            Statement::AlterTable(set) => {
                 let new_sql = format!("\n{}", set);
                 assert_eq!(
                     r#"
@@ -223,10 +330,10 @@ ALTER TABLE monitor MODIFY COLUMN load_15 STRING"#,
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, stmts.len());
-        assert_matches!(&stmts[0], Statement::Alter { .. });
+        assert_matches!(&stmts[0], Statement::AlterTable { .. });
 
         match &stmts[0] {
-            Statement::Alter(set) => {
+            Statement::AlterTable(set) => {
                 let new_sql = format!("\n{}", set);
                 assert_eq!(
                     r#"
@@ -244,10 +351,10 @@ ALTER TABLE monitor DROP COLUMN load_15"#,
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, stmts.len());
-        assert_matches!(&stmts[0], Statement::Alter { .. });
+        assert_matches!(&stmts[0], Statement::AlterTable { .. });
 
         match &stmts[0] {
-            Statement::Alter(set) => {
+            Statement::AlterTable(set) => {
                 let new_sql = format!("\n{}", set);
                 assert_eq!(
                     r#"
@@ -265,10 +372,10 @@ ALTER TABLE monitor RENAME monitor_new"#,
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, stmts.len());
-        assert_matches!(&stmts[0], Statement::Alter { .. });
+        assert_matches!(&stmts[0], Statement::AlterTable { .. });
 
         match &stmts[0] {
-            Statement::Alter(set) => {
+            Statement::AlterTable(set) => {
                 let new_sql = format!("\n{}", set);
                 assert_eq!(
                     r#"
@@ -286,10 +393,10 @@ ALTER TABLE monitor MODIFY COLUMN a SET FULLTEXT WITH(analyzer=English, case_sen
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                 .unwrap();
         assert_eq!(1, stmts.len());
-        assert_matches!(&stmts[0], Statement::Alter { .. });
+        assert_matches!(&stmts[0], Statement::AlterTable { .. });
 
         match &stmts[0] {
-            Statement::Alter(set) => {
+            Statement::AlterTable(set) => {
                 let new_sql = format!("\n{}", set);
                 assert_eq!(
                     r#"
diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs
index 803066a6036f..0c4b324cd63f 100644
--- a/src/sql/src/statements/statement.rs
+++ b/src/sql/src/statements/statement.rs
@@ -20,7 +20,7 @@ use sqlparser_derive::{Visit, VisitMut};
 
 use crate::error::{ConvertToDfStatementSnafu, Error};
 use crate::statements::admin::Admin;
-use crate::statements::alter::AlterTable;
+use crate::statements::alter::{AlterDatabase, AlterTable};
 use crate::statements::create::{
     CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView,
 };
@@ -70,7 +70,9 @@ pub enum Statement {
     // CREATE DATABASE
     CreateDatabase(CreateDatabase),
     /// ALTER TABLE
-    Alter(AlterTable),
+    AlterTable(AlterTable),
+    /// ALTER DATABASE
+    AlterDatabase(AlterDatabase),
     // Databases.
     ShowDatabases(ShowDatabases),
     // SHOW TABLES
@@ -133,7 +135,8 @@ impl Display for Statement {
             Statement::DropDatabase(s) => s.fmt(f),
             Statement::DropView(s) => s.fmt(f),
             Statement::CreateDatabase(s) => s.fmt(f),
-            Statement::Alter(s) => s.fmt(f),
+            Statement::AlterTable(s) => s.fmt(f),
+            Statement::AlterDatabase(s) => s.fmt(f),
             Statement::ShowDatabases(s) => s.fmt(f),
             Statement::ShowTables(s) => s.fmt(f),
             Statement::ShowTableStatus(s) => s.fmt(f),
diff --git a/src/sql/src/statements/transform/type_alias.rs b/src/sql/src/statements/transform/type_alias.rs
index 04756e761755..9e51ca918041 100644
--- a/src/sql/src/statements/transform/type_alias.rs
+++ b/src/sql/src/statements/transform/type_alias.rs
@@ -52,7 +52,7 @@ impl TransformRule for TypeAliasTransformRule {
                     .iter_mut()
                     .for_each(|column| replace_type_alias(column.mut_data_type()));
             }
-            Statement::Alter(alter_table) => {
+            Statement::AlterTable(alter_table) => {
                 if let AlterTableOperation::ModifyColumnType { target_type, .. } =
                     alter_table.alter_operation_mut()
                 {
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index dcc6a9ef9b49..dc37e4a2cce4 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -24,7 +24,7 @@ use api::v1::region::{
     CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
     FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
 };
-use api::v1::{self, Analyzer, Rows, SemanticType, TableOption};
+use api::v1::{self, Analyzer, Option as PbOption, Rows, SemanticType};
 pub use common_base::AffectedRows;
 use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::FulltextOptions;
@@ -751,12 +751,11 @@ pub enum SetRegionOption {
     Twsc(String, String),
 }
 
-impl TryFrom<&TableOption> for SetRegionOption {
+impl TryFrom<&PbOption> for SetRegionOption {
     type Error = MetadataError;
 
-    fn try_from(value: &TableOption) -> std::result::Result<Self, Self::Error> {
-        let TableOption { key, value } = value;
-
+    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
+        let PbOption { key, value } = value;
         match key.as_str() {
             TTL_KEY => {
                 let ttl = if value.is_empty() {
diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs
index 5e5e402b8557..1fd5e4239dd6 100644
--- a/tests-integration/src/grpc.rs
+++ b/tests-integration/src/grpc.rs
@@ -23,7 +23,7 @@ mod test {
     use api::v1::query_request::Query;
     use api::v1::region::QueryRequest as RegionQueryRequest;
     use api::v1::{
-        alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType,
+        alter_table_expr, AddColumn, AddColumns, AlterTableExpr, Column, ColumnDataType,
         ColumnDataTypeExtension, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DdlRequest,
         DeleteRequest, DeleteRequests, DropTableExpr, InsertRequest, InsertRequests, QueryRequest,
         SemanticType, VectorTypeExtension,
@@ -116,11 +116,11 @@ mod test {
         assert!(matches!(output.data, OutputData::AffectedRows(0)));
 
         let request = Request::Ddl(DdlRequest {
-            expr: Some(DdlExpr::Alter(AlterExpr {
+            expr: Some(DdlExpr::AlterTable(AlterTableExpr {
                 catalog_name: "greptime".to_string(),
                 schema_name: "database_created_through_grpc".to_string(),
                 table_name: "table_created_through_grpc".to_string(),
-                kind: Some(alter_expr::Kind::AddColumns(AddColumns {
+                kind: Some(alter_table_expr::Kind::AddColumns(AddColumns {
                     add_columns: vec![AddColumn {
                         column_def: Some(ColumnDef {
                             name: "b".to_string(),
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 26c0385c330a..8b91ed55d520 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -12,10 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::alter_expr::Kind;
+use api::v1::alter_table_expr::Kind;
 use api::v1::promql_request::Promql;
 use api::v1::{
-    column, AddColumn, AddColumns, AlterExpr, Basic, Column, ColumnDataType, ColumnDef,
+    column, AddColumn, AddColumns, AlterTableExpr, Basic, Column, ColumnDataType, ColumnDef,
     CreateTableExpr, InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery,
     PromqlRequest, RequestHeader, SemanticType,
 };
@@ -374,7 +374,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
             location: None,
         }],
     });
-    let expr = AlterExpr {
+    let expr = AlterTableExpr {
         catalog_name: DEFAULT_CATALOG_NAME.to_string(),
         schema_name: DEFAULT_SCHEMA_NAME.to_string(),
         table_name: "demo".to_string(),
diff --git a/tests/cases/standalone/common/alter/alter_database.result b/tests/cases/standalone/common/alter/alter_database.result
new file mode 100644
index 000000000000..a98d48323659
--- /dev/null
+++ b/tests/cases/standalone/common/alter/alter_database.result
@@ -0,0 +1,107 @@
+CREATE DATABASE alter_database;
+
+Affected Rows: 1
+
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
++----------------+----------------------------------------------+
+
+ALTER DATABASE alter_database SET 'ttl'='10s';
+
+Affected Rows: 0
+
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
+|                | WITH(                                        |
+|                |   ttl = '10s'                                |
+|                | )                                            |
++----------------+----------------------------------------------+
+
+ALTER DATABASE alter_database SET 'ttl'='20s';
+
+Affected Rows: 0
+
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
+|                | WITH(                                        |
+|                |   ttl = '20s'                                |
+|                | )                                            |
++----------------+----------------------------------------------+
+
+-- SQLNESS ARG restart=true
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
+|                | WITH(                                        |
+|                |   ttl = '20s'                                |
+|                | )                                            |
++----------------+----------------------------------------------+
+
+ALTER DATABASE alter_database SET 'ttl'='';
+
+Affected Rows: 0
+
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
++----------------+----------------------------------------------+
+
+ALTER DATABASE alter_database SET 'ttl'='😁';
+
+Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: 😁
+
+ALTER DATABASE alter_database SET '🕶️'='1s';
+
+Error: 1004(InvalidArguments), Invalid set database option, key: 🕶️, value: 1s
+
+ALTER DATABASE alter_database SET 'ttl'='40s';
+
+Affected Rows: 0
+
+ALTER DATABASE alter_database UNSET 'ttl';
+
+Affected Rows: 0
+
+ALTER DATABASE alter_database UNSET '🕶️';
+
+Error: 1004(InvalidArguments), Invalid unset database option, key: 🕶️
+
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
++----------------+----------------------------------------------+
+
+-- SQLNESS ARG restart=true
+SHOW CREATE DATABASE alter_database;
+
++----------------+----------------------------------------------+
+| Database       | Create Database                              |
++----------------+----------------------------------------------+
+| alter_database | CREATE DATABASE IF NOT EXISTS alter_database |
++----------------+----------------------------------------------+
+
+DROP DATABASE alter_database;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/alter/alter_database.sql b/tests/cases/standalone/common/alter/alter_database.sql
new file mode 100644
index 000000000000..f6491a24dd38
--- /dev/null
+++ b/tests/cases/standalone/common/alter/alter_database.sql
@@ -0,0 +1,36 @@
+CREATE DATABASE alter_database;
+
+SHOW CREATE DATABASE alter_database;
+
+ALTER DATABASE alter_database SET 'ttl'='10s';
+
+SHOW CREATE DATABASE alter_database;
+
+ALTER DATABASE alter_database SET 'ttl'='20s';
+
+SHOW CREATE DATABASE alter_database;
+
+-- SQLNESS ARG restart=true
+SHOW CREATE DATABASE alter_database;
+
+ALTER DATABASE alter_database SET 'ttl'='';
+
+SHOW CREATE DATABASE alter_database;
+
+ALTER DATABASE alter_database SET 'ttl'='😁';
+
+ALTER DATABASE alter_database SET '🕶️'='1s';
+
+ALTER DATABASE alter_database SET 'ttl'='40s';
+
+ALTER DATABASE alter_database UNSET 'ttl';
+
+ALTER DATABASE alter_database UNSET '🕶️';
+
+SHOW CREATE DATABASE alter_database;
+
+-- SQLNESS ARG restart=true
+SHOW CREATE DATABASE alter_database;
+
+DROP DATABASE alter_database;
+
diff --git a/tests/cases/standalone/common/alter/alter_table_options.result b/tests/cases/standalone/common/alter/alter_table_options.result
index 0cd05c96212e..a375b7ac78c3 100644
--- a/tests/cases/standalone/common/alter/alter_table_options.result
+++ b/tests/cases/standalone/common/alter/alter_table_options.result
@@ -128,6 +128,14 @@ SHOW CREATE TABLE ato;
 |       | )                                  |
 +-------+------------------------------------+
 
+ALTER TABLE ato SET 'ttl'='😁';
+
+Error: 1004(InvalidArguments), Invalid set table option request: Invalid set region option request, key: ttl, value: 😁
+
+ALTER TABLE ato SET '🕶️'='1s';
+
+Error: 1004(InvalidArguments), Invalid set table option request: Invalid set region option request, key: 🕶️, value: 1s
+
 SELECT i FROM ato;
 
 +---+
@@ -190,6 +198,10 @@ ALTER TABLE ato UNSET 'compaction.twcs.time_window';
 
 Affected Rows: 0
 
+ALTER TABLE ato UNSET '🕶️';
+
+Error: 1004(InvalidArguments), Invalid unset table option request: Invalid set region option request, key: 🕶️
+
 SHOW CREATE TABLE ato;
 
 +-------+----------------------------------------------------+
@@ -214,6 +226,33 @@ SHOW CREATE TABLE ato;
 |       | )                                                  |
 +-------+----------------------------------------------------+
 
+ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='';
+
+Affected Rows: 0
+
+SHOW CREATE TABLE ato;
+
++-------+----------------------------------------------------+
+| Table | Create Table                                       |
++-------+----------------------------------------------------+
+| ato   | CREATE TABLE IF NOT EXISTS "ato" (                 |
+|       |   "i" INT NULL,                                    |
+|       |   "j" TIMESTAMP(3) NOT NULL,                       |
+|       |   TIME INDEX ("j"),                                |
+|       |   PRIMARY KEY ("i")                                |
+|       | )                                                  |
+|       |                                                    |
+|       | ENGINE=mito                                        |
+|       | WITH(                                              |
+|       |   compaction.twcs.max_active_window_files = '2',   |
+|       |   compaction.twcs.max_active_window_runs = '6',    |
+|       |   compaction.twcs.max_inactive_window_files = '2', |
+|       |   compaction.twcs.max_output_file_size = '500MB',  |
+|       |   compaction.type = 'twcs',                        |
+|       |   ttl = '1s'                                       |
+|       | )                                                  |
++-------+----------------------------------------------------+
+
 -- SQLNESS ARG restart=true
 SHOW CREATE TABLE ato;
 
@@ -232,7 +271,6 @@ SHOW CREATE TABLE ato;
 |       |   compaction.twcs.max_active_window_files = '2',   |
 |       |   compaction.twcs.max_active_window_runs = '6',    |
 |       |   compaction.twcs.max_inactive_window_files = '2', |
-|       |   compaction.twcs.max_inactive_window_runs = '6',  |
 |       |   compaction.twcs.max_output_file_size = '500MB',  |
 |       |   compaction.type = 'twcs',                        |
 |       |   ttl = '1s'                                       |
diff --git a/tests/cases/standalone/common/alter/alter_table_options.sql b/tests/cases/standalone/common/alter/alter_table_options.sql
index a6cdb1de6f98..2fcc8a9707ab 100644
--- a/tests/cases/standalone/common/alter/alter_table_options.sql
+++ b/tests/cases/standalone/common/alter/alter_table_options.sql
@@ -26,6 +26,10 @@ ALTER TABLE ato SET 'ttl'='1s';
 
 SHOW CREATE TABLE ato;
 
+ALTER TABLE ato SET 'ttl'='😁';
+
+ALTER TABLE ato SET '🕶️'='1s';
+
 SELECT i FROM ato;
 
 ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';
@@ -44,6 +48,12 @@ SHOW CREATE TABLE ato;
 
 ALTER TABLE ato UNSET 'compaction.twcs.time_window';
 
+ALTER TABLE ato UNSET '🕶️';
+
+SHOW CREATE TABLE ato;
+
+ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='';
+
 SHOW CREATE TABLE ato;
 
 -- SQLNESS ARG restart=true