Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
53 changes: 28 additions & 25 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,31 @@ jobs:
image: hexpm/elixir:${{ matrix.image }}

steps:
- name: Checkout
uses: actions/checkout@v5

- name: Hex and Rebar setup
run: |
mix local.hex --force
mix local.rebar --force

- name: Restore deps and _build cache
uses: actions/cache@v4
with:
path: |
deps
_build
key: deps-${{ runner.os }}-${{ matrix.image }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
deps-${{ runner.os }}-${{ matrix.image }}

- name: Install dependencies
run: mix deps.get --only test

- name: Run tests
run: |
epmd -daemon
mix test
- name: Checkout
uses: actions/checkout@v5

- name: Hex and Rebar setup
run: |
mix local.hex --force
mix local.rebar --force

- name: Restore deps and _build cache
uses: actions/cache@v4
with:
path: |
deps
_build
key: deps-${{ runner.os }}-${{ matrix.image }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
deps-${{ runner.os }}-${{ matrix.image }}

- name: Install dependencies
run: mix deps.get --only test

- name: Check formatted
run: mix format --check-formatted

- name: Run tests
run: |
epmd -daemon
mix test
2 changes: 1 addition & 1 deletion lib/phoenix/pubsub/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ defmodule Phoenix.PubSub.Application do
[]
end
end
end
end
9 changes: 6 additions & 3 deletions lib/phoenix/pubsub/pg2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,19 @@ defmodule Phoenix.PubSub.PG2 do
broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size)

if pool_size < broadcast_pool_size do
{:error, "the :pool_size option must be greater than or equal to the :broadcast_pool_size option"}
{:error,
"the :pool_size option must be greater than or equal to the :broadcast_pool_size option"}
else
adapter_name = Keyword.fetch!(opts, :adapter_name)
Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size, broadcast_pool_size}, name: :"#{adapter_name}_supervisor")

Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size, broadcast_pool_size},
name: :"#{adapter_name}_supervisor"
)
end
end

@impl true
def init({name, adapter_name, pool_size, broadcast_pool_size}) do

listener_groups = groups(adapter_name, pool_size)
broadcast_groups = groups(adapter_name, broadcast_pool_size)

Expand Down
12 changes: 8 additions & 4 deletions lib/phoenix/tracker/clock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ defmodule Phoenix.Tracker.Clock do
@moduledoc false
alias Phoenix.Tracker.State

@type context :: State.context
@type clock :: {State.name, context}
@type context :: State.context()
@type clock :: {State.name(), context}

@doc """
Returns a list of replicas from a list of contexts.
"""
@spec clockset_replicas([clock]) :: [State.name]
@spec clockset_replicas([clock]) :: [State.name()]
def clockset_replicas(clockset) do
for {replica, _} <- clockset, do: replica
end
Expand All @@ -18,8 +18,10 @@ defmodule Phoenix.Tracker.Clock do
"""
@spec append_clock([clock], clock) :: [clock]
def append_clock(clockset, {_, clock}) when map_size(clock) == 0, do: clockset

def append_clock(clockset, {node, clock}) do
big_clock = combine_clocks(clockset)

cond do
dominates?(clock, big_clock) -> [{node, clock}]
dominates?(big_clock, clock) -> clockset
Expand All @@ -32,6 +34,7 @@ defmodule Phoenix.Tracker.Clock do
"""
@spec dominates?(context, context) :: boolean
def dominates?(c1, c2) when map_size(c1) < map_size(c2), do: false

def dominates?(c1, c2) do
Enum.reduce_while(c2, true, fn {replica, clock}, true ->
if Map.get(c1, replica, 0) >= clock do
Expand All @@ -47,6 +50,7 @@ defmodule Phoenix.Tracker.Clock do
"""
def dominates_or_equal?(c1, c2) when c1 == %{} and c2 == %{}, do: true
def dominates_or_equal?(c1, _c2) when c1 == %{}, do: false

def dominates_or_equal?(c1, c2) do
Enum.reduce_while(c1, true, fn {replica, clock}, true ->
if clock >= Map.get(c2, replica, 0) do
Expand Down Expand Up @@ -87,7 +91,7 @@ defmodule Phoenix.Tracker.Clock do
if dominates?(clock, clock2) do
{set, true}
else
{[{node2, clock2}| set], insert || !dominates?(clock2, clock)}
{[{node2, clock2} | set], insert || !dominates?(clock2, clock)}
end
end)
|> case do
Expand Down
17 changes: 12 additions & 5 deletions lib/phoenix/tracker/delta_generation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,35 @@ defmodule Phoenix.Tracker.DeltaGeneration do

Falls back to extracting entire crdt if unable to match delta.
"""
@spec extract(State.t, [State.delta], State.name, State.context) :: State.delta | State.t
@spec extract(State.t(), [State.delta()], State.name(), State.context()) ::
State.delta() | State.t()
def extract(%State{mode: :normal} = state, generations, remote_ref, remote_context) do
case delta_fulfilling_clock(generations, remote_context) do
{delta, index} ->
if index, do: Logger.debug "#{inspect state.replica}: sending delta generation #{index + 1}"
if index,
do: Logger.debug("#{inspect(state.replica)}: sending delta generation #{index + 1}")

State.extract(delta, remote_ref, remote_context)

nil ->
Logger.debug "#{inspect state.replica}: falling back to sending entire crdt"
Logger.debug("#{inspect(state.replica)}: falling back to sending entire crdt")
State.extract(state, remote_ref, remote_context)
end
end

@spec push(State.t, [State.delta], State.delta, [pos_integer]) :: [State.delta]
@spec push(State.t(), [State.delta()], State.delta(), [pos_integer]) :: [State.delta()]
def push(%State{mode: :normal} = parent, [] = _generations, %State{mode: :delta} = delta, opts) do
parent.delta
|> List.duplicate(Enum.count(opts))
|> do_push(delta, opts, {delta, []})
end

def push(%State{mode: :normal} = _parent, generations, %State{mode: :delta} = delta, opts) do
do_push(generations, delta, opts, {delta, []})
end

defp do_push([], _delta, [], {_prev, acc}), do: Enum.reverse(acc)

defp do_push([gen | generations], delta, [gen_max | opts], {prev, acc}) do
case State.merge_deltas(gen, delta) do
{:ok, merged} ->
Expand All @@ -47,7 +54,7 @@ defmodule Phoenix.Tracker.DeltaGeneration do
@doc """
Prunes permanently downed replicates from the delta generation list
"""
@spec remove_down_replicas([State.delta], Replica.replica_ref) :: [State.delta]
@spec remove_down_replicas([State.delta()], Replica.replica_ref()) :: [State.delta()]
def remove_down_replicas(generations, replica_ref) do
Enum.map(generations, fn %State{mode: :delta} = gen ->
State.remove_down_replicas(gen, replica_ref)
Expand Down
Loading