Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle allocation request retransmissions #13

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions lib/rel/allocation_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,40 @@ defmodule Rel.AllocationHandler do
@type five_tuple() ::
{:inet.ip_address(), :inet.port_number(), :inet.ip_address(), :inet.port_number(), :udp}

@typedoc """
Allocation handler init args.

* `t_id` is the origin allocation request transaction id
* `response` is the origin response for the origin alloaction request
"""
@type alloc_args() :: [
five_tuple: five_tuple(),
alloc_socket: :gen_udp.socket(),
turn_socket: :gen_udp.socket(),
username: binary(),
time_to_expiry: integer(),
t_id: integer(),
response: binary()
]

@permission_lifetime Application.compile_env!(:rel, :permission_lifetime)
@channel_lifetime Application.compile_env!(:rel, :channel_lifetime)

@spec start_link(term()) :: GenServer.on_start()
def start_link([five_tuple, alloc_socket | _rest] = args) do
@spec start_link(alloc_args()) :: GenServer.on_start()
def start_link(args) do
alloc_socket = Keyword.fetch!(args, :alloc_socket)
five_tuple = Keyword.fetch!(args, :five_tuple)
t_id = Keyword.fetch!(args, :t_id)
response = Keyword.fetch!(args, :response)

{:ok, {_alloc_ip, alloc_port}} = :inet.sockname(alloc_socket)

alloc_origin_state = %{alloc_port: alloc_port, t_id: t_id, response: response}

GenServer.start_link(
__MODULE__,
args,
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_port}}
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_origin_state}}
)
end

Expand All @@ -34,7 +57,13 @@ defmodule Rel.AllocationHandler do
end

@impl true
def init([five_tuple, socket, turn_socket, username, time_to_expiry]) do
def init(args) do
five_tuple = Keyword.fetch!(args, :five_tuple)
alloc_socket = Keyword.fetch!(args, :alloc_socket)
turn_socket = Keyword.fetch!(args, :turn_socket)
username = Keyword.fetch!(args, :username)
time_to_expiry = Keyword.fetch!(args, :time_to_expiry)

{c_ip, c_port, s_ip, s_port, _transport} = five_tuple
alloc_id = "(#{:inet.ntoa(c_ip)}:#{c_port}, #{:inet.ntoa(s_ip)}:#{s_port}, UDP)"
Logger.metadata(alloc: alloc_id)
Expand All @@ -48,7 +77,7 @@ defmodule Rel.AllocationHandler do
%{
alloc_id: alloc_id,
turn_socket: turn_socket,
socket: socket,
socket: alloc_socket,
five_tuple: five_tuple,
username: username,
expiry_timestamp: System.os_time(:second) + time_to_expiry,
Expand Down
42 changes: 29 additions & 13 deletions lib/rel/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,13 @@ defmodule Rel.Listener do
Logger.info("Received new allocation request")
{c_ip, c_port, _, _, _} = five_tuple

handle_error = fn reason, socket, c_ip, c_port, msg ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warning(log_msg)
:ok = :gen_udp.send(socket, c_ip, c_port, response)
end

with {:ok, key} <- Auth.authenticate(msg),
:ok <- is_not_retransmited?(msg, key, []),
:ok <- refute_allocation(five_tuple),
:ok <- check_requested_transport(msg),
:ok <- check_dont_fragment(msg),
Expand Down Expand Up @@ -169,24 +174,39 @@ defmodule Rel.Listener do
{:ok, alloc_pid} =
DynamicSupervisor.start_child(
Rel.AllocationSupervisor,
{Rel.AllocationHandler, [five_tuple, alloc_socket, socket, username, lifetime]}
{Rel.AllocationHandler,
[
five_tuple: five_tuple,
alloc_socket: alloc_socket,
turn_socket: socket,
username: username,
time_to_expiry: lifetime,
t_id: msg.transaction_id,
response: response
]}
)

:ok = :gen_udp.controlling_process(alloc_socket, alloc_pid)

:ok = :gen_udp.send(socket, c_ip, c_port, response)
else
{:error, :allocation_exists, %{t_id: origin_t_id, response: origin_response}}
when origin_t_id == msg.transaction_id ->
Logger.info("Allocation request retransmission")
:ok = :gen_udp.send(socket, c_ip, c_port, origin_response)

{:error, :allocation_exists, _alloc_origin_state} ->
handle_error.(:allocation_exists, socket, c_ip, c_port, msg)

{:error, reason} ->
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
Logger.warning(log_msg)
:ok = :gen_udp.send(socket, c_ip, c_port, response)
handle_error.(reason, socket, c_ip, c_port, msg)
end
end

defp handle_message(socket, five_tuple, msg) do
# TODO: are Registry entries removed fast enough?
case fetch_allocation(five_tuple) do
{:ok, alloc} ->
{:ok, alloc, _alloc_origin_state} ->
AllocationHandler.process_message(alloc, msg)

{:error, :allocation_not_found = reason} ->
Expand All @@ -209,22 +229,17 @@ defmodule Rel.Listener do
end
end

defp is_not_retransmited?(_msg, _key, _allocation_requests) do
# TODO: handle retransmitions, RFC 5766 6.2
:ok
end

defp fetch_allocation(five_tuple) do
case Registry.lookup(Registry.Allocations, five_tuple) do
[{allocation, _value}] -> {:ok, allocation}
[{alloc, alloc_origin_state}] -> {:ok, alloc, alloc_origin_state}
[] -> {:error, :allocation_not_found}
end
end

defp refute_allocation(five_tuple) do
case fetch_allocation(five_tuple) do
{:error, :allocation_not_found} -> :ok
{:ok, _alloc} -> {:error, :allocation_exists}
{:ok, _alloc, alloc_origin_state} -> {:error, :allocation_exists, alloc_origin_state}
end
end

Expand Down Expand Up @@ -295,6 +310,7 @@ defmodule Rel.Listener do
used_alloc_ports =
Registry.Allocations
|> Registry.select([{{:_, :_, :"$3"}, [], [:"$3"]}])
|> Enum.map(fn alloc_origin_state -> Map.fetch!(alloc_origin_state, :alloc_port) end)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this one. On the one hand, we do this only when creating a new allocation. On the other hand, I wonder about getting rid of registry in favor of simple map stored by the listener process 🤔

|> MapSet.new()

available_alloc_ports = MapSet.difference(@default_alloc_ports, used_alloc_ports)
Expand Down
Loading