From b9edb7ea1f4804a343a2a836ec7bda6e7c2447de Mon Sep 17 00:00:00 2001 From: Vladimir Stoilov Date: Tue, 2 Jul 2024 12:16:46 +0300 Subject: [PATCH] [WIP] Improve startup process --- cmds/portmaster-core/main.go | 7 ++ service/firewall/interception/module.go | 6 +- .../firewall/interception/nfqueue_linux.go | 10 +- service/mgr/module.go | 15 ++- service/mgr/worker.go | 2 +- service/network/clean.go | 3 +- spn/captain/module.go | 114 +++++++++--------- spn/navigator/module.go | 71 +++++------ 8 files changed, 126 insertions(+), 102 deletions(-) diff --git a/cmds/portmaster-core/main.go b/cmds/portmaster-core/main.go index b200a7d44..b61173cbe 100644 --- a/cmds/portmaster-core/main.go +++ b/cmds/portmaster-core/main.go @@ -4,7 +4,14 @@ package main import ( "flag" "fmt" + "io" + "log/slog" + "os" + "os/signal" "runtime" + "runtime/pprof" + "syscall" + "time" "github.com/safing/portmaster/base/info" "github.com/safing/portmaster/base/log" diff --git a/service/firewall/interception/module.go b/service/firewall/interception/module.go index 189d1bfcb..76964f1af 100644 --- a/service/firewall/interception/module.go +++ b/service/firewall/interception/module.go @@ -68,8 +68,10 @@ func stop() error { } close(metrics.done) - - return stopInterception() + if err := stopInterception(); err != nil { + log.Errorf("failed to stop interception module: %s", err) + } + return nil } var ( diff --git a/service/firewall/interception/nfqueue_linux.go b/service/firewall/interception/nfqueue_linux.go index cbaad7cce..bff94fe28 100644 --- a/service/firewall/interception/nfqueue_linux.go +++ b/service/firewall/interception/nfqueue_linux.go @@ -258,30 +258,30 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) { err = activateNfqueueFirewall() if err != nil { - _ = StopNfqueueInterception() + // _ = StopNfqueueInterception() return fmt.Errorf("could not initialize nfqueue: %w", err) } out4Queue, err = nfq.New(17040, false) if err != nil { - _ = StopNfqueueInterception() 
+ // _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv4, out): %w", err) } in4Queue, err = nfq.New(17140, false) if err != nil { - _ = StopNfqueueInterception() + // _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv4, in): %w", err) } if netenv.IPv6Enabled() { out6Queue, err = nfq.New(17060, true) if err != nil { - _ = StopNfqueueInterception() + // _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv6, out): %w", err) } in6Queue, err = nfq.New(17160, true) if err != nil { - _ = StopNfqueueInterception() + // _ = StopNfqueueInterception() return fmt.Errorf("nfqueue(IPv6, in): %w", err) } } else { diff --git a/service/mgr/module.go b/service/mgr/module.go index 4c2d0e862..e900eb2a8 100644 --- a/service/mgr/module.go +++ b/service/mgr/module.go @@ -8,6 +8,7 @@ import ( "strings" "sync" "sync/atomic" + "time" ) const ( @@ -100,7 +101,11 @@ func (g *Group) Start() error { for i, m := range g.modules { m.mgr.Info("starting") - err := m.module.Start(m.mgr) + startTime := time.Now() + + err := m.mgr.Do(m.mgr.name+" Start", func(_ *WorkerCtx) error { + return m.module.Start(m.mgr) + }) if err != nil { if !g.stopFrom(i) { g.state.Store(groupStateInvalid) @@ -109,7 +114,8 @@ func (g *Group) Start() error { } return fmt.Errorf("failed to start %s: %w", makeModuleName(m.module), err) } - m.mgr.Info("started") + duration := time.Since(startTime) + m.mgr.Info("started " + duration.String()) } g.state.Store(groupStateRunning) return nil @@ -134,7 +140,10 @@ func (g *Group) stopFrom(index int) (ok bool) { ok = true for i := index; i >= 0; i-- { m := g.modules[i] - err := m.module.Stop(m.mgr) + + err := m.mgr.Do(m.mgr.name+" Stop", func(_ *WorkerCtx) error { + return m.module.Stop(m.mgr) + }) if err != nil { m.mgr.Error("failed to stop", "err", err) ok = false diff --git a/service/mgr/worker.go b/service/mgr/worker.go index 802af72a2..22d553927 100644 --- a/service/mgr/worker.go +++ b/service/mgr/worker.go @@ -229,7 +229,7 @@ func (m *Manager) Do(name 
string, fn func(w *WorkerCtx) error) error { return nil case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): - // A canceled context or dexceeded eadline also means that the worker is finished. + // A canceled context or exceeded deadline also means that the worker is finished. return err default: diff --git a/service/network/clean.go b/service/network/clean.go index 962fef332..61767dc2c 100644 --- a/service/network/clean.go +++ b/service/network/clean.go @@ -53,7 +53,8 @@ func connectionCleaner(ctx *mgr.WorkerCtx) error { func cleanConnections() (activePIDs map[int]struct{}) { activePIDs = make(map[int]struct{}) - module.mgr.Go("clean connections", func(ctx *mgr.WorkerCtx) error { + // FIXME(vladimir): This was previously a MicroTask, but it does not seem right to run it asynchronously. Isn't activePIDs going to be used after the function is called? + _ = module.mgr.Do("clean connections", func(ctx *mgr.WorkerCtx) error { now := time.Now().UTC() nowUnix := now.Unix() ignoreNewer := nowUnix - 2 diff --git a/spn/captain/module.go b/spn/captain/module.go index 4c82c1fc2..ca2b84875 100644 --- a/spn/captain/module.go +++ b/spn/captain/module.go @@ -128,75 +128,77 @@ func start() error { ships.EnableMasking(maskingBytes) // Initialize intel. - if err := registerIntelUpdateHook(); err != nil { - return err - } - if err := updateSPNIntel(module.mgr.Ctx(), nil); err != nil { - log.Errorf("spn/captain: failed to update SPN intel: %s", err) - } - - // Initialize identity and piers. - if conf.PublicHub() { - // Load identity. - if err := loadPublicIdentity(); err != nil { - // We cannot recover from this, set controlled failure (do not retry). 
- module.shutdownFunc(controlledFailureExitCode) - + module.mgr.Go("start", func(wc *mgr.WorkerCtx) error { + if err := registerIntelUpdateHook(); err != nil { return err } + if err := updateSPNIntel(module.mgr.Ctx(), nil); err != nil { + log.Errorf("spn/captain: failed to update SPN intel: %s", err) + } + + // Initialize identity and piers. + if conf.PublicHub() { + // Load identity. + if err := loadPublicIdentity(); err != nil { + // We cannot recover from this, set controlled failure (do not retry). + module.shutdownFunc(controlledFailureExitCode) - // Check if any networks are configured. - if !conf.HubHasIPv4() && !conf.HubHasIPv6() { - // We cannot recover from this, set controlled failure (do not retry). - module.shutdownFunc(controlledFailureExitCode) + return err + } + + // Check if any networks are configured. + if !conf.HubHasIPv4() && !conf.HubHasIPv6() { + // We cannot recover from this, set controlled failure (do not retry). + module.shutdownFunc(controlledFailureExitCode) + + return errors.New("no IP addresses for Hub configured (or detected)") + } - return errors.New("no IP addresses for Hub configured (or detected)") + // Start management of identity and piers. + if err := prepPublicIdentityMgmt(); err != nil { + return err + } + // Set ID to display on http info page. + ships.DisplayHubID = publicIdentity.ID + // Start listeners. + if err := startPiers(); err != nil { + return err + } + + // Enable connect operation. + crew.EnableConnecting(publicIdentity.Hub) } - // Start management of identity and piers. - if err := prepPublicIdentityMgmt(); err != nil { + // Subscribe to updates of cranes. + startDockHooks() + + // bootstrapping + if err := processBootstrapHubFlag(); err != nil { return err } - // Set ID to display on http info page. - ships.DisplayHubID = publicIdentity.ID - // Start listeners. - if err := startPiers(); err != nil { + if err := processBootstrapFileFlag(); err != nil { return err } - // Enable connect operation. 
- crew.EnableConnecting(publicIdentity.Hub) - } - - // Subscribe to updates of cranes. - startDockHooks() - - // bootstrapping - if err := processBootstrapHubFlag(); err != nil { - return err - } - if err := processBootstrapFileFlag(); err != nil { - return err - } - - // network optimizer - if conf.PublicHub() { - module.mgr.Delay("optimize network delay", 15*time.Second, optimizeNetwork).Repeat(1 * time.Minute) - } - - // client + home hub manager - if conf.Client() { - module.mgr.Go("client manager", clientManager) + // network optimizer + if conf.PublicHub() { + module.mgr.Delay("optimize network delay", 15*time.Second, optimizeNetwork).Repeat(1 * time.Minute) + } - // Reset failing hubs when the network changes while not connected. - module.instance.NetEnv().EventNetworkChange.AddCallback("reset failing hubs", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) { - if ready.IsNotSet() { - navigator.Main.ResetFailingStates(module.mgr.Ctx()) - } - return false, nil - }) - } + // client + home hub manager + if conf.Client() { + module.mgr.Go("client manager", clientManager) + // Reset failing hubs when the network changes while not connected. + module.instance.NetEnv().EventNetworkChange.AddCallback("reset failing hubs", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) { + if ready.IsNotSet() { + navigator.Main.ResetFailingStates(module.mgr.Ctx()) + } + return false, nil + }) + } + return nil + }) return nil } diff --git a/spn/navigator/module.go b/spn/navigator/module.go index 41a2cc703..b763b893d 100644 --- a/spn/navigator/module.go +++ b/spn/navigator/module.go @@ -80,49 +80,52 @@ func start() error { return err } - // Wait for geoip databases to be ready. - // Try again if not yet ready, as this is critical. - // The "wait" parameter times out after 1 second. - // Allow 30 seconds for both databases to load. -geoInitCheck: - for i := 0; i < 30; i++ { - switch { - case !geoip.IsInitialized(false, true): // First, IPv4. 
- case !geoip.IsInitialized(true, true): // Then, IPv6. - default: - break geoInitCheck + module.mgr.Go("initializing hubs", func(wc *mgr.WorkerCtx) error { + // Wait for geoip databases to be ready. + // Try again if not yet ready, as this is critical. + // The "wait" parameter times out after 1 second. + // Allow 30 seconds for both databases to load. + geoInitCheck: + for i := 0; i < 30; i++ { + switch { + case !geoip.IsInitialized(false, true): // First, IPv4. + case !geoip.IsInitialized(true, true): // Then, IPv6. + default: + break geoInitCheck + } } - } - err = Main.InitializeFromDatabase() - if err != nil { - // Wait for three seconds, then try again. - time.Sleep(3 * time.Second) err = Main.InitializeFromDatabase() if err != nil { - // Even if the init fails, we can try to start without it and get data along the way. - log.Warningf("spn/navigator: %s", err) + // Wait for three seconds, then try again. + time.Sleep(3 * time.Second) + err = Main.InitializeFromDatabase() + if err != nil { + // Even if the init fails, we can try to start without it and get data along the way. + log.Warningf("spn/navigator: %s", err) + } + } + err = Main.RegisterHubUpdateHook() + if err != nil { + return err } - } - err = Main.RegisterHubUpdateHook() - if err != nil { - return err - } - // TODO: delete superseded hubs after x amount of time - _ = module.mgr.Delay("update states", 3*time.Minute, Main.updateStates).Repeat(1 * time.Hour) - _ = module.mgr.Delay("update failing states", 3*time.Minute, Main.updateFailingStates).Repeat(1 * time.Minute) + // TODO: delete superseded hubs after x amount of time + _ = module.mgr.Delay("update states", 3*time.Minute, Main.updateStates).Repeat(1 * time.Hour) + _ = module.mgr.Delay("update failing states", 3*time.Minute, Main.updateFailingStates).Repeat(1 * time.Minute) - if conf.PublicHub() { - // Only measure Hubs on public Hubs. 
- module.mgr.Delay("measure hubs", 5*time.Minute, Main.measureHubs).Repeat(1 * time.Minute) + if conf.PublicHub() { + // Only measure Hubs on public Hubs. + module.mgr.Delay("measure hubs", 5*time.Minute, Main.measureHubs).Repeat(1 * time.Minute) - // Only register metrics on Hubs, as they only make sense there. - err := registerMetrics() - if err != nil { - return err + // Only register metrics on Hubs, as they only make sense there. + err := registerMetrics() + if err != nil { + return err + } } - } + return nil + }) return nil }