Skip to content

Commit

Permalink
feat: concurrent connections limiter (#751)
Browse files Browse the repository at this point in the history
* Revert "Revert "feat: concurrent connections limiter (#696)" (#750)"

This reverts commit 1598989.

* fix: don't override connection opts

* fix: only build memcache config if memcache is actually required

* fix: keep a consistent connection_opts throughout various environments

* chore: resolve credo complaint

---------

Co-authored-by: Paul Swartz <[email protected]>
  • Loading branch information
nlwstein and paulswartz authored Feb 27, 2024
1 parent f199993 commit fedc026
Show file tree
Hide file tree
Showing 17 changed files with 397 additions and 17 deletions.
4 changes: 3 additions & 1 deletion apps/api_accounts/lib/api_accounts/key.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule ApiAccounts.Key do
field(:requested_date, :datetime)
field(:approved, :boolean, default: false)
field(:locked, :boolean, default: false)
field(:static_concurrent_limit, :integer)
field(:streaming_concurrent_limit, :integer)
field(:daily_limit, :integer)
field(:rate_request_pending, :boolean, default: false)
field(:api_version, :string)
Expand All @@ -28,7 +30,7 @@ defmodule ApiAccounts.Key do
@doc false
def changeset(struct, params \\ %{}) do
fields = ~w(
created requested_date approved locked daily_limit rate_request_pending api_version description allowed_domains
created requested_date approved locked static_concurrent_limit streaming_concurrent_limit daily_limit rate_request_pending api_version description allowed_domains
)a
cast(struct, params, fields)
end
Expand Down
20 changes: 20 additions & 0 deletions apps/api_web/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,33 @@ config :api_web, ApiWeb.Endpoint,

config :api_web, :signing_salt, "NdisAeo6Jf02spiKqa"

config :api_web, RateLimiter.Memcache,
connection_opts: [
namespace: "api_dev_rate_limit",
hostname: "localhost",
coder: Memcache.Coder.JSON
]

config :api_web, :rate_limiter,
clear_interval: 60_000,
limiter: ApiWeb.RateLimiter.ETS,
max_anon_per_interval: 5_000,
max_registered_per_interval: 100_000,
wait_time_ms: 0

config :api_web, :rate_limiter_concurrent,
enabled: false,
memcache: false,
log_statistics: true,
limit_users: false,
# How many seconds tolerated when calculating whether a connection is still open
# 45 - 30 (see ApiWeb.EventStream.Initialize's timeout value) gives us a buffer of 15 seconds:
heartbeat_tolerance: 45,
# Default concurrent connections - these can be overridden on a per-key basis in the admin UI:
max_anon_static: 5,
max_registered_streaming: 10,
max_registered_static: 20

config :api_web, ApiWeb.Plugs.ModifiedSinceHandler, check_caller: false

config :api_web, :api_pipeline,
Expand Down
2 changes: 2 additions & 0 deletions apps/api_web/config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ config :logger, :console, format: "[$level] $message\n", level: :debug
# Do not configure such in production as keeping
# and calculating stacktraces is usually expensive.
config :phoenix, :stacktrace_depth, 20

config :api_web, :rate_limiter_concurrent, enabled: false, memcache: false
4 changes: 4 additions & 0 deletions apps/api_web/config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ config :ehmon, :report_mf, {:ehmon, :info_report}

config :logster, :filter_parameters, ~w(password password_confirm)

config :api_web, :rate_limiter_concurrent,
enabled: true,
memcache: true

config :api_web, :rate_limiter,
clear_interval: 60_000,
max_anon_per_interval: 20,
Expand Down
7 changes: 6 additions & 1 deletion apps/api_web/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ config :api_web, :rate_limiter,
config :api_web, RateLimiter.Memcache,
connection_opts: [
namespace: "api_test_rate_limit",
hostname: "localhost"
hostname: "localhost",
coder: Memcache.Coder.JSON
]

config :api_web, ApiWeb.Plugs.ModifiedSinceHandler, check_caller: true
Expand All @@ -26,3 +27,7 @@ config :recaptcha,

# Print only warnings and errors during test
config :logger, level: :warn

config :api_web, :rate_limiter_concurrent,
enabled: false,
memcache: false
2 changes: 2 additions & 0 deletions apps/api_web/lib/api_web.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ defmodule ApiWeb do
# no cover
children = [
# Start the endpoint when the application starts
ApiWeb.RateLimiter.Memcache.Supervisor,
ApiWeb.RateLimiter,
ApiWeb.RateLimiter.RateLimiterConcurrent,
{RequestTrack, [name: ApiWeb.RequestTrack]},
ApiWeb.EventStream.Supervisor,
ApiWeb.Endpoint,
Expand Down
1 change: 1 addition & 0 deletions apps/api_web/lib/api_web/api_controller_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule ApiWeb.ApiControllerHelpers do
plug(:split_include)
plug(ApiWeb.Plugs.ModifiedSinceHandler, caller: __MODULE__)
plug(ApiWeb.Plugs.RateLimiter)
plug(ApiWeb.Plugs.RateLimiterConcurrent)

def index(conn, params), do: ApiControllerHelpers.index(__MODULE__, conn, params)

Expand Down
15 changes: 15 additions & 0 deletions apps/api_web/lib/api_web/event_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule ApiWeb.EventStream do
import Plug.Conn
alias __MODULE__.Supervisor
alias ApiWeb.Plugs.CheckForShutdown
alias ApiWeb.RateLimiter.RateLimiterConcurrent
require Logger

@enforce_keys [:conn, :pid, :timeout]
Expand Down Expand Up @@ -53,6 +54,13 @@ defmodule ApiWeb.EventStream do

@spec hibernate_loop(state) :: Plug.Conn.t()
def hibernate_loop(state) do
if Map.has_key?(state.conn.assigns, :api_user) do
# Update the concurrent rate limit cache to ensure any flushing doesn't impact long-running connections:
RateLimiterConcurrent.add_lock(state.conn.assigns.api_user, self(), true)
else
Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!")
end

case receive_result(state) do
{:continue, state} ->
:proc_lib.hibernate(__MODULE__, :hibernate_loop, [state])
Expand Down Expand Up @@ -130,6 +138,13 @@ defmodule ApiWeb.EventStream do
end

defp unsubscribe(state) do
if Map.has_key?(state.conn.assigns, :api_user) do
# clean up our concurrent connections lock:
RateLimiterConcurrent.remove_lock(state.conn.assigns.api_user, self(), true)
else
Logger.warn("#{__MODULE__} missing_api_user - cannot rate limit!")
end

# consume any extra messages received after unsubscribing
receive do
{:events, _} ->
Expand Down
70 changes: 70 additions & 0 deletions apps/api_web/lib/api_web/plugs/rate_limiter_concurrent.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
defmodule ApiWeb.Plugs.RateLimiterConcurrent do
@moduledoc """
Plug to invoke the concurrent rate limiter.
"""

import Plug.Conn
import Phoenix.Controller, only: [render: 3, put_view: 2]

require Logger

alias ApiWeb.RateLimiter.RateLimiterConcurrent

@rate_limit_concurrent_config Application.compile_env!(:api_web, :rate_limiter_concurrent)

def init(opts), do: opts

def call(conn, _opts) do
if enabled?() do
event_stream? = Plug.Conn.get_req_header(conn, "accept") == ["text/event-stream"]

{at_limit?, remaining, limit} =
RateLimiterConcurrent.check_concurrent_rate_limit(conn.assigns.api_user, event_stream?)

if log_statistics?() do
Logger.info(
"ApiWeb.Plugs.RateLimiterConcurrent event=request_statistics api_user=#{conn.assigns.api_user.id} at_limit=#{at_limit?} remaining=#{remaining - 1} limit=#{limit} event_stream=#{event_stream?}"
)
end

# Allow negative limits to allow unlimited use:
if limit_users?() and limit >= 0 and at_limit? do
conn
|> put_concurrent_rate_limit_headers(limit, remaining)
|> put_status(429)
|> put_view(ApiWeb.ErrorView)
|> render("429.json-api", [])
|> halt()
else
RateLimiterConcurrent.add_lock(conn.assigns.api_user, self(), event_stream?)

conn
|> put_concurrent_rate_limit_headers(limit, remaining - 1)
|> register_before_send(fn conn ->
RateLimiterConcurrent.remove_lock(conn.assigns.api_user, self(), event_stream?)
conn
end)
end
else
conn
end
end

defp put_concurrent_rate_limit_headers(conn, limit, remaining) do
conn
|> put_resp_header("x-concurrent-ratelimit-limit", "#{limit}")
|> put_resp_header("x-concurrent-ratelimit-remaining", "#{remaining}")
end

def enabled? do
Keyword.fetch!(@rate_limit_concurrent_config, :enabled)
end

def limit_users? do
Keyword.fetch!(@rate_limit_concurrent_config, :limit_users)
end

def log_statistics? do
Keyword.fetch!(@rate_limit_concurrent_config, :log_statistics)
end
end
12 changes: 6 additions & 6 deletions apps/api_web/lib/api_web/rate_limiter/memcache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ defmodule ApiWeb.RateLimiter.Memcache do
"""
@behaviour ApiWeb.RateLimiter.Limiter
alias ApiWeb.RateLimiter.Memcache.Supervisor
use GenServer

@impl ApiWeb.RateLimiter.Limiter
def start_link(opts) do
clear_interval_ms = Keyword.fetch!(opts, :clear_interval)
clear_interval = div(clear_interval_ms, 1000)

connection_opts =
[ttl: clear_interval * 2] ++ ApiWeb.config(RateLimiter.Memcache, :connection_opts)
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

Supervisor.start_link(connection_opts)
@impl true
def init(opts) do
{:ok, opts}
end

@impl ApiWeb.RateLimiter.Limiter
Expand Down
38 changes: 31 additions & 7 deletions apps/api_web/lib/api_web/rate_limiter/memcache/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,34 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do
"""
@worker_count 5
@registry_name __MODULE__.Registry
@rate_limit_config Application.compile_env!(:api_web, :rate_limiter)

def start_link(connection_opts) do
use Agent

def start_link(_) do
registry = {Registry, keys: :unique, name: @registry_name}

workers =
for i <- 1..@worker_count do
Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i)
end
children =
if memcache_required?() do
clear_interval_ms = Keyword.fetch!(@rate_limit_config, :clear_interval)
clear_interval = div(clear_interval_ms, 1000)

connection_opts_config =
:api_web
|> Application.fetch_env!(RateLimiter.Memcache)
|> Keyword.fetch!(:connection_opts)

connection_opts = [ttl: clear_interval * 2] ++ connection_opts_config

children = [registry | workers]
workers =
for i <- 1..@worker_count do
Supervisor.child_spec({Memcache, [connection_opts, [name: worker_name(i)]]}, id: i)
end

[registry | workers]
else
[registry]
end

Supervisor.start_link(
children,
Expand All @@ -31,7 +49,13 @@ defmodule ApiWeb.RateLimiter.Memcache.Supervisor do
{:via, Registry, {@registry_name, index}}
end

defp random_child do
defp memcache_required? do
(ApiWeb.RateLimiter.RateLimiterConcurrent.enabled?() and
ApiWeb.RateLimiter.RateLimiterConcurrent.memcache?()) or
ApiWeb.config(:rate_limiter, :limiter) == ApiWeb.RateLimiter.Memcache
end

def random_child do
worker_name(:rand.uniform(@worker_count))
end
end
Loading

0 comments on commit fedc026

Please sign in to comment.