Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(socket sink): support unix datagram mode #21762

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

jpovixwm
Copy link

@jpovixwm jpovixwm commented Nov 11, 2024

Summary

Adds a unix_mode option to the socket sink.
The implementation is somewhat based on how the statsd sink handles this, but not to the full extent, as that felt like too big of a refactor for me to deal with.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

make test on x86_64 Ubuntu 22.04 running within WSL, and then

cargo test --lib --no-default-features --features=sinks-socket --target=x86_64-pc-windows-gnu

to ensure I didn't forget any of the #[cfg(unix)] attributes.
Other than this, I also performed some basic tests with a trivial UDS listener implemented in Python.

sources:
  dummy_logs:
    type: "demo_logs"
    format: "syslog"
    count: 3

sinks:
  emit_console:
    inputs:
      - "dummy_logs"
    type: "console"
    encoding:
      codec: "text"
  emit_syslog:
    inputs:
      - "dummy_logs"
    type: "socket"
    mode: "unix"
    # address: "127.0.0.1:5006"
    unix_mode: "Datagram"
    path: "/tmp/test_socket"
    encoding:
      codec: "text"
    framing:
      method: "bytes"

Listener:
nc -lkuU /tmp/test_socket (from package netcat-openbsd)

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@jpovixwm jpovixwm requested review from a team as code owners November 11, 2024 18:18
@github-actions github-actions bot added domain: sinks Anything related to the Vector's sinks domain: external docs Anything related to Vector's external, public documentation labels Nov 11, 2024
Copy link
Contributor

@pront pront left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/ci-run-component-features

Copy link
Contributor

@pront pront left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jpovixwm, I did a quick pass. Left some suggestions.

src/sinks/util/unix.rs Outdated Show resolved Hide resolved
src/sinks/util/unix.rs Outdated Show resolved Hide resolved
src/sinks/util/unix.rs Outdated Show resolved Hide resolved
src/sinks/util/unix.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@pront pront left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jpovixwm!

@jpovixwm
Copy link
Author

@pront I'm not sure how to best resolve the build errors.
The easy part is to un-gate UnixSocketSendError in src/internal_events/unix.rs, similar to how UnixSendIncompleteError is already not gated, as the impact of this should be minimal.

But then we can either:

  1. Trivially relax the gate
    #[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
    pub mod unix;
    so that it becomes just #[cfg(unix)], which I guess will make the binary a bit bigger than necessary for builds without either of those sinks enabled.
  2. Move the UnixEither enum back where it came from, and make it public, so that it can be used in src/sinks/util/unix.rs
  3. Or "Find & replace" all occurrences of #[cfg(unix)] (12 hits) with #[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] in src/sinks/util/service/net/mod.rs, which is kind of ugly, but does appear to work.

@pront
Copy link
Contributor

pront commented Nov 14, 2024

@pront I'm not sure how to best resolve the build errors. The easy part is to un-gate UnixSocketSendError in src/internal_events/unix.rs, similar to how UnixSendIncompleteError is already not gated, as the impact of this should be minimal.

But then we can either:

  1. Trivially relax the gate
    #[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
    pub mod unix;

    so that it becomes just #[cfg(unix)], which I guess will make the binary a bit bigger than necessary for builds without either of those sinks enabled.
  2. Move the UnixEither enum back where it came from, and make it public, so that it can be used in src/sinks/util/unix.rs
  3. Or "Find & replace" all occurrences of #[cfg(unix)] (12 hits) with #[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] in src/sinks/util/service/net/mod.rs, which is kind of ugly, but does appear to work.

@jpovixwm I am OK with (1)

@jpovixwm
Copy link
Author

Verified locally with

cargo test --lib --no-default-features --features=sinks-amqp
cargo test
cargo test --lib --no-default-features --features=sinks-socket --target=x86_64-pc-windows-gnu

Hope it works this time🤞

Copy link
Contributor

@pront pront left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/ci-run-component-features

@pront
Copy link
Contributor

pront commented Nov 15, 2024

/ci-run-component-features

Turns out that the above workflow doesn't run on fork branches.

I manually created: https://github.com/vectordotdev/vector/actions/runs/11860591369
If it passes, we can go ahead and merge this PR.

@pront pront enabled auto-merge November 15, 2024 21:48
@pront pront added this pull request to the merge queue Nov 15, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Nov 15, 2024
@jpovixwm
Copy link
Author

I'm unable to reproduce the unit test failure on my machine, which makes it more difficult for me to fix it.
Here's a potential modification that still passes locally for me:

let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();

