Skip to content

Commit

Permalink
fix: handle encoding error (#1225)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Nov 14, 2024
1 parent d3b7746 commit b2803d6
Show file tree
Hide file tree
Showing 27 changed files with 92 additions and 65 deletions.
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Extensions.PostgresCdcRls do

@behaviour Realtime.PostgresCdc
require Logger
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias RealtimeWeb.Endpoint
alias Extensions.PostgresCdcRls, as: Rls
Expand Down
3 changes: 2 additions & 1 deletion lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

require Logger

import Realtime.Helpers, only: [cancel_timer: 1, log_error: 2]
import Realtime.Logs
import Realtime.Helpers

alias DBConnection.Backoff

Expand Down
5 changes: 3 additions & 2 deletions lib/extensions/postgres_cdc_rls/subscription_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
"""
use GenServer
require Logger
import Realtime.Logs

alias Extensions.PostgresCdcRls, as: Rls

Expand Down Expand Up @@ -165,7 +166,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
q1

{:error, reason} ->
Helpers.log_error("SubscriptionDeletionFailed", reason)
log_error("SubscriptionDeletionFailed", reason)

q
end
Expand Down Expand Up @@ -204,7 +205,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
end

def handle_info(msg, state) do
Helpers.log_error("UnhandledProcessMessage", msg)
log_error("UnhandledProcessMessage", msg)

{:noreply, state}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
"""
require Logger
import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
import Realtime.Helpers, only: [to_log: 1, log_error: 2]
import Realtime.Logs

@type conn() :: Postgrex.conn()

Expand Down
6 changes: 3 additions & 3 deletions lib/extensions/postgres_cdc_rls/subscriptions_checker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
@moduledoc false
use GenServer
require Logger

import Realtime.Logs
alias Extensions.PostgresCdcRls, as: Rls

alias Realtime.Database
Expand Down Expand Up @@ -107,7 +107,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
q1

{:error, reason} ->
Helpers.log_error("UnableToDeletePhantomSubscriptions", reason)
log_error("UnableToDeletePhantomSubscriptions", reason)

q
end
Expand Down Expand Up @@ -177,7 +177,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
else
case Rpc.call(node, __MODULE__, :not_alive_pids, [pids], timeout: 15_000) do
{:badrpc, _} = error ->
Helpers.log_error("UnableToCheckProcessesOnRemoteNode", error)
log_error("UnableToCheckProcessesOnRemoteNode", error)
acc

pids ->
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Extensions.PostgresCdcStream do
@behaviour Realtime.PostgresCdc

require Logger
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs
alias Extensions.PostgresCdcStream, as: Stream
alias Realtime.Rpc

Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_stream/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Extensions.PostgresCdcStream.Replication do
use Postgrex.ReplicationConnection
require Logger

import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs
import Realtime.Adapters.Postgres.Protocol

alias Extensions.PostgresCdcStream
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/broadcast_changes/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Realtime.BroadcastChanges.Handler do

import Realtime.Adapters.Postgres.Protocol
import Realtime.Adapters.Postgres.Decoder
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias Realtime.Adapters.Postgres.Decoder
alias Realtime.Adapters.Postgres.Protocol.KeepAlive
Expand Down
5 changes: 2 additions & 3 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ defmodule Realtime.Database do
@moduledoc """
Handles tenant database operations
"""
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias Realtime.Api.Tenant
alias Realtime.Crypto
alias Realtime.Helpers
alias Realtime.PostgresCdc
alias Realtime.Rpc

Expand Down Expand Up @@ -138,7 +137,7 @@ defmodule Realtime.Database do

{:error, e} ->
Process.exit(conn, :kill)
Helpers.log_error("UnableToConnectToTenantDatabase", e)
log_error("UnableToConnectToTenantDatabase", e)
{:error, e}
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/gen_counter/gen_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Realtime.GenCounter do
"""
use GenServer
require Logger
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs
alias Realtime.GenCounter

defstruct id: nil, counters: []
Expand Down
14 changes: 0 additions & 14 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,4 @@ defmodule Realtime.Helpers do
end
end)
end

@doc """
Prepares a value to be logged
"""
def to_log(value) when is_binary(value), do: value
def to_log(value), do: inspect(value, pretty: true)

@doc """
Logs error with a given Operational Code
"""
@spec log_error(String.t(), any(), keyword()) :: :ok
def log_error(code, error, metadata \\ []) do
Logger.error("#{code}: #{to_log(error)}", [error_code: code] ++ metadata)
end
end
41 changes: 41 additions & 0 deletions lib/realtime/logs.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Realtime.Logs do
@moduledoc """
Logging operations for Realtime
"""
require Logger

@doc """
Prepares a value to be logged
"""
def to_log(value) when is_binary(value), do: value
def to_log(value), do: inspect(value, pretty: true)

@doc """
Logs error with a given Operational Code
"""
@spec log_error(String.t(), any(), keyword()) :: :ok
def log_error(code, error, metadata \\ []) do
Logger.error("#{code}: #{to_log(error)}", [error_code: code] ++ metadata)
end
end

defimpl Jason.Encoder, for: DBConnection.ConnectionError do
def encode(
%DBConnection.ConnectionError{message: message, reason: reason, severity: severity},
_opts
) do
inspect(%{message: message, reason: reason, severity: severity}, pretty: true)
end
end

defimpl Jason.Encoder, for: Postgrex.Error do
def encode(
%Postgrex.Error{
message: message,
postgres: %{code: code, schema: schema, table: table}
},
_opts
) do
inspect(%{message: message, schema: schema, table: table, code: code}, pretty: true)
end
end
5 changes: 2 additions & 3 deletions lib/realtime/monitoring/latency.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ defmodule Realtime.Latency do
"""

use GenServer

require Logger
import Realtime.Logs

alias Realtime.Helpers
alias Realtime.Nodes
alias Realtime.Rpc

Expand Down Expand Up @@ -107,7 +106,7 @@ defmodule Realtime.Latency do

case response do
{:badrpc, reason} ->
Helpers.log_error(
log_error(
"RealtimeNodeDisconnected",
"Unable to connect to #{short_name} from #{region}: #{reason}"
)
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Realtime.Repo do
adapter: Ecto.Adapters.Postgres

import Ecto.Query
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

def with_dynamic_repo(config, callback) do
default_dynamic_repo = get_dynamic_repo()
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Realtime.Rpc do
RPC module for Realtime with the intent of standardizing the RPC interface and collect telemetry
"""
alias Realtime.Telemetry
import Realtime.Helpers
import Realtime.Logs

@doc """
Calls external node using :rpc.call/5 and collects telemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Realtime.Tenants.Authorization.Policies.BroadcastPolicies do
"""
require Logger
import Ecto.Query
import Realtime.Helpers, only: [log_error: 2, to_log: 1]
import Realtime.Logs

alias Realtime.Api.Message
alias Realtime.Repo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Realtime.Tenants.Authorization.Policies.PresencePolicies do
"""
require Logger
import Ecto.Query
import Realtime.Helpers, only: [to_log: 1, log_error: 2]
import Realtime.Logs

alias Realtime.Api.Message
alias Realtime.Repo
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Realtime.Tenants.Connect do

require Logger

import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias Realtime.Api.Tenant
alias Realtime.BroadcastChanges.Handler
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Realtime.Tenants.Janitor do
use GenServer
require Logger

import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias Realtime.Api.Tenant
alias Realtime.Database
Expand Down
4 changes: 2 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Realtime.Tenants.Migrations do

require Logger

import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias Realtime.Crypto
alias Realtime.Database
Expand Down Expand Up @@ -203,7 +203,7 @@ defmodule Realtime.Tenants.Migrations do
If not all migrations have been run, it will run the missing migrations.
"""
@spec maybe_run_migrations(pid(), Tenant.t()) :: :ok
def maybe_run_migrations(db_conn, tenant) do
def maybe_run_migrations(_db_conn, tenant) do
# Logger.metadata(external_id: tenant.external_id, project: tenant.external_id)

# check_migrations_exist_query =
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/auth/channels_authorization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule RealtimeWeb.ChannelsAuthorization do
Check connection is authorized to access channel
"""
require Logger
import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

def authorize(token, jwt_secret, jwt_jwks) when is_binary(token) do
token
Expand Down
9 changes: 5 additions & 4 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel do
"""
use RealtimeWeb, :channel
require Logger
import Realtime.Logs

alias DBConnection.Backoff

Expand Down Expand Up @@ -240,7 +241,7 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, nil)}

error ->
Helpers.log_error("UnableToSubscribeToPostgres", error)
log_error("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand All @@ -267,13 +268,13 @@ defmodule RealtimeWeb.RealtimeChannel do
shutdown_response(socket, "Fields `role` and `exp` are required in JWT")

{:error, error} ->
message = "Access token has expired: " <> Helpers.to_log(error)
message = "Access token has expired: " <> to_log(error)
shutdown_response(socket, message)
end
end

def handle_info(msg, socket) do
Helpers.log_error("UnhandledSystemMessage", msg)
log_error("UnhandledSystemMessage", msg)
{:noreply, socket}
end

Expand Down Expand Up @@ -522,7 +523,7 @@ defmodule RealtimeWeb.RealtimeChannel do

defp shutdown_response(%{assigns: %{channel_name: channel_name}} = socket, message)
when is_binary(message) do
Helpers.log_error("ChannelShutdown", message)
log_error("ChannelShutdown", message)
push_system_message("system", socket, "error", message, channel_name)
{:stop, :shutdown, socket}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
Log functions for Realtime channels to ensure
"""
require Logger
import Realtime.Helpers, only: [log_error: 2, to_log: 1]
import Realtime.Logs

@doc """
Logs messages according to user options given on config
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule RealtimeWeb.UserSocket do

require Logger

import Realtime.Helpers, only: [log_error: 2]
import Realtime.Logs

alias Realtime.Api.Tenant
alias Realtime.Crypto
Expand Down
Loading

0 comments on commit b2803d6

Please sign in to comment.