Skip to content

Commit

Permalink
Merge pull request #5 from idaholab/feature/client
Browse files Browse the repository at this point in the history
Reinvigorate Client
  • Loading branch information
DnOberon authored Aug 20, 2024
2 parents f293897 + e46f22b commit f3f5c68
Show file tree
Hide file tree
Showing 19 changed files with 1,209 additions and 66 deletions.
765 changes: 722 additions & 43 deletions client/Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ thiserror = "1.0.50"
tokio = { version = "1.34.0", features = ["full"] }
serde = { version = "1.0.192", features = ["derive"] }
serde_yaml = "0.9.27"
tray-icon = "0.14.3"
tray-icon = "0.15.1"
image = { version = "0.25.1", features = [] }
winit = "0.30.3"
chrono = { version = "0.4.31", features = ["serde"] }
rust-embed = "8.0.0"
handlebars = { version = "6.0.0", features = ["rust-embed"] }
hyper = "1.4.1"
serde_json = "1.0.108"
Expand All @@ -25,8 +24,11 @@ futures = "0.3.29"
futures-util = "0.3.29"
tracing = "0.1"
tracing-subscriber = "0.3.0"
object_store = { version = "0.10.2" , features = ["aws", "azure", "gcp"]}
object_store = { version = "0.11.0", features = ["aws", "azure", "gcp"]}
rocksdb = { version = "0.22.0", features = ["serde"] }
notify-rust = "4.11.1"
rust-embed = "8.5.0"
open = "5.3.0"

[dependencies.uuid]
version = "1.10.0"
Expand Down
File renamed without changes
1 change: 1 addition & 0 deletions client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub struct ClientConfiguration {
pub hardware_id: Option<Uuid>,
pub ingest_server: Option<String>,
pub token: Option<String>,
pub token_expires_at: Option<chrono::NaiveDate>,
}

