Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Rename metadata to response_tx, remove Metadata generic #260

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions netlink-proto/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
{
socket: NetlinkFramed<T, S, C>,

protocol: Protocol<T, UnboundedSender<NetlinkMessage<T>>>,
protocol: Protocol<T>,

/// Channel used by the user to pass requests to the connection.
requests_rx: Option<UnboundedReceiver<Request<T>>>,
Expand Down Expand Up @@ -217,14 +217,14 @@ where
let Response {
message,
done,
metadata: tx,
response_tx: tx,
} = response;
if done {
use NetlinkPayload::*;
match &message.payload {
// Since `self.protocol` set the `done` flag here,
// we know it has already dropped the request and
// its associated metadata, ie the UnboundedSender
// its associated response_tx, the UnboundedSender
// used to forward messages back to the
// ConnectionHandle. By just continuing we're
// dropping the last instance of that sender,
Expand Down
16 changes: 12 additions & 4 deletions netlink-proto/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ where
message: NetlinkMessage<T>,
destination: SocketAddr,
) -> Result<impl Stream<Item = NetlinkMessage<T>>, Error<T>> {
let (tx, rx) = unbounded::<NetlinkMessage<T>>();
let request = Request::from((message, destination, tx));
let (response_tx, rx) = unbounded::<NetlinkMessage<T>>();
let request = Request {
response_tx,
message,
destination,
};
debug!("handle: forwarding new request to connection");
UnboundedSender::unbounded_send(&self.requests_tx, request).map_err(|e| {
// the channel is unbounded, so it can't be full. If this
Expand All @@ -59,8 +63,12 @@ where
message: NetlinkMessage<T>,
destination: SocketAddr,
) -> Result<(), Error<T>> {
let (tx, _rx) = unbounded::<NetlinkMessage<T>>();
let request = Request::from((message, destination, tx));
let (response_tx, _rx) = unbounded::<NetlinkMessage<T>>();
let request = Request {
response_tx,
message,
destination,
};
debug!("handle: forwarding new request to connection");
UnboundedSender::unbounded_send(&self.requests_tx, request)
.map_err(|_| Error::ConnectionClosed)
Expand Down
6 changes: 2 additions & 4 deletions netlink-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ mod framed;
pub use crate::framed::*;

mod protocol;
pub(crate) use self::protocol::{Protocol, Response};
pub(crate) type Request<T> =
self::protocol::Request<T, UnboundedSender<crate::packet::NetlinkMessage<T>>>;
pub(crate) use self::protocol::{Protocol, Request, Response};

mod connection;
pub use crate::connection::*;
Expand All @@ -188,7 +186,7 @@ pub use crate::errors::*;
mod handle;
pub use crate::handle::*;

use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use std::{fmt::Debug, io};

pub use netlink_packet_core as packet;
Expand Down
40 changes: 20 additions & 20 deletions netlink-proto/src/protocol/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
fmt::Debug,
};

use futures::channel::mpsc::UnboundedSender;
use netlink_packet_core::{
constants::*,
NetlinkDeserializable,
Expand All @@ -31,30 +32,30 @@ impl RequestId {
}
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct Response<T, M> {
#[derive(Debug)]
pub(crate) struct Response<T> {
pub done: bool,
pub message: NetlinkMessage<T>,
pub metadata: M,
pub response_tx: UnboundedSender<NetlinkMessage<T>>,
}

#[derive(Debug)]
struct PendingRequest<M> {
struct PendingRequest<T> {
expecting_ack: bool,
metadata: M,
response_tx: UnboundedSender<NetlinkMessage<T>>,
}

#[derive(Debug, Default)]
pub(crate) struct Protocol<T, M> {
pub(crate) struct Protocol<T> {
/// Counter that is incremented for each message sent
sequence_id: u32,

/// Requests for which we're awaiting a response. Metadata are
/// associated with each request.
pending_requests: HashMap<RequestId, PendingRequest<M>>,
pending_requests: HashMap<RequestId, PendingRequest<T>>,

/// Responses to pending requests
pub incoming_responses: VecDeque<Response<T, M>>,
pub incoming_responses: VecDeque<Response<T>>,

/// Requests from remote peers
pub incoming_requests: VecDeque<(NetlinkMessage<T>, SocketAddr)>,
Expand All @@ -63,10 +64,9 @@ pub(crate) struct Protocol<T, M> {
pub outgoing_messages: VecDeque<(NetlinkMessage<T>, SocketAddr)>,
}

impl<T, M> Protocol<T, M>
impl<T> Protocol<T>
where
T: Debug + NetlinkSerializable + NetlinkDeserializable,
M: Debug + Clone,
{
pub fn new() -> Self {
Self {
Expand All @@ -89,8 +89,8 @@ where
}

fn handle_response(
incoming_responses: &mut VecDeque<Response<T, M>>,
entry: hash_map::OccupiedEntry<RequestId, PendingRequest<M>>,
incoming_responses: &mut VecDeque<Response<T>>,
entry: hash_map::OccupiedEntry<RequestId, PendingRequest<T>>,
message: NetlinkMessage<T>,
) {
let entry_key;
Expand All @@ -110,30 +110,30 @@ where
_ => true,
};

let metadata = if done {
let response_tx = if done {
trace!("request {:?} fully processed", request_id);
let (k, v) = entry.remove_entry();
entry_key = k;
request_id = &entry_key;
v.metadata
v.response_tx
} else {
trace!("more responses to request {:?} may come", request_id);
entry.get().metadata.clone()
entry.get().response_tx.clone()
};

let response = Response::<T, M> {
let response = Response {
done,
message,
metadata,
response_tx,
};
incoming_responses.push_back(response);
debug!("done handling response to request {:?}", request_id);
}

pub fn request(&mut self, request: Request<T, M>) {
pub fn request(&mut self, request: Request<T>) {
let Request {
mut message,
metadata,
response_tx,
destination,
} = request;

Expand All @@ -158,7 +158,7 @@ where
request_id,
PendingRequest {
expecting_ack,
metadata,
response_tx,
},
);
}
Expand Down
29 changes: 3 additions & 26 deletions netlink-proto/src/protocol/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,14 @@

use std::fmt::Debug;

use futures::channel::mpsc::UnboundedSender;
use netlink_packet_core::NetlinkMessage;

use crate::sys::SocketAddr;

#[derive(Debug)]
pub(crate) struct Request<T, M> {
pub metadata: M,
pub(crate) struct Request<T> {
pub response_tx: UnboundedSender<NetlinkMessage<T>>,
pub message: NetlinkMessage<T>,
pub destination: SocketAddr,
}

impl<T, M> From<(NetlinkMessage<T>, SocketAddr, M)> for Request<T, M>
where
T: Debug,
M: Debug,
{
fn from(parts: (NetlinkMessage<T>, SocketAddr, M)) -> Self {
Request {
message: parts.0,
destination: parts.1,
metadata: parts.2,
}
}
}

impl<T, M> From<Request<T, M>> for (NetlinkMessage<T>, SocketAddr, M)
where
T: Debug,
M: Debug,
{
fn from(req: Request<T, M>) -> (NetlinkMessage<T>, SocketAddr, M) {
(req.message, req.destination, req.metadata)
}
}