Skip to content

Commit

Permalink
Introduce Local Operators
Browse files Browse the repository at this point in the history
  • Loading branch information
lucemans committed Dec 18, 2024
1 parent 1499802 commit 576ca4c
Show file tree
Hide file tree
Showing 18 changed files with 3,585 additions and 743 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "v3x-property-engine"
version = "0.0.3"
version = "0.0.4"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
3 changes: 3 additions & 0 deletions engine/migrations/0014_local-operators.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE IF EXISTS local_operators;

DELETE FROM policies WHERE resource_type = 'local_operator';
18 changes: 18 additions & 0 deletions engine/migrations/0014_local-operators.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--- Create LocalOperator table
CREATE TABLE IF NOT EXISTS local_operators (
operator_id TEXT PRIMARY KEY,
operator_endpoint TEXT NOT NULL,
operator_last_heartbeat TIMESTAMP WITH TIME ZONE NOT NULL
);

INSERT INTO policies (
resource_type, resource_id, action, subject_type, subject_id
) VALUES (
'local_operator', NULL, 'read', 'authed', 'true'
);

INSERT INTO policies (
resource_type, resource_id, action, subject_type, subject_id
) VALUES (
'local_operator', NULL, 'write', 'authed', 'true'
);
40 changes: 40 additions & 0 deletions engine/src/models/local_operators/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use chrono::{DateTime, Utc};
use poem_openapi::Object;
use serde::{Deserialize, Serialize};
use sqlx::query_as;

use crate::database::Database;

#[derive(Debug, Serialize, Deserialize, Object)]
pub struct LocalOperator {
pub operator_id: String,
pub operator_endpoint: String,
pub operator_last_heartbeat: DateTime<Utc>,
}

impl LocalOperator {
pub async fn upsert(
db: &Database,
operator_id: &str,
operator_endpoint: &str,
) -> Result<Self, anyhow::Error> {
let x = query_as!(
LocalOperator,
"INSERT INTO local_operators (operator_id, operator_endpoint, operator_last_heartbeat) VALUES ($1, $2, NOW()) ON CONFLICT (operator_id) DO UPDATE SET operator_last_heartbeat = NOW() RETURNING *",
operator_id,
operator_endpoint
)
.fetch_one(&db.pool)
.await?;

Ok(x)
}

pub async fn list_operators(db: &Database) -> Result<Vec<Self>, sqlx::Error> {
let x = query_as!(LocalOperator, "SELECT * FROM local_operators")
.fetch_all(&db.pool)
.await?;

Ok(x)
}
}
1 change: 1 addition & 0 deletions engine/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod field;
pub mod item;
pub mod keys;
pub mod local_operators;
pub mod location;
pub mod log;
pub mod media;
Expand Down
22 changes: 16 additions & 6 deletions engine/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use logs::LogsApi;
use me::MeApi;
use media::MediaApi;
use oauth::{callback::CallbackApi, login::LoginApi};
use operators::OperatorApi;
use poem::{
endpoint::StaticFilesEndpoint, get, handler, listener::TcpListener, middleware::Cors,
web::Html, EndpointExt, Route, Server,
Expand All @@ -22,20 +23,21 @@ use users::{keys::UserKeysApi, UserApi};

use crate::state::AppState;

pub mod error;
pub mod fields;
pub mod instance;
pub mod item;
pub mod logs;
pub mod me;
pub mod media;
pub mod oauth;
pub mod operators;
pub mod policy;
pub mod product;
pub mod search;
pub mod sessions;
pub mod tags;
pub mod users;
pub mod error;

#[derive(Tags)]
enum ApiTags {
Expand All @@ -53,6 +55,8 @@ enum ApiTags {
Logs,
/// Search-related operations
Search,
/// Operators-related operations
Operators,
/// User-related operations
User,
/// Auth-related operations
Expand All @@ -62,7 +66,7 @@ enum ApiTags {
Instance,
}

fn get_api() -> impl OpenApi {
fn get_api_2() -> impl OpenApi {
(
// Items
(ItemsApi, ItemMediaApi, ItemIntelligenceApi),
Expand All @@ -71,19 +75,25 @@ fn get_api() -> impl OpenApi {
// Media
MediaApi,
// Search
SearchApi,
SearchTaskApi,
(SearchApi, SearchTaskApi),
// Fields
FieldsApi,
// Tags
TagsApi,
// Logs
LogsApi,
)
}

fn get_api() -> impl OpenApi {
(
get_api_2(),
// Me
MeApi,
// Operators
OperatorApi,
// User
UserApi,
UserKeysApi,
(UserApi, UserKeysApi),
// Policy
PolicyApi,
// Sessions
Expand Down
61 changes: 61 additions & 0 deletions engine/src/routes/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::sync::Arc;

use crate::auth::middleware::AuthUser;
use crate::auth::permissions::Action;
use crate::models::local_operators::LocalOperator;
use crate::state::AppState;
use poem::web::Data;
use poem::Result;
use poem_openapi::payload::Json;
use poem_openapi::Object;
use poem_openapi::OpenApi;
use serde::{Deserialize, Serialize};

use super::error::HttpError;
use crate::routes::ApiTags;

#[derive(Debug, Deserialize, Serialize, Object)]
pub struct LocalOperatorPayload {
// The operator decides this themselves
pub operator_id: String,
// This identifies where the operator is running
pub operator_endpoint: String,
}

pub struct OperatorApi;

#[OpenApi]
impl OperatorApi {
#[oai(path = "/operators", method = "get", tag = "ApiTags::Operators")]
async fn list_operators(
&self,
user: AuthUser,
state: Data<&Arc<AppState>>,
) -> Result<Json<Vec<LocalOperator>>> {
user.check_policy("local_operator", None, Action::Read)
.await?;

LocalOperator::list_operators(&state.database)
.await
.map(Json)
.map_err(HttpError::from)
.map_err(poem::Error::from)
}

#[oai(path = "/operators", method = "post", tag = "ApiTags::Operators")]
async fn create_operator(
&self,
user: AuthUser,
state: Data<&Arc<AppState>>,
payload: Json<LocalOperatorPayload>,
) -> Result<Json<LocalOperator>> {
user.check_policy("local_operator", None, Action::Write)
.await?;

LocalOperator::upsert(&state.database, &payload.operator_id, &payload.operator_endpoint)
.await
.map(Json)
.map_err(HttpError::from)
.map_err(poem::Error::from)
}
}
Loading

0 comments on commit 576ca4c

Please sign in to comment.