diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 78b628f..d27615c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-22.04, windows-2022, macos-11] + os: [ubuntu-22.04, windows-2022, macos-12] elixir: [1.13.4] otp: [25] diff --git a/lib/lattice_observer/observed/event_processor.ex b/lib/lattice_observer/observed/event_processor.ex index 8ee346e..0d1990f 100644 --- a/lib/lattice_observer/observed/event_processor.ex +++ b/lib/lattice_observer/observed/event_processor.ex @@ -61,6 +61,43 @@ defmodule LatticeObserver.Observed.EventProcessor do } end + # Helper function to set the actor instances when an actor is scaled + def set_actor_instances(l = %Lattice{}, host_id, pk, annotations, claims, scale) + when is_binary(pk) and is_map(annotations) and is_integer(scale) do + actor = Map.get(l.actors, pk, Actor.new(pk, "unavailable")) + actor = merge_actor(actor, l, claims) + # Reset instances + actor = %Actor{actor | instances: []} + + # Create `scale` instances to add to the actor + actor = + 1..scale + |> Enum.reduce(actor, fn _i, acc -> + instance = %Instance{ + id: "N/A", + host_id: host_id, + annotations: annotations, + version: Map.get(claims, "version", get_claim(l, :version, pk)), + revision: Map.get(claims, "revision", get_claim(l, :rev, pk, 0)) + } + + %Actor{acc | instances: [instance | acc.instances]} + end) + + %Lattice{ + l + | actors: Map.put(l.actors, pk, actor) + } + end + + # Helper function to remove all instances of an actor when it's scaled to zero + def remove_actor(l = %Lattice{}, pk) when is_binary(pk) do + %Lattice{ + l + | actors: Map.delete(l.actors, pk) + } + end + def put_actor_instance(l = %Lattice{}, host_id, pk, instance_id, annotations, _stamp, claims) when is_binary(pk) and is_binary(instance_id) and is_map(annotations) do actor = Map.get(l.actors, pk, Actor.new(pk, "unavailable")) diff --git a/lib/lattice_observer/observed/lattice.ex b/lib/lattice_observer/observed/lattice.ex index fb0135e..ad10ba9 100644 --- a/lib/lattice_observer/observed/lattice.ex +++ b/lib/lattice_observer/observed/lattice.ex @@ -445,6 +445,76 @@ defmodule LatticeObserver.Observed.Lattice do EventProcessor.put_actor_instance(l, source_host, pk, instance_id, annotations, stamp, claims) end + def apply_event( + l = %Lattice{}, + %Cloudevents.Format.V_1_0.Event{ + data: + %{ + "public_key" => pk, + "max_instances" => scale + } = d, + source: source_host, + datacontenttype: "application/json", + time: _stamp, + type: "com.wasmcloud.lattice.actor_scaled" + } + ) + when scale > 0 do + annotations = Map.get(d, "annotations", %{}) + claims = Map.get(d, "claims", %{}) + + l = + apply_claims(l, %Claims{ + call_alias: Map.get(claims, "call_alias", ""), + iss: Map.get(claims, "issuer", "N/A"), + name: Map.get(claims, "name", ""), + caps: Map.get(claims, "caps", []) |> Enum.join(","), + rev: Map.get(claims, "revision", ""), + tags: Map.get(claims, "tags", ""), + version: Map.get(claims, "version", ""), + sub: pk + }) + + EventProcessor.set_actor_instances( + l, + source_host, + pk, + annotations, + claims, + scale + ) + end + + def apply_event( + l = %Lattice{}, + %Cloudevents.Format.V_1_0.Event{ + data: %{ + "public_key" => pk, + "max_instances" => scale + }, + source: _source_host, + datacontenttype: "application/json", + time: _stamp, + type: "com.wasmcloud.lattice.actor_scaled" + } + ) + when scale == 0 do + EventProcessor.remove_actor(l, pk) + end + + def apply_event( + l = %Lattice{}, + %Cloudevents.Format.V_1_0.Event{ + source: _source_host, + datacontenttype: "application/json", + time: _stamp, + type: "com.wasmcloud.lattice.actor_scale_failed" + } + ) do + # This does not currently affect state, but shouldn't generate a warning either + l + end + def apply_event( l = %Lattice{}, %Cloudevents.Format.V_1_0.Event{ diff --git a/mix.exs b/mix.exs index c7bda96..9c9609a 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule LatticeObserver.MixProject do def project do [ app: :lattice_observer, - version: "0.4.3", + version: "0.5.0", elixir: "~> 1.12", elixirc_paths: compiler_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/observed/actors_test.exs b/test/observed/actors_test.exs index d3b5275..803a550 100644 --- a/test/observed/actors_test.exs +++ b/test/observed/actors_test.exs @@ -5,6 +5,7 @@ defmodule LatticeObserverTest.Observed.ActorsTest do @test_spec "testapp" @test_host "Nxxx" + @test_host_2 "Nyyy" describe "Observed Lattice Monitors Actor Events" do test "Adds and Removes actors" do @@ -70,5 +71,92 @@ defmodule LatticeObserverTest.Observed.ActorsTest do } } end + + test "Scaled event modifies actor list" do + start = + CloudEvents.actor_scaled( + "Myyy", + @test_spec, + @test_host_2, + "mything.cloud.io/actor:latest", + 1 + ) + + l = Lattice.new() + l = Lattice.apply_event(l, start) + + assert l == %Lattice{ + Lattice.new() + | actors: %{ + "Myyy" => %Actor{ + call_alias: "", + capabilities: ["test", "test2"], + id: "Myyy", + instances: [ + %Instance{ + host_id: "Nyyy", + id: "N/A", + revision: 0, + annotations: %{"wasmcloud.dev/appspec" => "testapp"}, + version: "1.0" + } + ], + issuer: "ATESTxxx", + name: "Test Actor", + tags: "" + } + }, + claims: %{ + "Myyy" => %LatticeObserver.Observed.Claims{ + call_alias: "", + caps: "test,test2", + iss: "ATESTxxx", + name: "Test Actor", + rev: 0, + sub: "Myyy", + tags: "", + version: "1.0" + } + }, + instance_tracking: %{} + } + + scale_up = + CloudEvents.actor_scaled( + "Myyy", + @test_spec, + @test_host_2, + "mything.cloud.io/actor:latest", + 500 + ) + + l = Lattice.apply_event(l, scale_up) + assert l.actors["Myyy"].instances |> length() == 500 + + scale_down = + CloudEvents.actor_scaled( + "Myyy", + @test_spec, + @test_host_2, + "mything.cloud.io/actor:latest", + 123 + ) + + l = Lattice.apply_event(l, scale_down) + assert l.actors["Myyy"].instances |> length() == 123 + + scale_to_zero = + CloudEvents.actor_scaled( + "Myyy", + @test_spec, + @test_host_2, + "mything.cloud.io/actor:latest", + 0 + ) + + l = Lattice.apply_event(l, scale_to_zero) + actor = Map.get(l.actors, "Myyy", nil) + assert is_nil(actor) + end end end diff --git a/test/support/cloud_events.ex b/test/support/cloud_events.ex index 45d2058..06061a0 100644 --- a/test/support/cloud_events.ex +++ b/test/support/cloud_events.ex @@ -42,6 +42,25 @@ defmodule TestSupport.CloudEvents do |> LatticeObserver.CloudEvent.new("actor_started", host) end + def actor_scaled(pk, spec, host, image_ref, scale, name \\ "Test Actor") do + %{ + "public_key" => pk, + "annotations" => %{@appspec => spec}, + "claims" => %{ + "name" => name, + "caps" => ["test", "test2"], + "version" => "1.0", + "revision" => 0, + "tags" => "", + "issuer" => "ATESTxxx" + }, + "host_id" => host, + "image_ref" => image_ref, + "max_instances" => scale + } + |> LatticeObserver.CloudEvent.new("actor_scaled", host) + end + def actor_stopped(pk, instance_id, spec, host) do %{"public_key" => pk, "instance_id" => instance_id, "annotations" => %{@appspec => spec}} |> LatticeObserver.CloudEvent.new("actor_stopped", host)