Skip to content

Commit

Permalink
feat: dynamic native tags are supported
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Jan 23, 2025
1 parent 7f5faff commit e5f2eb5
Show file tree
Hide file tree
Showing 25 changed files with 634 additions and 7 deletions.
10 changes: 10 additions & 0 deletions server/cmd/server/free_os_memory_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"strconv"
"time"

"github.com/deepflowio/deepflow/server/common"
"github.com/deepflowio/deepflow/server/ingester/ingesterctl"
debugcmd "github.com/deepflowio/deepflow/server/libs/debug"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

const (
Expand Down Expand Up @@ -118,6 +120,14 @@ func (f *FreeOSMemoryHandler) HandleSimpleCommand(op uint16, arg string) string
case CMD_FREE_OS_MEMORY_ONCE:
return f.CallFreeOSMemoryOnce()
case CMD_FREE_OS_MEMORY_STATUS:
common.UpdateNativeTag(false,
1,
nativetag.DEEPFLOW_TENANT,
&nativetag.NativeTag{
AttributeNames: []string{"host", "queue_capacity", "aaa"},
ColumnNames: []string{"host", "queue_capacity", "aaa"},
ColumnTypes: []nativetag.NativeTagType{nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_FLOAT64, nativetag.NATIVE_TAG_STRING},
})
if f.running {
return fmt.Sprintf("running with interval %d", f.interval)
}
Expand Down
17 changes: 17 additions & 0 deletions server/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/ingesterctl"
"github.com/deepflowio/deepflow/server/libs/debug"
"github.com/deepflowio/deepflow/server/libs/logger"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/querier/querier"

logging "github.com/op/go-logging"
Expand Down Expand Up @@ -105,6 +106,22 @@ func main() {
report.SetServerInfo(Branch, RevCount, Revision)

shared := common.NewControllerIngesterShared()
common.UpdateNativeTag(true,
1,
nativetag.DEEPFLOW_ADMIN,
&nativetag.NativeTag{
AttributeNames: []string{"host", "table", "metrics_count", "duration"},
ColumnNames: []string{"host", "table", "metrics_count", "duration"},
ColumnTypes: []nativetag.NativeTagType{nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_INT64, nativetag.NATIVE_TAG_FLOAT64},
})
common.UpdateNativeTag(true,
1,
nativetag.DEEPFLOW_TENANT,
&nativetag.NativeTag{
AttributeNames: []string{"host", "type", "count", "queue_capacity"},
ColumnNames: []string{"host", "type", "count", "queue_capacity"},
ColumnTypes: []nativetag.NativeTagType{nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_INT64, nativetag.NATIVE_TAG_FLOAT64},
})

go controller.Start(ctx, *configPath, cfg.LogFile, shared)

Expand Down
28 changes: 28 additions & 0 deletions server/common/module_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/deepflowio/deepflow/server/libs/eventapi"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/queue"
"github.com/deepflowio/deepflow/server/libs/tracetree"
logging "github.com/op/go-logging"
Expand Down Expand Up @@ -84,6 +85,7 @@ func ExportersEnabled(configPath string) bool {

type OrgHanderInterface interface {
DropOrg(orgId uint16) error
UpdateNativeTag(uint16, nativetag.NativeTagTable, *nativetag.NativeTag) error
}

var ingesterOrgHanders []OrgHanderInterface
Expand Down Expand Up @@ -124,3 +126,29 @@ func DropOrg(orgId uint16) error {
}
return nil
}

// 1. When starting, you need to call the interface, and set 'isStartUp' to 'true'.
// 2. When adding or removing native_tag, you need to call the interface, and set 'isStartUp' to 'false'
func UpdateNativeTag(isStartUp bool, orgId uint16, table nativetag.NativeTagTable, nativeTag *nativetag.NativeTag) error {
log.Infof("isstart %v orgId %d update %s native tag: %+v", isStartUp, orgId, table.Table(), nativeTag)
if nativeTag == nil {
return nil
}

if !isStartUp {
if ingesterOrgHanders == nil {
err := fmt.Errorf("ingester is not ready, update native tag failed")
log.Error(err)
return err
}
for _, ingesterOrgHander := range ingesterOrgHanders {
err := ingesterOrgHander.UpdateNativeTag(orgId, table, nativeTag)
if err != nil {
log.Error(err)
return err
}
}
}
nativetag.UpdateNativeTag(orgId, table, nativeTag)
return nil
}
5 changes: 5 additions & 0 deletions server/ingester/app_log/dbwriter/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/exporters/config"
"github.com/deepflowio/deepflow/server/ingester/flow_tag"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/pool"
)

Expand Down Expand Up @@ -114,6 +115,10 @@ type ApplicationLogStore struct {
MetricsValues []float64 `json:"metrics_values" category:"$metrics" data_type:"[]float64"`
}

func (l *ApplicationLogStore) NativeTagVersion() uint32 {
return nativetag.GetTableNativeTagsVersion(l.OrgId, nativetag.APPLICATION_LOG)
}

func (l *ApplicationLogStore) OrgID() uint16 {
return l.OrgId
}
Expand Down
18 changes: 16 additions & 2 deletions server/ingester/app_log/dbwriter/log_column_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/ClickHouse/ch-go/proto"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

type LogBlock struct {
Expand Down Expand Up @@ -61,6 +62,7 @@ type LogBlock struct {
ColAttributeValues *proto.ColArr[string]
ColMetricsNames *proto.ColArr[string]
ColMetricsValues *proto.ColArr[float64]
*nativetag.NativeTagsBlock
}

func (b *LogBlock) Reset() {
Expand Down Expand Up @@ -102,10 +104,13 @@ func (b *LogBlock) Reset() {
b.ColAttributeValues.Reset()
b.ColMetricsNames.Reset()
b.ColMetricsValues.Reset()
if b.NativeTagsBlock != nil {
b.NativeTagsBlock.Reset()
}
}

func (b *LogBlock) ToInput(input proto.Input) proto.Input {
return append(input,
input = append(input,
proto.InputColumn{Name: ckdb.COLUMN_TIME, Data: &b.ColTime},
proto.InputColumn{Name: ckdb.COLUMN_TIMESTAMP, Data: &b.ColTimestamp},
proto.InputColumn{Name: ckdb.COLUMN__ID, Data: &b.ColId},
Expand Down Expand Up @@ -145,16 +150,22 @@ func (b *LogBlock) ToInput(input proto.Input) proto.Input {
proto.InputColumn{Name: ckdb.COLUMN_METRICS_NAMES, Data: b.ColMetricsNames},
proto.InputColumn{Name: ckdb.COLUMN_METRICS_VALUES, Data: b.ColMetricsValues},
)
if b.NativeTagsBlock != nil {
input = b.NativeTagsBlock.ToInput(input)
}
return input
}

func (n *ApplicationLogStore) NewColumnBlock() ckdb.CKColumnBlock {
return &LogBlock{
block := &LogBlock{
ColAppService: new(proto.ColStr).LowCardinality(),
ColAttributeNames: new(proto.ColStr).LowCardinality().Array(),
ColAttributeValues: new(proto.ColStr).Array(),
ColMetricsNames: new(proto.ColStr).LowCardinality().Array(),
ColMetricsValues: new(proto.ColFloat64).Array(),
NativeTagsBlock: nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.APPLICATION_LOG),
}
return block
}

func (n *ApplicationLogStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
Expand Down Expand Up @@ -197,4 +208,7 @@ func (n *ApplicationLogStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
block.ColAttributeValues.Append(n.AttributeValues)
block.ColMetricsNames.Append(n.MetricsNames)
block.ColMetricsValues.Append(n.MetricsValues)
if block.NativeTagsBlock != nil {
block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, n.MetricsNames, n.MetricsValues)
}
}
21 changes: 21 additions & 0 deletions server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/datasource"
"github.com/deepflowio/deepflow/server/libs/ckdb"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

var log = logging.MustGetLogger("issu")
Expand Down Expand Up @@ -1338,19 +1339,29 @@ func (i *Issu) Start() error {

var err error
orgIDPrefixs := make([][]string, len(i.Connections))
nativeTags := nativetag.GetAllNativeTags()
// update default organization databases first
for index, connect := range i.Connections {
err = i.startOrg(index, "", connect)
if err != nil {
log.Error(err)
return err
}
for idx, nativeTag := range nativeTags[ckdb.DEFAULT_ORG_ID] {
if nativeTag == nil {
continue
}
if e := nativetag.CKAddNativeTag(i.cfg.CKDB.Type == ckdb.CKDBTypeByconity, connect, ckdb.DEFAULT_ORG_ID, nativetag.NativeTagTable(idx), nativeTag); e != nil {
log.Error(err)
}
}
orgIDPrefixs[index], err = i.getOrgIDPrefixsWithoutDefault(connect)
if err != nil {
return fmt.Errorf("get orgIDs failed, err: %s", err)
}
}

// update other organization databases
var wg sync.WaitGroup
for index, prefixes := range orgIDPrefixs {
orgCount := len(prefixes)
Expand Down Expand Up @@ -1389,7 +1400,17 @@ func (i *Issu) Start() error {
log.Error(err)
errCount++
}
orgId := parseOrgId(orgIDPrefix + "event")
for idx, nativeTag := range nativeTags[orgId] {
if nativeTag == nil {
continue
}
if e := nativetag.CKAddNativeTag(i.cfg.CKDB.Type == ckdb.CKDBTypeByconity, connect, orgId, nativetag.NativeTagTable(idx), nativeTag); e != nil {
log.Error(err)
}
}
}

log.Infof("end ckissu %+v", orgPrefixs)
}(prefixes[minIndex:maxIndex])
}
Expand Down
4 changes: 4 additions & 0 deletions server/ingester/event/dbwriter/alert_event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (e *AlertEventStore) Release() {
ReleaseAlertEventStore(e)
}

func (e *AlertEventStore) NativeTagVersion() uint32 {
return 0
}

func (e *AlertEventStore) OrgID() uint16 {
return e.OrgId
}
Expand Down
8 changes: 8 additions & 0 deletions server/ingester/event/dbwriter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
utag "github.com/deepflowio/deepflow/server/ingester/exporters/universal_tag"
"github.com/deepflowio/deepflow/server/ingester/flow_tag"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/pool"
"github.com/deepflowio/deepflow/server/libs/utils"
)
Expand Down Expand Up @@ -106,6 +107,13 @@ type EventStore struct {
Duration uint64 `json:"duration" category:"$metrics" sub:"delay"`
}

func (e *EventStore) NativeTagVersion() uint32 {
if e.HasMetrics {
return nativetag.GetTableNativeTagsVersion(e.OrgId, nativetag.EVENT_PERF_EVENT)
}
return nativetag.GetTableNativeTagsVersion(e.OrgId, nativetag.EVENT_EVENT)
}

func (e *EventStore) OrgID() uint16 {
return e.OrgId
}
Expand Down
20 changes: 19 additions & 1 deletion server/ingester/event/dbwriter/event_column_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ClickHouse/ch-go/proto"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/nativetag"
)

type EventBlock struct {
Expand Down Expand Up @@ -63,6 +64,8 @@ type EventBlock struct {
HasMetrics bool
ColBytes proto.ColUInt32
ColDuration proto.ColUInt64

*nativetag.NativeTagsBlock
}

func (b *EventBlock) Reset() {
Expand Down Expand Up @@ -103,6 +106,9 @@ func (b *EventBlock) Reset() {
b.ColAttributeValues.Reset()
b.ColBytes.Reset()
b.ColDuration.Reset()
if b.NativeTagsBlock != nil {
b.NativeTagsBlock.Reset()
}
}

func (b *EventBlock) ToInput(input proto.Input) proto.Input {
Expand Down Expand Up @@ -149,16 +155,25 @@ func (b *EventBlock) ToInput(input proto.Input) proto.Input {
proto.InputColumn{Name: ckdb.COLUMN_DURATION, Data: &b.ColDuration},
)
}
if b.NativeTagsBlock != nil {
input = b.NativeTagsBlock.ToInput(input)
}
return input
}

func (n *EventStore) NewColumnBlock() ckdb.CKColumnBlock {
return &EventBlock{
b := &EventBlock{
HasMetrics: n.HasMetrics,
ColEventType: new(proto.ColStr).LowCardinality(),
ColAttributeNames: new(proto.ColStr).LowCardinality().Array(),
ColAttributeValues: new(proto.ColStr).Array(),
}
if n.HasMetrics {
b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EVENT_PERF_EVENT)
} else {
b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EVENT_EVENT)
}
return b
}

func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
Expand Down Expand Up @@ -200,4 +215,7 @@ func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
block.ColAttributeValues.Append(n.AttributeValues)
block.ColBytes.Append(n.Bytes)
block.ColDuration.Append(n.Duration)
if block.NativeTagsBlock != nil {
block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, nil, nil)
}
}
16 changes: 16 additions & 0 deletions server/ingester/ext_metrics/dbwriter/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/datatype"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/nativetag"
"github.com/deepflowio/deepflow/server/libs/pool"
)

Expand Down Expand Up @@ -89,6 +90,21 @@ func (m *ExtMetrics) VirtualTableName() string {
return m.VTableName
}

func (m *ExtMetrics) NativeTagVersion() uint32 {
switch m.MsgType {
case datatype.MESSAGE_TYPE_DFSTATS:
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_TENANT)
case datatype.MESSAGE_TYPE_SERVER_DFSTATS:
if ckdb.IsValidOrgID(m.RawOrgId) {
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_TENANT)
} else {
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_ADMIN)
}
default:
return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.EXT_METRICS)
}
}

func (m *ExtMetrics) OrgID() uint16 {
return m.OrgId
}
Expand Down
Loading

0 comments on commit e5f2eb5

Please sign in to comment.