Skip to content

Commit

Permalink
fix: add logging to tinybird
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre818181 committed Jan 28, 2025
1 parent b2f8040 commit 23f3aee
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 3 deletions.
190 changes: 188 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"

Expand All @@ -17,7 +19,9 @@ import (
)

var mutex = &sync.Mutex{}
var gqlMutex = &sync.Mutex{}
var Version = "dev"
var testNumberChannel = make(chan int)

type Test struct {
ID *int `json:"id,omitempty"`
Expand Down Expand Up @@ -80,6 +84,8 @@ func JSON(c *gin.Context, code int, obj interface{}) {
}

func init() {
os.Setenv("TINYBIRD_TOKEN", "p.eyJ1IjogImZhYzExMWQ5LWNiOWUtNDEyMi1hNDA0LTU4ODY3NzM4ZjU1YSIsICJpZCI6ICI5ZGY4MWU4YS02YWNhLTRmYmItYmNhOS01NzVjMmE3ODZlMDIiLCAiaG9zdCI6ICJ1c19lYXN0In0._vhxP9aotN5nWJbjdu4SHu33iFZcKmNPjZHIW9nWLrg")

// Initialize logger
var err error
if os.Getenv("ENV") == "local" {
Expand All @@ -105,7 +111,7 @@ func init() {
testConfig[i] = test
testConfig[i].ID = &i
if test.Timeout == nil {
threeHundred := 300
threeHundred := 300 * 1000
testConfig[i].Timeout = &threeHundred
}
}
Expand All @@ -122,6 +128,9 @@ func (h *Handler) Health(c *gin.Context) {
}

func sendResultsToGraphQL(status string, errorReason *string) {
gqlMutex.Lock()
defer gqlMutex.Unlock()

runpodPodId := os.Getenv("RUNPOD_POD_ID")
jwtToken := os.Getenv("RUNPOD_JWT_TOKEN")
runpodTestId := os.Getenv("RUNPOD_TEST_ID")
Expand Down Expand Up @@ -175,7 +184,7 @@ func sendResultsToGraphQL(status string, errorReason *string) {
}

func cancelJob(timeout int, jobIndex int) {
time.Sleep(time.Duration(timeout) * time.Second)
time.Sleep(time.Duration(timeout) * time.Millisecond)

mutex.Lock()
defer mutex.Unlock()
Expand Down Expand Up @@ -213,6 +222,7 @@ func (h *Handler) JobTake(c *gin.Context) {

go cancelJob(*nextTestPayload.Timeout, currentTestPtr)

testNumberChannel <- currentTestPtr
JSON(c, 200, gin.H{
"delayTime": 0,
"error": "",
Expand Down Expand Up @@ -306,15 +316,191 @@ func LoggerMiddleware(logger *zap.Logger) gin.HandlerFunc {
}
}

func sendLogsToTinyBird(logBuffer chan string) {
// Start goroutine to collect and send logs
buffer := make([]map[string]interface{}, 0)
tinybirdToken := os.Getenv("TINYBIRD_TOKEN")
runpodPodId := os.Getenv("RUNPOD_POD_ID")

testNumber := 7081

go func() {
for num := range testNumberChannel {
testNumber = num
}
}()

for logMsg := range logBuffer {
level := "info"
if strings.HasPrefix(logMsg, "#ERROR:") {
level = "error"
logMsg = strings.TrimPrefix(logMsg, "#ERROR:")
}
// Create log entry
logEntry := map[string]interface{}{
"testId": os.Getenv("RUNPOD_TEST_ID"),
"level": level,
"podId": runpodPodId,
"testNumber": testNumber,
"message": logMsg,
"timestamp": time.Now().UTC().Format(time.RFC1123Z),
}

buffer = append(buffer, logEntry)

// Send logs when buffer is full or channel is closed
if len(buffer) >= 16 {
url := "https://api.us-east.tinybird.co/v0/events?wait=true&name=github_build_logs"

var records []string
for _, entry := range buffer {
jsonBytes, err := json.Marshal(entry)
if err == nil {
records = append(records, string(jsonBytes))
}
}
payload := strings.Join(records, "\n")

go func(payload string) {
// Create and send request
req, err := http.NewRequest("POST", url, strings.NewReader(payload))
if err == nil {
req.Header.Set("Authorization", "Bearer "+tinybirdToken)
req.Header.Set("Content-Type", "text/plain")

client := &http.Client{Timeout: 2 * time.Second}
resp, err := client.Do(req)
if err != nil {
log.Error("Failed to send logs to tinybird", zap.Error(err))
} else if resp.StatusCode > 200 {
body, _ := io.ReadAll(resp.Body)
log.Error("Tinybird request failed",
zap.Int("status", resp.StatusCode),
zap.String("response", string(body)))
resp.Body.Close()
}
}

buffer = make([]map[string]interface{}, 0)
}(payload)
}
}

// Send any remaining logs in buffer
if len(buffer) > 0 {
url := "https://api.us-east.tinybird.co/v0/events?wait=true&name=github_build_logs"

var records []string
for _, entry := range buffer {
jsonBytes, err := json.Marshal(entry)
if err == nil {
records = append(records, string(jsonBytes))
}
}
payload := strings.Join(records, "\n")

req, err := http.NewRequest("POST", url, strings.NewReader(payload))
if err == nil {
req.Header.Set("Authorization", "Bearer "+tinybirdToken)
req.Header.Set("Content-Type", "text/plain")

client := &http.Client{Timeout: 2 * time.Second}
resp, err := client.Do(req)
if err != nil {
log.Error("Failed to send final logs to tinybird", zap.Error(err))
} else if resp.StatusCode > 200 {
body, _ := io.ReadAll(resp.Body)
log.Error("Final tinybird request failed",
zap.Int("status", resp.StatusCode),
zap.String("response", string(body)))
resp.Body.Close()
}
}
}
}

func runCommand(command string) {
// Create a buffered channel for logs
logBuffer := make(chan string, 16)
logBuffer <- fmt.Sprintf("Running command: %s", command)

log.Info("Running command", zap.String("command", command))
cmd := exec.Command("sh", "-c", command)
err := cmd.Start()
if err != nil {
logBuffer <- fmt.Sprintf("Failed to start command: %s", err.Error())
errorMsg := fmt.Sprintf("Failed to start command: %s", err.Error())
sendResultsToGraphQL("FAILED", &errorMsg)
log.Fatal("Failed to start command", zap.Error(err))
}

// Create pipes for stdout and stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
logBuffer <- fmt.Sprintf("Failed to create stdout pipe: %s", err.Error())
log.Error("Failed to create stdout pipe", zap.Error(err))
return
}
stderr, err := cmd.StderrPipe()
if err != nil {
logBuffer <- fmt.Sprintf("Failed to create stderr pipe: %s", err.Error())
log.Error("Failed to create stderr pipe", zap.Error(err))
return
}

go sendLogsToTinyBird(logBuffer)

// Start goroutines to continuously read from pipes
go func() {
buf := make([]byte, 1024)
for {
n, err := stdout.Read(buf)
if n > 0 {
log.Info("Command stdout", zap.ByteString("output", buf[:n]))

// Add log to buffer channel
select {
case logBuffer <- string(buf[:n]):
// Log added to buffer
default:
// Channel full, log discarded
log.Warn("Log buffer full, discarding log")
}

}
if err != nil {
logBuffer <- fmt.Sprintf("Failed to read stdout: %s", err.Error())
break
}
}
}()

go func() {
buf := make([]byte, 1024)
for {
n, err := stderr.Read(buf)
if n > 0 {
log.Info("Command stderr", zap.String("output", string(buf[:n])))

// Add log to buffer channel
select {
case logBuffer <- fmt.Sprintf("#ERROR: %s", string(buf[:n])):
// Log added to buffer
default:
// Channel full, log discarded
log.Warn("Log buffer full, discarding log")
}

}
if err != nil {
break
}
}
}()

cmd.Wait()

close(logBuffer)
}

func RunServer() {
Expand Down
3 changes: 2 additions & 1 deletion runpod.tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"input": {
"text": "Hello world",
"language": "en"
}
},
"timeout": 10000
},
{
"name": "validation_text_input",
Expand Down

0 comments on commit 23f3aee

Please sign in to comment.