diff --git a/cmd/monitor/streaming/bootstrap.yaml b/cmd/monitor/streaming/bootstrap.yaml
index f5d905276149..3d69c6f195de 100644
--- a/cmd/monitor/streaming/bootstrap.yaml
+++ b/cmd/monitor/streaming/bootstrap.yaml
@@ -189,20 +189,6 @@ log-storage-elasticsearch:
     write_timeout: "1m"
     index_type: "logs"
 
-log-persist:
-  _enable: ${WRITE_LOG_TO_ES_ENABLE|WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
-  input:
-    topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
-    group: "${LOG_GROUP_ID:erda-logs-dev}"
-    offsets:
-      initial: earliest
-  id_keys: "${LOG_ID_KEYS:TERMINUS_DEFINE_TAG,terminus_define_tag,MESOS_TASK_ID,mesos_task_id}"
-  read_timeout: "5s"
-  buffer_size: ${LOG_BATCH_SIZE:4096}
-  parallelism: ${LOG_PERSIST_PARALLELISM:6}
-  storage_writer_service: "${LOG_STORAGE_WRITER_SERVICE:log-storage-clickhouse-writer}"
-  print_invalid_log: false
-
 cassandra:
   _enable: ${CASSANDRA_ENABLE:false}
   host: "${CASSANDRA_ADDR:localhost:9042}"
@@ -583,6 +569,15 @@ erda.oap.collector.core:
         flush_interval: ${EXPORTER_CLICKHOUSE_METRIC_FLUSH_INTERVAL}
         flush_jitter: ${EXPORTER_CLICKHOUSE_METRIC_FLUSH_JITTER}
         _enable: ${WRITE_METRIC_TO_CLICKHOUSE_ENABLE:true}
+    logs:
+      - receivers:
+          - "erda.oap.collector.receiver.kafka@spotlog"
+        exporters:
+          - "erda.oap.collector.exporter.clickhouse@log"
+        batch_size: ${LOG_BATCH_SIZE:4096}
+        flush_interval: ${EXPORTER_CLICKHOUSE_LOG_FLUSH_INTERVAL}
+        flush_jitter: ${EXPORTER_CLICKHOUSE_LOG_FLUSH_JITTER}
+        _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
 
     external_metrics:
       - receivers:
@@ -631,6 +626,15 @@ erda.oap.collector.receiver.kafka@spotprofile:
     offsets:
       initial: earliest
 
+erda.oap.collector.receiver.kafka@spotlog:
+  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
+  proto_parser: spotlog
+  consumer:
+    topics: "${LOG_TOPICS:spot-container-log,spot-job-log}"
+    group: "${LOG_GROUP_ID:erda-logs-dev}"
+    offsets:
+      initial: earliest
+
 erda.oap.collector.processor.modifier@span:
   _enable: ${WRITE_SPAN_TO_CLICKHOUSE_ENABLE:true}
   rules:
@@ -651,6 +655,13 @@ erda.oap.collector.exporter.clickhouse@span:
     data_type: span
     tenant_id_key: terminus_key
 
+erda.oap.collector.exporter.clickhouse@log:
+  _enable: ${WRITE_LOG_TO_CLICKHOUSE_ENABLE:true}
+  storage:
+    currency_num: ${EXPORTER_CH_LOG_CURRENCY_NUM:6}
+  builder:
+    data_type: log
+
 clickhouse.table.initializer@span:
   _enable: ${WRITE_SPAN_TO_CLICKHOUSE_ENABLE:true}
   table_prefix: "spans"
diff --git a/internal/tools/monitor/oap/collector/lib/protoparser/spotlog/spotlog.go b/internal/tools/monitor/oap/collector/lib/protoparser/spotlog/spotlog.go
new file mode 100644
index 000000000000..4b8835e10d4d
--- /dev/null
+++ b/internal/tools/monitor/oap/collector/lib/protoparser/spotlog/spotlog.go
@@ -0,0 +1,157 @@
+// Copyright (c) 2021 Terminus, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
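+
+// Package spotlog parses log records in the legacy "spot" JSON format
+// consumed from kafka and normalizes their tags before they enter the
+// collector pipeline.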
+package spotlog
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/erda-project/erda/internal/tools/monitor/core/log"
+	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/common"
+	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/common/unmarshalwork"
+)
+
+var (
+	idKeys = []string{"TERMINUS_DEFINE_TAG", "terminus_define_tag", "MESOS_TASK_ID", "mesos_task_id"}
+)
+
+var (
+	logLevelInfoValue = "INFO"
+	logStreamStdout   = "stdout"
+)
+
+var (
+	logLevelTag       = "level"
+	logReqIDTag       = "request_id"
+	logReqIDTagV1     = "request-id"
+	logTraceTag       = "trace_id"
+	logDiceOrgTag     = "dice_org_name"
+	logOrgTag         = "org_name"
+	logMonitorKeyTag  = "monitor_log_key"
+	logTerminusKeyTag = "terminus_log_key"
+)
+
+func ParseSpotLog(buf []byte, callback func(m *log.Log) error) error {
+	uw := newUnmarshalWork(buf, callback)
+	uw.wg.Add(1)
+	unmarshalwork.Schedule(uw)
+	uw.wg.Wait()
+	if uw.err != nil {
+		return fmt.Errorf("parse spotLog err: %w", uw.err)
+	}
+	return nil
+}
+
+type unmarshalWork struct {
+	buf      []byte
+	err      error
+	wg       sync.WaitGroup
+	callback func(m *log.Log) error
+}
+
+func newUnmarshalWork(buf []byte, callback func(m *log.Log) error) *unmarshalWork {
+	return &unmarshalWork{buf: buf, callback: callback}
+}
+
+func (uw *unmarshalWork) Unmarshal() {
+	defer uw.wg.Done()
+	data := &log.LabeledLog{}
+	if err := common.JsonDecoder.Unmarshal(uw.buf, data); err != nil {
+		uw.err = err
+		return
+	}
+	normalize(&data.Log)
+	if err := Validate(data); err != nil {
+		uw.err = err
+		return
+	}
+
+	if err := uw.callback(&data.Log); err != nil {
+		uw.err = err
+		return
+	}
+}
+
+var (
+	// ErrIDEmpty .
+	ErrIDEmpty = errors.New("id empty")
+)
+
+func Validate(l *log.LabeledLog) error {
+	if len(l.ID) <= 0 {
+		return ErrIDEmpty
+	}
+	return nil
+}
+
+func normalize(data *log.Log) {
+	if data.Tags == nil {
+		data.Tags = make(map[string]string)
+	}
+
+	if data.Time != nil {
+		data.Timestamp = data.Time.UnixNano()
+		data.Time = nil
+	}
+
+	// setup level
+	if level, ok := data.Tags[logLevelTag]; ok {
+		data.Tags[logLevelTag] = strings.ToUpper(level)
+	} else {
+		data.Tags[logLevelTag] = logLevelInfoValue
+	}
+
+	// setup request id
+	if reqID, ok := data.Tags[logReqIDTag]; ok {
+		data.Tags[logTraceTag] = reqID
+	} else if reqID, ok = data.Tags[logTraceTag]; ok {
+		data.Tags[logReqIDTag] = reqID
+	} else if reqID, ok = data.Tags[logReqIDTagV1]; ok {
+		data.Tags[logReqIDTag] = reqID
+		data.Tags[logTraceTag] = reqID
+		delete(data.Tags, logReqIDTagV1)
+	}
+
+	// setup org name
+	if _, ok := data.Tags[logDiceOrgTag]; !ok {
+		if org, ok := data.Tags[logOrgTag]; ok {
+			data.Tags[logDiceOrgTag] = org
+		}
+	}
+
+	// setup log key for compatibility
+	key, ok := data.Tags[logMonitorKeyTag]
+	if !ok {
+		key, ok = data.Tags[logTerminusKeyTag]
+		if ok {
+			data.Tags[logMonitorKeyTag] = key
+		}
+	}
+
+	// setup log id
+	for _, key := range idKeys {
+		if val, ok := data.Tags[key]; ok {
+			data.ID = val
+			break
+		}
+	}
+
+	// setup default stream
+	if data.Stream == "" {
+		data.Stream = logStreamStdout
+	}
+}
diff --git a/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/log/log.go b/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/log/log.go
index f7555f8d50b9..fe0f1870115f 100644
--- a/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/log/log.go
+++ b/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/log/log.go
@@ -14,4 +14,135 @@
 
 package log
 
-// todo.
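+// Builder turns a batch of *log.Log rows into per-tenant ClickHouse
+// insert batches; the target table for each row is resolved (and created
+// on demand) through the creator service, with the TTL taken from the
+// retention strategy.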
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+
+	"github.com/erda-project/erda-infra/base/logs"
+	"github.com/erda-project/erda-infra/base/servicehub"
+	"github.com/erda-project/erda/internal/tools/monitor/core/log"
+	"github.com/erda-project/erda/internal/tools/monitor/core/settings/retention-strategy"
+	tablepkg "github.com/erda-project/erda/internal/tools/monitor/core/storekit/clickhouse/table"
+	"github.com/erda-project/erda/internal/tools/monitor/core/storekit/clickhouse/table/creator"
+	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/core/model/odata"
+	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder"
+)
+
+const (
+	chTableCreator   = "clickhouse.table.creator@log"
+	chTableRetention = "storage-retention-strategy@log"
+)
+
+type Builder struct {
+	logger    logs.Logger
+	client    clickhouse.Conn
+	Creator   creator.Interface
+	Retention retention.Interface
+	cfg       *builder.BuilderConfig
+}
+
+func NewBuilder(ctx servicehub.Context, logger logs.Logger, cfg *builder.BuilderConfig) (*Builder, error) {
+	bu := &Builder{
+		cfg:    cfg,
+		logger: logger,
+	}
+	ch, err := builder.GetClickHouseInf(ctx, odata.LogType)
+	if err != nil {
+		return nil, fmt.Errorf("get clickhouse interface: %w", err)
+	}
+	bu.client = ch.Client()
+
+	if svc, ok := ctx.Service(chTableCreator).(creator.Interface); !ok {
+		return nil, fmt.Errorf("service %q must exist", chTableCreator)
+	} else {
+		bu.Creator = svc
+	}
+
+	if svc, ok := ctx.Service(chTableRetention).(retention.Interface); !ok {
+		return nil, fmt.Errorf("service %q must exist", chTableRetention)
+	} else {
+		bu.Retention = svc
+	}
+
+	return bu, nil
+}
+
+func (bu *Builder) BuildBatch(ctx context.Context, sourceBatch interface{}) ([]driver.Batch, error) {
+	items, ok := sourceBatch.([]*log.Log)
+	if !ok {
+		return nil, fmt.Errorf("sourceBatch<%T> must be []*log.Log", sourceBatch)
+	}
+
+	batches, err := bu.buildBatches(ctx, items)
+	if err != nil {
+		return nil, fmt.Errorf("failed buildBatches: %w", err)
+	}
+	return batches, nil
+}
+
+func (bu *Builder) buildBatches(ctx context.Context, items []*log.Log) ([]driver.Batch, error) {
+	tableBatch := make(map[string]driver.Batch)
+	for _, item := range items {
+		bu.fillLogInfo(item)
+		table, err := bu.getOrCreateTenantTable(ctx, item)
+		if err != nil {
+			return nil, fmt.Errorf("cannot get tenant table: %w", err)
+		}
+		if _, ok := tableBatch[table]; !ok {
+			batch, err := bu.client.PrepareBatch(ctx, "INSERT INTO "+table)
+			if err != nil {
+				return nil, err
+			}
+			tableBatch[table] = batch
+		}
+		batch := tableBatch[table]
+		if err := batch.AppendStruct(item); err != nil {
+			return nil, err
+		}
+	}
+	batches := make([]driver.Batch, 0, len(tableBatch))
+	for _, batch := range tableBatch {
+		batches = append(batches, batch)
+	}
+	return batches, nil
+}
+
+func (bu *Builder) getOrCreateTenantTable(ctx context.Context, data *log.Log) (string, error) {
+	key := bu.Retention.GetConfigKey(data.Source, data.Tags)
+	ttl := bu.Retention.GetTTL(key)
+	var (
+		wait  <-chan error
+		table string
+	)
+	if len(key) > 0 {
+		wait, table = bu.Creator.Ensure(ctx, data.Tags["dice_org_name"], key, tablepkg.FormatTTLToDays(ttl))
+	} else {
+		wait, table = bu.Creator.Ensure(ctx, data.Tags["dice_org_name"], "", tablepkg.FormatTTLToDays(ttl))
+	}
+	if wait != nil {
+		select {
+		case <-wait:
+		case <-ctx.Done():
+			return table, nil
+		}
+	}
+	return table, nil
+}
+
+func (bu *Builder) fillLogInfo(logData *log.Log) {
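+	// Derive a compact per-row id: the base-36 nanosecond timestamp
+	// joined with at most the first 12 characters of the original log ID.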
+	id := logData.ID
+	if len(id) > 12 {
+		id = id[:12]
+	}
+	logData.UniqId = strconv.FormatInt(logData.Timestamp, 36) + "-" + id
+	logData.OrgName = logData.Tags["dice_org_name"]
+	tenantId := logData.Tags["monitor_log_key"]
+	if len(tenantId) == 0 {
+		tenantId = logData.Tags["msp_env_id"]
+	}
+	logData.WriteTimestamp = time.Unix(0, logData.Timestamp)
+	logData.TenantId = tenantId
+}
diff --git a/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/provider.go b/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/provider.go
index b1eb6f8c3772..217541fc3f8d 100644
--- a/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/provider.go
+++ b/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/provider.go
@@ -29,6 +29,7 @@ import (
 	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/core/model/odata"
 	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder"
 	externalmetric "github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/external_metric"
+	logStore "github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/log"
 	metricstore "github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/metric"
 	profilebuilder "github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/profile"
 	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/plugins/exporters/clickhouse/builder/span"
@@ -58,7 +59,10 @@ func (p *provider) ComponentClose() error {
 }
 
 func (p *provider) ExportRaw(items ...*odata.Raw) error { return nil }
-func (p *provider) ExportLog(items ...*log.Log) error { return nil }
+func (p *provider) ExportLog(items ...*log.Log) error {
+	p.storage.WriteBatchAsync(items)
+	return nil
+}
 func (p *provider) ExportProfile(items ...*profile.Output) error {
 	p.storage.WriteBatchAsync(items)
 	return nil
@@ -90,6 +94,7 @@ func (p *provider) Init(ctx servicehub.Context) error {
 	case odata.MetricType:
 	case odata.ProfileType:
 	case odata.ExternalMetricType:
+	case odata.LogType:
 	default:
 		return fmt.Errorf("invalid builder for data_type: %q", p.Cfg.BuilderCfg.DataType)
 	}
@@ -120,6 +125,12 @@ func (p *provider) Init(ctx servicehub.Context) error {
 			return fmt.Errorf("external metrics build: %w", err)
 		}
 		batchBuilder = tmp
+	case odata.LogType:
+		tmp, err := logStore.NewBuilder(ctx, p.Log.Sub("log-builder"), p.Cfg.BuilderCfg)
+		if err != nil {
+			return fmt.Errorf("log build: %w", err)
+		}
+		batchBuilder = tmp
 	default:
 		return fmt.Errorf("invalid data_type: %q", p.Cfg.BuilderCfg.DataType)
 	}
diff --git a/internal/tools/monitor/oap/collector/plugins/receivers/kafka/provider.go b/internal/tools/monitor/oap/collector/plugins/receivers/kafka/provider.go
index 7654a95ccf65..956403dcc224 100644
--- a/internal/tools/monitor/oap/collector/plugins/receivers/kafka/provider.go
+++ b/internal/tools/monitor/oap/collector/plugins/receivers/kafka/provider.go
@@ -25,12 +25,14 @@ import (
 	"github.com/erda-project/erda-infra/base/logs"
 	"github.com/erda-project/erda-infra/base/servicehub"
 	"github.com/erda-project/erda/internal/apps/msp/apm/trace"
+	"github.com/erda-project/erda/internal/tools/monitor/core/log"
 	"github.com/erda-project/erda/internal/tools/monitor/core/metric"
 	"github.com/erda-project/erda/internal/tools/monitor/core/profile"
 	"github.com/erda-project/erda/internal/tools/monitor/oap/collector/core/model"
"github.com/erda-project/erda/internal/tools/monitor/oap/collector/core/model/odata" kafkaInf "github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/kafka" "github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/protoparser/oapspan" + "github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/protoparser/spotlog" "github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/protoparser/spotmetric" "github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/protoparser/spotprofile" "github.com/erda-project/erda/internal/tools/monitor/oap/collector/lib/protoparser/spotspan" @@ -44,6 +46,7 @@ const ( spotSpan parserName = "spotspan" spotMetric parserName = "spotmetric" spotProfile parserName = "spotprofile" + spotLog parserName = "spotlog" ) type config struct { @@ -116,6 +119,8 @@ func (p *provider) Init(ctx servicehub.Context) error { invokeFunc = p.parseOapSpanEvent() case spotProfile: invokeFunc = p.parseSpotProfile() + case spotLog: + invokeFunc = p.parseSpotLog() default: return fmt.Errorf("invalide parser: %q", p.parser) } @@ -180,6 +185,14 @@ func (p *provider) parseSpotProfile() kafkaInf.ConsumerFuncV2 { } } +func (p *provider) parseSpotLog() kafkaInf.ConsumerFuncV2 { + return func(msg *sarama.ConsumerMessage) error { + return spotlog.ParseSpotLog(msg.Value, func(log *log.Log) error { + return p.consumeData(log) + }) + } +} + func (p *provider) consumeData(od odata.ObservableData) error { if p.consumer == nil { // wait consumer injected <-p.consumerInjectedC