Skip to content

Commit

Permalink
[WIP] Fix start handing of the new module system
Browse files Browse the repository at this point in the history
  • Loading branch information
vlabo committed Jun 27, 2024
1 parent 3c9b636 commit 4d9b908
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 44 deletions.
77 changes: 74 additions & 3 deletions cmds/portmaster-core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
_ "github.com/safing/portmaster/spn/captain"
)

var sigUSR1 = syscall.Signal(0xa)

func main() {
flag.Parse()

Expand Down Expand Up @@ -59,9 +61,78 @@ func main() {
return
}
// Start
err = instance.Group.Start()
go func() {
err = instance.Group.Start()
if err != nil {
fmt.Printf("instance start failed: %s\n", err)
return
}
}()

// Wait for signal.
signalCh := make(chan os.Signal, 1)
signal.Notify(
signalCh,
os.Interrupt,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
sigUSR1,
)

signalLoop:
for {
select {
case sig := <-signalCh:
// Only print and continue to wait if SIGUSR1
if sig == sigUSR1 {
printStackTo(os.Stderr, "PRINTING STACK ON REQUEST")
continue signalLoop
}

fmt.Println(" <INTERRUPT>") // CLI output.
slog.Warn("program was interrupted, stopping")

// catch signals during shutdown
go func() {
forceCnt := 5
for {
<-signalCh
forceCnt--
if forceCnt > 0 {
fmt.Printf(" <INTERRUPT> again, but already shutting down - %d more to force\n", forceCnt)
} else {
printStackTo(os.Stderr, "PRINTING STACK ON FORCED EXIT")
os.Exit(1)
}
}
}()

go func() {
time.Sleep(3 * time.Minute)
printStackTo(os.Stderr, "PRINTING STACK - TAKING TOO LONG FOR SHUTDOWN")
os.Exit(1)
}()

if err := instance.Stop(); err != nil {
slog.Error("failed to stop portmaster", "err", err)
continue signalLoop
}
break signalLoop

case <-instance.Done():
break signalLoop
}
}
}

func printStackTo(writer io.Writer, msg string) {
_, err := fmt.Fprintf(writer, "===== %s =====\n", msg)
if err == nil {
err = pprof.Lookup("goroutine").WriteTo(writer, 1)
}
if err != nil {
fmt.Printf("instance start failed: %s\n", err)
return
slog.Error("failed to write stack trace", "err", err)
}
}
6 changes: 5 additions & 1 deletion service/firewall/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func prep() error {
return err
}

return prepAPIAuth()
return nil
}

func start() error {
Expand Down Expand Up @@ -136,6 +136,10 @@ func New(instance instance) (*Filter, error) {
instance: instance,
}

if err := prepAPIAuth(); err != nil {
return nil, err
}

return module, nil
}

