Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(socket sink): support unix datagram mode #21762

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:

- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)

This option only applies when `mode = "unix"`.

authors: jpovixwm
2 changes: 0 additions & 2 deletions src/internal_events/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
}
}

#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
#[derive(Debug)]
pub struct UnixSocketSendError<'a, E> {
pub(crate) error: &'a E,
pub path: &'a std::path::Path,
}

#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
fn emit(self) {
let reason = "Unix socket send error.";
Expand Down
74 changes: 60 additions & 14 deletions src/sinks/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {

#[cfg(test)]
mod test {
#[cfg(unix)]
use std::path::PathBuf;
use std::{
future::ready,
net::{SocketAddr, UdpSocket},
};
#[cfg(unix)]
use std::{os::unix::net::UnixDatagram, path::PathBuf};

use futures::stream::StreamExt;
use futures_util::stream;
Expand Down Expand Up @@ -196,14 +196,42 @@ mod test {
crate::test_util::test_generate_config::<SocketSinkConfig>();
}

async fn test_udp(addr: SocketAddr) {
let receiver = UdpSocket::bind(addr).unwrap();
enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram),
}

enum DatagramSocketAddr {
Udp(SocketAddr),
#[cfg(unix)]
Unix(PathBuf),
}

async fn test_datagram(datagram_addr: DatagramSocketAddr) {
let receiver = match &datagram_addr {
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => {
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
}
};

let config = SocketSinkConfig {
mode: Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
mode: match &datagram_addr {
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
config: UdpSinkConfig::from_address(addr.to_string()),
encoding: JsonSerializerConfig::default().into(),
}),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
path.to_path_buf(),
crate::sinks::util::service::net::UnixMode::Datagram,
),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
}),
},
acknowledgements: Default::default(),
};

Expand All @@ -218,9 +246,13 @@ mod test {
.expect("Running sink failed");

let mut buf = [0; 256];
let (size, _src_addr) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");
let size = match &receiver {
DatagramSocket::Udp(sock) => {
sock.recv_from(&mut buf).expect("Did not receive message").0
}
#[cfg(unix)]
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
};

let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
Expand All @@ -234,14 +266,25 @@ mod test {
async fn udp_ipv4() {
trace_init();

test_udp(next_addr()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
}

#[tokio::test]
async fn udp_ipv6() {
trace_init();

test_udp(next_addr_v6()).await;
test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
}

#[cfg(unix)]
#[tokio::test]
async fn unix_datagram() {
trace_init();

test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
"unix_datagram_socket_test",
)))
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -292,7 +335,10 @@ mod test {

let config = SocketSinkConfig {
mode: Mode::Unix(UnixMode {
config: UnixSinkConfig::new(out_path),
config: UnixSinkConfig::new(
out_path,
crate::sinks::util::service::net::UnixMode::Stream,
),
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
}),
acknowledgements: Default::default(),
Expand Down
106 changes: 106 additions & 0 deletions src/sinks/util/datagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#[cfg(unix)]
use std::path::PathBuf;

use bytes::BytesMut;
use futures::{stream::BoxStream, StreamExt};
use futures_util::stream::Peekable;
use tokio::net::UdpSocket;
#[cfg(unix)]
use tokio::net::UnixDatagram;
use tokio_util::codec::Encoder;
use vector_lib::internal_event::RegisterInternalEvent;
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
use vector_lib::EstimatedJsonEncodedSizeOf;

use crate::{
codecs::Transformer,
event::{Event, EventStatus, Finalizable},
internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
};

#[cfg(unix)]
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};

pub enum DatagramSocket {
Udp(UdpSocket),
#[cfg(unix)]
Unix(UnixDatagram, PathBuf),
}

pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
input: &mut Peekable<BoxStream<'_, Event>>,
mut socket: DatagramSocket,
transformer: &Transformer,
encoder: &mut E,
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
) {
while let Some(mut event) = input.next().await {
let byte_size = event.estimated_json_encoded_size_of();

transformer.transform(&mut event);

let finalizers = event.take_finalizers();
let mut bytes = BytesMut::new();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_err() {
continue;
}

match send_datagram(&mut socket, &bytes).await {
Ok(()) => {
emit!(SocketEventsSent {
mode: match socket {
DatagramSocket::Udp(_) => SocketMode::Udp,
#[cfg(unix)]
DatagramSocket::Unix(..) => SocketMode::Unix,
},
count: 1,
byte_size,
});

bytes_sent.emit(ByteSize(bytes.len()));
finalizers.update_status(EventStatus::Delivered);
}
Err(error) => {
match socket {
DatagramSocket::Udp(_) => emit!(SocketSendError {
mode: SocketMode::Udp,
error
}),
#[cfg(unix)]
DatagramSocket::Unix(_, path) => {
emit!(UnixSocketSendError {
path: path.as_path(),
error: &error
})
}
};
finalizers.update_status(EventStatus::Errored);
return;
}
}
}
}

async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
let sent = match socket {
DatagramSocket::Udp(udp) => udp.send(buf).await,
#[cfg(unix)]
DatagramSocket::Unix(uds, _) => uds.send(buf).await,
}?;
if sent != buf.len() {
match socket {
DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
data_size: buf.len(),
sent,
}),
#[cfg(unix)]
DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
data_size: buf.len(),
sent,
}),
}
}
Ok(())
}
3 changes: 2 additions & 1 deletion src/sinks/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod batch;
pub mod buffer;
pub mod builder;
pub mod compressor;
pub mod datagram;
pub mod encoding;
pub mod http;
pub mod metadata;
Expand All @@ -23,7 +24,7 @@ pub mod tcp;
#[cfg(any(test, feature = "test-utils"))]
pub mod test;
pub mod udp;
#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
#[cfg(unix)]
pub mod unix;
pub mod uri;
pub mod zstd;
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/util/service/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
};

#[cfg(unix)]
use std::path::PathBuf;
use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};

use crate::{
internal_events::{
Expand All @@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode};
use self::tcp::TcpConnector;
use self::udp::UdpConnector;
#[cfg(unix)]
use self::unix::{UnixConnector, UnixEither};
use self::unix::UnixConnector;

use futures_util::{future::BoxFuture, FutureExt};
use snafu::{ResultExt, Snafu};
Expand Down
36 changes: 3 additions & 33 deletions src/sinks/util/service/net/unix.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::{
io,
os::fd::{AsFd, BorrowedFd},
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};

use snafu::ResultExt;
use tokio::{
io::AsyncWriteExt,
net::{UnixDatagram, UnixStream},
};
use tokio::net::{UnixDatagram, UnixStream};

use vector_lib::configurable::configurable_component;

use crate::net;
use crate::{net, sinks::util::unix::UnixEither};

use super::{net_error::*, ConnectorType, NetError, NetworkConnector};

Expand Down Expand Up @@ -74,29 +67,6 @@ impl UnixConnectorConfig {
}
}

pub(super) enum UnixEither {
Datagram(UnixDatagram),
Stream(UnixStream),
}

impl UnixEither {
pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Self::Datagram(datagram) => datagram.send(buf).await,
Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
}
}
}

impl AsFd for UnixEither {
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
Self::Datagram(datagram) => datagram.as_fd(),
Self::Stream(stream) => stream.as_fd(),
}
}
}

#[derive(Clone)]
pub(super) struct UnixConnector {
path: PathBuf,
Expand Down
Loading
Loading