Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(*): support actor_scaled event #36

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-22.04, windows-2022, macos-11]
os: [ubuntu-22.04, windows-2022, macos-12]
elixir: [1.13.4]
otp: [25]

Expand Down
37 changes: 37 additions & 0 deletions lib/lattice_observer/observed/event_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,43 @@ defmodule LatticeObserver.Observed.EventProcessor do
}
end

# Helper function to set the actor instances when an actor is scaled
def set_actor_instances(l = %Lattice{}, host_id, pk, annotations, claims, scale)
when is_binary(pk) and is_map(annotations) and is_integer(scale) do
actor = Map.get(l.actors, pk, Actor.new(pk, "unavailable"))
actor = merge_actor(actor, l, claims)
# Reset instances
actor = %Actor{actor | instances: []}

# Create `scale` instances to add to the actor
actor =
1..scale
|> Enum.reduce(actor, fn _i, acc ->
instance = %Instance{
id: "N/A",
host_id: host_id,
annotations: annotations,
version: Map.get(claims, "version", get_claim(l, :version, pk)),
revision: Map.get(claims, "revision", get_claim(l, :rev, pk, 0))
}

%Actor{acc | instances: [instance | acc.instances]}
end)

%Lattice{
l
| actors: Map.put(l.actors, pk, actor)
}
end

# Helper function to remove all instances of an actor when it's scaled to zero
def remove_actor(l = %Lattice{}, pk) when is_binary(pk) do
%Lattice{
l
| actors: Map.delete(l.actors, pk)
}
end

def put_actor_instance(l = %Lattice{}, host_id, pk, instance_id, annotations, _stamp, claims)
when is_binary(pk) and is_binary(instance_id) and is_map(annotations) do
actor = Map.get(l.actors, pk, Actor.new(pk, "unavailable"))
Expand Down
70 changes: 70 additions & 0 deletions lib/lattice_observer/observed/lattice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,76 @@ defmodule LatticeObserver.Observed.Lattice do
EventProcessor.put_actor_instance(l, source_host, pk, instance_id, annotations, stamp, claims)
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
data:
%{
"public_key" => pk,
"max_instances" => scale
} = d,
source: source_host,
datacontenttype: "application/json",
time: _stamp,
type: "com.wasmcloud.lattice.actor_scaled"
}
)
when scale > 0 do
annotations = Map.get(d, "annotations", %{})
claims = Map.get(d, "claims", %{})

l =
apply_claims(l, %Claims{
call_alias: Map.get(claims, "call_alias", ""),
iss: Map.get(claims, "issuer", "N/A"),
name: Map.get(claims, "name", ""),
caps: Map.get(claims, "caps", []) |> Enum.join(","),
rev: Map.get(claims, "revision", ""),
tags: Map.get(claims, "tags", ""),
version: Map.get(claims, "version", ""),
sub: pk
})

EventProcessor.set_actor_instances(
l,
source_host,
pk,
annotations,
claims,
scale
)
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
data: %{
"public_key" => pk,
"max_instances" => scale
},
source: _source_host,
datacontenttype: "application/json",
time: _stamp,
type: "com.wasmcloud.lattice.actor_scaled"
}
)
when scale == 0 do
EventProcessor.remove_actor(l, pk)
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
source: _source_host,
datacontenttype: "application/json",
time: _stamp,
type: "com.wasmcloud.lattice.actor_scale_failed"
}
) do
# This does not currently affect state, but shouldn't generate a warning either
l
end

def apply_event(
l = %Lattice{},
%Cloudevents.Format.V_1_0.Event{
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule LatticeObserver.MixProject do
def project do
[
app: :lattice_observer,
version: "0.4.3",
version: "0.5.0",
elixir: "~> 1.12",
elixirc_paths: compiler_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
88 changes: 88 additions & 0 deletions test/observed/actors_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule LatticeObserverTest.Observed.ActorsTest do

@test_spec "testapp"
@test_host "Nxxx"
@test_host_2 "Nyyy"

describe "Observed Lattice Monitors Actor Events" do
test "Adds and Removes actors" do
Expand Down Expand Up @@ -70,5 +71,92 @@ defmodule LatticeObserverTest.Observed.ActorsTest do
}
}
end

test "Scaled event modifies actor list" do
start =
CloudEvents.actor_scaled(
"Myyy",
@test_spec,
@test_host_2,
"mything.cloud.io/actor:latest",
1
)

l = Lattice.new()
l = Lattice.apply_event(l, start)

assert l == %Lattice{
Lattice.new()
| actors: %{
"Myyy" => %Actor{
call_alias: "",
capabilities: ["test", "test2"],
id: "Myyy",
instances: [
%Instance{
host_id: "Nyyy",
id: "N/A",
revision: 0,
annotations: %{"wasmcloud.dev/appspec" => "testapp"},
version: "1.0"
}
],
issuer: "ATESTxxx",
name: "Test Actor",
tags: ""
}
},
claims: %{
"Myyy" => %LatticeObserver.Observed.Claims{
call_alias: "",
caps: "test,test2",
iss: "ATESTxxx",
name: "Test Actor",
rev: 0,
sub: "Myyy",
tags: "",
version: "1.0"
}
},
instance_tracking: %{}
}

scale_up =
CloudEvents.actor_scaled(
"Myyy",
@test_spec,
@test_host_2,
"mything.cloud.io/actor:latest",
500
)

l = Lattice.apply_event(l, scale_up)
assert l.actors["Myyy"].instances |> length() == 500

scale_down =
CloudEvents.actor_scaled(
"Myyy",
@test_spec,
@test_host_2,
"mything.cloud.io/actor:latest",
123
)

l = Lattice.apply_event(l, scale_down)
assert l.actors["Myyy"].instances |> length() == 123

scale_to_zero =
CloudEvents.actor_scaled(
"Myyy",
@test_spec,
@test_host_2,
"mything.cloud.io/actor:latest",
0
)

l = Lattice.apply_event(l, scale_to_zero)
actor = Map.get(l.actors, "Myyy", nil)
assert is_nil(actor)
end
end
end
19 changes: 19 additions & 0 deletions test/support/cloud_events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ defmodule TestSupport.CloudEvents do
|> LatticeObserver.CloudEvent.new("actor_started", host)
end

def actor_scaled(pk, spec, host, image_ref, scale, name \\ "Test Actor") do
%{
"public_key" => pk,
"annotations" => %{@appspec => spec},
"claims" => %{
"name" => name,
"caps" => ["test", "test2"],
"version" => "1.0",
"revision" => 0,
"tags" => "",
"issuer" => "ATESTxxx"
},
"host_id" => host,
"image_ref" => image_ref,
"max_instances" => scale
}
|> LatticeObserver.CloudEvent.new("actor_scaled", host)
end

def actor_stopped(pk, instance_id, spec, host) do
%{"public_key" => pk, "instance_id" => instance_id, "annotations" => %{@appspec => spec}}
|> LatticeObserver.CloudEvent.new("actor_stopped", host)
Expand Down
Loading