Skip to content

Ensure unsubscribing only removes a single subscription #1315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ defmodule Absinthe.Subscription do
def unsubscribe(pubsub, doc_id) do
registry = pubsub |> registry_name

for field_key <- pdict_fields(doc_id) do
Registry.unregister(registry, field_key)
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still want to clean up the record in the registry, but this only removes the entry associated with the doc_id and not all subscriptions with the same field_key

Suggested change
for field_key <- pdict_fields(doc_id) do
Registry.unregister(registry, field_key)
end
for field_key <- pdict_fields(doc_id) do
Registry.unregister_match(registry, field_key, doc_id)
end


Registry.unregister(registry, doc_id)

pdict_delete_fields(doc_id)
Expand Down
140 changes: 140 additions & 0 deletions test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ defmodule Absinthe.Execution.SubscriptionTest do
# this pubsub is local and doesn't support clusters
:ok
end

def list_registry_keys() do
Registry.keys(__MODULE__, self())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the registry name that is used to store the subscription entries internally

Suggested change
Registry.keys(__MODULE__, self())
Registry.keys(__MODULE__.Registry, self())

end
end

defmodule PubSubWithDocsetRunner do
Expand Down Expand Up @@ -122,6 +126,7 @@ defmodule Absinthe.Execution.SubscriptionTest do

defmodule Schema do
use Absinthe.Schema
import_types Absinthe.Type.Custom

query do
field :foo, :string
Expand Down Expand Up @@ -190,6 +195,19 @@ defmodule Absinthe.Execution.SubscriptionTest do
end
end

field :schedule, :string do
arg :location_id, non_null(:id)
arg :date, :date

config fn
args, _ ->
{
:ok,
topic: args.location_id
}
end
end

field :multiple_topics, :string do
config fn _, _ ->
{:ok, topic: ["topic_1", "topic_2", "topic_3"]}
Expand Down Expand Up @@ -400,6 +418,128 @@ defmodule Absinthe.Execution.SubscriptionTest do
)
end

@query """
subscription ($locationId: ID!, $date: Date) {
schedule(locationId: $locationId, date: $date)
}
"""
test "subscribing twice and unsubscribing once keeps one subscription active" do
location_id = "12"
date1 = "2020-01-01"
date2 = "2020-01-02"

assert {:ok, %{"subscribed" => topic1}} =
run_subscription(
@query,
Schema,
variables: %{"locationId" => location_id, "date" => date1},
context: %{pubsub: PubSub}
)

assert {:ok, %{"subscribed" => topic2}} =
run_subscription(
@query,
Schema,
variables: %{"locationId" => location_id, "date" => date2},
context: %{pubsub: PubSub}
)

Absinthe.Subscription.unsubscribe(PubSub, topic1)

Absinthe.Subscription.publish(PubSub, "foo", schedule: location_id)

assert_receive({:broadcast, msg})

assert %{
event: "subscription:data",
result: %{data: %{"schedule" => "foo"}},
topic: topic2
} == msg
end

@query """
subscription ($locationId: ID!, $date: Date) {
schedule(locationId: $locationId, date: $date)
}
"""
test "subscribing twice and unsubscribing once means new messages are only sent to one subscription" do
location_id = "12"
date1 = "2020-01-01"
date2 = "2020-01-02"

assert {:ok, %{"subscribed" => topic1}} =
run_subscription(
@query,
Schema,
variables: %{"locationId" => location_id, "date" => date1},
context: %{pubsub: PubSub}
)

assert {:ok, %{"subscribed" => topic2}} =
run_subscription(
@query,
Schema,
variables: %{"locationId" => location_id, "date" => date2},
context: %{pubsub: PubSub}
)

Absinthe.Subscription.unsubscribe(PubSub, topic1)

Absinthe.Subscription.publish(PubSub, "foo", schedule: location_id)

assert_receive({:broadcast, msg})

assert %{
event: "subscription:data",
result: %{data: %{"schedule" => "foo"}},
topic: topic2
} == msg

refute_receive({:broadcast, _})
end

@query """
subscription ($clientId: ID!) {
thing(clientId: $clientId)
}
"""
test "repeatedly subscribing and unsubscribing on the same topic doesn't grow registry indefinitely" do
client_id = "abc"

for _i <- 1..3 do
assert {:ok, %{"subscribed" => topic}} =
run_subscription(@query, Schema,
variables: %{"clientId" => client_id},
context: %{pubsub: PubSub}
)

Absinthe.Subscription.unsubscribe(PubSub, topic)
end

assert [] == PubSub.list_registry_keys()
end

@query """
subscription ($clientId: ID!) {
thing(clientId: $clientId)
}
"""
test "repeatedly subscribing and unsubscribing on multiple topics doesn't grow registry indefinitely" do
client_id = "abc"

for i <- 1..3 do
assert {:ok, %{"subscribed" => topic}} =
run_subscription(@query, Schema,
variables: %{"clientId" => "#{client_id}#{i}"},
context: %{pubsub: PubSub}
)

Absinthe.Subscription.unsubscribe(PubSub, topic)
end

assert [] == PubSub.list_registry_keys()
end

@query """
subscription ($userId: ID!) {
user(id: $userId) { id name }
Expand Down