Skip to content

Commit

Permalink
fixed: postgresql query error (8xFF#419)
Browse files Browse the repository at this point in the history
* fixed: postgresql query error

* update migration
  • Loading branch information
giangndm authored Aug 9, 2024
1 parent 93f3182 commit 260ff03
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 47 deletions.
43 changes: 39 additions & 4 deletions bin/src/server/connector/remote_rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ pub struct ConnectorRemoteRpcHandlerImpl {}

impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
async fn rooms(&self, ctx: &Ctx, req: GetParams) -> Option<GetRooms> {
let response = ctx.storage.rooms(req.page as usize, req.limit as usize).await?;
log::info!("[ConnectorRemoteRpcHandler] on get rooms {req:?}");
let response = match ctx.storage.rooms(req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get rooms error {err}");
return None;
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} rooms", response.data.len());

let rooms = response
.data
Expand All @@ -40,7 +48,16 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
}

async fn peers(&self, ctx: &Ctx, req: GetPeerParams) -> Option<GetPeers> {
let response = ctx.storage.peers(req.room, req.page as usize, req.limit as usize).await?;
log::info!("[ConnectorRemoteRpcHandler] on get peers page {req:?}");
let response = match ctx.storage.peers(req.room, req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get peers error {err}");
return None;
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} peers", response.data.len());

let peers = response
.data
.into_iter()
Expand Down Expand Up @@ -76,7 +93,16 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
}

async fn sessions(&self, ctx: &Ctx, req: GetParams) -> Option<GetSessions> {
let response = ctx.storage.sessions(req.page as usize, req.limit as usize).await?;
log::info!("[ConnectorRemoteRpcHandler] on get sessions page {req:?}");
let response = match ctx.storage.sessions(req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get sessions error {err}");
return None;
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} sessions", response.data.len());

let sessions = response
.data
.into_iter()
Expand Down Expand Up @@ -111,7 +137,16 @@ impl MediaConnectorServiceHandler<Ctx> for ConnectorRemoteRpcHandlerImpl {
}

async fn events(&self, ctx: &Ctx, req: GetEventParams) -> Option<GetEvents> {
let response = ctx.storage.events(req.session, req.start_ts, req.end_ts, req.page as usize, req.limit as usize).await?;
log::info!("[ConnectorRemoteRpcHandler] on get events page {req:?}");
let response = match ctx.storage.events(req.session, req.start_ts, req.end_ts, req.page as usize, req.limit as usize).await {
Ok(res) => res,
Err(err) => {
log::error!("[ConnectorRemoteRpcHandler] on get events error {err}");
return None;
}
};
log::info!("[ConnectorRemoteRpcHandler] on got {} events", response.data.len());

let events = response
.data
.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions packages/media_connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ pub trait Storage {
}

pub trait Querier {
fn rooms(&self, page: usize, count: usize) -> impl std::future::Future<Output = Option<PagingResponse<RoomInfo>>> + Send;
fn peers(&self, room: Option<i32>, page: usize, count: usize) -> impl std::future::Future<Output = Option<PagingResponse<PeerInfo>>> + Send;
fn sessions(&self, page: usize, count: usize) -> impl std::future::Future<Output = Option<PagingResponse<SessionInfo>>> + Send;
fn events(&self, session: Option<u64>, from: Option<u64>, to: Option<u64>, page: usize, count: usize) -> impl std::future::Future<Output = Option<PagingResponse<EventInfo>>> + Send;
fn rooms(&self, page: usize, count: usize) -> impl std::future::Future<Output = Result<PagingResponse<RoomInfo>, String>> + Send;
fn peers(&self, room: Option<i32>, page: usize, count: usize) -> impl std::future::Future<Output = Result<PagingResponse<PeerInfo>, String>> + Send;
fn sessions(&self, page: usize, count: usize) -> impl std::future::Future<Output = Result<PagingResponse<SessionInfo>, String>> + Send;
fn events(&self, session: Option<u64>, from: Option<u64>, to: Option<u64>, page: usize, count: usize) -> impl std::future::Future<Output = Result<PagingResponse<EventInfo>, String>> + Send;
}
80 changes: 43 additions & 37 deletions packages/media_connector/src/sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ConnectorStorage {
.idle_timeout(Duration::from_secs(8))
.max_lifetime(Duration::from_secs(8))
.sqlx_logging(false)
.sqlx_logging_level(log::LevelFilter::Info); // Setting default PostgreSQL schema
.sqlx_logging_level(log::LevelFilter::Warn); // Setting default PostgreSQL schema

let db = Database::connect(opt).await.expect("Should connect to sql server");
migration::Migrator::up(&db, None).await.expect("Should run migration success");
Expand Down Expand Up @@ -73,7 +73,7 @@ impl ConnectorStorage {
.ok()?;

entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -89,7 +89,7 @@ impl ConnectorStorage {
}
peer_event::Event::RouteSuccess(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -104,7 +104,7 @@ impl ConnectorStorage {
}
peer_event::Event::RouteError(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand Down Expand Up @@ -132,7 +132,7 @@ impl ConnectorStorage {
.ok()?;

entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -147,7 +147,7 @@ impl ConnectorStorage {
}
peer_event::Event::Connected(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -162,7 +162,7 @@ impl ConnectorStorage {
}
peer_event::Event::ConnectError(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -178,7 +178,7 @@ impl ConnectorStorage {
peer_event::Event::Stats(_) => todo!(),
peer_event::Event::Reconnect(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -193,7 +193,7 @@ impl ConnectorStorage {
}
peer_event::Event::Reconnected(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -208,7 +208,7 @@ impl ConnectorStorage {
}
peer_event::Event::Disconnected(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -227,7 +227,7 @@ impl ConnectorStorage {
let _peer_session = self.upsert_peer_session(peer, session, ts).await?;

entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand Down Expand Up @@ -256,7 +256,7 @@ impl ConnectorStorage {
}

entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -271,7 +271,7 @@ impl ConnectorStorage {
}
peer_event::Event::RemoteTrackStarted(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -286,7 +286,7 @@ impl ConnectorStorage {
}
peer_event::Event::RemoteTrackEnded(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -301,7 +301,7 @@ impl ConnectorStorage {
}
peer_event::Event::LocalTrack(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -316,7 +316,7 @@ impl ConnectorStorage {
}
peer_event::Event::LocalTrackAttach(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand All @@ -331,7 +331,7 @@ impl ConnectorStorage {
}
peer_event::Event::LocalTrackDetach(params) => {
entity::event::ActiveModel {
node: Set(from),
node: Set(from as i64),
node_ts: Set(ts as i64),
session: Set(session as i64),
created_at: Set(now_ms() as i64),
Expand Down Expand Up @@ -439,11 +439,11 @@ struct RoomInfoAndPeersCount {
pub id: i32,
pub room: String,
pub created_at: i64,
pub peers: i32,
pub peers: i64,
}

impl Querier for ConnectorStorage {
async fn rooms(&self, page: usize, limit: usize) -> Option<PagingResponse<RoomInfo>> {
async fn rooms(&self, page: usize, limit: usize) -> Result<PagingResponse<RoomInfo>, String> {
let rooms = entity::room::Entity::find()
.column_as(entity::peer::Column::Id.count(), "peers")
.join_rev(JoinType::LeftJoin, entity::peer::Relation::Room.def())
Expand All @@ -454,7 +454,7 @@ impl Querier for ConnectorStorage {
.into_model::<RoomInfoAndPeersCount>()
.all(&self.db)
.await
.ok()?
.map_err(|e| e.to_string())?
.into_iter()
.map(|r| RoomInfo {
id: r.id,
Expand All @@ -463,32 +463,33 @@ impl Querier for ConnectorStorage {
peers: r.peers as usize,
})
.collect::<Vec<_>>();
let total = entity::room::Entity::find().count(&self.db).await.ok()?;
Some(PagingResponse {
let total = entity::room::Entity::find().count(&self.db).await.map_err(|e| e.to_string())?;
Ok(PagingResponse {
data: rooms,
current: page,
total: calc_page_num(total as usize, limit),
})
}

async fn peers(&self, room: Option<i32>, page: usize, limit: usize) -> Option<PagingResponse<PeerInfo>> {
async fn peers(&self, room: Option<i32>, page: usize, limit: usize) -> Result<PagingResponse<PeerInfo>, String> {
let peers = entity::peer::Entity::find();
let peers = if let Some(room) = room {
peers.filter(entity::peer::Column::Room.eq(room))
} else {
peers
};

let total = peers.clone().count(&self.db).await.ok()?;
let total = peers.clone().count(&self.db).await.map_err(|e| e.to_string())?;
let peers = peers
.order_by(entity::peer::Column::CreatedAt, sea_orm::Order::Desc)
.find_with_related(entity::room::Entity)
.group_by(entity::peer::Column::Id)
.group_by(entity::room::Column::Id)
.limit(limit as u64)
.offset((page * limit) as u64)
.all(&self.db)
.await
.ok()?
.map_err(|e| e.to_string())?
.into_iter()
.collect::<Vec<_>>();

Expand All @@ -499,14 +500,14 @@ impl Querier for ConnectorStorage {
.filter(entity::peer_session::Column::Peer.is_in(peer_ids))
.all(&self.db)
.await
.ok()?;
.map_err(|e| e.to_string())?;
let mut peers_sessions_map = HashMap::new();
for peer_session in peer_sessions {
let entry = peers_sessions_map.entry(peer_session.peer).or_insert(vec![]);
entry.push(peer_session);
}

Some(PagingResponse {
Ok(PagingResponse {
data: peers
.into_iter()
.map(|(peer, room)| PeerInfo {
Expand Down Expand Up @@ -536,27 +537,31 @@ impl Querier for ConnectorStorage {
})
}

async fn sessions(&self, page: usize, limit: usize) -> Option<PagingResponse<SessionInfo>> {
async fn sessions(&self, page: usize, limit: usize) -> Result<PagingResponse<SessionInfo>, String> {
let sessions = entity::session::Entity::find()
.order_by(entity::session::Column::CreatedAt, sea_orm::Order::Desc)
.limit(limit as u64)
.offset((page * limit) as u64)
.find_with_related(entity::peer_session::Entity)
.all(&self.db)
.await
.ok()?;
let total = entity::session::Entity::find().count(&self.db).await.ok()?;
.map_err(|e| e.to_string())?;
let total = entity::session::Entity::find().count(&self.db).await.map_err(|e| e.to_string())?;

// TODO optimize this sub queries
// should combine into single query but it not allowed by sea-orm with multiple find_with_related
let peers_id = sessions.iter().flat_map(|(_, peers)| peers.iter().map(|p| p.peer)).collect::<Vec<_>>();
let peers = entity::peer::Entity::find().filter(entity::peer::Column::Id.is_in(peers_id)).all(&self.db).await.ok()?;
let peers = entity::peer::Entity::find()
.filter(entity::peer::Column::Id.is_in(peers_id))
.all(&self.db)
.await
.map_err(|e| e.to_string())?;
let mut peers_map = HashMap::new();
for peer in peers {
peers_map.insert(peer.id, peer);
}

Some(PagingResponse {
Ok(PagingResponse {
data: sessions
.into_iter()
.map(|(r, peers)| SessionInfo {
Expand Down Expand Up @@ -584,7 +589,7 @@ impl Querier for ConnectorStorage {
})
}

async fn events(&self, session: Option<u64>, from: Option<u64>, to: Option<u64>, page: usize, limit: usize) -> Option<PagingResponse<EventInfo>> {
async fn events(&self, session: Option<u64>, from: Option<u64>, to: Option<u64>, page: usize, limit: usize) -> Result<PagingResponse<EventInfo>, String> {
let events = entity::event::Entity::find();
let events = if let Some(session) = session {
events.filter(entity::event::Column::Session.eq(session as i64))
Expand All @@ -604,26 +609,27 @@ impl Querier for ConnectorStorage {
events
};

let total = events.clone().count(&self.db).await.ok()?;
let total = events.clone().count(&self.db).await.map_err(|e| e.to_string())?;
let events = events
.order_by(entity::event::Column::CreatedAt, sea_orm::Order::Desc)
.limit(limit as u64)
.offset((page * limit) as u64)
.all(&self.db)
.await
.unwrap()
.map_err(|e| e.to_string())?
.into_iter()
.map(|r| EventInfo {
id: r.id,
node: r.node,
node: r.node as u32,
created_at: r.created_at as u64,
session: r.session as u64,
node_ts: r.node_ts as u64,
event: r.event,
meta: r.meta,
})
.collect::<Vec<_>>();
Some(PagingResponse {

Ok(PagingResponse {
data: events,
current: page,
total: calc_page_num(total as usize, limit),
Expand Down
Loading

0 comments on commit 260ff03

Please sign in to comment.