Skip to content

Commit

Permalink
feat(core): add log aggregator and UI integration (#653)
Browse files Browse the repository at this point in the history
## What type of PR is this?

/kind feature

## What this PR does / why we need it:

This PR introduces a new feature that adds a pod logs aggregator and
integrates it with the UI. This enhancement allows users to easily view
pod logs from a centralized location, improving the overall
observability and troubleshooting experience.

Key Features:
- Server-Sent Events (SSE) for real-time log streaming
- Multi-container support with container switching
- Pause/Resume stream control
- Auto-scroll to latest logs
- Connection status monitoring

https://github.com/user-attachments/assets/fd3bae63-b07e-43f2-a3f2-4f715c110550

## Which issue(s) this PR fixes:

Fixes #636
  • Loading branch information
elliotxx authored Nov 27, 2024
1 parent 2b45db9 commit 95c6585
Show file tree
Hide file tree
Showing 12 changed files with 463 additions and 13 deletions.
150 changes: 150 additions & 0 deletions pkg/core/handler/aggregator/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright The Karpor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregator

import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/KusionStack/karpor/pkg/core/manager/cluster"
"github.com/KusionStack/karpor/pkg/infra/multicluster"
"github.com/KusionStack/karpor/pkg/util/ctxutil"
"github.com/go-chi/chi/v5"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/server"
"k8s.io/utils/pointer"
)

// LogEntry represents a single log entry with timestamp and content
type LogEntry struct {
Timestamp string `json:"timestamp"`
Content string `json:"content"`
Error string `json:"error,omitempty"`
}

// GetPodLogs returns an HTTP handler function that streams Pod logs using Server-Sent Events
//
// @Summary Stream pod logs using Server-Sent Events
// @Description This endpoint streams pod logs in real-time using SSE. It supports container selection and automatic reconnection.
// @Tags insight
// @Produce text/event-stream
// @Param cluster path string true "The cluster name"
// @Param namespace path string true "The namespace name"
// @Param name path string true "The pod name"
// @Param container query string false "The container name (optional if pod has only one container)"
// @Success 200 {object} LogEntry
// @Failure 400 {string} string "Bad Request"
// @Failure 401 {string} string "Unauthorized"
// @Failure 404 {string} string "Not Found"
// @Router /insight/aggregator/pod/{cluster}/{namespace}/{name}/log [get]
func GetPodLogs(clusterMgr *cluster.ClusterManager, c *server.CompletedConfig) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

// Extract the context and logger from the request
ctx := r.Context()
logger := ctxutil.GetLogger(ctx)

// Get parameters from URL path and query
cluster := chi.URLParam(r, "cluster")
namespace := chi.URLParam(r, "namespace")
name := chi.URLParam(r, "name")
container := r.URL.Query().Get("container")

if cluster == "" || namespace == "" || name == "" {
writeSSEError(w, "cluster, namespace and name are required")
return
}

// Build multi-cluster client
client, err := multicluster.BuildMultiClusterClient(ctx, c.LoopbackClientConfig, cluster)
if err != nil {
writeSSEError(w, fmt.Sprintf("failed to build multi-cluster client: %v", err))
return
}
// Get single cluster clientset
clusterClient := client.ClientSet

logger.Info("Getting pod logs...", "cluster", cluster, "namespace", namespace, "pod", name, "container", container)

// Configure log streaming options
opts := &corev1.PodLogOptions{
Container: container,
Follow: true,
TailLines: pointer.Int64(1000),
}

// Get log stream from the pod
req := clusterClient.CoreV1().Pods(namespace).GetLogs(name, opts)
stream, err := req.Stream(ctx)
if err != nil {
writeSSEError(w, fmt.Sprintf("failed to get pod logs: %v", err))
return
}
defer stream.Close()

// Create a done channel to handle client disconnection
done := r.Context().Done()
go func() {
<-done
stream.Close()
}()

// Read and send logs
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
select {
case <-done:
return
default:
logEntry := LogEntry{
Timestamp: time.Now().Format(time.RFC3339Nano),
Content: scanner.Text(),
}

data, err := json.Marshal(logEntry)
if err != nil {
writeSSEError(w, fmt.Sprintf("failed to marshal log entry: %v", err))
return
}

fmt.Fprintf(w, "data: %s\n\n", data)
w.(http.Flusher).Flush()
}
}

if err := scanner.Err(); err != nil {
writeSSEError(w, fmt.Sprintf("error reading log stream: %v", err))
}
}
}

// writeSSEError writes an error message to the SSE stream
func writeSSEError(w http.ResponseWriter, errMsg string) {
logEntry := LogEntry{
Timestamp: time.Now().Format(time.RFC3339Nano),
Error: errMsg,
}
data, _ := json.Marshal(logEntry)
fmt.Fprintf(w, "data: %s\n\n", data)
w.(http.Flusher).Flush()
}
2 changes: 2 additions & 0 deletions pkg/core/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"expvar"

docs "github.com/KusionStack/karpor/api/openapispec"
aggregatorhandler "github.com/KusionStack/karpor/pkg/core/handler/aggregator"
authnhandler "github.com/KusionStack/karpor/pkg/core/handler/authn"
clusterhandler "github.com/KusionStack/karpor/pkg/core/handler/cluster"
detailhandler "github.com/KusionStack/karpor/pkg/core/handler/detail"
Expand Down Expand Up @@ -170,6 +171,7 @@ func setupRestAPIV1(
r.Get("/summary", summaryhandler.GetSummary(insightMgr, genericConfig))
r.Get("/events", eventshandler.GetEvents(insightMgr, genericConfig))
r.Get("/detail", detailhandler.GetDetail(clusterMgr, insightMgr, genericConfig))
r.Get("/aggregator/pod/{cluster}/{namespace}/{name}/log", aggregatorhandler.GetPodLogs(clusterMgr, genericConfig))
})

