Skip to content

Commit

Permalink
Merge branch 'wireapp:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
offsoc authored Jan 7, 2025
2 parents 62bc656 + 657a46a commit 5f50ccb
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 70 deletions.
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/WPB-14613
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug in nginz: `/consent/<foo>` requests not correctly forwarded to `galeb`.
1 change: 1 addition & 0 deletions changelog.d/5-internal/federator-signal-handling
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
federator: Install signal handlers for SIGINT and SIGTERM, close sockets when receiving these signals
2 changes: 1 addition & 1 deletion charts/nginz/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ nginx_conf:
versioned: false
strip_version: true
galeb:
- path: /consent
- path: /consent(.*)
envs:
- all
disable_zauth: true
Expand Down
2 changes: 2 additions & 0 deletions integration/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
, stm
, streaming-commons
, string-conversions
, system-linux-proc
, tagged
, temporary
, text
Expand Down Expand Up @@ -169,6 +170,7 @@ mkDerivation {
stm
streaming-commons
string-conversions
system-linux-proc
tagged
temporary
text
Expand Down
1 change: 1 addition & 0 deletions integration/integration.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ library
, stm
, streaming-commons
, string-conversions
, system-linux-proc
, tagged
, temporary
, text
Expand Down
95 changes: 62 additions & 33 deletions integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import qualified Network.WebSockets as WS
import Notifications
import Servant.API (AsApi, ToServant, toServant)
import Servant.API.Generic (fromServant)
import Servant.Client (AsClientT)
import qualified Servant.Client as Servant
import SetupHelpers
import Testlib.Prelude
import Testlib.ResourcePool
import UnliftIO hiding (handle)

testConsumeEventsOneWebSocket :: (HasCallStack) => App ()
Expand Down Expand Up @@ -469,34 +471,45 @@ testChannelLimit = withModifiedBackend
lift $ assertNoEvent_ ws

testChannelKilled :: (HasCallStack) => App ()
testChannelKilled = startDynamicBackendsReturnResources [def] $ \[backend] -> do
let domain = backend.berDomain
alice <- randomUser domain def
[c1, c2] <-
replicateM 2
$ addClient alice def {acapabilities = Just ["consumable-notifications"]}
>>= getJSON 201
>>= (%. "id")
>>= asString

runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do
-- If creating the user takes longer (async) than adding the clients, we get a
-- `"user.activate"` here, so we use `assertFindsEvent`.

assertFindsEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` c1
ackEvent ws e
testChannelKilled = do
pool <- asks (.resourcePool)
runCodensity (acquireResources 1 pool) $ \[backend] -> do
-- Some times RabbitMQ still remembers connections from previous uses of the
-- dynamic backend. So we wait to ensure that we kill connection only for our
-- current.
waitUntilNoRabbitMqConns backend

runCodensity (startDynamicBackend backend def) $ \_ -> do
let domain = backend.berDomain
alice <- randomUser domain def
[c1, c2] <-
replicateM 2
$ addClient alice def {acapabilities = Just ["consumable-notifications"]}
>>= getJSON 201
>>= (%. "id")
>>= asString

runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do
-- If creating the user takes longer (async) than adding the clients, we get a
-- `"user.activate"` here, so we use `assertFindsEvent`.
assertFindsEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` c1
ackEvent ws e

assertFindsEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` c2
assertEvent ws $ \e -> do
e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
e %. "data.event.payload.0.client.id" `shouldMatch` c2
ackEvent ws e

recoverAll
(constantDelay 500_000 <> limitRetries 10)
(const (killConnection backend))
-- The RabbitMQ admin API takes some time to see new connections, so we need
-- to try a few times.
recoverAll
(constantDelay 500_000 <> limitRetries 10)
(const (killAllRabbitMqConns backend))
waitUntilNoRabbitMqConns backend

assertWebSocketDied ws
assertNoEventHelper ws `shouldMatch` WebSocketDied

----------------------------------------------------------------------
-- helpers
Expand Down Expand Up @@ -661,15 +674,31 @@ consumeAllEvents ws = do
ackEvent ws e
consumeAllEvents ws

killConnection :: (HasCallStack) => BackendResource -> App ()
killConnection backend = do
-- | Only considers connections from cannon
waitUntilNoRabbitMqConns :: (HasCallStack) => BackendResource -> App ()
waitUntilNoRabbitMqConns backend = do
rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend
connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack backend.berVHost)
connection <-
assertOne
[ c | c <- connections, c.userProvidedName == Just (Text.pack "pool 0")
]
void $ rabbitmqAdminClient.deleteConnection connection.name
recoverAll
(constantDelay 500_000 <> limitRetries 10)
(const (go rabbitmqAdminClient))
where
go rabbitmqAdminClient = do
cannonConnections <- getCannonConnections rabbitmqAdminClient backend.berVHost
cannonConnections `shouldMatch` ([] :: [Connection])

-- | Only kills connections from cannon
killAllRabbitMqConns :: (HasCallStack) => BackendResource -> App ()
killAllRabbitMqConns backend = do
rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend
cannonConnections <- getCannonConnections rabbitmqAdminClient backend.berVHost
assertAtLeastOne cannonConnections
for_ cannonConnections $ \connection ->
rabbitmqAdminClient.deleteConnection connection.name

getCannonConnections :: AdminAPI (AsClientT App) -> String -> App [Connection]
getCannonConnections rabbitmqAdminClient vhost = do
connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack vhost)
pure $ filter (\c -> maybe False (fromString "pool " `Text.isPrefixOf`) c.userProvidedName) connections

mkRabbitMqAdminClientForResource :: BackendResource -> App (AdminAPI (Servant.AsClientT App))
mkRabbitMqAdminClientForResource backend = do
Expand Down
8 changes: 6 additions & 2 deletions integration/test/Test/User.hs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,16 @@ testMigratingPasswordHashingAlgorithm = do
]
cfgArgon2id =
def
{ brigCfg = setField "settings.setPasswordHashingOptions" argon2idOpts,
{ brigCfg =
setField "settings.setPasswordHashingOptions" argon2idOpts
>=> removeField "optSettings.setSuspendInactiveUsers",
galleyCfg = setField "settings.passwordHashingOptions" argon2idOpts
}
cfgScrypt =
def
{ brigCfg = setField "settings.setPasswordHashingOptions.algorithm" "scrypt",
{ brigCfg =
setField "settings.setPasswordHashingOptions.algorithm" "scrypt"
>=> removeField "optSettings.setSuspendInactiveUsers",
galleyCfg = setField "settings.passwordHashingOptions.algorithm" "scrypt"
}
resourcePool <- asks (.resourcePool)
Expand Down
5 changes: 5 additions & 0 deletions integration/test/Testlib/Assertions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ assertOne xs = case toList xs of
[x] -> pure x
other -> assertFailure ("Expected one, but got " <> show (length other))

assertAtLeastOne :: (HasCallStack, Foldable t) => t a -> App ()
assertAtLeastOne xs = case toList xs of
[] -> assertFailure ("Expected at least one, but got nothing")
_ -> pure ()

expectFailure :: (HasCallStack) => (AssertionFailure -> App ()) -> App a -> App ()
expectFailure checkFailure action = do
env <- ask
Expand Down
73 changes: 50 additions & 23 deletions integration/test/Testlib/ModService.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import Control.Monad.Reader
import Control.Retry (fibonacciBackoff, limitRetriesByCumulativeDelay, retrying)
import Data.Aeson hiding ((.=))
import qualified Data.Attoparsec.Text as Parser
import qualified Data.Char as Char
import Data.Default
import Data.Foldable
import Data.Function
import Data.Functor
import qualified Data.List as List
import Data.Maybe
import Data.Monoid
import Data.String
Expand All @@ -39,12 +41,14 @@ import System.Exit
import System.FilePath
import System.IO
import System.IO.Temp (createTempDirectory, writeTempFile)
import qualified System.Linux.Proc as LinuxProc
import System.Posix (keyboardSignal, killProcess, signalProcess)
import System.Posix.Types
import System.Process
import Testlib.App
import Testlib.HTTP
import Testlib.JSON
import Testlib.Ports (PortNamespace (..))
import Testlib.Printing
import Testlib.ResourcePool
import Testlib.Types
Expand Down Expand Up @@ -123,11 +127,11 @@ traverseConcurrentlyCodensity f args = do

pure result

startDynamicBackends :: [ServiceOverrides] -> ([String] -> App a) -> App a
startDynamicBackends :: (HasCallStack) => [ServiceOverrides] -> ([String] -> App a) -> App a
startDynamicBackends beOverrides k = do
startDynamicBackendsReturnResources beOverrides (\resources -> k $ map (.berDomain) resources)

startDynamicBackendsReturnResources :: [ServiceOverrides] -> ([BackendResource] -> App a) -> App a
startDynamicBackendsReturnResources :: (HasCallStack) => [ServiceOverrides] -> ([BackendResource] -> App a) -> App a
startDynamicBackendsReturnResources beOverrides k = do
let startDynamicBackendsCodensity = do
when (Prelude.length beOverrides > 3) $ lift $ failApp "Too many backends. Currently only 3 are supported."
Expand Down Expand Up @@ -271,58 +275,81 @@ startBackend ::
ServiceOverrides ->
Codensity App ()
startBackend resource overrides = do
lift $ ensureFederatorPortIsFree resource
lift $ waitForPortsToBeFree resource
traverseConcurrentlyCodensity (withProcess resource overrides) allServices
lift $ ensureBackendReachable resource.berDomain

-- | Using ss because it is most convenient. Checking if a port is free in Haskell involves binding to it which is not what we want.
ensureFederatorPortIsFree :: BackendResource -> App ()
ensureFederatorPortIsFree resource = do
serviceMap <- getServiceMap resource.berDomain
let federatorExternalPort :: Word16 = serviceMap.federatorExternal.port
waitForPortsToBeFree :: (HasCallStack) => BackendResource -> App ()
waitForPortsToBeFree backend = do
let namedPorts =
(FederatorExternal, backend.berFederatorExternal)
: (NginzHttp2, backend.berNginzHttp2Port)
: (NginzSSL, backend.berNginzSslPort)
: map (\s -> (ServiceInternal s, berInternalServicePorts backend s)) [minBound .. maxBound]
void $ UnliftIO.pooledMapConcurrentlyN 8 (uncurry $ waitForPortToBeFree backend.berDomain) namedPorts

-- | Using lsof because it is most convenient. Checking if a port is free in Haskell involves binding to it which is not what we want.
waitForPortToBeFree :: String -> PortNamespace -> Word16 -> App ()
waitForPortToBeFree domain portName portNumber = do
env <- ask
UnliftIO.timeout (env.timeOutSeconds * 1_000_000) (check federatorExternalPort) >>= \case
Nothing -> assertFailure $ "timeout waiting for federator port to be free: " <> show federatorExternalPort
Just _ -> pure ()
addFailureContext ("domain=" <> domain <> "\nportName=" <> show portName <> "\nportNumber=" <> show portNumber) $
UnliftIO.timeout (env.timeOutSeconds * 1_000_000) check >>= \case
Nothing -> assertFailure $ "timeout waiting for federator port to be free: name=" <> show portName <> ", number=" <> show portNumber
Just _ -> pure ()
where
check :: Word16 -> App ()
check federatorExternalPort = do
check :: App ()
check = do
env <- ask
let process = (proc "lsof" ["-Q", "-Fpc", "-i", ":" <> show federatorExternalPort, "-s", "TCP:LISTEN"]) {std_out = CreatePipe, std_err = CreatePipe}
let process = (proc "lsof" ["-Q", "-Fpc", "-i", ":" <> show portNumber, "-s", "TCP:LISTEN"]) {std_out = CreatePipe, std_err = CreatePipe}
(_, Just stdoutHdl, Just stderrHdl, ph) <- liftIO $ createProcess process
let prefix = "[" <> "lsof" <> "@" <> resource.berDomain <> maybe "" (":" <>) env.currentTestName <> "] "
let prefix = "[" <> "lsof(" <> show portName <> ")@" <> domain <> maybe "" (":" <>) env.currentTestName <> "] "
liftIO $ void $ forkIO $ logToConsole id prefix stderrHdl
exitCode <- liftIO $ waitForProcess ph
case exitCode of
ExitFailure _ -> assertFailure $ prefix <> "lsof failed to figure out if federator port is free"
ExitFailure _ -> assertFailure $ prefix <> "lsof failed to figure out if port is free"
ExitSuccess -> do
lsofOutput <- liftIO $ hGetContents stdoutHdl
case parseLsof (fromString lsofOutput) of
Right ((processId, processName) : _) -> do
liftIO $ putStrLn $ prefix <> "Found a process listening on port: " <> show federatorExternalPort <> ", killing the process: " <> show processName <> ", pid: " <> show processId
liftIO $ signalProcess killProcess processId
Right procs@(_ : _) -> do
liftIO $ putStrLn $ colored red $ prefix <> "Found one or more processes listening on port: " <> show portNumber
analysis <- List.intercalate "\n" <$> mapM (liftIO . uncurry analyzeRunningProcess) procs
liftIO $ putStrLn $ indent 2 analysis
liftIO $ threadDelay 100_000
check federatorExternalPort
check
Right [] -> pure ()
Left e -> assertFailure $ prefix <> "Failed while parsing lsof output with error: " <> e <> "\n" <> "lsof output:\n" <> lsofOutput
Left e -> assertFailure $ "Failed while parsing lsof output with error: " <> e <> "\n" <> "lsof output:\n" <> lsofOutput

analyzeRunningProcess :: ProcessID -> String -> IO String
analyzeRunningProcess pid pname = do
eithSocket <- LinuxProc.readProcTcpSockets (LinuxProc.ProcessId $ fromIntegral pid)
let sockInfo = case eithSocket of
Left e -> "Failed to read TCP sockets for process: error: " <> Text.unpack (LinuxProc.renderProcError e)
Right socks -> List.intercalate "\n" $ map displaySocket socks
pure $ "Process: pid=" <> show pid <> ", name=" <> pname <> "\n" <> indent 2 sockInfo
where
displaySocket :: LinuxProc.TcpSocket -> String
displaySocket sock = "local address = " <> show sock.tcpLocalAddr <> ", remote address = " <> show sock.tcpRemoteAddr <> ", tcp state = " <> show sock.tcpTcpState

-- | Example lsof output:
--
-- @
-- p61317
-- cfederator
--
-- @
parseLsof :: String -> Either String [(ProcessID, String)]
parseLsof output =
Parser.parseOnly ((Parser.sepBy lsofParser (Parser.char '\n')) <* Parser.endOfInput) (fromString output)
Parser.parseOnly (listParser <* trailingSpace <* Parser.endOfInput) (fromString output)
where
lsofParser :: Parser.Parser (ProcessID, String)
lsofParser =
(,) <$> processIdParser <* Parser.char '\n' <*> processNameParser

processIdParser = Parser.char 'p' *> Parser.decimal
processNameParser = Parser.char 'c' *> Parser.many1 (Parser.satisfy (/= '\n'))

listParser = (Parser.sepBy lsofParser (Parser.char '\n'))
trailingSpace = Parser.many' (Parser.satisfy Char.isSpace)

ensureBackendReachable :: (HasCallStack) => String -> App ()
ensureBackendReachable domain = do
env <- ask
Expand Down
1 change: 1 addition & 0 deletions integration/test/Testlib/Ports.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ data PortNamespace
| NginzHttp2
| FederatorExternal
| ServiceInternal Service
deriving (Show, Eq)

port :: (Num a) => PortNamespace -> BackendName -> a
port NginzSSL bn = mkPort 8443 bn
Expand Down
1 change: 1 addition & 0 deletions nix/manual-overrides.nix
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ hself: hsuper: {
# (we can unfortunately not do anything here but update nixpkgs)
# ------------------------------------
template = hlib.markUnbroken hsuper.template;
system-linux-proc = hlib.markUnbroken hsuper.system-linux-proc;

# -----------------
# version overrides
Expand Down
6 changes: 3 additions & 3 deletions services/federator/src/Federator/ExternalServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,6 @@ callInward component (RPC rpc) originDomain (CertHeader cert) wreq cont = do
(responseHeaders resp)
}

serveInward :: Env -> Int -> IO ()
serveInward env port =
serveServant @(ToServantApi API) env port $ toServant $ server env._httpManager env._internalPort
serveInward :: Env -> Int -> IORef [IO ()] -> IO ()
serveInward env port cleanupsRef =
serveServant @(ToServantApi API) env port cleanupsRef $ toServant $ server env._httpManager env._internalPort
6 changes: 3 additions & 3 deletions services/federator/src/Federator/InternalServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,6 @@ callOutward targetDomain component (RPC path) req cont = do
(fromLazyByteString body)
embed . cont $ streamingResponseToWai resp

serveOutward :: Env -> Int -> IO ()
serveOutward env port = do
serveServant @(ToServantApi API) env port (toServant $ server env._httpManager env._internalPort)
serveOutward :: Env -> Int -> IORef [IO ()] -> IO ()
serveOutward env port cleanupsRef = do
serveServant @(ToServantApi API) env port cleanupsRef (toServant $ server env._httpManager env._internalPort)
11 changes: 9 additions & 2 deletions services/federator/src/Federator/Interpreter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,20 @@ serveServant ::
(HasServer api '[], Metrics.RoutesToPaths api) =>
Env ->
Int ->
IORef [IO ()] ->
ServerT api (Sem AllEffects) ->
IO ()
serveServant env port server = do
serveServant env port cleanupsRef server = do
let hoistApp :: RequestId -> Server api
hoistApp rid =
hoistServerWithContext (Proxy @api) (Proxy @'[]) (runFederator env rid) server
Warp.run port
registerCleanupAction cleanupAction =
atomicModifyIORef' cleanupsRef $ \xs -> (cleanupAction : xs, ())
let settings =
Warp.setPort port
. Warp.setInstallShutdownHandler registerCleanupAction
$ Warp.defaultSettings
Warp.runSettings settings
. requestIdMiddleware env._applog federationRequestIdHeaderName
. Wai.catchErrors (view applog env) federationRequestIdHeaderName
. Metrics.servantPrometheusMiddleware (Proxy @api)
Expand Down
Loading

0 comments on commit 5f50ccb

Please sign in to comment.