Skip to content

Commit

Permalink
Upgrading to support heartbeats from the 0.60+ host (#24)
Browse files Browse the repository at this point in the history
* Upgrading to support heartbeats from the 0.60+ host

Signed-off-by: Kevin Hoffman <[email protected]>

* the power of semver compels me

Signed-off-by: Kevin Hoffman <[email protected]>

* har har just kidding this is backwards compatible

Signed-off-by: Kevin Hoffman <[email protected]>

* guardy mcguard face

Signed-off-by: Kevin Hoffman <[email protected]>

* Pulling contract ID off new heartbeats

Signed-off-by: Kevin Hoffman <[email protected]>

---------

Signed-off-by: Kevin Hoffman <[email protected]>
  • Loading branch information
autodidaddict authored Jan 27, 2023
1 parent 2608257 commit 9b33cc2
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 37 deletions.
94 changes: 78 additions & 16 deletions lib/lattice_observer/observed/event_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,26 @@ defmodule LatticeObserver.Observed.EventProcessor do
}
end

# New heartbeat format
# "actors": {
# "MB2ZQB6ROOMAYBO4ZCTFYWN7YIVBWA3MTKZYAQKJMTIHE2ELLRW2E3ZW": 10
# },
# "friendly_name": "wandering-meadow-5880",
# "labels": {
# "hostcore.arch": "aarch64",
# "hostcore.os": "macos",
# "hostcore.osfamily": "unix"
# },
# "providers": [
# {
# "link_name": "default",
# "public_key": "VAG3QITQQ2ODAOWB5TTQSDJ53XK3SHBEIFNK4AYJ5RKAX2UNSCAPHA5M"
# }
# ],
# "uptime_human": "1 minute, 32 seconds",
# "uptime_seconds": 92,
# "version": "0.60.0"

def record_heartbeat(l = %Lattice{}, source_host, stamp, data) do
labels = Map.get(data, "labels", %{})
friendly_name = Map.get(data, "friendly_name", "")
Expand Down Expand Up @@ -229,35 +249,77 @@ defmodule LatticeObserver.Observed.EventProcessor do

l = record_host(l, source_host, labels, stamp, friendly_name)

# legacy heartbeat has a list for the actors field...
# default to "new format" if this field is missing
# TODO - once a few wasmCloud OTP releases have gone by with the new heartbeat format, stop
# parsing/processing the legacy structure
l =
List.foldl(Map.get(data, "actors", []), l, fn x, acc ->
put_actor_instance(
acc,
source_host,
x["public_key"],
x["instance_id"],
spec,
stamp,
%{}
)
end)
if is_list(Map.get(data, "actors", %{})) do
put_legacy_instances(l, source_host, spec, stamp, data)
else
actors_expanded =
Enum.flat_map(Map.get(data, "actors", %{}), fn {k, count} ->
Enum.map(1..count, fn _ -> k end)
end)

l =
List.foldl(actors_expanded, l, fn pk, acc ->
put_actor_instance(
acc,
source_host,
pk,
"n/a",
spec,
stamp,
%{}
)
end)

List.foldl(Map.get(data, "providers", []), l, fn x, acc ->
put_provider_instance(
acc,
source_host,
x["public_key"],
x["link_name"],
Map.get(x, "contract_id", "n/a"),
"n/a",
spec,
stamp,
%{}
)
end)
end

l
end

