Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

This new version of Phoenix.PubSub provides a simpler, more extensible, and more performant Phoenix.PubSub API. For users of Phoenix.PubSub, the API is the same, although frameworks and other adapters will have to migrate accordingly (which often means less code).

## 2.1.4 (2024-09-27)
## 2.1.4 (unreleased)

### Enhancements
- Add `:permdown_on_shutdown` option.
- Add `Phoenix.PubSub.subscribe_once/3`.

## 2.1.3 (2023-06-14)

Expand Down
18 changes: 18 additions & 0 deletions lib/phoenix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ defmodule Phoenix.PubSub do
`Phoenix.PubSub.unsubscribe/2`, all duplicate subscriptions
will be dropped.

See `subscribe_once/3` to avoid duplicate subscriptions.

## Options

* `:metadata` - provides metadata to be attached to this
Expand All @@ -208,6 +210,22 @@ defmodule Phoenix.PubSub do
end
end

@doc """
Subscribes the caller to the PubSub adapter's topic like `subscribe/3` unless
the calling process is already subscribed, in which case it returns `{:error,
:already_subscribed}`.
"""
@spec subscribe_once(t, topic, keyword) :: :ok | {:error, term}
def subscribe_once(pubsub, topic, opts \\ [])
when is_atom(pubsub) and is_binary(topic) and is_list(opts) do
subscriptions = Registry.lookup(pubsub, topic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, I believe there is a race condition here. In betwen the Registry lookup and the else path, another process could have subscribed.

The only way this could work is by doing an insert_new operation to :ets

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you can only subscribe yourself, it cannot be a race condition. However, because this is a duplicate registry, there is basicaly no cheap way to check if you are already subscribed. If you have a topic with 100k entries, then this function will be very expensive because you are bringing a 100k elements list to memory, which is why we don't provide this functionality, it is basically an anti-pattern. So it is more scalable to guarantee code wise you won't subscribe multiple times. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a topic with 100k entries

Meaning 100k subscribers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given what you said, I'd like to do a followup docs PR to explain why the caller should guard against multiple subscriptions

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be lovely!!! ❤️

if Enum.any?(subscriptions, fn {pid, _} -> pid == self() end) do
{:error, :already_subscribed}
else
subscribe(pubsub, topic, opts)
end
end

@doc """
Unsubscribes the caller from the PubSub adapter's topic.
"""
Expand Down
21 changes: 21 additions & 0 deletions test/shared/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,27 @@ defmodule Phoenix.PubSubTest do
assert_receive {:custom, nil, :none, :direct}
assert_receive {:custom, :special, :none, :direct}
end

@tag pool_size: size
test "pool #{size}: subscribe_once/2 prevents duplicate subscriptions",
config do
# Subscribe twice
PubSub.subscribe(config.pubsub, config.topic)
PubSub.subscribe(config.pubsub, config.topic)
:ok = PubSub.broadcast(config.pubsub, config.topic, :ping)
{:messages, messages} = Process.info(self(), :messages)
# duplicate message
assert messages == [:ping, :ping]
PubSub.unsubscribe(config.pubsub, config.topic)

# Duplicate calls do nothing
:ok = PubSub.subscribe_once(config.pubsub, config.topic)
{:error, :already_subscribed} = PubSub.subscribe_once(config.pubsub, config.topic)
:ok = PubSub.broadcast(config.pubsub, config.topic, :pong)
{:messages, messages} = Process.info(self(), :messages)
# no duplicate message
assert messages == [:ping, :ping, :pong]
end
end

@tag pool_size: 4
Expand Down