Skip to content

Commit

Permalink
Merge branch 'fix-core-clean-up-crash' of github.com:giangndm/8xFF-de…
Browse files Browse the repository at this point in the history
…centralized-media-server into fix-core-clean-up-crash
  • Loading branch information
giangndm committed Oct 29, 2024
2 parents d68cb05 + b5053c5 commit 3c26709
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 49 deletions.
11 changes: 7 additions & 4 deletions bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,16 @@ pub struct Args {
pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let app_storage = Arc::new(MultiTenancyStorage::new("not-use-this", None));
if let Some(url) = args.multi_tenancy_sync {
let mut app_sync = MultiTenancySync::new(app_storage.clone(), &url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
let app_storage = if let Some(url) = args.multi_tenancy_sync {
let app_storage = Arc::new(MultiTenancyStorage::new());
let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
tokio::spawn(async move {
app_sync.run_loop().await;
});
}
app_storage
} else {
Arc::new(MultiTenancyStorage::new_with_single(&node.secret, args.hook_uri.as_deref()))
};

let mut connector_storage = ConnectorStorage::new(
node.node_id,
Expand Down
13 changes: 8 additions & 5 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,17 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod

let edge_secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes()));

let app_storage = Arc::new(MultiTenancyStorage::new(&node.secret, None));
let gateway_secure = MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage.clone());
if let Some(url) = args.multi_tenancy_sync {
let mut app_sync = MultiTenancySync::new(app_storage, &url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
let app_storage = if let Some(url) = args.multi_tenancy_sync {
let app_storage = Arc::new(MultiTenancyStorage::new());
let mut app_sync = MultiTenancySync::new(app_storage.clone(), url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
tokio::spawn(async move {
app_sync.run_loop().await;
});
}
app_storage
} else {
Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None))
};
let gateway_secure = MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage.clone());
let gateway_secure = Arc::new(gateway_secure);

let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
if let Some(http_port) = http_port {
let secure_gateway = args.enable_token_api.then(|| {
let app_storage = Arc::new(MultiTenancyStorage::new(&node.secret, None));
let app_storage = Arc::new(MultiTenancyStorage::new_with_single(&node.secret, None));
Arc::new(MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage))
});
let req_tx = req_tx.clone();
Expand Down
19 changes: 11 additions & 8 deletions docs/user-guide/integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@ We support integration with other systems through the following methods:

The media server employs an efficient and secure approach for token generation and validation. It does not store any token information in its database and does not rely on external services for token validation. Instead, each node in the cluster validates tokens based on its configuration.

Currently, the media server uses JWT with a static cluster secret for token generation. It also supports multi-tenancy applications by synchronizing data from an external HTTP source that responds with a JSON structure like:
Currently, the media server uses JWT with a static cluster secret for token generation. It also supports multi-tenancy mode by synchronizing data from an external HTTP source that responds with a JSON structure like:

```json
{
"apps": {
"app1": {
"secret": "secret1"
"apps": [
{
"app_id": "app1",
"app_secret": "secret1",
"hook": "http://hook_endpoint?params=what_ever_ouwant"
},
"app2": {
"secret": "secret2"
{
"app_id": "app2",
"app_secret": "secret2"
}
}
]
}
```

The synchronization endpoint can be used with the `--multi-tenancy-sync` option of the gateway node. Instead of using the root secret, we can use the app secret to create tokens specific to that app.
The synchronization endpoint can be used with the --multi-tenancy-sync option of the gateway node. There are two separate modes: multi-tenancy and fixed secret. In multi-tenancy mode, you use the app secret to create tokens specific to each app. Once --multi-tenancy-sync is set, the default secret becomes unusable, and you can only use secrets from the list of apps provided in the --multi-tenancy-sync response. In fixed secret mode, the root secret is used for token creation.

We can use token generation APIs to create tokens. For more information, please refer to the HTTP APIs section below.

Expand Down
60 changes: 29 additions & 31 deletions packages/multi_tenancy/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use media_server_protocol::multi_tenancy::{AppContext, AppId, AppSecret};
use media_server_secure::AppStorage;
Expand All @@ -10,9 +10,22 @@ pub struct MultiTenancyStorage {
}

