Skip to content

Commit

Permalink
Handle allocation request retransmissions
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 committed Aug 19, 2023
1 parent c413d53 commit f7b6fd4
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 17 deletions.
37 changes: 33 additions & 4 deletions lib/rel/allocation_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,40 @@ defmodule Rel.AllocationHandler do
@type five_tuple() ::
{:inet.ip_address(), :inet.port_number(), :inet.ip_address(), :inet.port_number(), :udp}

@typedoc """
Allocation handler init args.
* `t_id` is the origin allocation request transaction id
* `response` is the origin response for the origin alloaction request
"""
@type alloc_args() :: [
five_tuple: five_tuple(),
alloc_socket: :gen_udp.socket(),
turn_socket: :gen_udp.socket(),
username: binary(),
time_to_expiry: integer(),
t_id: integer(),
response: binary()
]

@permission_lifetime Application.compile_env!(:rel, :permission_lifetime)
@channel_lifetime Application.compile_env!(:rel, :channel_lifetime)

@spec start_link(term()) :: GenServer.on_start()
def start_link([five_tuple, alloc_socket | _rest] = args) do
@spec start_link(alloc_args()) :: GenServer.on_start()
def start_link(args) do
alloc_socket = Keyword.fetch!(args, :alloc_socket)
five_tuple = Keyword.fetch!(args, :five_tuple)
t_id = Keyword.fetch!(args, :t_id)
response = Keyword.fetch!(args, :response)

{:ok, {_alloc_ip, alloc_port}} = :inet.sockname(alloc_socket)

alloc_origin_state = %{alloc_port: alloc_port, t_id: t_id, response: response}

GenServer.start_link(
__MODULE__,
args,
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_port}}
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_origin_state}}
)
end

Expand All @@ -34,7 +57,13 @@ defmodule Rel.AllocationHandler do
end

@impl true
def init([five_tuple, socket, turn_socket, username, time_to_expiry]) do
def init(
five_tuple: five_tuple,
socket: socket,
turn_socket: turn_socket,
username: username,
time_to_expiry: time_to_expiry
) do
{c_ip, c_port, s_ip, s_port, _transport} = five_tuple
alloc_id = "(#{:inet.ntoa(c_ip)}:#{c_port}, #{:inet.ntoa(s_ip)}:#{s_port}, UDP)"
Logger.metadata(alloc: alloc_id)
Expand Down
40 changes: 27 additions & 13 deletions lib/rel/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,13 @@ defmodule Rel.Listener do
Logger.info("Received new allocation request")
{c_ip, c_port, _, _, _} = five_tuple

handle_error = fn reason, socket, c_ip, c_port, msg ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warning(log_msg)
:ok = :gen_udp.send(socket, c_ip, c_port, response)
end

with {:ok, key} <- Auth.authenticate(msg),
:ok <- is_not_retransmited?(msg, key, []),
:ok <- refute_allocation(five_tuple),
:ok <- check_requested_transport(msg),
:ok <- check_dont_fragment(msg),
Expand Down Expand Up @@ -169,24 +174,38 @@ defmodule Rel.Listener do
{:ok, alloc_pid} =
DynamicSupervisor.start_child(
Rel.AllocationSupervisor,
{Rel.AllocationHandler, [five_tuple, alloc_socket, socket, username, lifetime]}
{Rel.AllocationHandler,
[
five_tuple: five_tuple,
alloc_socket: alloc_socket,
turn_socket: socket,
username: username,
time_to_expiry: lifetime,
t_id: msg.transaction_id,
response: response
]}
)

:ok = :gen_udp.controlling_process(alloc_socket, alloc_pid)

:ok = :gen_udp.send(socket, c_ip, c_port, response)
else
{:error, :allocation_exists, %{t_id: origin_t_id, response: origin_response}}
when origin_t_id == msg.transaction_id ->
:ok = :gen_udp.send(socket, c_ip, c_port, origin_response)

{:error, :allocation_exists, _alloc_origin_state} ->
handle_error.(:allocation_exists, socket, c_ip, c_port, msg)

{:error, reason} ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warning(log_msg)
:ok = :gen_udp.send(socket, c_ip, c_port, response)
handle_error.(reason, socket, c_ip, c_port, msg)
end
end

defp handle_message(socket, five_tuple, msg) do
# TODO: are Registry entries removed fast enough?
case fetch_allocation(five_tuple) do
{:ok, alloc} ->
{:ok, alloc, _alloc_origin_state} ->
AllocationHandler.process_message(alloc, msg)

{:error, :allocation_not_found = reason} ->
Expand All @@ -209,22 +228,17 @@ defmodule Rel.Listener do
end
end

defp is_not_retransmited?(_msg, _key, _allocation_requests) do
# TODO: handle retransmitions, RFC 5766 6.2
:ok
end

defp fetch_allocation(five_tuple) do
case Registry.lookup(Registry.Allocations, five_tuple) do
[{allocation, _value}] -> {:ok, allocation}
[{alloc, alloc_origin_state}] -> {:ok, alloc, alloc_origin_state}
[] -> {:error, :allocation_not_found}
end
end

defp refute_allocation(five_tuple) do
case fetch_allocation(five_tuple) do
{:error, :allocation_not_found} -> :ok
{:ok, _alloc} -> {:error, :allocation_exists}
{:ok, _alloc, alloc_origin_state} -> {:error, :allocation_exists, alloc_origin_state}
end
end

Expand Down

0 comments on commit f7b6fd4

Please sign in to comment.