defp put_legacy_instances(l = %Lattice{}, source_host, spec, stamp, data) do
l =
List.foldl(Map.get(data, "providers", []), l, fn x, acc ->
put_provider_instance(
List.foldl(Map.get(data, "actors", []), l, fn x, acc ->
put_actor_instance(
acc,
source_host,
x["public_key"],
x["link_name"],
x["contract_id"],
x["instance_id"],
spec,
stamp,
%{}
)
end)

l
List.foldl(Map.get(data, "providers", []), l, fn x, acc ->
put_provider_instance(
acc,
source_host,
x["public_key"],
x["link_name"],
x["contract_id"],
x["instance_id"],
spec,
stamp,
%{}
)
end)
end

def record_host(l = %Lattice{}, source_host, labels, stamp, friendly_name \\ "") do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule LatticeObserver.MixProject do
def project do
[
app: :lattice_observer,
version: "0.2.2",
version: "0.3.0",
elixir: "~> 1.12",
elixirc_paths: compiler_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
18 changes: 2 additions & 16 deletions test/observed/claims_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,9 @@ defmodule LatticeObserverTest.Observed.ClaimsTest do
CloudEvents.host_heartbeat(
"Nxxxx",
%{foo: "bar", baz: "biz"},
%{"Mxxxx" => 1},
[
%{
"public_key" => "Mxxxx",
"instance_id" => "42"
}
],
[
%{
"contract_id" => "wasmcloud:test",
"instance_id" => "43",
"link_name" => "default",
"public_key" => "Vxxxx"
}
Expand Down Expand Up @@ -113,16 +106,9 @@ defmodule LatticeObserverTest.Observed.ClaimsTest do
CloudEvents.host_heartbeat(
"Nxxxx",
%{foo: "bar", baz: "biz"},
%{"Mxxxx" => 1},
[
%{
"public_key" => "Mxxxx",
"instance_id" => "42"
}
],
[
%{
"contract_id" => "wasmcloud:test",
"instance_id" => "43",
"link_name" => "default",
"public_key" => "Vxxxx"
}
Expand Down
122 changes: 119 additions & 3 deletions test/observed/hosts_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,36 @@ defmodule LatticeObserverTest.Observed.HostsTest do
Lattice.new()
end

test "Properly records host heartbeat" do
hb = CloudEvents.host_heartbeat(@test_host, %{foo: "bar", baz: "biz"})
# updated heartbeat shape
# "actors": {
# "MB2ZQB6ROOMAYBO4ZCTFYWN7YIVBWA3MTKZYAQKJMTIHE2ELLRW2E3ZW": 10
# },
# "friendly_name": "wandering-meadow-5880",
# "labels": {
# "hostcore.arch": "aarch64",
# "hostcore.os": "macos",
# "hostcore.osfamily": "unix"
# },
# "providers": [
# {
# "link_name": "default",
# "public_key": "VAG3QITQQ2ODAOWB5TTQSDJ53XK3SHBEIFNK4AYJ5RKAX2UNSCAPHA5M"
# }
# ],
# "uptime_human": "1 minute, 32 seconds",
# "uptime_seconds": 92,
# "version": "0.60.0"

test "Propertly records LEGACY host heartbeat" do
hb = CloudEvents.host_heartbeat_old(@test_host, %{foo: "bar", baz: "biz"})
stamp = EventProcessor.timestamp_from_iso8601(hb.time)
l = Lattice.apply_event(Lattice.new(), hb)

assert l.hosts[@test_host].labels == %{baz: "biz", foo: "bar"}
assert l.hosts[@test_host].status == :healthy
assert l.hosts[@test_host].last_seen == stamp

hb2 = CloudEvents.host_heartbeat(@test_host, %{foo: "bar", baz: "biz"})
hb2 = CloudEvents.host_heartbeat_old(@test_host, %{foo: "bar", baz: "biz"})
stamp2 = EventProcessor.timestamp_from_iso8601(hb2.time)
l = Lattice.apply_event(l, hb2)

Expand Down Expand Up @@ -221,5 +241,101 @@ defmodule LatticeObserverTest.Observed.HostsTest do
}
}
end

test "Properly records host heartbeat" do
hb = CloudEvents.host_heartbeat(@test_host, %{foo: "bar", baz: "biz"})
stamp = EventProcessor.timestamp_from_iso8601(hb.time)
l = Lattice.apply_event(Lattice.new(), hb)

assert l.hosts[@test_host].labels == %{baz: "biz", foo: "bar"}
assert l.hosts[@test_host].status == :healthy
assert l.hosts[@test_host].last_seen == stamp

hb2 = CloudEvents.host_heartbeat(@test_host, %{foo: "bar", baz: "biz"})
stamp2 = EventProcessor.timestamp_from_iso8601(hb2.time)
l = Lattice.apply_event(l, hb2)

assert l.hosts[@test_host].labels == %{baz: "biz", foo: "bar"}
assert l.hosts[@test_host].status == :healthy
assert l.hosts[@test_host].last_seen == stamp2

hb3 =
CloudEvents.host_heartbeat(
@test_host,
%{foo: "bar"},
%{
"Mxxxx" => 1,
"Mxxxy" => 2
},
[
%{
"public_key" => "Vxxxxx",
"contract_id" => "wasmcloud:test",
"link_name" => "default"
}
]
)

# Scenario: heartbeat contains fewer actors than the previous one, yet
# the lattice did not receive an actor stopped event.
hb4 =
CloudEvents.host_heartbeat(
@test_host,
%{foo: "bar"},
%{
"Mxxxx" => 1
},
[
%{
"public_key" => "Vxxxxx",
"contract_id" => "wasmcloud:test",
"link_name" => "default"
}
]
)

l = Lattice.apply_event(Lattice.new(), hb3) |> Lattice.apply_event(hb4)

# The Mxxxy actor isn't here because of authoritative heartbeats
assert l.actors == %{
"Mxxxx" => %LatticeObserver.Observed.Actor{
call_alias: "",
capabilities: [],
id: "Mxxxx",
instances: [
%LatticeObserver.Observed.Instance{
host_id: "Nxxx",
id: "n/a",
revision: 0,
spec_id: "",
version: ""
}
],
issuer: "",
name: "unavailable",
tags: ""
}
}

assert l.providers == %{
{"Vxxxxx", "default"} => %LatticeObserver.Observed.Provider{
contract_id: "wasmcloud:test",
id: "Vxxxxx",
instances: [
%LatticeObserver.Observed.Instance{
host_id: "Nxxx",
id: "n/a",
revision: 0,
spec_id: "",
version: ""
}
],
issuer: "",
link_name: "default",
name: "unavailable",
tags: ""
}
}
end
end
end
11 changes: 10 additions & 1 deletion test/support/cloud_events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ defmodule TestSupport.CloudEvents do
|> LatticeObserver.CloudEvent.new("provider_stopped", host)
end

def host_heartbeat(host, labels, actors \\ [], providers \\ []) do
def host_heartbeat(host, labels, actors \\ %{}, providers \\ []) do
%{
"actors" => actors,
"providers" => providers,
Expand Down Expand Up @@ -152,4 +152,13 @@ defmodule TestSupport.CloudEvents do
}
|> LatticeObserver.CloudEvent.new("invocation_failed", host)
end

def host_heartbeat_old(host, labels, actors \\ [], providers \\ []) do
%{
"actors" => actors,
"providers" => providers,
"labels" => labels
}
|> LatticeObserver.CloudEvent.new("host_heartbeat", host)
end
end

0 comments on commit 9b33cc2

Please sign in to comment.