Skip to content

Commit

Permalink
chore: add export data to migrate tool (#2610)
Browse files Browse the repository at this point in the history
* chore: add export data to migrate tool

* chore: export copy from sql too
  • Loading branch information
shuiyisong authored Oct 17, 2023
1 parent 829db8c commit eccad64
Showing 1 changed file with 86 additions and 2 deletions.
88 changes: 86 additions & 2 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use clap::{Parser, ValueEnum};
use client::{Client, Database, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util::collect;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{StringVector, Vector};
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -284,14 +284,98 @@ impl Export {

Ok(())
}

async fn export_table_data(&self) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for (catalog, schema) in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
tokio::fs::create_dir_all(&self.output_dir)
.await
.context(FileIoSnafu)?;
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));

let mut client = self.client.clone();
client.set_catalog(catalog.clone());
client.set_schema(schema.clone());

// copy database to
let sql = format!(
"copy database {} to '{}' with (format='parquet');",
schema,
output_dir.to_str().unwrap()
);
client
.sql(sql.clone())
.await
.context(RequestDatabaseSnafu { sql })?;
info!("finished exporting {catalog}.{schema} data");

// export copy from sql
let dir_filenames = match output_dir.read_dir() {
Ok(dir) => dir,
Err(_) => {
warn!("empty database {catalog}.{schema}");
return Ok(());
}
};

let copy_from_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}_copy_from.sql"));
let mut file = File::create(copy_from_file).await.context(FileIoSnafu)?;

let copy_from_sql = dir_filenames
.into_iter()
.map(|file| {
let file = file.unwrap();
let filename = file.file_name().into_string().unwrap();

format!(
"copy {} from '{}' with (format='parquet');\n",
filename.replace(".parquet", ""),
file.path().to_str().unwrap()
)
})
.collect::<Vec<_>>()
.join("");
file.write_all(copy_from_sql.as_bytes())
.await
.context(FileIoSnafu)?;

info!("finished exporting {catalog}.{schema} copy_from.sql");

Ok::<(), Error>(())
});
}

let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export job failed");
false
}
})
.count();

info!("success {success}/{db_count} jobs");

Ok(())
}
}

#[async_trait]
impl Tool for Export {
async fn do_work(&self) -> Result<()> {
match self.target {
ExportTarget::CreateTable => self.export_create_table().await,
ExportTarget::TableData => unimplemented!("export table data"),
ExportTarget::TableData => self.export_table_data().await,
}
}
}
Expand Down

0 comments on commit eccad64

Please sign in to comment.