Expand Down
12 changes: 6 additions & 6 deletions service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Instance struct {
rng *rng.Rng
base *base.Base

core *core.Core
updates *updates.Updates
geoip *geoip.GeoIP
netenv *netenv.NetEnv
Expand Down Expand Up @@ -86,7 +87,6 @@ type Instance struct {
process *process.ProcessModule
resolver *resolver.ResolverModule
sync *sync.Sync
core *core.Core
}

// New returns a new portmaster service instance.
Expand Down Expand Up @@ -133,6 +133,10 @@ func New(version string, svcCfg *ServiceConfig) (*Instance, error) {
}

// Global service modules
instance.core, err = core.New(instance)
if err != nil {
return nil, fmt.Errorf("create core module: %w", err)
}
instance.updates, err = updates.New(instance, svcCfg.ShutdownFunc)
if err != nil {
return nil, fmt.Errorf("create updates module: %w", err)
Expand Down Expand Up @@ -245,10 +249,6 @@ func New(version string, svcCfg *ServiceConfig) (*Instance, error) {
if err != nil {
return nil, fmt.Errorf("create sync module: %w", err)
}
instance.core, err = core.New(instance)
if err != nil {
return nil, fmt.Errorf("create core module: %w", err)
}

// Add all modules to instance group.
instance.Group = mgr.NewGroup(
Expand All @@ -261,6 +261,7 @@ func New(version string, svcCfg *ServiceConfig) (*Instance, error) {
instance.rng,
instance.base,

instance.core,
instance.updates,
instance.geoip,
instance.netenv,
Expand Down Expand Up @@ -290,7 +291,6 @@ func New(version string, svcCfg *ServiceConfig) (*Instance, error) {
instance.process,
instance.resolver,
instance.sync,
instance.core,
)

// FIXME: call this before to trigger shutdown/restart event
Expand Down
60 changes: 52 additions & 8 deletions service/mgr/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,43 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
)

const (
groupStateOff int32 = iota
groupStateStarting
groupStateRunning
groupStateStopping
groupStateInvalid
)

func groupStateToString(state int32) string {
switch state {
case groupStateOff:
return "off"
case groupStateStarting:
return "starting"
case groupStateRunning:
return "running"
case groupStateStopping:
return "stopping"
case groupStateInvalid:
return "invalid"
}

return "unknown"
}

// Group describes a group of modules.
type Group struct {
modules []*groupModule

ctx context.Context
cancelCtx context.CancelFunc
ctxLock sync.Mutex

state atomic.Int32
}

type groupModule struct {
Expand Down Expand Up @@ -64,22 +92,42 @@ func NewGroup(modules ...Module) *Group {
// If a module fails to start, itself and all previous modules
// will be stopped in the reverse order.
func (g *Group) Start() error {
if !g.state.CompareAndSwap(groupStateOff, groupStateStarting) {
return fmt.Errorf("group is not off, state: %s", groupStateToString(g.state.Load()))
}

g.initGroupContext()

for i, m := range g.modules {
m.mgr.Info("starting")
err := m.module.Start(m.mgr)
if err != nil {
g.stopFrom(i)
if !g.stopFrom(i) {
g.state.Store(groupStateInvalid)
} else {
g.state.Store(groupStateOff)
}
return fmt.Errorf("failed to start %s: %w", makeModuleName(m.module), err)
}
m.mgr.Info("started")
}
g.state.Store(groupStateRunning)
return nil
}

// Stop stops all modules in the group in the reverse order.
func (g *Group) Stop() (ok bool) {
return g.stopFrom(len(g.modules) - 1)
func (g *Group) Stop() error {
if !g.state.CompareAndSwap(groupStateRunning, groupStateStopping) {
return fmt.Errorf("group is not running, state: %s", groupStateToString(g.state.Load()))
}

if !g.stopFrom(len(g.modules) - 1) {
g.state.Store(groupStateInvalid)
return errors.New("failed to stop")
}

g.state.Store(groupStateOff)
return nil
}

func (g *Group) stopFrom(index int) (ok bool) {
Expand Down Expand Up @@ -150,11 +198,7 @@ func RunModules(ctx context.Context, modules ...Module) error {

// Stop module when context is canceled.
<-ctx.Done()
if !g.Stop() {
return errors.New("failed to stop")
}

return nil
return g.Stop()
}

func makeModuleName(m Module) string {
Expand Down
12 changes: 8 additions & 4 deletions service/mgr/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *StateMgr) Add(s State) {
m.states = append(m.states, s)
}

m.statesEventMgr.Submit(m.Export())
m.statesEventMgr.Submit(m.export())
}

// Remove removes the state with the given ID.
Expand All @@ -85,7 +85,7 @@ func (m *StateMgr) Remove(id string) {
return s.ID == id
})

m.statesEventMgr.Submit(m.Export())
m.statesEventMgr.Submit(m.export())
}

// Clear removes all states.
Expand All @@ -95,14 +95,18 @@ func (m *StateMgr) Clear() {

m.states = nil

m.statesEventMgr.Submit(m.Export())
m.statesEventMgr.Submit(m.export())
}

// Export returns the current states.
func (m *StateMgr) Export() StateUpdate {
m.statesLock.Lock()
defer m.statesLock.Unlock()

return m.export()
}

// Export returns the current states.
func (m *StateMgr) export() StateUpdate {
name := ""
if m.mgr != nil {
name = m.mgr.name
Expand Down
2 changes: 1 addition & 1 deletion service/mgr/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (w *WorkerCtx) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr) {
// - Panic catching.
// - Flow control helpers.
func (m *Manager) Go(name string, fn func(w *WorkerCtx) error) {
m.logger.Log(m.ctx, slog.LevelInfo, "worker started", "name", name)
// m.logger.Log(m.ctx, slog.LevelInfo, "worker started", "name", name)
go m.manageWorker(name, fn)
}

Expand Down
10 changes: 8 additions & 2 deletions service/mgr/workermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ func (s *WorkerMgr) Delay(duration time.Duration) *WorkerMgr {
defer s.actionLock.Unlock()

s.delay.Stop()
s.delay = s.newDelay(duration)
s.delay = nil
if duration > 0 {
s.delay = s.newDelay(duration)
}

s.check()
return s
Expand All @@ -276,7 +279,10 @@ func (s *WorkerMgr) Repeat(interval time.Duration) *WorkerMgr {
defer s.actionLock.Unlock()

s.repeat.Stop()
s.repeat = s.newRepeat(interval)
s.repeat = nil
if interval > 0 {
s.repeat = s.newRepeat(interval)
}

s.check()
return s
Expand Down
3 changes: 1 addition & 2 deletions service/profile/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import (
)

var (
migrations = migration.New("core:migrations/profile")
// module *modules.Module
migrations = migration.New("core:migrations/profile")
updatesPath string
)

Expand Down
8 changes: 1 addition & 7 deletions service/updates/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ const (
)

func init() {
// FIXME:
// module = modules.Register(ModuleName, prep, start, stop, "base")
// module.RegisterEvent(VersionUpdateEvent, true)
// module.RegisterEvent(ResourceUpdateEvent, true)

flag.StringVar(&updateServerFromFlag, "update-server", "", "set an alternative update server (full URL)")
flag.StringVar(&userAgentFromFlag, "update-agent", "", "set an alternative user agent for requests to the update server")
}
Expand All @@ -111,8 +106,7 @@ func prep() error {
func start() error {
initConfig()

_ = module.mgr.Repeat("automatic restart", 10*time.Minute, automaticRestart)

module.restartWorkerMgr = module.mgr.Repeat("automatic restart", 10*time.Minute, automaticRestart)
module.instance.Config().EventConfigChange.AddCallback("update registry config", updateRegistryConfig)

// create registry
Expand Down
4 changes: 3 additions & 1 deletion service/updates/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type Updates struct {
instance instance
shutdownFunc func(exitCode int)

updateWorkerMgr *mgr.WorkerMgr
updateWorkerMgr *mgr.WorkerMgr
restartWorkerMgr *mgr.WorkerMgr

EventResourcesUpdated *mgr.EventMgr[struct{}]
EventVersionsUpdated *mgr.EventMgr[struct{}]
Expand All @@ -29,6 +30,7 @@ func (u *Updates) Start(m *mgr.Manager) error {
u.mgr = m
u.EventResourcesUpdated = mgr.NewEventMgr[struct{}](ResourceUpdateEvent, u.mgr)
u.EventVersionsUpdated = mgr.NewEventMgr[struct{}](VersionUpdateEvent, u.mgr)

u.States = mgr.NewStateMgr(u.mgr)

return start()
Expand Down
Loading

0 comments on commit 4d9b908

Please sign in to comment.