Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fixes for CLI output #2262

Merged
merged 6 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions dozer-cli/src/cli/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use dozer_tracing::LabelsAndProgress;
use dozer_types::models::config::default_cache_max_map_size;
use dozer_types::prettytable::{row, Table};
use dozer_types::serde_json;
use dozer_types::tracing::info;
use dozer_types::{models::config::Config, serde_yaml};
use handlebars::Handlebars;
use std::collections::BTreeMap;
use std::io::{self, Read};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Runtime;

Expand All @@ -24,8 +24,8 @@ pub async fn init_config(
config_token: Option<String>,
config_overrides: Vec<(String, serde_json::Value)>,
ignore_pipe: bool,
) -> Result<Config, CliError> {
let mut config = load_config(config_paths, config_token, ignore_pipe).await?;
) -> Result<(Config, Vec<String>), CliError> {
let (mut config, loaded_files) = load_config(config_paths, config_token, ignore_pipe).await?;

config = apply_overrides(&config, config_overrides)?;

Expand All @@ -35,7 +35,7 @@ pub async fn init_config(
let page_size = page_size::get() as u64;
config.cache_max_map_size = Some(cache_max_map_size / page_size * page_size);

Ok(config)
Ok((config, loaded_files))
}

pub fn get_base_dir() -> Result<Utf8PathBuf, CliError> {
Expand All @@ -61,8 +61,10 @@ pub async fn list_sources(
ignore_pipe: bool,
filter: Option<String>,
) -> Result<(), OrchestrationError> {
let config = init_config(config_paths, config_token, config_overrides, ignore_pipe).await?;
let (config, loaded_files) =
init_config(config_paths, config_token, config_overrides, ignore_pipe).await?;
let dozer = init_dozer(runtime, config, Default::default())?;
info!("Loaded config from: {}", loaded_files.join(", "));
let connection_map = dozer.list_connectors().await?;
let mut table_parent = Table::new();
for (connection_name, (tables, schemas)) in connection_map {
Expand Down Expand Up @@ -99,14 +101,17 @@ async fn load_config(
config_url_or_paths: Vec<String>,
config_token: Option<String>,
ignore_pipe: bool,
) -> Result<Config, CliError> {
) -> Result<(Config, Vec<String>), CliError> {
let read_stdin = atty::isnt(Stream::Stdin) && !ignore_pipe;
let first_config_path = config_url_or_paths.get(0);
match first_config_path {
None => Err(ConfigurationFilePathNotProvided),
Some(path) => {
if path.starts_with("https://") || path.starts_with("http://") {
load_config_from_http_url(path, config_token).await
Ok((
load_config_from_http_url(path, config_token).await?,
vec![path.to_owned()],
))
} else {
load_config_from_file(config_url_or_paths, read_stdin)
}
Expand All @@ -131,21 +136,27 @@ async fn load_config_from_http_url(
pub fn load_config_from_file(
config_path: Vec<String>,
read_stdin: bool,
) -> Result<Config, CliError> {
let stdin_path = PathBuf::from("<stdin>");
) -> Result<(Config, Vec<String>), CliError> {
let stdin_path = "<stdin>";
let input = if read_stdin {
let mut input = String::new();
io::stdin()
.read_to_string(&mut input)
.map_err(|e| CannotReadConfig(stdin_path, e))?;
.map_err(|e| CannotReadConfig(stdin_path.into(), e))?;
Some(input)
} else {
None
};

let config_template = combine_config(config_path.clone(), input)?;
let mut loaded_files = Vec::new();
if input.is_some() {
loaded_files.push(stdin_path.to_owned());
}

let (config_template, files) = combine_config(config_path.clone(), input)?;
loaded_files.extend_from_slice(&files);
match config_template {
Some(template) => parse_config(&template),
Some(template) => Ok((parse_config(&template)?, loaded_files)),
None => Err(FailedToFindConfigurationFiles(config_path.join(", "))),
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/cli/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub fn generate_config_repl() -> Result<(), OrchestrationError> {
}),
),
(
format!("question: Home directory ({:}): ", default_home_dir()),
format!("question: Data directory ({:}): ", default_home_dir()),
Box::new(move |(home_dir, config)| {
if home_dir.is_empty() {
config.home_dir = Some(default_home_dir());
Expand Down
12 changes: 6 additions & 6 deletions dozer-cli/src/config_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use glob::glob;
pub fn combine_config(
config_paths: Vec<String>,
stdin_yaml: Option<String>,
) -> Result<Option<String>, ConfigCombineError> {
) -> Result<(Option<String>, Vec<String>), ConfigCombineError> {
let mut combined_yaml = serde_yaml::Value::Mapping(Mapping::new());

let mut loaded_files = Vec::new();
let mut config_found = false;
for pattern in config_paths {
let files_glob = glob(&pattern).map_err(WrongPatternOfConfigFilesGlob)?;
Expand All @@ -32,8 +33,8 @@ pub fn combine_config(
if name.contains(".yml") || name.contains(".yaml") {
config_found = true;
}

add_file_content_to_config(&mut combined_yaml, name, content)?;
loaded_files.push(name.to_owned());
}
}
}
Expand All @@ -48,11 +49,10 @@ pub fn combine_config(

if config_found {
// `serde_yaml::from_value` will return deserialization error, not sure why.
serde_yaml::to_string(&combined_yaml)
.map_err(CannotSerializeToString)
.map(Some)
let string = serde_yaml::to_string(&combined_yaml).map_err(CannotSerializeToString)?;
Ok((Some(string), loaded_files))
} else {
Ok(None)
Ok((None, vec![]))
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl LiveState {

let cli = Cli::parse();

let config = init_config(
let (config, _) = init_config(
cli.config_paths.clone(),
cli.config_token.clone(),
cli.config_overrides.clone(),
Expand Down
164 changes: 25 additions & 139 deletions dozer-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,20 @@ use dozer_api::shutdown;
use dozer_cli::cli::cloud::CloudCommands;
use dozer_cli::cli::types::{Cli, Commands, ConnectorCommand, RunCommands, SecurityCommands};
use dozer_cli::cli::{generate_config_repl, init_config};
use dozer_cli::cli::{init_dozer, list_sources, LOGO};
use dozer_cli::cli::{init_dozer, list_sources};
use dozer_cli::cloud::{cloud_app_context::CloudAppContext, CloudClient, DozerGrpcCloudClient};
use dozer_cli::errors::{CliError, CloudError, OrchestrationError};
use dozer_cli::{live, set_ctrl_handler, set_panic_hook};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::config::Config;
use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig};
use dozer_types::serde::Deserialize;
use dozer_types::tracing::{error, error_span, info};
use futures::stream::{AbortHandle, Abortable};
use std::cmp::Ordering;
use std::convert::identity;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::time;

use dozer_types::log::{debug, warn};
use std::time::Duration;
use std::{env, process};
use std::process;

fn main() {
if let Err(e) = run() {
Expand All @@ -30,94 +25,6 @@ fn main() {
}
}

fn render_logo() {
const VERSION: &str = env!("CARGO_PKG_VERSION");

println!("{LOGO}");
println!("\nDozer Version: {VERSION}\n");
}

#[derive(Deserialize, Debug)]
#[serde(crate = "dozer_types::serde")]
struct DozerPackage {
#[serde(rename(deserialize = "latestVersion"))]
pub latest_version: String,
#[serde(rename(deserialize = "availableAssets"))]
pub _available_assets: Vec<String>,
pub link: String,
}

fn version_to_vector(version: &str) -> Vec<i32> {
version.split('.').map(|s| s.parse().unwrap()).collect()
}

fn compare_versions(v1: Vec<i32>, v2: Vec<i32>) -> bool {
for i in 0..v1.len() {
match v1.get(i).cmp(&v2.get(i)) {
Ordering::Greater => return true,
Ordering::Less => return false,
Ordering::Equal => continue,
}
}
false
}

async fn check_update() {
const VERSION: &str = env!("CARGO_PKG_VERSION");
let dozer_env = std::env::var("DOZER_ENV").unwrap_or("local".to_string());
let dozer_dev = std::env::var("DOZER_DEV").unwrap_or("ext".to_string());
let query = vec![
("version", VERSION),
("build", std::env::consts::ARCH),
("os", std::env::consts::OS),
("env", &dozer_env),
("dev", &dozer_dev),
];

let request_url = "https://metadata.dev.getdozer.io/";

let client = reqwest::Client::new();

let mut printed = false;

loop {
let response = client
.get(&request_url.to_string())
.query(&query)
.send()
.await;

match response {
Ok(r) => {
if !printed {
let package: DozerPackage = r.json().await.unwrap();
let current = version_to_vector(VERSION);
let remote = version_to_vector(&package.latest_version);

if compare_versions(remote, current) {
info!("A new version of Dozer is available.");
info!(
"You can download v{}, from {}.",
package.latest_version, package.link
);
printed = true;
}
}
}
Err(e) => {
// We dont show error if error is connection error, because mostly it happens
// when main thread is shutting down before request completes.
if !e.is_connect() {
warn!("Unable to fetch the latest metadata");
}

debug!("Updates check error: {}", e);
}
}
time::sleep(Duration::from_secs(2 * 60 * 60)).await;
}
}

fn run() -> Result<(), OrchestrationError> {
// Reloading trace layer seems impossible, so we are running Cli::parse in a closure
// and then initializing it after reading the configuration. This is a hacky workaround, but it works.
Expand All @@ -136,7 +43,7 @@ fn run() -> Result<(), OrchestrationError> {
// Now we have access to telemetry configuration. Telemetry must be initialized in tokio runtime.
let app_id = config_res
.as_ref()
.map(|c| c.cloud.app_id.as_deref().unwrap_or(&c.app_name))
.map(|(c, _)| c.cloud.app_id.as_deref().unwrap_or(&c.app_name))
.ok();

// We always enable telemetry when running live.
Expand All @@ -148,7 +55,7 @@ fn run() -> Result<(), OrchestrationError> {
} else {
config_res
.as_ref()
.map(|c| c.telemetry.clone())
.map(|(c, _)| c.telemetry.clone())
.unwrap_or_default()
};

Expand All @@ -158,7 +65,8 @@ fn run() -> Result<(), OrchestrationError> {
if let Commands::Cloud(cloud) = &cli.cmd {
return run_cloud(cloud, runtime, &cli);
}
let config = config_res?;
let (config, config_files) = config_res?;
info!("Loaded config from: {}", config_files.join(", "));

let dozer = init_dozer(
runtime.clone(),
Expand All @@ -170,26 +78,16 @@ fn run() -> Result<(), OrchestrationError> {
// run individual servers
(match cli.cmd {
Commands::Run(run) => match run.command {
Some(RunCommands::Api) => {
render_logo();
dozer.runtime.block_on(dozer.run_api(shutdown_receiver))
}
Some(RunCommands::App) => {
render_logo();
dozer
.runtime
.block_on(dozer.run_apps(shutdown_receiver, None))
}
Some(RunCommands::Api) => dozer.runtime.block_on(dozer.run_api(shutdown_receiver)),
Some(RunCommands::App) => dozer
.runtime
.block_on(dozer.run_apps(shutdown_receiver, None)),
Some(RunCommands::Lambda) => {
render_logo();
dozer.runtime.block_on(dozer.run_lambda(shutdown_receiver))
}
None => {
render_logo();
dozer
.runtime
.block_on(dozer.run_all(shutdown_receiver, run.locked))
}
None => dozer
.runtime
.block_on(dozer.run_all(shutdown_receiver, run.locked)),
},
Commands::Security(security) => match security.command {
SecurityCommands::GenerateToken => {
Expand Down Expand Up @@ -235,7 +133,6 @@ fn run() -> Result<(), OrchestrationError> {
panic!("This should not happen as it is handled in parse_and_generate");
}
Commands::Live(live_flags) => {
render_logo();
dozer.runtime.block_on(live::start_live_server(
&dozer.runtime,
shutdown_receiver,
Expand All @@ -256,10 +153,11 @@ fn run_cloud(
runtime: Arc<Runtime>,
cli: &Cli,
) -> Result<(), OrchestrationError> {
render_logo();
let cloud = cloud.clone();

let config = init_configuration(cli, runtime.clone()).ok();
let config = init_configuration(cli, runtime.clone())
.ok()
.map(|(config, _)| config);
let mut cloud_client = CloudClient::new(cloud.clone(), config.clone(), runtime.clone());
match cloud.command.clone() {
CloudCommands::Deploy(deploy) => cloud_client.deploy(deploy, cli.config_paths.clone()),
Expand Down Expand Up @@ -312,27 +210,15 @@ fn parse_and_generate() -> Result<Cli, OrchestrationError> {
)
}

fn init_configuration(cli: &Cli, runtime: Arc<Runtime>) -> Result<Config, CliError> {
dozer_tracing::init_telemetry_closure(
None,
&Default::default(),
|| -> Result<Config, CliError> {
let res = runtime.block_on(init_config(
cli.config_paths.clone(),
cli.config_token.clone(),
cli.config_overrides.clone(),
cli.ignore_pipe,
));

match res {
Ok(config) => {
runtime.spawn(check_update());
Ok(config)
}
Err(e) => Err(e),
}
},
)
fn init_configuration(cli: &Cli, runtime: Arc<Runtime>) -> Result<(Config, Vec<String>), CliError> {
dozer_tracing::init_telemetry_closure(None, &Default::default(), || -> Result<_, CliError> {
runtime.block_on(init_config(
cli.config_paths.clone(),
cli.config_token.clone(),
cli.config_overrides.clone(),
cli.ignore_pipe,
))
})
}

fn display_error(e: &OrchestrationError) {
Expand Down
Loading
Loading