diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs
index 00d916407458..1328dd756014 100644
--- a/src/cmd/src/cli/export.rs
+++ b/src/cmd/src/cli/export.rs
@@ -46,7 +46,10 @@ enum ExportTarget {
     #[default]
     CreateTable,
     /// Corresponding to `EXPORT TABLE`
+    #[deprecated(note = "Please use `DatabaseData` instead.")]
     TableData,
+    /// Corresponding to `EXPORT DATABASE`
+    DatabaseData,
 }
 
 #[derive(Debug, Default, Parser)]
@@ -75,7 +78,17 @@ pub struct ExportCommand {
     #[clap(long, short = 't', value_enum)]
     target: ExportTarget,
 
-    /// basic authentication for connecting to the server
+    /// A half-open time range: [start_time, end_time).
+    /// The start of the time range (time-index column) for data export.
+    #[clap(long)]
+    start_time: Option<String>,
+
+    /// A half-open time range: [start_time, end_time).
+    /// The end of the time range (time-index column) for data export.
+    #[clap(long)]
+    end_time: Option<String>,
+
+    /// The basic authentication for connecting to the server
     #[clap(long)]
     auth_basic: Option<String>,
 }
@@ -99,6 +112,8 @@ impl ExportCommand {
                 output_dir: self.output_dir.clone(),
                 parallelism: self.export_jobs,
                 target: self.target.clone(),
+                start_time: self.start_time.clone(),
+                end_time: self.end_time.clone(),
                 auth_header,
             }),
             guard,
@@ -113,6 +128,8 @@ pub struct Export {
     output_dir: String,
     parallelism: usize,
     target: ExportTarget,
+    start_time: Option<String>,
+    end_time: Option<String>,
     auth_header: Option<String>,
 }
 
@@ -167,7 +184,7 @@ impl Export {
         };
         let mut result = Vec::with_capacity(records.len());
         for value in records {
-            let serde_json::Value::String(schema) = &value[0] else {
+            let Value::String(schema) = &value[0] else {
                 unreachable!()
             };
             if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
@@ -256,7 +273,7 @@ impl Export {
         let Some(records) = result else {
             EmptyResultSnafu.fail()?
         };
-        let serde_json::Value::String(create_table) = &records[0][1] else {
+        let Value::String(create_table) = &records[0][1] else {
             unreachable!()
         };
 
@@ -276,11 +293,13 @@ impl Export {
                 let (metric_physical_tables, remaining_tables) =
                     self.get_table_list(&catalog, &schema).await?;
                 let table_count = metric_physical_tables.len() + remaining_tables.len();
-                tokio::fs::create_dir_all(&self.output_dir)
+                let output_dir = Path::new(&self.output_dir)
+                    .join(&catalog)
+                    .join(format!("{schema}/"));
+                tokio::fs::create_dir_all(&output_dir)
                     .await
                     .context(FileIoSnafu)?;
-                let output_file =
-                    Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
+                let output_file = Path::new(&output_dir).join("create_tables.sql");
                 let mut file = File::create(output_file).await.context(FileIoSnafu)?;
                 for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
                     match self.show_create_table(&c, &s, &t).await {
@@ -294,7 +313,12 @@ impl Export {
                         }
                     }
                 }
-                info!("finished exporting {catalog}.{schema} with {table_count} tables",);
+
+                info!(
+                    "Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
+                    output_dir.to_string_lossy()
+                );
+
                 Ok::<(), Error>(())
             });
         }
@@ -409,14 +433,106 @@ impl Export {
         Ok(())
     }
+
+    async fn export_database_data(&self) -> Result<()> {
+        let timer = Instant::now();
+        let semaphore = Arc::new(Semaphore::new(self.parallelism));
+        let db_names = self.iter_db_names().await?;
+        let db_count = db_names.len();
+        let mut tasks = Vec::with_capacity(db_names.len());
+        for (catalog, schema) in db_names {
+            let semaphore_moved = semaphore.clone();
+            tasks.push(async move {
+                let _permit = semaphore_moved.acquire().await.unwrap();
+                let output_dir = Path::new(&self.output_dir)
+                    .join(&catalog)
+                    .join(format!("{schema}/"));
+                tokio::fs::create_dir_all(&output_dir)
+                    .await
+                    .context(FileIoSnafu)?;
+
+                // Render each bound of the half-open range [start_time, end_time)
+                // only when it was supplied on the command line.
+                let with_options = match (&self.start_time, &self.end_time) {
+                    (Some(start_time), Some(end_time)) => {
+                        format!(
+                            "WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
+                            start_time, end_time
+                        )
+                    }
+                    (Some(start_time), None) => {
+                        format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
+                    }
+                    (None, Some(end_time)) => {
+                        format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
+                    }
+                    (None, None) => "WITH (FORMAT='parquet')".to_string(),
+                };
+
+                let sql = format!(
+                    r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
+                    catalog,
+                    schema,
+                    output_dir.to_str().unwrap(),
+                    with_options
+                );
+
+                info!("Executing SQL: {sql}");
+
+                self.sql(&sql).await?;
+
+                info!(
+                    "Finished exporting {catalog}.{schema} data into path: {}",
+                    output_dir.to_string_lossy()
+                );
+
+                // Write a ready-to-run `COPY DATABASE ... FROM` statement alongside
+                // the data so the dump can be imported back later.
+                let copy_from_file = output_dir.join("copy_from.sql");
+                let mut writer =
+                    BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
+                let copy_database_from_sql = format!(
+                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
+                    catalog,
+                    schema,
+                    output_dir.to_str().unwrap()
+                );
+                writer
+                    .write_all(copy_database_from_sql.as_bytes())
+                    .await
+                    .context(FileIoSnafu)?;
+                writer.flush().await.context(FileIoSnafu)?;
+
+                info!("Finished writing copy_from.sql for {catalog}.{schema}");
+
+                Ok::<(), Error>(())
+            });
+        }
+
+        let success = futures::future::join_all(tasks)
+            .await
+            .into_iter()
+            .filter(|r| match r {
+                Ok(_) => true,
+                Err(e) => {
+                    error!(e; "export database job failed");
+                    false
+                }
+            })
+            .count();
+        let elapsed = timer.elapsed();
+
+        info!("Success {success}/{db_count} jobs, cost: {:?}", elapsed);
+
+        Ok(())
+    }
 }
 
+#[allow(deprecated)]
 #[async_trait]
 impl Tool for Export {
     async fn do_work(&self) -> Result<()> {
         match self.target {
             ExportTarget::CreateTable => self.export_create_table().await,
             ExportTarget::TableData => self.export_table_data().await,
+            ExportTarget::DatabaseData => self.export_database_data().await,
         }
     }
 }
@@ -496,7 +612,9 @@ mod tests {
         let output_file = output_dir
             .path()
-            .join("greptime-cli.export.create_table.sql");
+            .join("greptime")
+            .join("cli.export.create_table")
+            .join("create_tables.sql");
         let res = std::fs::read_to_string(output_file).unwrap();
         let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
   "ts" TIMESTAMP(3) NOT NULL,
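
A note on the new flags: the `with_options` match in `export_database_data` renders each bound of the half-open range `[start_time, end_time)` only when it was supplied. The sketch below reproduces that mapping as a standalone function; `build_with_options` is a hypothetical extraction for illustration, not a function in this diff, with the four match arms collapsed into two conditional pushes:

```rust
/// Hypothetical helper mirroring the `with_options` match in
/// `export_database_data`: each bound of the half-open range
/// [start_time, end_time) is emitted only when present.
fn build_with_options(start_time: Option<&str>, end_time: Option<&str>) -> String {
    let mut opts = vec!["FORMAT='parquet'".to_string()];
    if let Some(start) = start_time {
        opts.push(format!("start_time='{start}'"));
    }
    if let Some(end) = end_time {
        opts.push(format!("end_time='{end}'"));
    }
    format!("WITH ({})", opts.join(", "))
}

fn main() {
    // No bounds: export everything.
    assert_eq!(build_with_options(None, None), "WITH (FORMAT='parquet')");
    // Both bounds: rows whose time-index falls in [start, end).
    assert_eq!(
        build_with_options(Some("2022-01-01 00:00:00"), Some("2022-02-01 00:00:00")),
        "WITH (FORMAT='parquet', start_time='2022-01-01 00:00:00', end_time='2022-02-01 00:00:00')"
    );
}
```

Collapsing the arms this way would keep the option order stable if more options were ever added; the diff's explicit four-arm match is equally correct and arguably easier to scan.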
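
On concurrency: `export_database_data` never spawns tasks. It collects plain futures and hands them all to `futures::future::join_all`, relying on the `tokio::sync::Semaphore` sized by `parallelism` to bound how many `COPY DATABASE` statements are in flight at once. A self-contained sketch of that pattern, assuming the `tokio` and `futures` crates (`run_one_job` and the counts are illustrative):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for one export job (e.g. one `COPY DATABASE ... TO ...`).
async fn run_one_job(id: usize) -> usize {
    id
}

#[tokio::main]
async fn main() {
    let parallelism = 4;
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let mut tasks = Vec::new();
    for id in 0..16 {
        let semaphore = semaphore.clone();
        tasks.push(async move {
            // At most `parallelism` jobs get past this point at a time;
            // the permit is released when `_permit` is dropped.
            let _permit = semaphore.acquire().await.unwrap();
            run_one_job(id).await
        });
    }
    // `join_all` polls every future concurrently on this one task; it is
    // the semaphore, not the executor, that enforces the concurrency cap.
    let results = futures::future::join_all(tasks).await;
    assert_eq!(results.len(), 16);
}
```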
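
For orientation, both targets now write under `<output-dir>/<catalog>/<schema>/`: `create_tables.sql` from the `CreateTable` target, and the Parquet dump plus a ready-to-run `copy_from.sql` from the new `DatabaseData` target (the names of the data files themselves are whatever `COPY DATABASE ... TO` produces). A restore would then presumably replay `create_tables.sql` first and `copy_from.sql` second against the target instance.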