pub fn get_configuration() -> Result<ClientConfiguration, ClientError> {
Expand Down
9 changes: 8 additions & 1 deletion client/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn make_connection(semaphore: Arc<Mutex<Connected>>) -> Result<(), ClientE

// make the socket connection
let url = config.ingest_server.unwrap_or("localhost:4000".to_string());
let token = config.token.ok_or(ClientError::Token)?;
let token = config.token.ok_or(ClientError::TokenNotPresent)?;

let (ws, _) = tokio_tungstenite::connect_async(format!(
"ws://{}/client/websocket?vsn=2.0.0&token={}",
Expand Down Expand Up @@ -119,6 +119,7 @@ async fn make_connection(semaphore: Arc<Mutex<Connected>>) -> Result<(), ClientE
Ok(_) => {}
Err(e) => {
error!("error in sending message to channel {:?}", e);
break;
}
}
}
Expand All @@ -141,6 +142,7 @@ async fn make_connection(semaphore: Arc<Mutex<Connected>>) -> Result<(), ClientE
let heartbeat_tx = tx.clone();
tokio::spawn(async move {
let mut index = 1;
let mut error_count = 0;

loop {
sleep(Duration::from_millis(500)).await;
Expand All @@ -159,6 +161,11 @@ async fn make_connection(semaphore: Arc<Mutex<Connected>>) -> Result<(), ClientE
Ok(_) => {}
Err(e) => {
error!("error sending heartbeat {:?}", e);
error_count += 1;

if error_count > 10 {
break;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion client/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub enum ClientError {
#[error("tokio thread error: {0}")]
TokioThread(#[from] tokio::task::JoinError),
#[error("auth token not present")]
Token,
TokenNotPresent,
#[error("websocket error {0}")]
Websocket(#[from] tokio_tungstenite::tungstenite::Error),
#[error("mpsc channel send error {0}")]
Expand Down
61 changes: 54 additions & 7 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ mod uploader;
mod webserver;

use tray_icon::{
menu::{AboutMetadata, Menu, MenuEvent, MenuItem, PredefinedMenuItem},
menu::{Menu, MenuEvent, MenuItem},
TrayIconBuilder, TrayIconEvent,
};
use winit::event_loop::{ControlFlow, EventLoopBuilder};

use crate::connection::make_connection_thread;
use crate::errors::ClientError;
use crate::webserver::boot_webserver;
use chrono::Local;
use chrono::{Local, Utc};
use notify_rust::{Notification, Timeout};
use std::path::Path;
use std::process;
use std::sync::{Arc, Mutex};
use tracing::Level;
use tracing::{error, Level};
use tracing_subscriber::FmtSubscriber;
use uuid::Uuid;

pub struct Connected(bool);

Expand All @@ -36,8 +39,15 @@ async fn main() -> Result<(), ClientError> {

// First let's pull in the current configuration - this will automatically create a hardware_id
// if one does not exist for this client
let _client_config = config::get_configuration()?;
let icon = load_icon(Path::new("icon.png"));
let client_config = config::get_configuration()?;

let register_url = format!(
"http://{}/dashboard/destinations/client/register_client?client_id={}", // eventually we should add dynamic port and callbacks
client_config.ingest_server.unwrap_or_default(),
client_config.hardware_id.unwrap_or(Uuid::new_v4())
);

let icon = load_icon(Path::new("./assets/icon.png"));

// we have to use a standard mutex so we can do a blocking read in the event loop - it's a pain in the ass
let semaphore = Arc::new(Mutex::new(Connected(false)));
Expand Down Expand Up @@ -67,11 +77,37 @@ async fn main() -> Result<(), ClientError> {
gtk::main();
});

let not_authenticated = match client_config.token_expires_at {
None => true,
Some(e) => Utc::now().date_naive() > e && client_config.token.is_some(),
};

if not_authenticated {
let _ = Notification::new()
.summary("Ingest")
.body("You must authenticate with Ingest website before you can use the Ingest application.")
.timeout(Timeout::Milliseconds(6000)) //milliseconds
.show();
}

let event_loop = EventLoopBuilder::new().build().unwrap();
let menu = Menu::new();
let menu_authenticate = MenuItem::new("Authenticate with Ingest", true, None);
let menu_status = MenuItem::new("Disconnected", false, None);
let menu_reconnect = MenuItem::new("Reconnect", true, None);
menu.append_items(&[&menu_status, &menu_reconnect])
let menu_exit = MenuItem::new("Exit", true, None);

if not_authenticated {
match menu.insert(&menu_authenticate, 0) {
Ok(_) => {}
Err(_) => {
error!("unable to append the authentication menu item");
process::exit(1);
}
}
}

menu.append_items(&[&menu_status, &menu_reconnect, &menu_exit])
.expect("unable to register menu item");

#[cfg(not(target_os = "linux"))]
Expand Down Expand Up @@ -99,14 +135,25 @@ async fn main() -> Result<(), ClientError> {
}
}

if let Ok(event) = tray_channel.try_recv() {}
if let Ok(_event) = tray_channel.try_recv() {}

if let Ok(event) = menu_channel.try_recv() {
{
if event.id == menu_reconnect.id() && !semaphore.lock().unwrap().0 {
let new_semaphore = semaphore.clone();
tokio::spawn(async move { make_connection_thread(new_semaphore).await });
}

if event.id == menu_exit.id() {
process::exit(0);
}

if event.id == menu_authenticate.id() {
match open::that(register_url.as_str()) {
Ok(_) => {}
Err(e) => error!("unable to open register URL on user's system {:?}", e),
}
}
}
}
});
Expand Down
8 changes: 2 additions & 6 deletions client/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Uploader {

// we can do a max of 10,000 parts - so if we're above that, we need to up chunk size
while num_parts > max_parts {
chunk_size += 1 * 1024 * 1024; // increase chunk size by 1 MB
chunk_size += 1024 * 1024; // increase chunk size by 1 MB
num_parts = (stats.len() as f64 / chunk_size as f64).ceil() as usize;
}

Expand Down Expand Up @@ -73,7 +73,7 @@ impl Uploader {
Ok(())
}

pub async fn handle_msg(&self, channel_message: ChannelMessage) -> Result<(), UploaderError> {
pub async fn handle_msg(&self, _channel_message: ChannelMessage) -> Result<(), UploaderError> {
Ok(())
}

Expand Down Expand Up @@ -114,12 +114,8 @@ pub enum UploaderError {
RocksDB(#[from] rocksdb::Error),
#[error("io error: {0}")]
IO(#[from] std::io::Error),
#[error("internal error: {0}")]
Internal(String),
#[error("websocket channel send error: {0}")]
Websocket(#[from] tokio::sync::mpsc::error::SendError<ChannelMessage>),
#[error("not implemented")]
NotImplemented,
#[error("json error {0}")]
Json(#[from] serde_json::Error),
}
11 changes: 11 additions & 0 deletions client/src/webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use axum::extract::{Query, State};
use axum::response::Html;
use axum::routing::get;
use axum::Router;
use chrono::{Days, Utc};
use handlebars::Handlebars;
use rust_embed::RustEmbed;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::ops::Add;
use std::sync::{Arc, Mutex};
use tracing::info;
use uuid::Uuid;
Expand Down Expand Up @@ -151,6 +153,15 @@ async fn callback<'a>(

let mut config = state.config.clone();
config.token = Some(token.clone());

// Ingest should default to 10 days expiry, so if we don't have it set that
config.token_expires_at = match params.get("expires_at") {
None => Some(Utc::now().date_naive().add(Days::new(10))),
Some(expires) => Some(
chrono::NaiveDate::parse_from_str(expires, "%Y%m%d").unwrap_or(Utc::now().date_naive().add(Days::new(10)))
),
};

config
.write_to_host()
.expect("unable to write the token to the host configuration file");
Expand Down
6 changes: 2 additions & 4 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ Users are able to upload files directly using the web interface on the central s
The central server is written in Elixir and uses the Phoenix web framework.


## [Ingest Client](/client/readme.md) *abandoned**
## [Ingest Client](/client/readme.md) *alpha preview*



The DeepLynx Ingest client is a cross-platform application designed to enable high-speed UDP file transfer from the computer it's installed on to either the central server or other DeepLynx Ingest clients. It integrates directly with the central server, with the central server acting as the UI for the application vs. building a native UI for each platform.

The client is written in Rust and is currently abandoned. Due to the directions the main server and platform have taken, the Rust client didn't make any real sense to pursue at this time. The code is there for anyone who wants to play around with it - but the elixir portion has been written out.


The client is written in Rust and is currently in alpha preview. D

![inl_logo](server/priv/static/images/inllogo.png)
100 changes: 100 additions & 0 deletions server/lib/ingest/destinations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,107 @@ defmodule Ingest.Destinations do
alias Ingest.Destinations.S3Config
alias Ingest.Repo

alias Ingest.Destinations.Client
alias Ingest.Accounts.User

@doc """
Returns the list of clients.
## Examples
iex> list_clients()
[%Client{}, ...]
"""
def list_clients do
Repo.all(Client)
end

@doc """
Gets a single client.
Raises `Ecto.NoResultsError` if the Client does not exist.
## Examples
iex> get_client!(123)
%Client{}
iex> get_client!(456)
** (Ecto.NoResultsError)
"""
def get_client!(id), do: Repo.get!(Client, id)

def get_client_for_user(client_id, user_id) do
Repo.get_by(Client, id: client_id, owner_id: user_id)
end

@doc """
Creates a client.
## Examples
iex> create_client(%{field: value})
{:ok, %Client{}}
iex> create_client(%{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
def create_client(attrs \\ %{}) do
%Client{}
|> Client.changeset(attrs)
|> Repo.insert()
end

@doc """
Updates a client.
## Examples
iex> update_client(client, %{field: new_value})
{:ok, %Client{}}
iex> update_client(client, %{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
def update_client(%Client{} = client, attrs) do
client
|> Client.changeset(attrs)
|> Repo.update()
end

@doc """
Deletes a client.
## Examples
iex> delete_client(client)
{:ok, %Client{}}
iex> delete_client(client)
{:error, %Ecto.Changeset{}}
"""
def delete_client(%Client{} = client) do
Repo.delete(client)
end

@doc """
Returns an `%Ecto.Changeset{}` for tracking client changes.
## Examples
iex> change_client(client)
%Ecto.Changeset{data: %Client{}}
"""
def change_client(%Client{} = client, attrs \\ %{}) do
Client.changeset(client, attrs)
end

alias Ingest.Destinations.Destination

@doc """
Expand Down
Loading

0 comments on commit f3f5c68

Please sign in to comment.