Skip to content

Commit

Permalink
feat: import create database
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Aug 30, 2024
1 parent 7f9c58b commit b08bc00
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
38 changes: 18 additions & 20 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -120,6 +120,10 @@ pub struct Export {
}

impl Export {
fn catalog_path(&self) -> PathBuf {
PathBuf::from(&self.output_dir).join(&self.catalog)
}

async fn get_db_names(&self) -> Result<Vec<String>> {
let db_names = self.all_db_names().await?;
let Some(schema) = &self.schema else {
Expand Down Expand Up @@ -269,13 +273,11 @@ impl Export {
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
for schema in db_names {
let output_dir = Path::new(&self.output_dir)
.join(&self.catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = Path::new(&output_dir).join("create_database.sql");
let file = db_dir.join("create_database.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
match self
.show_create("DATABASE", &self.catalog, &schema, None)
Expand Down Expand Up @@ -312,13 +314,11 @@ impl Export {
self.get_table_list(&self.catalog, &schema).await?;
let table_count =
metric_physical_tables.len() + remaining_tables.len() + views.len();
let output_dir = Path::new(&self.output_dir)
.join(&self.catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = Path::new(&output_dir).join("create_tables.sql");
let file = db_dir.join("create_tables.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
match self.show_create("TABLE", &c, &s, Some(&t)).await {
Expand Down Expand Up @@ -348,7 +348,7 @@ impl Export {
info!(
"Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
self.catalog,
output_dir.to_string_lossy()
db_dir.to_string_lossy()
);

Ok::<(), Error>(())
Expand Down Expand Up @@ -383,10 +383,8 @@ impl Export {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let output_dir = Path::new(&self.output_dir)
.join(&self.catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;

Expand All @@ -410,7 +408,7 @@ impl Export {
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
self.catalog,
schema,
output_dir.to_str().unwrap(),
db_dir.to_str().unwrap(),
with_options
);

Expand All @@ -421,18 +419,18 @@ impl Export {
info!(
"Finished exporting {}.{schema} data into path: {}",
self.catalog,
output_dir.to_string_lossy()
db_dir.to_string_lossy()
);

// The export copy from sql
let copy_from_file = output_dir.join("copy_from.sql");
let copy_from_file = db_dir.join("copy_from.sql");
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
self.catalog,
schema,
output_dir.to_str().unwrap()
db_dir.to_str().unwrap()
);
writer
.write(copy_database_from_sql.as_bytes())
Expand Down
14 changes: 9 additions & 5 deletions src/cmd/src/cli/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use snafu::ResultExt;
use tokio::sync::Semaphore;
use tokio::time::Instant;

Check warning on line 23 in src/cmd/src/cli/import.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/cmd/src/cli/import.rs
use tracing_appender::non_blocking::WorkerGuard;

use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
Expand Down Expand Up @@ -100,14 +100,17 @@ pub struct Import {

impl Import {
async fn import_create_table(&self) -> Result<()> {
self.do_sql_job("create_tables.sql").await
// Use default db to creates other dbs
self.do_sql_job("create_database.sql", Some(DEFAULT_SCHEMA_NAME))
.await?;
self.do_sql_job("create_tables.sql", None).await
}

async fn import_database_data(&self) -> Result<()> {
self.do_sql_job("copy_from.sql").await
self.do_sql_job("copy_from.sql", None).await
}

async fn do_sql_job(&self, filename: &str) -> Result<()> {
async fn do_sql_job(&self, filename: &str, exec_db: Option<&str>) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
Expand All @@ -125,7 +128,8 @@ impl Import {
if sql.is_empty() {
info!("Empty `{filename}` {database_input_dir:?}");
} else {
self.database_client.sql(&sql, &schema).await?;
let db = exec_db.unwrap_or(&schema);
self.database_client.sql(&sql, db).await?;
info!("Imported `{filename}` for database {schema}");
}

Expand Down
6 changes: 6 additions & 0 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use aide::transform::TransformOperation;
use axum::extract::{Json, Query, State};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_plugins::GREPTIME_EXEC_WRITE_COST;
Expand Down Expand Up @@ -76,6 +77,11 @@ pub async fn sql(
) -> HttpResponse {
let start = Instant::now();
let sql_handler = &state.sql_handler;
if let Some(db) = &query_params.db.or(form_params.db) {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
query_ctx.set_current_catalog(&catalog);
query_ctx.set_current_schema(&schema);
}
let db = query_ctx.get_db_string();

query_ctx.set_channel(Channel::Http);
Expand Down

0 comments on commit b08bc00

Please sign in to comment.