Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PB-8293 : Implement a new log writer which flushes the logs in every 30 seconds (for real time logs) #397

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ staticcheck:
errcheck:
docker run --rm -v $(shell pwd):/go/src/github.com/portworx/kdmp $(DOCK_BUILD_CNT) \
/bin/bash -c "cd /go/src/github.com/portworx/kdmp; \
GO111MODULE=off go get -u github.com/kisielk/errcheck; \
go install github.com/kisielk/errcheck@v1.7.0; \
git config --global --add safe.directory /go/src/github.com/portworx/kdmp; \
errcheck -ignoregenerated -ignorepkg fmt -verbose -blank $(PKGS); \
errcheck -ignoregenerated -ignorepkg fmt -verbose -blank -tags unittest $(PKGS)"
Expand Down
44 changes: 42 additions & 2 deletions pkg/kopia/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"os/exec"
"sync"
"time"

cmdexec "github.com/portworx/kdmp/pkg/executor"
"github.com/sirupsen/logrus"
)

const (
commandExecLogInterval = 30 * time.Second
)

// BackupSummaryResponse describes single snapshot entry.
type BackupSummaryResponse struct {
ID string `json:"id"`
Expand Down Expand Up @@ -74,6 +81,34 @@ type backupExecutor struct {
lastError error
}

type logWriter struct {
logger *log.Logger
lastLogTime time.Time
interval time.Duration
mu sync.Mutex
}

func newLogWriter(l *log.Logger, interval time.Duration) *logWriter {
return &logWriter{
logger: l,
interval: interval,
}
}

func (lw *logWriter) Write(p []byte) (n int, err error) {
lw.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need this mutex. What is the scenario when write will be called from two parallel path.
Isn't multi writer already takes care of writing to two file descriptor in parallel with synchronization on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though we create two separate objects by calling newLogWriter for stderr and stdout, there is a chance that write can be called by multiple goroutines at the same time if the snapshot create command generates output quickly. It's unlikely to happen, but I have added a mutex for extra integrity of the output data

defer lw.mu.Unlock()

now := time.Now()
// log if the interval has passed since the last log
// this is to avoid bloating the logs
if now.Sub(lw.lastLogTime) >= lw.interval {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the behavior if backup gets completed with 30 sec. Nothing will be written ? Say some error in the command line happens and command is done. I am confused about the lastLogTime's value when for the first time the write call is invoked ? Any default value need to be assigned? or is it because it will be zero and So now - 0 already takes care of it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First write will not wait for 30secs, it will be written as when it comes.
To explain on the lastLogTime value for the first time will be 0 time format.
if block basically does (time.Now() - lastLogTime ) > 30 sec, Which will succeed .

lw.logger.Println(string(p), time.Now())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test case showcasing failure of kopia command if it can happen within 30 sec and log appears as expected then we are good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer the ticket, have attached for the same

lw.lastLogTime = now
}
return len(p), nil
}

// GetBackupCommand returns a wrapper over the kopia backup command
func GetBackupCommand(path, repoName, password, provider, sourcePath string) (*Command, error) {
if repoName == "" {
Expand Down Expand Up @@ -103,8 +138,13 @@ func NewBackupExecutor(cmd *Command) Executor {

func (b *backupExecutor) Run() error {
b.execCmd = b.cmd.BackupCmd()
b.execCmd.Stdout = b.outBuf
b.execCmd.Stderr = b.errBuf

// Create multi-writers to stream output to both buffer and CLI
stdoutWriter := io.MultiWriter(b.outBuf, newLogWriter(log.New(os.Stdout, "", 0), commandExecLogInterval))
stderrWriter := io.MultiWriter(b.errBuf, newLogWriter(log.New(os.Stderr, "", 0), commandExecLogInterval))
Copy link
Contributor

@lalat-das lalat-das Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not doubting the golang primitives here but it would be good if we can check outBuf errBuf are working as expected by checking summary response also populated as expected. A unit test around that would be great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not doubting , but still doubting :) . Have attached the logs for the same in the ticket. Thanks


b.execCmd.Stdout = stdoutWriter
b.execCmd.Stderr = stderrWriter

if err := b.execCmd.Start(); err != nil {
b.lastError = err
Expand Down
11 changes: 9 additions & 2 deletions pkg/kopia/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kopia
import (
"bytes"
"fmt"
"io"
"log"
"os"
"os/exec"
"strconv"
Expand Down Expand Up @@ -98,8 +100,13 @@ func NewRestoreExecutor(cmd *Command) Executor {
func (b *restoreExecutor) Run() error {

b.execCmd = b.cmd.RestoreCmd()
b.execCmd.Stdout = b.outBuf
b.execCmd.Stderr = b.errBuf

// Create multi-writers to stream output to both buffer and CLI
stdoutWriter := io.MultiWriter(b.outBuf, newLogWriter(log.New(os.Stdout, "", 0), commandExecLogInterval))
stderrWriter := io.MultiWriter(b.errBuf, newLogWriter(log.New(os.Stderr, "", 0), commandExecLogInterval))

b.execCmd.Stdout = stdoutWriter
b.execCmd.Stderr = stderrWriter

if err := b.execCmd.Start(); err != nil {
b.lastError = err
Expand Down
Loading