diff --git a/lib/rel/allocation_handler.ex b/lib/rel/allocation_handler.ex index 0d0e297..1ba3452 100644 --- a/lib/rel/allocation_handler.ex +++ b/lib/rel/allocation_handler.ex @@ -14,17 +14,40 @@ defmodule Rel.AllocationHandler do @type five_tuple() :: {:inet.ip_address(), :inet.port_number(), :inet.ip_address(), :inet.port_number(), :udp} + @typedoc """ + Allocation handler init args. + + * `t_id` is the origin allocation request transaction id + * `response` is the origin response for the origin alloaction request + """ + @type alloc_args() :: [ + five_tuple: five_tuple(), + alloc_socket: :gen_udp.socket(), + turn_socket: :gen_udp.socket(), + username: binary(), + time_to_expiry: integer(), + t_id: integer(), + response: binary() + ] + @permission_lifetime Application.compile_env!(:rel, :permission_lifetime) @channel_lifetime Application.compile_env!(:rel, :channel_lifetime) - @spec start_link(term()) :: GenServer.on_start() - def start_link([five_tuple, alloc_socket | _rest] = args) do + @spec start_link(alloc_args()) :: GenServer.on_start() + def start_link(args) do + alloc_socket = Keyword.fetch!(args, :alloc_socket) + five_tuple = Keyword.fetch!(args, :five_tuple) + t_id = Keyword.fetch!(args, :t_id) + response = Keyword.fetch!(args, :response) + {:ok, {_alloc_ip, alloc_port}} = :inet.sockname(alloc_socket) + alloc_origin_state = %{alloc_port: alloc_port, t_id: t_id, response: response} + GenServer.start_link( __MODULE__, args, - name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_port}} + name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_origin_state}} ) end @@ -34,7 +57,13 @@ defmodule Rel.AllocationHandler do end @impl true - def init([five_tuple, socket, turn_socket, username, time_to_expiry]) do + def init(args) do + five_tuple = Keyword.fetch!(args, :five_tuple) + alloc_socket = Keyword.fetch!(args, :alloc_socket) + turn_socket = Keyword.fetch!(args, :turn_socket) + username = Keyword.fetch!(args, :username) + time_to_expiry = Keyword.fetch!(args, :time_to_expiry) + {c_ip, c_port, s_ip, s_port, _transport} = five_tuple alloc_id = "(#{:inet.ntoa(c_ip)}:#{c_port}, #{:inet.ntoa(s_ip)}:#{s_port}, UDP)" Logger.metadata(alloc: alloc_id) @@ -48,7 +77,7 @@ defmodule Rel.AllocationHandler do %{ alloc_id: alloc_id, turn_socket: turn_socket, - socket: socket, + socket: alloc_socket, five_tuple: five_tuple, username: username, expiry_timestamp: System.os_time(:second) + time_to_expiry, diff --git a/lib/rel/listener.ex b/lib/rel/listener.ex index be78fba..841d6fa 100644 --- a/lib/rel/listener.ex +++ b/lib/rel/listener.ex @@ -124,8 +124,13 @@ defmodule Rel.Listener do Logger.info("Received new allocation request") {c_ip, c_port, _, _, _} = five_tuple + handle_error = fn reason, socket, c_ip, c_port, msg -> + {response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method) + Logger.warning(log_msg) + :ok = :gen_udp.send(socket, c_ip, c_port, response) + end + with {:ok, key} <- Auth.authenticate(msg), - :ok <- is_not_retransmited?(msg, key, []), :ok <- refute_allocation(five_tuple), :ok <- check_requested_transport(msg), :ok <- check_dont_fragment(msg), @@ -169,24 +174,45 @@ defmodule Rel.Listener do {:ok, alloc_pid} = DynamicSupervisor.start_child( Rel.AllocationSupervisor, - {Rel.AllocationHandler, [five_tuple, alloc_socket, socket, username, lifetime]} + {Rel.AllocationHandler, + [ + five_tuple: five_tuple, + alloc_socket: alloc_socket, + turn_socket: socket, + username: username, + time_to_expiry: lifetime, + t_id: msg.transaction_id, + response: response + ]} ) :ok = :gen_udp.controlling_process(alloc_socket, alloc_pid) :ok = :gen_udp.send(socket, c_ip, c_port, response) else + {:error, :allocation_exists, %{t_id: origin_t_id, response: origin_response}} + when origin_t_id == msg.transaction_id -> + Logger.info("Allocation request retransmission") + # Section 6.2 suggests we should adjust LIFETIME attribute + # but this would require asking allocation process for the + # current time-to-expiry or saving additional fields in the + # origin_alloc_state. In most cases, this shouldn't be a problem as + # client is encouraged to refresh its allocation one minute + # before its deadline + :ok = :gen_udp.send(socket, c_ip, c_port, origin_response) + + {:error, :allocation_exists, _alloc_origin_state} -> + handle_error.(:allocation_exists, socket, c_ip, c_port, msg) + {:error, reason} -> - {response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method) - Logger.warning(log_msg) - :ok = :gen_udp.send(socket, c_ip, c_port, response) + handle_error.(reason, socket, c_ip, c_port, msg) end end defp handle_message(socket, five_tuple, msg) do # TODO: are Registry entries removed fast enough? case fetch_allocation(five_tuple) do - {:ok, alloc} -> + {:ok, alloc, _alloc_origin_state} -> AllocationHandler.process_message(alloc, msg) {:error, :allocation_not_found = reason} -> @@ -209,14 +235,9 @@ defmodule Rel.Listener do end end - defp is_not_retransmited?(_msg, _key, _allocation_requests) do - # TODO: handle retransmitions, RFC 5766 6.2 - :ok - end - defp fetch_allocation(five_tuple) do case Registry.lookup(Registry.Allocations, five_tuple) do - [{allocation, _value}] -> {:ok, allocation} + [{alloc, alloc_origin_state}] -> {:ok, alloc, alloc_origin_state} [] -> {:error, :allocation_not_found} end end @@ -224,7 +245,7 @@ defmodule Rel.Listener do defp refute_allocation(five_tuple) do case fetch_allocation(five_tuple) do {:error, :allocation_not_found} -> :ok - {:ok, _alloc} -> {:error, :allocation_exists} + {:ok, _alloc, alloc_origin_state} -> {:error, :allocation_exists, alloc_origin_state} end end @@ -295,6 +316,7 @@ defmodule Rel.Listener do used_alloc_ports = Registry.Allocations |> Registry.select([{{:_, :_, :"$3"}, [], [:"$3"]}]) + |> Enum.map(fn alloc_origin_state -> Map.fetch!(alloc_origin_state, :alloc_port) end) |> MapSet.new() available_alloc_ports = MapSet.difference(@default_alloc_ports, used_alloc_ports)