Skip to content

Commit

Permalink
fix: query skywalking data update resource info
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Jan 20, 2025
1 parent 530975b commit 945118d
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 45 deletions.
2 changes: 2 additions & 0 deletions server/querier/app/tracing-adapter/model/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ type ExSpan struct {
TapSide string `json:"tap_side"` // spankind=server: s-app/ spankind=client: c-app/ spankind=internal: app
L7Protocol int `json:"l7_protocol"`
L7ProtocolStr string `json:"l7_protocol_str"`
L7ProtocolEnum string `json:"Enum(l7_protocol)"` // required, will show in span bar name in flamegraph
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
ParentSpanID string `json:"parent_span_id"`
SpanKind int `json:"span_kind"` // client/server/internal
Endpoint string `json:"endpoint"`
RequestType string `json:"request_type"` // method
RequestResource string `json:"request_resource"` // path
ResponseCode int `json:"response_code"`
ResponseStatus int `json:"response_status"`
AppService string `json:"app_service"` // service name
AppInstance string `json:"app_instance"` // service instance name
Expand Down
27 changes: 27 additions & 0 deletions server/querier/app/tracing-adapter/service/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package service

import (
"fmt"
"strings"

"github.com/deepflowio/deepflow/server/libs/datatype"
"github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/model"
"github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/service/packet_service"
"github.com/op/go-logging"
Expand All @@ -41,3 +45,26 @@ func Register() error {
}
return nil
}

func ParseUrlPath(rawURL string) (string, error) {
parts := strings.SplitN(rawURL, "://", 2)
if len(parts) != 2 || parts[1] == "" {
return "", fmt.Errorf("invalid URL format")
}
pathStart := strings.Index(parts[1], "/")
if pathStart == -1 {
return "/", nil
}

return parts[1][pathStart:], nil
}

func HttpCodeToResponseStatus(code int) datatype.LogMessageStatus {
if code >= 400 && code <= 499 {
return datatype.STATUS_CLIENT_ERROR
} else if code >= 500 && code <= 600 {
return datatype.STATUS_SERVER_ERROR
} else {
return datatype.STATUS_OK
}
}
104 changes: 59 additions & 45 deletions server/querier/app/tracing-adapter/service/skywalking.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"fmt"
"net/http"
"strconv"
"strings"

"skywalking.apache.org/repo/goapi/query"

"github.com/baidubce/bce-sdk-go/util/log"
"github.com/deepflowio/deepflow/server/libs/datatype"
"github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/common"
"github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/config"
"github.com/deepflowio/deepflow/server/querier/app/tracing-adapter/model"
Expand Down Expand Up @@ -54,12 +57,25 @@ const (
SpanTypeClient = "Exit"
SpanTypeServer = "Entry"

AttributeHTTPMethod = "http.method"
AttributeHTTPStatus_Code = "http.status_code"
AttributeHTTPStatusCode = "http.status.code"
AttributeDbStatement = "db.statement"
AttributeCacheCmd = "cache.cmd"
AttributeCacheKey = "cache.key"
AttributeURL = "url"
AttributeHttpURL = "http.url"
AttributeHTTPMethod = "http.method"
AttributeHttpScheme = "http.scheme"
AttributeHTTPStatus = "http.status"
AttributeHTTPStatus_Code = "http.status_code"
AttributeHTTPStatusCode = "http.status.code"
AttributeCacheCmd = "cache.cmd"
AttributeCacheKey = "cache.key"
AttributeDbType = "db.type"
AttributeDbSystem = "db.system"
AttributeDbStatement = "db.statement"
AttributeDbOperation = "db.operation"
AttributeRpcMethod = "rpc.method"
AttributeRpcSystem = "rpc.system"
AttributeRpcService = "rpc.service"
AttributeMessagingURL = "messaging.url"
AttributeMessagingSystem = "messaging.system"
AttributeMessagingProtocol = "messaging.protocol"

// layer possible values: Unknown, Database, RPCFramework, Http, MQ and Cache
// ref: https://github.com/apache/skywalking-query-protocol/blob/master/trace.graphqls#L94
Expand Down Expand Up @@ -267,58 +283,56 @@ func (s *SkyWalkingAdapter) swTagsToSpanRequestInfo(layer string, tags []*query.
if tags == nil {
return
}
span.L7Protocol, span.L7ProtocolStr = s.getL7Protocol(layer)
switch layer {
case LayerDatabase:
s.getDBTags(tags, span)
case LayerHTTP:
s.getHTTPTags(tags, span)
case LayerCache:
s.getCacheTags(tags, span)
default:
// Unknown
return
}
}

func (s *SkyWalkingAdapter) getL7Protocol(layer string) (int, string) {
if layer == LayerHTTP {
// the only protocol can get from span now
return 20, "HTTP"
} else {
return 0, ""
span.L7Protocol, span.L7ProtocolStr, span.L7ProtocolEnum = int(datatype.L7_PROTOCOL_HTTP_1), datatype.L7_PROTOCOL_HTTP_1.String(false), datatype.L7_PROTOCOL_HTTP_1.String(false)
}
s.getTagValue(tags, span)
// for which not match l7protocol, but found l7protocolstr by tag.value, try to match
if span.L7Protocol == 0 && len(span.L7ProtocolStr) > 0 {
l7ProtocolStrLower := strings.ToLower(span.L7ProtocolStr)
for l7ProtocolEnumStr, l7ProtocolMap := range datatype.L7ProtocolStringMap {
if strings.Contains(l7ProtocolEnumStr, l7ProtocolStrLower) {
span.L7Protocol = int(l7ProtocolMap)
span.L7ProtocolEnum = l7ProtocolEnumStr
break
}
}
}
}

func (s *SkyWalkingAdapter) getHTTPTags(tags []*query.KeyValue, span *model.ExSpan) {
func (s *SkyWalkingAdapter) getTagValue(tags []*query.KeyValue, span *model.ExSpan) {
httpURL := ""
for _, v := range tags {
if span.L7Protocol == 0 && len(span.L7ProtocolStr) == 0 {
// if layer != http (maybe is unknown), but have some http attributes, it's http
if strings.HasPrefix(v.Key, "http") {
span.L7Protocol, span.L7ProtocolStr, span.L7ProtocolEnum = int(datatype.L7_PROTOCOL_HTTP_1), datatype.L7_PROTOCOL_HTTP_1.String(false), datatype.L7_PROTOCOL_HTTP_1.String(false)
}
}
switch v.Key {
case AttributeHTTPMethod:
span.RequestType = *v.Value // http method
case AttributeHTTPStatusCode, AttributeHTTPStatus_Code:
case AttributeURL, AttributeHttpURL:
httpURL = *v.Value
case AttributeHTTPMethod, AttributeCacheCmd, AttributeDbOperation, AttributeRpcMethod:
span.RequestType = *v.Value
case AttributeHTTPStatusCode, AttributeHTTPStatus_Code, AttributeHTTPStatus:
code, err := strconv.Atoi(*v.Value)
if err == nil {
span.ResponseStatus = code
span.ResponseCode = code
}
}
}
}

func (s *SkyWalkingAdapter) getDBTags(tags []*query.KeyValue, span *model.ExSpan) {
for _, v := range tags {
if v.Key == AttributeDbStatement {
case AttributeDbStatement, AttributeCacheKey:
span.RequestResource = *v.Value
case AttributeDbType, AttributeDbSystem, AttributeHttpScheme, AttributeRpcSystem, AttributeMessagingSystem, AttributeMessagingProtocol:
span.L7ProtocolStr = *v.Value
}
}
}

func (s *SkyWalkingAdapter) getCacheTags(tags []*query.KeyValue, span *model.ExSpan) {
for _, v := range tags {
switch v.Key {
case AttributeCacheCmd:
span.RequestType = *v.Value
case AttributeCacheKey:
span.RequestResource = *v.Value
if span.RequestResource == "" && httpURL != "" {
parsedURLPath, err := ParseUrlPath(httpURL)
if err != nil {
log.Warnf("query skywalking data get http.url (%s) parsed failed : %s", httpURL, err)
} else {
span.RequestResource = parsedURLPath
}
}
span.ResponseStatus = int(HttpCodeToResponseStatus(span.ResponseCode))
}

0 comments on commit 945118d

Please sign in to comment.