Skip to content

Commit

Permalink
Merge pull request #8 from nxtcoder17/refactor/watch-and-execute
Browse files Browse the repository at this point in the history
Refactor/watch and execute
  • Loading branch information
nxtcoder17 authored Jan 18, 2025
2 parents 092ea9c + 0901da9 commit f4e8bb1
Show file tree
Hide file tree
Showing 9 changed files with 833 additions and 172 deletions.
12 changes: 11 additions & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,19 @@ tasks:
upx ./bin/{{.binary}}
fi
test:executor:
cmds:
- go test ./pkg/executor/...

test:watcher:
env:
DEBUG: false
cmds:
- go test -json ./pkg/watcher/... | gotestfmt

build:dev:
cmds:
- go build -o ./bin/fwatcher-dev ./cmd
- go build -o ./bin/fwatcher ./cmd

example:http-server:
cmds:
Expand Down
111 changes: 21 additions & 90 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@ package main

import (
"context"
"fmt"
"log/slog"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/nxtcoder17/fwatcher/pkg/executor"
fn "github.com/nxtcoder17/fwatcher/pkg/functions"
"github.com/nxtcoder17/fwatcher/pkg/logging"
"github.com/nxtcoder17/fwatcher/pkg/watcher"
"github.com/urfave/cli/v3"
Expand All @@ -25,21 +20,11 @@ var (
Version string
)

// DefaultIgnoreList is list of directories that are mostly ignored
var DefaultIgnoreList = []string{
".git", ".svn", ".hg", // version control
".idea", ".vscode", // IDEs
".direnv", // direnv nix
"node_modules", // node
".DS_Store", // macOS
".log", // logs
}

func main() {
cmd := &cli.Command{
Name: ProgramName,
UseShortOptionHandling: true,
Usage: "simple tool to run commands on filesystem change events",
Usage: "a simple tool to run things on filesystem change events",
ArgsUsage: "<Command To Run>",
Version: Version,
Flags: []cli.Flag{
Expand Down Expand Up @@ -77,7 +62,7 @@ func main() {
&cli.StringSliceFlag{
Name: "ignore-list",
Usage: "disables ignoring from default ignore list",
Value: DefaultIgnoreList,
Value: watcher.DefaultIgnoreList,
Aliases: []string{"I"},
},

Expand All @@ -92,17 +77,10 @@ func main() {
Usage: "interactive mode, with stdin",
},

&cli.BoolFlag{
Name: "sse",
Usage: "run watcher in sse mode",
},

&cli.StringFlag{
Name: "sse-addr",
HideDefault: false,
Usage: "run watcher in sse mode",
Sources: cli.ValueSourceChain{},
Value: ":12345",
Usage: "run watcher with Server Side Events (SSE) enabled",
},
},
Action: func(ctx context.Context, c *cli.Command) error {
Expand Down Expand Up @@ -161,86 +139,39 @@ func main() {
panic(err)
}

var ex executor.Executor
var executors []executor.Executor

switch {
case c.Bool("sse"):
{
sseAddr := c.String("sse-addr")
ex = executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr})
logger.Info("HELLo world")
}
default:
{
execCmd := c.Args().First()
execArgs := c.Args().Tail()
ex = executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{
Logger: logger,
Interactive: c.Bool("interactive"),
Command: func(context.Context) *exec.Cmd {
if sseAddr := c.String("sse-addr"); sseAddr != "" {
executors = append(executors, executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: sseAddr}))
}

if c.NArg() > 0 {
execCmd := c.Args().First()
execArgs := c.Args().Tail()
executors = append(executors, executor.NewCmdExecutor(ctx, executor.CmdExecutorArgs{
Logger: logger,
Interactive: c.Bool("interactive"),
Commands: []func(context.Context) *exec.Cmd{
func(c context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, execCmd, execArgs...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
return cmd
},
// IsInteractive: true,
})
}
},
}))
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := ex.Start(); err != nil {
slog.Error("got", "err", err)
}
logger.Debug("1. start-job finished")
}()

counter := 0
pwd := fn.Must(os.Getwd())

wg.Add(1)
go func() {
defer wg.Done()
w.Watch(ctx)
logger.Debug("2. watch context closed")
}()

wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
ex.Stop()
logger.Debug("3. killed signal processed")
}()

for event := range w.GetEvents() {
logger.Debug("received", "event", event)
relPath, err := filepath.Rel(pwd, event.Name)
if err != nil {
return err
}
counter += 1
logger.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, relPath))

ex.OnWatchEvent(executor.Event{Source: event.Name})
if err := w.WatchAndExecute(ctx, executors); err != nil {
return err
}

// logger.Debug("stopping executor")
// if err := ex.Stop(); err != nil {
// return err
// }
// logger.Info("stopped executor")