let handle = tokio::task::spawn_blocking(move || {
    let mut output_lines = Vec::<String>::with_capacity(num_lines);

    for _ in 0..num_lines {
        let mut buf = [0; 100];
        let (size, _) = receiver
            .recv_from(&mut buf)
            .expect("Did not receive message");
        let line = String::from_utf8_lossy(&buf[..size]).to_string();
        output_lines.push(line);
    }

    output_lines
});

Tokio's UnixDatagram is replaced with the stdlib one, so that we can do a blocking receive (as spawn_blocking doesn't allow async functions), in the hopes that the receiver will be able to keep up with the sender if it gets to run in its own thread.
Should I commit this?

@pront
Copy link
Contributor

pront commented Nov 18, 2024

Let me check this out locally and run the failing check.

@pront
Copy link
Contributor

pront commented Nov 18, 2024

Edited: Hmm, basic_unix_sink hangs on jpovixwm:feature/5269-support-unix-datagram-mode-in-socket-sink on my macOS machine. I don't have time to dive into right now, but this PR shouldn't affect existing behavior at all.

@jpovixwm
Copy link
Author

@pront can you tell me the invocation that fails for you locally?

@pront
Copy link
Contributor

pront commented Nov 18, 2024

@pront can you tell me the invocation that fails for you locally?

You can do:

# from the repo root
cd src/sinks/util 
cargo test

I did:

# ide
cargo test --color=always --package vector --lib sinks::util::unix::tests::basic_unix_datagram_sink --no-fail-fast --config env.RUSTC_BOOTSTRAP=\"1\" -- --format=json --exact -Z unstable-options --show-output

@jpovixwm
Copy link
Author

You can do:

# from the repo root
cd src/sinks/util 
cargo test
user@hostname:~/vector/src/sinks/util$ git status && git rev-parse HEAD
On branch feature/5269-support-unix-datagram-mode-in-socket-sink
Your branch is up to date with 'remotes/jpovixwm/feature/5269-support-unix-datagram-mode-in-socket-sink'.

nothing to commit, working tree clean
543d33f393a294342fda20beca8040020840d4ca
user@hostname:~/vector/src/sinks/util$ cargo test
    Finished `test` profile [unoptimized + debuginfo] target(s) in 2.21s
     Running unittests src/lib.rs (/home/user/vector/target/debug/deps/vector-8dfd964da1bd40f7)

running 1638 tests
test api::schema::components::sink::tests::sort_component_id_asc ... ok
[...]
test transforms::log_to_metric::tests::parse_failure ... ok

test result: ok. 1635 passed; 0 failed; 3 ignored; 0 measured; 0 filtered out; finished in 19.14s

     Running tests/e2e/mod.rs (/home/user/vector/target/debug/deps/e2e-6fdf1afbfae3c336)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/integration/lib.rs (/home/user/vector/target/debug/deps/integration-fc9efbfa10dceb61)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

   Doc-tests vector

running 2 tests
test src/expiring_hash_map.rs - expiring_hash_map::ExpiringHashMap<K,V>::next_expired (line 141) ... ignored
test src/expiring_hash_map.rs - expiring_hash_map::ExpiringHashMap<K,V>::next_expired (line 158) ... ok

test result: ok. 1 passed; 0 failed; 1 ignored; 0 measured; 0 filtered out; finished in 42.10s

I did:

# ide
cargo test --color=always --package vector --lib sinks::util::unix::tests::basic_unix_datagram_sink --no-fail-fast --config env.RUSTC_BOOTSTRAP=\"1\" -- --format=json --exact -Z unstable-options --show-output
user@hostname:~/vector/src/sinks/util$ cargo test --color=always --package vector --lib sinks::util::unix::tests::basic_unix_datagram_sink --no-fail-fast --config env.RUSTC_BOOTSTRAP=\"1\" -- --format=json --exact -Z unstable-options --show-output
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.42s
     Running unittests src/lib.rs (/home/user/vector/target/debug/deps/vector-8dfd964da1bd40f7)
{ "type": "suite", "event": "started", "test_count": 1 }
{ "type": "test", "event": "started", "name": "sinks::util::unix::tests::basic_unix_datagram_sink" }
{ "type": "test", "name": "sinks::util::unix::tests::basic_unix_datagram_sink", "event": "ok" }
{ "type": "suite", "event": "ok", "passed": 1, "failed": 0, "ignored": 0, "measured": 0, "filtered_out": 1637, "exec_time": 0.056496991 }

"Works on my machine". I'm afraid there's not much I can do on my end. I guess I could try it with a proper Linux VM if I can find the time.
FYI:

user@hostname:~/vector$ uname -r
5.15.153.1-microsoft-standard-WSL2
user@hostname:~/vector$ lsb_release -d
Description:    Ubuntu 22.04.5 LTS

