Impact of CHUNK_SIZE on Acknowledgement Guarantees for All Sources #21714

Open
linw1995 opened this issue Nov 6, 2024 · 1 comment
Labels
source: file Anything `file` source related type: bug A code related bug.

Comments

linw1995 commented Nov 6, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

In the source_sender module, if the CHUNK_SIZE constant is larger than the buffer.max_events configured for a sink, the pipeline behaves unexpectedly. With the configuration below, the blackhole sink's log output stays at 0 until an amount of data equal to CHUNK_SIZE has been consumed, rather than consumption stopping once buffer.max_events events have been consumed.

Because source_sender has its own buffer, the buffer.when_full=block strategy does not work as intended. For example, a file source keeps reading the file until the source_sender buffer is full (its length is SOURCE_SENDER_BUFFER_SIZE = TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE) instead of respecting the length defined by the sink's buffer.max_events.

source -> source_sender(buffered) -fanout-> sinks(buffered) 
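To make the buffering effect concrete, here is a minimal, deterministic sketch of the pipeline above. It is a toy model, not Vector's code: `Bounded` and `events_read_before_block` are hypothetical names, the capacities are made-up parameters rather than the real SOURCE_SENDER_BUFFER_SIZE, and the sink is assumed never to drain its buffer.

```rust
use std::collections::VecDeque;

/// A toy bounded FIFO queue standing in for a buffered channel (hypothetical).
struct Bounded {
    cap: usize,
    queue: VecDeque<u64>,
}

impl Bounded {
    fn new(cap: usize) -> Self {
        Self { cap, queue: VecDeque::new() }
    }

    /// Pushes `v` if there is room; returns false when the queue is full.
    fn try_push(&mut self, v: u64) -> bool {
        if self.queue.len() < self.cap {
            self.queue.push_back(v);
            true
        } else {
            false
        }
    }
}

/// Counts how many events a source can read before it is finally blocked,
/// assuming the sink never consumes anything from its own buffer.
fn events_read_before_block(source_sender_cap: usize, sink_cap: usize) -> u64 {
    let mut source_sender = Bounded::new(source_sender_cap);
    let mut sink = Bounded::new(sink_cap);
    let mut read = 0u64;
    loop {
        // Forward as many events as the sink buffer will accept.
        while let Some(&front) = source_sender.queue.front() {
            if sink.try_push(front) {
                source_sender.queue.pop_front();
            } else {
                break;
            }
        }
        // The source only stops once the upstream buffer is full as well.
        if !source_sender.try_push(read) {
            return read;
        }
        read += 1;
    }
}
```

In this model, `events_read_before_block(8, 1)` returns 9: the source is blocked only after both buffers are full, not after the single event that the sink's buffer allows.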

If remove_after_secs is configured, it's possible for the file to be deleted before the data in both buffers has been fully consumed.

In addition to the impact of the source_sender implementation, the internal implementation of the file source also affects Acknowledgement Guarantees.

In the following code block, the file checkpoints are updated inside messages.map ahead of time, rather than after send_event_stream completes, let alone after the sink acknowledges the events.

vector/src/sources/file.rs

Lines 666 to 693 in c1da408

let mut messages = messages.map(move |line| {
    let mut event = create_event(
        line.text,
        line.start_offset,
        &line.filename,
        &event_metadata,
        log_namespace,
        include_file_metric_tag,
    );
    if let Some(finalizer) = &finalizer {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        event = event.with_batch_notifier(&batch);
        let entry = FinalizerEntry {
            file_id: line.file_id,
            offset: line.end_offset,
        };
        finalizer.add(entry, receiver);
    } else {
        checkpoints.update(line.file_id, line.end_offset);
    }
    event
});
tokio::spawn(async move {
    match out
        .send_event_stream(&mut messages)
        .instrument(span.or_current())
        .await
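
The ordering concern can be illustrated with a small toy model. This is only a sketch under assumptions, not Vector's implementation: `Line`, `eager_checkpoints`, and `acked_checkpoints` are hypothetical names, and acknowledgement is reduced to a count of lines the sink has confirmed so far.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a line read from a file.
struct Line {
    file_id: u32,
    end_offset: u64,
}

/// Eager strategy: the checkpoint advances as soon as a line is turned
/// into an event, before anything downstream has seen it.
fn eager_checkpoints(read: &[Line]) -> HashMap<u32, u64> {
    let mut checkpoints = HashMap::new();
    for line in read {
        checkpoints.insert(line.file_id, line.end_offset);
    }
    checkpoints
}

/// Deferred strategy: the checkpoint advances only for the first `acked`
/// lines, i.e. those the sink has acknowledged.
fn acked_checkpoints(read: &[Line], acked: usize) -> HashMap<u32, u64> {
    let mut checkpoints = HashMap::new();
    for line in read.iter().take(acked) {
        checkpoints.insert(line.file_id, line.end_offset);
    }
    checkpoints
}
```

If three lines of one file have been read but only the first has been acknowledged, the eager strategy records the end offset of the third line, while the deferred strategy records the end offset of the first. A restart at that moment would skip the two unacknowledged lines under the eager strategy.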

Configuration

data_dir = "/usr/local/vector/data"

[acknowledgements]
enabled = true

[sources.small_log_files]
type = "file"
include = [ "/tmp/logs/*" ]
ignore_not_found = true
remove_after_secs = 10

[sinks.console]
type = "blackhole"
inputs = ["small_log_files"]
rate = 1
print_interval_secs = 1

[sinks.console.buffer]
type = "memory"
max_events = 1
when_full = "block"

Version

0.37.0

@linw1995 linw1995 added the type: bug A code related bug. label Nov 6, 2024
@pront pront added the source: file Anything `file` source related label Nov 6, 2024

pront commented Nov 6, 2024

Hi @linw1995, this looks like a bug indeed. Thank you for providing all the details 👍
