From d9e3beb9f54a123c0eba3613de18bb035159f3f3 Mon Sep 17 00:00:00 2001
From: jeremyhi
Date: Wed, 17 Jul 2024 21:26:00 +0800
Subject: [PATCH 1/4] feat: export database data

---
 src/cmd/src/cli/export.rs | 73 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index 00d916407458..9308821f4b92 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -46,7 +46,10 @@ enum ExportTarget {
     #[default]
     CreateTable,
     /// Corresponding to `EXPORT TABLE`
+    #[deprecated(note = "Please use `DatabaseData` instead.")]
     TableData,
+    /// Corresponding to `EXPORT DATABASE`
+    DatabaseData,
 }
 
 #[derive(Debug, Default, Parser)]
@@ -409,14 +412,84 @@ impl Export {
 
         Ok(())
     }
+
+    async fn export_database_data(&self) -> Result<()> {
+        let timer = Instant::now();
+        let semaphore = Arc::new(Semaphore::new(self.parallelism));
+        let db_names = self.iter_db_names().await?;
+        let db_count = db_names.len();
+        let mut tasks = Vec::with_capacity(db_names.len());
+        for (catalog, schema) in db_names {
+            let semaphore_moved = semaphore.clone();
+            tasks.push(async move {
+                let _permit = semaphore_moved.acquire().await.unwrap();
+                tokio::fs::create_dir_all(&self.output_dir)
+                    .await
+                    .context(FileIoSnafu)?;
+                let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
+                let sql = format!(
+                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH (FORMAT='parquet');"#,
+                    catalog,
+                    schema,
+                    output_dir.to_str().unwrap()
+                );
+
+                info!("Executing sql: {sql}");
+
+                self.sql(&sql).await?;
+
+                info!("Finished exporting {catalog}.{schema} data");
+
+                // export copy from sql
+                let copy_from_file = Path::new(&self.output_dir)
+                    .join(format!("{catalog}-{schema}_copy_database_from.sql"));
+                let mut writer =
+                    BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
+                let copy_database_from_sql = format!(
+                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
+                    catalog,
+                    schema,
+                    output_dir.to_str().unwrap()
+                );
+                writer
+                    .write(copy_database_from_sql.as_bytes())
+                    .await
+                    .context(FileIoSnafu)?;
+                writer.flush().await.context(FileIoSnafu)?;
+
+                info!("finished exporting {catalog}.{schema} copy_database_from.sql");
+
+                Ok::<(), Error>(())
+            })
+        }
+
+        let success = futures::future::join_all(tasks)
+            .await
+            .into_iter()
+            .filter(|r| match r {
+                Ok(_) => true,
+                Err(e) => {
+                    error!(e; "export database job failed");
+                    false
+                }
+            })
+            .count();
+        let elapsed = timer.elapsed();
+
+        info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
+
+        Ok(())
+    }
 }
 
+#[allow(deprecated)]
 #[async_trait]
 impl Tool for Export {
     async fn do_work(&self) -> Result<()> {
         match self.target {
             ExportTarget::CreateTable => self.export_create_table().await,
             ExportTarget::TableData => self.export_table_data().await,
+            ExportTarget::DatabaseData => self.export_database_data().await,
         }
     }
 }
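
For readers skimming the series: the new `DatabaseData` target drives the whole export through SQL. Below is a minimal, standalone sketch (not part of the diff) of the two statements `export_database_data` is expected to produce per database; `build_copy_sql`, the catalog/schema names and the output path are illustrative only.

use std::path::Path;

// Hypothetical helper mirroring the strings built in export_database_data():
// a COPY DATABASE ... TO statement executed against the server, and a matching
// COPY DATABASE ... FROM statement written to the generated
// "{catalog}-{schema}_copy_database_from.sql" file for later re-import.
fn build_copy_sql(catalog: &str, schema: &str, output_dir: &Path) -> (String, String) {
    let dir = output_dir.to_str().unwrap();
    let to_sql =
        format!(r#"COPY DATABASE "{catalog}"."{schema}" TO '{dir}' WITH (FORMAT='parquet');"#);
    let from_sql =
        format!(r#"COPY DATABASE "{catalog}"."{schema}" FROM '{dir}' WITH (FORMAT='parquet');"#);
    (to_sql, from_sql)
}

fn main() {
    let out = Path::new("/tmp/export/greptime-public/");
    let (to_sql, from_sql) = build_copy_sql("greptime", "public", out);
    println!("{to_sql}");   // executed during export
    println!("{from_sql}"); // the restore statement saved next to the data
}
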
From 35edc7252beaacda99479a98bba30b280ba0d991 Mon Sep 17 00:00:00 2001
From: jeremyhi
Date: Thu, 18 Jul 2024 12:25:36 +0800
Subject: [PATCH 2/4] feat: export data with time range

---
 src/cmd/src/cli/export.rs | 44 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index 9308821f4b92..a91507df94ce 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -78,7 +78,17 @@ pub struct ExportCommand {
     #[clap(long, short = 't', value_enum)]
     target: ExportTarget,
 
-    /// basic authentication for connecting to the server
+    /// A half-open time range: [start_time, end_time).
+    /// The start of the time range (time-index column) for data export.
+    #[clap(long)]
+    start_time: Option<String>,
+
+    /// A half-open time range: [start_time, end_time).
+    /// The end of the time range (time-index column) for data export.
+    #[clap(long)]
+    end_time: Option<String>,
+
+    /// The basic authentication for connecting to the server
     #[clap(long)]
     auth_basic: Option<String>,
 }
@@ -102,6 +112,8 @@ impl ExportCommand {
                 output_dir: self.output_dir.clone(),
                 parallelism: self.export_jobs,
                 target: self.target.clone(),
+                start_time: self.start_time.clone(),
+                end_time: self.end_time.clone(),
                 auth_header,
             }),
             guard,
@@ -116,6 +128,8 @@ pub struct Export {
     output_dir: String,
     parallelism: usize,
     target: ExportTarget,
+    start_time: Option<String>,
+    end_time: Option<String>,
     auth_header: Option<String>,
 }
 
@@ -170,7 +184,7 @@ impl Export {
         };
         let mut result = Vec::with_capacity(records.len());
         for value in records {
-            let serde_json::Value::String(schema) = &value[0] else {
+            let Value::String(schema) = &value[0] else {
                 unreachable!()
             };
             if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
@@ -259,7 +273,7 @@ impl Export {
         let Some(records) = result else {
             EmptyResultSnafu.fail()?
         };
-        let serde_json::Value::String(create_table) = &records[0][1] else {
+        let Value::String(create_table) = &records[0][1] else {
            unreachable!()
        };
 
@@ -427,11 +441,29 @@ impl Export {
                     .await
                     .context(FileIoSnafu)?;
                 let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
+
+                let with_options = match (&self.start_time, &self.end_time) {
+                    (Some(start_time), Some(end_time)) => {
+                        format!(
+                            "WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
+                            start_time, end_time
+                        )
+                    }
+                    (Some(start_time), None) => {
+                        format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
+                    }
+                    (None, Some(end_time)) => {
+                        format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
+                    }
+                    (None, None) => "WITH (FORMAT='parquet')".to_string(),
+                };
+
                 let sql = format!(
-                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH (FORMAT='parquet');"#,
+                    r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
                     catalog,
                     schema,
-                    output_dir.to_str().unwrap()
+                    output_dir.to_str().unwrap(),
+                    with_options
                 );
 
                 info!("Executing sql: {sql}");
 
                 self.sql(&sql).await?;
 
                 info!("Finished exporting {catalog}.{schema} data");
 
                 // export copy from sql
                 let copy_from_file = Path::new(&self.output_dir)
                     .join(format!("{catalog}-{schema}_copy_database_from.sql"));
                 let mut writer =
                     BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
                 let copy_database_from_sql = format!(
                     r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
                     catalog,
                     schema,
                     output_dir.to_str().unwrap()
                 );
                 writer
                     .write(copy_database_from_sql.as_bytes())
                     .await
                     .context(FileIoSnafu)?;
                 writer.flush().await.context(FileIoSnafu)?;
 
-                info!("finished exporting {catalog}.{schema} copy_database_from.sql");
+                info!("Finished exporting {catalog}.{schema} copy_database_from.sql");
 
                 Ok::<(), Error>(())
             })
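
A compact way to read the new time-range handling: the two optional CLI flags simply extend the COPY ... WITH clause. The sketch below is an equivalent reformulation of the four-arm match above, not the patch's literal code; the helper name and the sample timestamps are illustrative.

// Folds the optional half-open [start_time, end_time) bounds into the WITH
// clause; FORMAT='parquet' is always present.
fn build_with_options(start_time: Option<&str>, end_time: Option<&str>) -> String {
    let mut opts = vec!["FORMAT='parquet'".to_string()];
    if let Some(start) = start_time {
        opts.push(format!("start_time='{start}'"));
    }
    if let Some(end) = end_time {
        opts.push(format!("end_time='{end}'"));
    }
    format!("WITH ({})", opts.join(", "))
}

fn main() {
    // Mirrors the (Some, None) and (None, None) arms of the patch.
    assert_eq!(
        build_with_options(Some("2024-07-01 00:00:00"), None),
        "WITH (FORMAT='parquet', start_time='2024-07-01 00:00:00')"
    );
    assert_eq!(build_with_options(None, None), "WITH (FORMAT='parquet')");
}
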
From c38cc50a9dad9553c8639d6eb9c6e7d348c17560 Mon Sep 17 00:00:00 2001
From: jeremyhi
Date: Fri, 19 Jul 2024 15:12:50 +0800
Subject: [PATCH 3/4] feat: refactor the data dir

---
 src/cmd/src/cli/export.rs | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index a91507df94ce..ac039233cdbd 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -293,11 +293,14 @@ impl Export {
                 let (metric_physical_tables, remaining_tables) =
                     self.get_table_list(&catalog, &schema).await?;
                 let table_count = metric_physical_tables.len() + remaining_tables.len();
-                tokio::fs::create_dir_all(&self.output_dir)
+                let output_dir = Path::new(&self.output_dir)
+                    .join(&catalog)
+                    .join(format!("{schema}/"));
+                tokio::fs::create_dir_all(&output_dir)
                     .await
                     .context(FileIoSnafu)?;
                 let output_file =
-                    Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
+                    Path::new(&output_dir).join(format!("{schema}_create_tables.sql"));
                 let mut file = File::create(output_file).await.context(FileIoSnafu)?;
                 for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
                     match self.show_create_table(&c, &s, &t).await {
@@ -437,10 +440,12 @@ impl Export {
             let semaphore_moved = semaphore.clone();
             tasks.push(async move {
                 let _permit = semaphore_moved.acquire().await.unwrap();
-                tokio::fs::create_dir_all(&self.output_dir)
+                let output_dir = Path::new(&self.output_dir)
+                    .join(&catalog)
+                    .join(format!("{schema}/"));
+                tokio::fs::create_dir_all(&output_dir)
                     .await
                     .context(FileIoSnafu)?;
-                let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));
 
                 let with_options = match (&self.start_time, &self.end_time) {
                     (Some(start_time), Some(end_time)) => {
@@ -472,9 +477,8 @@ impl Export {
 
                 info!("Finished exporting {catalog}.{schema} data");
 
-                // export copy from sql
-                let copy_from_file = Path::new(&self.output_dir)
-                    .join(format!("{catalog}-{schema}_copy_database_from.sql"));
+                // The export copy from sql
+                let copy_from_file = output_dir.join(format!("{schema}_copy_from.sql"));
                 let mut writer =
                     BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
                 let copy_database_from_sql = format!(
                     r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
                     catalog,
                     schema,
                     output_dir.to_str().unwrap()
                 );
                 writer
                     .write(copy_database_from_sql.as_bytes())
                     .await
                     .context(FileIoSnafu)?;
                 writer.flush().await.context(FileIoSnafu)?;
 
-                info!("Finished exporting {catalog}.{schema} copy_database_from.sql");
+                info!("Finished exporting {catalog}.{schema} copy_from.sql");
 
                 Ok::<(), Error>(())
             })

From da0207e9c6588db50efd61506cb240b04b926aca Mon Sep 17 00:00:00 2001
From: jeremyhi
Date: Fri, 19 Jul 2024 17:12:59 +0800
Subject: [PATCH 4/4] feat: address review comments

---
 src/cmd/src/cli/export.rs | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index ac039233cdbd..1328dd756014 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -299,8 +299,7 @@ impl Export {
                 tokio::fs::create_dir_all(&output_dir)
                     .await
                     .context(FileIoSnafu)?;
-                let output_file =
-                    Path::new(&output_dir).join(format!("{schema}_create_tables.sql"));
+                let output_file = Path::new(&output_dir).join("create_tables.sql");
                 let mut file = File::create(output_file).await.context(FileIoSnafu)?;
                 for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
                     match self.show_create_table(&c, &s, &t).await {
@@ -314,7 +313,12 @@ impl Export {
                         }
                     }
                 }
-                info!("finished exporting {catalog}.{schema} with {table_count} tables",);
+
+                info!(
+                    "Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
+                    output_dir.to_string_lossy()
+                );
+
                 Ok::<(), Error>(())
             });
         }
@@ -475,10 +479,13 @@ impl Export {
 
                 self.sql(&sql).await?;
 
-                info!("Finished exporting {catalog}.{schema} data");
+                info!(
+                    "Finished exporting {catalog}.{schema} data into path: {}",
+                    output_dir.to_string_lossy()
+                );
 
                 // The export copy from sql
-                let copy_from_file = output_dir.join(format!("{schema}_copy_from.sql"));
+                let copy_from_file = output_dir.join("copy_from.sql");
                 let mut writer =
                     BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
                 let copy_database_from_sql = format!(
@@ -605,7 +612,9 @@ mod tests {
 
         let output_file = output_dir
            .path()
-            .join("greptime-cli.export.create_table.sql");
+            .join("greptime")
+            .join("cli.export.create_table")
+            .join("create_tables.sql");
         let res = std::fs::read_to_string(output_file).unwrap();
         let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
   "ts" TIMESTAMP(3) NOT NULL,