Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opentelemetry sink): new sink #21866

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft

Conversation

pront
Copy link
Contributor

@pront pront commented Nov 21, 2024

Summary

This PR introduces basic support for pushing data to OTEL.

TODO for this PR:

  • sink docs
    • explain public contract, relying on OTEL proto V1

There are numerous potential improvements:

  • gRPC support
    • another protocol variant
    • can refactor and reuse existing code
  • native conversion from Vector event to OTEL proto

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

Vector Config

sources:
  generate_syslog:
    type: "demo_logs"
    format: "syslog"
    count: 100000
    interval: 1

transforms:
  remap_syslog:
    inputs: ["generate_syslog"]
    type: "remap"
    source: |
      structured = parse_syslog!(.message)
      .timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
      .body = structured
      .service_name = structured.appname
      .resource_attributes.source_type = .source_type
      .resource_attributes.host.hostname = structured.hostname
      .resource_attributes.service.name = structured.appname
      .attributes.syslog.procid = structured.procid
      .attributes.syslog.facility = structured.facility
      .attributes.syslog.version = structured.version
      .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
        "ERROR"
      } else if structured.severity == "warning" {
        "WARN"
      } else if structured.severity == "debug" {
        "DEBUG"
      } else if includes(["info", "notice"], structured.severity) {
        "INFO"
      } else {
       structured.severity
      }
      .scope_name = structured.msgid
      del(.message)
      del(.timestamp)
      del(.service)
      del(.source_type)

sinks:
 emit_syslog:
   inputs: ["remap_syslog"]
   type: opentelemetry
   protocol:
     type: http
     uri: http://localhost:5318/v1/logs
     method: post
     encoding:
       codec: json
     framing:
       method: newline_delimited
     headers:
      content-type: application/json

# console:
#   type: console
#   inputs: ["remap_syslog"]
#   encoding:
#     codec: json

Run Vector:

VECTOR_LOG=debug cargo run --color=always --profile dev -- --config /Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml

OTEL config:

receivers:
  otlp:
    protocols:
      http:
        endpoint: "0.0.0.0:5318" # from python generator

exporters:
  debug:
  otlp:
    endpoint: localhost:4317
    tls:
      insecure: true

processors:
  batch: {} # Batch processor to optimize log export

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]

Run OTEL collector:

 ./otelcol --config ./otel-collector-config.yaml

Sample output:

/Users/pavlos.rontidis/.cargo/bin/cargo run --color=always --profile dev -- --config /Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.33s
     Running `target/debug/vector --config /Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml`
