Skip to content

Commit

Permalink
remove time_offset and fix other issues
Browse files Browse the repository at this point in the history
  • Loading branch information
hgaol committed Dec 11, 2024
1 parent ea737f7 commit 6176537
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 149 deletions.
2 changes: 1 addition & 1 deletion .chloggen/36650.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: azureeventhubreceiver
Expand Down
48 changes: 15 additions & 33 deletions pkg/translator/azure/resourcelogs_to_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ const (
azureTenantID = "azure.tenant.id"
)

var (
errMissingTimestamp = errors.New("missing timestamp")
errNotSupportedTimeFormat = errors.New("not supported time format")
)

const ISO8601 = "iso8601"
var errMissingTimestamp = errors.New("missing timestamp")

// azureRecords represents an array of Azure log records
// as exported via an Azure Event Hub
Expand Down Expand Up @@ -80,7 +75,6 @@ type ResourceLogsUnmarshaler struct {
Version string
Logger *zap.Logger
TimeFormat []string
TimeOffset time.Duration
}

func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
Expand Down Expand Up @@ -113,7 +107,7 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {

for i := 0; i < len(logs); i++ {
log := logs[i]
nanos, err := getTimestamp(log, r.TimeFormat, r.TimeOffset)
nanos, err := getTimestamp(log, r.TimeFormat)
if err != nil {
r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time))
continue
Expand All @@ -137,11 +131,11 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
return l, nil
}

