Skip to content

Commit

Permalink
Merge pull request [#3](#3)
Browse files Browse the repository at this point in the history
feat: fixes executor for interactive programs
  • Loading branch information
nxtcoder17 authored Dec 13, 2024
2 parents beb84ec + 8dde257 commit 54ae2d2
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 66 deletions.
23 changes: 18 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/nxtcoder17/fwatcher/pkg/executor"
Expand All @@ -19,7 +20,7 @@ import (
var Version string

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

app := &cli.App{
Expand All @@ -32,7 +33,13 @@ func main() {
Name: "debug",
Usage: "toggles showing debug logs",
Required: false,
Value: false,
// Value: false,
},
&cli.BoolFlag{
Name: "interactive",
Usage: "runs fwatcher with interactive commands",
Required: false,
// Value: false,
},
&cli.StringFlag{
Name: "command",
Expand Down Expand Up @@ -116,7 +123,8 @@ func main() {
}

ex := executor.NewExecutor(executor.ExecutorArgs{
Logger: logger,
Logger: logger,
IsInteractive: cctx.Bool("interactive"),
Command: func(ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, execCmd, execArgs...)
cmd.Stdout = os.Stdout
Expand All @@ -126,7 +134,7 @@ func main() {
},
})

watcher, err := fswatcher.NewWatcher(fswatcher.WatcherArgs{
watcher, err := fswatcher.NewWatcher(ctx, fswatcher.WatcherArgs{
Logger: logger,

WatchDirs: cctx.StringSlice("dir"),
Expand All @@ -143,13 +151,18 @@ func main() {

go func() {
<-ctx.Done()
logger.Debug("fwatcher is closing ...")
logger.Debug("fwatcher is closing 2...")
watcher.Close()
<-time.After(200 * time.Millisecond)
os.Exit(0)
}()

pwd, _ := os.Getwd()
watcher.WatchEvents(func(event fswatcher.Event, fp string) error {
if ctx.Err() != nil {
logger.Debug("fwatcher is closed ...")
return nil
}
relPath, err := filepath.Rel(pwd, fp)
if err != nil {
return err
Expand Down
150 changes: 114 additions & 36 deletions pkg/executor/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ import (
)

type Executor struct {
done chan os.Signal
logger *slog.Logger
isRunning bool
done chan os.Signal
logger *slog.Logger

mu sync.Mutex
running *os.Process

isInteractive bool

newCmd func(context.Context) *exec.Cmd
mu sync.Mutex
}

type ExecutorArgs struct {
Logger *slog.Logger
Command func(context.Context) *exec.Cmd
Logger *slog.Logger
Command func(context.Context) *exec.Cmd
IsInteractive bool
}

func NewExecutor(args ExecutorArgs) *Executor {
Expand All @@ -34,65 +38,139 @@ func NewExecutor(args ExecutorArgs) *Executor {
}

return &Executor{
done: done,
logger: args.Logger,
newCmd: args.Command,
mu: sync.Mutex{},
done: done,
logger: args.Logger,
isInteractive: args.IsInteractive,
newCmd: args.Command,
mu: sync.Mutex{},
}
}

// ctx is process context
func (ex *Executor) handleExits(ctx context.Context, cf context.CancelFunc) {
if ex.running == nil {
return
}

defer func() {
ex.running = nil
cf()
}()

select {
case <-ctx.Done():
// INFO: process exited
ex.logger.Debug("process terminated", "pid", ex.running.Pid)
// if ex.running != nil {
// }

if ex.isInteractive {
os.Exit(0)
}

case sig := <-ex.done:
// INFO: fwatcher exited
ex.logger.Debug("executor terminated", "received-signal", sig)
cf()
}

for len(ex.done) > 0 {
<-ex.done
}

if ex.isInteractive {
// INFO: interactive, only kill the process remains
ex.logger.Debug("[exec] killing process", "pid", ex.running.Pid)
ex.running.Signal(syscall.SIGKILL)
return
}

// INFO: non-interactive killing the entire child tree
ex.logger.Debug("[exec] killing process", "pid", ex.running.Pid)
if err := syscall.Kill(-ex.running.Pid, syscall.SIGKILL); err != nil {
if err.Error() != "no such process" {
ex.logger.Error("failed to kill process", "pid", ex.running.Pid, "err", err)
}
}
}

// DO NOT USE: does not work yet.
func (ex *Executor) Exec() error {
ex.logger.Debug("[exec:pre] starting process")
ex.mu.Lock()
ex.isRunning = true

defer func() {
ex.isRunning = false
ex.mu.Unlock()
}()
defer ex.mu.Unlock()

ctx, cf := context.WithCancel(context.TODO())
defer cf()

cmd := ex.newCmd(ctx)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if !ex.isInteractive {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
}

if err := cmd.Start(); err != nil {
return err
}

go func() {
select {
case <-ctx.Done():
ex.logger.Debug("process terminated", "pid", cmd.Process.Pid)
case <-ex.done:
ex.logger.Debug("executor terminated", "pid", cmd.Process.Pid)
}
for len(ex.done) > 0 {
<-ex.done
}

ex.logger.Debug("[exec] killing process", "pid", cmd.Process.Pid)
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
if err.Error() != "no such process" {
ex.logger.Error("failed to kill process", "pid", cmd.Process.Pid, "err", err)
}
ex.running = cmd.Process

go ex.handleExits(ctx, cf)

// defer func() { ex.running = nil }()
// go func() {
// select {
// case <-ctx.Done():
// // INFO: process exited
// ex.logger.Debug("process terminated", "pid", cmd.Process.Pid)
// <-time.After(1 * time.Second)
// if ex.isInteractive {
// os.Exit(0)
// }
//
// case sig := <-ex.done:
// // INFO: fwatcher exited
// ex.logger.Debug("executor terminated", "received-signal", sig)
// cf()
// }
//
// for len(ex.done) > 0 {
// <-ex.done
// }
//
// if cmd.Process != nil {
// ex.logger.Debug("[exec] killing process", "pid", cmd.Process.Pid)
// cmd.Process.Signal(syscall.SIGKILL)
// }
//
// ex.logger.Debug("[exec] killing process", "pid", cmd.Process.Pid)
// if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
// if err.Error() != "no such process" {
// ex.logger.Error("failed to kill process", "pid", cmd.Process.Pid, "err", err)
// }
// }
// }()

// ex.logger.Debug("[exec:post] process running")
if err := cmd.Wait(); err != nil {
ex.logger.Debug("cmd terminated with", "err", err)
err2, ok := err.(*exec.ExitError)
if ok {
ex.logger.Debug("cmd terminated with", "exit-code", err2.ExitCode())
}
}()

ex.logger.Debug("[exec:post] process running")
if err := cmd.Wait(); err != nil {
if strings.HasPrefix(err.Error(), "signal:") {
ex.logger.Debug("wait terminated, received", "signal", err.Error())
}
ex.logger.Debug("while waiting, got", "err", err)
}
ex.logger.Debug("[exec] killed process", "pid", cmd.Process.Pid)

return nil
}

func (ex *Executor) Kill() {
if ex.isRunning {
if ex.running != nil {
ex.done <- os.Signal(syscall.SIGTERM)
}
}
44 changes: 19 additions & 25 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watcher

import (
"context"
"fmt"
"log/slog"
"os"
Expand All @@ -17,14 +18,15 @@ type Watcher interface {
WatchEvents(func(event Event, fp string) error)
}

type eventInfo struct {
Time time.Time
FileInfo os.FileInfo
Counter int
}
// type eventInfo struct {
// Time time.Time
// FileInfo os.FileInfo
// Counter int
// }

type fsnWatcher struct {
watcher *fsnotify.Watcher
watcher *fsnotify.Watcher
lfContext context.Context

directoryCount int

Expand Down Expand Up @@ -93,7 +95,6 @@ func (f fsnWatcher) ignoreEvent(event fsnotify.Event) (ignore bool, reason strin
}

func (f *fsnWatcher) WatchEvents(watcherFunc func(event Event, fp string) error) {
// f.eventMap = map[string]eventInfo{}
lastProcessingTime := time.Now()
for {
select {
Expand All @@ -113,30 +114,15 @@ func (f *fsnWatcher) WatchEvents(watcherFunc func(event Event, fp string) error)

f.Logger.Debug("PROCESSING", "event.name", event.Name, "event.op", event.Op.String())

// _, err := os.Stat(event.Name)
// if err != nil {
// return
// }

// eInfo, ok := f.eventMap[event.Name]
// if !ok {
// eInfo = eventInfo{Time: time.Now(), FileInfo: nil, Counter: 0}
// }
// eInfo.Counter += 1
// f.eventMap[event.Name] = eInfo

if time.Since(lastProcessingTime) < f.cooldownDuration {
f.Logger.Debug(fmt.Sprintf("too many events under %s, ignoring...", f.cooldownDuration.String()), "event.name", event.Name)
continue
}

if err := watcherFunc(Event(event), event.Name); err != nil {
abs, _ := filepath.Abs(event.Name)
if err := watcherFunc(Event(event), abs); err != nil {
f.Logger.Error("while processing event, got", "err", err)
return
}
// eInfo.Time = time.Now()
// eInfo.Counter = 0
// f.eventMap[event.Name] = eInfo

f.Logger.Debug("watch loop completed", "took", fmt.Sprintf("%dms", time.Since(t).Milliseconds()))
}
Expand All @@ -145,6 +131,13 @@ func (f *fsnWatcher) WatchEvents(watcherFunc func(event Event, fp string) error)
return
}
f.Logger.Error("watcher error", "err", err)
case <-f.lfContext.Done():
// when fwatcher is closing, cleaning up the watcher events and closing, otherwise rase condition might lead to staring command again
for item := range f.watcher.Events {
_ = item
}
f.Logger.Debug("fwatcher is closing ...")
return
}
}
}
Expand Down Expand Up @@ -219,7 +212,7 @@ type WatcherArgs struct {
CooldownDuration *time.Duration
}

func NewWatcher(args WatcherArgs) (Watcher, error) {
func NewWatcher(ctx context.Context, args WatcherArgs) (Watcher, error) {
if args.Logger == nil {
args.Logger = slog.Default()
}
Expand Down Expand Up @@ -252,6 +245,7 @@ func NewWatcher(args WatcherArgs) (Watcher, error) {

fsw := &fsnWatcher{
watcher: watcher,
lfContext: ctx,
Logger: args.Logger,
ExcludeDirs: excludeDirs,
IgnoreSuffixes: args.IgnoreSuffixes,
Expand Down

0 comments on commit 54ae2d2

Please sign in to comment.