From 8bc1dfea0064c95eb7dcf5b854c4d344f2dfe573 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Sat, 11 Jan 2025 21:26:27 +0530 Subject: [PATCH 1/6] feat: extracts watcher and execution logic to a library function --- cmd/main.go | 107 +++++++---------------------------- pkg/executor/sse-executor.go | 21 ++++++- pkg/watcher/lib.go | 67 ++++++++++++++++++++++ pkg/watcher/watcher.go | 35 ++++++------ 4 files changed, 127 insertions(+), 103 deletions(-) create mode 100644 pkg/watcher/lib.go diff --git a/cmd/main.go b/cmd/main.go index 09383dc..9456796 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,19 +2,14 @@ package main import ( "context" - "fmt" - "log/slog" "os" "os/exec" "os/signal" - "path/filepath" "strings" - "sync" "syscall" "time" "github.com/nxtcoder17/fwatcher/pkg/executor" - fn "github.com/nxtcoder17/fwatcher/pkg/functions" "github.com/nxtcoder17/fwatcher/pkg/logging" "github.com/nxtcoder17/fwatcher/pkg/watcher" "github.com/urfave/cli/v3" @@ -25,16 +20,6 @@ var ( Version string ) -// DefaultIgnoreList is list of directories that are mostly ignored -var DefaultIgnoreList = []string{ - ".git", ".svn", ".hg", // version control - ".idea", ".vscode", // IDEs - ".direnv", // direnv nix - "node_modules", // node - ".DS_Store", // macOS - ".log", // logs -} - func main() { cmd := &cli.Command{ Name: ProgramName, @@ -77,7 +62,7 @@ func main() { &cli.StringSliceFlag{ Name: "ignore-list", Usage: "disables ignoring from default ignore list", - Value: DefaultIgnoreList, + Value: watcher.DefaultIgnoreList, Aliases: []string{"I"}, }, @@ -161,81 +146,33 @@ func main() { panic(err) } - var ex executor.Executor + var executors []executor.Executor - switch { - case c.Bool("sse"): - { - sseAddr := c.String("sse-addr") - ex = executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr}) - logger.Info("HELLo world") - } - default: - { - execCmd := c.Args().First() - execArgs := c.Args().Tail() - ex = executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ - Logger: logger, - Interactive: c.Bool("interactive"), - Command: func(context.Context) *exec.Cmd { - cmd := exec.CommandContext(ctx, execCmd, execArgs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Stdin = os.Stdin - return cmd - }, - // IsInteractive: true, - }) - } + if c.Bool("sse") { + sseAddr := c.String("sse-addr") + executors = append(executors, executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr})) } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := ex.Start(); err != nil { - slog.Error("got", "err", err) - } - logger.Debug("1. start-job finished") - }() - - counter := 0 - pwd := fn.Must(os.Getwd()) - - wg.Add(1) - go func() { - defer wg.Done() - w.Watch(ctx) - logger.Debug("2. watch context closed") - }() - - wg.Add(1) - go func() { - defer wg.Done() - <-ctx.Done() - ex.Stop() - logger.Debug("3. killed signal processed") - }() - - for event := range w.GetEvents() { - logger.Debug("received", "event", event) - relPath, err := filepath.Rel(pwd, event.Name) - if err != nil { - return err - } - counter += 1 - logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, relPath)) - - ex.OnWatchEvent(executor.Event{Source: event.Name}) + if c.NArg() > 0 { + execCmd := c.Args().First() + execArgs := c.Args().Tail() + executors = append(executors, executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Interactive: c.Bool("interactive"), + Command: func(context.Context) *exec.Cmd { + cmd := exec.CommandContext(ctx, execCmd, execArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + return cmd + }, + })) } - // logger.Debug("stopping executor") - // if err := ex.Stop(); err != nil { - // return err - // } - // logger.Info("stopped executor") + if err := w.WatchAndExecute(ctx, executors); err != nil { + return err + } - wg.Wait() return nil }, } diff --git a/pkg/executor/sse-executor.go b/pkg/executor/sse-executor.go index e9d8507..8ff4481 100644 --- a/pkg/executor/sse-executor.go +++ b/pkg/executor/sse-executor.go @@ -2,6 +2,7 @@ package executor import ( "encoding/json" + "errors" "fmt" "log/slog" "net/http" @@ -16,6 +17,8 @@ any client can connect to this event at /event type SSEExectuor struct { ch chan Event server *http.Server + + logger *slog.Logger } // OnWatchEvent implements Executor. @@ -31,7 +34,14 @@ func (s *SSEExectuor) OnWatchEvent(event Event) error { // Start implements Executor. func (s *SSEExectuor) Start() error { - return s.server.ListenAndServe() + s.logger.Info("Server Side Event notifier server started", "addr", s.server.Addr) + if err := s.server.ListenAndServe(); err != nil { + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return err + } + return nil } // Stop implements Executor. @@ -43,6 +53,8 @@ var _ Executor = (*SSEExectuor)(nil) type SSEExecutorArgs struct { Addr string + + Logger *slog.Logger } func NewSSEExecutor(args SSEExecutorArgs) *SSEExectuor { @@ -68,10 +80,15 @@ func NewSSEExecutor(args SSEExecutorArgs) *SSEExectuor { } }) + logger := args.Logger + if logger == nil { + logger = slog.Default() + } + server := http.Server{ Addr: args.Addr, Handler: mux, } - return &SSEExectuor{ch: ch, server: &server} + return &SSEExectuor{ch: ch, server: &server, logger: logger} } diff --git a/pkg/watcher/lib.go b/pkg/watcher/lib.go new file mode 100644 index 0000000..d201a27 --- /dev/null +++ b/pkg/watcher/lib.go @@ -0,0 +1,67 @@ +package watcher + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/nxtcoder17/fwatcher/pkg/executor" +) + +func (f *Watcher) WatchAndExecute(ctx context.Context, executors []executor.Executor) error { + var wg sync.WaitGroup + + for _i := range executors { + i := _i + ex := executors[i] + + wg.Add(1) + go func() { + defer wg.Done() + if err := ex.Start(); err != nil { + f.Logger.Error("got", "err", err) + } + f.Logger.Debug("1. executor start finished", "executor", i) + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + ex.Stop() + f.Logger.Debug("2. passed context is DONE", "executor", i) + }() + } + + wg.Add(1) + go func() { + defer wg.Done() + f.Watch(ctx) + }() + + pwd, err := os.Getwd() + if err != nil { + return nil + } + + counter := 0 + for event := range f.GetEvents() { + f.Logger.Debug("received", "event", event) + relPath, err := filepath.Rel(pwd, event.Name) + if err != nil { + return err + } + counter += 1 + f.Logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, relPath)) + + for i := range executors { + executors[i].OnWatchEvent(executor.Event{Source: event.Name}) + } + } + + wg.Wait() + + return nil +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 7ddce22..a271868 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -12,14 +12,7 @@ import ( "github.com/fsnotify/fsnotify" ) -type Watcher interface { - Close() error - RecursiveAdd(dir ...string) error - Watch(ctx context.Context) - GetEvents() chan Event -} - -type fsnWatcher struct { +type Watcher struct { watcher *fsnotify.Watcher directoryCount int @@ -36,7 +29,7 @@ type fsnWatcher struct { } // GetEvents implements Watcher. -func (f *fsnWatcher) GetEvents() chan Event { +func (f *Watcher) GetEvents() chan Event { return f.eventsCh } @@ -50,7 +43,7 @@ var ( Chmod = fsnotify.Chmod ) -func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason string) { +func (f Watcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason string) { // INFO: any file change emits a chain of events, but // we can always expect a Write event out of that event chain if event.Op != fsnotify.Write { @@ -103,7 +96,7 @@ func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason strin return true, "event ignored as suffix is not present in only-watch-suffixes" } -func (f *fsnWatcher) Watch(ctx context.Context) { +func (f *Watcher) Watch(ctx context.Context) { lastProcessingTime := time.Now() for { @@ -161,7 +154,7 @@ func (f *fsnWatcher) Watch(ctx context.Context) { } } -func (f *fsnWatcher) RecursiveAdd(dirs ...string) error { +func (f *Watcher) RecursiveAdd(dirs ...string) error { for _, dir := range dirs { if _, ok := f.watchingDirs[dir]; ok { continue @@ -206,7 +199,7 @@ func (f *fsnWatcher) RecursiveAdd(dirs ...string) error { return nil } -func (f *fsnWatcher) addToWatchList(dir string) error { +func (f *Watcher) addToWatchList(dir string) error { if err := f.watcher.Add(dir); err != nil { f.Logger.Error("failed to add directory", "dir", dir, "err", err) return err @@ -216,7 +209,7 @@ func (f *fsnWatcher) addToWatchList(dir string) error { return nil } -func (f *fsnWatcher) Close() error { +func (f *Watcher) Close() error { return f.watcher.Close() } @@ -234,7 +227,17 @@ type WatcherArgs struct { Interactive bool } -func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { +// DefaultIgnoreList is list of directories that are mostly ignored +var DefaultIgnoreList = []string{ + ".git", ".svn", ".hg", // version control + ".idea", ".vscode", // IDEs + ".direnv", // direnv nix + "node_modules", // node + ".DS_Store", // macOS + ".log", // logs +} + +func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) { if args.Logger == nil { args.Logger = slog.Default() } @@ -264,7 +267,7 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { args.WatchDirs = append(args.WatchDirs, dir) } - fsw := &fsnWatcher{ + fsw := &Watcher{ watcher: watcher, Logger: args.Logger, ExcludeDirs: excludeDirs, From 9c6a666870460317d5fe6b69a8684a5ce0345c92 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Sun, 12 Jan 2025 07:36:08 +0530 Subject: [PATCH 2/6] feat: multiple commands in cmd executor --- Taskfile.yml | 2 +- cmd/main.go | 20 +++------ pkg/executor/cmd-executor.go | 83 +++++++++++++++++++----------------- pkg/watcher/lib.go | 13 +----- 4 files changed, 51 insertions(+), 67 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index 3f73889..fc25563 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -32,7 +32,7 @@ tasks: build:dev: cmds: - - go build -o ./bin/fwatcher-dev ./cmd + - go build -o ./bin/fwatcher ./cmd example:http-server: cmds: diff --git a/cmd/main.go b/cmd/main.go index 9456796..2f39a9d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -24,7 +24,7 @@ func main() { cmd := &cli.Command{ Name: ProgramName, UseShortOptionHandling: true, - Usage: "simple tool to run commands on filesystem change events", + Usage: "a simple tool to run things on filesystem change events", ArgsUsage: "", Version: Version, Flags: []cli.Flag{ @@ -77,17 +77,10 @@ func main() { Usage: "interactive mode, with stdin", }, - &cli.BoolFlag{ - Name: "sse", - Usage: "run watcher in sse mode", - }, - &cli.StringFlag{ Name: "sse-addr", HideDefault: false, - Usage: "run watcher in sse mode", - Sources: cli.ValueSourceChain{}, - Value: ":12345", + Usage: "run watcher with Server Side Events (SSE) enabled", }, }, Action: func(ctx context.Context, c *cli.Command) error { @@ -148,8 +141,7 @@ func main() { var executors []executor.Executor - if c.Bool("sse") { - sseAddr := c.String("sse-addr") + if sseAddr := c.String("sse-addr"); sseAddr != "" { executors = append(executors, executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr})) } @@ -159,12 +151,12 @@ func main() { executors = append(executors, executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ Logger: logger, Interactive: c.Bool("interactive"), - Command: func(context.Context) *exec.Cmd { + Commands: func(context.Context) []*exec.Cmd { cmd := exec.CommandContext(ctx, execCmd, execArgs...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Stdin = os.Stdin - return cmd + return []*exec.Cmd{cmd} }, })) } @@ -177,7 +169,7 @@ func main() { }, } - ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT) + ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM) defer stop() if err := cmd.Run(ctx, os.Args); err != nil { diff --git a/pkg/executor/cmd-executor.go b/pkg/executor/cmd-executor.go index e121c9c..61e5823 100644 --- a/pkg/executor/cmd-executor.go +++ b/pkg/executor/cmd-executor.go @@ -12,7 +12,7 @@ import ( type CmdExecutor struct { logger *slog.Logger parentCtx context.Context - newCmd func(context.Context) *exec.Cmd + newCmds func(context.Context) []*exec.Cmd interactive bool @@ -22,7 +22,7 @@ type CmdExecutor struct { type CmdExecutorArgs struct { Logger *slog.Logger - Command func(context.Context) *exec.Cmd + Commands func(context.Context) []*exec.Cmd Interactive bool } @@ -34,7 +34,7 @@ func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor { return &CmdExecutor{ parentCtx: ctx, logger: args.Logger.With("component", "cmd-executor"), - newCmd: args.Command, + newCmds: args.Commands, mu: sync.Mutex{}, interactive: args.Interactive, } @@ -54,54 +54,57 @@ func (ex *CmdExecutor) Start() error { ex.abort = cf ex.mu.Unlock() - cmd := ex.newCmd(ctx) - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - if ex.interactive { - cmd.Stdin = os.Stdin - cmd.SysProcAttr.Foreground = true - } - - if err := cmd.Start(); err != nil { - return err - } + cmds := ex.newCmds(ctx) - done := make(chan error) - go func() { - done <- cmd.Wait() - }() + for _, cmd := range cmds { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if ex.interactive { + cmd.Stdin = os.Stdin + cmd.SysProcAttr.Foreground = true + } - select { - case <-ctx.Done(): - ex.logger.Debug("process context done") - case err := <-done: - ex.logger.Debug("process wait completed, got", "err", err) - } + if err := cmd.Start(); err != nil { + return err + } - ex.logger.Debug("process", "pid", cmd.Process.Pid) + done := make(chan error) + go func() { + done <- cmd.Wait() + }() - if ex.interactive { - // Send SIGTERM to the interactive process, as user will see it on his screen - proc, err := os.FindProcess(os.Getpid()) - if err != nil { - return err + select { + case <-ctx.Done(): + ex.logger.Debug("process context done") + case err := <-done: + ex.logger.Debug("process wait completed, got", "err", err) } - err = proc.Signal(syscall.SIGTERM) - if err != nil { - if err != syscall.ESRCH { - ex.logger.Error("failed to kill, got", "err", err) + ex.logger.Debug("process", "pid", cmd.Process.Pid) + + if ex.interactive { + // Send SIGTERM to the interactive process, as user will see it on his screen + proc, err := os.FindProcess(os.Getpid()) + if err != nil { + return err + } + + err = proc.Signal(syscall.SIGTERM) + if err != nil { + if err != syscall.ESRCH { + ex.logger.Error("failed to kill, got", "err", err) + return err + } return err } - return err } - } - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - if err == syscall.ESRCH { - return nil + if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + if err == syscall.ESRCH { + return nil + } + ex.logger.Error("failed to kill, got", "err", err) + return err } - ex.logger.Error("failed to kill, got", "err", err) - return err } return nil diff --git a/pkg/watcher/lib.go b/pkg/watcher/lib.go index d201a27..951eae2 100644 --- a/pkg/watcher/lib.go +++ b/pkg/watcher/lib.go @@ -3,8 +3,6 @@ package watcher import ( "context" "fmt" - "os" - "path/filepath" "sync" "github.com/nxtcoder17/fwatcher/pkg/executor" @@ -41,20 +39,11 @@ func (f *Watcher) WatchAndExecute(ctx context.Context, executors []executor.Exec f.Watch(ctx) }() - pwd, err := os.Getwd() - if err != nil { - return nil - } - counter := 0 for event := range f.GetEvents() { f.Logger.Debug("received", "event", event) - relPath, err := filepath.Rel(pwd, event.Name) - if err != nil { - return err - } counter += 1 - f.Logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, relPath)) + f.Logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, event.Name)) for i := range executors { executors[i].OnWatchEvent(executor.Event{Source: event.Name}) From 71e0bd4ab1d9f163fb01de491a77c3faacd142f7 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Tue, 14 Jan 2025 00:56:49 +0530 Subject: [PATCH 3/6] feat: fixes cmd executor's only first command execution bug - adds tests to cmd executor to prevent it from happening in future --- pkg/executor/cmd-executor.go | 28 +++++++----- pkg/executor/cmd-executor_test.go | 76 +++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 pkg/executor/cmd-executor_test.go diff --git a/pkg/executor/cmd-executor.go b/pkg/executor/cmd-executor.go index 61e5823..3645761 100644 --- a/pkg/executor/cmd-executor.go +++ b/pkg/executor/cmd-executor.go @@ -13,6 +13,7 @@ type CmdExecutor struct { logger *slog.Logger parentCtx context.Context newCmds func(context.Context) []*exec.Cmd + commands []func(context.Context) *exec.Cmd interactive bool @@ -21,8 +22,9 @@ type CmdExecutor struct { } type CmdExecutorArgs struct { - Logger *slog.Logger - Commands func(context.Context) []*exec.Cmd + Logger *slog.Logger + // Commands func(context.Context) []*exec.Cmd + Commands []func(context.Context) *exec.Cmd Interactive bool } @@ -32,9 +34,10 @@ func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor { } return &CmdExecutor{ - parentCtx: ctx, - logger: args.Logger.With("component", "cmd-executor"), - newCmds: args.Commands, + parentCtx: ctx, + logger: args.Logger.With("component", "cmd-executor"), + // newCmds: args.Commands, + commands: args.Commands, mu: sync.Mutex{}, interactive: args.Interactive, } @@ -49,14 +52,14 @@ func (ex *CmdExecutor) OnWatchEvent(ev Event) error { // Start implements Executor. func (ex *CmdExecutor) Start() error { - ex.mu.Lock() - ctx, cf := context.WithCancel(ex.parentCtx) - ex.abort = cf - ex.mu.Unlock() + for i := range ex.commands { + ex.mu.Lock() + ctx, cf := context.WithCancel(ex.parentCtx) + ex.abort = cf + ex.mu.Unlock() - cmds := ex.newCmds(ctx) + cmd := ex.commands[i](ctx) - for _, cmd := range cmds { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} if ex.interactive { cmd.Stdin = os.Stdin @@ -100,11 +103,12 @@ func (ex *CmdExecutor) Start() error { if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { if err == syscall.ESRCH { - return nil + continue } ex.logger.Error("failed to kill, got", "err", err) return err } + ex.logger.Debug("command fully executed and processed") } return nil diff --git a/pkg/executor/cmd-executor_test.go b/pkg/executor/cmd-executor_test.go new file mode 100644 index 0000000..2e36c27 --- /dev/null +++ b/pkg/executor/cmd-executor_test.go @@ -0,0 +1,76 @@ +package executor + +import ( + "bytes" + "context" + "io" + "log/slog" + "os/exec" + "strings" + "testing" +) + +func Test_Exectuor_Start(t *testing.T) { + newCmd := func(stdout io.Writer, cmd string, args ...string) func(c context.Context) *exec.Cmd { + return func(c context.Context) *exec.Cmd { + cmd := exec.CommandContext(c, cmd, args...) + cmd.Stdout = stdout + return cmd + } + } + + tests := []struct { + name string + commands func(stdout io.Writer) []func(c context.Context) *exec.Cmd + output []string + }{ + // TODO: add your tests + { + name: "1. with single command", + commands: func(stdout io.Writer) []func(c context.Context) *exec.Cmd { + return []func(c context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + } + }, + output: []string{ + "hi", + }, + }, + { + name: "2. with multiple commands", + commands: func(stdout io.Writer) []func(c context.Context) *exec.Cmd { + return []func(c context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + } + }, + output: []string{ + "hi", + "hello", + }, + }, + } + + logger := slog.Default() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := new(bytes.Buffer) + ex := NewCmdExecutor(context.TODO(), CmdExecutorArgs{ + Logger: logger, + Commands: tt.commands(b), + Interactive: false, + }) + + if err := ex.Start(); err != nil { + t.Error(err) + } + + want := strings.Join(tt.output, "\n") + got := strings.TrimSpace(b.String()) + + if got != want { + t.Errorf("FAILED (%s)\n\t got: %s\n\twant: %s\n", tt.name, got, want) + } + }) + } +} From 4504748399c2346848d1fcd6f9e1507049e2fda4 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Fri, 17 Jan 2025 00:40:59 +0530 Subject: [PATCH 4/6] test: fixes executor flows, and adds tests --- Taskfile.yml | 4 + cmd/main.go | 14 +- pkg/executor/cmd-executor.go | 4 +- pkg/executor/cmd-executor_test.go | 2 +- pkg/watcher/lib.go | 59 +++- pkg/watcher/lib_test.go | 460 ++++++++++++++++++++++++++++++ 6 files changed, 520 insertions(+), 23 deletions(-) create mode 100644 pkg/watcher/lib_test.go diff --git a/Taskfile.yml b/Taskfile.yml index fc25563..1306141 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -30,6 +30,10 @@ tasks: upx ./bin/{{.binary}} fi + test:executor: + cmds: + - go test ./pkg/executor/... + build:dev: cmds: - go build -o ./bin/fwatcher ./cmd diff --git a/cmd/main.go b/cmd/main.go index 2f39a9d..02909e1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -151,12 +151,14 @@ func main() { executors = append(executors, executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ Logger: logger, Interactive: c.Bool("interactive"), - Commands: func(context.Context) []*exec.Cmd { - cmd := exec.CommandContext(ctx, execCmd, execArgs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Stdin = os.Stdin - return []*exec.Cmd{cmd} + Commands: []func(context.Context) *exec.Cmd{ + func(c context.Context) *exec.Cmd { + cmd := exec.CommandContext(ctx, execCmd, execArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + return cmd + }, }, })) } diff --git a/pkg/executor/cmd-executor.go b/pkg/executor/cmd-executor.go index 3645761..3819c88 100644 --- a/pkg/executor/cmd-executor.go +++ b/pkg/executor/cmd-executor.go @@ -77,9 +77,9 @@ func (ex *CmdExecutor) Start() error { select { case <-ctx.Done(): - ex.logger.Debug("process context done") + ex.logger.Debug("process finished (context cancelled)") case err := <-done: - ex.logger.Debug("process wait completed, got", "err", err) + ex.logger.Debug("process finished (wait completed), got", "err", err) } ex.logger.Debug("process", "pid", cmd.Process.Pid) diff --git a/pkg/executor/cmd-executor_test.go b/pkg/executor/cmd-executor_test.go index 2e36c27..531d071 100644 --- a/pkg/executor/cmd-executor_test.go +++ b/pkg/executor/cmd-executor_test.go @@ -37,7 +37,7 @@ func Test_Exectuor_Start(t *testing.T) { }, }, { - name: "2. with multiple commands", + name: "2. testing", commands: func(stdout io.Writer) []func(c context.Context) *exec.Cmd { return []func(c context.Context) *exec.Cmd{ newCmd(stdout, "echo", "hi"), diff --git a/pkg/watcher/lib.go b/pkg/watcher/lib.go index 951eae2..98ba9f4 100644 --- a/pkg/watcher/lib.go +++ b/pkg/watcher/lib.go @@ -11,32 +11,63 @@ import ( func (f *Watcher) WatchAndExecute(ctx context.Context, executors []executor.Executor) error { var wg sync.WaitGroup - for _i := range executors { - i := _i - ex := executors[i] + l := len(executors) - wg.Add(1) - go func() { - defer wg.Done() - if err := ex.Start(); err != nil { - f.Logger.Error("got", "err", err) - } - f.Logger.Debug("1. executor start finished", "executor", i) - }() + for i := 0; i < l-1; i++ { + ex := executors[i] - wg.Add(1) go func() { - defer wg.Done() <-ctx.Done() ex.Stop() - f.Logger.Debug("2. passed context is DONE", "executor", i) }() + + switch ex.(type) { + case *executor.SSEExectuor: + { + wg.Add(1) + go func() { + defer wg.Done() + ex.Start() + }() + } + default: + { + if err := ex.Start(); err != nil { + return err + } + + // INFO: just for cleanup purposes + if err := ex.Stop(); err != nil { + return err + } + } + } } + ex := executors[l-1] + + wg.Add(1) + go func() { + defer wg.Done() + if err := ex.Start(); err != nil { + f.Logger.Error("got", "err", err) + } + f.Logger.Debug("final executor start finished") + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + ex.Stop() + f.Logger.Debug("2. context cancelled") + }() + wg.Add(1) go func() { defer wg.Done() f.Watch(ctx) + f.Logger.Debug("3. watcher closed") }() counter := 0 diff --git a/pkg/watcher/lib_test.go b/pkg/watcher/lib_test.go new file mode 100644 index 0000000..628142a --- /dev/null +++ b/pkg/watcher/lib_test.go @@ -0,0 +1,460 @@ +package watcher + +import ( + "bytes" + "context" + "io" + "log/slog" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/nxtcoder17/fwatcher/pkg/executor" +) + +func Test_Watcher_WatchAndExecute(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + + slog.SetDefault(logger) + + newCmd := func(stdout io.Writer, cmd string, args ...string) func(c context.Context) *exec.Cmd { + return func(c context.Context) *exec.Cmd { + cmd := exec.CommandContext(c, cmd, args...) + cmd.Stdout = stdout + return cmd + } + } + + tests := []struct { + name string + sendEvents func(ch chan Event) + executors func(ctx context.Context, stdout io.Writer) []executor.Executor + want []string + }{ + { + name: "1. single executor, single command", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + }, + }, + + { + name: "2. single executor, multiple commands", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + }, + }, + + { + name: "3. multiple executor, single command each", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + }, + }, + + { + name: "4. multiple executor, multiple commands each", + sendEvents: func(ch chan Event) { + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "no hi"), + newCmd(stdout, "echo", "no hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + "no hi", + "no hello", + }, + }, + + { + name: "5. single executor, single command, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + // this one after first event + "hi", + }, + }, + + { + name: "6. single executor, single command, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(40 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + + // this one after first event + "hi", + + // this one after second event + "hi", + }, + }, + + { + name: "7. single executor, multiple commands, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + }, + }, + + { + name: "8. single executor, multiple commands, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + + // after second change event + "hi", + "hello", + }, + }, + + { + name: "9. multiple executor, single command, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + // after first change event + "hi", + }, + }, + + { + name: "10. multiple executor, single command, multiple change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + + // after first change event + "hi", + + // after second change event + "hi", + }, + }, + + { + name: "11. multiple executor, multiple commands, single change event", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + + // after second change event + "hi", + "hello", + }, + }, + + { + name: "12. multiple executor, multiple commands, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + }, + }, + + { + name: "13. multiple executor with SSE, multiple commands, multiple change events", + sendEvents: func(ch chan Event) { + <-time.After(20 * time.Millisecond) + ch <- Event{Name: "sample", Op: fsnotify.Create} + }, + + executors: func(ctx context.Context, stdout io.Writer) []executor.Executor { + return []executor.Executor{ + executor.NewSSEExecutor(executor.SSEExecutorArgs{ + Addr: ":8919", + Logger: logger, + }), + + executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{ + Logger: logger, + Commands: []func(context.Context) *exec.Cmd{ + newCmd(stdout, "echo", "hi"), + newCmd(stdout, "echo", "hello"), + }, + Interactive: false, + }), + } + }, + want: []string{ + "hi", + "hello", + + // after first change event + "hi", + "hello", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + eventCh := make(chan Event) + go tt.sendEvents(eventCh) + + b := new(bytes.Buffer) + + w, _ := fsnotify.NewWatcher() + + watcher := Watcher{ + watcher: w, + Logger: logger, + cooldownDuration: 5 * time.Millisecond, + eventsCh: eventCh, + } + + ctx, cf := context.WithTimeout(context.TODO(), 100*time.Millisecond) + // ctx, cf := context.WithCancel(context.TODO()) + defer cf() + + executors := tt.executors(ctx, b) + + if err := watcher.WatchAndExecute(ctx, executors); err != nil { + t.Error(err) + } + + want := strings.Join(tt.want, "\n") + got := strings.TrimSpace(b.String()) + + if got != want { + t.Errorf("FAILED (%s)\n\t got: %s\n\twant: %s\n", tt.name, got, want) + } + }) + } +} From 2217d37d1bbd1068cfa724c48df0a0723f50132e Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Fri, 17 Jan 2025 13:48:08 +0530 Subject: [PATCH 5/6] feat: adds watch, and ignore extensions, along with dirs --- Taskfile.yml | 6 ++++++ flake.nix | 2 ++ pkg/executor/cmd-executor.go | 10 +++++++--- pkg/watcher/lib.go | 1 + pkg/watcher/lib_test.go | 7 ++++++- pkg/watcher/watcher.go | 23 +++++++++++++++++++++-- 6 files changed, 43 insertions(+), 6 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index 1306141..10d2127 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -34,6 +34,12 @@ tasks: cmds: - go test ./pkg/executor/... + test:watcher: + env: + DEBUG: false + cmds: + - go test -json ./pkg/watcher/... | gotestfmt + build:dev: cmds: - go build -o ./bin/fwatcher ./cmd diff --git a/flake.nix b/flake.nix index 6080261..ba7028f 100644 --- a/flake.nix +++ b/flake.nix @@ -20,6 +20,8 @@ pre-commit go_1_22 + gotestfmt + upx go-task ]; diff --git a/pkg/executor/cmd-executor.go b/pkg/executor/cmd-executor.go index 3819c88..319a882 100644 --- a/pkg/executor/cmd-executor.go +++ b/pkg/executor/cmd-executor.go @@ -77,9 +77,11 @@ func (ex *CmdExecutor) Start() error { select { case <-ctx.Done(): - ex.logger.Debug("process finished (context cancelled)") + ex.logger.Debug("process finished (context cancelled)", "command", cmd.String()) + case <-ex.parentCtx.Done(): + ex.logger.Debug("process finished (parent context cancelled)", "command", cmd.String()) case err := <-done: - ex.logger.Debug("process finished (wait completed), got", "err", err) + ex.logger.Debug("process finished (wait completed), got", "err", err, "command", cmd.String()) } ex.logger.Debug("process", "pid", cmd.Process.Pid) @@ -101,11 +103,13 @@ func (ex *CmdExecutor) Start() error { } } + ex.logger.Debug("about to kill", "process", cmd.Process.Pid) if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + ex.logger.Error("failed to kill, got", "err", err) if err == syscall.ESRCH { continue } - ex.logger.Error("failed to kill, got", "err", err) + // ex.logger.Error("failed to kill, got", "err", err) return err } ex.logger.Debug("command fully executed and processed") diff --git a/pkg/watcher/lib.go b/pkg/watcher/lib.go index 98ba9f4..e89a8d1 100644 --- a/pkg/watcher/lib.go +++ b/pkg/watcher/lib.go @@ -74,6 +74,7 @@ func (f *Watcher) WatchAndExecute(ctx context.Context, executors []executor.Exec for event := range f.GetEvents() { f.Logger.Debug("received", "event", event) counter += 1 + f.Logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, event.Name)) for i := range executors { diff --git a/pkg/watcher/lib_test.go b/pkg/watcher/lib_test.go index 628142a..10ab6dd 100644 --- a/pkg/watcher/lib_test.go +++ b/pkg/watcher/lib_test.go @@ -16,8 +16,13 @@ import ( ) func Test_Watcher_WatchAndExecute(t *testing.T) { + logLevel := slog.LevelInfo + if os.Getenv("DEBUG") == "true" { + logLevel = slog.LevelDebug + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ - Level: slog.LevelDebug, + Level: logLevel, })) slog.SetDefault(logger) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index a271868..51ddea8 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -20,8 +20,9 @@ type Watcher struct { Logger *slog.Logger OnlySuffixes []string IgnoreSuffixes []string - ExcludeDirs map[string]struct{} - watchingDirs map[string]struct{} + + ExcludeDirs map[string]struct{} + watchingDirs map[string]struct{} cooldownDuration time.Duration @@ -237,6 +238,10 @@ var DefaultIgnoreList = []string{ ".log", // logs } +var DefaultIgnoreExtensions = []string{ + ".log", +} + func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) { if args.Logger == nil { args.Logger = slog.Default() @@ -256,6 +261,20 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) { excludeDirs[dir] = struct{}{} } + for _, dir := range args.WatchDirs { + if strings.HasPrefix(dir, "-") { + excludeDirs[dir[1:]] = struct{}{} + } + } + + args.IgnoreExtensions = append(args.IgnoreExtensions, DefaultIgnoreExtensions...) + + for _, ext := range args.WatchExtensions { + if strings.HasPrefix(ext, "-") { + excludeDirs[ext[1:]] = struct{}{} + } + } + watcher, err := fsnotify.NewWatcher() if err != nil { args.Logger.Error("failed to create watcher, got", "err", err) From 0901da9fe445b40cfaf7594773a8b5024b580575 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Fri, 17 Jan 2025 23:50:55 +0530 Subject: [PATCH 6/6] fix: fixes executor command control with mutex over Start() call --- pkg/executor/cmd-executor.go | 82 ++++++++++++++++++++++-------------- pkg/watcher/watcher.go | 45 ++++++++++++++------ 2 files changed, 83 insertions(+), 44 deletions(-) diff --git a/pkg/executor/cmd-executor.go b/pkg/executor/cmd-executor.go index 319a882..5608c12 100644 --- a/pkg/executor/cmd-executor.go +++ b/pkg/executor/cmd-executor.go @@ -12,18 +12,17 @@ import ( type CmdExecutor struct { logger *slog.Logger parentCtx context.Context - newCmds func(context.Context) []*exec.Cmd commands []func(context.Context) *exec.Cmd interactive bool - mu sync.Mutex - abort func() + mu sync.Mutex + + kill func() error } type CmdExecutorArgs struct { - Logger *slog.Logger - // Commands func(context.Context) []*exec.Cmd + Logger *slog.Logger Commands []func(context.Context) *exec.Cmd Interactive bool } @@ -34,9 +33,8 @@ func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor { } return &CmdExecutor{ - parentCtx: ctx, - logger: args.Logger.With("component", "cmd-executor"), - // newCmds: args.Commands, + parentCtx: ctx, + logger: args.Logger, commands: args.Commands, mu: sync.Mutex{}, interactive: args.Interactive, @@ -50,13 +48,36 @@ func (ex *CmdExecutor) OnWatchEvent(ev Event) error { return nil } +func killPID(pid int, logger ...*slog.Logger) error { + var l *slog.Logger + if len(logger) > 0 { + l = logger[0] + } else { + l = slog.Default() + } + + l.Debug("about to kill", "process", pid) + if err := syscall.Kill(-pid, syscall.SIGKILL); err != nil { + if err == syscall.ESRCH { + return nil + } + l.Error("failed to kill, got", "err", err) + return err + } + return nil +} + // Start implements Executor. func (ex *CmdExecutor) Start() error { + ex.mu.Lock() + defer ex.mu.Unlock() for i := range ex.commands { - ex.mu.Lock() + if err := ex.parentCtx.Err(); err != nil { + return err + } + ctx, cf := context.WithCancel(ex.parentCtx) - ex.abort = cf - ex.mu.Unlock() + defer cf() cmd := ex.commands[i](ctx) @@ -70,22 +91,26 @@ func (ex *CmdExecutor) Start() error { return err } - done := make(chan error) + logger := ex.logger.With("pid", cmd.Process.Pid, "command", i+1) + + ex.kill = func() error { + return killPID(cmd.Process.Pid, logger) + } + go func() { - done <- cmd.Wait() + if err := cmd.Wait(); err != nil { + logger.Debug("process finished (wait completed), got", "err", err) + } + cf() }() select { case <-ctx.Done(): - ex.logger.Debug("process finished (context cancelled)", "command", cmd.String()) + logger.Debug("process finished (context cancelled)") case <-ex.parentCtx.Done(): - ex.logger.Debug("process finished (parent context cancelled)", "command", cmd.String()) - case err := <-done: - ex.logger.Debug("process finished (wait completed), got", "err", err, "command", cmd.String()) + logger.Debug("process finished (parent context cancelled)") } - ex.logger.Debug("process", "pid", cmd.Process.Pid) - if ex.interactive { // Send SIGTERM to the interactive process, as user will see it on his screen proc, err := os.FindProcess(os.Getpid()) @@ -96,23 +121,18 @@ func (ex *CmdExecutor) Start() error { err = proc.Signal(syscall.SIGTERM) if err != nil { if err != syscall.ESRCH { - ex.logger.Error("failed to kill, got", "err", err) + logger.Error("failed to kill, got", "err", err) return err } return err } } - ex.logger.Debug("about to kill", "process", cmd.Process.Pid) - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - ex.logger.Error("failed to kill, got", "err", err) - if err == syscall.ESRCH { - continue - } - // ex.logger.Error("failed to kill, got", "err", err) + if err := ex.kill(); err != nil { return err } - ex.logger.Debug("command fully executed and processed") + + logger.Debug("command fully executed and processed") } return nil @@ -120,11 +140,9 @@ func (ex *CmdExecutor) Start() error { // Stop implements Executor. func (ex *CmdExecutor) Stop() error { - ex.mu.Lock() - if ex.abort != nil { - ex.abort() + if ex.kill != nil { + return ex.kill() } - ex.mu.Unlock() return nil } diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 51ddea8..7929da3 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -27,6 +27,8 @@ type Watcher struct { cooldownDuration time.Duration eventsCh chan Event + + shouldLogWatchEvents bool } // GetEvents implements Watcher. @@ -73,7 +75,6 @@ func (f Watcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason string) for _, suffix := range f.IgnoreSuffixes { if strings.HasSuffix(event.Name, suffix) { - f.Logger.Debug("file is ignored", "file", event.Name) return true, fmt.Sprintf("because, file has suffix (%s), which is in ignore suffixes array(%+v)", suffix, f.IgnoreSuffixes) } } @@ -84,7 +85,6 @@ func (f Watcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason string) matched := false for _, suffix := range f.OnlySuffixes { - f.Logger.Debug(fmt.Sprintf("[only-suffix] suffix: (%s), event.name: %s", suffix, event.Name)) if strings.HasSuffix(event.Name, suffix) { matched = true break @@ -127,27 +127,39 @@ func (f *Watcher) Watch(ctx context.Context) { } t := time.Now() - f.Logger.Debug(fmt.Sprintf("event %+v received", event)) + if f.shouldLogWatchEvents { + f.Logger.Debug(fmt.Sprintf("event %+v received", event)) + } if ignore, reason := f.ignoreEvent(event); ignore { - f.Logger.Debug("IGNORING", "event.name", event.Name, "reason", reason) + if f.shouldLogWatchEvents { + f.Logger.Debug("IGNORING", "event.name", event.Name, "reason", reason) + } continue } - f.Logger.Debug("PROCESSING", "event.name", event.Name, "event.op", event.Op.String()) + if f.shouldLogWatchEvents { + f.Logger.Debug("PROCESSING", "event.name", event.Name, "event.op", event.Op.String()) + } if time.Since(lastProcessingTime) < f.cooldownDuration { - f.Logger.Debug(fmt.Sprintf("too many events under %s, ignoring...", f.cooldownDuration.String()), "event.name", event.Name) + if f.shouldLogWatchEvents { + f.Logger.Debug(fmt.Sprintf("too many events under %s, ignoring...", f.cooldownDuration.String()), "event.name", event.Name) + } continue } f.eventsCh <- Event(event) - f.Logger.Debug("watch loop completed", "took", fmt.Sprintf("%dms", time.Since(t).Milliseconds())) + if f.shouldLogWatchEvents { + f.Logger.Debug("watch loop completed", "took", fmt.Sprintf("%dms", time.Since(t).Milliseconds())) + } } case <-ctx.Done(): - f.Logger.Debug("watcher is closing", "reason", "context closed") + if f.shouldLogWatchEvents { + f.Logger.Debug("watcher is closing", "reason", "context closed") + } close(f.eventsCh) f.watcher.Close() return @@ -175,7 +187,9 @@ func (f *Watcher) RecursiveAdd(dirs ...string) error { } if _, ok := f.ExcludeDirs[filepath.Base(dir)]; ok { - f.Logger.Debug("EXCLUDED from watchlist", "dir", dir) + if f.shouldLogWatchEvents { + f.Logger.Debug("EXCLUDED from watchlist", "dir", dir) + } continue } @@ -206,7 +220,9 @@ func (f *Watcher) addToWatchList(dir string) error { return err } f.directoryCount++ - f.Logger.Debug("ADDED to watchlist", "dir", dir, "count", f.directoryCount) + if f.shouldLogWatchEvents { + f.Logger.Debug("ADDED to watchlist", "dir", dir, "count", f.directoryCount) + } return nil } @@ -226,6 +242,8 @@ type WatcherArgs struct { CooldownDuration *time.Duration Interactive bool + + ShouldLogWatchEvents bool } // DefaultIgnoreList is list of directories that are mostly ignored @@ -257,7 +275,9 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) { excludeDirs := map[string]struct{}{} for _, dir := range args.IgnoreDirs { - args.Logger.Debug("EXCLUDED from watching", "dir", dir) + if args.ShouldLogWatchEvents { + args.Logger.Debug("EXCLUDED from watching", "dir", dir) + } excludeDirs[dir] = struct{}{} } @@ -295,7 +315,8 @@ func NewWatcher(ctx context.Context, args WatcherArgs) (*Watcher, error) { cooldownDuration: cooldown, watchingDirs: make(map[string]struct{}), - eventsCh: make(chan Event), + shouldLogWatchEvents: args.ShouldLogWatchEvents, + eventsCh: make(chan Event), } if err := fsw.RecursiveAdd(args.WatchDirs...); err != nil {