diff --git a/server/ingester/app_log/app_log.go b/server/ingester/app_log/app_log.go
new file mode 100644
index 00000000000..0aec08d103e
--- /dev/null
+++ b/server/ingester/app_log/app_log.go
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2024 Yunshan Networks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app_log
+
+import (
+	"strconv"
+	"time"
+
+	"github.com/deepflowio/deepflow/server/ingester/app_log/config"
+	"github.com/deepflowio/deepflow/server/ingester/app_log/dbwriter"
+	"github.com/deepflowio/deepflow/server/ingester/app_log/decoder"
+	dropletqueue "github.com/deepflowio/deepflow/server/ingester/droplet/queue"
+	"github.com/deepflowio/deepflow/server/ingester/ingesterctl"
+	"github.com/deepflowio/deepflow/server/ingester/pkg/ckwriter"
+	"github.com/deepflowio/deepflow/server/libs/datatype"
+	"github.com/deepflowio/deepflow/server/libs/grpc"
+	"github.com/deepflowio/deepflow/server/libs/queue"
+	libqueue "github.com/deepflowio/deepflow/server/libs/queue"
+	"github.com/deepflowio/deepflow/server/libs/receiver"
+)
+
+// ApplicationLogger bundles the syslog and application-log pipelines together
+// with the single ClickHouse writer that both of them feed.
+type ApplicationLogger struct {
+	Config    *config.Config
+	Ckwriter  *ckwriter.CKWriter
+	SysLogger *AppLogger
+	AppLogger *AppLogger
+}
+
+// AppLogger is the decode pipeline for one message type. Decoders[i] and
+// PlatformDatas[i] both serve decode queue i.
+type AppLogger struct {
+	Config        *config.Config
+	Decoders      []*decoder.Decoder
+	PlatformDatas []*grpc.PlatformInfoTable
+}
+
+// NewApplicationLogger creates the shared ClickHouse writer and one AppLogger
+// each for MESSAGE_TYPE_SYSLOG and MESSAGE_TYPE_APPLICATION_LOG. Nothing is
+// started until Start is called; the first construction error is returned.
+func NewApplicationLogger(
+	config *config.Config,
+	recv *receiver.Receiver,
+	platformDataManager *grpc.PlatformDataManager,
+) (*ApplicationLogger, error) {
+	manager := dropletqueue.NewManager(ingesterctl.INGESTERCTL_APPLICATION_LOG_QUEUE)
+
+	ckwriter, err := dbwriter.NewAppLogCKWriter(config)
+	if err != nil {
+		return nil, err
+	}
+	sysLogger, err := NewAppLogger(datatype.MESSAGE_TYPE_SYSLOG, config, manager, recv, platformDataManager, ckwriter)
+	if err != nil {
+		return nil, err
+	}
+	appLogger, err := NewAppLogger(datatype.MESSAGE_TYPE_APPLICATION_LOG, config, manager, recv, platformDataManager, ckwriter)
+	if err != nil {
+		return nil, err
+	}
+
+	return &ApplicationLogger{
+		Config:    config,
+		Ckwriter:  ckwriter,
+		SysLogger: sysLogger,
+		AppLogger: appLogger,
+	}, nil
+}
+
+// Start runs the shared ClickHouse writer first, then both pipelines.
+func (l *ApplicationLogger) Start() {
+	l.Ckwriter.Run()
+	l.SysLogger.Start()
+	l.AppLogger.Start()
+}
+
+// Close shuts down both pipelines before the ClickHouse writer they feed.
+// It always returns nil.
+func (l *ApplicationLogger) Close() error {
+	l.SysLogger.Close()
+	l.AppLogger.Close()
+	l.Ckwriter.Close()
+	return nil
+}
+
+// NewAppLogger creates config.DecoderQueueCount decode queues for msgType,
+// registers them with recv, and builds one log writer, one platform-info table
+// and one decoder per queue. All log writers share the given ckwriter.
+func NewAppLogger(
+	msgType datatype.MessageType,
+	config *config.Config,
+	manager *dropletqueue.Manager,
+	recv *receiver.Receiver,
+	platformDataManager *grpc.PlatformDataManager,
+	ckwriter *ckwriter.CKWriter,
+) (*AppLogger, error) {
+	queueCount := config.DecoderQueueCount
+	decodeQueues := manager.NewQueues(
+		"1-receive-to-decode-"+msgType.String(),
+		config.DecoderQueueSize,
+		queueCount,
+		1,
+		libqueue.OptionFlushIndicator(3*time.Second),
+		libqueue.OptionRelease(func(p interface{}) { receiver.ReleaseRecvBuffer(p.(*receiver.RecvBuffer)) }))
+	recv.RegistHandler(msgType, decodeQueues, queueCount)
+
+	decoders := make([]*decoder.Decoder, queueCount)
+	platformDatas := make([]*grpc.PlatformInfoTable, queueCount)
+	for i := 0; i < queueCount; i++ {
+		logWriter, err := dbwriter.NewAppLogWriter(i, msgType, config, ckwriter)
+		if err != nil {
+			return nil, err
+		}
+		platformDatas[i], err = platformDataManager.NewPlatformInfoTable("app-log-" + msgType.String() + "-" + strconv.Itoa(i))
+		if err != nil {
+			return nil, err
+		}
+		decoders[i] = decoder.NewDecoder(
+			i,
+			msgType,
+			queue.QueueReader(decodeQueues.FixedMultiQueue[i]),
+			logWriter,
+			platformDatas[i],
+			config,
+		)
+	}
+
+	return &AppLogger{
+		Config:        config,
+		Decoders:      decoders,
+		PlatformDatas: platformDatas,
+	}, nil
+}
+
+// Start launches every decoder in its own goroutine and then starts the
+// platform-info tables.
+func (l *AppLogger) Start() {
+	for _, decoder := range l.Decoders {
+		go decoder.Run()
+	}
+	for _, platformData := range l.PlatformDatas {
+		platformData.Start()
+	}
+}
+
+// Close closes every decoder and every platform-info table. It always returns nil.
+func (l *AppLogger) Close() error {
+	for _, decoder := range l.Decoders {
+		decoder.Close()
+	}
+	for _, platformData := range l.PlatformDatas {
+		platformData.ClosePlatformInfoTable()
+	}
+	return nil
+}
diff --git a/server/ingester/app_log/config/config.go b/server/ingester/app_log/config/config.go
new file mode 100644
index 00000000000..a5b70a3fbc2
--- /dev/null
+++ b/server/ingester/app_log/config/config.go
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2024 Yunshan Networks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+	"io/ioutil"
+	"os"
+
+	"github.com/deepflowio/deepflow/server/ingester/config"
+
+	logging "github.com/op/go-logging"
+	yaml "gopkg.in/yaml.v2"
+)
+
+var log = logging.MustGetLogger("app_log.config")
+
+// Defaults applied when a setting is missing from the configuration file.
+const (
+	DefaultDecoderQueueCount = 1
+	DefaultDecoderQueueSize  = 16384
+	DefaultTTL               = 168 // hour
+)
+
+// Config holds the application-log settings read from the `ingester` section
+// of the server configuration file.
+type Config struct {
+	Base              *config.Config
+	CKWriterConfig    config.CKWriterConfig `yaml:"app-log-ck-writer"`
+	DecoderQueueCount int                   `yaml:"app-log-decoder-queue-count"`
+	DecoderQueueSize  int                   `yaml:"app-log-decoder-queue-size"`
+	TTL               int                   `yaml:"app-log-ttl-hour"`
+}
+
+// ApplicationLogConfig mirrors the top-level layout of the YAML file so that
+// the `ingester` section can be unmarshalled straight into Config.
+type ApplicationLogConfig struct {
+	ApplicationLog Config `yaml:"ingester"`
+}
+
+// Validate replaces non-positive decoder queue settings with their defaults.
+// A negative queue count would otherwise be used as a slice length when the
+// decoders are created. It currently never returns an error.
+func (c *Config) Validate() error {
+	if c.DecoderQueueCount <= 0 {
+		c.DecoderQueueCount = DefaultDecoderQueueCount
+	}
+	if c.DecoderQueueSize <= 0 {
+		c.DecoderQueueSize = DefaultDecoderQueueSize
+	}
+
+	return nil
+}
+
+// Load reads the application-log settings from the YAML file at path on top of
+// the built-in defaults. A missing or unreadable file yields the defaults; a
+// file that cannot be parsed or validated terminates the process.
+func Load(base *config.Config, path string) *Config {
+	cfg := &ApplicationLogConfig{
+		ApplicationLog: Config{
+			Base:              base,
+			CKWriterConfig:    config.CKWriterConfig{QueueCount: 1, QueueSize: 50000, BatchSize: 25600, FlushTimeout: 5},
+			DecoderQueueCount: DefaultDecoderQueueCount,
+			DecoderQueueSize:  DefaultDecoderQueueSize,
+			TTL:               DefaultTTL,
+		},
+	}
+	if _, err := os.Stat(path); os.IsNotExist(err) {
+		log.Info("no config file, use defaults")
+		return &cfg.ApplicationLog
+	}
+	configBytes, err := ioutil.ReadFile(path)
+	if err != nil {
+		log.Warning("Read config file error:", err)
+		cfg.ApplicationLog.Validate()
+		return &cfg.ApplicationLog
+	}
+	if err = yaml.Unmarshal(configBytes, cfg); err != nil {
+		log.Error("Unmarshal yaml error:", err)
+		os.Exit(1)
+	}
+
+	if err = cfg.ApplicationLog.Validate(); err != nil {
+		log.Error(err)
+		os.Exit(1)
+	}
+	return &cfg.ApplicationLog
+}
diff --git a/server/ingester/app_log/dbwriter/log.go b/server/ingester/app_log/dbwriter/log.go
new file mode 100644
index 00000000000..247db08d41f
--- /dev/null
+++ b/server/ingester/app_log/dbwriter/log.go
@@ -0,0 +1,353 @@
+/*
+ * Copyright (c) 2024 Yunshan Networks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dbwriter
+
+import (
+	"net"
+	"sync/atomic"
+
+	basecommon "github.com/deepflowio/deepflow/server/ingester/common"
+	"github.com/deepflowio/deepflow/server/ingester/exporters/config"
+	"github.com/deepflowio/deepflow/server/ingester/flow_tag"
+	"github.com/deepflowio/deepflow/server/libs/ckdb"
+	"github.com/deepflowio/deepflow/server/libs/pool"
+)
+
+const (
+	// DefaultPartition partitions the log table with ckdb.TimeFuncHour.
+	DefaultPartition = ckdb.TimeFuncHour
+)
+
+// ApplicationLogStore is one row of the log table; WriteBlock emits its fields
+// in the column order declared by LogColumns. Instances are pooled: obtain
+// them with AcquireApplicationLogStore and return them with Release.
+type ApplicationLogStore struct {
+	pool.ReferenceCount
+
+	Time      uint32 `json:"time" category:"$tag" sub:"flow_info"` // s
+	Timestamp int64  `json:"timestamp" category:"$tag" sub:"flow_info"`
+	_id       uint64 `json:"_id" category:"$tag" sub:"flow_info"`
+
+	TraceID    string
+	SpanID     string
+	TraceFlags uint32
+
+	SeverityNumber uint8 // numerical value of the severity(also known as log level id)
+
+	Body string
+
+	AppInstance string `json:"app_instance" category:"$tag" sub:"service_info"` // service name
+
+	AgentID      uint16 `json:"agent_id" category:"$tag" sub:"universal_tag"`
+	RegionID     uint16 `json:"region_id" category:"$tag" sub:"universal_tag"`
+	AZID         uint16 `json:"az_id" category:"$tag" sub:"universal_tag"`
+	L3EpcID      int32  `json:"l3_epc_id" category:"$tag" sub:"universal_tag"`
+	HostID       uint16 `json:"host_id" category:"$tag" sub:"universal_tag"`
+	PodID        uint32 `json:"pod_id" category:"$tag" sub:"universal_tag"`
+	PodNodeID    uint32 `json:"pod_node_id" category:"$tag" sub:"universal_tag"`
+	PodNSID      uint16 `json:"pod_ns_id" category:"$tag" sub:"universal_tag"`
+	PodClusterID uint16 `json:"pod_cluster_id" category:"$tag" sub:"universal_tag"`
+	PodGroupID   uint32 `json:"pod_group_id" category:"$tag" sub:"universal_tag"`
+	L3DeviceType uint8  `json:"l3_device_type" category:"$tag" sub:"universal_tag"`
+	L3DeviceID   uint32 `json:"l3_device_id" category:"$tag" sub:"universal_tag"`
+	ServiceID    uint32 `json:"service_id" category:"$tag" sub:"universal_tag"`
+	SubnetID     uint16 `json:"subnet_id" category:"$tag" sub:"universal_tag"`
+	IsIPv4       bool   `json:"is_ipv4" category:"$tag" sub:"universal_tag"`
+	IP4          uint32 `json:"ip4" category:"$tag" sub:"network_layer" to_string:"IPv4String"`
+	IP6          net.IP `json:"ip6" category:"$tag" sub:"network_layer" to_string:"IPv6String"`
+
+	// OrgId is not written as a column; it is only reported through OrgID().
+	// NOTE(review): presumably it selects the per-organization database, as
+	// the original comment (which named the 'event' database) implied - confirm.
+	OrgId  uint16
+	TeamID uint16
+
+	AutoInstanceID   uint32 `json:"auto_instance_id" category:"$tag" sub:"universal_tag"`
+	AutoInstanceType uint8  `json:"auto_instance_type" category:"$tag" sub:"universal_tag" enumfile:"auto_instance_type"`
+	AutoServiceID    uint32 `json:"auto_service_id" category:"$tag" sub:"universal_tag"`
+	AutoServiceType  uint8  `json:"auto_service_type" category:"$tag" sub:"universal_tag" enumfile:"auto_service_type"`
+
+	AttributeNames  []string `json:"attribute_names" category:"$tag" sub:"native_tag" data_type:"[]string"`
+	AttributeValues []string `json:"attribute_values" category:"$tag" sub:"native_tag" data_type:"[]string"`
+
+	MetricsNames  []string  `json:"metrics_names" category:"$metrics" data_type:"[]string"`
+	MetricsValues []float64 `json:"metrics_values" category:"$metrics" data_type:"[]float64"`
+}
+
+// WriteBlock appends the row to block. The write order must stay in step with
+// the column order returned by LogColumns.
+func (l *ApplicationLogStore) WriteBlock(block *ckdb.Block) {
+	block.WriteDateTime(l.Time)
+	block.Write(
+		l.Timestamp,
+		l._id,
+		l.TraceID,
+		l.SpanID,
+		l.TraceFlags,
+		l.SeverityNumber,
+		l.Body,
+		l.AppInstance,
+
+		l.AgentID,
+		l.RegionID,
+		l.AZID,
+		l.L3EpcID,
+		l.HostID,
+		l.PodID,
+		l.PodNodeID,
+		l.PodNSID,
+		l.PodClusterID,
+		l.PodGroupID,
+		l.L3DeviceType,
+		l.L3DeviceID,
+		l.ServiceID,
+		l.SubnetID)
+	block.WriteBool(l.IsIPv4)
+	block.WriteIPv4(l.IP4)
+	block.WriteIPv6(l.IP6)
+
+	block.Write(
+		l.TeamID,
+		l.AutoInstanceID,
+		l.AutoInstanceType,
+		l.AutoServiceID,
+		l.AutoServiceType,
+
+		l.AttributeNames,
+		l.AttributeValues,
+		l.MetricsNames,
+		l.MetricsValues,
+	)
+}
+
+// OrgID returns the organization ID the row belongs to.
+func (l *ApplicationLogStore) OrgID() uint16 {
+	return l.OrgId
+}
+
+// Table returns the name of the table the row is written to.
+func (l *ApplicationLogStore) Table() string {
+	return LOG_TABLE
+}
+
+// Release hands the row back to the pool via ReleaseApplicationLogStore.
+func (l *ApplicationLogStore) Release() {
+	ReleaseApplicationLogStore(l)
+}
+
+// DataSource always returns config.MAX_DATASOURCE_ID.
+func (l *ApplicationLogStore) DataSource() uint32 {
+	return uint32(config.MAX_DATASOURCE_ID)
+}
+
+// LogCounter is the process-wide counter that feeds the low bits of row IDs.
+var LogCounter uint32
+
+// SetId derives the row's _id from time, the low 10 bits of analyzerID and the
+// low 22 bits of an atomically incremented counter. Two IDs can only collide
+// when time and analyzerID match and the counter has wrapped around.
+func (l *ApplicationLogStore) SetId(time, analyzerID uint32) {
+	count := atomic.AddUint32(&LogCounter, 1)
+	// bits 63-32: time, bits 31-22: analyzerID (low 10 bits), bits 21-0: counter
+	l._id = uint64(time)<<32 | uint64(analyzerID&0x3ff)<<22 | (uint64(count) & 0x3fffff)
+}
+
+// LogColumns returns the ClickHouse columns of the log table, in the order in
+// which WriteBlock writes them.
+func LogColumns() []*ckdb.Column {
+	columns := []*ckdb.Column{
+		ckdb.NewColumn("time", ckdb.DateTime),
+		ckdb.NewColumn("timestamp", ckdb.DateTime64us).SetComment("precision: us"),
+		ckdb.NewColumn("_id", ckdb.UInt64).SetComment("Unique ID"),
+		ckdb.NewColumn("trace_id", ckdb.String).SetCodec(ckdb.CodecZSTD).SetIndex(ckdb.IndexBloomfilter).SetComment("Trace ID"),
+		ckdb.NewColumn("span_id", ckdb.String).SetCodec(ckdb.CodecZSTD).SetIndex(ckdb.IndexBloomfilter).SetComment("Span ID"),
+		ckdb.NewColumn("trace_flags", ckdb.UInt32).SetComment("W3C trace flag, currently not support yet"),
+		ckdb.NewColumn("severity_number", ckdb.UInt8).SetComment("numerical value of the severity(also known as log level id)"),
+		ckdb.NewColumn("body", ckdb.String).SetCodec(ckdb.CodecZSTD).SetComment("log content"),
+		ckdb.NewColumn("app_instance", ckdb.LowCardinalityString).SetComment("Application Instance (service name)"),
+
+		ckdb.NewColumn("agent_id", ckdb.UInt16).SetComment("Agent ID"),
+		ckdb.NewColumn("region_id", ckdb.UInt16).SetComment("Region ID"),
+		ckdb.NewColumn("az_id", ckdb.UInt16).SetComment("Availability Zone ID"),
+		ckdb.NewColumn("l3_epc_id", ckdb.Int32).SetComment("VPC ID"),
+		ckdb.NewColumn("host_id", ckdb.UInt16).SetComment("M Hypervisor ID"),
+		ckdb.NewColumn("pod_id", ckdb.UInt32).SetComment("K8s POD ID"),
+		ckdb.NewColumn("pod_node_id", ckdb.UInt32).SetComment("K8s Node ID"),
+		ckdb.NewColumn("pod_ns_id", ckdb.UInt16).SetComment("K8s Namespace ID"),
+		ckdb.NewColumn("pod_cluster_id", ckdb.UInt16).SetComment("K8s Cluster ID"),
+		ckdb.NewColumn("pod_group_id", ckdb.UInt32).SetComment("K8s Workload ID"),
+
+		ckdb.NewColumn("l3_device_type", ckdb.UInt8).SetComment("Resource Type"),
+		ckdb.NewColumn("l3_device_id", ckdb.UInt32).SetComment("Resource ID"),
+		ckdb.NewColumn("service_id", ckdb.UInt32).SetComment("Service ID"),
+		ckdb.NewColumn("subnet_id", ckdb.UInt16).SetComment("Subnet ID"),
+		ckdb.NewColumn("is_ipv4", ckdb.UInt8),
+		ckdb.NewColumn("ip4", ckdb.IPv4),
+		ckdb.NewColumn("ip6", ckdb.IPv6),
+
+		ckdb.NewColumn("team_id", ckdb.UInt16).SetComment("Team ID"),
+
+		ckdb.NewColumn("auto_instance_id", ckdb.UInt32).SetComment("Instance - K8s POD First"),
+		ckdb.NewColumn("auto_instance_type", ckdb.UInt8).SetComment("Type - K8s POD First"),
+		ckdb.NewColumn("auto_service_id", ckdb.UInt32).SetComment("Instance - K8s Service First"),
+		ckdb.NewColumn("auto_service_type", ckdb.UInt8).SetComment("Type - K8s Service First"),
+
+		ckdb.NewColumn("attribute_names", ckdb.ArrayLowCardinalityString).SetComment("Extra Attributes"),
+		ckdb.NewColumn("attribute_values", ckdb.ArrayString).SetComment("the value of the extra attributes"),
+		ckdb.NewColumn("metrics_names", ckdb.ArrayLowCardinalityString).SetComment("Extra Metrics"),
+		ckdb.NewColumn("metrics_values", ckdb.ArrayFloat64).SetComment("the value of the extra metrics"),
+	}
+	return columns
+}
+
+// GenLogCKTable describes the MergeTree log table in database LOG_DB, ordered
+// by app_instance, time and l3_device_id. coldStorage must not be nil.
+func GenLogCKTable(cluster, storagePolicy, table string, ttl int, coldStorage *ckdb.ColdStorage) *ckdb.Table {
+	timeKey := "time"
+	engine := ckdb.MergeTree
+	orderKeys := []string{"app_instance", timeKey, "l3_device_id"}
+	partition := DefaultPartition
+
+	return &ckdb.Table{
+		Version:         basecommon.CK_VERSION,
+		Database:        LOG_DB,
+		LocalName:       table + ckdb.LOCAL_SUBFFIX,
+		GlobalName:      table,
+		Columns:         LogColumns(),
+		TimeKey:         timeKey,
+		TTL:             ttl,
+		PartitionFunc:   partition,
+		Engine:          engine,
+		Cluster:         cluster,
+		StoragePolicy:   storagePolicy,
+		ColdStorage:     *coldStorage,
+		OrderKeys:       orderKeys,
+		PrimaryKeyCount: len(orderKeys),
+	}
+}
+
+// GenerateNewFlowTags refills cache.Fields and cache.FieldValues with the
+// attribute names/values and metric names of l that are either new to the
+// cache or whose cached timestamp is older than cache.CacheFlushTimeout.
+func (l *ApplicationLogStore) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) {
+	// reset temporary buffers
+	flowTagInfo := &cache.FlowTagInfoBuffer
+	*flowTagInfo = flow_tag.FlowTagInfo{
+		Table:   l.Table(),
+		VpcId:   l.L3EpcID,
+		PodNsId: l.PodNSID,
+		OrgId:   l.OrgId,
+		TeamID:  l.TeamID,
+	}
+	cache.Fields = cache.Fields[:0]
+	cache.FieldValues = cache.FieldValues[:0]
+
+	// tags
+	flowTagInfo.FieldType = flow_tag.FieldTag
+	for i, name := range l.AttributeNames {
+		flowTagInfo.FieldName = name
+
+		// tag + value
+		flowTagInfo.FieldValue = l.AttributeValues[i]
+		if old, ok := cache.FieldValueCache.AddOrGet(*flowTagInfo, l.Time); ok {
+			if old+cache.CacheFlushTimeout >= l.Time {
+				// If there is no new fieldValue, of course there will be no new field.
+				// So we can just skip the rest of the process in the loop.
+				continue
+			} else {
+				cache.FieldValueCache.Add(*flowTagInfo, l.Time)
+			}
+		}
+		tagFieldValue := flow_tag.AcquireFlowTag(flow_tag.TagFieldValue)
+		tagFieldValue.Timestamp = l.Time
+		tagFieldValue.FlowTagInfo = *flowTagInfo
+		cache.FieldValues = append(cache.FieldValues, tagFieldValue)
+
+		// only tag
+		flowTagInfo.FieldValue = ""
+		if old, ok := cache.FieldCache.AddOrGet(*flowTagInfo, l.Time); ok {
+			if old+cache.CacheFlushTimeout >= l.Time {
+				continue
+			} else {
+				cache.FieldCache.Add(*flowTagInfo, l.Time)
+			}
+		}
+		tagField := flow_tag.AcquireFlowTag(flow_tag.TagField)
+		tagField.Timestamp = l.Time
+		tagField.FlowTagInfo = *flowTagInfo
+		cache.Fields = append(cache.Fields, tagField)
+	}
+
+	// metrics
+	flowTagInfo.FieldType = flow_tag.FieldMetrics
+	flowTagInfo.FieldValue = ""
+	for _, name := range l.MetricsNames {
+		flowTagInfo.FieldName = name
+		if old, ok := cache.FieldCache.AddOrGet(*flowTagInfo, l.Time); ok {
+			if old+cache.CacheFlushTimeout >= l.Time {
+				continue
+			} else {
+				cache.FieldCache.Add(*flowTagInfo, l.Time)
+			}
+		}
+		tagField := flow_tag.AcquireFlowTag(flow_tag.TagField)
+		tagField.Timestamp = l.Time
+		tagField.FlowTagInfo = *flowTagInfo
+		cache.Fields = append(cache.Fields, tagField)
+	}
+}
+
+// logPool recycles ApplicationLogStore values; new ones start with IsIPv4 set
+// and empty, non-nil attribute and metric slices.
+var logPool = pool.NewLockFreePool(func() interface{} {
+	return &ApplicationLogStore{
+		IsIPv4:          true,
+		AttributeNames:  []string{},
+		AttributeValues: []string{},
+		MetricsNames:    []string{},
+		MetricsValues:   []float64{},
+	}
+})
+
+// AcquireApplicationLogStore takes a row from the pool and calls Reset on its
+// embedded reference count before handing it out.
+func AcquireApplicationLogStore() *ApplicationLogStore {
+	e := logPool.Get().(*ApplicationLogStore)
+	e.Reset()
+	return e
+}
+
+// ReleaseApplicationLogStore does nothing for nil or while SubReferenceCount
+// reports true; otherwise it clears l, keeping the capacity of its four slices
+// and restoring IsIPv4, and puts it back into the pool.
+func ReleaseApplicationLogStore(l *ApplicationLogStore) {
+	if l == nil || l.SubReferenceCount() {
+		return
+	}
+	attributeNames := l.AttributeNames[:0]
+	attributeValues := l.AttributeValues[:0]
+	metricsNames := l.MetricsNames[:0]
+	metricsValues := l.MetricsValues[:0]
+	*l = ApplicationLogStore{}
+	l.AttributeNames = attributeNames
+	l.AttributeValues = attributeValues
+	l.MetricsNames = metricsNames
+	l.MetricsValues = metricsValues
+	l.IsIPv4 = true
+
+	logPool.Put(l)
+}
diff --git a/server/ingester/app_log/dbwriter/writer.go b/server/ingester/app_log/dbwriter/writer.go
new file mode 100644
index 00000000000..66138a09952
--- /dev/null
+++ b/server/ingester/app_log/dbwriter/writer.go
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2024 Yunshan Networks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dbwriter
+
+import (
+	logging "github.com/op/go-logging"
+
+	"github.com/deepflowio/deepflow/server/ingester/app_log/config"
+	baseconfig "github.com/deepflowio/deepflow/server/ingester/config"
+	"github.com/deepflowio/deepflow/server/ingester/flow_tag"
+	"github.com/deepflowio/deepflow/server/ingester/pkg/ckwriter"
+	"github.com/deepflowio/deepflow/server/libs/ckdb"
+	"github.com/deepflowio/deepflow/server/libs/datatype"
+)
+
+var log = logging.MustGetLogger("app_log.dbwriter")
+
+// Database and table that application logs are written to.
+const (
+	LOG_DB    = "application_log"
+	LOG_TABLE = "log"
+)
+
+// AppLogWriter writes log rows through a shared ClickHouse writer and keeps
+// its own flow-tag writer for the attribute and metric names it sees.
+type AppLogWriter struct {
+	writerConfig baseconfig.CKWriterConfig
+
+	ckWriter      *ckwriter.CKWriter
+	flowTagWriter *flow_tag.FlowTagWriter
+}
+
+// Write generates the flow tags of l, flushes them through the flow-tag
+// writer and then queues l on the ClickHouse writer.
+func (w *AppLogWriter) Write(l *ApplicationLogStore) {
+	l.GenerateNewFlowTags(w.flowTagWriter.Cache)
+	w.flowTagWriter.WriteFieldsAndFieldValuesInCache()
+	w.ckWriter.Put(l)
+}
+
+// NewAppLogWriter creates the writer for decode queue index of msgType. It
+// builds a dedicated flow-tag writer named after the table and message type
+// and reuses the given ckwriter for the log rows themselves.
+func NewAppLogWriter(index int, msgType datatype.MessageType, config *config.Config, ckwriter *ckwriter.CKWriter) (*AppLogWriter, error) {
+	w := &AppLogWriter{
+		writerConfig: config.CKWriterConfig,
+	}
+
+	table := LOG_TABLE
+	flowTagWriter, err := flow_tag.NewFlowTagWriter(index, table+"-"+msgType.String(), LOG_DB, config.TTL, ckdb.TimeFuncTwelveHour, config.Base, &w.writerConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	w.flowTagWriter = flowTagWriter
+	w.ckWriter = ckwriter
+	return w, nil
+}
+
+// NewAppLogCKWriter creates the ClickHouse writer for the log table, using the
+// table definition from GenLogCKTable and the queue settings in cfg.
+func NewAppLogCKWriter(cfg *config.Config) (*ckwriter.CKWriter, error) {
+	ckdbCfg := cfg.Base.CKDB
+	ckTable := GenLogCKTable(ckdbCfg.ClusterName, ckdbCfg.StoragePolicy, LOG_TABLE, cfg.TTL, ckdb.GetColdStorage(cfg.Base.GetCKDBColdStorages(), LOG_DB, LOG_TABLE))
+	return ckwriter.NewCKWriter(ckdbCfg.ActualAddrs, cfg.Base.CKDBAuth.Username, cfg.Base.CKDBAuth.Password,
+		LOG_TABLE, cfg.Base.CKDB.TimeZone, ckTable, cfg.CKWriterConfig.QueueCount, cfg.CKWriterConfig.QueueSize, cfg.CKWriterConfig.BatchSize, cfg.CKWriterConfig.FlushTimeout, cfg.Base.CKDB.Watcher)
+}
diff --git 
a/server/ingester/app_log/decoder/decoder.go b/server/ingester/app_log/decoder/decoder.go
new file mode 100644
index 00000000000..26881bb747f
--- /dev/null
+++ b/server/ingester/app_log/decoder/decoder.go
@@ -0,0 +1,402 @@
+/*
+ * Copyright (c) 2024 Yunshan Networks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package decoder
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"log/syslog"
+	"net"
+	"strconv"
+	"strings"
+	"time"
+
+	logging "github.com/op/go-logging"
+
+	"github.com/deepflowio/deepflow/server/ingester/app_log/config"
+	"github.com/deepflowio/deepflow/server/ingester/app_log/dbwriter"
+	ingestercommon "github.com/deepflowio/deepflow/server/ingester/common"
+	"github.com/deepflowio/deepflow/server/libs/codec"
+	"github.com/deepflowio/deepflow/server/libs/datatype"
+	flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
+	"github.com/deepflowio/deepflow/server/libs/grpc"
+	"github.com/deepflowio/deepflow/server/libs/queue"
+	"github.com/deepflowio/deepflow/server/libs/receiver"
+	"github.com/deepflowio/deepflow/server/libs/stats"
+	"github.com/deepflowio/deepflow/server/libs/utils"
+)
+
+var log = logging.MustGetLogger("app_log.decoder")
+
+const (
+	BUFFER_SIZE = 1024 // number of queue items fetched per Gets call in Run
+	SEPARATOR   = ", "
+)
+
+// Counter holds the statistics handed out by Decoder.GetCounter.
+type Counter struct {
+	InCount    int64 `statsd:"in-count"`
+	OutCount   int64 `statsd:"out-count"`
+	ErrorCount int64 `statsd:"err-count"`
+}
+
+// Decoder reads receive buffers of one message type from inQueue, turns every
+// record into an ApplicationLogStore row and hands it to logWriter.
+type Decoder struct {
+	index             int
+	msgType           datatype.MessageType
+	platformData      *grpc.PlatformInfoTable
+	inQueue           queue.QueueReader
+	logWriter         *dbwriter.AppLogWriter
+	debugEnabled      bool
+	config            *config.Config
+	appLogEntrysCache []AppLogEntry // scratch slice reused across JSON batches
+
+	counter *Counter
+	utils.Closable
+}
+
+// NewDecoder builds the Decoder for queue index of msgType. Whether debug
+// logging is enabled is sampled once, at construction time.
+func NewDecoder(
+	index int,
+	msgType datatype.MessageType,
+	inQueue queue.QueueReader,
+	logWriter *dbwriter.AppLogWriter,
+	platformData *grpc.PlatformInfoTable,
+	config *config.Config,
+) *Decoder {
+	return &Decoder{
+		index:             index,
+		msgType:           msgType,
+		platformData:      platformData,
+		inQueue:           inQueue,
+		debugEnabled:      log.IsEnabledFor(logging.DEBUG),
+		logWriter:         logWriter,
+		appLogEntrysCache: make([]AppLogEntry, 0),
+		config:            config,
+		counter:           &Counter{},
+	}
+}
+
+// GetCounter returns the counters accumulated since the previous call and
+// installs a fresh, zeroed Counter in their place.
+func (d *Decoder) GetCounter() interface{} {
+	var counter *Counter
+	counter, d.counter = d.counter, &Counter{}
+	return counter
+}
+
+// Run registers the decoder's counters and then loops forever: it fetches up
+// to BUFFER_SIZE items from inQueue, decodes each receive buffer according to
+// msgType and releases the buffer afterwards.
+func (d *Decoder) Run() {
+	log.Infof("application log (%s-%d) decoder run", d.msgType.String(), d.index)
+	ingestercommon.RegisterCountableForIngester("decoder", d, stats.OptionStatTags{
+		"msg_type": d.msgType.String()})
+	buffer := make([]interface{}, BUFFER_SIZE)
+	decoder := &codec.SimpleDecoder{}
+	for {
+		n := d.inQueue.Gets(buffer)
+		for i := 0; i < n; i++ {
+			if buffer[i] == nil {
+				continue
+			}
+			d.counter.InCount++
+			recvBytes, ok := buffer[i].(*receiver.RecvBuffer)
+			if !ok {
+				log.Warning("get application log decode queue data type wrong")
+				continue
+			}
+			decoder.Init(recvBytes.Buffer[recvBytes.Begin:recvBytes.End])
+			switch d.msgType {
+			case datatype.MESSAGE_TYPE_APPLICATION_LOG:
+				d.handleAppLog(recvBytes.VtapID, decoder)
+			case datatype.MESSAGE_TYPE_SYSLOG:
+				d.handleSysLog(recvBytes.VtapID, decoder)
+			}
+			receiver.ReleaseRecvBuffer(recvBytes)
+		}
+	}
+}
+
+// handleSysLog writes every record read from decoder as one syslog row. A
+// framing error abandons the rest of the buffer; a record that fails to parse
+// is counted as an error and skipped.
+func (d *Decoder) handleSysLog(vtapId uint16, decoder *codec.SimpleDecoder) {
+	for !decoder.IsEnd() {
+		data := decoder.ReadBytes()
+		if decoder.Failed() {
+			if d.counter.ErrorCount == 0 {
+				log.Errorf("syslog decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes()))
+			}
+			d.counter.ErrorCount++
+			return
+		}
+
+		if err := d.WriteSysLog(vtapId, data); err != nil {
+			if d.counter.ErrorCount == 0 {
+				log.Errorf("syslog parse failed: %s", err)
+			}
+			d.counter.ErrorCount++
+			continue
+		}
+		d.counter.OutCount++
+	}
+}
+
+// WriteSysLog parses one agent syslog line and writes it as a log row. The
+// line is split on spaces into exactly six parts (see the example below); the
+// first must be an RFC 3339 timestamp and the fourth one of the known levels.
+// The pooled row is acquired only after validation has passed, so a rejected
+// line does not take a row out of the pool.
+func (d *Decoder) WriteSysLog(vtapId uint16, bs []byte) error {
+	if d.debugEnabled {
+		log.Debugf("recv syslog vtapId %d: %s", vtapId, bs)
+	}
+	// example log
+	// 2024-04-30T10:26:47.038297752+08:00 mars-1-V3 mars-1[5874]: [ERROR] src/sender/uniform_sender.rs:431 2-protolog-to-collector-sender sender tcp connection to 10.233.100.189:20033 failed
+	columns := bytes.SplitN(bs, []byte{' '}, 6)
+	if len(columns) != 6 {
+		return fmt.Errorf("log parts is %d", len(columns))
+	}
+	datetime, err := time.Parse(time.RFC3339, string(columns[0]))
+	if err != nil {
+		return err
+	}
+
+	var severity syslog.Priority
+	switch string(columns[3]) {
+	case "[INFO]":
+		severity = syslog.LOG_INFO
+	case "[WARN]":
+		severity = syslog.LOG_WARNING
+	case "[ERRO]", "[ERROR]":
+		severity = syslog.LOG_ERR
+	default:
+		return fmt.Errorf("ignored log level: %s", string(columns[3]))
+	}
+
+	s := dbwriter.AcquireApplicationLogStore()
+	s.Time = uint32(datetime.Unix())
+	s.Timestamp = int64(datetime.UnixMicro())
+	s.SetId(s.Time, d.platformData.QueryAnalyzerID())
+	s.AgentID = vtapId
+	s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapId)
+
+	host := string(columns[1])
+	s.AppInstance = host
+	s.SeverityNumber = uint8(severity)
+	s.Body = string(columns[5])
+	s.AttributeNames = append(s.AttributeNames, "host", "path")
+	s.AttributeValues = append(s.AttributeValues, host, string(columns[4]))
+
+	d.logWriter.Write(s)
+	return nil
+}
+
+// WriteAppLog converts one decoded application-log entry into a log row and
+// writes it. Entries with an unparsable timestamp or an empty message are
+// rejected. The row's IP comes from the entry, else from the pod looked up by
+// name, else from the agent; the remaining platform tags are then looked up
+// by pod ID when one was found, otherwise by that IP.
+func (d *Decoder) WriteAppLog(vtapId uint16, l *AppLogEntry) error {
+	timeObj, err := time.Parse(time.RFC3339, l.Timestamp)
+	if err != nil {
+		return fmt.Errorf("%s error parsing timestamp: %s", l.Timestamp, err)
+	}
+
+	if l.Message == "" {
+		return fmt.Errorf("application log body is empty")
+	}
+
+	// Acquire the pooled row only after validation so that a rejected entry
+	// does not take a row out of the pool.
+	s := dbwriter.AcquireApplicationLogStore()
+	s.Time = uint32(timeObj.Unix())
+	s.Timestamp = timeObj.UnixMicro()
+	s.SetId(s.Time, d.platformData.QueryAnalyzerID())
+
+	s.AgentID = vtapId
+	s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapId)
+	s.L3EpcID = d.platformData.QueryVtapEpc0(vtapId)
+
+	s.Body = l.Message
+	s.AppInstance = l.Kubernetes.PodName
+
+	s.AttributeNames = append(s.AttributeNames, "file")
+	s.AttributeValues = append(s.AttributeValues, l.File)
+
+	podName := l.Kubernetes.PodName
+	var ip net.IP
+	if l.Kubernetes.PodIp != "" {
+		ip = net.ParseIP(l.Kubernetes.PodIp)
+	}
+	if podName != "" {
+		podInfo := d.platformData.QueryPodInfo(vtapId, podName)
+		if podInfo != nil {
+			s.PodClusterID = uint16(podInfo.PodClusterId)
+			s.PodID = podInfo.PodId
+			s.L3EpcID = podInfo.EpcId
+			if ip == nil {
+				ip = net.ParseIP(podInfo.Ip)
+				// maybe Pod is hostnetwork mode or can't get pod IP, then get pod node IP instead
+				if ip == nil {
+					ip = net.ParseIP(podInfo.PodNodeIp)
+				}
+			}
+		}
+	}
+
+	if ip == nil {
+		// if platformInfo cannot be obtained from PodId, finally fill with Vtap's platformInfo
+		vtapInfo := d.platformData.QueryVtapInfo(vtapId)
+		if vtapInfo != nil {
+			ip = net.ParseIP(vtapInfo.Ip)
+		}
+	}
+
+	if ip != nil {
+		if ip4 := ip.To4(); ip4 != nil {
+			s.IsIPv4 = true
+			s.IP4 = utils.IpToUint32(ip4)
+		} else {
+			s.IsIPv4 = false
+			s.IP6 = ip
+		}
+	}
+
+	var info *grpc.Info
+	if s.PodID != 0 {
+		info = d.platformData.QueryPodIdInfo(s.PodID)
+	} else {
+		// NOTE(review): when no IP could be resolved, IsIPv4 still holds its
+		// pooled default but ip is nil, so the IPv6 lookup runs with a nil
+		// address - confirm QueryIPV6Infos tolerates that.
+		if s.IsIPv4 && ip != nil {
+			info = d.platformData.QueryIPV4Infos(s.L3EpcID, s.IP4)
+		} else {
+			info = d.platformData.QueryIPV6Infos(s.L3EpcID, s.IP6)
+		}
+	}
+
+	podGroupType := uint8(0)
+	if info != nil {
+		s.RegionID = uint16(info.RegionID)
+		s.AZID = uint16(info.AZID)
+		s.L3EpcID = info.EpcID
+		s.HostID = uint16(info.HostID)
+		if s.PodID == 0 {
+			s.PodID = info.PodID
+		}
+		s.PodNodeID = info.PodNodeID
+		s.PodNSID = uint16(info.PodNSID)
+		s.PodClusterID = uint16(info.PodClusterID)
+		s.PodGroupID = info.PodGroupID
+		podGroupType = info.PodGroupType
+		s.L3DeviceType = uint8(info.DeviceType)
+		s.L3DeviceID = info.DeviceID
+		s.SubnetID = uint16(info.SubnetID)
+		s.IsIPv4 = info.IsIPv4
+		s.IP4 = info.IP4
+		s.IP6 = info.IP6
+		// if it is just Pod Node, there is no need to match the service
+		if ingestercommon.IsPodServiceIP(flow_metrics.DeviceType(s.L3DeviceType), s.PodID, 0) {
+			s.ServiceID = d.platformData.QueryService(
+				s.PodID, s.PodNodeID, uint32(s.PodClusterID), s.PodGroupID, s.L3EpcID, !s.IsIPv4, s.IP4, s.IP6, 0, 0)
+		}
+	} else if baseInfo := d.platformData.QueryEpcIDBaseInfo(s.L3EpcID); baseInfo != nil {
+		s.RegionID = uint16(baseInfo.RegionID)
+	}
+
+	s.AutoInstanceID, s.AutoInstanceType = ingestercommon.GetAutoInstance(s.PodID, 0, s.PodNodeID, s.L3DeviceID, uint8(s.L3DeviceType), s.L3EpcID)
+	s.AutoServiceID, s.AutoServiceType = ingestercommon.GetAutoService(s.ServiceID, s.PodGroupID, 0, s.PodNodeID, s.L3DeviceID, uint8(s.L3DeviceType), podGroupType, s.L3EpcID)
+
+	d.logWriter.Write(s)
+	return nil
+}
+
+// AppLogEntry is the JSON shape of one application-log record; the payload
+// handled by handleAppLog is an array of these.
+type AppLogEntry struct {
+	File       string `json:"file"`
+	Kubernetes struct {
+		PodName string `json:"pod_name"`
+		PodIp   string `json:"pod_ip"`
+	} `json:"kubernetes"`
+	Message   string `json:"message"`
+	Timestamp string `json:"timestamp"`
+}
+
+// handleAppLog decodes every record read from decoder as a JSON array of
+// AppLogEntry and writes each entry as one row. OutCount counts rows that
+// were written; a record with invalid JSON is counted as an error and
+// skipped, while a framing error abandons the rest of the buffer.
+func (d *Decoder) handleAppLog(vtapId uint16, decoder *codec.SimpleDecoder) {
+	for !decoder.IsEnd() {
+		data := decoder.ReadBytes()
+		if decoder.Failed() {
+			if d.counter.ErrorCount == 0 {
+				log.Errorf("application log decode failed, offset=%d len=%d", decoder.Offset(), len(decoder.Bytes()))
+			}
+			d.counter.ErrorCount++
+			return
+		}
+
+		if d.debugEnabled {
+			log.Debugf("recv applog: vtapId: %d: %s", vtapId, data)
+		}
+		// Zero the used elements before the slice is reused: encoding/json may
+		// decode into retained elements without clearing them, so a key missing
+		// from this batch could otherwise keep a value from an earlier batch.
+		for i := range d.appLogEntrysCache {
+			d.appLogEntrysCache[i] = AppLogEntry{}
+		}
+		d.appLogEntrysCache = d.appLogEntrysCache[:0]
+		if err := json.Unmarshal(data, &d.appLogEntrysCache); err != nil {
+			if d.counter.ErrorCount == 0 {
+				log.Errorf("application log json decode failed: %s", err)
+			}
+			d.counter.ErrorCount++
+			continue
+		}
+		for i := range d.appLogEntrysCache {
+			if err := d.WriteAppLog(vtapId, &d.appLogEntrysCache[i]); err != nil {
+				if d.counter.ErrorCount == 0 {
+					log.Warningf("application log decode failed: %s", err)
+				}
+				d.counter.ErrorCount++
+				continue
+			}
+			d.counter.OutCount++
+		}
+	}
+}
+
+// uint32ArrayToStr joins the decimal form of u32s with SEPARATOR.
+// NOTE(review): nothing in this file calls it.
+func uint32ArrayToStr(u32s []uint32) string {
+	sb := &strings.Builder{}
+	for i, u32 := range u32s {
+		sb.WriteString(strconv.Itoa(int(u32)))
+		if i < len(u32s)-1 {
+			sb.WriteString(SEPARATOR)
+		}
+	}
+	return sb.String()
+}
diff --git a/server/ingester/ingester/ingester.go b/server/ingester/ingester/ingester.go
index 4c776ffd5f8..fb5283a0a8f 100644
--- a/server/ingester/ingester/ingester.go
+++ b/server/ingester/ingester/ingester.go
@@ -24,6 +24,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/deepflowio/deepflow/server/ingester/app_log"
 	"github.com/deepflowio/deepflow/server/ingester/ckmonitor"
 	"github.com/deepflowio/deepflow/server/ingester/datasource"
 	"github.com/deepflowio/deepflow/server/ingester/exporters"
@@ -37,11 +38,11 @@ import (
 	yaml "gopkg.in/yaml.v2"
 
 	servercommon "github.com/deepflowio/deepflow/server/common"
+	applicationlogcfg "github.com/deepflowio/deepflow/server/ingester/app_log/config"
 	"github.com/deepflowio/deepflow/server/ingester/ckissu"
 	"github.com/deepflowio/deepflow/server/ingester/common"
 	"github.com/deepflowio/deepflow/server/ingester/config"
 	dropletcfg "github.com/deepflowio/deepflow/server/ingester/droplet/config"
-	"github.com/deepflowio/deepflow/server/ingester/droplet/droplet"
 	eventcfg "github.com/deepflowio/deepflow/server/ingester/event/config"
 	"github.com/deepflowio/deepflow/server/ingester/event/event"
 	exporterscfg "github.com/deepflowio/deepflow/server/ingester/exporters/config"
@@ -99,7 +100,7 @@ func Start(configPath string, shared *servercommon.ControllerIngesterShared) []i
 
 	receiver := receiver.NewReceiver(int(cfg.ListenPort), cfg.UDPReadBuffer, cfg.TCPReadBuffer, cfg.TCPReaderBuffer)
 
-	closers := droplet.Start(dropletConfig, receiver)
+	closers 
:= []io.Closer{} if cfg.IngesterEnabled { flowLogConfig := flowlogcfg.Load(cfg, configPath) @@ -130,6 +131,10 @@ func Start(configPath string, shared *servercommon.ControllerIngesterShared) []i bytes, _ = yaml.Marshal(prometheusConfig) log.Infof("prometheus config:\n%s", string(bytes)) + applicationLogConfig := applicationlogcfg.Load(cfg, configPath) + bytes, _ = yaml.Marshal(applicationLogConfig) + log.Infof("application log config:\n%s", string(bytes)) + exportersConfig := exporterscfg.Load(cfg, configPath) bytes, _ = yaml.Marshal(exportersConfig) log.Infof("exporters config:\n%s", string(bytes)) @@ -215,6 +220,12 @@ func Start(configPath string, shared *servercommon.ControllerIngesterShared) []i prometheus.Start() closers = append(closers, prometheus) + // write application log data + applicationLog, err := app_log.NewApplicationLogger(applicationLogConfig, receiver, platformDataManager) + checkError(err) + applicationLog.Start() + closers = append(closers, applicationLog) + // 检查clickhouse的磁盘空间占用,达到阈值时,自动删除老数据 cm, err := ckmonitor.NewCKMonitor(cfg) checkError(err) diff --git a/server/ingester/ingesterctl/const.go b/server/ingester/ingesterctl/const.go index fd90552f7ac..1bd1a9e3fda 100644 --- a/server/ingester/ingesterctl/const.go +++ b/server/ingester/ingesterctl/const.go @@ -37,6 +37,7 @@ const ( INGESTERCTL_EVENT_QUEUE INGESTERCTL_PROMETHEUS_QUEUE INGESTERCTL_PROFILE_QUEUE + INGESTERCTL_APPLICATION_LOG_QUEUE INGESTERCTL_MAX ) diff --git a/server/libs/ckdb/ckdb.go b/server/libs/ckdb/ckdb.go index 18e6ee720f9..26ce9a512dc 100644 --- a/server/libs/ckdb/ckdb.go +++ b/server/libs/ckdb/ckdb.go @@ -178,7 +178,7 @@ var codecTypeString = []string{ CodecDefault: "", CodecLZ4: "LZ4", CodecLZ4HC: "LZ4HC", - CodecZSTD: "ZSTD", + CodecZSTD: "ZSTD(1)", CodecT64: "T64", CodecDelta: "Delta", CodecDoubleDelta: "DoubleDelta", diff --git a/server/libs/datatype/droplet-message.go b/server/libs/datatype/droplet-message.go index a6a0928bf64..0b1445cc7d2 100644 --- 
a/server/libs/datatype/droplet-message.go +++ b/server/libs/datatype/droplet-message.go @@ -44,13 +44,14 @@ const ( MESSAGE_TYPE_TELEGRAF MESSAGE_TYPE_PACKETSEQUENCE - MESSAGE_TYPE_DFSTATS + MESSAGE_TYPE_DFSTATS // 10 MESSAGE_TYPE_OPENTELEMETRY_COMPRESSED MESSAGE_TYPE_RAW_PCAP MESSAGE_TYPE_PROFILE MESSAGE_TYPE_PROC_EVENT MESSAGE_TYPE_ALARM_EVENT MESSAGE_TYPE_K8S_EVENT + MESSAGE_TYPE_APPLICATION_LOG // 17 MESSAGE_TYPE_MAX ) @@ -74,6 +75,7 @@ var MessageTypeString = [MESSAGE_TYPE_MAX]string{ MESSAGE_TYPE_PROC_EVENT: "proc_event", MESSAGE_TYPE_ALARM_EVENT: "alarm_event", MESSAGE_TYPE_K8S_EVENT: "k8s_event", + MESSAGE_TYPE_APPLICATION_LOG: "application_log", } func (m MessageType) String() string { @@ -115,6 +117,7 @@ var MessageHeaderTypes = [MESSAGE_TYPE_MAX]MessageHeaderType{ MESSAGE_TYPE_PROC_EVENT: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_ALARM_EVENT: HEADER_TYPE_LT_VTAP, MESSAGE_TYPE_K8S_EVENT: HEADER_TYPE_LT_VTAP, + MESSAGE_TYPE_APPLICATION_LOG: HEADER_TYPE_LT_VTAP, } func (m MessageType) HeaderType() MessageHeaderType { diff --git a/server/server.yaml b/server/server.yaml index 54bce017d8c..0c648644238 100644 --- a/server/server.yaml +++ b/server/server.yaml @@ -564,6 +564,22 @@ ingester: ## prometheus cache expiration of label ids. 
unit: s #prometheus-label-cache-expiration: 86400 + + ## app log data writer config + #app-log-ck-writer: + # queue-count: 1 # parallelism of table writing + # queue-size: 50000 # size of writing queue + # batch-size: 25600 # size of batch writing + # flush-timeout: 5 # timeout of table writing + + ## app log decoder queue count/size + #app-log-decoder-queue-count: 1 + #app-log-decoder-queue-size: 16384 + + ## app log database data retention time (unit: hours) + ## Note: This configuration only takes effect when DeepFlow is run for the first time or the ClickHouse tables have not yet been created + #app-log-ttl-hour: 168 + #ck-disk-monitor: # check-interval: 180 # check time interval (unit: seconds) # ttl-check-disabled: false # whether to not check TTL expired data