impl MultiTenancyStorage {
pub fn new(secret: &str, hook: Option<&str>) -> Self {
pub fn new_with_single(app_secret: &str, hook: Option<&str>) -> Self {
let storage = Self::new();
storage.sync(
vec![AppInfo {
app_id: "".to_owned(),
app_secret: app_secret.to_owned(),
hook: hook.map(|s| s.to_owned()),
}]
.into_iter(),
);
storage
}

pub fn new() -> Self {
Self {
internal: RwLock::new(MultiTenancyStorageInternal::new(secret, hook)),
internal: RwLock::new(MultiTenancyStorageInternal::new()),
}
}

Expand Down Expand Up @@ -42,21 +55,15 @@ impl AppStorage for MultiTenancyStorage {
}

struct MultiTenancyStorageInternal {
root_app: AppInfo,
secrets: HashMap<AppSecret, AppInfo>,
apps: HashMap<AppId, AppInfo>,
}

impl MultiTenancyStorageInternal {
pub fn new(secret: &str, hook: Option<&str>) -> Self {
pub fn new() -> Self {
Self {
secrets: Default::default(),
apps: Default::default(),
root_app: AppInfo {
app_id: AppId::root_app().into(),
app_secret: secret.to_owned(),
hook: hook.map(|h| h.to_owned()),
},
}
}

Expand All @@ -74,17 +81,10 @@ impl MultiTenancyStorageInternal {
}

fn get_app(&self, app: &AppId) -> Option<AppInfo> {
if app.deref().is_empty() {
Some(self.root_app.clone())
} else {
self.apps.get(app).cloned()
}
self.apps.get(app).cloned()
}

fn get_secret(&self, secret: &AppSecret) -> Option<AppInfo> {
if self.root_app.app_secret.eq(secret.deref()) {
return Some(self.root_app.clone());
}
self.secrets.get(secret).cloned()
}

Expand All @@ -110,18 +110,14 @@ pub struct MultiTenancySyncResponse {
}

pub struct MultiTenancySync {
storage: Arc<MultiTenancyStorage>,
endpoint: String,
interval: Duration,
storage: Arc<MultiTenancyStorage>,
}

impl MultiTenancySync {
pub fn new(storage: Arc<MultiTenancyStorage>, endpoint: &str, interval: Duration) -> Self {
Self {
storage,
endpoint: endpoint.to_owned(),
interval,
}
pub fn new(storage: Arc<MultiTenancyStorage>, endpoint: String, interval: Duration) -> Self {
Self { endpoint, interval, storage }
}

async fn sync(&mut self) -> Result<(), reqwest::Error> {
Expand Down Expand Up @@ -159,7 +155,7 @@ mod tests {

#[tokio::test]
async fn test_sync() {
let storage = Arc::new(MultiTenancyStorage::new("aaaa", None));
let storage = Arc::new(MultiTenancyStorage::new());
let app_info = AppInfo {
app_id: "app1".to_owned(),
app_secret: "secret1".to_string(),
Expand All @@ -173,7 +169,7 @@ mod tests {

#[tokio::test]
async fn test_validate_app() {
let storage = Arc::new(MultiTenancyStorage::new("aaaa", None));
let storage = Arc::new(MultiTenancyStorage::new());
let app_info = AppInfo {
app_id: "app1".to_owned(),
app_secret: "secret1".to_string(),
Expand Down Expand Up @@ -218,8 +214,9 @@ mod tests {
}),
);

let storage = Arc::new(MultiTenancyStorage::new("aaaa", None));
let mut sync = MultiTenancySync::new(storage.clone(), &server.url("/sync"), Duration::from_secs(100));
let storage = Arc::new(MultiTenancyStorage::new());
let endpoint = server.url("/sync");
let mut sync = MultiTenancySync::new(storage.clone(), endpoint, Duration::from_secs(100));
sync.sync().await.expect("Should sync ok");

assert_eq!(storage.len(), 1);
Expand All @@ -232,8 +229,9 @@ mod tests {
let server = MockServer::start();
mockhttp(&server, Err(StatusCode::BAD_GATEWAY));

let storage = Arc::new(MultiTenancyStorage::new("aaaa", None));
let mut sync = MultiTenancySync::new(storage.clone(), &server.url("/sync"), Duration::from_secs(100));
let storage = Arc::new(MultiTenancyStorage::new());
let endpoint = server.url("/sync");
let mut sync = MultiTenancySync::new(storage.clone(), endpoint, Duration::from_secs(100));
sync.sync().await.expect_err("Should sync error because of http error");

assert_eq!(storage.len(), 0);
Expand Down

0 comments on commit 3c26709

Please sign in to comment.