Skip to content

Commit

Permalink
support streaming logs by pipeline mode
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Mar 7, 2024
1 parent 83f1a05 commit 5f59c7d
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 195 deletions.
39 changes: 25 additions & 14 deletions cmd/monitor/streaming/bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,6 @@ log-storage-elasticsearch:
write_timeout: "1m"
index_type: "logs"

log-persist:
_enable: ${WRITE_LOG_TO_ES_ENABLE|WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
input:
topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
group: "${LOG_GROUP_ID:erda-logs-dev}"
offsets:
initial: earliest
id_keys: "${LOG_ID_KEYS:TERMINUS_DEFINE_TAG,terminus_define_tag,MESOS_TASK_ID,mesos_task_id}"
read_timeout: "5s"
buffer_size: ${LOG_BATCH_SIZE:4096}
parallelism: ${LOG_PERSIST_PARALLELISM:6}
storage_writer_service: "${LOG_STORAGE_WRITER_SERVICE:log-storage-clickhouse-writer}"
print_invalid_log: false

cassandra:
_enable: ${CASSANDRA_ENABLE:false}
host: "${CASSANDRA_ADDR:localhost:9042}"
Expand Down Expand Up @@ -583,6 +569,15 @@ erda.oap.collector.core:
flush_interval: ${EXPORTER_CLICKHOUSE_METRIC_FLUSH_INTERVAL}
flush_jitter: ${EXPORTER_CLICKHOUSE_METRIC_FLUSH_JITTER}
_enable: ${WRITE_METRIC_TO_CLICKHOUSE_ENABLE:true}
logs:
- receivers:
- "erda.oap.collector.receiver.kafka@spotlog"
exporters:
- "erda.oap.collector.exporter.clickhouse@log"
batch_size: ${LOG_BATCH_SIZE:4096}
flush_interval: ${EXPORTER_CLICKHOUSE_LOG_FLUSH_INTERVAL}
flush_jitter: ${EXPORTER_CLICKHOUSE_LOG_FLUSH_JITTER}
_enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}

external_metrics:
- receivers:
Expand Down Expand Up @@ -631,6 +626,15 @@ erda.oap.collector.receiver.kafka@spotprofile:
offsets:
initial: earliest

erda.oap.collector.receiver.kafka@spotlog:
_enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
proto_parser: spotlog
consumer:
topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
group: "${LOG_GROUP_ID:erda-logs-dev}"
offsets:
initial: earliest

erda.oap.collector.processor.modifier@span:
_enable: ${WRITE_SPAN_TO_CLICKHOUSE_ENABLE:true}
rules:
Expand All @@ -651,6 +655,13 @@ erda.oap.collector.exporter.clickhouse@span:
data_type: span
tenant_id_key: terminus_key

erda.oap.collector.exporter.clickhouse@log:
_enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
storage:
currency_num: ${EXPORTER_CH_LOG_CURRENCY_NUM:6}
builder:
data_type: log

clickhouse.table.initializer@span:
_enable: ${WRITE_SPAN_TO_CLICKHOUSE_ENABLE:true}
table_prefix: "spans"
Expand Down
110 changes: 57 additions & 53 deletions go.mod

Large diffs are not rendered by default.

227 changes: 103 additions & 124 deletions go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spotlog

import (
"errors"
"fmt"
"strings"
"sync"

"github.com/erda-project/erda/internal/tools/monitor/core/log"
"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/common"
"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/common/unmarshalwork"
)

var (
idKeys = []string{"TERMINUS_DEFINE_TAG", "terminus_define_tag", "MESOS_TASK_ID", "mesos_task_id"}
)

var (
logLevelInfoValue = "INFO"
logStreamStdout = "stdout"
)

var (
logLevelTag = "level"
logReqIDTag = "request_id"
logReqIDTagV1 = "request-id"
logTraceTag = "trace_id"
logDiceOrgTag = "dice_org_name"
logOrgTag = "org_name"
logMonitorKeyTag = "monitor_log_key"
logTerminusKeyTag = "terminus_log_key"
)