So maybe my setup is a bit exotic.

@pront
Copy link
Contributor

pront commented Nov 18, 2024

Your setup looks fine to me. I can devote some of my time later this week since I have a macOS machine here.

@pront
Copy link
Contributor

pront commented Nov 18, 2024

Can you share the Vector config you used to test this end to end? I think we need to tweak the UX a bit. Currently it only accepts unix_mode: Datagram, note that the first letter needs to be capitalized.

I have:

sources:
  source0:
    type: demo_logs
    format: json

sinks:
  console:
    type: socket
    inputs: ["source0"]
    address: "unix:///tmp/uds_datagram_socket"
    mode: unix
    unix_mode: Datagram
    path: "/tmp/uds_datagram_socket"
    encoding:
      codec: json

and a listener:

import socket
import os

socket_path = "/tmp/uds_datagram_socket"
if os.path.exists(socket_path):
    os.remove(socket_path)

server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server_socket.bind(socket_path)

print(f"Listening on Unix Domain Socket: {socket_path}")
try:
    while True:
        message, client_address = server_socket.recvfrom(4096)  # 4096 bytes buffer size
        print(f"Received message: {message.decode('utf-8')} from {client_address}")

        # server_socket.sendto(b"Response", client_address)

except KeyboardInterrupt:
    print("\nServer interrupted, shutting down.")
finally:
    server_socket.close()
    os.remove(socket_path)
    print(f"Socket {socket_path} closed.")

and the behavior looks good.

@pront
Copy link
Contributor

pront commented Nov 18, 2024

Tokio's UnixDatagram is replaced with the stdlib one, so that we can do a blocking receive (as spawn_blocking doesn't allow async functions), in the hopes that the receiver will be able to keep up with the sender if it gets to run in its own thread.

FYI, I tried this locally and it passed. However, I didn't dig into it to understand the root cause of this.

@jpovixwm
Copy link
Author

Can you share the Vector config you used to test this end to end?

Sure. It's nothing special:

sources:
  dummy_logs:
    type: "demo_logs"
    format: "syslog"
    count: 3

sinks:
  emit_console:
    inputs:
      - "dummy_logs"
    type: "console"
    encoding:
      codec: "text"
  emit_syslog:
    inputs:
      - "dummy_logs"
    type: "socket"
    mode: "unix"
    # address: "127.0.0.1:5006"
    unix_mode: "Datagram"
    path: "/tmp/test_socket"
    encoding:
      codec: "text"
    framing:
      method: "bytes"

Listener:
nc -lkuU /tmp/test_socket (from package netcat-openbsd)

Currently it only accepts unix_mode: Datagram

It also accepts Stream for me. Unless you meant that it only accepted the capitalized parameter and not datagram, in which case I can confirm that's indeed the case. But this configurable item is the same one that's used in the statsd sink, so I'd assume the issue also exists there (and was present before my PR). So I'm inclined to say it's out of scope for this PR.

@pront
Copy link
Contributor

pront commented Nov 19, 2024

Thanks for sharing, adding these details to the PR test plan. I also did a very similar test on macOS and it worked well.

It also accepts Stream for me. Unless you meant that it only accepted the capitalized parameter and not datagram, in which case I can confirm that's indeed the case.

Yes, exactly.

But this configurable item is the same one that's used in the statsd sink, so I'd assume the issue also exists there (and was present before my PR). So I'm inclined to say it's out of scope for this PR.

It's just a bit unusual from a config UX perspective. I agree this is out of scope for this PR.


So the only open question is the difference in behavior for std::os::unix::net::UnixDatagram vs tokio::net::UnixDatagram. I can confirm this works on macOS.

@jpovixwm
Copy link
Author

If it fixes the failing unit test in a reliable way, I'll push it tomorrow (or actually later today, depending on the time zone).
As to why that works, I can only hypothesize that something somewhere is not yielding enough, resulting in an imbalance in the volume of writes and reads being done to that UDS.
Then Linux maybe just blocks on further writes if the buffer is already full, allowing for the consumer to have its turn, whereas macOS returns an error, causing the test to fail.
Using spawn_blocking creates a dedicated OS-level thread that can stay in that loop and continuously read from the socket, avoiding the issue.
Though if that turns out to be flaky as well, maybe it would help to use a oneshot channel to signal that the receiver has actually reached the for loop, and have the unit test await that. To make sure the receiver is really ready before starting to push data into the socket.

@pront
Copy link
Contributor

pront commented Nov 19, 2024

Hope this helps:

