diff --git a/server/ingester/profile/config/config.go b/server/ingester/profile/config/config.go
index 58d126416713..a790bd823162 100644
--- a/server/ingester/profile/config/config.go
+++ b/server/ingester/profile/config/config.go
@@ -28,12 +28,13 @@ import (
 var log = logging.MustGetLogger("profile.config")
 
 type Config struct {
-	Base                 *config.Config
-	CKWriterConfig       config.CKWriterConfig `yaml:"profile-ck-writer"`
-	ProfileTTL           int                   `yaml:"profile-ttl-hour"`
-	DecoderQueueCount    int                   `yaml:"profile-decoder-queue-count"`
-	DecoderQueueSize     int                   `yaml:"profile-decoder-queue-size"`
-	CompressionAlgorithm *string               `yaml:"profile-compression-algorithm"`
+	Base                       *config.Config
+	CKWriterConfig             config.CKWriterConfig `yaml:"profile-ck-writer"`
+	ProfileTTL                 int                   `yaml:"profile-ttl-hour"`
+	DecoderQueueCount          int                   `yaml:"profile-decoder-queue-count"`
+	DecoderQueueSize           int                   `yaml:"profile-decoder-queue-size"`
+	CompressionAlgorithm       *string               `yaml:"profile-compression-algorithm"`
+	OffCpuSplittingGranularity int                   `yaml:"profile-off-cpu-splitting-granularity"`
 }
 
 type ProfileConfig struct {
@@ -41,9 +42,10 @@ type ProfileConfig struct {
 }
 
 const (
-	DefaultProfileTTL        = 72 // hour
-	DefaultDecoderQueueCount = 2
-	DefaultDecoderQueueSize  = 1 << 14
+	DefaultProfileTTL                 = 72 // hour
+	DefaultDecoderQueueCount          = 2
+	DefaultDecoderQueueSize           = 1 << 14
+	DefaultOffCpuSplittingGranularity = 1
 )
 
 func (c *Config) Validate() error {
@@ -72,11 +74,12 @@ func (c *Config) Validate() error {
 func Load(base *config.Config, path string) *Config {
 	config := &ProfileConfig{
 		Profile: Config{
-			Base:              base,
-			CKWriterConfig:    config.CKWriterConfig{QueueCount: 1, QueueSize: 100000, BatchSize: 51200, FlushTimeout: 5},
-			ProfileTTL:        DefaultProfileTTL,
-			DecoderQueueCount: DefaultDecoderQueueCount,
-			DecoderQueueSize:  DefaultDecoderQueueSize,
+			Base:                       base,
+			CKWriterConfig:             config.CKWriterConfig{QueueCount: 1, QueueSize: 100000, BatchSize: 51200, FlushTimeout: 5},
+			ProfileTTL:                 DefaultProfileTTL,
+			DecoderQueueCount:          DefaultDecoderQueueCount,
+			DecoderQueueSize:           DefaultDecoderQueueSize,
+			OffCpuSplittingGranularity: DefaultOffCpuSplittingGranularity,
 		},
 	}
 	if _, err := os.Stat(path); os.IsNotExist(err) {
diff --git a/server/ingester/profile/dbwriter/profile.go b/server/ingester/profile/dbwriter/profile.go
index 0ea759e0f248..b6d59c619e86 100644
--- a/server/ingester/profile/dbwriter/profile.go
+++ b/server/ingester/profile/dbwriter/profile.go
@@ -131,14 +131,14 @@ func ProfileColumns() []*ckdb.Column {
 		ckdb.NewColumn("ip6", ckdb.IPv6).SetComment("IPV6地址"),
 		ckdb.NewColumn("is_ipv4", ckdb.UInt8).SetComment("是否为IPv4地址").SetIndex(ckdb.IndexMinmax),
 
-		ckdb.NewColumn("app_service", ckdb.String).SetComment("应用名称, 用户配置上报"),
+		ckdb.NewColumn("app_service", ckdb.LowCardinalityString).SetComment("应用名称, 用户配置上报"),
 		ckdb.NewColumn("profile_location_str", ckdb.String).SetComment("单次 profile 堆栈"),
 		ckdb.NewColumn("profile_value", ckdb.Int64).SetComment("profile self value"),
-		ckdb.NewColumn("profile_value_unit", ckdb.String).SetComment("profile value 的单位"),
-		ckdb.NewColumn("profile_event_type", ckdb.String).SetComment("剖析类型"),
+		ckdb.NewColumn("profile_value_unit", ckdb.LowCardinalityString).SetComment("profile value 的单位"),
+		ckdb.NewColumn("profile_event_type", ckdb.LowCardinalityString).SetComment("剖析类型"),
 		ckdb.NewColumn("profile_create_timestamp", ckdb.DateTime64us).SetIndex(ckdb.IndexSet).SetComment("client 端聚合时间"),
 		ckdb.NewColumn("profile_in_timestamp", ckdb.DateTime64us).SetComment("DeepFlow 的写入时间,同批上报的批次数据具备相同的值"),
-		ckdb.NewColumn("profile_language_type", ckdb.String).SetComment("语言类型"),
+		ckdb.NewColumn("profile_language_type", ckdb.LowCardinalityString).SetComment("语言类型"),
 		ckdb.NewColumn("profile_id", ckdb.String).SetComment("含义等同 l7_flow_log 的 span_id"),
 		ckdb.NewColumn("trace_id", ckdb.String).SetComment("含义等同 l7_flow_log 的 trace_id"),
 		ckdb.NewColumn("span_name", ckdb.String).SetComment("含义等同 l7_flow_log 的 endpoint"),
@@ -271,6 +271,18 @@ func ReleaseInProcess(p *InProcessProfile) {
 	poolInProcess.Put(p)
 }
 
+// Clone returns a copy of p obtained through AcquireInProcess. TagNames and
+// TagValues are copied into fresh slices so the clone does not alias p's tags.
+func (p *InProcessProfile) Clone() *InProcessProfile {
+	c := AcquireInProcess()
+	*c = *p
+	c.TagNames = make([]string, len(p.TagNames))
+	copy(c.TagNames, p.TagNames)
+	c.TagValues = make([]string, len(p.TagValues))
+	copy(c.TagValues, p.TagValues)
+	return c
+}
+
 func (p *InProcessProfile) FillProfile(input *storage.PutInput,
 	platformData *grpc.PlatformInfoTable,
 	vtapID uint16,
diff --git a/server/ingester/profile/dbwriter/profile_writer.go b/server/ingester/profile/dbwriter/profile_writer.go
index d243fa9baf3c..af4cb9cd9fcd 100644
--- a/server/ingester/profile/dbwriter/profile_writer.go
+++ b/server/ingester/profile/dbwriter/profile_writer.go
@@ -73,13 +73,18 @@ func (p *ProfileWriter) GetCounter() interface{} {
 	return counter
 }
 
-func (p *ProfileWriter) Write(m interface{}) {
-	inProcess := m.(*InProcessProfile)
+// Write generates flow tags from the first item of m, then passes every item
+// of m to p.ckWriter. An empty m is ignored instead of panicking on m[0].
+func (p *ProfileWriter) Write(m []interface{}) {
+	if len(m) == 0 {
+		return
+	}
+	inProcess := m[0].(*InProcessProfile)
 	inProcess.GenerateFlowTags(p.flowTagWriter.Cache)
 	p.flowTagWriter.WriteFieldsAndFieldValuesInCache()
 
-	atomic.AddInt64(&p.counter.ProfilesCount, 1)
-	p.ckWriter.Put(m)
+	atomic.AddInt64(&p.counter.ProfilesCount, int64(len(m)))
+	p.ckWriter.Put(m...)
 }
 
 func NewProfileWriter(msgType datatype.MessageType, decoderIndex int, config *config.Config) (*ProfileWriter, error) {
diff --git a/server/ingester/profile/decoder/decoder.go b/server/ingester/profile/decoder/decoder.go
index 704d151dfa26..a46fc832222c 100644
--- a/server/ingester/profile/decoder/decoder.go
+++ b/server/ingester/profile/decoder/decoder.go
@@ -65,6 +65,10 @@ type Counter struct {
 
 	TotalTime int64 `statsd:"total-time"`
 	AvgTime   int64 `statsd:"avg-time"`
+
+	OffCpuSplitCount     int64 `statsd:"off-cpu-split-count"`
+	OffCpuSplitIntoCount int64 `statsd:"off-cpu-split-into-count"`
+	TotalNotSplitCount   int64 `statsd:"total-not-split-count"`
 }
 
 var spyMap = map[string]string{
@@ -78,7 +82,7 @@ var spyMap = map[string]string{
 	"eBPF": "eBPF",
 }
 
-var eBPFEventType = map[pb.ProfileEventType]string{
+var eBPFEventType = []string{
 	pb.ProfileEventType_External:   "third-party",
 	pb.ProfileEventType_EbpfOnCpu:  "on-cpu",
 	pb.ProfileEventType_EbpfOffCpu: "off-cpu",
@@ -92,22 +96,26 @@ type Decoder struct {
 	profileWriter   *dbwriter.ProfileWriter
 	compressionAlgo string
 
+	offCpuSplittingGranularity int
+
 	counter *Counter
 	utils.Closable
 }
 
 func NewDecoder(index int, msgType datatype.MessageType, compressionAlgo string,
+	offCpuSplittingGranularity int,
 	platformData *grpc.PlatformInfoTable,
 	inQueue queue.QueueReader,
 	profileWriter *dbwriter.ProfileWriter) *Decoder {
 	return &Decoder{
-		index:           index,
-		msgType:         msgType,
-		platformData:    platformData,
-		inQueue:         inQueue,
-		profileWriter:   profileWriter,
-		compressionAlgo: compressionAlgo,
-		counter:         &Counter{},
+		index:                      index,
+		msgType:                    msgType,
+		platformData:               platformData,
+		inQueue:                    inQueue,
+		profileWriter:              profileWriter,
+		compressionAlgo:            compressionAlgo,
+		offCpuSplittingGranularity: offCpuSplittingGranularity,
+		counter:                    &Counter{},
 	}
 }
 
@@ -159,15 +167,16 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder)
 	}
 
 	parser := &Parser{
-		vtapID:          vtapID,
-		inTimestamp:     time.Now(),
-		callBack:        d.profileWriter.Write,
-		platformData:    d.platformData,
-		IP:              make([]byte, len(profile.Ip)),
-		podID:           profile.PodId,
-		compressionAlgo: d.compressionAlgo,
-		observer:        &observer{},
-		Counter:         d.counter,
+		vtapID:                     vtapID,
+		inTimestamp:                time.Now(),
+		callBack:                   d.profileWriter.Write,
+		platformData:               d.platformData,
+		IP:                         make([]byte, len(profile.Ip)),
+		podID:                      profile.PodId,
+		compressionAlgo:            d.compressionAlgo,
+		observer:                   &observer{},
+		offCpuSplittingGranularity: d.offCpuSplittingGranularity,
+		Counter:                    d.counter,
 	}
 	copy(parser.IP, profile.Ip[:len(profile.Ip)])
 
diff --git a/server/ingester/profile/decoder/decoder_parser.go b/server/ingester/profile/decoder/decoder_parser.go
index f8b7f3fc1a77..1a3e3e68754c 100644
--- a/server/ingester/profile/decoder/decoder_parser.go
+++ b/server/ingester/profile/decoder/decoder_parser.go
@@ -26,12 +26,18 @@ import (
 
 	"github.com/deepflowio/deepflow/server/ingester/profile/common"
 	"github.com/deepflowio/deepflow/server/ingester/profile/dbwriter"
+	"github.com/deepflowio/deepflow/server/libs/flow-metrics/pb"
 	"github.com/deepflowio/deepflow/server/libs/grpc"
 	"github.com/deepflowio/deepflow/server/libs/utils"
 	"github.com/klauspost/compress/zstd"
 	"github.com/pyroscope-io/pyroscope/pkg/storage"
 )
 
+const (
+	// the maximum duration of off-cpu profile is 1h + <1s
+	MAX_OFF_CPU_PROFILE_SPLIT_COUNT = 4000
+)
+
 type Parser struct {
 	profileName string
 	vtapID      uint16
@@ -39,7 +45,8 @@ type Parser struct {
 	podID       uint32
 
 	// profileWriter.Write
-	callBack func(interface{})
+	callBack                   func([]interface{})
+	offCpuSplittingGranularity int
 
 	platformData *grpc.PlatformInfoTable
 	inTimestamp  time.Time
@@ -75,7 +82,7 @@ func (p *Parser) Evaluate(i *storage.PutInput) (storage.SampleObserver, bool) {
 	return p.observer, true
 }
 
-func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) *dbwriter.InProcessProfile {
+func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) []interface{} {
 	labels := input.Key.Labels()
 	tagNames := make([]string, 0, len(labels))
 	tagValues := make([]string, 0, len(labels))
@@ -105,9 +112,9 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value
 	location := compress(onelineStack, p.compressionAlgo)
 	atomic.AddInt64(&p.Counter.CompressedSize, int64(len(location)))
 
-	profileValue := value
+	profileValueUs := int64(value)
 	if p.processTracer != nil {
-		profileValue = uint64(p.value)
+		profileValueUs = int64(p.value)
 		pid = p.processTracer.pid
 		stime = p.processTracer.stime
 		eventType = p.processTracer.eventType
@@ -122,7 +129,7 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value
 		eventType,
 		location,
 		p.compressionAlgo,
-		int64(profileValue),
+		profileValueUs,
 		p.inTimestamp,
 		spyMap[input.SpyName],
 		pid,
@@ -130,7 +137,41 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value
 		tagNames,
 		tagValues)
 
-	return ret
+	var writeItems []interface{}
+	granularityUs := int64(p.offCpuSplittingGranularity) * int64(time.Second/time.Microsecond)
+	// each off-cpu profile data represents the function call stack within a long period of time (perhaps up to one hour). It is inappropriate to use a single start_time to express a period of time.
+	if ret.ProfileEventType == eBPFEventType[pb.ProfileEventType_EbpfOffCpu] &&
+		p.offCpuSplittingGranularity > 0 &&
+		profileValueUs > granularityUs {
+
+		splitCount := (profileValueUs + granularityUs - 1) / granularityUs
+		// prevent abnormal data from causing excessive writing
+		if splitCount > MAX_OFF_CPU_PROFILE_SPLIT_COUNT {
+			splitCount = MAX_OFF_CPU_PROFILE_SPLIT_COUNT
+		}
+
+		writeItems = make([]interface{}, 0, splitCount)
+		for i := int64(0); i < splitCount-1; i++ {
+			splitItem := ret.Clone()
+			splitItem.Time = splitItem.Time + uint32(i)*uint32(p.offCpuSplittingGranularity)
+			splitItem.ProfileCreateTimestamp = splitItem.ProfileCreateTimestamp + i*granularityUs
+			splitItem.ProfileValue = granularityUs
+			writeItems = append(writeItems, splitItem)
+		}
+		atomic.AddInt64(&p.Counter.OffCpuSplitCount, 1)
+		atomic.AddInt64(&p.Counter.OffCpuSplitIntoCount, int64(splitCount))
+
+		// add last split item from ret itself
+		ret.Time = ret.Time + uint32(splitCount-1)*uint32(p.offCpuSplittingGranularity)
+		ret.ProfileCreateTimestamp = ret.ProfileCreateTimestamp + (splitCount-1)*granularityUs
+		ret.ProfileValue = profileValueUs - (splitCount-1)*granularityUs
+		writeItems = append(writeItems, ret)
+	} else {
+		atomic.AddInt64(&p.Counter.TotalNotSplitCount, 1)
+		writeItems = []interface{}{ret}
+	}
+
+	return writeItems
 }
 
 type observer struct {
diff --git a/server/ingester/profile/profile/profile.go b/server/ingester/profile/profile/profile.go
index eb2b20dec987..80c0367d622d 100644
--- a/server/ingester/profile/profile/profile.go
+++ b/server/ingester/profile/profile/profile.go
@@ -82,6 +82,7 @@ func NewProfiler(msgType datatype.MessageType, config *config.Config, platformDa
 			i,
 			msgType,
 			*config.CompressionAlgorithm,
+			config.OffCpuSplittingGranularity,
 			platformDatas[i],
 			queue.QueueReader(decodeQueues.FixedMultiQueue[i]),
 			profileWriter,
diff --git a/server/server.yaml b/server/server.yaml
index b75bfc57e18b..574d6fc43a9c 100644
--- a/server/server.yaml
+++ b/server/server.yaml
@@ -524,6 +524,9 @@ ingester:
   ## profile compression algorithm, default is zstd, empty string for not compress
   #profile-compression-algorithm: "zstd"
 
+  ## off-cpu profile splitting granularity, 0 means disable splitting (unit: second)
+  #profile-off-cpu-splitting-granularity: 1
+
   ## 默认读超时,修改数据保留时长时使用
   #ck-read-timeout: 300
 