diff --git a/main.go b/main.go index 69d9ce6..14b25e5 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "os/signal" "path/filepath" "strings" + "syscall" "time" "github.com/nxtcoder17/fwatcher/pkg/executor" @@ -19,7 +20,7 @@ import ( var Version string func main() { - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM) defer stop() app := &cli.App{ @@ -32,7 +33,13 @@ func main() { Name: "debug", Usage: "toggles showing debug logs", Required: false, - Value: false, + // Value: false, + }, + &cli.BoolFlag{ + Name: "interactive", + Usage: "runs fwatcher with interactive commands", + Required: false, + // Value: false, }, &cli.StringFlag{ Name: "command", @@ -116,7 +123,8 @@ func main() { } ex := executor.NewExecutor(executor.ExecutorArgs{ - Logger: logger, + Logger: logger, + IsInteractive: cctx.Bool("interactive"), Command: func(ctx context.Context) *exec.Cmd { cmd := exec.CommandContext(ctx, execCmd, execArgs...) cmd.Stdout = os.Stdout @@ -126,7 +134,7 @@ func main() { }, }) - watcher, err := fswatcher.NewWatcher(fswatcher.WatcherArgs{ + watcher, err := fswatcher.NewWatcher(ctx, fswatcher.WatcherArgs{ Logger: logger, WatchDirs: cctx.StringSlice("dir"), @@ -143,13 +151,18 @@ func main() { go func() { <-ctx.Done() - logger.Debug("fwatcher is closing ...") + logger.Debug("fwatcher is closing 2...") + watcher.Close() <-time.After(200 * time.Millisecond) os.Exit(0) }() pwd, _ := os.Getwd() watcher.WatchEvents(func(event fswatcher.Event, fp string) error { + if ctx.Err() != nil { + logger.Debug("fwatcher is closed ...") + return nil + } relPath, err := filepath.Rel(pwd, fp) if err != nil { return err diff --git a/pkg/executor/exec.go b/pkg/executor/exec.go index a129eaa..3513feb 100644 --- a/pkg/executor/exec.go +++ b/pkg/executor/exec.go @@ -12,17 +12,21 @@ import ( ) type Executor struct { - done chan os.Signal - logger *slog.Logger - isRunning bool + done chan os.Signal + logger *slog.Logger + + mu sync.Mutex + running *os.Process + + isInteractive bool newCmd func(context.Context) *exec.Cmd - mu sync.Mutex } type ExecutorArgs struct { - Logger *slog.Logger - Command func(context.Context) *exec.Cmd + Logger *slog.Logger + Command func(context.Context) *exec.Cmd + IsInteractive bool } func NewExecutor(args ExecutorArgs) *Executor { @@ -34,65 +38,139 @@ func NewExecutor(args ExecutorArgs) *Executor { } return &Executor{ - done: done, - logger: args.Logger, - newCmd: args.Command, - mu: sync.Mutex{}, + done: done, + logger: args.Logger, + isInteractive: args.IsInteractive, + newCmd: args.Command, + mu: sync.Mutex{}, } } +// ctx is process context +func (ex *Executor) handleExits(ctx context.Context, cf context.CancelFunc) { + if ex.running == nil { + return + } + + defer func() { + ex.running = nil + cf() + }() + + select { + case <-ctx.Done(): + // INFO: process exited + ex.logger.Debug("process terminated", "pid", ex.running.Pid) + // if ex.running != nil { + // } + + if ex.isInteractive { + os.Exit(0) + } + + case sig := <-ex.done: + // INFO: fwatcher exited + ex.logger.Debug("executor terminated", "received-signal", sig) + cf() + } + + for len(ex.done) > 0 { + <-ex.done + } + + if ex.isInteractive { + // INFO: interactive, only kill the process remains + ex.logger.Debug("[exec] killing process", "pid", ex.running.Pid) + ex.running.Signal(syscall.SIGKILL) + return + } + + // INFO: non-interactive killing the entire child tree + ex.logger.Debug("[exec] killing process", "pid", ex.running.Pid) + if err := syscall.Kill(-ex.running.Pid, syscall.SIGKILL); err != nil { + if err.Error() != "no such process" { + ex.logger.Error("failed to kill process", "pid", ex.running.Pid, "err", err) + } + } +} + +// DO NOT USE: does not work yet. func (ex *Executor) Exec() error { ex.logger.Debug("[exec:pre] starting process") ex.mu.Lock() - ex.isRunning = true - defer func() { - ex.isRunning = false - ex.mu.Unlock() - }() + defer ex.mu.Unlock() ctx, cf := context.WithCancel(context.TODO()) defer cf() cmd := ex.newCmd(ctx) - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + if !ex.isInteractive { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + } if err := cmd.Start(); err != nil { return err } - go func() { - select { - case <-ctx.Done(): - ex.logger.Debug("process terminated", "pid", cmd.Process.Pid) - case <-ex.done: - ex.logger.Debug("executor terminated", "pid", cmd.Process.Pid) - } - for len(ex.done) > 0 { - <-ex.done - } - - ex.logger.Debug("[exec] killing process", "pid", cmd.Process.Pid) - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - if err.Error() != "no such process" { - ex.logger.Error("failed to kill process", "pid", cmd.Process.Pid, "err", err) - } + ex.running = cmd.Process + + go ex.handleExits(ctx, cf) + + // defer func() { ex.running = nil }() + // go func() { + // select { + // case <-ctx.Done(): + // // INFO: process exited + // ex.logger.Debug("process terminated", "pid", cmd.Process.Pid) + // <-time.After(1 * time.Second) + // if ex.isInteractive { + // os.Exit(0) + // } + // + // case sig := <-ex.done: + // // INFO: fwatcher exited + // ex.logger.Debug("executor terminated", "received-signal", sig) + // cf() + // } + // + // for len(ex.done) > 0 { + // <-ex.done + // } + // + // if cmd.Process != nil { + // ex.logger.Debug("[exec] killing process", "pid", cmd.Process.Pid) + // cmd.Process.Signal(syscall.SIGKILL) + // } + // + // ex.logger.Debug("[exec] killing process", "pid", cmd.Process.Pid) + // if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + // if err.Error() != "no such process" { + // ex.logger.Error("failed to kill process", "pid", cmd.Process.Pid, "err", err) + // } + // } + // }() + + // ex.logger.Debug("[exec:post] process running") + if err := cmd.Wait(); err != nil { + ex.logger.Debug("cmd terminated with", "err", err) + err2, ok := err.(*exec.ExitError) + if ok { + ex.logger.Debug("cmd terminated with", "exit-code", err2.ExitCode()) } - }() - ex.logger.Debug("[exec:post] process running") - if err := cmd.Wait(); err != nil { if strings.HasPrefix(err.Error(), "signal:") { ex.logger.Debug("wait terminated, received", "signal", err.Error()) } ex.logger.Debug("while waiting, got", "err", err) } + ex.logger.Debug("[exec] killed process", "pid", cmd.Process.Pid) return nil } func (ex *Executor) Kill() { - if ex.isRunning { + if ex.running != nil { ex.done <- os.Signal(syscall.SIGTERM) } } diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 6c58fe4..0b73972 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -1,6 +1,7 @@ package watcher import ( + "context" "fmt" "log/slog" "os" @@ -17,14 +18,15 @@ type Watcher interface { WatchEvents(func(event Event, fp string) error) } -type eventInfo struct { - Time time.Time - FileInfo os.FileInfo - Counter int -} +// type eventInfo struct { +// Time time.Time +// FileInfo os.FileInfo +// Counter int +// } type fsnWatcher struct { - watcher *fsnotify.Watcher + watcher *fsnotify.Watcher + lfContext context.Context directoryCount int @@ -93,7 +95,6 @@ func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason strin } func (f *fsnWatcher) WatchEvents(watcherFunc func(event Event, fp string) error) { - // f.eventMap = map[string]eventInfo{} lastProcessingTime := time.Now() for { select { @@ -113,30 +114,15 @@ func (f *fsnWatcher) WatchEvents(watcherFunc func(event Event, fp string) error) f.Logger.Debug("PROCESSING", "event.name", event.Name, "event.op", event.Op.String()) - // _, err := os.Stat(event.Name) - // if err != nil { - // return - // } - - // eInfo, ok := f.eventMap[event.Name] - // if !ok { - // eInfo = eventInfo{Time: time.Now(), FileInfo: nil, Counter: 0} - // } - // eInfo.Counter += 1 - // f.eventMap[event.Name] = eInfo - if time.Since(lastProcessingTime) < f.cooldownDuration { f.Logger.Debug(fmt.Sprintf("too many events under %s, ignoring...", f.cooldownDuration.String()), "event.name", event.Name) continue } - - if err := watcherFunc(Event(event), event.Name); err != nil { + abs, _ := filepath.Abs(event.Name) + if err := watcherFunc(Event(event), abs); err != nil { f.Logger.Error("while processing event, got", "err", err) return } - // eInfo.Time = time.Now() - // eInfo.Counter = 0 - // f.eventMap[event.Name] = eInfo f.Logger.Debug("watch loop completed", "took", fmt.Sprintf("%dms", time.Since(t).Milliseconds())) } @@ -145,6 +131,13 @@ func (f *fsnWatcher) WatchEvents(watcherFunc func(event Event, fp string) error) return } f.Logger.Error("watcher error", "err", err) + case <-f.lfContext.Done(): + // when fwatcher is closing, cleaning up the watcher events and closing, otherwise rase condition might lead to staring command again + for item := range f.watcher.Events { + _ = item + } + f.Logger.Debug("fwatcher is closing ...") + return } } } @@ -219,7 +212,7 @@ type WatcherArgs struct { CooldownDuration *time.Duration } -func NewWatcher(args WatcherArgs) (Watcher, error) { +func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) { if args.Logger == nil { args.Logger = slog.Default() } @@ -252,6 +245,7 @@ func NewWatcher(args WatcherArgs) (Watcher, error) { fsw := &fsnWatcher{ watcher: watcher, + lfContext: ctx, Logger: args.Logger, ExcludeDirs: excludeDirs, IgnoreSuffixes: args.IgnoreSuffixes,