r.Route("/resource-group-rule", func(r chi.Router) {
Expand Down
1 change: 1 addition & 0 deletions ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"@babel/plugin-transform-private-property-in-object": "^7.23.4",
"@craco/craco": "^7.1.0",
"@craco/types": "^7.1.0",
"@types/js-yaml": "^4.0.9",
"@types/react-grid-layout": "^1.3.2",
"compression-webpack-plugin": "^11.0.0",
"craco-css-modules": "^1.0.5",
Expand Down
12 changes: 6 additions & 6 deletions ui/src/components/layout/style.module.less
Original file line number Diff line number Diff line change
Expand Up @@ -159,32 +159,32 @@
}

.github_corner:hover .octo_arm {
animation: octocat-wave 560ms ease-in-out
animation: octocat-wave 560ms ease-in-out;
}

@keyframes octocat-wave {
0%,
100% {
transform: rotate(0)
transform: rotate(0);
}

20%,
60% {
transform: rotate(-25deg)
transform: rotate(-25deg);
}

40%,
80% {
transform: rotate(10deg)
transform: rotate(10deg);
}
}

@media (width <=500px) {
.github_corner .octo_arm {
animation: octocat-wave 560ms ease-in-out
animation: octocat-wave 560ms ease-in-out;
}

.github_corner:hover .octo_arm {
animation: none
animation: none;
}
}
10 changes: 9 additions & 1 deletion ui/src/locales/de.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"CheckAllIssues": "Alle Fehler prüfen",
"ResourceTopology": "Ressourcen Topologie",
"KubernetesEvents": "Kubernetes Ereignisse",
"LogAggregator": "Log-Aggregator",
"Name": "Name",
"Times": "Mal",
"FilterByName": "Nach Name filtern",
Expand Down Expand Up @@ -122,5 +123,12 @@
"InputToken": "Geben Sie bitte den Token ein",
"SearchByNaturalLanguage": "Suche mit natürlicher Sprache",
"CannotBeEmpty": "Darf nicht leer sein",
"DefaultTag": "Standard-Tag"
"DefaultTag": "Standard-Tag",
"ResumeLogs": "Logs fortsetzen",
"PauseLogs": "Logs pausieren",
"ClearLogs": "Logs löschen",
"Connected": "Verbunden",
"Disconnected": "Getrennt",
"FailedToParsePodDetails": "Pod-Details konnten nicht analysiert werden",
"SelectContainer": "Container auswählen"
}
10 changes: 9 additions & 1 deletion ui/src/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"CheckAllIssues": "Check All Issues",
"ResourceTopology": "Resource Topology",
"KubernetesEvents": "Kubernetes Events",
"LogAggregator": "Log",
"Name": "Name",
"Times": "Times",
"FilterByName": "Filter by Name",
Expand Down Expand Up @@ -122,5 +123,12 @@
"InputToken": "Please Enter the Token",
"SearchByNaturalLanguage": "Search By Natural Language",
"CannotBeEmpty": "Cannot be empty",
"DefaultTag": "default tag"
"DefaultTag": "default tag",
"SelectContainer": "Select container",
"ResumeLogs": "Resume logs",
"PauseLogs": "Pause logs",
"ClearLogs": "Clear logs",
"Connected": "Connected",
"Disconnected": "Disconnected",
"FailedToParsePodDetails": "Failed to parse pod details"
}
10 changes: 9 additions & 1 deletion ui/src/locales/pt.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"CheckAllIssues": "Verificar Todos os Problemas",
"ResourceTopology": "Topologia de Recursos",
"KubernetesEvents": "Eventos do Kubernetes",
"LogAggregator": "Agregador de Logs",
"Name": "Nome",
"Times": "Vezes",
"FilterByName": "Filtrado por nome",
Expand Down Expand Up @@ -122,5 +123,12 @@
"InputToken": "Por favor, insira o token",
"SearchByNaturalLanguage": "Procure por linguagem natural",
"CannotBeEmpty": "Não pode estar vazio",
"DefaultTag": "Tag padrão"
"DefaultTag": "Tag padrão",
"ResumeLogs": "Retomar logs",
"PauseLogs": "Pausar logs",
"ClearLogs": "Limpar logs",
"Connected": "Conectado",
"Disconnected": "Desconectado",
"FailedToParsePodDetails": "Falha ao analisar detalhes do Pod",
"SelectContainer": "Selecionar container"
}
12 changes: 10 additions & 2 deletions ui/src/locales/zh.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"CheckAllIssues": "查看全部风险",
"ResourceTopology": "资源拓扑",
"KubernetesEvents": "Kubernetes 事件",
"LogAggregator": "日志",
"Name": "名称",
"Times": "",
"FilterByName": "请输入名称",
Expand Down Expand Up @@ -119,8 +120,15 @@
"LoginSuccess": "登录成功",
"Login": "登录",
"LogoutSuccess": "登出成功",
"InputToken": "请输入 token",
"InputToken": "请输入 Token",
"SearchByNaturalLanguage": "自然语言搜索",
"CannotBeEmpty": "不能为空",
"DefaultTag": "默认标签"
"DefaultTag": "默认标签",
"ResumeLogs": "继续日志",
"PauseLogs": "暂停日志",
"ClearLogs": "清除日志",
"Connected": "已连接",
"Disconnected": "已断开",
"FailedToParsePodDetails": "解析 Pod 详情失败",
"SelectContainer": "选择容器"
}
2 changes: 1 addition & 1 deletion ui/src/pages/cluster/add/styles.module.less
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
align-items: center;

.page_title {
margin: 0
margin: 0;
}
}

Expand Down
Loading

0 comments on commit 95c6585

Please sign in to comment.