Skip to content

Commit

Permalink
Added: support for TCP in gelf-exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
BharatKJain committed Oct 23, 2024
1 parent bdf168d commit 999e924
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 9 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ exporters:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.111.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tencentcloudlogserviceexporter v0.111.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter v0.111.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/gelfexporter v0.111.0

processors:
- gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.111.0
Expand Down Expand Up @@ -496,3 +497,4 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver => ../../receiver/githubreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudmonitoringreceiver => ../../receiver/googlecloudmonitoringreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/gelfexporter => ../../exporter/gelfexporter
2 changes: 1 addition & 1 deletion exporter/gelfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
| Stability | [alpha]: logs |
| Distributions | [contrib] |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@BharatKJain](https://www.github.com/BharatKJain) |
<!-- | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fudplog%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fudplog) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fudplog%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fudplog) | -->

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
Expand All @@ -24,6 +23,7 @@ Sends GELF logs over UDP/TCP.
| `send_chunks_with_overflow` | false | Maximum chunks allowed is 128, if the number of chunks exceeds this limit, the message will be dropped. If set to true then |
| `chunk_size` | 1420 | Default maximum chunk size is 1420 bytes. |
| `hostname`| "localhost"| hostname is the name of the host, which is added to the gelf message |
| `tcp_timeout_millis` | 2000 miliseconds | If TCP protocol is used then 2 seconds default timeout is used. Timeout must be less than 30000 milliseconds.


## Example Configuration
Expand Down
10 changes: 10 additions & 0 deletions exporter/gelfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
ChunksOverflow bool `mapstructure:"send_chunks_with_overflow,omitempty"`
ChunkSize int `mapstructure:"chunk_size,omitempty"`
Hostname string `mapstructure:"hostname,omitempty"`
TCPTimeout int `mapstructure:"tcp_timeout_millis,omitempty"`
}

// Validate checks if the exporter configuration is valid
Expand All @@ -35,9 +36,18 @@ func (cfg *Config) Validate() error {
}

if strings.ToLower(cfg.Protocol) != "udp" && strings.ToLower(cfg.Protocol) != "tcp" {
cfg.Protocol = strings.ToLower(cfg.Protocol)
return fmt.Errorf(fmt.Sprintf("Protocol must be either 'udp' or 'tcp', got %s", cfg.Protocol))
}

if cfg.Protocol == "tcp" && cfg.TCPTimeout < 0 {
cfg.TCPTimeout = defaultTCPTimeout
}else{
if cfg.TCPTimeout > 30000 {
return fmt.Errorf("TCP timeout must be less than 30000")
}
}

// Compression types can be zlib, gzip or none
if cfg.CompressionType == "" {
cfg.CompressionType = defaultCompressionType
Expand Down
80 changes: 74 additions & 6 deletions exporter/gelfexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"sync"
"syscall"
"time"

"go.opentelemetry.io/collector/exporter"
Expand All @@ -24,6 +27,7 @@ import (
)

var (
defaultTCPTimeout = 2000
defaultChunkSize = 1420
magicChunked = []byte{0x1e, 0x0f}
magicZlib = []byte{0x78}
Expand Down Expand Up @@ -83,13 +87,37 @@ func newBuffer() *bytes.Buffer {
return bytes.NewBuffer(nil)
}

func initExporter(cfg *Config, createSettings exporter.Settings) (*gelfexporter, error) {

func makeConn(protocol string, endpoint string) (net.Conn, error) {
var err error
var conn net.Conn
switch protocol {
case "udp":
udpAddr, err := net.ResolveUDPAddr(protocol, endpoint)
if err != nil {
fmt.Println("Error resolving UDP address:", err)
os.Exit(1)
}
if conn, err = net.DialUDP(protocol, nil, udpAddr); err != nil {
return nil, err
}
return conn, nil
case "tcp":
if conn, err = net.Dial(protocol, endpoint); err != nil {
return nil, err
}
return conn, nil
default:
return nil, fmt.Errorf("Could not create connection! Invalid protocol: %s", protocol)
}

if conn, err = net.Dial("udp", cfg.Endpoint); err != nil {
return nil, err
}

func initExporter(cfg *Config, createSettings exporter.Settings) (*gelfexporter, error) {

conn, err := makeConn(cfg.Protocol, cfg.Endpoint)

if err != nil {
return nil, fmt.Errorf("Error while creating connection: %w", err)
}

g := &gelfexporter{
Expand Down Expand Up @@ -180,13 +208,53 @@ func (m *GELFMessage) MarshalJSONBuf(buf *bytes.Buffer) error {
}

func (g *gelfexporter) send(payload []byte) error {
n, err := g.conn.Write(payload)
var err error
var n int
if g.conn == nil {
g.conn, err = makeConn(g.config.Protocol, g.config.Endpoint)
if err != nil {
return fmt.Errorf("Error while creating connection: %w", err)
}
}

if g.config.Protocol == "tcp" {
timeout := time.Duration(g.config.TCPTimeout) * time.Millisecond
err := g.conn.SetWriteDeadline(time.Now().Add(timeout))
if err != nil {
return fmt.Errorf("failed to set write deadline(timeout): %w", err)
}
}

n, err = g.conn.Write(payload)

if err != nil {
return err
g.logger.Error("Error while sending response", zap.Error(err))

// FOR TCP ONLY:
// Trap "broken pipe" which might happen due to stale broken connections.
// One case for this is when the upstream server was down momentarily and the connection was closed.
if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
g.logger.Debug("Gelf-exporter connection closed, re-creating connection")
g.conn, err = makeConn(g.config.Protocol, g.config.Endpoint)

if err != nil {
return fmt.Errorf("Error while re-creating connection: %w", err)
}

n, err = g.conn.Write(payload)

if err != nil {
return fmt.Errorf("Write timeout: %w", err)
}
g.logger.Debug("Re-sent message")
}
return fmt.Errorf("Error while sending response: %w", err)
}

if n != len(payload) {
return fmt.Errorf("Error while validating UDP response, (received length/payload length): (%d/%d)", n, len(payload))
}

return nil
}

Expand Down
7 changes: 5 additions & 2 deletions exporter/gelfexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
receivers:
exporters:
gelfexporter:
endpoint: "localhost:12201"
protocol: udp
Expand All @@ -7,4 +7,7 @@ receivers:
# Maximum chunks allowed is 128, if the number of chunks exceeds this limit, the message will be dropped
send_chunks_with_overflow: true
# Default maximum chunk size is 8192 bytes
chunk_size: 8192
chunk_size: 8192
# hostname is the name of the host, which is added to the gelf message
hostname: "test-host"
tcp_timeout_millis: 1000

0 comments on commit 999e924

Please sign in to comment.