From ffbb78475dbe88b330bb937d8e54b70e2c3cdaea Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 7 Jan 2025 09:24:52 +0100 Subject: [PATCH 1/4] federator: Close sockets on SIGINT/SIGTERM (#4398) * testMigratingPasswordHashingAlgorithm: Disable suspending inactive users The test takes long and suspending inactive users causes flakes * integration: Add HasCallStack to functions for starting dynamic backends * federator: Close sockets on SIGINT/SIGTERM * integration: Delete ensureFederatorPortIsFree The function could potentially kill other services as they could inherit a leaked federator socket FD. Now that we handle signals in the federator, this should not be problem just like other services. --- .../5-internal/federator-signal-handling | 1 + integration/test/Test/User.hs | 8 ++- integration/test/Testlib/ModService.hs | 58 +------------------ .../federator/src/Federator/ExternalServer.hs | 6 +- .../federator/src/Federator/InternalServer.hs | 6 +- .../federator/src/Federator/Interpreter.hs | 11 +++- services/federator/src/Federator/Run.hs | 17 +++++- 7 files changed, 38 insertions(+), 69 deletions(-) create mode 100644 changelog.d/5-internal/federator-signal-handling diff --git a/changelog.d/5-internal/federator-signal-handling b/changelog.d/5-internal/federator-signal-handling new file mode 100644 index 00000000000..228dd91de64 --- /dev/null +++ b/changelog.d/5-internal/federator-signal-handling @@ -0,0 +1 @@ +federator: Install signal handlers for SIGINT and SIGTERM, close sockets when receiving these signals \ No newline at end of file diff --git a/integration/test/Test/User.hs b/integration/test/Test/User.hs index 548c34b1daf..c318d0e55a3 100644 --- a/integration/test/Test/User.hs +++ b/integration/test/Test/User.hs @@ -190,12 +190,16 @@ testMigratingPasswordHashingAlgorithm = do ] cfgArgon2id = def - { brigCfg = setField "settings.setPasswordHashingOptions" argon2idOpts, + { brigCfg = + setField "settings.setPasswordHashingOptions" argon2idOpts + >=> removeField "optSettings.setSuspendInactiveUsers", galleyCfg = setField "settings.passwordHashingOptions" argon2idOpts } cfgScrypt = def - { brigCfg = setField "settings.setPasswordHashingOptions.algorithm" "scrypt", + { brigCfg = + setField "settings.setPasswordHashingOptions.algorithm" "scrypt" + >=> removeField "optSettings.setSuspendInactiveUsers", galleyCfg = setField "settings.passwordHashingOptions.algorithm" "scrypt" } resourcePool <- asks (.resourcePool) diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index 2e6d1dce6a8..7d2356b9711 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -18,7 +18,6 @@ import Control.Monad.Extra import Control.Monad.Reader import Control.Retry (fibonacciBackoff, limitRetriesByCumulativeDelay, retrying) import Data.Aeson hiding ((.=)) -import qualified Data.Attoparsec.Text as Parser import Data.Default import Data.Foldable import Data.Function @@ -30,17 +29,14 @@ import Data.String.Conversions (cs) import qualified Data.Text as Text import qualified Data.Text.IO as Text import Data.Traversable -import Data.Word import qualified Data.Yaml as Yaml import GHC.Stack import qualified Network.HTTP.Client as HTTP import System.Directory (copyFile, createDirectoryIfMissing, doesDirectoryExist, doesFileExist, listDirectory, removeDirectoryRecursive, removeFile) -import System.Exit import System.FilePath import System.IO import System.IO.Temp (createTempDirectory, writeTempFile) import System.Posix (keyboardSignal, 
killProcess, signalProcess) -import System.Posix.Types import System.Process import Testlib.App import Testlib.HTTP @@ -49,7 +45,6 @@ import Testlib.Printing import Testlib.ResourcePool import Testlib.Types import Text.RawString.QQ -import qualified UnliftIO import Prelude withModifiedBackend :: (HasCallStack) => ServiceOverrides -> ((HasCallStack) => String -> App a) -> App a @@ -123,11 +118,11 @@ traverseConcurrentlyCodensity f args = do pure result -startDynamicBackends :: [ServiceOverrides] -> ([String] -> App a) -> App a +startDynamicBackends :: (HasCallStack) => [ServiceOverrides] -> ([String] -> App a) -> App a startDynamicBackends beOverrides k = do startDynamicBackendsReturnResources beOverrides (\resources -> k $ map (.berDomain) resources) -startDynamicBackendsReturnResources :: [ServiceOverrides] -> ([BackendResource] -> App a) -> App a +startDynamicBackendsReturnResources :: (HasCallStack) => [ServiceOverrides] -> ([BackendResource] -> App a) -> App a startDynamicBackendsReturnResources beOverrides k = do let startDynamicBackendsCodensity = do when (Prelude.length beOverrides > 3) $ lift $ failApp "Too many backends. Currently only 3 are supported." @@ -271,58 +266,9 @@ startBackend :: ServiceOverrides -> Codensity App () startBackend resource overrides = do - lift $ ensureFederatorPortIsFree resource traverseConcurrentlyCodensity (withProcess resource overrides) allServices lift $ ensureBackendReachable resource.berDomain --- | Using ss because it is most convenient. Checking if a port is free in Haskell involves binding to it which is not what we want. -ensureFederatorPortIsFree :: BackendResource -> App () -ensureFederatorPortIsFree resource = do - serviceMap <- getServiceMap resource.berDomain - let federatorExternalPort :: Word16 = serviceMap.federatorExternal.port - env <- ask - UnliftIO.timeout (env.timeOutSeconds * 1_000_000) (check federatorExternalPort) >>= \case - Nothing -> assertFailure $ "timeout waiting for federator port to be free: " <> show federatorExternalPort - Just _ -> pure () - where - check :: Word16 -> App () - check federatorExternalPort = do - env <- ask - let process = (proc "lsof" ["-Q", "-Fpc", "-i", ":" <> show federatorExternalPort, "-s", "TCP:LISTEN"]) {std_out = CreatePipe, std_err = CreatePipe} - (_, Just stdoutHdl, Just stderrHdl, ph) <- liftIO $ createProcess process - let prefix = "[" <> "lsof" <> "@" <> resource.berDomain <> maybe "" (":" <>) env.currentTestName <> "] " - liftIO $ void $ forkIO $ logToConsole id prefix stderrHdl - exitCode <- liftIO $ waitForProcess ph - case exitCode of - ExitFailure _ -> assertFailure $ prefix <> "lsof failed to figure out if federator port is free" - ExitSuccess -> do - lsofOutput <- liftIO $ hGetContents stdoutHdl - case parseLsof (fromString lsofOutput) of - Right ((processId, processName) : _) -> do - liftIO $ putStrLn $ prefix <> "Found a process listening on port: " <> show federatorExternalPort <> ", killing the process: " <> show processName <> ", pid: " <> show processId - liftIO $ signalProcess killProcess processId - liftIO $ threadDelay 100_000 - check federatorExternalPort - Right [] -> pure () - Left e -> assertFailure $ prefix <> "Failed while parsing lsof output with error: " <> e <> "\n" <> "lsof output:\n" <> lsofOutput - --- | Example lsof output: --- --- @ --- p61317 --- cfederator --- @ -parseLsof :: String -> Either String [(ProcessID, String)] -parseLsof output = - Parser.parseOnly ((Parser.sepBy lsofParser (Parser.char '\n')) <* Parser.endOfInput) (fromString output) - where - 
lsofParser :: Parser.Parser (ProcessID, String) - lsofParser = - (,) <$> processIdParser <* Parser.char '\n' <*> processNameParser - - processIdParser = Parser.char 'p' *> Parser.decimal - processNameParser = Parser.char 'c' *> Parser.many1 (Parser.satisfy (/= '\n')) - ensureBackendReachable :: (HasCallStack) => String -> App () ensureBackendReachable domain = do env <- ask diff --git a/services/federator/src/Federator/ExternalServer.hs b/services/federator/src/Federator/ExternalServer.hs index 238fc493c35..fb2bdb44960 100644 --- a/services/federator/src/Federator/ExternalServer.hs +++ b/services/federator/src/Federator/ExternalServer.hs @@ -170,6 +170,6 @@ callInward component (RPC rpc) originDomain (CertHeader cert) wreq cont = do (responseHeaders resp) } -serveInward :: Env -> Int -> IO () -serveInward env port = - serveServant @(ToServantApi API) env port $ toServant $ server env._httpManager env._internalPort +serveInward :: Env -> Int -> IORef [IO ()] -> IO () +serveInward env port cleanupsRef = + serveServant @(ToServantApi API) env port cleanupsRef $ toServant $ server env._httpManager env._internalPort diff --git a/services/federator/src/Federator/InternalServer.hs b/services/federator/src/Federator/InternalServer.hs index e7caef5fd45..75c41da708d 100644 --- a/services/federator/src/Federator/InternalServer.hs +++ b/services/federator/src/Federator/InternalServer.hs @@ -130,6 +130,6 @@ callOutward targetDomain component (RPC path) req cont = do (fromLazyByteString body) embed . cont $ streamingResponseToWai resp -serveOutward :: Env -> Int -> IO () -serveOutward env port = do - serveServant @(ToServantApi API) env port (toServant $ server env._httpManager env._internalPort) +serveOutward :: Env -> Int -> IORef [IO ()] -> IO () +serveOutward env port cleanupsRef = do + serveServant @(ToServantApi API) env port cleanupsRef (toServant $ server env._httpManager env._internalPort) diff --git a/services/federator/src/Federator/Interpreter.hs b/services/federator/src/Federator/Interpreter.hs index e59f6a4cb0c..cc333f02be8 100644 --- a/services/federator/src/Federator/Interpreter.hs +++ b/services/federator/src/Federator/Interpreter.hs @@ -106,13 +106,20 @@ serveServant :: (HasServer api '[], Metrics.RoutesToPaths api) => Env -> Int -> + IORef [IO ()] -> ServerT api (Sem AllEffects) -> IO () -serveServant env port server = do +serveServant env port cleanupsRef server = do let hoistApp :: RequestId -> Server api hoistApp rid = hoistServerWithContext (Proxy @api) (Proxy @'[]) (runFederator env rid) server - Warp.run port + registerCleanupAction cleanupAction = + atomicModifyIORef' cleanupsRef $ \xs -> (cleanupAction : xs, ()) + let settings = + Warp.setPort port + . Warp.setInstallShutdownHandler registerCleanupAction + $ Warp.defaultSettings + Warp.runSettings settings . requestIdMiddleware env._applog federationRequestIdHeaderName . Wai.catchErrors (view applog env) federationRequestIdHeaderName . 
Metrics.servantPrometheusMiddleware (Proxy @api) diff --git a/services/federator/src/Federator/Run.hs b/services/federator/src/Federator/Run.hs index 83b9883b414..87fa12710b8 100644 --- a/services/federator/src/Federator/Run.hs +++ b/services/federator/src/Federator/Run.hs @@ -35,7 +35,7 @@ module Federator.Run where import Control.Concurrent.Async -import Control.Exception (bracket) +import Control.Exception (bracket, catch) import Control.Lens ((^.)) import Data.Id import Data.Metrics.GC @@ -50,6 +50,8 @@ import Network.HTTP.Client qualified as HTTP import Prometheus import System.Logger qualified as Log import System.Logger.Extended qualified as LogExt +import System.Posix (installHandler) +import System.Posix.Signals qualified as Sig import Util.Options import Wire.API.Federation.Component import Wire.Network.DNS.Helper qualified as DNS @@ -64,10 +66,13 @@ run opts = do let resolvConf = mkResolvConf (optSettings opts) DNS.defaultResolvConf DNS.withCachingResolver resolvConf $ \res -> do logger <- LogExt.mkLogger (Opt.logLevel opts) (Opt.logNetStrings opts) (Opt.logFormat opts) + cleanupActionsRef <- newIORef [] bracket (newEnv opts res logger) closeEnv $ \env -> do - let externalServer = serveInward env portExternal - internalServer = serveOutward env portInternal + let externalServer = serveInward env portExternal cleanupActionsRef + internalServer = serveOutward env portInternal cleanupActionsRef withMonitor logger (onNewSSLContext env) (optSettings opts) $ do + _ <- installHandler Sig.sigINT (Sig.CatchOnce $ cleanup cleanupActionsRef) Nothing + _ <- installHandler Sig.sigTERM (Sig.CatchOnce $ cleanup cleanupActionsRef) Nothing internalServerThread <- async internalServer externalServerThread <- async externalServer void $ waitAnyCancel [internalServerThread, externalServerThread] @@ -75,6 +80,12 @@ run opts = do endpointInternal = federatorInternal opts portInternal = fromIntegral $ endpointInternal.port + cleanup :: IORef [IO ()] -> IO () + cleanup cleanupsRef = do + cleanupActions <- readIORef cleanupsRef + for_ cleanupActions $ \action -> + action `catch` (\(_ :: SomeException) -> pure ()) + endpointExternal = federatorExternal opts portExternal = fromIntegral $ endpointExternal.port From 1e0175f8416de60f29f7d30eeec885aa7a893c29 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 7 Jan 2025 13:20:40 +0100 Subject: [PATCH 2/4] integration/testChannelKilled: Wait for connection to disappear before asserting that WS should be closed (#4400) Also ensure that lingering connections from previous uses of the dynamic backend are not causing flakiness. 
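The wait is built on the usual Control.Retry shape: keep re-running an assertion that throws on failure until it succeeds or the retry budget runs out. As a minimal, self-contained sketch of that pattern (not part of this patch; the name `waitUntil` and its arguments are illustrative only, while the retry policy matches the one used in the diff below):

{-# LANGUAGE NumericUnderscores #-}

import Control.Exception (throwIO)
import Control.Retry (constantDelay, limitRetries, recoverAll)

-- Re-run 'check' every 500ms, with at most 10 retries; recoverAll retries
-- whenever the action throws a synchronous exception.
waitUntil :: String -> IO Bool -> IO ()
waitUntil label check =
  recoverAll (constantDelay 500_000 <> limitRetries 10) $ \_status -> do
    ok <- check
    if ok
      then pure ()
      else throwIO (userError ("still waiting for: " <> label))

main :: IO ()
main = waitUntil "example condition" (pure True)

In the diff below the condition is a `shouldMatch` against the empty connection list, so a failed match throws an assertion failure and triggers the next retry.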
--- integration/test/Test/Events.hs | 95 +++++++++++++++++--------- integration/test/Testlib/Assertions.hs | 5 ++ 2 files changed, 67 insertions(+), 33 deletions(-) diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index f2d5dcde4a4..a386f65fdb9 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -21,9 +21,11 @@ import qualified Network.WebSockets as WS import Notifications import Servant.API (AsApi, ToServant, toServant) import Servant.API.Generic (fromServant) +import Servant.Client (AsClientT) import qualified Servant.Client as Servant import SetupHelpers import Testlib.Prelude +import Testlib.ResourcePool import UnliftIO hiding (handle) testConsumeEventsOneWebSocket :: (HasCallStack) => App () @@ -469,34 +471,45 @@ testChannelLimit = withModifiedBackend lift $ assertNoEvent_ ws testChannelKilled :: (HasCallStack) => App () -testChannelKilled = startDynamicBackendsReturnResources [def] $ \[backend] -> do - let domain = backend.berDomain - alice <- randomUser domain def - [c1, c2] <- - replicateM 2 - $ addClient alice def {acapabilities = Just ["consumable-notifications"]} - >>= getJSON 201 - >>= (%. "id") - >>= asString - - runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do - -- If creating the user takes longer (async) than adding the clients, we get a - -- `"user.activate"` here, so we use `assertFindsEvent`. - - assertFindsEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` c1 - ackEvent ws e +testChannelKilled = do + pool <- asks (.resourcePool) + runCodensity (acquireResources 1 pool) $ \[backend] -> do + -- Some times RabbitMQ still remembers connections from previous uses of the + -- dynamic backend. So we wait to ensure that we kill connection only for our + -- current. + waitUntilNoRabbitMqConns backend + + runCodensity (startDynamicBackend backend def) $ \_ -> do + let domain = backend.berDomain + alice <- randomUser domain def + [c1, c2] <- + replicateM 2 + $ addClient alice def {acapabilities = Just ["consumable-notifications"]} + >>= getJSON 201 + >>= (%. "id") + >>= asString + + runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do + -- If creating the user takes longer (async) than adding the clients, we get a + -- `"user.activate"` here, so we use `assertFindsEvent`. + assertFindsEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` c1 + ackEvent ws e - assertFindsEvent ws $ \e -> do - e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" - e %. "data.event.payload.0.client.id" `shouldMatch` c2 + assertEvent ws $ \e -> do + e %. "data.event.payload.0.type" `shouldMatch` "user.client-add" + e %. "data.event.payload.0.client.id" `shouldMatch` c2 + ackEvent ws e - recoverAll - (constantDelay 500_000 <> limitRetries 10) - (const (killConnection backend)) + -- The RabbitMQ admin API takes some time to see new connections, so we need + -- to try a few times. 
+ recoverAll + (constantDelay 500_000 <> limitRetries 10) + (const (killAllRabbitMqConns backend)) + waitUntilNoRabbitMqConns backend - assertWebSocketDied ws + assertNoEventHelper ws `shouldMatch` WebSocketDied ---------------------------------------------------------------------- -- helpers @@ -661,15 +674,31 @@ consumeAllEvents ws = do ackEvent ws e consumeAllEvents ws -killConnection :: (HasCallStack) => BackendResource -> App () -killConnection backend = do +-- | Only considers connections from cannon +waitUntilNoRabbitMqConns :: (HasCallStack) => BackendResource -> App () +waitUntilNoRabbitMqConns backend = do rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend - connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack backend.berVHost) - connection <- - assertOne - [ c | c <- connections, c.userProvidedName == Just (Text.pack "pool 0") - ] - void $ rabbitmqAdminClient.deleteConnection connection.name + recoverAll + (constantDelay 500_000 <> limitRetries 10) + (const (go rabbitmqAdminClient)) + where + go rabbitmqAdminClient = do + cannonConnections <- getCannonConnections rabbitmqAdminClient backend.berVHost + cannonConnections `shouldMatch` ([] :: [Connection]) + +-- | Only kills connections from cannon +killAllRabbitMqConns :: (HasCallStack) => BackendResource -> App () +killAllRabbitMqConns backend = do + rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend + cannonConnections <- getCannonConnections rabbitmqAdminClient backend.berVHost + assertAtLeastOne cannonConnections + for_ cannonConnections $ \connection -> + rabbitmqAdminClient.deleteConnection connection.name + +getCannonConnections :: AdminAPI (AsClientT App) -> String -> App [Connection] +getCannonConnections rabbitmqAdminClient vhost = do + connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack vhost) + pure $ filter (\c -> maybe False (fromString "pool " `Text.isPrefixOf`) c.userProvidedName) connections mkRabbitMqAdminClientForResource :: BackendResource -> App (AdminAPI (Servant.AsClientT App)) mkRabbitMqAdminClientForResource backend = do diff --git a/integration/test/Testlib/Assertions.hs b/integration/test/Testlib/Assertions.hs index 569eb5d4ca2..f31e8b7814c 100644 --- a/integration/test/Testlib/Assertions.hs +++ b/integration/test/Testlib/Assertions.hs @@ -45,6 +45,11 @@ assertOne xs = case toList xs of [x] -> pure x other -> assertFailure ("Expected one, but got " <> show (length other)) +assertAtLeastOne :: (HasCallStack, Foldable t) => t a -> App () +assertAtLeastOne xs = case toList xs of + [] -> assertFailure ("Expected at least one, but got nothing") + _ -> pure () + expectFailure :: (HasCallStack) => (AssertionFailure -> App ()) -> App a -> App () expectFailure checkFailure action = do env <- ask From 5da2473513e42df56e35fe4d69e3e21060a6b1f8 Mon Sep 17 00:00:00 2001 From: Stefan Matting Date: Tue, 7 Jan 2025 15:28:27 +0100 Subject: [PATCH 3/4] fix bug: consent request not correctly forwarded to galeb (#4376) --- changelog.d/3-bug-fixes/WPB-14613 | 1 + charts/nginz/values.yaml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/3-bug-fixes/WPB-14613 diff --git a/changelog.d/3-bug-fixes/WPB-14613 b/changelog.d/3-bug-fixes/WPB-14613 new file mode 100644 index 00000000000..ac522e721d8 --- /dev/null +++ b/changelog.d/3-bug-fixes/WPB-14613 @@ -0,0 +1 @@ +Fix bug in nginz: `/consent/` requests not correctly forwarded to `galeb`. 
diff --git a/charts/nginz/values.yaml b/charts/nginz/values.yaml index 8560dab1ac4..6614cae2a0c 100644 --- a/charts/nginz/values.yaml +++ b/charts/nginz/values.yaml @@ -708,7 +708,7 @@ nginx_conf: versioned: false strip_version: true galeb: - - path: /consent + - path: /consent(.*) envs: - all disable_zauth: true From 657a46a7d6e25243e7ee4c1bc6f150591ba788c1 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 7 Jan 2025 16:31:26 +0100 Subject: [PATCH 4/4] integration: Bring back port checking for dynamic backends (#4401) Instead of killing processes, wait for the port to be freed. --- integration/default.nix | 2 + integration/integration.cabal | 1 + integration/test/Testlib/ModService.hs | 81 ++++++++++++++++++++++++++ integration/test/Testlib/Ports.hs | 1 + nix/manual-overrides.nix | 1 + 5 files changed, 86 insertions(+) diff --git a/integration/default.nix b/integration/default.nix index 32715de71bc..3443839384f 100644 --- a/integration/default.nix +++ b/integration/default.nix @@ -70,6 +70,7 @@ , stm , streaming-commons , string-conversions +, system-linux-proc , tagged , temporary , text @@ -169,6 +170,7 @@ mkDerivation { stm streaming-commons string-conversions + system-linux-proc tagged temporary text diff --git a/integration/integration.cabal b/integration/integration.cabal index 8c4e3eb456c..a90cdd090fa 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -265,6 +265,7 @@ library , stm , streaming-commons , string-conversions + , system-linux-proc , tagged , temporary , text diff --git a/integration/test/Testlib/ModService.hs b/integration/test/Testlib/ModService.hs index 7d2356b9711..cda93648798 100644 --- a/integration/test/Testlib/ModService.hs +++ b/integration/test/Testlib/ModService.hs @@ -18,10 +18,13 @@ import Control.Monad.Extra import Control.Monad.Reader import Control.Retry (fibonacciBackoff, limitRetriesByCumulativeDelay, retrying) import Data.Aeson hiding ((.=)) +import qualified Data.Attoparsec.Text as Parser +import qualified Data.Char as Char import Data.Default import Data.Foldable import Data.Function import Data.Functor +import qualified Data.List as List import Data.Maybe import Data.Monoid import Data.String @@ -29,22 +32,28 @@ import Data.String.Conversions (cs) import qualified Data.Text as Text import qualified Data.Text.IO as Text import Data.Traversable +import Data.Word import qualified Data.Yaml as Yaml import GHC.Stack import qualified Network.HTTP.Client as HTTP import System.Directory (copyFile, createDirectoryIfMissing, doesDirectoryExist, doesFileExist, listDirectory, removeDirectoryRecursive, removeFile) +import System.Exit import System.FilePath import System.IO import System.IO.Temp (createTempDirectory, writeTempFile) +import qualified System.Linux.Proc as LinuxProc import System.Posix (keyboardSignal, killProcess, signalProcess) +import System.Posix.Types import System.Process import Testlib.App import Testlib.HTTP import Testlib.JSON +import Testlib.Ports (PortNamespace (..)) import Testlib.Printing import Testlib.ResourcePool import Testlib.Types import Text.RawString.QQ +import qualified UnliftIO import Prelude withModifiedBackend :: (HasCallStack) => ServiceOverrides -> ((HasCallStack) => String -> App a) -> App a @@ -266,9 +275,81 @@ startBackend :: ServiceOverrides -> Codensity App () startBackend resource overrides = do + lift $ waitForPortsToBeFree resource traverseConcurrentlyCodensity (withProcess resource overrides) allServices lift $ ensureBackendReachable resource.berDomain +waitForPortsToBeFree :: 
(HasCallStack) => BackendResource -> App () +waitForPortsToBeFree backend = do + let namedPorts = + (FederatorExternal, backend.berFederatorExternal) + : (NginzHttp2, backend.berNginzHttp2Port) + : (NginzSSL, backend.berNginzSslPort) + : map (\s -> (ServiceInternal s, berInternalServicePorts backend s)) [minBound .. maxBound] + void $ UnliftIO.pooledMapConcurrentlyN 8 (uncurry $ waitForPortToBeFree backend.berDomain) namedPorts + +-- | Using lsof because it is most convenient. Checking if a port is free in Haskell involves binding to it which is not what we want. +waitForPortToBeFree :: String -> PortNamespace -> Word16 -> App () +waitForPortToBeFree domain portName portNumber = do + env <- ask + addFailureContext ("domain=" <> domain <> "\nportName=" <> show portName <> "\nportNumber=" <> show portNumber) $ + UnliftIO.timeout (env.timeOutSeconds * 1_000_000) check >>= \case + Nothing -> assertFailure $ "timeout waiting for federator port to be free: name=" <> show portName <> ", number=" <> show portNumber + Just _ -> pure () + where + check :: App () + check = do + env <- ask + let process = (proc "lsof" ["-Q", "-Fpc", "-i", ":" <> show portNumber, "-s", "TCP:LISTEN"]) {std_out = CreatePipe, std_err = CreatePipe} + (_, Just stdoutHdl, Just stderrHdl, ph) <- liftIO $ createProcess process + let prefix = "[" <> "lsof(" <> show portName <> ")@" <> domain <> maybe "" (":" <>) env.currentTestName <> "] " + liftIO $ void $ forkIO $ logToConsole id prefix stderrHdl + exitCode <- liftIO $ waitForProcess ph + case exitCode of + ExitFailure _ -> assertFailure $ prefix <> "lsof failed to figure out if port is free" + ExitSuccess -> do + lsofOutput <- liftIO $ hGetContents stdoutHdl + case parseLsof (fromString lsofOutput) of + Right procs@(_ : _) -> do + liftIO $ putStrLn $ colored red $ prefix <> "Found one or more processes listening on port: " <> show portNumber + analysis <- List.intercalate "\n" <$> mapM (liftIO . 
uncurry analyzeRunningProcess) procs + liftIO $ putStrLn $ indent 2 analysis + liftIO $ threadDelay 100_000 + check + Right [] -> pure () + Left e -> assertFailure $ "Failed while parsing lsof output with error: " <> e <> "\n" <> "lsof output:\n" <> lsofOutput + +analyzeRunningProcess :: ProcessID -> String -> IO String +analyzeRunningProcess pid pname = do + eithSocket <- LinuxProc.readProcTcpSockets (LinuxProc.ProcessId $ fromIntegral pid) + let sockInfo = case eithSocket of + Left e -> "Failed to read TCP sockets for process: error: " <> Text.unpack (LinuxProc.renderProcError e) + Right socks -> List.intercalate "\n" $ map displaySocket socks + pure $ "Process: pid=" <> show pid <> ", name=" <> pname <> "\n" <> indent 2 sockInfo + where + displaySocket :: LinuxProc.TcpSocket -> String + displaySocket sock = "local address = " <> show sock.tcpLocalAddr <> ", remote address = " <> show sock.tcpRemoteAddr <> ", tcp state = " <> show sock.tcpTcpState + +-- | Example lsof output: +-- +-- @ +-- p61317 +-- cfederator +-- +-- @ +parseLsof :: String -> Either String [(ProcessID, String)] +parseLsof output = + Parser.parseOnly (listParser <* trailingSpace <* Parser.endOfInput) (fromString output) + where + lsofParser :: Parser.Parser (ProcessID, String) + lsofParser = + (,) <$> processIdParser <* Parser.char '\n' <*> processNameParser + processIdParser = Parser.char 'p' *> Parser.decimal + processNameParser = Parser.char 'c' *> Parser.many1 (Parser.satisfy (/= '\n')) + + listParser = (Parser.sepBy lsofParser (Parser.char '\n')) + trailingSpace = Parser.many' (Parser.satisfy Char.isSpace) + ensureBackendReachable :: (HasCallStack) => String -> App () ensureBackendReachable domain = do env <- ask diff --git a/integration/test/Testlib/Ports.hs b/integration/test/Testlib/Ports.hs index 52d9aa2d05c..2574077a038 100644 --- a/integration/test/Testlib/Ports.hs +++ b/integration/test/Testlib/Ports.hs @@ -8,6 +8,7 @@ data PortNamespace | NginzHttp2 | FederatorExternal | ServiceInternal Service + deriving (Show, Eq) port :: (Num a) => PortNamespace -> BackendName -> a port NginzSSL bn = mkPort 8443 bn diff --git a/nix/manual-overrides.nix b/nix/manual-overrides.nix index 2d16e57bd25..89369893038 100644 --- a/nix/manual-overrides.nix +++ b/nix/manual-overrides.nix @@ -59,6 +59,7 @@ hself: hsuper: { # (we can unfortunately not do anything here but update nixpkgs) # ------------------------------------ template = hlib.markUnbroken hsuper.template; + system-linux-proc = hlib.markUnbroken hsuper.system-linux-proc; # ----------------- # version overrides
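A quick sanity check on the `parseLsof` helper brought back in PATCH 4/4: with `-Fpc`, lsof emits one `p<pid>` line followed by one `c<command>` line per listening process, and the `listParser <* trailingSpace` arrangement also tolerates a trailing newline, which the version removed in PATCH 1/4 did not accept because it demanded `endOfInput` immediately after the last record. In a hypothetical GHCi session (not part of the patch):

ghci> parseLsof "p61317\ncfederator\n"
Right [(61317,"federator")]
ghci> parseLsof ""
Right []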
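Returning to PATCH 1/4, the socket-closing mechanism is easier to see in isolation than inside the federator's effect stack. The sketch below is a minimal stand-alone illustration, not the federator code: the port, the trivial WAI app and the helper names are made up, but `Warp.setInstallShutdownHandler`, `installHandler` and `CatchOnce` are the same APIs the patch wires together.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Concurrent.Async (async, waitAnyCancel)
import Control.Exception (SomeException, catch)
import Control.Monad (void)
import Data.Foldable (for_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Network.HTTP.Types (status200)
import Network.Wai (responseLBS)
import qualified Network.Wai.Handler.Warp as Warp
import System.Posix.Signals (Handler (CatchOnce), installHandler, sigINT, sigTERM)

main :: IO ()
main = do
  cleanupsRef <- newIORef ([] :: [IO ()])
  -- Warp calls the shutdown-handler hook with an action that closes its
  -- listening socket; stash that action instead of running it immediately.
  let registerCleanup action = atomicModifyIORef' cleanupsRef (\xs -> (action : xs, ()))
      settings =
        Warp.setPort 8080
          . Warp.setInstallShutdownHandler registerCleanup
          $ Warp.defaultSettings
      -- On SIGINT/SIGTERM, run every registered cleanup, ignoring exceptions,
      -- so the sockets get closed and the server threads can wind down.
      runCleanups = do
        cleanups <- readIORef cleanupsRef
        for_ cleanups $ \action -> action `catch` \(_ :: SomeException) -> pure ()
  _ <- installHandler sigINT (CatchOnce runCleanups) Nothing
  _ <- installHandler sigTERM (CatchOnce runCleanups) Nothing
  server <- async $ Warp.runSettings settings $ \_req respond ->
    respond (responseLBS status200 [] "ok")
  void $ waitAnyCancel [server]

Warp hands the registered hook an action that closes its listening socket, and `CatchOnce` resets the signal disposition to the default after the first delivery, so a second SIGINT or SIGTERM still terminates the process if the graceful path stalls.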