2024-11-21T21:08:02.826597Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2024-11-21T21:08:02.826938Z  INFO vector::app: Log level is enabled. level="debug"
2024-11-21T21:08:02.827248Z DEBUG vector::app: messaged="Building runtime." worker_threads=10
2024-11-21T21:08:02.831422Z  INFO vector::app: Loading configs. paths=["/Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml"]
2024-11-21T21:08:02.838985Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2024-11-21T21:08:02.881344Z DEBUG vector::topology::builder: Building new source. component=generate_syslog
2024-11-21T21:08:02.883040Z DEBUG vector::topology::builder: Building new transform. component=remap_syslog
2024-11-21T21:08:02.919678Z DEBUG vector::topology::builder: Building new sink. component=emit_syslog
2024-11-21T21:08:02.923264Z  WARN sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector::sinks::util::http: Option `headers` has been deprecated. Use `request.headers` instead.
2024-11-21T21:08:02.976490Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector_core::tls::settings: Fetching system root certs.
2024-11-21T21:08:03.135270Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector_core::tls::settings: Fetching system root certs.
2024-11-21T21:08:03.275055Z  INFO vector::topology::running: Running healthchecks.
2024-11-21T21:08:03.275276Z DEBUG vector::topology::running: Connecting changed/added component(s).
2024-11-21T21:08:03.275409Z DEBUG vector::topology::running: Configuring outputs for source. component=generate_syslog
2024-11-21T21:08:03.275445Z DEBUG vector::topology::running: Configuring output for component. component=generate_syslog output_id=None
2024-11-21T21:08:03.275478Z DEBUG vector::topology::running: Configuring outputs for transform. component=remap_syslog
2024-11-21T21:08:03.275507Z DEBUG vector::topology::running: Configuring output for component. component=remap_syslog output_id=None
2024-11-21T21:08:03.275533Z DEBUG vector::topology::running: Connecting inputs for transform. component=remap_syslog
2024-11-21T21:08:03.275623Z DEBUG vector::topology::running: Adding component input to fanout. component=remap_syslog fanout_id=generate_syslog
2024-11-21T21:08:03.275778Z DEBUG vector::topology::running: Connecting inputs for sink. component=emit_syslog
2024-11-21T21:08:03.275811Z DEBUG vector::topology::running: Adding component input to fanout. component=emit_syslog fanout_id=remap_syslog
2024-11-21T21:08:03.275935Z  INFO vector::topology::builder: Healthcheck passed.
2024-11-21T21:08:03.276164Z DEBUG vector::topology::running: Spawning new source. key=generate_syslog
2024-11-21T21:08:03.276844Z DEBUG vector::topology::running: Spawning new transform. key=remap_syslog
2024-11-21T21:08:03.277416Z DEBUG source{component_kind="source" component_id=generate_syslog component_type=demo_logs}: vector::topology::builder: Source starting.
2024-11-21T21:08:03.277474Z DEBUG source{component_kind="source" component_id=generate_syslog component_type=demo_logs}: vector::topology::builder: Source pump supervisor starting.
2024-11-21T21:08:03.277637Z  INFO vector: Vector has started. debug="true" version="0.43.0" arch="aarch64" revision=""
2024-11-21T21:08:03.277575Z DEBUG transform{component_kind="transform" component_id=remap_syslog component_type=remap}: vector::topology::builder: Synchronous transform starting.
2024-11-21T21:08:03.277673Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector::topology::builder: Sink starting.
2024-11-21T21:08:03.278138Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2024-11-21T21:08:03.279620Z DEBUG source{component_kind="source" component_id=generate_syslog component_type=demo_logs}: vector::topology::builder: Source pump starting.
2024-11-21T21:08:03.279748Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector::utilization: utilization=0.9986980710327271
2024-11-21T21:08:04.286928Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://localhost:5318/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.0-custom-a57556da0 (aarch64-apple-darwin debug=full)"} body=[539 bytes]
2024-11-21T21:08:04.287893Z DEBUG hyper::client::connect::dns: resolving host="localhost"
2024-11-21T21:08:04.290689Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: hyper::client::connect::http: connecting to 127.0.0.1:5318
2024-11-21T21:08:04.291385Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: hyper::client::connect::http: connected to 127.0.0.1:5318
2024-11-21T21:08:04.292470Z DEBUG hyper::proto::h1::io: flushed 757 bytes
2024-11-21T21:08:04.295217Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-11-21T21:08:04.295263Z DEBUG hyper::proto::h1::conn: incoming body is content-length (21 bytes)
2024-11-21T21:08:04.295382Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-11-21T21:08:04.295523Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: hyper::client::pool: pooling idle connection for ("http", localhost:5318)
2024-11-21T21:08:04.295706Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"content-type": "application/json", "date": "Thu, 21 Nov 2024 21:08:04 GMT", "content-length": "21"} body=[21 bytes]
2024-11-21T21:08:05.290103Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://localhost:5318/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.0-custom-a57556da0 (aarch64-apple-darwin debug=full)"} body=[1073 bytes]
2024-11-21T21:08:05.290279Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: hyper::client::pool: reuse idle connection for ("http", localhost:5318)
2024-11-21T21:08:05.290494Z DEBUG hyper::proto::h1::io: flushed 1292 bytes
2024-11-21T21:08:05.291118Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-11-21T21:08:05.291131Z DEBUG hyper::proto::h1::conn: incoming body is content-length (21 bytes)
2024-11-21T21:08:05.291156Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-11-21T21:08:05.291212Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: hyper::client::pool: pooling idle connection for ("http", localhost:5318)
2024-11-21T21:08:05.291273Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@github-actions github-actions bot added the domain: sinks Anything related to the Vector's sinks label Nov 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: sinks Anything related to the Vector's sinks
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant