Skip to content

Commit

Permalink
Re-arch
Browse files Browse the repository at this point in the history
  • Loading branch information
pojiro committed Nov 4, 2024
1 parent 0f33f30 commit d1894d3
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 123 deletions.
2 changes: 1 addition & 1 deletion lib/modbuzz.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ defmodule Modbuzz do
def start_rtu_client(name, device_name, transport_opts) do
case DynamicSupervisor.start_child(
Modbuzz.Application.client_supervisor_name(),
{Modbuzz.RTU.Client,
{Modbuzz.RTU.ClientSupervisor,
[name: name, device_name: device_name, transport_opts: transport_opts]}
) do
{:ok, _pid} -> :ok
Expand Down
24 changes: 24 additions & 0 deletions lib/modbuzz/pdu.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,32 @@ defmodule Modbuzz.PDU do
def decode_response(<<unquote(modbus_error_code), _rest::binary>> = binary) do
{:error, Modbuzz.PDU.Protocol.decode(%unquote(err_module){}, binary)}
end

# NOTE: Folloing functions are for Modbuzz.RTU.ADU
case modbus_function_code do
mfc when mfc in [0x01, 0x02, 0x03, 0x04] ->
def response_length(<<unquote(mfc), byte_count, _rest::binary>>) do
# {:ok, function_code + byte count itself + byte_count}
{:ok, 1 + 1 + byte_count}
end

mfc when mfc in [0x05, 0x06, 0x0F, 0x10] ->
def response_length(<<unquote(mfc), _rest::binary>>) do
# {:ok, function_code + address + value}
{:ok, 1 + 2 + 2}
end

mfc when mfc in [0x08] ->
def response_length(<<unquote(mfc), _rest::binary>>) do
# {:ok, function_code + sub-function + data}
{:ok, 1 + 2 + 2}
end
end
end

# NOTE: We need this fallback, because the binary is not always guaranteed to be correct.
def response_length(<<_, _rest::binary>>), do: {:error, :unknown}

def to_error(%req{}, exception_code \\ @illegal_data_address) do
Module.split(req)
|> List.replace_at(-1, "Err")
Expand Down
22 changes: 6 additions & 16 deletions lib/modbuzz/rtu/adu.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Modbuzz.RTU.ADU do

@crc_defn :cerlc.init(:crc16_modbus)

@type t :: %__MODULE__{unit_id: 0x00..0xF7, pdu: binary(), crc_valid?: boolean()}
defstruct unit_id: 0x00, pdu: nil, crc_valid?: true

def new(pdu, unit_id) when is_binary(pdu) do
Expand All @@ -17,17 +18,13 @@ defmodule Modbuzz.RTU.ADU do
binary <> crc(binary)
end

def decode_response(binary) when is_binary(binary) do
with <<_unit_id, function_code, _rest::binary>> <- binary,
{:ok, length} <- expected_binary_length(function_code, binary),
true <- byte_size(binary) == length do
<<binary::binary-size(length - 2), crc::binary-size(2)>> = binary

if(crc(binary) == crc) do
<<unit_id, pdu::binary>> = binary
def decode_response(<<unit_id, binary::binary>>) do
with {:ok, pdu_length} <- Modbuzz.PDU.response_length(binary),
<<pdu::binary-size(pdu_length), crc::binary-size(2)>> <- binary do
if(crc(<<unit_id, pdu::binary>>) == crc) do

Check warning on line 24 in lib/modbuzz/rtu/adu.ex

View workflow job for this annotation

GitHub Actions / code_styles

The condition of `if` should not be wrapped in parentheses.
{:ok, %__MODULE__{unit_id: unit_id, pdu: pdu}}
else
{:error, :crc_error}
{:error, %__MODULE__{unit_id: unit_id, pdu: pdu, crc_valid?: false}}
end
else
_ -> {:error, :binary_is_short}
Expand Down Expand Up @@ -63,11 +60,4 @@ defmodule Modbuzz.RTU.ADU do
defp crc(binary) do
<<:cerlc.calc_crc(binary, @crc_defn)::little-size(16)>>
end

defp expected_binary_length(function_code, binary) when function_code in [0x03] do
<<_unit_id, _function_code, byte_count, _rest::binary>> = binary
{:ok, 1 + 1 + 1 + byte_count + 2}
rescue
error -> {:error, error}
end
end
139 changes: 33 additions & 106 deletions lib/modbuzz/rtu/client.ex
Original file line number Diff line number Diff line change
@@ -1,135 +1,62 @@
defmodule Modbuzz.RTU.Client do

Check warning on line 1 in lib/modbuzz/rtu/client.ex

View workflow job for this annotation

GitHub Actions / code_styles

Modules should have a @moduledoc tag.
use GenServer

alias Modbuzz.RTU.Log

alias Modbuzz.RTU.ADU
alias Modbuzz.PDU
alias Modbuzz.RTU.ADU
alias Modbuzz.RTU.Client.Receiver

@spec start_link(keyword()) :: GenServer.on_start()
def start_link(args) when is_list(args) do
name = Keyword.get(args, :name, __MODULE__)
GenServer.start_link(__MODULE__, args, name: name)
end
@server_device_busy 0x06

@spec call(
GenServer.server(),
unit_id :: 0x00..0xFF,
request :: Modbuzz.PDU.Protocol.t(),
timeout()
) :: {:ok, response :: term()} | {:error, reason :: term()}
def call(name \\ __MODULE__, unit_id, request, timeout \\ 5000)
when unit_id in 0x00..0xFF and is_struct(request) and is_integer(timeout) do
GenServer.call(name, {:call, unit_id, request, timeout})
def start_link(args) do
name = Keyword.fetch!(args, :name)
GenServer.start_link(__MODULE__, args, name: name)
end

def init(args) when is_list(args) do
def init(args) do
name = Keyword.fetch!(args, :name)
transport = Keyword.get(args, :transport, Circuits.UART)
transport_opts = Keyword.get(args, :transport_opts, []) ++ [active: false]
transport_opts = Keyword.get(args, :transport_opts, []) ++ [active: true]
device_name = Keyword.fetch!(args, :device_name)

{:ok, pid} = transport.start_link([])
:ok = transport.open(pid, device_name, transport_opts)
receiver = Receiver.name(name)

{:ok, transport_pid} = transport.start_link([])
:ok = transport.open(transport_pid, device_name, transport_opts)
:ok = transport.controlling_process(transport_pid, GenServer.whereis(receiver))

{:ok,
%{
transport: transport,
transport_opts: transport_opts,
device_name: device_name,
pid: pid,
binary: <<>>,
recall: false
transport_pid: transport_pid,
receiver: receiver
}}
end

def handle_call({:call, unit_id, request, timeout}, from, state) do
%{transport: transport, pid: pid} = state

adu = PDU.encode_request!(request) |> ADU.new(unit_id) |> ADU.encode()

case transport.write(pid, adu, timeout) do
:ok ->
{:noreply, state, {:continue, {:read, unit_id, request, timeout, from}}}

{:error, reason} ->
Log.warning(":call write failed, :recall", reason, state)

{:noreply, %{state | recall: true},
{:continue, {:recall, unit_id, request, timeout, from}}}
end
end

def handle_continue({:recall, unit_id, request, timeout, from}, %{recall: true} = state) do
def handle_call({:call, unit_id, request, _timeout}, from, state) do
%{
transport: transport,
transport_opts: transport_opts,
device_name: device_name,
pid: pid
transport_pid: transport_pid,
receiver: receiver
} = state

adu = PDU.encode_request!(request) |> ADU.new(unit_id) |> ADU.encode()

with {:close, :ok} <- {:close, transport.close(pid)},
{:open, :ok} <- {:open, transport.open(pid, device_name, transport_opts)},
{:write, :ok} <- {:write, transport.write(pid, adu, timeout)} do
{:noreply, state, {:continue, {:read, unit_id, request, timeout, from}}}
else
{:close, {:error, reason}} ->
Log.warning(":recall close failed", reason, state)
{:noreply, %{state | recall: false}}

{:open, {:error, reason}} ->
Log.warning(":recall open failed", reason, state)
{:noreply, %{state | recall: false}}

{:write, {:error, reason}} ->
Log.warning(":recall write failed", reason, state)
{:noreply, %{state | recall: false}}
end
end

def handle_continue({:read, unit_id, request, timeout, from}, state) do
%{transport: transport, pid: pid, recall: recall} = state

case transport.read(pid, 100) do
{:ok, <<>>} ->
if recall do
Log.error(":recall no response", nil, state)
{:noreply, %{state | binary: <<>>, recall: false}}
else
Log.warning(":call no response, :recall", nil, state)
adu = PDU.encode_request!(request) |> ADU.new(unit_id)

{:noreply, %{state | binary: <<>>, recall: true},
{:continue, {:recall, unit_id, request, timeout, from}}}
end
if Receiver.busy_with?(receiver, adu) do
err = PDU.to_error(request, @server_device_busy)

{:ok, binary} ->
new_binary = state.binary <> binary

with {:ok, %ADU{unit_id: ^unit_id, pdu: pdu}} <- ADU.decode_response(new_binary) do
res_tuple = PDU.decode_response(pdu)
GenServer.reply(from, res_tuple)
{:noreply, %{state | binary: <<>>, recall: false}}
else
{:error, :binary_is_short} ->
{:noreply, %{state | binary: new_binary},
{:continue, {:read, unit_id, request, timeout, from}}}

{:error, :crc_error} ->
GenServer.reply(from, {:error, :crc_error})
{:noreply, %{state | binary: <<>>, recall: false}}
end
{:reply, {:error, err}, state}
else
to = from

{:error, reason} ->
if recall do
Log.error(":recall read failed", reason, state)
{:noreply, %{state | binary: <<>>, recall: false}}
else
Log.warning(":call read failed, :recall", reason, state)
case Receiver.will_respond(receiver, to, adu) do
:ok ->
binary = ADU.encode(adu)
transport.write(transport_pid, binary)
{:noreply, state}

{:noreply, %{state | binary: <<>>, recall: true},
{:continue, {:recall, unit_id, request, timeout, from}}}
end
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
end
end
114 changes: 114 additions & 0 deletions lib/modbuzz/rtu/client/receiver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
defmodule Modbuzz.RTU.Client.Receiver do

Check warning on line 1 in lib/modbuzz/rtu/client/receiver.ex

View workflow job for this annotation

GitHub Actions / code_styles

Modules should have a @moduledoc tag.
use GenServer

alias Modbuzz.PDU
alias Modbuzz.RTU.ADU

@server_device_failure 0x04
@server_device_busy 0x06
@unit_id_max 247

defguardp is_valid_unit_id(unit_id) when unit_id >= 0 and unit_id <= @unit_id_max

def name(client_name) do
{:via, Registry, {Modbuzz.Registry, {client_name, __MODULE__}}}
end

def busy_with?(name, adu) when is_struct(adu, ADU) and is_valid_unit_id(adu.unit_id) do
GenServer.call(name, {:busy_with?, adu})
end

def will_respond(name, to, adu) when is_struct(adu, ADU) and is_valid_unit_id(adu.unit_id) do
GenServer.call(name, {:will_respond, to, adu})
end

def start_link(args) do
client_name = Keyword.fetch!(args, :client_name)
GenServer.start_link(__MODULE__, args, name: name(client_name))
end

def init(args) do
timeout = Keyword.get(args, :timeout, 100)

{:ok,
%{
callers: List.duplicate(nil, @unit_id_max + 1),
timeout: timeout,
binary: <<>>
}}
end

def handle_call({:busy_with?, adu}, _from, state) do
%{callers: callers} = state

caller = Enum.fetch!(callers, adu.unit_id)

{:reply, not is_nil(caller), state}
end

def handle_call({:will_respond, to, adu}, _from, state) do
%{callers: callers, timeout: timeout} = state

caller = Enum.fetch!(callers, adu.unit_id)

if is_nil(caller) do
Process.send_after(self(), {:no_response?, adu}, timeout)

callers = List.replace_at(callers, adu.unit_id, to)
{:reply, :ok, %{state | callers: callers}}
else
{:ok, req} = PDU.decode_request(adu.pdu)
err = PDU.to_error(req, @server_device_busy)
GenServer.reply(to, {:error, err})

{:noreply, state}
end
end

def handle_info({:no_response?, adu}, state) do
%{callers: callers} = state

caller = Enum.fetch!(callers, adu.unit_id)

if is_nil(caller) do
# already responded
{:noreply, state}
else
# something wrong, treat as server failure
{:ok, req} = PDU.decode_request(adu.pdu)
err = PDU.to_error(req, @server_device_failure)
GenServer.reply(caller, {:error, err})

callers = List.replace_at(callers, adu.unit_id, nil)
{:noreply, %{state | callers: callers, binary: <<>>}}
end
end

def handle_info({:circuits_uart, _device_name, binary}, state) do
%{callers: callers} = state

new_binary = state.binary <> binary

# NOTE: unit_id: 1, functions_code: 1, crc: 2, so at least 4 bytes
with true <- byte_size(new_binary) > 4 || {:error, :binary_is_short},
{:ok, %ADU{unit_id: unit_id, pdu: pdu}} <- ADU.decode_response(new_binary) do
res_tuple = PDU.decode_response(pdu)

caller = Enum.fetch!(callers, unit_id)
if not is_nil(caller), do: GenServer.reply(caller, res_tuple)

callers = List.replace_at(callers, unit_id, nil)
{:noreply, %{state | callers: callers, binary: <<>>}}
else
{:error, :binary_is_short} ->
{:noreply, %{state | binary: new_binary}}

{:error, %ADU{unit_id: unit_id, pdu: _pdu, crc_valid?: false}} ->
caller = Enum.fetch!(callers, unit_id)
if not is_nil(caller), do: GenServer.reply(caller, {:error, :crc_error})

callers = List.replace_at(callers, unit_id, nil)
{:noreply, %{state | callers: callers, binary: <<>>}}
end
end
end
19 changes: 19 additions & 0 deletions lib/modbuzz/rtu/client_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Modbuzz.RTU.ClientSupervisor do
@moduledoc false

use Supervisor

def start_link(args) do
Supervisor.start_link(__MODULE__, args)
end

@doc false
def init(args) do
children = [
{Modbuzz.RTU.Client.Receiver, [client_name: Keyword.fetch!(args, :name)]},
{Modbuzz.RTU.Client, args}
]

Supervisor.init(children, strategy: :rest_for_one)
end
end

0 comments on commit d1894d3

Please sign in to comment.