Skip to content

Commit

Permalink
feat: add api receive tlp audit log
Browse files Browse the repository at this point in the history
  • Loading branch information
lmquang committed Dec 26, 2024
1 parent 9eee21b commit 80f60b9
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 51 deletions.
65 changes: 36 additions & 29 deletions cmd/teleport-discord-bot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/dwarvesf/teleport-discord-bot/internal/config"
"github.com/dwarvesf/teleport-discord-bot/internal/discord"
Expand All @@ -24,37 +25,55 @@ func main() {

repo.ConnectDatabase()

// Create HTTP server
httpServer := httpserver.NewServer(cfg.Port)
defer func() {
if shutdownErr := httpServer.Shutdown(context.Background()); shutdownErr != nil {
fmt.Fprintf(os.Stderr, "Error shutting down HTTP server: %v\n", shutdownErr)
}
}()
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create Discord client
discordClient := discord.NewClient(cfg)

// Create HTTP server with Discord client
httpServer := httpserver.NewServer(cfg.Port, discordClient)

// Create Teleport plugin
plugin, err := teleport.NewPlugin(cfg, discordClient)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create Teleport plugin: %v\n", err)
os.Exit(1)
}
defer plugin.Close()

// Create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Graceful shutdown function
gracefulShutdown := func() {
fmt.Println("Initiating graceful shutdown...")

// Cancel context to stop ongoing operations
cancel()

// Shutdown HTTP server
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

if err := httpServer.Shutdown(shutdownCtx); err != nil {
fmt.Fprintf(os.Stderr, "Error shutting down HTTP server: %v\n", err)
}

// Close Teleport plugin
plugin.Close()

fmt.Println("Teleport Discord bot shutdown complete")
os.Exit(0)
}

// Run the plugin in a separate goroutine
errChan := make(chan error, 1)
go func() {
errChan <- plugin.Run(ctx)
if err := plugin.Run(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Plugin error: %v\n", err)
gracefulShutdown()
}
}()

// Start the HTTP server
Expand All @@ -63,19 +82,7 @@ func main() {
os.Exit(1)
}

// Wait for either a signal or an error
select {
case sig := <-sigChan:
fmt.Printf("Received signal %v, shutting down...\n", sig)
cancel()
case err := <-errChan:
if err != nil {
fmt.Fprintf(os.Stderr, "Plugin error: %v\n", err)
os.Exit(1)
}
}

// Wait for the plugin to finish
<-errChan
fmt.Println("Teleport Discord bot shutdown complete")
// Wait for interrupt signal
<-sigChan
gracefulShutdown()
}
1 change: 0 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func Load() (*Config, error) {
cfg := &Config{
ProxyAddr: os.Getenv("PROXY_ADDR"),
DiscordWebhookURL: os.Getenv("DISCORD_WEBHOOK_URL"),
WatcherList: os.Getenv("WATCHER_LIST"),
AuthPem: os.Getenv("AUTH_PEM"),
Port: "8080",
}
Expand Down
16 changes: 8 additions & 8 deletions internal/discord/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"strings"
"time"

"github.com/dwarvesf/teleport-discord-bot/internal/config"
repo "github.com/dwarvesf/teleport-discord-bot/internal/repository"

"github.com/gravitational/teleport/api/types"
"github.com/gtuk/discordwebhook"

"github.com/dwarvesf/teleport-discord-bot/internal/config"
repo "github.com/dwarvesf/teleport-discord-bot/internal/repository"
)

// Client handles Discord webhook notifications
Expand All @@ -26,8 +26,8 @@ func NewClient(cfg *config.Config) *Client {
}
}

