Skip to content

Commit

Permalink
feat: export data with time range
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jul 18, 2024
1 parent d9e3beb commit d36737f
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,17 @@ pub struct ExportCommand {
#[clap(long, short = 't', value_enum)]
target: ExportTarget,

/// basic authentication for connecting to the server
/// A half-open time range: [start_time, end_time).
/// The start of a time range of time-index column for export data.
#[clap(long)]
start_time: Option<String>,

/// A half-open time range: [start_time, end_time).
/// The end of a time range of time-index column for export data.
#[clap(long)]
end_time: Option<String>,

/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
}
Expand All @@ -102,6 +112,8 @@ impl ExportCommand {
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
auth_header,
}),
guard,
Expand All @@ -116,6 +128,8 @@ pub struct Export {
output_dir: String,
parallelism: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
auth_header: Option<String>,
}

Expand Down Expand Up @@ -170,7 +184,7 @@ impl Export {
};
let mut result = Vec::with_capacity(records.len());
for value in records {
let serde_json::Value::String(schema) = &value[0] else {
let Value::String(schema) = &value[0] else {
unreachable!()
};
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
Expand Down Expand Up @@ -259,7 +273,7 @@ impl Export {
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let serde_json::Value::String(create_table) = &records[0][1] else {
let Value::String(create_table) = &records[0][1] else {
unreachable!()
};

Expand Down Expand Up @@ -427,11 +441,29 @@ impl Export {
.await
.context(FileIoSnafu)?;
let output_dir = Path::new(&self.output_dir).join(format!("{catalog}-{schema}/"));

let with_options = match (&self.start_time, &self.end_time) {
(Some(start_time), Some(end_time)) => {
format!(
"WITH (FORMAT='parquet', start_time='{}', end_time='{})",
start_time, end_time
)
}
(Some(start_time), None) => {
format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
}
(None, Some(end_time)) => {
format!("WITH (FORMAT='parquet', end_time='{})", end_time)
}
(None, None) => "WITH (FORMAT='parquet')".to_string(),
};

let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' WITH (FORMAT='parquet');"#,
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
catalog,
schema,
output_dir.to_str().unwrap()
output_dir.to_str().unwrap(),
with_options
);

info!("Executing sql: {sql}");
Expand All @@ -457,7 +489,7 @@ impl Export {
.context(FileIoSnafu)?;
writer.flush().await.context(FileIoSnafu)?;

info!("finished exporting {catalog}.{schema} copy_database_from.sql");
info!("Finished exporting {catalog}.{schema} copy_database_from.sql");

Ok::<(), Error>(())
})
Expand Down

0 comments on commit d36737f

Please sign in to comment.