func ParseSpotLog(buf []byte, callback func(m *log.Log) error) error {
uw := newUnmarshalWork(buf, callback)
uw.wg.Add(1)
unmarshalwork.Schedule(uw)
uw.wg.Wait()
if uw.err != nil {
return fmt.Errorf("parse spotMetric err: %w", uw.err)
}
return nil
}

type unmarshalWork struct {
buf []byte
err error
wg sync.WaitGroup
callback func(m *log.Log) error
}

func newUnmarshalWork(buf []byte, callback func(m *log.Log) error) *unmarshalWork {
return &unmarshalWork{buf: buf, callback: callback}
}

func (uw *unmarshalWork) Unmarshal() {
defer uw.wg.Done()
data := &log.LabeledLog{}
if err := common.JsonDecoder.Unmarshal(uw.buf, data); err != nil {
uw.err = err
return
}
normalize(&data.Log)
if err := Validate(data); err != nil {
uw.err = err
return
}

if err := uw.callback(&data.Log); err != nil {
uw.err = err
return
}
}

var (
// ErrIDEmpty .
ErrIDEmpty = errors.New("id empty")
)

func Validate(l *log.LabeledLog) error {
if len(l.ID) <= 0 {
return ErrIDEmpty
}
return nil
}

func normalize(data *log.Log) {
if data.Tags == nil {
data.Tags = make(map[string]string)
}

if data.Time != nil {
data.Timestamp = data.Time.UnixNano()
data.Time = nil
}

// setup level
if level, ok := data.Tags[logLevelTag]; ok {
data.Tags[logLevelTag] = strings.ToUpper(level)
} else {
data.Tags[logLevelTag] = logLevelInfoValue
}

// setup request id
if reqID, ok := data.Tags[logReqIDTag]; ok {
data.Tags[logTraceTag] = reqID
} else if reqID, ok = data.Tags[logTraceTag]; ok {
data.Tags[logReqIDTag] = reqID
} else if reqID, ok = data.Tags[logReqIDTagV1]; ok {
data.Tags[logReqIDTag] = reqID
data.Tags[logTraceTag] = reqID
delete(data.Tags, logReqIDTagV1)
}

// setup org name
if _, ok := data.Tags[logDiceOrgTag]; !ok {
if org, ok := data.Tags[logOrgTag]; ok {
data.Tags[logDiceOrgTag] = org
}
}

// setup log key for compatibility
key, ok := data.Tags[logMonitorKeyTag]
if !ok {
key, ok = data.Tags[logTerminusKeyTag]
if ok {
data.Tags[logMonitorKeyTag] = key
}
}

// setup log id
for _, key := range idKeys {
if val, ok := data.Tags[key]; ok {
data.ID = val
break
}
}

// setup default stream
if data.Stream == "" {
data.Stream = logStreamStdout
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,135 @@

package log

// todo.
import (
"context"
"fmt"
"strconv"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"

"github.com/erda-project/erda-infra/base/logs"
"github.com/erda-project/erda-infra/base/servicehub"
"github.com/erda-project/erda/internal/tools/monitor/core/log"
"github.com/erda-project/erda/internal/tools/monitor/core/settings/retention-strategy"
tablepkg "github.com/erda-project/erda/internal/tools/monitor/core/storekit/clickhouse/table"
"github.com/erda-project/erda/internal/tools/monitor/core/storekit/clickhouse/table/creator"
"github.com/erda-project/erda/internal/tools/monitor/oap/collector/core/model/odata"
"github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder"
)

const (
chTableCreator = "clickhouse.table.creator@log"
chTableRetention = "storage-retention-strategy@log"
)

type Builder struct {
logger logs.Logger
client clickhouse.Conn
Creator creator.Interface
Retention retention.Interface
cfg *builder.BuilderConfig
}

func NewBuilder(ctx servicehub.Context, logger logs.Logger, cfg *builder.BuilderConfig) (*Builder, error) {
bu := &Builder{
cfg: cfg,
logger: logger,
}
ch, err := builder.GetClickHouseInf(ctx, odata.LogType)
if err != nil {
return nil, fmt.Errorf("get clickhouse interface: %w", err)
}
bu.client = ch.Client()

if svc, ok := ctx.Service(chTableCreator).(creator.Interface); !ok {
return nil, fmt.Errorf("service %q must existed", chTableCreator)
} else {
bu.Creator = svc
}

if svc, ok := ctx.Service(chTableRetention).(retention.Interface); !ok {
return nil, fmt.Errorf("service %q must existed", chTableRetention)
} else {
bu.Retention = svc
}

return bu, nil
}

func (bu *Builder) BuildBatch(ctx context.Context, sourceBatch interface{}) ([]driver.Batch, error) {
items, ok := sourceBatch.([]*log.Log)
if !ok {
return nil, fmt.Errorf("sourceBatch<%T> must be []*log.LabeldLog", sourceBatch)
}

batches, err := bu.buildBatches(ctx, items)
if err != nil {
return nil, fmt.Errorf("failed buildBatches: %w", err)
}
return batches, nil
}

func (bu *Builder) buildBatches(ctx context.Context, items []*log.Log) ([]driver.Batch, error) {
tableBatch := make(map[string]driver.Batch)
for _, item := range items {
bu.fillLogInfo(item)
table, err := bu.getOrCreateTenantTable(ctx, item)
if err != nil {
return nil, fmt.Errorf("cannot get tenant table: %w", err)
}
if _, ok := tableBatch[table]; !ok {
batch, err := bu.client.PrepareBatch(ctx, "INSERT INTO "+table)
if err != nil {
return nil, err
}
tableBatch[table] = batch
}
batch := tableBatch[table]
batch.AppendStruct(item)
}
batches := make([]driver.Batch, 0, len(tableBatch))
for _, batch := range tableBatch {
batches = append(batches, batch)
}
return batches, nil
}

func (bu *Builder) getOrCreateTenantTable(ctx context.Context, data *log.Log) (string, error) {
key := bu.Retention.GetConfigKey(data.Source, data.Tags)
ttl := bu.Retention.GetTTL(key)
var (
wait <-chan error
table string
)
if len(key) > 0 {
wait, table = bu.Creator.Ensure(ctx, data.Tags["dice_org_name"], key, tablepkg.FormatTTLToDays(ttl))
} else {
wait, table = bu.Creator.Ensure(ctx, data.Tags["dice_org_name"], "", tablepkg.FormatTTLToDays(ttl))
}
if wait != nil {
select {
case <-wait:
case <-ctx.Done():
return table, nil
}
}
return table, nil
}

func (bu *Builder) fillLogInfo(logData *log.Log) {
id := logData.ID
if len(id) > 12 {
id = id[:12]
}
logData.UniqId = strconv.FormatInt(logData.Timestamp, 36) + "-" + id
logData.OrgName = logData.Tags["dice_org_name"]
tenantId := logData.Tags["monitor_log_key"]
if len(tenantId) == 0 {
tenantId = logData.Tags["msp_env_id"]
}
logData.WriteTimestamp = time.Unix(0, logData.Timestamp)
logData.TenantId = tenantId
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ func (m *mockConn) QueryRow(ctx context.Context, query string, args ...interface
panic("implement me")
}

func (m *mockConn) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
func (m *mockConn) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
return &mockBatch{}, nil
}

func (m *mockConn) Exec(ctx context.Context, query string, args ...interface{}) error {
panic("implement me")
}

func (m *mockConn) AsyncInsert(ctx context.Context, query string, wait bool) error {
func (m *mockConn) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
panic("implement me")
}

Expand Down Expand Up @@ -176,3 +176,5 @@ func (m *mockBatch) Send() error { return nil }
func (m *mockBatch) Flush() error { return nil }

func (m *mockBatch) IsSent() bool { return true }

func (m *mockBatch) Rows() int { return 0 }
Loading

0 comments on commit 5f59c7d

Please sign in to comment.