// sendWebhookNotification sends a message to the configured Discord webhook
func (d *Client) sendWebhookNotification(message discordwebhook.Message) error {
// SendWebhookNotification sends a message to the configured Discord webhook
func (d *Client) SendWebhookNotification(message discordwebhook.Message) error {
return discordwebhook.SendMessage(d.url, message)
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func (d *Client) HandleNewAccessRequest(r types.AccessRequest) error {
Content: ptrString(approverIDs),
}

if err := d.sendWebhookNotification(message); err != nil {
if err := d.SendWebhookNotification(message); err != nil {
return fmt.Errorf("failed to send new access request notification: %w", err)
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func (d *Client) HandleApproveAccessRequest(r types.AccessRequest) error {
Embeds: &[]discordwebhook.Embed{embed},
}

if err := d.sendWebhookNotification(message); err != nil {
if err := d.SendWebhookNotification(message); err != nil {
return fmt.Errorf("failed to send access request approval notification: %w", err)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func (d *Client) HandleDenyAccessRequest(r types.AccessRequest) error {
Embeds: &[]discordwebhook.Embed{embed},
}

if err := d.sendWebhookNotification(message); err != nil {
if err := d.SendWebhookNotification(message); err != nil {
return fmt.Errorf("failed to send access request denial notification: %w", err)
}

Expand Down
171 changes: 158 additions & 13 deletions internal/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,27 @@ import (
"fmt"
"net/http"
"sync"
"time"

"github.com/gtuk/discordwebhook"

"github.com/dwarvesf/teleport-discord-bot/internal/discord"
)

// Server represents an HTTP server for health checks and other utilities
type Server struct {
server *http.Server
port string
mu sync.Mutex
server *http.Server
port string
mu sync.Mutex
discord *discord.Client
}

// NewServer creates a new HTTP server with a healthz endpoint
func NewServer(port string) *Server {
func NewServer(port string, discordClient *discord.Client) *Server {
mux := http.NewServeMux()
s := &Server{
port: port,
port: port,
discord: discordClient,
server: &http.Server{
Addr: fmt.Sprintf(":%v", port),
Handler: mux,
Expand All @@ -31,6 +38,7 @@ func NewServer(port string) *Server {

// Add event log endpoint
mux.HandleFunc("/events.log", s.eventLogHandler)
mux.HandleFunc("/session.log", s.sessionLogHandler)

return s
}
Expand All @@ -43,9 +51,26 @@ func (s *Server) healthzHandler(w http.ResponseWriter, r *http.Request) {

// EventLogBody represents the structure of a Fluentd-like event log payload
type EventLogBody struct {
Tag string `json:"tag"`
Time float64 `json:"time"`
Record map[string]interface{} `json:"record"`
AccessRequests []string `json:"access_requests"`
ClusterName string `json:"cluster_name"`
Code string `json:"code"`
DbName string `json:"db_name"`
DbOrigin string `json:"db_origin"`
DbProtocol string `json:"db_protocol"`
DbQuery string `json:"db_query"`
DbService string `json:"db_service"`
DbType string `json:"db_type"`
DbURI string `json:"db_uri"`
DbUser string `json:"db_user"`
Ei int `json:"ei"`
Query string `json:"query"`
Event string `json:"event"`
PrivateKeyPolicy string `json:"private_key_policy"`
Sid string `json:"sid"`
Success bool `json:"success"`
UID string `json:"uid"`
User string `json:"user"`
UserKind int `json:"user_kind"`
}

func (s *Server) eventLogHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -65,18 +90,138 @@ func (s *Server) eventLogHandler(w http.ResponseWriter, r *http.Request) {

// Print out the event log details in a structured format
fmt.Printf("Fluentd Event Log Received:\n")
fmt.Printf("Tag: %s\n", body.Tag)
fmt.Printf("Timestamp: %f\n", body.Time)
fmt.Println("Record:")
for key, value := range body.Record {
fmt.Printf(" %s: %v\n", key, value)
fmt.Printf("User: %v, DbQuery: %v\n", body.User, body.DbQuery)

query := ""
switch body.Event {
case "db.session.query":
if body.DbQuery == "" {
// Respond with success
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}
query = body.DbQuery
case "db.session.postgres.statements.parse":
if body.Query == "" {
// Respond with success
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}
query = body.Query
default:
// Respond with success
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}

// Prepare Discord embed message
if s.discord != nil {
// Determine embed color based on success
color := "15158332" // Red for failure
if body.Success {
color = "3066993" // Green for success
}

// Prepare fields for the embed
fields := []discordwebhook.Field{
{
Name: ptrString("User"),
Value: ptrString(body.User),
Inline: ptrBool(true),
},
}

// Add additional fields based on event type
if body.DbName != "" {
fields = append(fields,
discordwebhook.Field{
Name: ptrString("Database User"),
Value: ptrString(body.DbUser),
Inline: ptrBool(true),
},
discordwebhook.Field{
Name: ptrString("Database"),
Value: ptrString(body.DbName),
Inline: ptrBool(true),
},
discordwebhook.Field{
Name: ptrString("Query"),
Value: ptrString(query),
Inline: ptrBool(false),
},
discordwebhook.Field{
Name: ptrString("Success"),
Value: ptrString(fmt.Sprintf("%v", body.Success)),
Inline: ptrBool(true),
},
discordwebhook.Field{
Name: ptrString("Time"),
Value: ptrString(time.Now().Format(time.RFC3339)),
Inline: ptrBool(true),
},
)
}

// Create embed
embed := discordwebhook.Embed{
Title: ptrString("Teleport Event Log"),
Description: ptrString(fmt.Sprintf("Event details for %s", body.Event)),
Color: ptrString(color),
Fields: &fields,
}

// Prepare message
message := discordwebhook.Message{
Embeds: &[]discordwebhook.Embed{embed},
}

// Send webhook
if err := s.discord.SendWebhookNotification(message); err != nil {
fmt.Printf("Failed to send Discord webhook: %v\n", err)
}
}

// Respond with success
w.WriteHeader(http.StatusOK)
w.Write([]byte("Event log received successfully"))
}

func (s *Server) sessionLogHandler(w http.ResponseWriter, r *http.Request) {
// Ensure only POST method is accepted
// if r.Method != http.MethodPost {
// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
// return
// }

// Decode JSON body
var body map[string]interface{}
err := json.NewDecoder(r.Body).Decode(&body)
if err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}

// Print out the event log details in a structured format
fmt.Printf("Fluentd Session Log Received:\n")
fmt.Printf("Session: %v\n", body)

// Respond with success
w.WriteHeader(http.StatusOK)
w.Write([]byte("Session log received successfully"))
}

// Helper functions for Discord webhook
func ptrString(s string) *string {
return &s
}

func ptrBool(b bool) *bool {
return &b
}

// Start starts the HTTP server in a separate goroutine
func (s *Server) Start() error {
s.mu.Lock()
Expand Down

0 comments on commit 80f60b9

Please sign in to comment.