Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.17] Kafka output panics unconditionally at startup #41823

Closed
cmacknz opened this issue Nov 28, 2024 · 5 comments · Fixed by #41824
Closed

[8.17] Kafka output panics unconditionally at startup #41823

cmacknz opened this issue Nov 28, 2024 · 5 comments · Fixed by #41824
Assignees
Labels
Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Comments

@cmacknz
Copy link
Member

cmacknz commented Nov 28, 2024

Originally discovered in the diagnostics in elastic/elastic-agent#6049 (comment)

Detailed logs
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "panic: runtime error: invalid memory address or nil pointer dereference",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "[signal 0xc0000005 code=0x0 addr=0x30 pc=0x2cec014]",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "goroutine 2168 [running]:",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "github.com/elastic/beats/v7/libbeat/outputs/kafka.(*client).Publish(0xc001355008, {0xc002db7f70?, 0x0?}, {0x9271540, 0xc003596d40})",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "github.com/elastic/beats/v7/libbeat/outputs/kafka/client.go:167 +0xf4",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*clientWorker).run(0xc0034c80e0, {0x9257bc8, 0xc001c5ad70})",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "github.com/elastic/beats/v7/libbeat/publisher/pipeline/client_worker.go:101 +0xc6",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker in goroutine 1915",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
{
  "log.level": "error",
  "@timestamp": "2024-11-26T11:38:21.079Z",
  "message": "github.com/elastic/beats/v7/libbeat/publisher/pipeline/client_worker.go:75 +0x1f0",
  "component": {
    "binary": "metricbeat",
    "dataset": "elastic_agent.metricbeat",
    "id": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b",
    "type": "system/metrics"
  },
  "log": {
    "source": "system/metrics-222f7bb5-918e-4247-87cc-d93c0f1d496b"
  },
  "ecs.version": "1.6.0"
}
@cmacknz cmacknz added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Nov 28, 2024
@elasticmachine
Copy link
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@cmacknz
Copy link
Member Author

cmacknz commented Nov 28, 2024

The panic line is:

ch := c.producer.Input()

This must mean producer is nil. producer can be nil if Connect is not called before Publish

func (c *client) Connect() error {
c.mux.Lock()
defer c.mux.Unlock()
c.log.Debugf("connect: %v", c.hosts)
// try to connect
producer, err := sarama.NewAsyncProducer(c.hosts, &c.config)
if err != nil {
c.log.Errorf("Kafka connect fails with: %+v", err)
return err
}
c.producer = producer

I also see Connect and Close are using a mutex to manipulate producer but Publish is not, so perhaps this is just a race condition.

Also it seems like the Kafka output isn't a NetworkClient output which means it doesn't use networkClientWorker which is guaranteed to call Connect before Publish:

func (w *netClientWorker) run(ctx context.Context) {

It's not clear where the Connect call is happening right now for the Kafka output which is interesting, perhaps part of the problem.

@cmacknz
Copy link
Member Author

cmacknz commented Nov 28, 2024

Reproduces shorlty after startup with the following minimal configuration:

metricbeat.modules:
  - module: system
    metricsets:
      - cpu
    enabled: true
    period: 1s

output.kafka:
  hosts:
      - localhost:9092
  topic: test

@cmacknz cmacknz self-assigned this Nov 28, 2024
@cmacknz
Copy link
Member Author

cmacknz commented Nov 28, 2024

Reverting #40794 fixes it.

@cmacknz cmacknz changed the title Kafka output can panic [8.17] Kafka output panics unconditionally at startup Nov 28, 2024
@cmacknz
Copy link
Member Author

cmacknz commented Nov 28, 2024

// Connectable is optionally implemented by clients that might be able to close
// and reconnect dynamically.
type Connectable interface {
// Connect establishes a connection to the clients sink.
// The connection attempt shall report an error if no connection could been
// established within the given time interval. A timeout value of 0 == wait
// forever.
Connect(context.Context) error
}

The Connectable interface was changed to include a context.Context argument, causing all clients with a Connect() that weren't updated to have a Context(context.Context) method instead to fail the NetworkClient interface check in

if nc, ok := client.(outputs.NetworkClient); ok {
c = &netClientWorker{
worker: w,
client: nc,
logger: logger,
tracer: tracer,
}

This bypasses the initial call to Connect() that was there previously at

err := w.client.Connect(ctx)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants