diff --git a/server/cmd/server/free_os_memory_handler.go b/server/cmd/server/free_os_memory_handler.go index 3fc994f80ca..d9d62ccbee3 100644 --- a/server/cmd/server/free_os_memory_handler.go +++ b/server/cmd/server/free_os_memory_handler.go @@ -7,8 +7,10 @@ import ( "strconv" "time" + "github.com/deepflowio/deepflow/server/common" "github.com/deepflowio/deepflow/server/ingester/ingesterctl" debugcmd "github.com/deepflowio/deepflow/server/libs/debug" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) const ( @@ -118,6 +120,14 @@ func (f *FreeOSMemoryHandler) HandleSimpleCommand(op uint16, arg string) string case CMD_FREE_OS_MEMORY_ONCE: return f.CallFreeOSMemoryOnce() case CMD_FREE_OS_MEMORY_STATUS: + common.UpdateNativeTag(false, + 1, + nativetag.DEEPFLOW_TENANT, + &nativetag.NativeTag{ + AttributeNames: []string{"host", "queue_capacity", "aaa"}, + ColumnNames: []string{"host", "queue_capacity", "aaa"}, + ColumnTypes: []nativetag.NativeTagType{nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_FLOAT64, nativetag.NATIVE_TAG_STRING}, + }) if f.running { return fmt.Sprintf("running with interval %d", f.interval) } diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 76f45dbfb6b..8c3e4b2f277 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -36,6 +36,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/ingesterctl" "github.com/deepflowio/deepflow/server/libs/debug" "github.com/deepflowio/deepflow/server/libs/logger" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/querier/querier" logging "github.com/op/go-logging" @@ -105,6 +106,22 @@ func main() { report.SetServerInfo(Branch, RevCount, Revision) shared := common.NewControllerIngesterShared() + common.UpdateNativeTag(true, + 1, + nativetag.DEEPFLOW_ADMIN, + &nativetag.NativeTag{ + AttributeNames: []string{"host", "table", "metrics_count", "duration"}, + ColumnNames: []string{"host", "table", "metrics_count", "duration"}, + ColumnTypes: []nativetag.NativeTagType{nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_INT64, nativetag.NATIVE_TAG_FLOAT64}, + }) + common.UpdateNativeTag(true, + 1, + nativetag.DEEPFLOW_TENANT, + &nativetag.NativeTag{ + AttributeNames: []string{"host", "type", "count", "queue_capacity"}, + ColumnNames: []string{"host", "type", "count", "queue_capacity"}, + ColumnTypes: []nativetag.NativeTagType{nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_STRING, nativetag.NATIVE_TAG_INT64, nativetag.NATIVE_TAG_FLOAT64}, + }) go controller.Start(ctx, *configPath, cfg.LogFile, shared) diff --git a/server/common/module_shared.go b/server/common/module_shared.go index 41375e079e4..241d47c704a 100644 --- a/server/common/module_shared.go +++ b/server/common/module_shared.go @@ -22,6 +22,7 @@ import ( "time" "github.com/deepflowio/deepflow/server/libs/eventapi" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/queue" "github.com/deepflowio/deepflow/server/libs/tracetree" logging "github.com/op/go-logging" @@ -84,6 +85,7 @@ func ExportersEnabled(configPath string) bool { type OrgHanderInterface interface { DropOrg(orgId uint16) error + UpdateNativeTag(uint16, nativetag.NativeTagTable, *nativetag.NativeTag) error } var ingesterOrgHanders []OrgHanderInterface @@ -124,3 +126,29 @@ func DropOrg(orgId uint16) error { } return nil } + +// 1. When starting, you need to call the interface, and set 'isStartUp' to 'true'. +// 2. When adding or removing native_tag, you need to call the interface, and set 'isStartUp' to 'false' +func UpdateNativeTag(isStartUp bool, orgId uint16, table nativetag.NativeTagTable, nativeTag *nativetag.NativeTag) error { + log.Infof("isstart %v orgId %d update %s native tag: %+v", isStartUp, orgId, table.Table(), nativeTag) + if nativeTag == nil { + return nil + } + + if !isStartUp { + if ingesterOrgHanders == nil { + err := fmt.Errorf("ingester is not ready, update native tag failed") + log.Error(err) + return err + } + for _, ingesterOrgHander := range ingesterOrgHanders { + err := ingesterOrgHander.UpdateNativeTag(orgId, table, nativeTag) + if err != nil { + log.Error(err) + return err + } + } + } + nativetag.UpdateNativeTag(orgId, table, nativeTag) + return nil +} diff --git a/server/ingester/app_log/dbwriter/log.go b/server/ingester/app_log/dbwriter/log.go index 1359d6d5fbf..b0b2666e9d0 100644 --- a/server/ingester/app_log/dbwriter/log.go +++ b/server/ingester/app_log/dbwriter/log.go @@ -26,6 +26,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/exporters/config" "github.com/deepflowio/deepflow/server/ingester/flow_tag" "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/pool" ) @@ -114,6 +115,10 @@ type ApplicationLogStore struct { MetricsValues []float64 `json:"metrics_values" category:"$metrics" data_type:"[]float64"` } +func (l *ApplicationLogStore) NativeTagVersion() uint32 { + return nativetag.GetTableNativeTagsVersion(l.OrgId, nativetag.APPLICATION_LOG) +} + func (l *ApplicationLogStore) OrgID() uint16 { return l.OrgId } diff --git a/server/ingester/app_log/dbwriter/log_column_block.go b/server/ingester/app_log/dbwriter/log_column_block.go index 7d80749d905..1cf1923dc48 100644 --- a/server/ingester/app_log/dbwriter/log_column_block.go +++ b/server/ingester/app_log/dbwriter/log_column_block.go @@ -20,6 +20,7 @@ import ( "github.com/ClickHouse/ch-go/proto" "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) type LogBlock struct { @@ -61,6 +62,7 @@ type LogBlock struct { ColAttributeValues *proto.ColArr[string] ColMetricsNames *proto.ColArr[string] ColMetricsValues *proto.ColArr[float64] + *nativetag.NativeTagsBlock } func (b *LogBlock) Reset() { @@ -102,10 +104,13 @@ func (b *LogBlock) Reset() { b.ColAttributeValues.Reset() b.ColMetricsNames.Reset() b.ColMetricsValues.Reset() + if b.NativeTagsBlock != nil { + b.NativeTagsBlock.Reset() + } } func (b *LogBlock) ToInput(input proto.Input) proto.Input { - return append(input, + input = append(input, proto.InputColumn{Name: ckdb.COLUMN_TIME, Data: &b.ColTime}, proto.InputColumn{Name: ckdb.COLUMN_TIMESTAMP, Data: &b.ColTimestamp}, proto.InputColumn{Name: ckdb.COLUMN__ID, Data: &b.ColId}, @@ -145,16 +150,22 @@ func (b *LogBlock) ToInput(input proto.Input) proto.Input { proto.InputColumn{Name: ckdb.COLUMN_METRICS_NAMES, Data: b.ColMetricsNames}, proto.InputColumn{Name: ckdb.COLUMN_METRICS_VALUES, Data: b.ColMetricsValues}, ) + if b.NativeTagsBlock != nil { + input = b.NativeTagsBlock.ToInput(input) + } + return input } func (n *ApplicationLogStore) NewColumnBlock() ckdb.CKColumnBlock { - return &LogBlock{ + block := &LogBlock{ ColAppService: new(proto.ColStr).LowCardinality(), ColAttributeNames: new(proto.ColStr).LowCardinality().Array(), ColAttributeValues: new(proto.ColStr).Array(), ColMetricsNames: new(proto.ColStr).LowCardinality().Array(), ColMetricsValues: new(proto.ColFloat64).Array(), + NativeTagsBlock: nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.APPLICATION_LOG), } + return block } func (n *ApplicationLogStore) AppendToColumnBlock(b ckdb.CKColumnBlock) { @@ -197,4 +208,7 @@ func (n *ApplicationLogStore) AppendToColumnBlock(b ckdb.CKColumnBlock) { block.ColAttributeValues.Append(n.AttributeValues) block.ColMetricsNames.Append(n.MetricsNames) block.ColMetricsValues.Append(n.MetricsValues) + if block.NativeTagsBlock != nil { + block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, n.MetricsNames, n.MetricsValues) + } } diff --git a/server/ingester/ckissu/ckissu.go b/server/ingester/ckissu/ckissu.go index ac2807bbe1c..24f2fc09a75 100644 --- a/server/ingester/ckissu/ckissu.go +++ b/server/ingester/ckissu/ckissu.go @@ -33,6 +33,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/datasource" "github.com/deepflowio/deepflow/server/libs/ckdb" flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) var log = logging.MustGetLogger("issu") @@ -1338,6 +1339,7 @@ func (i *Issu) Start() error { var err error orgIDPrefixs := make([][]string, len(i.Connections)) + nativeTags := nativetag.GetAllNativeTags() // update default organization databases first for index, connect := range i.Connections { err = i.startOrg(index, "", connect) @@ -1345,12 +1347,21 @@ func (i *Issu) Start() error { log.Error(err) return err } + for idx, nativeTag := range nativeTags[ckdb.DEFAULT_ORG_ID] { + if nativeTag == nil { + continue + } + if e := nativetag.CKAddNativeTag(i.cfg.CKDB.Type == ckdb.CKDBTypeByconity, connect, ckdb.DEFAULT_ORG_ID, nativetag.NativeTagTable(idx), nativeTag); e != nil { + log.Error(err) + } + } orgIDPrefixs[index], err = i.getOrgIDPrefixsWithoutDefault(connect) if err != nil { return fmt.Errorf("get orgIDs failed, err: %s", err) } } + // update other organization databases var wg sync.WaitGroup for index, prefixes := range orgIDPrefixs { orgCount := len(prefixes) @@ -1389,7 +1400,17 @@ func (i *Issu) Start() error { log.Error(err) errCount++ } + orgId := parseOrgId(orgIDPrefix + "event") + for idx, nativeTag := range nativeTags[orgId] { + if nativeTag == nil { + continue + } + if e := nativetag.CKAddNativeTag(i.cfg.CKDB.Type == ckdb.CKDBTypeByconity, connect, orgId, nativetag.NativeTagTable(idx), nativeTag); e != nil { + log.Error(err) + } + } } + log.Infof("end ckissu %+v", orgPrefixs) }(prefixes[minIndex:maxIndex]) } diff --git a/server/ingester/event/dbwriter/alert_event_writer.go b/server/ingester/event/dbwriter/alert_event_writer.go index f8d8c5053c0..93d9a0e80be 100644 --- a/server/ingester/event/dbwriter/alert_event_writer.go +++ b/server/ingester/event/dbwriter/alert_event_writer.go @@ -103,6 +103,10 @@ func (e *AlertEventStore) Release() { ReleaseAlertEventStore(e) } +func (e *AlertEventStore) NativeTagVersion() uint32 { + return 0 +} + func (e *AlertEventStore) OrgID() uint16 { return e.OrgId } diff --git a/server/ingester/event/dbwriter/event.go b/server/ingester/event/dbwriter/event.go index 4cabaf244d3..12fae1de58a 100644 --- a/server/ingester/event/dbwriter/event.go +++ b/server/ingester/event/dbwriter/event.go @@ -30,6 +30,7 @@ import ( utag "github.com/deepflowio/deepflow/server/ingester/exporters/universal_tag" "github.com/deepflowio/deepflow/server/ingester/flow_tag" "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/pool" "github.com/deepflowio/deepflow/server/libs/utils" ) @@ -106,6 +107,13 @@ type EventStore struct { Duration uint64 `json:"duration" category:"$metrics" sub:"delay"` } +func (e *EventStore) NativeTagVersion() uint32 { + if e.HasMetrics { + return nativetag.GetTableNativeTagsVersion(e.OrgId, nativetag.EVENT_PERF_EVENT) + } + return nativetag.GetTableNativeTagsVersion(e.OrgId, nativetag.EVENT_EVENT) +} + func (e *EventStore) OrgID() uint16 { return e.OrgId } diff --git a/server/ingester/event/dbwriter/event_column_block.go b/server/ingester/event/dbwriter/event_column_block.go index e52d4c80c2a..f34bf1706a2 100644 --- a/server/ingester/event/dbwriter/event_column_block.go +++ b/server/ingester/event/dbwriter/event_column_block.go @@ -21,6 +21,7 @@ import ( "github.com/ClickHouse/ch-go/proto" "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) type EventBlock struct { @@ -63,6 +64,8 @@ type EventBlock struct { HasMetrics bool ColBytes proto.ColUInt32 ColDuration proto.ColUInt64 + + *nativetag.NativeTagsBlock } func (b *EventBlock) Reset() { @@ -103,6 +106,9 @@ func (b *EventBlock) Reset() { b.ColAttributeValues.Reset() b.ColBytes.Reset() b.ColDuration.Reset() + if b.NativeTagsBlock != nil { + b.NativeTagsBlock.Reset() + } } func (b *EventBlock) ToInput(input proto.Input) proto.Input { @@ -149,16 +155,25 @@ func (b *EventBlock) ToInput(input proto.Input) proto.Input { proto.InputColumn{Name: ckdb.COLUMN_DURATION, Data: &b.ColDuration}, ) } + if b.NativeTagsBlock != nil { + input = b.NativeTagsBlock.ToInput(input) + } return input } func (n *EventStore) NewColumnBlock() ckdb.CKColumnBlock { - return &EventBlock{ + b := &EventBlock{ HasMetrics: n.HasMetrics, ColEventType: new(proto.ColStr).LowCardinality(), ColAttributeNames: new(proto.ColStr).LowCardinality().Array(), ColAttributeValues: new(proto.ColStr).Array(), } + if n.HasMetrics { + b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EVENT_PERF_EVENT) + } else { + b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EVENT_EVENT) + } + return b } func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) { @@ -200,4 +215,7 @@ func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) { block.ColAttributeValues.Append(n.AttributeValues) block.ColBytes.Append(n.Bytes) block.ColDuration.Append(n.Duration) + if block.NativeTagsBlock != nil { + block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, nil, nil) + } } diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics.go b/server/ingester/ext_metrics/dbwriter/ext_metrics.go index 77e316984bf..8ba846125f2 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics.go @@ -22,6 +22,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/datatype" flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/pool" ) @@ -89,6 +90,21 @@ func (m *ExtMetrics) VirtualTableName() string { return m.VTableName } +func (m *ExtMetrics) NativeTagVersion() uint32 { + switch m.MsgType { + case datatype.MESSAGE_TYPE_DFSTATS: + return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_TENANT) + case datatype.MESSAGE_TYPE_SERVER_DFSTATS: + if ckdb.IsValidOrgID(m.RawOrgId) { + return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_TENANT) + } else { + return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.DEEPFLOW_ADMIN) + } + default: + return nativetag.GetTableNativeTagsVersion(m.OrgId, nativetag.EXT_METRICS) + } +} + func (m *ExtMetrics) OrgID() uint16 { return m.OrgId } diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics_column_block.go b/server/ingester/ext_metrics/dbwriter/ext_metrics_column_block.go index 97bed343731..055326e7392 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics_column_block.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics_column_block.go @@ -21,6 +21,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/datatype" flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) type ExtMetricsBlock struct { @@ -33,6 +34,7 @@ type ExtMetricsBlock struct { ColTagValues *proto.ColArr[string] ColMetricsFloatNames *proto.ColArr[string] ColMetricsFloatValues *proto.ColArr[float64] + *nativetag.NativeTagsBlock } func (b *ExtMetricsBlock) Reset() { @@ -46,6 +48,9 @@ func (b *ExtMetricsBlock) Reset() { b.ColTagValues.Reset() b.ColMetricsFloatNames.Reset() b.ColMetricsFloatValues.Reset() + if b.NativeTagsBlock != nil { + b.NativeTagsBlock.Reset() + } } func (b *ExtMetricsBlock) ToInput(input proto.Input) proto.Input { @@ -61,11 +66,15 @@ func (b *ExtMetricsBlock) ToInput(input proto.Input) proto.Input { if b.MsgType != datatype.MESSAGE_TYPE_DFSTATS && b.MsgType != datatype.MESSAGE_TYPE_SERVER_DFSTATS { input = b.UniversalTagBlock.ToInput(input) } + + if b.NativeTagsBlock != nil { + input = b.NativeTagsBlock.ToInput(input) + } return input } func (n *ExtMetrics) NewColumnBlock() ckdb.CKColumnBlock { - return &ExtMetricsBlock{ + b := &ExtMetricsBlock{ MsgType: n.MsgType, UniversalTagBlock: n.UniversalTag.NewColumnBlock().(*flow_metrics.UniversalTagBlock), ColVirtualTableName: new(proto.ColStr).LowCardinality(), @@ -74,6 +83,15 @@ func (n *ExtMetrics) NewColumnBlock() ckdb.CKColumnBlock { ColMetricsFloatNames: new(proto.ColStr).LowCardinality().Array(), ColMetricsFloatValues: new(proto.ColFloat64).Array(), } + switch n.TableName() { + case DEEPFLOW_TENANT_COLLECTOR_TABLE: + b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.DEEPFLOW_TENANT) + case DEEPFLOW_ADMIN_SERVER_TABLE: + b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.DEEPFLOW_ADMIN) + case EXT_METRICS_TABLE: + b.NativeTagsBlock = nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.EXT_METRICS) + } + return b } func (n *ExtMetrics) AppendToColumnBlock(b ckdb.CKColumnBlock) { @@ -89,4 +107,7 @@ func (n *ExtMetrics) AppendToColumnBlock(b ckdb.CKColumnBlock) { block.ColTagValues.Append(n.TagValues) block.ColMetricsFloatNames.Append(n.MetricsFloatNames) block.ColMetricsFloatValues.Append(n.MetricsFloatValues) + if block.NativeTagsBlock != nil { + block.NativeTagsBlock.AppendToColumnBlock(n.TagNames, n.TagValues, n.MetricsFloatNames, n.MetricsFloatValues) + } } diff --git a/server/ingester/flow_log/log_data/l4_flow_log.go b/server/ingester/flow_log/log_data/l4_flow_log.go index 7ee23c18241..a569e19870d 100644 --- a/server/ingester/flow_log/log_data/l4_flow_log.go +++ b/server/ingester/flow_log/log_data/l4_flow_log.go @@ -849,6 +849,10 @@ func L4FlowLogColumns() []*ckdb.Column { return columns } +func (f *L4FlowLog) NativeTagVersion() uint32 { + return 0 +} + func (f *L4FlowLog) OrgID() uint16 { return f.KnowledgeGraph.OrgId } diff --git a/server/ingester/flow_log/log_data/l4_packet.go b/server/ingester/flow_log/log_data/l4_packet.go index 9db8471a7bf..830a9bf34d5 100644 --- a/server/ingester/flow_log/log_data/l4_packet.go +++ b/server/ingester/flow_log/log_data/l4_packet.go @@ -53,6 +53,10 @@ func L4PacketColumns() []*ckdb.Column { } } +func (s *L4Packet) NativeTagVersion() uint32 { + return 0 +} + func (s *L4Packet) OrgID() uint16 { return s.OrgId } diff --git a/server/ingester/flow_log/log_data/l7_column_block.go b/server/ingester/flow_log/log_data/l7_column_block.go index 4b1afddc9db..f1b11e39235 100644 --- a/server/ingester/flow_log/log_data/l7_column_block.go +++ b/server/ingester/flow_log/log_data/l7_column_block.go @@ -20,6 +20,7 @@ import ( "github.com/ClickHouse/ch-go/proto" "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) type L7BaseBlock struct { @@ -235,6 +236,7 @@ type L7FlowLogBlock struct { ColMetricsNames *proto.ColArr[string] ColMetricsValues *proto.ColArr[float64] ColEvents proto.ColStr + *nativetag.NativeTagsBlock } func (b *L7FlowLogBlock) Reset() { @@ -276,11 +278,14 @@ func (b *L7FlowLogBlock) Reset() { b.ColMetricsNames.Reset() b.ColMetricsValues.Reset() b.ColEvents.Reset() + if b.NativeTagsBlock != nil { + b.NativeTagsBlock.Reset() + } } func (b *L7FlowLogBlock) ToInput(input proto.Input) proto.Input { input = b.L7BaseBlock.ToInput(input) - return append(input, + input = append(input, proto.InputColumn{Name: ckdb.COLUMN__ID, Data: &b.ColId}, proto.InputColumn{Name: ckdb.COLUMN_L7_PROTOCOL, Data: &b.ColL7Protocol}, proto.InputColumn{Name: ckdb.COLUMN_L7_PROTOCOL_STR, Data: b.ColL7ProtocolStr}, @@ -319,6 +324,10 @@ func (b *L7FlowLogBlock) ToInput(input proto.Input) proto.Input { proto.InputColumn{Name: ckdb.COLUMN_METRICS_VALUES, Data: b.ColMetricsValues}, proto.InputColumn{Name: ckdb.COLUMN_EVENTS, Data: &b.ColEvents}, ) + if b.NativeTagsBlock != nil { + return b.NativeTagsBlock.ToInput(input) + } + return input } func (n *L7FlowLog) NewColumnBlock() ckdb.CKColumnBlock { @@ -339,6 +348,7 @@ func (n *L7FlowLog) NewColumnBlock() ckdb.CKColumnBlock { ColAttributeValues: new(proto.ColStr).Array(), ColMetricsNames: new(proto.ColStr).LowCardinality().Array(), ColMetricsValues: new(proto.ColFloat64).Array(), + NativeTagsBlock: nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.L7_FLOW_LOG), } } @@ -382,4 +392,7 @@ func (n *L7FlowLog) AppendToColumnBlock(b ckdb.CKColumnBlock) { block.ColMetricsNames.Append(n.MetricsNames) block.ColMetricsValues.Append(n.MetricsValues) block.ColEvents.Append(n.Events) + if block.NativeTagsBlock != nil { + block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, n.MetricsNames, n.MetricsValues) + } } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index 6ecc4b87a40..962f7f0be6b 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -32,6 +32,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/datatype/pb" flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" "github.com/deepflowio/deepflow/server/libs/grpc" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/pool" "github.com/deepflowio/deepflow/server/libs/utils" @@ -257,6 +258,10 @@ func L7FlowLogColumns() []*ckdb.Column { return l7Columns } +func (h *L7FlowLog) NativeTagVersion() uint32 { + return nativetag.GetTableNativeTagsVersion(h.KnowledgeGraph.OrgId, nativetag.L7_FLOW_LOG) +} + func (h *L7FlowLog) OrgID() uint16 { return h.KnowledgeGraph.OrgId } diff --git a/server/ingester/flow_tag/app_service_tag.go b/server/ingester/flow_tag/app_service_tag.go index 845034157e5..05aba00a19a 100644 --- a/server/ingester/flow_tag/app_service_tag.go +++ b/server/ingester/flow_tag/app_service_tag.go @@ -31,6 +31,10 @@ type AppServiceTag struct { OrgId uint16 } +func (t *AppServiceTag) NativeTagVersion() uint32 { + return 0 +} + func (t *AppServiceTag) OrgID() uint16 { return t.OrgId } diff --git a/server/ingester/flow_tag/flow_tag.go b/server/ingester/flow_tag/flow_tag.go index d77110d30e1..eff9e5bfcb0 100644 --- a/server/ingester/flow_tag/flow_tag.go +++ b/server/ingester/flow_tag/flow_tag.go @@ -117,6 +117,10 @@ type FlowTag struct { FlowTagInfo } +func (t *FlowTag) NativeTagVersion() uint32 { + return 0 +} + func (t *FlowTag) OrgID() uint16 { return t.OrgId } diff --git a/server/ingester/ingester/org_handler.go b/server/ingester/ingester/org_handler.go index 8688e48ad23..18b03e472e5 100644 --- a/server/ingester/ingester/org_handler.go +++ b/server/ingester/ingester/org_handler.go @@ -26,6 +26,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/prometheus/prometheus" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/debug" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) var CleanDatabaseList = []string{ @@ -93,3 +94,48 @@ func (o *OrgHandler) dropOrgCaches(orgId uint16) { } o.promHander.DropOrg(orgId) } + +func (o *OrgHandler) UpdateNativeTag(orgId uint16, table nativetag.NativeTagTable, nativeTag *nativetag.NativeTag) error { + if nativeTag == nil { + return nil + } + + conns, err := common.NewCKConnections(*o.cfg.CKDB.ActualAddrs, o.cfg.CKDBAuth.Username, o.cfg.CKDBAuth.Password) + if err != nil { + log.Error(err) + return err + } + defer conns.Close() + + for _, conn := range conns { + err := nativetag.CKAddNativeTag(o.cfg.CKDB.Type == ckdb.CKDBTypeByconity, conn, orgId, table, nativeTag) + if err != nil { + log.Error(err) + return err + } + } + + droppedNativeTag := nativetag.GetDroppedNativeTag(orgId, table, nativeTag) + // the drop operation is time-consuming and should be handled asynchronously + go o.dropNativeTag(orgId, table, nativeTag) + return nil +} + +func (o *OrgHandler) dropNativeTag(orgId uint16, table nativetag.NativeTagTable, nativeTag *nativetag.NativeTag) error { + if nativeTag == nil || len(nativeTag.ColumnNames) == 0 { + return nil + } + conns, err := common.NewCKConnections(*o.cfg.CKDB.ActualAddrs, o.cfg.CKDBAuth.Username, o.cfg.CKDBAuth.Password) + if err != nil { + log.Error(err) + return err + } + defer conns.Close() + for _, conn := range conns { + err := nativetag.CKDropNativeTag(o.cfg.CKDB.Type == ckdb.CKDBTypeByconity, conn, orgId, table, droppedNativeTag) + if err != nil { + log.Error(err) + } + } + return nil +} diff --git a/server/ingester/pcap/dbwriter/pcap.go b/server/ingester/pcap/dbwriter/pcap.go index 693969778d3..55b3c65d2c4 100644 --- a/server/ingester/pcap/dbwriter/pcap.go +++ b/server/ingester/pcap/dbwriter/pcap.go @@ -58,6 +58,10 @@ func PcapStoreColumns() []*ckdb.Column { } } +func (s *PcapStore) NativeTagVersion() uint32 { + return 0 +} + func (s *PcapStore) OrgID() uint16 { return s.OrgId } diff --git a/server/ingester/pkg/ckwriter/ckwriter.go b/server/ingester/pkg/ckwriter/ckwriter.go index 8bf7b2c03f0..34f8dbadb01 100644 --- a/server/ingester/pkg/ckwriter/ckwriter.go +++ b/server/ingester/pkg/ckwriter/ckwriter.go @@ -29,6 +29,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/config" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/grpc" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/queue" "github.com/deepflowio/deepflow/server/libs/stats" "github.com/deepflowio/deepflow/server/libs/utils" @@ -75,6 +76,10 @@ func (m *CKWriterManager) DropOrg(orgId uint16) error { return nil } +func (m *CKWriterManager) UpdateNativeTag(orgId uint16, table nativetag.NativeTagTable, nativeTag *nativetag.NativeTag) error { + return nil +} + func (m *CKWriterManager) EndpointsChange(addrs []string) { ckwriterManager.Lock() for _, ckwriter := range m.ckwriters { @@ -212,6 +217,7 @@ func (qc *QueueContext) initConn(connIndex int) error { type CKItem interface { OrgID() uint16 Release() + NativeTagVersion() uint32 NewColumnBlock() ckdb.CKColumnBlock AppendToColumnBlock(ckdb.CKColumnBlock) } @@ -466,6 +472,7 @@ type Cache struct { orgID uint16 prepare string columnBlock ckdb.CKColumnBlock + ItemVersion uint32 protoInput proto.Input size int writeCounter int @@ -535,8 +542,11 @@ func (w *CKWriter) queueProcess(queueID int) { } func (c *Cache) Add(item CKItem) error { - if IsNil(c.columnBlock) { + if IsNil(c.columnBlock) || + (c.size == 0 && c.ItemVersion != item.NativeTagVersion()) { c.columnBlock = item.NewColumnBlock() + log.Info("orgId %d (%s) update item version from %d to %d", c.orgID, c.prepare, c.ItemVersion, item.NativeTagVersion()) + c.ItemVersion = item.NativeTagVersion() } item.AppendToColumnBlock(c.columnBlock) item.Release() diff --git a/server/ingester/profile/dbwriter/profile.go b/server/ingester/profile/dbwriter/profile.go index e55acabc8c1..a1dfc79e9dc 100644 --- a/server/ingester/profile/dbwriter/profile.go +++ b/server/ingester/profile/dbwriter/profile.go @@ -28,6 +28,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/flow_tag" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/grpc" + "github.com/deepflowio/deepflow/server/libs/nativetag" "github.com/deepflowio/deepflow/server/libs/pool" "github.com/deepflowio/deepflow/server/libs/utils" ) @@ -205,6 +206,10 @@ func GenProfileCKTable(cluster, dbName, tableName, storagePolicy, ckdbType strin } } +func (p *InProcessProfile) NativeTagVersion() uint32 { + return nativetag.GetTableNativeTagsVersion(p.OrgId, nativetag.PROFILE) +} + func (p *InProcessProfile) OrgID() uint16 { return p.OrgId } diff --git a/server/ingester/profile/dbwriter/profile_column_block.go b/server/ingester/profile/dbwriter/profile_column_block.go index 067b6906e38..2ff0560e5e9 100644 --- a/server/ingester/profile/dbwriter/profile_column_block.go +++ b/server/ingester/profile/dbwriter/profile_column_block.go @@ -20,6 +20,7 @@ import ( "github.com/ClickHouse/ch-go/proto" "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/nativetag" ) type ProfileBlock struct { @@ -65,6 +66,7 @@ type ProfileBlock struct { ColL3DeviceId proto.ColUInt32 ColServiceId proto.ColUInt32 ColTeamId proto.ColUInt16 + *nativetag.NativeTagsBlock } func (b *ProfileBlock) Reset() { @@ -110,10 +112,13 @@ func (b *ProfileBlock) Reset() { b.ColL3DeviceId.Reset() b.ColServiceId.Reset() b.ColTeamId.Reset() + if b.NativeTagsBlock != nil { + b.NativeTagsBlock.Reset() + } } func (b *ProfileBlock) ToInput(input proto.Input) proto.Input { - return append(input, + input = append(input, proto.InputColumn{Name: ckdb.COLUMN_TIME, Data: &b.ColTime}, proto.InputColumn{Name: ckdb.COLUMN__ID, Data: &b.ColId}, proto.InputColumn{Name: ckdb.COLUMN_IP4, Data: &b.ColIp4}, @@ -157,6 +162,10 @@ func (b *ProfileBlock) ToInput(input proto.Input) proto.Input { proto.InputColumn{Name: ckdb.COLUMN_SERVICE_ID, Data: &b.ColServiceId}, proto.InputColumn{Name: ckdb.COLUMN_TEAM_ID, Data: &b.ColTeamId}, ) + if b.NativeTagsBlock != nil { + input = b.NativeTagsBlock.ToInput(input) + } + return input } func (n *InProcessProfile) NewColumnBlock() ckdb.CKColumnBlock { @@ -169,6 +178,7 @@ func (n *InProcessProfile) NewColumnBlock() ckdb.CKColumnBlock { ColCompressionAlgo: new(proto.ColStr).LowCardinality(), ColTagNames: new(proto.ColStr).LowCardinality().Array(), ColTagValues: new(proto.ColStr).Array(), + NativeTagsBlock: nativetag.GetTableNativeTagsColumnBlock(n.OrgId, nativetag.PROFILE), } } @@ -216,4 +226,7 @@ func (n *InProcessProfile) AppendToColumnBlock(b ckdb.CKColumnBlock) { block.ColL3DeviceId.Append(n.L3DeviceID) block.ColServiceId.Append(n.ServiceID) block.ColTeamId.Append(n.TeamID) + if block.NativeTagsBlock != nil { + block.NativeTagsBlock.AppendToColumnBlock(n.TagNames, n.TagValues, nil, nil) + } } diff --git a/server/ingester/prometheus/dbwriter/prometheus_sample.go b/server/ingester/prometheus/dbwriter/prometheus_sample.go index db57d6782f5..abf1172fc45 100644 --- a/server/ingester/prometheus/dbwriter/prometheus_sample.go +++ b/server/ingester/prometheus/dbwriter/prometheus_sample.go @@ -47,6 +47,7 @@ type PrometheusSampleInterface interface { NewColumnBlock() ckdb.CKColumnBlock AppendToColumnBlock(ckdb.CKColumnBlock) + NativeTagVersion() uint32 } type PrometheusSample struct { @@ -89,6 +90,10 @@ func (m *PrometheusSampleMini) PodNsId() uint16 { return 0 } +func (m *PrometheusSampleMini) NativeTagVersion() uint32 { + return 0 +} + func (m *PrometheusSampleMini) OrgID() uint16 { return m.OrgId } @@ -226,6 +231,10 @@ func (m *PrometheusSample) TableName() string { return m.PrometheusSampleMini.DatabaseName() } +func (m *PrometheusSample) NativeTagVersion() uint32 { + return 0 +} + func (m *PrometheusSample) OrgID() uint16 { return m.OrgId } diff --git a/server/libs/app/document.go b/server/libs/app/document.go index db06507e16f..8485d4ad1b3 100644 --- a/server/libs/app/document.go +++ b/server/libs/app/document.go @@ -56,6 +56,7 @@ type Document interface { NewColumnBlock() ckdb.CKColumnBlock AppendToColumnBlock(ckdb.CKColumnBlock) + NativeTagVersion() uint32 } type DocumentBase struct { @@ -81,6 +82,10 @@ func (b *DocumentBase) TableID() (uint8, error) { return b.Tag.TableID((b.Flags & FLAG_PER_SECOND_METRICS) == 1) } +func (b *DocumentBase) NativeTagVersion() uint32 { + return 0 +} + func (b *DocumentBase) OrgID() uint16 { return b.Tag.OrgId } diff --git a/server/libs/nativetag/nativetag.go b/server/libs/nativetag/nativetag.go new file mode 100644 index 00000000000..c685c14f8c4 --- /dev/null +++ b/server/libs/nativetag/nativetag.go @@ -0,0 +1,339 @@ +/* + * Copyright (c) 2025 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nativetag + +import ( + "database/sql" + "fmt" + "strconv" + "strings" + + "github.com/ClickHouse/ch-go/proto" + + "github.com/deepflowio/deepflow/server/libs/ckdb" + "github.com/deepflowio/deepflow/server/libs/logger" +) + +var log = logger.MustGetLogger("nativetag") + +type NativeTagTable uint8 + +const ( + APPLICATION_LOG NativeTagTable = iota + EVENT_EVENT + EVENT_PERF_EVENT + L7_FLOW_LOG + DEEPFLOW_ADMIN + DEEPFLOW_TENANT + EXT_METRICS + PROFILE + + MAX_NATIVE_TAG_TABLE +) + +var NativeTagDatabaseNames = [MAX_NATIVE_TAG_TABLE]string{ + APPLICATION_LOG: "application_log", + EVENT_EVENT: "event", + EVENT_PERF_EVENT: "event", + L7_FLOW_LOG: "flow_log", + DEEPFLOW_ADMIN: "deepflow_admin", + DEEPFLOW_TENANT: "deepflow_tenant", + EXT_METRICS: "ext_metrics", + PROFILE: "profile", +} + +var NativeTagTableNames = [MAX_NATIVE_TAG_TABLE]string{ + APPLICATION_LOG: "log", + EVENT_EVENT: "event", + EVENT_PERF_EVENT: "perf_event", + L7_FLOW_LOG: "l7_flow_log", + DEEPFLOW_ADMIN: "deepflow_server", + DEEPFLOW_TENANT: "deepflow_collector", + EXT_METRICS: "metrics", + PROFILE: "in_process", +} + +func (table NativeTagTable) Database() string { + return NativeTagDatabaseNames[table] +} + +func (table NativeTagTable) Table() string { + return NativeTagTableNames[table] +} + +func (table NativeTagTable) LocalTable() string { + return table.Table() + "_local" +} + +type NativeTagType uint8 + +const ( + NATIVE_TAG_STRING NativeTagType = iota + NATIVE_TAG_INT64 + NATIVE_TAG_FLOAT64 +) + +func (t NativeTagType) String() string { + switch t { + case NATIVE_TAG_STRING: + return ckdb.String.String() + case NATIVE_TAG_INT64: + return ckdb.Int64.String() + case NATIVE_TAG_FLOAT64: + return ckdb.Float64.String() + } + return "unsupport native tag type" +} + +func (t NativeTagType) IndexString() string { + switch t { + case NATIVE_TAG_STRING: + return ckdb.IndexBloomfilter.String() + default: + return ckdb.IndexMinmax.String() + } +} + +var NativeTags [ckdb.MAX_ORG_ID + 1][MAX_NATIVE_TAG_TABLE]*NativeTag + +type NativeTag struct { + Version uint32 + AttributeNames []string + ColumnNames []string + ColumnTypes []NativeTagType +} + +func UpdateNativeTag(orgId uint16, table NativeTagTable, nativeTag *NativeTag) { + oldVersion := uint32(0) + oldNativeTag := NativeTags[orgId][table] + if oldNativeTag != nil { + oldVersion = oldNativeTag.Version + + } + newNativeTag := *nativeTag + newNativeTag.Version = oldVersion + 1 + + NativeTags[orgId][table] = &newNativeTag +} + +func CKAddNativeTag(isByConity bool, conn *sql.DB, orgId uint16, table NativeTagTable, nativeTag *NativeTag) error { + for i, columnName := range nativeTag.ColumnNames { + tableGlobal := fmt.Sprintf("ALTER TABLE %s.`%s` ADD COLUMN %s %s", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.Table(), columnName, nativeTag.ColumnTypes[i]) + tableLocal := fmt.Sprintf("ALTER TABLE %s.`%s` ADD COLUMN %s %s", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.LocalTable(), columnName, nativeTag.ColumnTypes[i]) + + indexGlobal := fmt.Sprintf("ALTER TABLE %s.`%s` ADD INDEX IF NOT EXISTS idx_%s %s TYPE %s GRANULARITY 2", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.Table(), columnName, columnName, nativeTag.ColumnTypes[i].IndexString()) + indexLocal := fmt.Sprintf("ALTER TABLE %s.`%s` ADD INDEX IF NOT EXISTS idx_%s %s TYPE %s GRANULARITY 2", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.LocalTable(), columnName, columnName, nativeTag.ColumnTypes[i].IndexString()) + + sqls := []string{tableGlobal} + if isByConity { + sqls = append(sqls, indexGlobal) + } else { + sqls = append(sqls, tableLocal, indexLocal) + } + + for _, sql := range sqls { + log.Infof("add native tag: %s", sql) + _, err := conn.Exec(sql) + if err != nil { + // if it has already been added, you need to skip the error + if strings.Contains(err.Error(), "column with this name already exists") { + log.Infof("db: %s, table: %s error: %s", table.Database(), table.Table(), err) + continue + } + return err + } + } + } + return nil +} + +func GetDroppedNativeTag(orgId uint16, table NativeTagTable, nativeTag *NativeTag) *NativeTag { + oldNaitveTag := NativeTags[orgId][table] + droppedNativeTag := &NativeTag{} + for _, columnName := range oldNaitveTag.ColumnNames { + if IndexOf(nativeTag.ColumnNames, columnName) == -1 { + droppedNativeTag.ColumnNames = append(droppedNativeTag.ColumnNames, columnName) + } + } + return droppedNativeTag +} + +func CKDropNativeTag(isByConity bool, conn *sql.DB, orgId uint16, table NativeTagTable, nativeTag *NativeTag) error { + if nativeTag == nil { + return nil + } + for _, columnName := range nativeTag.ColumnNames { + tableGlobal := fmt.Sprintf("ALTER TABLE %s.`%s` DROP COLUMN IF EXISTS %s", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.Table(), columnName) + tableLocal := fmt.Sprintf("ALTER TABLE %s.`%s` DROP COLUMN IF EXISTS %s", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.LocalTable(), columnName) + + indexGlobal := fmt.Sprintf("ALTER TABLE %s.`%s` DROP INDEX IF EXISTS idx_%s", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.Table(), columnName) + indexLocal := fmt.Sprintf("ALTER TABLE %s.`%s` DROP INDEX IF EXISTS idx_%s", + ckdb.OrgDatabasePrefix(orgId)+table.Database(), table.LocalTable(), columnName) + + sqls := []string{} + if isByConity { + sqls = []string{indexGlobal, tableGlobal} + } else { + sqls = []string{indexLocal, tableGlobal, tableLocal} + } + + for _, sql := range sqls { + log.Infof("drop native tag: %s", sql) + _, err := conn.Exec(sql) + if err != nil { + log.Info(err) + } + } + } + return nil +} + +func GetAllNativeTags() [ckdb.MAX_ORG_ID + 1][MAX_NATIVE_TAG_TABLE]*NativeTag { + return NativeTags +} + +func GetNativeTags(orgId uint16, table NativeTagTable) *NativeTag { + return NativeTags[orgId][table] +} + +func GetTableNativeTagsVersion(orgId uint16, table NativeTagTable) uint32 { + nativeTag := NativeTags[orgId][table] + if nativeTag == nil { + return 0 + } + return nativeTag.Version +} + +func GetTableNativeTagsColumnBlock(orgId uint16, table NativeTagTable) *NativeTagsBlock { + nativeTag := NativeTags[orgId][table] + if nativeTag == nil { + return nil + } + return nativeTag.NewColumnBlock() +} + +type NativeTagsBlock struct { + TagNames, StringColumnNames []string + ColTags []proto.ColStr + + IntMetricsNames, IntColumnNames []string + ColIntMetrics []proto.ColInt64 + + FloatMetricsNames, FloatColumnNames []string + ColFloatMetrics []proto.ColFloat64 +} + +func (b *NativeTagsBlock) Reset() { + for i := range b.ColTags { + b.ColTags[i].Reset() + } + for i := range b.ColIntMetrics { + b.ColIntMetrics[i].Reset() + } + for i := range b.ColFloatMetrics { + b.ColFloatMetrics[i].Reset() + } +} + +func (b *NativeTagsBlock) ToInput(input proto.Input) proto.Input { + if len(b.TagNames) != len(b.ColTags) || + len(b.IntMetricsNames) != len(b.ColIntMetrics) || + len(b.FloatMetricsNames) != len(b.ColFloatMetrics) { + log.Warningf("invalid native block length: %d %d, %d %d, %d %d", + len(b.TagNames), len(b.ColTags), len(b.IntMetricsNames), len(b.ColIntMetrics), len(b.FloatMetricsNames), len(b.ColFloatMetrics)) + return input + } + for i := range b.ColTags { + input = append(input, proto.InputColumn{Name: b.StringColumnNames[i], Data: &b.ColTags[i]}) + } + for i := range b.ColIntMetrics { + input = append(input, proto.InputColumn{Name: b.IntColumnNames[i], Data: &b.ColIntMetrics[i]}) + } + for i := range b.ColFloatMetrics { + input = append(input, proto.InputColumn{Name: b.FloatColumnNames[i], Data: &b.ColFloatMetrics[i]}) + } + return input +} + +func IndexOf(slice []string, str string) int { + for i, v := range slice { + if v == str { + return i + } + } + return -1 +} + +func (b *NativeTagsBlock) AppendToColumnBlock(attributeNames, attributeValues, metricsNames []string, metricsValues []float64) { + for i, name := range b.TagNames { + if index := IndexOf(attributeNames, name); index >= 0 { + b.ColTags[i].Append(attributeValues[index]) + } else { + b.ColTags[i].Append("") + } + } + for i, name := range b.IntMetricsNames { + if index := IndexOf(attributeNames, name); index >= 0 { + valueInt64, _ := strconv.ParseInt(attributeValues[index], 10, 64) + b.ColIntMetrics[i].Append(valueInt64) + } else if index := IndexOf(metricsNames, name); index >= 0 { + valueInt64 := int64(metricsValues[index]) + b.ColIntMetrics[i].Append(valueInt64) + } else { + b.ColIntMetrics[i].Append(0) + } + } + + for i, name := range b.FloatMetricsNames { + if index := IndexOf(attributeNames, name); index >= 0 { + valueFloat64, _ := strconv.ParseFloat(attributeValues[index], 64) + b.ColFloatMetrics[i].Append(valueFloat64) + } else if index := IndexOf(metricsNames, name); index >= 0 { + b.ColFloatMetrics[i].Append(metricsValues[index]) + } else { + b.ColFloatMetrics[i].Append(0) + } + } +} + +func (t *NativeTag) NewColumnBlock() *NativeTagsBlock { + block := &NativeTagsBlock{} + for i, name := range t.AttributeNames { + switch t.ColumnTypes[i] { + case NATIVE_TAG_STRING: + block.TagNames = append(block.TagNames, name) + block.StringColumnNames = append(block.StringColumnNames, t.ColumnNames[i]) + block.ColTags = append(block.ColTags, proto.ColStr{}) + case NATIVE_TAG_INT64: + block.IntMetricsNames = append(block.IntMetricsNames, name) + block.IntColumnNames = append(block.IntColumnNames, t.ColumnNames[i]) + block.ColIntMetrics = append(block.ColIntMetrics, proto.ColInt64{}) + case NATIVE_TAG_FLOAT64: + block.FloatMetricsNames = append(block.FloatMetricsNames, name) + block.FloatColumnNames = append(block.FloatColumnNames, t.ColumnNames[i]) + block.ColFloatMetrics = append(block.ColFloatMetrics, proto.ColFloat64{}) + } + } + return block +}