Skip to content

Commit

Permalink
Expose status of submission runs
Browse files Browse the repository at this point in the history
  • Loading branch information
evanjt committed Oct 31, 2024
1 parent 663a238 commit 63346f8
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 59 deletions.
56 changes: 34 additions & 22 deletions src/external/k8s/models.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
use crate::config::Config;
use crate::{config::Config, submissions};
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::Serialize;
use thiserror::Error;
use tokio::io::split;
use uuid::Uuid;

#[derive(Debug)]
pub struct PodInfo {
pub name: String,
pub start_time: DateTime<Utc>,
pub latest_status: String,
pub latest_status_time: DateTime<Utc>,
}

#[derive(Serialize, Debug, Clone)]
pub struct PodName {
pub prefix: String,
pub submission_id: Uuid,
pub start_time: DateTime<Utc>,
pub latest_status: String,
pub latest_status_time: DateTime<Utc>,
pub run_id: u64,
}

Expand All @@ -23,34 +36,33 @@ pub enum PodNameError {
InvalidRunId(#[from] std::num::ParseIntError),
}

impl TryFrom<String> for PodName {
type Error = PodNameError;
impl From<PodInfo> for PodName {
fn from(pod_info: PodInfo) -> Self {
let config = Config::from_env();

fn try_from(pod_name: String) -> Result<Self, Self::Error> {
let app_config = Config::from_env();
let parts: Vec<&str> = pod_name.split('-').collect();
// Strip the prefix from the pod name, regardless of hyphens
let name_without_prefix = pod_info
.name
.strip_prefix(&format!("{}-", config.pod_prefix))
.unwrap_or(&pod_info.name); // fallback if prefix is absent

// Check that the pod name has the expected structure and prefix
if parts.len() < 7 {
return Err(PodNameError::InvalidStructure);
}
// Reverse split to isolate <UUID>-<run_id>-x-x parts
let parts: Vec<&str> = name_without_prefix.rsplitn(4, '-').collect();

if parts[0] != app_config.pod_prefix {
return Err(PodNameError::InvalidPrefix);
if parts.len() < 4 {
panic!("Pod name does not have the expected structure");
}

let uuid_str = format!(
"{}-{}-{}-{}-{}",
parts[1], parts[2], parts[3], parts[4], parts[5]
);
let run_id: u64 = parts[2].parse().expect("Invalid run ID format");
let submission_id = Uuid::parse_str(parts[3]).expect("Invalid UUID format");

let submission_id = Uuid::parse_str(&uuid_str)?;
let run_id: u64 = parts[6].parse()?;

Ok(PodName {
prefix: parts[0].to_string(),
PodName {
prefix: config.pod_prefix.clone(),
submission_id,
start_time: pod_info.start_time,
latest_status: pod_info.latest_status,
latest_status_time: pod_info.latest_status_time,
run_id,
})
}
}
}
94 changes: 81 additions & 13 deletions src/external/k8s/services.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::models::PodName;
use crate::config::Config;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
Expand All @@ -11,6 +12,7 @@ use secrecy::Secret;
use serde::Deserialize;
use std::fs::File;
use std::io::Read;
use uuid::Uuid;

#[derive(Deserialize)]
struct TokenResponse {
Expand Down Expand Up @@ -58,27 +60,78 @@ fn extract_refresh_token(kubeconfig: &Kubeconfig) -> Option<String> {
None
}

pub async fn get_pods() -> Result<Option<Vec<PodName>>> {
pub async fn get_pods() -> Result<Vec<crate::external::k8s::models::PodName>, kube::Error> {
// Get app config and kube client
let app_config = Config::from_env();
let client = match refresh_token_and_get_client().await {
Ok(client) => client,
Err(e) => {
return Err(e);
}
};
let client = refresh_token_and_get_client().await.unwrap();

// Get pods from RCP
// Get pods from Kubernetes API
let pods: Api<Pod> = Api::namespaced(client, &app_config.kube_namespace);
let lp = ListParams::default();
let pods: Vec<PodName> = pods
.list(&lp)
.await?

let pod_list = match pods.list(&lp).await {
Ok(pod_list) => pod_list,
Err(e) => return Err(e),
};
println!("Found {} pods", pod_list.items.len());
if let Some(first_pod) = pod_list.items.first() {
println!("First pod name: {:?}", first_pod.metadata.name);
println!("First pod status: {:?}", first_pod.status);
} else {
println!("No pods found.");
}

let pod_infos: Vec<crate::external::k8s::models::PodName> = pod_list
.clone()
.items
.into_iter()
.filter_map(|pod| PodName::try_from(pod.metadata.name.clone().unwrap()).ok())
.filter_map(|pod| {
let name = pod.metadata.name.clone()?;

// Add this line to filter by prefix
if !name.starts_with(&app_config.pod_prefix) {
return None;
}

let start_time: DateTime<Utc> = pod.status.clone().unwrap().start_time.unwrap().0;
// Get the latest status by the latest container status.conditions ordered by last_transition_time
let latest_status_info = pod.status.as_ref().and_then(|status| {
status.conditions.as_ref().and_then(|conditions| {
conditions
.iter()
.max_by_key(|condition| condition.last_transition_time.clone())
.map(|condition| {
(
condition.reason.clone(),
condition.last_transition_time.clone(),
)
})
})
});

let (latest_status, latest_status_time) = match latest_status_info {
Some((status, time)) => (status.unwrap_or("Unknown".to_string()), time.unwrap().0),
None => ("Unknown".to_string(), Utc::now()),
};

println!("Pod name: {}", name);
println!("Start time: {:?}", start_time);
println!("Latest status message: {:?}", latest_status);
println!("Latest status time: {:?}", latest_status_time);

Some(
crate::external::k8s::models::PodInfo {
name,
start_time,
latest_status,
latest_status_time,
}
.into(),
)
})
.collect();

Ok(Some(pods))
Ok(pod_infos)
}

pub async fn refresh_token_and_get_client() -> Result<Client> {
Expand Down Expand Up @@ -156,3 +209,18 @@ pub async fn refresh_token_and_get_client() -> Result<Client> {
let client = Client::try_from(config)?;
Ok(client)
}

pub async fn get_jobs_for_submission_id(submission_id: Uuid) -> Result<Vec<PodName>> {
// Get app config and kube client
let pods = get_pods().await.unwrap();

println!("Found {} pods for {}", pods.len(), submission_id);

// Filter pods by submission_id
let jobs: Vec<PodName> = pods
.into_iter()
.filter(|pod| pod.submission_id == submission_id)
.collect();

Ok(jobs)
}
6 changes: 2 additions & 4 deletions src/external/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ use anyhow::{anyhow, Result};
use sea_orm::{Database, DatabaseConnection, EntityTrait};

async fn check_kubernetes() -> Result<serde_json::Value> {
let pods_result = crate::external::k8s::services::get_pods().await;

match pods_result {
Ok(Some(pods)) => Ok(serde_json::to_value(pods).unwrap()),
match crate::external::k8s::services::get_pods().await {
Ok(pods) => Ok(serde_json::to_value(pods).unwrap()),
Ok(_) => Ok(serde_json::to_value("No pods found").unwrap()),
Err(err) => Err(anyhow!(serde_json::to_value(err.to_string()).unwrap())),
}
Expand Down
1 change: 1 addition & 0 deletions src/submissions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod db;
pub mod models;
pub mod run_status;
pub mod services;
pub mod views;
35 changes: 22 additions & 13 deletions src/submissions/models.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::uploads::models::UploadRead;

use super::db::ActiveModel;
use chrono::NaiveDateTime;
use sea_orm::{DeriveIntoActiveModel, NotSet, Set};
Expand All @@ -14,8 +16,10 @@ pub struct Submission {
comment: Option<String>,
created_on: NaiveDateTime,
last_updated: NaiveDateTime,
pub(super) associations: Option<Vec<crate::uploads::models::UploadRead>>,
outputs: Option<Vec<crate::external::s3::models::OutputObjectResponse>>,
pub(super) associations: Vec<crate::uploads::models::UploadRead>,
outputs: Vec<crate::external::s3::models::OutputObjectResponse>,
// status: Vec<super::run_status::models::RunStatus>,
status: Vec<crate::external::k8s::models::PodName>,
}

impl From<super::db::Model> for Submission {
Expand All @@ -28,8 +32,9 @@ impl From<super::db::Model> for Submission {
comment: model.comment,
created_on: model.created_on,
last_updated: model.last_updated,
associations: None,
outputs: None,
associations: vec![],
outputs: vec![],
status: vec![],
}
}
}
Expand All @@ -38,19 +43,24 @@ impl
From<(
super::db::Model,
Option<Vec<crate::uploads::db::Model>>,
// Vec<super::run_status::db::Model>,
Vec<crate::external::k8s::models::PodName>,
Vec<crate::external::s3::models::OutputObject>,
)> for Submission
{
fn from(
model_tuple: (
super::db::Model,
Option<Vec<crate::uploads::db::Model>>,
// Vec<super::run_status::db::Model>,
Vec<crate::external::k8s::models::PodName>,
Vec<crate::external::s3::models::OutputObject>,
),
) -> Self {
let submission = model_tuple.0;
let uploads = model_tuple.1;
let outputs = model_tuple.2;
let status = model_tuple.2;
let outputs = model_tuple.3;
Self {
id: submission.id,
name: submission.name,
Expand All @@ -59,14 +69,13 @@ impl
comment: submission.comment,
created_on: submission.created_on,
last_updated: submission.last_updated,
associations: Some(
uploads
.unwrap_or_default()
.into_iter()
.map(|association| association.into())
.collect(),
),
outputs: Some(outputs.into_iter().map(|output| output.into()).collect()),
associations: uploads
.unwrap_or_default()
.into_iter()
.map(|association| association.into())
.collect(),
status: status.into_iter().map(|status| status.into()).collect(),
outputs: outputs.into_iter().map(|output| output.into()).collect(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/submissions/run_status/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod db;
pub mod models;
38 changes: 38 additions & 0 deletions src/submissions/run_status/models.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use chrono::NaiveDateTime;
use serde::Serialize;
use serde_json::Value;
use utoipa::ToSchema;
use uuid::Uuid;

#[derive(ToSchema, Serialize, Debug)]
pub struct RunStatus {
pub id: Uuid,
pub submission_id: Uuid,
pub kubernetes_pod_name: Option<String>,
pub status: Option<String>,
pub is_running: bool,
pub is_successful: bool,
pub is_still_kubernetes_resource: bool,
pub time_started: Option<String>,
pub logs: Value,
pub time_added_utc: NaiveDateTime,
pub last_updated: NaiveDateTime,
}

impl From<super::db::Model> for RunStatus {
fn from(model: super::db::Model) -> Self {
Self {
id: model.id,
submission_id: model.submission_id,
kubernetes_pod_name: model.kubernetes_pod_name,
status: model.status,
is_running: model.is_running,
is_successful: model.is_successful,
is_still_kubernetes_resource: model.is_still_kubernetes_resource,
time_started: model.time_started,
logs: model.logs,
time_added_utc: model.time_added_utc,
last_updated: model.last_updated,
}
}
}
Loading

0 comments on commit 63346f8

Please sign in to comment.