wg.Wait()
return nil
},
}

ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT)
ctx, stop := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

if err := cmd.Run(ctx, os.Args); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
pre-commit

go_1_22
gotestfmt

upx
go-task
];
Expand Down
127 changes: 78 additions & 49 deletions pkg/executor/cmd-executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ import (
type CmdExecutor struct {
logger *slog.Logger
parentCtx context.Context
newCmd func(context.Context) *exec.Cmd
commands []func(context.Context) *exec.Cmd

interactive bool

mu sync.Mutex
abort func()
mu sync.Mutex

kill func() error
}

type CmdExecutorArgs struct {
Logger *slog.Logger
Command func(context.Context) *exec.Cmd
Commands []func(context.Context) *exec.Cmd
Interactive bool
}

Expand All @@ -33,8 +34,8 @@ func NewCmdExecutor(ctx context.Context, args CmdExecutorArgs) *CmdExecutor {

return &CmdExecutor{
parentCtx: ctx,
logger: args.Logger.With("component", "cmd-executor"),
newCmd: args.Command,
logger: args.Logger,
commands: args.Commands,
mu: sync.Mutex{},
interactive: args.Interactive,
}
Expand All @@ -47,73 +48,101 @@ func (ex *CmdExecutor) OnWatchEvent(ev Event) error {
return nil
}

// Start implements Executor.
func (ex *CmdExecutor) Start() error {
ex.mu.Lock()
ctx, cf := context.WithCancel(ex.parentCtx)
ex.abort = cf
ex.mu.Unlock()

cmd := ex.newCmd(ctx)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if ex.interactive {
cmd.Stdin = os.Stdin
cmd.SysProcAttr.Foreground = true
func killPID(pid int, logger ...*slog.Logger) error {
var l *slog.Logger
if len(logger) > 0 {
l = logger[0]
} else {
l = slog.Default()
}

if err := cmd.Start(); err != nil {
l.Debug("about to kill", "process", pid)
if err := syscall.Kill(-pid, syscall.SIGKILL); err != nil {
if err == syscall.ESRCH {
return nil
}
l.Error("failed to kill, got", "err", err)
return err
}
return nil
}

done := make(chan error)
go func() {
done <- cmd.Wait()
}()
// Start implements Executor.
func (ex *CmdExecutor) Start() error {
ex.mu.Lock()
defer ex.mu.Unlock()
for i := range ex.commands {
if err := ex.parentCtx.Err(); err != nil {
return err
}

select {
case <-ctx.Done():
ex.logger.Debug("process context done")
case err := <-done:
ex.logger.Debug("process wait completed, got", "err", err)
}
ctx, cf := context.WithCancel(ex.parentCtx)
defer cf()

ex.logger.Debug("process", "pid", cmd.Process.Pid)
cmd := ex.commands[i](ctx)

cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if ex.interactive {
cmd.Stdin = os.Stdin
cmd.SysProcAttr.Foreground = true
}

if ex.interactive {
// Send SIGTERM to the interactive process, as user will see it on his screen
proc, err := os.FindProcess(os.Getpid())
if err != nil {
if err := cmd.Start(); err != nil {
return err
}

err = proc.Signal(syscall.SIGTERM)
if err != nil {
if err != syscall.ESRCH {
ex.logger.Error("failed to kill, got", "err", err)
logger := ex.logger.With("pid", cmd.Process.Pid, "command", i+1)

ex.kill = func() error {
return killPID(cmd.Process.Pid, logger)
}

go func() {
if err := cmd.Wait(); err != nil {
logger.Debug("process finished (wait completed), got", "err", err)
}
cf()
}()

select {
case <-ctx.Done():
logger.Debug("process finished (context cancelled)")
case <-ex.parentCtx.Done():
logger.Debug("process finished (parent context cancelled)")
}

if ex.interactive {
// Send SIGTERM to the interactive process, as user will see it on his screen
proc, err := os.FindProcess(os.Getpid())
if err != nil {
return err
}

err = proc.Signal(syscall.SIGTERM)
if err != nil {
if err != syscall.ESRCH {
logger.Error("failed to kill, got", "err", err)
return err
}
return err
}
return err
}
}

if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
if err == syscall.ESRCH {
return nil
if err := ex.kill(); err != nil {
return err
}
ex.logger.Error("failed to kill, got", "err", err)
return err

logger.Debug("command fully executed and processed")
}

return nil
}

// Stop implements Executor.
func (ex *CmdExecutor) Stop() error {
ex.mu.Lock()
if ex.abort != nil {
ex.abort()
if ex.kill != nil {
return ex.kill()
}
ex.mu.Unlock()
return nil
}

Expand Down
Loading

0 comments on commit f4e8bb1

Please sign in to comment.