Skip to content

Commit

Permalink
wip: rpc with error
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Dec 22, 2024
1 parent 5e38365 commit c9affa8
Show file tree
Hide file tree
Showing 14 changed files with 455 additions and 379 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ indexmap = "2.2"
spin = "0.9"
httpmock = "0.7"
test-log = "0.2"
anyhow = "1.0"
1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
reqwest = { version = "0.12", features = ["json"]}
sentry = "0.34"
anyhow = { workspace = true }

[features]
default = ["console", "gateway", "media", "connector", "standalone", "cert_utils"]
Expand Down
24 changes: 12 additions & 12 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Apis {
#[oai(path = "/:node/log/rooms", method = "get")]
async fn rooms(&self, _auth: ConsoleAuthorization, Data(ctx): Data<&ConsoleApisCtx>, Path(node): Path<u32>, Query(page): Query<u32>, Query(limit): Query<u32>) -> Json<Response<Vec<RoomInfo>>> {
match ctx.connector.rooms(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetParams { page, limit }).await {
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 80 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L80

Added line #L80 was not covered by tests
status: true,
data: Some(
res.rooms
Expand All @@ -98,9 +98,9 @@ impl Apis {
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 101 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L101

Added line #L101 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 103 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L103

Added line #L103 was not covered by tests
..Default::default()
}),
}
Expand All @@ -118,7 +118,7 @@ impl Apis {
Query(limit): Query<u32>,
) -> Json<Response<Vec<PeerInfo>>> {
match ctx.connector.peers(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetPeerParams { room, page, limit }).await {
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 121 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L121

Added line #L121 was not covered by tests
status: true,
data: Some(
res.peers
Expand Down Expand Up @@ -151,9 +151,9 @@ impl Apis {
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 154 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L154

Added line #L154 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 156 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L156

Added line #L156 was not covered by tests
..Default::default()
}),
}
Expand All @@ -170,7 +170,7 @@ impl Apis {
Query(limit): Query<u32>,
) -> Json<Response<Vec<SessionInfo>>> {
match ctx.connector.sessions(node_vnet_addr(node, CONNECTOR_RPC_PORT), GetParams { page, limit }).await {
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 173 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L173

Added line #L173 was not covered by tests
status: true,
data: Some(
res.sessions
Expand Down Expand Up @@ -204,9 +204,9 @@ impl Apis {
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 207 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L207

Added line #L207 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 209 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L209

Added line #L209 was not covered by tests
..Default::default()
}),
}
Expand Down Expand Up @@ -240,7 +240,7 @@ impl Apis {
)
.await
{
Some(res) => Json(Response {
Ok(res) => Json(Response {

Check warning on line 243 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L243

Added line #L243 was not covered by tests
status: true,
data: Some(
res.events
Expand All @@ -262,9 +262,9 @@ impl Apis {
}),
..Default::default()
}),
None => Json(Response {
Err(e) => Json(Response {

Check warning on line 265 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L265

Added line #L265 was not covered by tests
status: false,
error: Some("CLUSTER_ERROR".to_string()),
error: Some(e.to_string()),

Check warning on line 267 in bin/src/http/api_console/connector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/http/api_console/connector.rs#L267

Added line #L267 was not covered by tests
..Default::default()
}),
}
Expand Down
25 changes: 13 additions & 12 deletions bin/src/server/connector/remote_rpc_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use anyhow::{anyhow, Result};
use media_server_connector::Querier;
use media_server_protocol::protobuf::cluster_connector::{
get_events::EventInfo, get_peers::PeerInfo, get_rooms::RoomInfo, get_sessions::SessionInfo, GetEventParams, GetEvents, GetParams, GetPeerParams, GetPeers, GetRooms, GetSessions,
Expand All @@ -16,13 +17,13 @@ pub struct Ctx {
pub struct ConnectorRemoteRpcHandlerImpl {}

impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
async fn rooms(&self, ctx: &Ctx, req: GetParams) -> Option<GetRooms> {
async fn rooms(&self, ctx: &Ctx, req: GetParams) -> Result<GetRooms> {

Check warning on line 20 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L20

Added line #L20 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get rooms {req:?}");
let response = match ctx.storage.rooms(req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get rooms error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 26 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L26

Added line #L26 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} rooms", response.data.len());
Expand All @@ -40,7 +41,7 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
})
.collect::<Vec<_>>();

Some(GetRooms {
Ok(GetRooms {

Check warning on line 44 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L44

Added line #L44 was not covered by tests
rooms,
pagination: Some(Pagination {
total: response.total as u32,
Expand All @@ -49,13 +50,13 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
})
}

async fn peers(&self, ctx: &Ctx, req: GetPeerParams) -> Option<GetPeers> {
async fn peers(&self, ctx: &Ctx, req: GetPeerParams) -> Result<GetPeers> {

Check warning on line 53 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L53

Added line #L53 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get peers page {req:?}");
let response = match ctx.storage.peers(req.room, req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get peers error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 59 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L59

Added line #L59 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} peers", response.data.len());
Expand Down Expand Up @@ -85,7 +86,7 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
})
.collect::<Vec<_>>();

Some(GetPeers {
Ok(GetPeers {

Check warning on line 89 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L89

Added line #L89 was not covered by tests
peers,
pagination: Some(Pagination {
total: response.total as u32,
Expand All @@ -94,13 +95,13 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
})
}

async fn sessions(&self, ctx: &Ctx, req: GetParams) -> Option<GetSessions> {
async fn sessions(&self, ctx: &Ctx, req: GetParams) -> Result<GetSessions> {

Check warning on line 98 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L98

Added line #L98 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get sessions page {req:?}");
let response = match ctx.storage.sessions(req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get sessions error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 104 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L104

Added line #L104 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} sessions", response.data.len());
Expand Down Expand Up @@ -130,7 +131,7 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
.collect::<Vec<_>>(),
})
.collect::<Vec<_>>();
Some(GetSessions {
Ok(GetSessions {

Check warning on line 134 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L134

Added line #L134 was not covered by tests
sessions,
pagination: Some(Pagination {
total: response.total as u32,
Expand All @@ -139,13 +140,13 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
})
}

async fn events(&self, ctx: &Ctx, req: GetEventParams) -> Option<GetEvents> {
async fn events(&self, ctx: &Ctx, req: GetEventParams) -> Result<GetEvents> {

Check warning on line 143 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L143

Added line #L143 was not covered by tests
log::info!("[ConnectorRemoteRpcHandler] on get events page {req:?}");
let response = match ctx.storage.events(req.session, req.start_ts, req.end_ts, req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get events error {err}");
return None;
return Err(anyhow!("{err}"));

Check warning on line 149 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L149

Added line #L149 was not covered by tests
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} events", response.data.len());
Expand All @@ -163,7 +164,7 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
meta: e.meta.map(|m| m.to_string()),
})
.collect::<Vec<_>>();
Some(GetEvents {
Ok(GetEvents {

Check warning on line 167 in bin/src/server/connector/remote_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/connector/remote_rpc_handler.rs#L167

Added line #L167 was not covered by tests
events,
pagination: Some(Pagination {
total: response.total as u32,
Expand Down
Loading

0 comments on commit c9affa8

Please sign in to comment.