cargo nextest run --retries 10000 --test-threads 12 -p vector
.
.
.
     Summary [ 134.802s] 1621 tests run: 1621 passed (1 flaky), 3 skipped
FLAKY 2/10001 [   0.348s] vector sinks::util::unix::tests::basic_unix_datagram_sink

Which means the test is less flakey, but still flakey.
I think it's worth pursuing the channel synchronization approach.

@jpovixwm
Copy link
Author

I'd also say that the buffer should be larger than 100 bytes (even just 101 bytes should be alright) so that in the future it can catch a potential regression where some additional garbage data is sent to the socket after the expected payload. For example, if you replace the bytes framer with a newline delimited one, there will be a newline character after the first 100 expected bytes.

@jpovixwm
Copy link
Author

It's not flaky for me, so I'm essentially fixing it blind, but still:

user@hostname:~/vector$ git status && git rev-parse HEAD
On branch feature/5269-support-unix-datagram-mode-in-socket-sink
Your branch is ahead of 'remotes/jpovixwm/feature/5269-support-unix-datagram-mode-in-socket-sink' by 1 commit.
  (use "git push" to publish your local commits)

nothing to commit, working tree clean
09dd31b6da377cffbaaabf407c8f036e3881ee5a
user@hostname:~/vector$ rust-flaker vector basic_unix_datagram_sink
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.39s
Found test binary: target/debug/deps/vector-390aa0364275b128
Running test once...

running 1 test
test sinks::util::unix::tests::basic_unix_datagram_sink ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 1637 filtered out; finished in 0.09s

Running test repeatedly...
1 runs so far, 0 failures (100.00% pass rate). 153.486541ms avg, 153.486541ms max, 153.486541ms min
[...]
10201 runs so far, 0 failures (100.00% pass rate). 145.68302ms avg, 227.653805ms max, 114.913275ms min

FYI: rust-flaker

@pront
Copy link
Contributor

pront commented Nov 19, 2024

TRY 1 TRMNTG [>120.000s] vector sinks::util::unix::tests::basic_unix_datagram_sink
   TRY 1 TMT [ 120.014s] vector sinks::util::unix::tests::basic_unix_datagram_sink

--- TRY 1 STDOUT:        vector sinks::util::unix::tests::basic_unix_datagram_sink ---

running 1 test
2024-11-19T21:20:40.988802Z ERROR vector::internal_events::unix: Unix socket send error. error=No buffer space available (os error 55) path="/var/folders/b4/11qq92411398933v4925cq380000gn/T/.tmp1sxztf/unix_datagram_test" error_type="writer_failed" stage="sending" internal_log_rate_limit=true
2024-11-19T21:20:40.988903Z ERROR vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=1 reason="Unix socket send error." internal_log_rate_limit=true
test sinks::util::unix::tests::basic_unix_datagram_sink has been running for over 60 seconds

   RETRY 2/4 [         ] vector sinks::util::unix::tests::basic_unix_datagram_sink
  TRY 2 PASS [   0.076s] vector sinks::util::unix::tests::basic_unix_datagram_sink
------------
     Summary [ 135.078s] 1621 tests run: 1621 passed (1 flaky), 3 skipped
   FLAKY 2/4 [   0.076s] vector sinks::util::unix::tests::basic_unix_datagram_sink

Sorry, this is still flakey and not ready to be merged. Which is a shame, since this is a great PR.

There is an interesting trace if you are motivated to investigate:

2024-11-19T21:20:40.988802Z ERROR vector::internal_events::unix: Unix socket send error. error=No buffer space available (os error 55) path="/var/folders/b4/11qq92411398933v4925cq380000gn/T/.tmp1sxztf/unix_datagram_test" error_type="writer_failed" stage="sending" internal_log_rate_limit=true

Otherwise, we can leave this open for me or someone else who has access to a macOS machine to pick it up in the future since it's impossible to iterate on it without one.

@jpovixwm
Copy link
Author

It's what I suspected earlier: joshtriplett/io-mux#7
The send semantics are just different between Linux and other non-Linux unices. Linux blocks when the queue is full, but BSD and macOS return an error.
Considering vector's reliability, I think the proper thing to do would be to change the sink implementation itself (rather than tweaking the unit test) so that it can handle this case internally and buffer the events instead of outright dropping them. But I have no idea how to make these send calls block on non-Linux unices (and judging by the referenced issue, I'm not alone).
So maybe the sink could catch this particular error and infinitely retry sending the event with exponential backoff. Each successful send would then reset the backoff timeout.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: external docs Anything related to Vector's external, public documentation domain: sinks Anything related to the Vector's sinks
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for UNIX datagram mode to socket sink
3 participants