Skip to content

Commit

Permalink
[WIP] Improve startup process
Browse files Browse the repository at this point in the history
  • Loading branch information
vlabo committed Jul 2, 2024
1 parent 4d9b908 commit b9edb7e
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 102 deletions.
7 changes: 7 additions & 0 deletions cmds/portmaster-core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ package main
import (
"flag"
"fmt"
"io"
"log/slog"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"syscall"
"time"

"github.com/safing/portmaster/base/info"
"github.com/safing/portmaster/base/log"
Expand Down
6 changes: 4 additions & 2 deletions service/firewall/interception/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ func stop() error {
}

close(metrics.done)

return stopInterception()
if err := stopInterception(); err != nil {
log.Errorf("failed to stop interception module: %s", err)
}
return nil
}

var (
Expand Down
10 changes: 5 additions & 5 deletions service/firewall/interception/nfqueue_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,30 +258,30 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) {

err = activateNfqueueFirewall()
if err != nil {
_ = StopNfqueueInterception()
// _ = StopNfqueueInterception()
return fmt.Errorf("could not initialize nfqueue: %w", err)
}

out4Queue, err = nfq.New(17040, false)
if err != nil {
_ = StopNfqueueInterception()
// _ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv4, out): %w", err)
}
in4Queue, err = nfq.New(17140, false)
if err != nil {
_ = StopNfqueueInterception()
// _ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv4, in): %w", err)
}

if netenv.IPv6Enabled() {
out6Queue, err = nfq.New(17060, true)
if err != nil {
_ = StopNfqueueInterception()
// _ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv6, out): %w", err)
}
in6Queue, err = nfq.New(17160, true)
if err != nil {
_ = StopNfqueueInterception()
// _ = StopNfqueueInterception()
return fmt.Errorf("nfqueue(IPv6, in): %w", err)
}
} else {
Expand Down
15 changes: 12 additions & 3 deletions service/mgr/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
)

const (
Expand Down Expand Up @@ -100,7 +101,11 @@ func (g *Group) Start() error {

for i, m := range g.modules {
m.mgr.Info("starting")
err := m.module.Start(m.mgr)
startTime := time.Now()

err := m.mgr.Do(m.mgr.name+" Start", func(_ *WorkerCtx) error {
return m.module.Start(m.mgr)
})
if err != nil {
if !g.stopFrom(i) {
g.state.Store(groupStateInvalid)
Expand All @@ -109,7 +114,8 @@ func (g *Group) Start() error {
}
return fmt.Errorf("failed to start %s: %w", makeModuleName(m.module), err)
}
m.mgr.Info("started")
duration := time.Since(startTime)
m.mgr.Info("started " + duration.String())
}
g.state.Store(groupStateRunning)
return nil
Expand All @@ -134,7 +140,10 @@ func (g *Group) stopFrom(index int) (ok bool) {
ok = true
for i := index; i >= 0; i-- {
m := g.modules[i]
err := m.module.Stop(m.mgr)

err := m.mgr.Do(m.mgr.name+" Stop", func(_ *WorkerCtx) error {
return m.module.Stop(m.mgr)
})
if err != nil {
m.mgr.Error("failed to stop", "err", err)
ok = false
Expand Down
2 changes: 1 addition & 1 deletion service/mgr/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error {
return nil

case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
// A canceled context or dexceeded eadline also means that the worker is finished.
// A canceled context or exceeded deadline also means that the worker is finished.
return err

default:
Expand Down
3 changes: 2 additions & 1 deletion service/network/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func connectionCleaner(ctx *mgr.WorkerCtx) error {
func cleanConnections() (activePIDs map[int]struct{}) {
activePIDs = make(map[int]struct{})

module.mgr.Go("clean connections", func(ctx *mgr.WorkerCtx) error {
// FIXME(vladimir): This was previously a MicroTask but it does not seem right, to run it asynchronously. Is'nt activePIDs going to be used after the function is called?
_ = module.mgr.Do("clean connections", func(ctx *mgr.WorkerCtx) error {
now := time.Now().UTC()
nowUnix := now.Unix()
ignoreNewer := nowUnix - 2
Expand Down
114 changes: 58 additions & 56 deletions spn/captain/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,75 +128,77 @@ func start() error {
ships.EnableMasking(maskingBytes)

// Initialize intel.
if err := registerIntelUpdateHook(); err != nil {
return err
}
if err := updateSPNIntel(module.mgr.Ctx(), nil); err != nil {
log.Errorf("spn/captain: failed to update SPN intel: %s", err)
}

// Initialize identity and piers.
if conf.PublicHub() {
// Load identity.
if err := loadPublicIdentity(); err != nil {
// We cannot recover from this, set controlled failure (do not retry).
module.shutdownFunc(controlledFailureExitCode)

module.mgr.Go("start", func(wc *mgr.WorkerCtx) error {
if err := registerIntelUpdateHook(); err != nil {
return err
}
if err := updateSPNIntel(module.mgr.Ctx(), nil); err != nil {
log.Errorf("spn/captain: failed to update SPN intel: %s", err)
}

// Initialize identity and piers.
if conf.PublicHub() {
// Load identity.
if err := loadPublicIdentity(); err != nil {
// We cannot recover from this, set controlled failure (do not retry).
module.shutdownFunc(controlledFailureExitCode)

// Check if any networks are configured.
if !conf.HubHasIPv4() && !conf.HubHasIPv6() {
// We cannot recover from this, set controlled failure (do not retry).
module.shutdownFunc(controlledFailureExitCode)
return err
}

// Check if any networks are configured.
if !conf.HubHasIPv4() && !conf.HubHasIPv6() {
// We cannot recover from this, set controlled failure (do not retry).
module.shutdownFunc(controlledFailureExitCode)

return errors.New("no IP addresses for Hub configured (or detected)")
}

return errors.New("no IP addresses for Hub configured (or detected)")
// Start management of identity and piers.
if err := prepPublicIdentityMgmt(); err != nil {
return err
}
// Set ID to display on http info page.
ships.DisplayHubID = publicIdentity.ID
// Start listeners.
if err := startPiers(); err != nil {
return err
}

// Enable connect operation.
crew.EnableConnecting(publicIdentity.Hub)
}

// Start management of identity and piers.
if err := prepPublicIdentityMgmt(); err != nil {
// Subscribe to updates of cranes.
startDockHooks()

// bootstrapping
if err := processBootstrapHubFlag(); err != nil {
return err
}
// Set ID to display on http info page.
ships.DisplayHubID = publicIdentity.ID
// Start listeners.
if err := startPiers(); err != nil {
if err := processBootstrapFileFlag(); err != nil {
return err
}

// Enable connect operation.
crew.EnableConnecting(publicIdentity.Hub)
}

// Subscribe to updates of cranes.
startDockHooks()

// bootstrapping
if err := processBootstrapHubFlag(); err != nil {
return err
}
if err := processBootstrapFileFlag(); err != nil {
return err
}

// network optimizer
if conf.PublicHub() {
module.mgr.Delay("optimize network delay", 15*time.Second, optimizeNetwork).Repeat(1 * time.Minute)
}

// client + home hub manager
if conf.Client() {
module.mgr.Go("client manager", clientManager)
// network optimizer
if conf.PublicHub() {
module.mgr.Delay("optimize network delay", 15*time.Second, optimizeNetwork).Repeat(1 * time.Minute)
}

// Reset failing hubs when the network changes while not connected.
module.instance.NetEnv().EventNetworkChange.AddCallback("reset failing hubs", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) {
if ready.IsNotSet() {
navigator.Main.ResetFailingStates(module.mgr.Ctx())
}
return false, nil
})
}
// client + home hub manager
if conf.Client() {
module.mgr.Go("client manager", clientManager)

// Reset failing hubs when the network changes while not connected.
module.instance.NetEnv().EventNetworkChange.AddCallback("reset failing hubs", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) {
if ready.IsNotSet() {
navigator.Main.ResetFailingStates(module.mgr.Ctx())
}
return false, nil
})
}
return nil
})
return nil
}

Expand Down
71 changes: 37 additions & 34 deletions spn/navigator/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,49 +80,52 @@ func start() error {
return err
}

// Wait for geoip databases to be ready.
// Try again if not yet ready, as this is critical.
// The "wait" parameter times out after 1 second.
// Allow 30 seconds for both databases to load.
geoInitCheck:
for i := 0; i < 30; i++ {
switch {
case !geoip.IsInitialized(false, true): // First, IPv4.
case !geoip.IsInitialized(true, true): // Then, IPv6.
default:
break geoInitCheck
module.mgr.Go("initializing hubs", func(wc *mgr.WorkerCtx) error {
// Wait for geoip databases to be ready.
// Try again if not yet ready, as this is critical.
// The "wait" parameter times out after 1 second.
// Allow 30 seconds for both databases to load.
geoInitCheck:
for i := 0; i < 30; i++ {
switch {
case !geoip.IsInitialized(false, true): // First, IPv4.
case !geoip.IsInitialized(true, true): // Then, IPv6.
default:
break geoInitCheck
}
}
}

err = Main.InitializeFromDatabase()
if err != nil {
// Wait for three seconds, then try again.
time.Sleep(3 * time.Second)
err = Main.InitializeFromDatabase()
if err != nil {
// Even if the init fails, we can try to start without it and get data along the way.
log.Warningf("spn/navigator: %s", err)
// Wait for three seconds, then try again.
time.Sleep(3 * time.Second)
err = Main.InitializeFromDatabase()
if err != nil {
// Even if the init fails, we can try to start without it and get data along the way.
log.Warningf("spn/navigator: %s", err)
}
}
err = Main.RegisterHubUpdateHook()
if err != nil {
return err
}
}
err = Main.RegisterHubUpdateHook()
if err != nil {
return err
}

// TODO: delete superseded hubs after x amount of time
_ = module.mgr.Delay("update states", 3*time.Minute, Main.updateStates).Repeat(1 * time.Hour)
_ = module.mgr.Delay("update failing states", 3*time.Minute, Main.updateFailingStates).Repeat(1 * time.Minute)
// TODO: delete superseded hubs after x amount of time
_ = module.mgr.Delay("update states", 3*time.Minute, Main.updateStates).Repeat(1 * time.Hour)
_ = module.mgr.Delay("update failing states", 3*time.Minute, Main.updateFailingStates).Repeat(1 * time.Minute)

if conf.PublicHub() {
// Only measure Hubs on public Hubs.
module.mgr.Delay("measure hubs", 5*time.Minute, Main.measureHubs).Repeat(1 * time.Minute)
if conf.PublicHub() {
// Only measure Hubs on public Hubs.
module.mgr.Delay("measure hubs", 5*time.Minute, Main.measureHubs).Repeat(1 * time.Minute)

// Only register metrics on Hubs, as they only make sense there.
err := registerMetrics()
if err != nil {
return err
// Only register metrics on Hubs, as they only make sense there.
err := registerMetrics()
if err != nil {
return err
}
}
}
return nil
})

return nil
}
Expand Down

0 comments on commit b9edb7e

Please sign in to comment.