support streaming logs by pipeline mode
chengjoey committed Mar 7, 2024
1 parent 83f1a05 commit 3ec4fa2
Showing 13 changed files with 515 additions and 205 deletions.
39 changes: 25 additions & 14 deletions cmd/monitor/streaming/bootstrap.yaml
@@ -189,20 +189,6 @@ log-storage-elasticsearch:
   write_timeout: "1m"
   index_type: "logs"
 
-log-persist:
-  _enable: ${WRITE_LOG_TO_ES_ENABLE|WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
-  input:
-    topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
-    group: "${LOG_GROUP_ID:erda-logs-dev}"
-    offsets:
-      initial: earliest
-  id_keys: "${LOG_ID_KEYS:TERMINUS_DEFINE_TAG,terminus_define_tag,MESOS_TASK_ID,mesos_task_id}"
-  read_timeout: "5s"
-  buffer_size: ${LOG_BATCH_SIZE:4096}
-  parallelism: ${LOG_PERSIST_PARALLELISM:6}
-  storage_writer_service: "${LOG_STORAGE_WRITER_SERVICE:log-storage-clickhouse-writer}"
-  print_invalid_log: false
-
 cassandra:
   _enable: ${CASSANDRA_ENABLE:false}
   host: "${CASSANDRA_ADDR:localhost:9042}"
@@ -583,6 +569,15 @@ erda.oap.collector.core:
         flush_interval: ${EXPORTER_CLICKHOUSE_METRIC_FLUSH_INTERVAL}
         flush_jitter: ${EXPORTER_CLICKHOUSE_METRIC_FLUSH_JITTER}
         _enable: ${WRITE_METRIC_TO_CLICKHOUSE_ENABLE:true}
+    logs:
+      - receivers:
+          - "erda.oap.collector.receiver.kafka@spotlog"
+        exporters:
+          - "erda.oap.collector.exporter.clickhouse@log"
+        batch_size: ${LOG_BATCH_SIZE:4096}
+        flush_interval: ${EXPORTER_CLICKHOUSE_LOG_FLUSH_INTERVAL}
+        flush_jitter: ${EXPORTER_CLICKHOUSE_LOG_FLUSH_JITTER}
+        _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
 
     external_metrics:
       - receivers:
@@ -631,6 +626,15 @@ erda.oap.collector.receiver.kafka@spotprofile:
     offsets:
       initial: earliest
 
+erda.oap.collector.receiver.kafka@spotlog:
+  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
+  proto_parser: spotlog
+  consumer:
+    topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
+    group: "${LOG_GROUP_ID:erda-logs-dev}"
+    offsets:
+      initial: earliest
+
 erda.oap.collector.processor.modifier@span:
   _enable: ${WRITE_SPAN_TO_CLICKHOUSE_ENABLE:true}
   rules:
@@ -651,6 +655,13 @@ erda.oap.collector.exporter.clickhouse@span:
     data_type: span
     tenant_id_key: terminus_key
 
+erda.oap.collector.exporter.clickhouse@log:
+  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
+  storage:
+    currency_num: ${EXPORTER_CH_LOG_CURRENCY_NUM:6}
+  builder:
+    data_type: log
+
 clickhouse.table.initializer@span:
   _enable: ${WRITE_SPAN_TO_CLICKHOUSE_ENABLE:true}
   table_prefix: "spans"
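Net effect of the bootstrap.yaml changes above: the standalone log-persist kafka consumer is removed, and log ingestion moves into the collector's pipeline mode. The new logs pipeline consumes the same spot-container-log/spot-job-log topics through erda.oap.collector.receiver.kafka@spotlog and writes to ClickHouse through erda.oap.collector.exporter.clickhouse@log, gated by the same WRITE_LOG_TO_CLICKHOUSE_ENABLE switch.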
110 changes: 57 additions & 53 deletions go.mod

Large diffs are not rendered by default.

227 changes: 103 additions & 124 deletions go.sum

Large diffs are not rendered by default.

@@ -421,15 +421,15 @@ func (m *mockconn) QueryRow(ctx context.Context, query string, args ...interface
 	return &mockRow{}
 }
 
-func (m *mockconn) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
+func (m *mockconn) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
 	return nil, nil
 }
 
 func (m *mockconn) Exec(ctx context.Context, query string, args ...interface{}) error {
 	return nil
 }
 
-func (m *mockconn) AsyncInsert(ctx context.Context, query string, wait bool) error {
+func (m *mockconn) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
 	return nil
 }
 
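Aside: this mock update (and the three similar hunks below) tracks the ClickHouse Go client bump in go.mod; in clickhouse-go v2 the driver.Conn interface grew variadic PrepareBatchOption values on PrepareBatch and trailing query args on AsyncInsert. A minimal sketch of calls against the new signatures, not taken from this commit: the table name and values are made up, and WithReleaseConnection is just one example of a PrepareBatchOption.

package example

import (
	"context"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// insertSample exercises both updated signatures against any driver.Conn.
func insertSample(ctx context.Context, conn driver.Conn) error {
	// PrepareBatch now accepts variadic options, e.g. returning the
	// connection to the pool once the batch has been sent.
	batch, err := conn.PrepareBatch(ctx, "INSERT INTO sample_logs", driver.WithReleaseConnection())
	if err != nil {
		return err
	}
	if err := batch.Append("hello"); err != nil {
		return err
	}
	if err := batch.Send(); err != nil {
		return err
	}
	// AsyncInsert now takes optional args bound to query placeholders.
	return conn.AsyncInsert(ctx, "INSERT INTO sample_logs VALUES (?)", false, "world")
}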
@@ -132,15 +132,15 @@ func (m MockCkDriver) QueryRow(ctx context.Context, query string, args ...interf
 	panic("implement me")
 }
 
-func (m MockCkDriver) PrepareBatch(ctx context.Context, query string) (ckdriver.Batch, error) {
+func (m MockCkDriver) PrepareBatch(ctx context.Context, query string, opts ...ckdriver.PrepareBatchOption) (ckdriver.Batch, error) {
 	panic("implement me")
 }
 
 func (m MockCkDriver) Exec(ctx context.Context, query string, args ...interface{}) error {
 	panic("implement me")
 }
 
-func (m MockCkDriver) AsyncInsert(ctx context.Context, query string, wait bool) error {
+func (m MockCkDriver) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
 	panic("implement me")
 }
 
@@ -122,7 +122,7 @@ func (m MockCkDriver) QueryRow(ctx context.Context, query string, args ...interf
 	panic("implement me")
 }
 
-func (m MockCkDriver) PrepareBatch(ctx context.Context, query string) (ckdriver.Batch, error) {
+func (m MockCkDriver) PrepareBatch(ctx context.Context, query string, opts ...ckdriver.PrepareBatchOption) (ckdriver.Batch, error) {
 	panic("implement me")
 }
 
@@ -131,7 +131,7 @@ func (m MockCkDriver) Exec(ctx context.Context, query string, args ...interface{
 	return nil
 }
 
-func (m MockCkDriver) AsyncInsert(ctx context.Context, query string, wait bool) error {
+func (m MockCkDriver) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
 	panic("implement me")
 }
 
@@ -67,15 +67,15 @@ func (m *mockClickhouseConn) QueryRow(ctx context.Context, query string, args ..
 	return nil
 }
 
-func (m *mockClickhouseConn) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
+func (m *mockClickhouseConn) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
 	return nil, nil
 }
 
 func (m *mockClickhouseConn) Exec(ctx context.Context, query string, args ...interface{}) error {
 	return nil
 }
 
-func (m *mockClickhouseConn) AsyncInsert(ctx context.Context, query string, wait bool) error {
+func (m *mockClickhouseConn) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
 	return nil
 }
 
@@ -0,0 +1,157 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spotlog

import (
	"errors"
	"fmt"
	"strings"
	"sync"

	"github.com/erda-project/erda/internal/tools/monitor/core/log"
	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/common"
	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/common/unmarshalwork"
)

var (
	idKeys = []string{"TERMINUS_DEFINE_TAG", "terminus_define_tag", "MESOS_TASK_ID", "mesos_task_id"}
)

var (
	logLevelInfoValue = "INFO"
	logStreamStdout   = "stdout"
)

var (
	logLevelTag       = "level"
	logReqIDTag       = "request_id"
	logReqIDTagV1     = "request-id"
	logTraceTag       = "trace_id"
	logDiceOrgTag     = "dice_org_name"
	logOrgTag         = "org_name"
	logMonitorKeyTag  = "monitor_log_key"
	logTerminusKeyTag = "terminus_log_key"
)

// ParseSpotLog decodes a raw spot log payload on the shared unmarshal
// worker pool and hands the normalized result to callback.
func ParseSpotLog(buf []byte, callback func(m *log.Log) error) error {
	uw := newUnmarshalWork(buf, callback)
	uw.wg.Add(1)
	unmarshalwork.Schedule(uw)
	uw.wg.Wait()
	if uw.err != nil {
		return fmt.Errorf("parse spotlog err: %w", uw.err)
	}
	return nil
}

type unmarshalWork struct {
	buf      []byte
	err      error
	wg       sync.WaitGroup
	callback func(m *log.Log) error
}

func newUnmarshalWork(buf []byte, callback func(m *log.Log) error) *unmarshalWork {
	return &unmarshalWork{buf: buf, callback: callback}
}

// Unmarshal decodes one labeled log, normalizes and validates it, then
// forwards it to the callback; it runs on the unmarshalwork pool.
func (uw *unmarshalWork) Unmarshal() {
	defer uw.wg.Done()
	data := &log.LabeledLog{}
	if err := common.JsonDecoder.Unmarshal(uw.buf, data); err != nil {
		uw.err = err
		return
	}
	normalize(&data.Log)
	if err := Validate(data); err != nil {
		uw.err = err
		return
	}

	if err := uw.callback(&data.Log); err != nil {
		uw.err = err
		return
	}
}

var (
	// ErrIDEmpty is returned when a log carries no ID after normalization.
	ErrIDEmpty = errors.New("id empty")
)

// Validate rejects logs whose ID could not be derived from any idKeys tag.
func Validate(l *log.LabeledLog) error {
	if len(l.ID) <= 0 {
		return ErrIDEmpty
	}
	return nil
}

func normalize(data *log.Log) {
	if data.Tags == nil {
		data.Tags = make(map[string]string)
	}

	if data.Time != nil {
		data.Timestamp = data.Time.UnixNano()
		data.Time = nil
	}

	// setup level
	if level, ok := data.Tags[logLevelTag]; ok {
		data.Tags[logLevelTag] = strings.ToUpper(level)
	} else {
		data.Tags[logLevelTag] = logLevelInfoValue
	}

	// setup request id
	if reqID, ok := data.Tags[logReqIDTag]; ok {
		data.Tags[logTraceTag] = reqID
	} else if reqID, ok = data.Tags[logTraceTag]; ok {
		data.Tags[logReqIDTag] = reqID
	} else if reqID, ok = data.Tags[logReqIDTagV1]; ok {
		data.Tags[logReqIDTag] = reqID
		data.Tags[logTraceTag] = reqID
		delete(data.Tags, logReqIDTagV1)
	}

	// setup org name
	if _, ok := data.Tags[logDiceOrgTag]; !ok {
		if org, ok := data.Tags[logOrgTag]; ok {
			data.Tags[logDiceOrgTag] = org
		}
	}

	// setup log key for compatibility
	key, ok := data.Tags[logMonitorKeyTag]
	if !ok {
		key, ok = data.Tags[logTerminusKeyTag]
		if ok {
			data.Tags[logMonitorKeyTag] = key
		}
	}

	// setup log id
	for _, key := range idKeys {
		if val, ok := data.Tags[key]; ok {
			data.ID = val
			break
		}
	}

	// setup default stream
	if data.Stream == "" {
		data.Stream = logStreamStdout
	}
}
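For orientation, a hedged usage sketch of the new spotlog parser, not part of the commit: the spotlog import path and the JSON field names are assumptions, based on where erda keeps its other proto parsers and on the log.LabeledLog shape the parser decodes into.

package main

import (
	"fmt"

	"github.com/erda-project/erda/internal/tools/monitor/core/log"
	// Assumed location of the new package, alongside the other proto parsers.
	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/protoparser/spotlog"
)

func main() {
	// Made-up raw Kafka value; field names assumed from the log model.
	payload := []byte(`{"source":"container","content":"hello","tags":{"TERMINUS_DEFINE_TAG":"task-1","level":"info"}}`)

	err := spotlog.ParseSpotLog(payload, func(l *log.Log) error {
		// normalize() upper-cases the level, defaults Stream to "stdout",
		// and promotes TERMINUS_DEFINE_TAG to l.ID, so Validate passes.
		fmt.Printf("id=%s stream=%s level=%s\n", l.ID, l.Stream, l.Tags["level"])
		return nil
	})
	if err != nil {
		fmt.Println("parse failed:", err)
	}
}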
