Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Jul 10, 2024
1 parent b5c6c72 commit 69cd5f3
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod table_constraints;
mod table_names;
pub mod tables;
pub(crate) mod utils;
mod views;

use std::collections::HashMap;
use std::sync::{Arc, Weak};
Expand All @@ -46,6 +47,7 @@ use table::metadata::{
};
use table::{Table, TableRef};
pub use table_names::*;
use views::InformationSchemaViews;

use self::columns::InformationSchemaColumns;
use crate::error::Result;
Expand Down Expand Up @@ -262,6 +264,10 @@ impl InformationSchemaProvider {
CLUSTER_INFO => Some(Arc::new(InformationSchemaClusterInfo::new(
self.catalog_manager.clone(),
)) as _),
VIEWS => Some(Arc::new(InformationSchemaViews::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/catalog/src/information_schema/table_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ pub const PARTITIONS: &str = "partitions";
pub const REGION_PEERS: &str = "region_peers";
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
289 changes: 289 additions & 0 deletions src/catalog/src/information_schema/views.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_VIEW_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
DateTimeVectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use crate::error::Error::ViewInfoNotFound;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use super::{utils, VIEWS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu, UpgradeWeakCatalogManagerRefSnafu, ViewInfoNotFoundSnafu
};
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;


pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const VIEW_DEFINITION: &str = "view_definition";
pub const CHECK_OPTION: &str = "check_option";
pub const IS_UPDATABLE: &str = "is_updateble";
pub const DEFINER: &str = "definer";
pub const SECURITY_TYPE: &str = "security_type";
pub const CHARACTER_SET_CLIENT: &str = "character_set_client";
pub const COLLATION_CONNECTION: &str = "collation_connection";


pub(super) struct InformationSchemaViews {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaViews {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(VIEW_DEFINITION, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(CHECK_OPTION, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(IS_UPDATABLE, ConcreteDataType::boolean_datatype(), true),
ColumnSchema::new(DEFINER, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(SECURITY_TYPE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CHARACTER_SET_CLIENT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(COLLATION_CONNECTION, ConcreteDataType::string_datatype(), true),
]))
}

fn builder(&self) -> InformationSchemaViewsBuilder {
InformationSchemaViewsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}

impl InformationTable for InformationSchemaViews {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_VIEW_TABLE_ID
}

fn table_name(&self) -> &'static str {
VIEWS
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_views(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(|err| {
datafusion::error::DataFusionError::External(format!("{err:?}").into())
})
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://dev.mysql.com/doc/refman/8.4/en/information-schema-views-table.html>
struct InformationSchemaViewsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
table_names: StringVectorBuilder,
table_types: StringVectorBuilder,
table_ids: UInt32VectorBuilder,
version: UInt64VectorBuilder,
row_format: StringVectorBuilder,
table_rows: UInt64VectorBuilder,
data_length: UInt64VectorBuilder,
max_data_length: UInt64VectorBuilder,
index_length: UInt64VectorBuilder,
avg_row_length: UInt64VectorBuilder,
max_index_length: UInt64VectorBuilder,
data_free: UInt64VectorBuilder,
auto_increment: UInt64VectorBuilder,
create_time: DateTimeVectorBuilder,
update_time: DateTimeVectorBuilder,
check_time: DateTimeVectorBuilder,
table_collation: StringVectorBuilder,
checksum: UInt64VectorBuilder,
create_options: StringVectorBuilder,
table_comment: StringVectorBuilder,
engines: StringVectorBuilder,
temporary: StringVectorBuilder,
}

impl InformationSchemaViewsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

/// Construct the `information_schema.views` virtual table
async fn make_views(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let table_metadata_manager = utils::table_meta_manager(&self.catalog_manager)?;
let predicates = Predicates::from_scan_request(&request);

if let Some(table_metadata_manager) = &table_metadata_manager {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
if table_info.table_type == TableType::View {
// let view_stmt = sql::query::create_table_stmt(&table_info, '"')?;
}
}
};



self.finish()
}

fn add_view(
&mut self,
) {

}


fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.table_types.finish()),
Arc::new(self.table_ids.finish()),
Arc::new(self.data_length.finish()),
Arc::new(self.max_data_length.finish()),
Arc::new(self.index_length.finish()),
Arc::new(self.max_index_length.finish()),
Arc::new(self.avg_row_length.finish()),
Arc::new(self.engines.finish()),
Arc::new(self.version.finish()),
Arc::new(self.row_format.finish()),
Arc::new(self.table_rows.finish()),
Arc::new(self.data_free.finish()),
Arc::new(self.auto_increment.finish()),
Arc::new(self.create_time.finish()),
Arc::new(self.update_time.finish()),
Arc::new(self.check_time.finish()),
Arc::new(self.table_collation.finish()),
Arc::new(self.checksum.finish()),
Arc::new(self.create_options.finish()),
Arc::new(self.table_comment.finish()),
Arc::new(self.temporary.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

impl DfPartitionStream for InformationSchemaViews {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_views(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
2 changes: 2 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29;
pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
/// id for information_schema.cluster_info
pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
/// id for information_schema.VIEWS
pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";
Expand Down

0 comments on commit 69cd5f3

Please sign in to comment.