func getTimestamp(record azureLogRecord, format []string, offset time.Duration) (pcommon.Timestamp, error) {
func getTimestamp(record azureLogRecord, format []string) (pcommon.Timestamp, error) {
if record.Time != "" {
return asTimestamp(record.Time, format, offset)
return asTimestamp(record.Time, format)
} else if record.Timestamp != "" {
return asTimestamp(record.Timestamp, format, offset)
return asTimestamp(record.Timestamp, format)
}

return 0, errMissingTimestamp
Expand All @@ -150,33 +144,21 @@ func getTimestamp(record azureLogRecord, format []string, offset time.Duration)
// asTimestamp will parse an ISO8601 string into an OpenTelemetry
// nanosecond timestamp. If the string cannot be parsed, it will
// return zero and the error.
func asTimestamp(s string, format []string, offset time.Duration) (pcommon.Timestamp, error) {
func asTimestamp(s string, formats []string) (pcommon.Timestamp, error) {
var err error
var t time.Time
if format != nil {
for _, v := range format {
if v == ISO8601 {
t, err = iso8601.ParseString(s)
} else {
t, err = time.Parse(v, s)
}
if err == nil {
break
}
// Try parsing with provided formats first
for _, format := range formats {
if t, err = time.Parse(format, s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
if t == (time.Time{}) {
err = errNotSupportedTimeFormat
}
} else {
t, err = iso8601.ParseString(s)
}
if err != nil {
return 0, err
}

timestamp := t.Add(offset * time.Hour).UnixNano()

return pcommon.Timestamp(timestamp), nil
// Fallback to ISO 8601 parsing if no format matches
if t, err = iso8601.ParseString(s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
return 0, err
}

// asSeverity converts the Azure log level to equivalent
Expand Down
33 changes: 25 additions & 8 deletions pkg/translator/azure/resourcelogs_to_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var testBuildInfo = component.BuildInfo{
var minimumLogRecord = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil, 0)
ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil)
lr.SetTimestamp(ts)
lr.Attributes().PutStr(azureOperationName, "SecretGet")
lr.Attributes().PutStr(azureCategory, "AuditEvent")
Expand All @@ -38,7 +38,7 @@ var minimumLogRecord = func() plog.LogRecord {
var maximumLogRecord1 = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil, 0)
ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil)
lr.SetTimestamp(ts)
lr.SetSeverityNumber(plog.SeverityNumberWarn)
lr.SetSeverityText("Warning")
Expand Down Expand Up @@ -72,7 +72,7 @@ var maximumLogRecord2 = func() []plog.LogRecord {
lr := sl.LogRecords().AppendEmpty()
lr2 := sl.LogRecords().AppendEmpty()

ts, _ := asTimestamp("2022-11-11T04:48:29.6767145Z", nil, 0)
ts, _ := asTimestamp("2022-11-11T04:48:29.6767145Z", nil)
lr.SetTimestamp(ts)
lr.SetSeverityNumber(plog.SeverityNumberWarn)
lr.SetSeverityText("Warning")
Expand All @@ -98,7 +98,7 @@ var maximumLogRecord2 = func() []plog.LogRecord {
m.PutDouble("float", 41.3)
m.PutBool("bool", true)

ts, _ = asTimestamp("2022-11-11T04:48:31.6767145Z", nil, 0)
ts, _ = asTimestamp("2022-11-11T04:48:31.6767145Z", nil)
lr2.SetTimestamp(ts)
lr2.SetSeverityNumber(plog.SeverityNumberWarn)
lr2.SetSeverityText("Warning")
Expand Down Expand Up @@ -131,7 +131,7 @@ var maximumLogRecord2 = func() []plog.LogRecord {
var badLevelLogRecord = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2023-10-26T14:22:43.3416357Z", nil, 0)
ts, _ := asTimestamp("2023-10-26T14:22:43.3416357Z", nil)
lr.SetTimestamp(ts)
lr.SetSeverityNumber(plog.SeverityNumberTrace4)
lr.SetSeverityText("4")
Expand Down Expand Up @@ -171,7 +171,7 @@ var badLevelLogRecord = func() plog.LogRecord {
var badTimeLogRecord = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2021-10-14T22:17:11+00:00", nil, 0)
ts, _ := asTimestamp("2021-10-14T22:17:11+00:00", nil)
lr.SetTimestamp(ts)

lr.Attributes().PutStr(azureOperationName, "ApplicationGatewayAccess")
Expand Down Expand Up @@ -213,12 +213,29 @@ var badTimeLogRecord = func() plog.LogRecord {

func TestAsTimestamp(t *testing.T) {
timestamp := "2022-11-11T04:48:27.6767145Z"
nanos, err := asTimestamp(timestamp, nil, 0)
nanos, err := asTimestamp(timestamp, nil)
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, []string{"01/02/2006 15:04:05"})
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but fallback to iso8601 and succeeded to parse
timestamp = "2022-11-11T04:48:27.6767145Z"
nanos, err = asTimestamp(timestamp, []string{"01/02/2006 15:04:05"})
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but all failed to parse
timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, []string{"2006-01-02 15:04:05"})
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)

timestamp = "invalid-time"
nanos, err = asTimestamp(timestamp, nil, 0)
nanos, err = asTimestamp(timestamp, nil)
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/translator/azure/resources_to_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"encoding/hex"
"net/url"
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -66,7 +65,6 @@ type TracesUnmarshaler struct {
Version string
Logger *zap.Logger
TimeFormat []string
TimeOffset time.Duration
}

func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) {
Expand Down Expand Up @@ -98,7 +96,7 @@ func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) {

resource.Attributes().PutStr("service.name", azureTrace.AppRoleName)

nanos, err := asTimestamp(azureTrace.Time, r.TimeFormat, r.TimeOffset)
nanos, err := asTimestamp(azureTrace.Time, r.TimeFormat)
if err != nil {
r.Logger.Warn("Invalid Timestamp", zap.String("time", azureTrace.Time))
continue
Expand Down
48 changes: 15 additions & 33 deletions pkg/translator/azurelogs/resourcelogs_to_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ const (
azureTenantID = "tenant.id"
)

var (
errMissingTimestamp = errors.New("missing timestamp")
errNotSupportedTimeFormat = errors.New("not supported time format")
)

const ISO8601 = "iso8601" // azureRecords represents an array of Azure log records
var errMissingTimestamp = errors.New("missing timestamp")

// as exported via an Azure Event Hub
type azureRecords struct {
Expand Down Expand Up @@ -84,7 +79,6 @@ type ResourceLogsUnmarshaler struct {
Version string
Logger *zap.Logger
TimeFormat []string
TimeOffset time.Duration
}

func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
Expand Down Expand Up @@ -116,7 +110,7 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {

for i := 0; i < len(logs); i++ {
log := logs[i]
nanos, err := getTimestamp(log, r.TimeFormat, r.TimeOffset)
nanos, err := getTimestamp(log, r.TimeFormat)
if err != nil {
r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time))
continue
Expand Down Expand Up @@ -144,11 +138,11 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) {
return l, nil
}

func getTimestamp(record azureLogRecord, format []string, offset time.Duration) (pcommon.Timestamp, error) {
func getTimestamp(record azureLogRecord, formats []string) (pcommon.Timestamp, error) {
if record.Time != "" {
return asTimestamp(record.Time, format, offset)
return asTimestamp(record.Time, formats)
} else if record.Timestamp != "" {
return asTimestamp(record.Timestamp, format, offset)
return asTimestamp(record.Timestamp, formats)
}

return 0, errMissingTimestamp
Expand All @@ -157,33 +151,21 @@ func getTimestamp(record azureLogRecord, format []string, offset time.Duration)
// asTimestamp will parse an ISO8601 string into an OpenTelemetry
// nanosecond timestamp. If the string cannot be parsed, it will
// return zero and the error.
func asTimestamp(s string, format []string, offset time.Duration) (pcommon.Timestamp, error) {
func asTimestamp(s string, formats []string) (pcommon.Timestamp, error) {
var err error
var t time.Time
if format != nil {
for _, v := range format {
if v == ISO8601 {
t, err = iso8601.ParseString(s)
} else {
t, err = time.Parse(v, s)
}
if err == nil {
break
}
// Try parsing with provided formats first
for _, format := range formats {
if t, err = time.Parse(format, s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
if t == (time.Time{}) {
err = errNotSupportedTimeFormat
}
} else {
t, err = iso8601.ParseString(s)
}
if err != nil {
return 0, err
}

timestamp := t.Add(offset * time.Hour).UnixNano()

return pcommon.Timestamp(timestamp), nil
// Fallback to ISO 8601 parsing if no format matches
if t, err = iso8601.ParseString(s); err == nil {
return pcommon.Timestamp(t.UnixNano()), nil
}
return 0, err
}

// asSeverity converts the Azure log level to equivalent
Expand Down
30 changes: 21 additions & 9 deletions pkg/translator/azurelogs/resourcelogs_to_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var testBuildInfo = component.BuildInfo{
var minimumLogRecord = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil, 0)
ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil)
lr.SetTimestamp(ts)
lr.Attributes().PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure)
lr.Attributes().PutStr(conventions.AttributeCloudResourceID, "/RESOURCE_ID")
Expand All @@ -44,7 +44,7 @@ var minimumLogRecord = func() plog.LogRecord {
var maximumLogRecord1 = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil, 0)
ts, _ := asTimestamp("2022-11-11T04:48:27.6767145Z", nil)
lr.SetTimestamp(ts)
lr.SetSeverityNumber(plog.SeverityNumberWarn)
lr.SetSeverityText("Warning")
Expand Down Expand Up @@ -82,7 +82,7 @@ var maximumLogRecord2 = func() []plog.LogRecord {
lr := sl.LogRecords().AppendEmpty()
lr2 := sl.LogRecords().AppendEmpty()

ts, _ := asTimestamp("2022-11-11T04:48:29.6767145Z", nil, 0)
ts, _ := asTimestamp("2022-11-11T04:48:29.6767145Z", nil)
lr.SetTimestamp(ts)
lr.SetSeverityNumber(plog.SeverityNumberWarn)
lr.SetSeverityText("Warning")
Expand Down Expand Up @@ -112,7 +112,7 @@ var maximumLogRecord2 = func() []plog.LogRecord {
properties.PutDouble("float", 41.3)
properties.PutBool("bool", true)

ts, _ = asTimestamp("2022-11-11T04:48:31.6767145Z", nil, 0)
ts, _ = asTimestamp("2022-11-11T04:48:31.6767145Z", nil)
lr2.SetTimestamp(ts)
lr2.SetSeverityNumber(plog.SeverityNumberWarn)
lr2.SetSeverityText("Warning")
Expand Down Expand Up @@ -149,7 +149,7 @@ var maximumLogRecord2 = func() []plog.LogRecord {
var badLevelLogRecord = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2023-10-26T14:22:43.3416357Z", nil, 0)
ts, _ := asTimestamp("2023-10-26T14:22:43.3416357Z", nil)
lr.SetTimestamp(ts)
lr.SetSeverityNumber(plog.SeverityNumberTrace4)
lr.SetSeverityText("4")
Expand Down Expand Up @@ -193,7 +193,7 @@ var badLevelLogRecord = func() plog.LogRecord {
var badTimeLogRecord = func() plog.LogRecord {
lr := plog.NewLogs().ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

ts, _ := asTimestamp("2021-10-14T22:17:11+00:00", nil, 0)
ts, _ := asTimestamp("2021-10-14T22:17:11+00:00", nil)
lr.SetTimestamp(ts)

lr.Attributes().PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure)
Expand Down Expand Up @@ -239,17 +239,29 @@ var badTimeLogRecord = func() plog.LogRecord {

func TestAsTimestamp(t *testing.T) {
timestamp := "2022-11-11T04:48:27.6767145Z"
nanos, err := asTimestamp(timestamp, nil, 0)
nanos, err := asTimestamp(timestamp, nil)
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, []string{"01/02/2006 15:04:05"}, 0)
nanos, err = asTimestamp(timestamp, []string{"01/02/2006 15:04:05"})
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but fallback to iso8601 and succeeded to parse
timestamp = "2022-11-11T04:48:27.6767145Z"
nanos, err = asTimestamp(timestamp, []string{"01/02/2006 15:04:05"})
assert.NoError(t, err)
assert.Less(t, pcommon.Timestamp(0), nanos)

// time_format set, but all failed to parse
timestamp = "11/20/2024 13:57:18"
nanos, err = asTimestamp(timestamp, []string{"2006-01-02 15:04:05"})
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)

timestamp = "invalid-time"
nanos, err = asTimestamp(timestamp, nil, 0)
nanos, err = asTimestamp(timestamp, nil)
assert.Error(t, err)
assert.Equal(t, pcommon.Timestamp(0), nanos)
}
Expand Down
Loading

0 comments on commit 6176537

Please sign in to comment.