Skip to content

Commit

Permalink
Listen in active mode on listener socket
Browse files Browse the repository at this point in the history
  • Loading branch information
mickel8 committed Aug 19, 2023
1 parent a31e0d9 commit 0c3180a
Showing 1 changed file with 11 additions and 15 deletions.
26 changes: 11 additions & 15 deletions lib/rel/listener.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Rel.Listener do
@moduledoc false
use Task, restart: :permanent
use GenServer, restart: :permanent

require Logger

Expand All @@ -25,11 +25,11 @@ defmodule Rel.Listener do

@spec start_link(term()) :: {:ok, pid()}
def start_link(args) do
Task.start_link(__MODULE__, :listen, args)
GenServer.start_link(__MODULE__, args)
end

@spec listen(:inet.ip_address(), :inet.port_number()) :: :ok
def listen(ip, port) do
@impl true
def init([ip, port]) do
listener_addr = "#{:inet.ntoa(ip)}:#{port}/UDP"

Logger.info("Starting a new listener on: #{listener_addr}")
Expand All @@ -40,28 +40,24 @@ defmodule Rel.Listener do
port,
[
{:ifaddr, ip},
{:active, false},
{:active, true},
{:recbuf, 1024 * 1024},
:binary
]
)

spawn(Rel.Monitor, :start, [self(), socket])

recv_loop(socket)
{:ok, %{socket: socket}}
end

defp recv_loop(socket) do
case :gen_udp.recv(socket, 0) do
{:ok, {client_addr, client_port, packet}} ->
:telemetry.execute([:listener, :client], %{inbound: byte_size(packet)})
@impl true
def handle_info({:udp, socket, client_addr, client_port, packet}, %{socket: socket} = state) do
:telemetry.execute([:listener, :client], %{inbound: byte_size(packet)})

process(socket, client_addr, client_port, packet)
recv_loop(socket)
process(socket, client_addr, client_port, packet)

{:error, reason} ->
Logger.error("Couldn't receive from the socket, reason: #{inspect(reason)}")
end
{:noreply, state}
end

defp process(socket, client_ip, client_port, <<two_bits::2, _rest::bitstring>> = packet) do
Expand Down

0 comments on commit 0c3180a

Please sign in to comment.