diff --git a/core/types.go b/core/types.go index d5dc07a9..94b96998 100755 --- a/core/types.go +++ b/core/types.go @@ -20,6 +20,35 @@ import ( kube_api "k8s.io/api/core/v1" ) + + + +//DistinctSameResourceEvent : Distinct Events base on involvedObject.Name & event.Reason +func (eventBatch *EventBatch) DistinctSameResourceEvent() { + tempMap := make(map[string]bool) + var finalEvents []*kube_api.Event + for _, event := range eventBatch.Events { + involvedObject := event.InvolvedObject + if &involvedObject == nil { + continue + } + resourceName := involvedObject.Name + reason := event.Reason + msg:=event.Message + key := resourceName + reason + msg + if _, contain := tempMap[key]; !contain { + // fmt.Printf("key: %s \n", key) + tempMap[key] = true + finalEvents = append(finalEvents, event) + } + } + + if len(finalEvents) > 0 { + eventBatch.Events = finalEvents + } +} + + type EventBatch struct { // When this batch was created. Timestamp time.Time diff --git a/manager/manager.go b/manager/manager.go index 14ea9e7b..9ca87619 100755 --- a/manager/manager.go +++ b/manager/manager.go @@ -89,6 +89,8 @@ func (rm *realManager) Housekeep() { } } + + func (rm *realManager) housekeep() { defer func() { lastHousekeepTimestamp.Set(float64(time.Now().Unix())) @@ -100,5 +102,6 @@ func (rm *realManager) housekeep() { // when this stops to be true. events := rm.source.GetNewEvents() klog.V(0).Infof("Exporting %d events", len(events.Events)) + rm.sink.ExportEvents(events) } diff --git a/sinks/dingtalk/dingtalk.go b/sinks/dingtalk/dingtalk.go index 5d92adad..4e169a02 100755 --- a/sinks/dingtalk/dingtalk.go +++ b/sinks/dingtalk/dingtalk.go @@ -17,6 +17,7 @@ package dingtalk import ( "bytes" "encoding/json" + "flag" "fmt" "net/http" "net/url" @@ -52,6 +53,15 @@ var ( } ) +var ArgDDbufferWindows time.Duration + +func init() { + //dingding buffer windows + flag.DurationVar(&ArgDDbufferWindows, "bufferwindows", 0, "if you wanna aggregate the event's message what type is"+ + " Waring in a given time windows to dingding sink, just set bufferwindows >0 ,but Sugget you set bufferwindows > 300"+ + "(defult 0s means do not aggregate message) ") +} + /** dingtalk msg struct */ @@ -98,13 +108,19 @@ func (d *DingTalkSink) Stop() { } func (d *DingTalkSink) ExportEvents(batch *core.EventBatch) { - for _, event := range batch.Events { - if d.isEventLevelDangerous(event.Type) { - d.Ding(event) - // add threshold - time.Sleep(time.Millisecond * 50) + if ArgDDbufferWindows == 0 { + for _, event := range batch.Events { + if d.isEventLevelDangerous(event.Type) { + d.Ding(event) + // add threshold + time.Sleep(time.Millisecond * 50) + } } + } else { + klog.V(2).Info("ArgDDbufferWindows value is ", ArgDDbufferWindows, "!=0 , then Trun on dingdingtalk buffer windows.") + d.ExportBufferEvents(batch) } + } func (d *DingTalkSink) isEventLevelDangerous(level string) bool { diff --git a/sinks/dingtalk/dingtalkbuffer.go b/sinks/dingtalk/dingtalkbuffer.go new file mode 100755 index 00000000..72fb072d --- /dev/null +++ b/sinks/dingtalk/dingtalkbuffer.go @@ -0,0 +1,124 @@ +package dingtalk + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/AliyunContainerService/kube-eventer/core" + kube_api "k8s.io/api/core/v1" + "k8s.io/klog" + "net/http" + "sync" + "time" +) + +type BufferEventBatch map[string][]*kube_api.Event + +func (d *DingTalkSink) ExportBufferEvents(batch *core.EventBatch) { + + var wg sync.WaitGroup + var bufferEventBatch = BufferEventBatch{} + defer func() { + bufferEventBatch = BufferEventBatch{} + }() + // dump level is error event into buffer + wg.Add(1) + go func() { + defer wg.Done() + for _, event := range batch.Events { + // only handler Warning Buffer + if event.Type == "Warning" { + bufferEventBatch[event.InvolvedObject.Name] = append(bufferEventBatch[event.InvolvedObject.Name], event) + } + } + }() + + //buffer windows + klog.V(2).Info("dingding buffer windows is ", ArgDDbufferWindows) + time.Sleep(ArgDDbufferWindows) + klog.V(2).Info("NewEventBatch len:", len(bufferEventBatch)) + + for _, bufferEvent := range bufferEventBatch { + d.DingBuffer(bufferEvent) + // add threshold + time.Sleep(time.Millisecond * 50) + } + + wg.Wait() +} + +func (d *DingTalkSink) DingBuffer(bufferevent []*kube_api.Event) { + + msg := NewcreateMsgFromEvent(d, bufferevent) + + if msg == nil { + klog.Warningf("failed to create msg from event,because of %v", bufferevent) + return + } + + msg_bytes, err := json.Marshal(msg) + if err != nil { + klog.Warningf("failed to marshal msg %v", msg) + return + } + + b := bytes.NewBuffer(msg_bytes) + + resp, err := http.Post(fmt.Sprintf("https://%s?access_token=%s", d.Endpoint, d.Token), CONTENT_TYPE_JSON, b) + if err != nil { + klog.Errorf("failed to send msg to dingtalk. error: %s", err.Error()) + return + } + + defer resp.Body.Close() + if resp != nil && resp.StatusCode != http.StatusOK { + klog.Errorf("failed to send msg to dingtalk, because the response code is %d", resp.StatusCode) + return + } +} + +func NewcreateMsgFromEvent(d *DingTalkSink, bufferevent []*kube_api.Event) *DingTalkMsg { + msg := &DingTalkMsg{} + msg.MsgType = d.MsgType + + m := "" + m2 := "" + i := 0 + for _, event := range bufferevent { + i = i + 1 + m = m + fmt.Sprintf("msg%d : ", i) + event.Message + "\n" + " " + m2 = m2 + "### " + fmt.Sprintf("msg%d : ", i) + event.Message + "\n" + " " + } + msgs := fmt.Sprintf("[%s]", m) + msgs_markdown := fmt.Sprintf("[\n%s]", m2) + + switch msg.MsgType { + //https://open-doc.dingtalk.com/microapp/serverapi2/ye8tup#-6 + case MARKDOWN_MSG_TYPE: + markdownCreator := NewMarkdownMsgBuilder(d.ClusterID, d.Region, bufferevent[0], msgs_markdown) + markdownCreator.AddNodeName(bufferevent[0].Source.Host) + markdownCreator.AddLabels(d.Labels) + msg.Markdown = DingTalkMarkdown{ + //title 加不加其实没所谓,最终不会显示 + Title: fmt.Sprintf("Kubernetes(ID:%s) Event", d.ClusterID), + Text: markdownCreator.Build(), + } + break + + default: + //默认按文本模式推送 + template := MSG_TEMPLATE + if len(d.Labels) > 0 { + for _, label := range d.Labels { + template = fmt.Sprintf(LABE_TEMPLATE, label) + template + } + } + + event := bufferevent[0] + msg.Text = DingTalkText{ + Content: fmt.Sprintf(template, event.Type, event.InvolvedObject.Kind, event.Namespace, event.InvolvedObject.Name, event.Reason, event.LastTimestamp.Format(TIME_FORMAT), msgs), + } + } + + return msg +} diff --git a/sinks/dingtalk/markdownMsgBuilder.go b/sinks/dingtalk/markdownMsgBuilder.go index ab7f3792..cc73194a 100644 --- a/sinks/dingtalk/markdownMsgBuilder.go +++ b/sinks/dingtalk/markdownMsgBuilder.go @@ -3,7 +3,6 @@ package dingtalk import ( "fmt" "strings" - v1 "k8s.io/api/core/v1" ) @@ -13,7 +12,6 @@ const ( MARKDOWN_LINK_TEMPLATE = "[%s](%s)" MARKDOWN_TEXT_BOLD = "**%s**" MARKDOWN_NEW_LINE = "\n\n" - URL_ALIYUN_K8S_CONSULE = "https://cs.console.aliyun.com/#/k8s" //阿里云 kubernetes 管理控制台, Deployment,StatefulSet,DaemonSet 有同样的URL规律 URL_ALIYUN_RESOURCE_DETAIL_TEMPLATE = URL_ALIYUN_K8S_CONSULE + "/%s/detail/%s/%s/%s/%s/pods" @@ -31,7 +29,7 @@ type MarkdownMsgBuilder struct { OutputText string } -func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event) *MarkdownMsgBuilder { +func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event,msgs ...string) *MarkdownMsgBuilder { m := MarkdownMsgBuilder{ Region: region, @@ -43,6 +41,7 @@ func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event) *MarkdownM namespace := fmt.Sprintf(MARKDOWN_LINK_TEMPLATE, event.Namespace, URL_ALIYUN_NAMESPACE_TEMPLATE) name := "" + switch event.InvolvedObject.Kind { case "Deployment": deployName := removeDotContent(event.Name) @@ -81,8 +80,13 @@ func NewMarkdownMsgBuilder(clusterID, region string, event *v1.Event) *MarkdownM } reason := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.Reason) timestamp := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.LastTimestamp.String()) - message := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.Message) - m.OutputText = fmt.Sprintf(MARKDOWN_TEMPLATE, level, kind, namespace, name, reason, timestamp, message) + + if len(msgs)==0{ + message := fmt.Sprintf(MARKDOWN_TEXT_BOLD, event.Message) + m.OutputText = fmt.Sprintf(MARKDOWN_TEMPLATE, level, kind, namespace, name, reason, timestamp, message) + }else{ + m.OutputText = fmt.Sprintf(MARKDOWN_TEMPLATE, level, kind, namespace, name, reason, timestamp, msgs[0]) + } return &m } diff --git a/sinks/manager.go b/sinks/manager.go index 3b916b52..3303dde4 100755 --- a/sinks/manager.go +++ b/sinks/manager.go @@ -135,6 +135,7 @@ func (this *sinkManager) Stop() { } } +// export to Eventsink ,for example dingding func export(s core.EventSink, data *core.EventBatch) { startTime := time.Now() defer func() { @@ -142,5 +143,9 @@ func export(s core.EventSink, data *core.EventBatch) { WithLabelValues(s.Name()). Observe(float64(time.Since(startTime)) / float64(time.Millisecond)) }() + + data.DistinctSameResourceEvent() s.ExportEvents(data) } + +