Skip to content

Commit

Permalink
[exporter/awss3] S3 Exporter (open-telemetry#20912)
Browse files Browse the repository at this point in the history
* aws s3 exporter initial version

---------

Co-authored-by: Przemek Delewski <[email protected]>
  • Loading branch information
atoulme and Przemek Delewski authored Apr 27, 2023
1 parent e5af149 commit 6b98866
Show file tree
Hide file tree
Showing 21 changed files with 511 additions and 92 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-s3-exporter-impl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add implementation of AWS S3 exporter

# One or more tracking issues related to the change
issues: [2835]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
33 changes: 19 additions & 14 deletions exporter/awss3exporter/README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
# AWS S3 Exporter for OpenTelemetry Collector

| Status | |
| ------------------------ |-----------------------|
| Stability | [in development] |
| Supported pipeline types | traces, logs |
| Distributions | [contrib] |
<!-- status autogenerated section -->
| Status | |
| ------------------------ |-----------|
| Stability | [alpha] |
| Supported pipeline types | logs, metrics, traces |
| Distributions | [contrib] |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

## Schema supported
This exporter targets to support proto/json and proto/binary format
This exporter targets to support proto/json format.

## Exporter Configuration

The following exporter configuration parameters are supported.

| Name | Description | Default |
| :--------------------- | :--------------------------------------------------------------------------------- | ------- |
| `region` | AWS region. | |
| `s3_bucket` | S3 bucket | |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute |"minute" |
| `file_prefix` | file prefix defined by user | |
| `marshaler_name` | marshaler used to produce output data otlp_json or otlp_proto | |
| Name | Description | Default |
|:---------------|:------------------------------------------------------|----------|
| `region` | AWS region. | |
| `s3_bucket` | S3 bucket | |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data otlp_json | |

# Example Configuration

Expand Down
23 changes: 10 additions & 13 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 OpenTelemetry Authors
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,10 +14,6 @@

package awss3exporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter"

import (
"go.uber.org/zap"
)

// S3UploaderConfig contains aws s3 uploader related config to controls things
// like bucket, prefix, batching, connections, retries, etc.
type S3UploaderConfig struct {
Expand All @@ -28,15 +24,16 @@ type S3UploaderConfig struct {
FilePrefix string `mapstructure:"file_prefix"`
}

// Config contains the main configuration options for the awskinesis exporter
type MarshalerType string

const (
OtlpJSON MarshalerType = "otlp_json"
)

// Config contains the main configuration options for the s3 exporter
type Config struct {
S3Uploader S3UploaderConfig `mapstructure:"s3uploader"`
MarshalerName string `mapstructure:"marshaler_name"`

// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
// exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"`
MarshalerName MarshalerType `mapstructure:"marshaler"`

logger *zap.Logger
FileFormat string `mapstructure:"file_format"`
}
8 changes: 4 additions & 4 deletions exporter/awss3exporter/config_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 OpenTelemetry Authors
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,13 +29,13 @@ func TestLoadConfig(t *testing.T) {
assert.NoError(t, err)

factory := NewFactory()
factories.Exporters[typeStr] = factory
factories.Exporters["awss3"] = factory
cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "default.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e := cfg.Exporters[component.NewID(typeStr)].(*Config)
e := cfg.Exporters[component.NewID("awss3")].(*Config)
assert.Equal(t, e,
&Config{
S3Uploader: S3UploaderConfig{
Expand All @@ -59,7 +59,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

e := cfg.Exporters[component.NewID(typeStr)].(*Config)
e := cfg.Exporters[component.NewID("awss3")].(*Config)

assert.Equal(t, e,
&Config{
Expand Down
6 changes: 3 additions & 3 deletions exporter/awss3exporter/data_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 OpenTelemetry Authors
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,6 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect

import "context"

type DataWriter interface {
WriteBuffer(ctx context.Context, buf []byte, config *Config, metadata string, format string) error
type dataWriter interface {
writeBuffer(ctx context.Context, buf []byte, config *Config, metadata string, format string) error
}
18 changes: 18 additions & 0 deletions exporter/awss3exporter/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate mdatagen metadata.yaml

// Package awss3exporter stores OpenTelemetry data as an AWS S3 exporter.
package awss3exporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter"
60 changes: 34 additions & 26 deletions exporter/awss3exporter/exporter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 OpenTelemetry Authors
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@ import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -27,56 +26,65 @@ import (
"go.uber.org/zap"
)

type S3Exporter struct {
dataWriter DataWriter
type s3Exporter struct {
config *Config
dataWriter dataWriter
logger *zap.Logger
marshaler Marshaler
marshaler marshaler
}

func NewS3Exporter(config *Config,
params exporter.CreateSettings) (*S3Exporter, error) {
func newS3Exporter(config *Config,
params exporter.CreateSettings) (*s3Exporter, error) {

if config == nil {
return nil, errors.New("s3 exporter config is nil")
}

logger := params.Logger
expConfig := config
expConfig.logger = logger

marshaler, err := NewMarshaler(expConfig.MarshalerName, logger)
m, err := NewMarshaler(config.MarshalerName, logger)
if err != nil {
return nil, errors.New("unknown marshaler")
}

s3Exporter := &S3Exporter{
dataWriter: &S3Writer{},
s3Exporter := &s3Exporter{
config: config,
dataWriter: &s3Writer{},
logger: logger,
marshaler: marshaler,
marshaler: m,
}
return s3Exporter, nil
}

func (e *S3Exporter) Capabilities() consumer.Capabilities {
func (e *s3Exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *S3Exporter) Start(ctx context.Context, host component.Host) error {
return nil
}
func (e *s3Exporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
buf, err := e.marshaler.MarshalMetrics(md)

func (e *S3Exporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
return nil
}
if err != nil {
return err
}

func (e *S3Exporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
return nil
return e.dataWriter.writeBuffer(ctx, buf, e.config, "metrics", e.marshaler.format())
}

func (e *S3Exporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
return nil
func (e *s3Exporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
buf, err := e.marshaler.MarshalLogs(logs)

if err != nil {
return err
}

return e.dataWriter.writeBuffer(ctx, buf, e.config, "logs", e.marshaler.format())
}

func (e *S3Exporter) Shutdown(context.Context) error {
return nil
func (e *s3Exporter) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
buf, err := e.marshaler.MarshalTraces(traces)
if err != nil {
return err
}

return e.dataWriter.writeBuffer(ctx, buf, e.config, "traces", e.marshaler.format())
}
59 changes: 59 additions & 0 deletions exporter/awss3exporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awss3exporter

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

var testLogs = []byte(`{"resourceLogs":[{"resource":{"attributes":[{"key":"_sourceCategory","value":{"stringValue":"logfile"}},{"key":"_sourceHost","value":{"stringValue":"host"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1654257420681895000","body":{"stringValue":"2022-06-03 13:57:00.62739 +0200 CEST m=+14.018296742 log entry14"},"attributes":[{"key":"log.file.path_resolved","value":{"stringValue":"logwriter/data.log"}}],"traceId":"","spanId":""}]}],"schemaUrl":"https://opentelemetry.io/schemas/1.6.1"}]}`)

type TestWriter struct {
t *testing.T
}

func (testWriter *TestWriter) writeBuffer(ctx context.Context, buf []byte, config *Config, metadata string, format string) error {
assert.Equal(testWriter.t, testLogs, buf)
return nil
}

func getTestLogs(tb testing.TB) plog.Logs {
logsMarshaler := plog.JSONUnmarshaler{}
logs, err := logsMarshaler.UnmarshalLogs(testLogs)
assert.NoError(tb, err, "Can't unmarshal testing logs data -> %s", err)
return logs
}

func getLogExporter(t *testing.T) *s3Exporter {
marshaler, _ := NewMarshaler("otlp_json", zap.NewNop())
exporter := &s3Exporter{
config: createDefaultConfig().(*Config),
dataWriter: &TestWriter{t},
logger: zap.NewNop(),
marshaler: marshaler,
}
return exporter
}

func TestLog(t *testing.T) {
logs := getTestLogs(t)
exporter := getLogExporter(t)
assert.NoError(t, exporter.ConsumeLogs(context.Background(), logs))
}
Loading

0 comments on commit 6b98866

Please sign in to comment.