From 945118dd910b560b8da5583e0b04bc7296d51bcd Mon Sep 17 00:00:00 2001 From: taloric Date: Mon, 20 Jan 2025 16:23:34 +0800 Subject: [PATCH] fix: query skywalking data update resource info --- .../app/tracing-adapter/model/tracing.go | 2 + .../app/tracing-adapter/service/base.go | 27 +++++ .../app/tracing-adapter/service/skywalking.go | 104 ++++++++++-------- 3 files changed, 88 insertions(+), 45 deletions(-) diff --git a/server/querier/app/tracing-adapter/model/tracing.go b/server/querier/app/tracing-adapter/model/tracing.go index cfeafb72434..b6d89701aef 100644 --- a/server/querier/app/tracing-adapter/model/tracing.go +++ b/server/querier/app/tracing-adapter/model/tracing.go @@ -27,6 +27,7 @@ type ExSpan struct { TapSide string `json:"tap_side"` // spankind=server: s-app/ spankind=client: c-app/ spankind=internal: app L7Protocol int `json:"l7_protocol"` L7ProtocolStr string `json:"l7_protocol_str"` + L7ProtocolEnum string `json:"Enum(l7_protocol)"` // required, will show in span bar name in flamegraph TraceID string `json:"trace_id"` SpanID string `json:"span_id"` ParentSpanID string `json:"parent_span_id"` @@ -34,6 +35,7 @@ type ExSpan struct { Endpoint string `json:"endpoint"` RequestType string `json:"request_type"` // method RequestResource string `json:"request_resource"` // path + ResponseCode int `json:"response_code"` ResponseStatus int `json:"response_status"` AppService string `json:"app_service"` // service name AppInstance string `json:"app_instance"` // service instance name diff --git a/server/querier/app/tracing-adapter/service/base.go b/server/querier/app/tracing-adapter/service/base.go index 76df48fafee..a78a459ad56 100644 --- a/server/querier/app/tracing-adapter/service/base.go +++ b/server/querier/app/tracing-adapter/service/base.go @@ -17,6 +17,10 @@ package service import ( + "fmt" + "strings" + + "github.com/deepflowio/deepflow/server/libs/datatype" "github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/model" "github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/service/packet_service" "github.com/op/go-logging" @@ -41,3 +45,26 @@ func Register() error { } return nil } + +func ParseUrlPath(rawURL string) (string, error) { + parts := strings.SplitN(rawURL, "://", 2) + if len(parts) != 2 || parts[1] == "" { + return "", fmt.Errorf("invalid URL format") + } + pathStart := strings.Index(parts[1], "/") + if pathStart == -1 { + return "/", nil + } + + return parts[1][pathStart:], nil +} + +func HttpCodeToResponseStatus(code int) datatype.LogMessageStatus { + if code >= 400 && code <= 499 { + return datatype.STATUS_CLIENT_ERROR + } else if code >= 500 && code <= 600 { + return datatype.STATUS_SERVER_ERROR + } else { + return datatype.STATUS_OK + } +} diff --git a/server/querier/app/tracing-adapter/service/skywalking.go b/server/querier/app/tracing-adapter/service/skywalking.go index 0537659949c..bd7e7c583bc 100644 --- a/server/querier/app/tracing-adapter/service/skywalking.go +++ b/server/querier/app/tracing-adapter/service/skywalking.go @@ -20,9 +20,12 @@ import ( "fmt" "net/http" "strconv" + "strings" "skywalking.apache.org/repo/goapi/query" + "github.com/baidubce/bce-sdk-go/util/log" + "github.com/deepflowio/deepflow/server/libs/datatype" "github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/common" "github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/config" "github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/model" @@ -54,12 +57,25 @@ const ( SpanTypeClient = "Exit" SpanTypeServer = "Entry" - AttributeHTTPMethod = "http.method" - AttributeHTTPStatus_Code = "http.status_code" - AttributeHTTPStatusCode = "http.status.code" - AttributeDbStatement = "db.statement" - AttributeCacheCmd = "cache.cmd" - AttributeCacheKey = "cache.key" + AttributeURL = "url" + AttributeHttpURL = "http.url" + AttributeHTTPMethod = "http.method" + AttributeHttpScheme = "http.scheme" + AttributeHTTPStatus = "http.status" + AttributeHTTPStatus_Code = "http.status_code" + AttributeHTTPStatusCode = "http.status.code" + AttributeCacheCmd = "cache.cmd" + AttributeCacheKey = "cache.key" + AttributeDbType = "db.type" + AttributeDbSystem = "db.system" + AttributeDbStatement = "db.statement" + AttributeDbOperation = "db.operation" + AttributeRpcMethod = "rpc.method" + AttributeRpcSystem = "rpc.system" + AttributeRpcService = "rpc.service" + AttributeMessagingURL = "messaging.url" + AttributeMessagingSystem = "messaging.system" + AttributeMessagingProtocol = "messaging.protocol" // layer possible values: Unknown, Database, RPCFramework, Http, MQ and Cache // ref: https://github.com/apache/skywalking-query-protocol/blob/master/trace.graphqls#L94 @@ -267,58 +283,56 @@ func (s *SkyWalkingAdapter) swTagsToSpanRequestInfo(layer string, tags []*query. if tags == nil { return } - span.L7Protocol, span.L7ProtocolStr = s.getL7Protocol(layer) - switch layer { - case LayerDatabase: - s.getDBTags(tags, span) - case LayerHTTP: - s.getHTTPTags(tags, span) - case LayerCache: - s.getCacheTags(tags, span) - default: - // Unknown - return - } -} - -func (s *SkyWalkingAdapter) getL7Protocol(layer string) (int, string) { if layer == LayerHTTP { - // the only protocol can get from span now - return 20, "HTTP" - } else { - return 0, "" + span.L7Protocol, span.L7ProtocolStr, span.L7ProtocolEnum = int(datatype.L7_PROTOCOL_HTTP_1), datatype.L7_PROTOCOL_HTTP_1.String(false), datatype.L7_PROTOCOL_HTTP_1.String(false) + } + s.getTagValue(tags, span) + // for which not match l7protocol, but found l7protocolstr by tag.value, try to match + if span.L7Protocol == 0 && len(span.L7ProtocolStr) > 0 { + l7ProtocolStrLower := strings.ToLower(span.L7ProtocolStr) + for l7ProtocolEnumStr, l7ProtocolMap := range datatype.L7ProtocolStringMap { + if strings.Contains(l7ProtocolEnumStr, l7ProtocolStrLower) { + span.L7Protocol = int(l7ProtocolMap) + span.L7ProtocolEnum = l7ProtocolEnumStr + break + } + } } } -func (s *SkyWalkingAdapter) getHTTPTags(tags []*query.KeyValue, span *model.ExSpan) { +func (s *SkyWalkingAdapter) getTagValue(tags []*query.KeyValue, span *model.ExSpan) { + httpURL := "" for _, v := range tags { + if span.L7Protocol == 0 && len(span.L7ProtocolStr) == 0 { + // if layer != http (maybe is unknown), but have some http attributes, it's http + if strings.HasPrefix(v.Key, "http") { + span.L7Protocol, span.L7ProtocolStr, span.L7ProtocolEnum = int(datatype.L7_PROTOCOL_HTTP_1), datatype.L7_PROTOCOL_HTTP_1.String(false), datatype.L7_PROTOCOL_HTTP_1.String(false) + } + } switch v.Key { - case AttributeHTTPMethod: - span.RequestType = *v.Value // http method - case AttributeHTTPStatusCode, AttributeHTTPStatus_Code: + case AttributeURL, AttributeHttpURL: + httpURL = *v.Value + case AttributeHTTPMethod, AttributeCacheCmd, AttributeDbOperation, AttributeRpcMethod: + span.RequestType = *v.Value + case AttributeHTTPStatusCode, AttributeHTTPStatus_Code, AttributeHTTPStatus: code, err := strconv.Atoi(*v.Value) if err == nil { - span.ResponseStatus = code + span.ResponseCode = code } - } - } -} - -func (s *SkyWalkingAdapter) getDBTags(tags []*query.KeyValue, span *model.ExSpan) { - for _, v := range tags { - if v.Key == AttributeDbStatement { + case AttributeDbStatement, AttributeCacheKey: span.RequestResource = *v.Value + case AttributeDbType, AttributeDbSystem, AttributeHttpScheme, AttributeRpcSystem, AttributeMessagingSystem, AttributeMessagingProtocol: + span.L7ProtocolStr = *v.Value } } -} -func (s *SkyWalkingAdapter) getCacheTags(tags []*query.KeyValue, span *model.ExSpan) { - for _, v := range tags { - switch v.Key { - case AttributeCacheCmd: - span.RequestType = *v.Value - case AttributeCacheKey: - span.RequestResource = *v.Value + if span.RequestResource == "" && httpURL != "" { + parsedURLPath, err := ParseUrlPath(httpURL) + if err != nil { + log.Warnf("query skywalking data get http.url (%s) parsed failed : %s", httpURL, err) + } else { + span.RequestResource = parsedURLPath } } + span.ResponseStatus = int(HttpCodeToResponseStatus(span.ResponseCode)) }