Skip to content

Commit

Permalink
w.i.p. remote reg and pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
uwiger committed May 31, 2019
1 parent b7b0748 commit 054c2df
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/gproc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
-export([start_link/0,
reg/1, reg/2, reg/3, unreg/1, set_attributes/2,
reg_other/2, reg_other/3, reg_other/4, unreg_other/2,
reg_remote/2, reg_remote/3, reg_remote/4,
reg_or_locate/1, reg_or_locate/2, reg_or_locate/3,
reg_shared/1, reg_shared/2, reg_shared/3, unreg_shared/1,
set_attributes_shared/2, set_value_shared/2,
Expand Down Expand Up @@ -1115,6 +1116,33 @@ reg_other1({T,l,_} = Key, Pid, Value, As, Op) when is_pid(Pid) ->
?THROW_GPROC_ERROR(badarg)
end.

reg_remote(Key, Pid) ->
?CATCH_GPROC_ERROR(reg_remote1(Key, Pid, reg), [Key, Pid]).

reg_remote(Key, Pid, Value) ->
?CATCH_GPROC_ERROR(reg_remote1(Key, Pid, Value, [], reg), [Key, Pid, Value]).

reg_remote(Key, Pid, Value, Attrs) ->
?CATCH_GPROC_ERROR(reg_remote1(Key, Pid, Value, Attrs, reg),
[Key, Pid, Value, Attrs]).

reg_remote1({_,l,_} = Key, Pid, Op) when is_pid(Pid), node(Pid) =/= node() ->
reg_remote1(Key, Pid, default(Key), [], Op);
reg_remote1(_, _, _) ->
?THROW_GPROC_ERROR(badarg).

reg_remote1({T,l,_} = Key, Pid, Value, As, Op)
when is_pid(Pid), node(Pid) =/= node(), is_list(As) ->
if Op == reg; Op == ensure ->
if T==n; T==p; T==c; T==a; T==r; T==rc ->
call({reg_remote, Key, Pid, Value, As, Op});
true ->
?THROW_GPROC_ERROR(badarg)
end;
true ->
?THROW_GPROC_ERROR(badarg)
end.

%% @spec reg_or_locate(Key::key(), Value) -> {pid(), NewValue}
%%
%% @doc Try registering a unique name, or return existing registration.
Expand Down Expand Up @@ -2259,6 +2287,8 @@ handle_call({reg, {_T,l,_} = Key, Val, Attrs, Op}, {Pid,_}, S) ->
handle_reg_call(Key, Pid, Val, Attrs, Op, S);
handle_call({reg_other, {_T,l,_} = Key, Pid, Val, Attrs, Op}, _, S) ->
handle_reg_call(Key, Pid, Val, Attrs, Op, S);
handle_call({reg_remote, {_T,l,_} = Key, Pid, Val, Attrs, Op}, _, S) ->
handle_reg_call(Key, Pid, Val, Attrs, Op, S);
handle_call({set_attributes, {_,l,_} = Key, Attrs}, {Pid,_}, S) ->
case gproc_lib:insert_attr(Key, Attrs, Pid, l) of
false -> {reply, badarg, S};
Expand Down
14 changes: 14 additions & 0 deletions src/gproc_ps.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
-module(gproc_ps).

-export([subscribe/2,
subscribe_remote/2,
subscribe_cond/3,
subscribe_cond_remote/3,
change_cond/3,
unsubscribe/2,
publish/3,
Expand Down Expand Up @@ -79,6 +81,9 @@
subscribe(Scope, Event) when Scope==l; Scope==g ->
gproc:reg({p,Scope,{?ETag, Event}}).

subscribe_remote(Event, Pid) when is_pid(Pid), node(Pid) =/= node() ->
gproc:reg_remote({p,l,{?ETag, Event}}, Pid).

-spec subscribe_cond(scope(), event(), undefined | ets:match_spec()) -> true.
%% @doc Subscribe conditionally to events of type `Event'
%%
Expand Down Expand Up @@ -114,6 +119,15 @@ subscribe_cond(Scope, Event, Spec) when Scope==l; Scope==g ->
end,
gproc:reg({p,Scope,{?ETag, Event}}, Spec).

subscribe_cond_remote(Event, Spec, Pid)
when is_pid(Pid), node(Pid) =/= node() ->
case Spec of
undefined -> ok;
[_|_] -> _ = ets:match_spec_compile(Spec);
_ -> error(badarg)
end,
gproc:reg_remote({p,l,{?ETag, Event}}, Spec).

-spec change_cond(scope(), event(), undefined | ets:match_spec()) -> true.
%% @doc Change the condition specification of an existing subscription.
%%
Expand Down

0 comments on commit 054c2df

Please sign in to comment.