Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Device Connections #1572

Merged
merged 21 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/nerves_hub/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ defmodule NervesHub.Deployments do
^deployment.id
)
})
|> where([d], not is_nil(d.connection_last_seen_at))
|> where([d], d.status == :provisioned)
|> where(
[d],
d.deployment_id == ^deployment.id or
Expand Down
111 changes: 51 additions & 60 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule NervesHub.Devices do
alias NervesHub.Deployments.Orchestrator
alias NervesHub.Devices.CACertificate
alias NervesHub.Devices.Alarms
alias NervesHub.Devices.Connections
alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceHealth
Expand All @@ -34,6 +35,13 @@ defmodule NervesHub.Devices do
Repo.get(Device, device_id)
end

def get_device(device_id, :preload_latest_connection) when is_integer(device_id) do
Device
|> where(id: ^device_id)
|> Connections.preload_latest_connection()
|> Repo.one()
end

def get_active_device(filters) do
Device
|> Repo.exclude_deleted()
Expand All @@ -45,14 +53,18 @@ defmodule NervesHub.Devices do
end

def get_devices_by_org_id_and_product_id(org_id, product_id) do
query =
from(
d in Device,
where: d.org_id == ^org_id,
where: d.product_id == ^product_id
)
Device
|> where([d], d.org_id == ^org_id)
|> where([d], d.product_id == ^product_id)
|> Repo.exclude_deleted()
|> Repo.all()
end

query
def get_devices_by_org_id_and_product_id(org_id, product_id, :preload_latest_connection) do
Device
|> where([d], d.org_id == ^org_id)
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> Repo.all()
end
Expand All @@ -73,6 +85,7 @@ defmodule NervesHub.Devices do
|> order_by(^sort_devices(sorting))
|> filtering(filters)
|> preload([d, o, p, dp, f], org: o, product: p, deployment: {dp, firmware: f})
|> Connections.preload_latest_connection()
|> Repo.paginate(pagination)
end

Expand All @@ -83,6 +96,7 @@ defmodule NervesHub.Devices do

Device
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> filtering(filters)
|> order_by(^sort_devices(sorting))
Expand All @@ -91,18 +105,20 @@ defmodule NervesHub.Devices do

def get_minimal_device_location_by_org_id_and_product_id(org_id, product_id) do
Device
|> select([d], %{
|> where(org_id: ^org_id)
|> where(product_id: ^product_id)
|> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata)))
|> where([d], not is_nil(fragment("?->'location'->'longitude'", d.connection_metadata)))
|> join(:left, [d], dc in subquery(Connections.latest_row_query()), on: dc.device_id == d.id)
|> where([d, dc], dc.rn == 1)
|> select([d, dc], %{
id: d.id,
identifier: d.identifier,
connection_status: d.connection_status,
connection_status: dc.status,
latitude: fragment("?->'location'->'latitude'", d.connection_metadata),
longitude: fragment("?->'location'->'longitude'", d.connection_metadata),
firmware_uuid: fragment("?->'uuid'", d.firmware_metadata)
})
|> where(org_id: ^org_id)
|> where(product_id: ^product_id)
|> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata)))
|> where([d], not is_nil(fragment("?->'location'->'longitude'", d.connection_metadata)))
|> Repo.exclude_deleted()
|> Repo.all()
end
Expand Down Expand Up @@ -164,17 +180,24 @@ defmodule NervesHub.Devices do
{_, ""} ->
query

{:alarm, value} ->
where(query, [d], d.id in subquery(Alarms.query_devices_with_alarm(value)))

{:alarm_status, "with"} ->
where(query, [d], d.id in subquery(Alarms.query_devices_with_alarms()))

{:alarm_status, "without"} ->
where(query, [d], d.id not in subquery(Alarms.query_devices_with_alarms()))

{:alarm, value} ->
where(query, [d], d.id in subquery(Alarms.query_devices_with_alarm(value)))
{:connection, "not_seen"} ->
where(query, [d], d.status == :registered)

{:connection, _value} ->
where(query, [d], d.connection_status == ^String.to_atom(value))
{:connection, value} ->
where(
query,
[d],
d.id in subquery(Connections.query_devices_with_connection_status(value))
)

{:connection_type, value} ->
where(query, [d], ^value in d.connection_types)
Expand Down Expand Up @@ -310,6 +333,11 @@ defmodule NervesHub.Devices do
|> preload([d, device_certificates: dc], device_certificates: dc)
end

defp join_and_preload(query, :latest_connection) do
query
|> Connections.preload_latest_connection()
end

@spec get_shared_secret_auth(String.t()) ::
{:ok, SharedSecretAuth.t()} | {:error, :not_found}
def get_shared_secret_auth(key) do
Expand Down Expand Up @@ -391,6 +419,12 @@ defmodule NervesHub.Devices do
|> Repo.insert()
end

def set_as_provisioned!(device) do
device
|> Device.changeset(%{status: :provisioned, first_seen_at: DateTime.utc_now()})
|> Repo.update!()
end

def delete_device(%Device{} = device) do
device_certificates_query = from(dc in DeviceCertificate, where: dc.device_id == ^device.id)
changeset = Repo.soft_delete_changeset(device)
Expand Down Expand Up @@ -657,49 +691,6 @@ defmodule NervesHub.Devices do
|> Repo.all()
end

def device_connected(device) do
device
|> clear_connection_information()
|> Device.changeset(%{
connection_status: :connected,
connection_established_at: DateTime.utc_now(),
connection_disconnected_at: nil,
connection_last_seen_at: DateTime.utc_now()
})
|> Repo.update()
end

def device_heartbeat(device) do
device
|> clear_connection_information()
|> Device.changeset(%{
connection_status: :connected,
connection_disconnected_at: nil,
connection_last_seen_at: DateTime.utc_now()
})
|> Repo.update()
end

def device_disconnected(device) do
device
|> clear_connection_information()
|> Device.changeset(%{
connection_status: :disconnected,
connection_disconnected_at: DateTime.utc_now(),
connection_last_seen_at: DateTime.utc_now()
})
|> Repo.update()
end

defp clear_connection_information(device) do
%{
device
| connection_status: nil,
connection_disconnected_at: "dummy",
connection_last_seen_at: nil
}
end

def clean_connection_states() do
interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes)
a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1))
Expand Down
131 changes: 131 additions & 0 deletions lib/nerves_hub/devices/connections.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
defmodule NervesHub.Devices.Connections do
@moduledoc """
Handles connection data for devices, reported from device socket.
"""
import Ecto.Query

alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Repo

@doc """
Get all connections for a device.
"""
@spec get_device_connections(non_neg_integer()) :: [DeviceConnection.t()]
def get_device_connections(device_id) do
DeviceConnection
|> where(device_id: ^device_id)
|> order_by(asc: :last_seen_at)
|> Repo.all()
end

@doc """
Get latest inserted connection for a device.
"""
@spec get_latest_for_device(non_neg_integer()) :: DeviceConnection.t() | nil
def get_latest_for_device(device_id) do
DeviceConnection
|> where(device_id: ^device_id)
|> order_by(desc: :last_seen_at)
|> limit(1)
|> Repo.one()
end

@doc """
Preload latest respective connection in a device query.
"""
@spec preload_latest_connection(Query.t()) :: Query.t()
def preload_latest_connection(query) do
query
|> preload(device_connections: ^distinct_on_device())
end

@doc """
Creates a device connection, reported from device socket
"""
@spec device_connected(non_neg_integer()) ::
{:ok, DeviceConnection.t()} | {:error, Ecto.Changeset.t()}
def device_connected(device_id) do
now = DateTime.utc_now()

%{
device_id: device_id,
established_at: now,
last_seen_at: now,
status: :connected
}
|> DeviceConnection.create_changeset()
|> Repo.insert()
end

@doc """
Updates the `last_seen_at`field for a device connection with current timestamp
"""
@spec device_heartbeat(UUIDv7.t()) :: {:ok, DeviceConnection.t()} | {:error, Ecto.Changeset.t()}
def device_heartbeat(ref_id) do
DeviceConnection
|> Repo.get!(ref_id)
|> DeviceConnection.update_changeset(%{last_seen_at: DateTime.utc_now()})
|> Repo.update()
end

@doc """
Updates `status` and relevant timestamps for a device connection record,
and stores the reason for disconnection if provided.
"""
@spec device_disconnected(UUIDv7.t(), String.t() | nil) ::
{:ok, DeviceConnection.t()} | {:error, Ecto.Changeset.t()}
def device_disconnected(ref_id, reason \\ nil) do
now = DateTime.utc_now()

DeviceConnection
|> Repo.get!(ref_id)
|> DeviceConnection.update_changeset(%{
last_seen_at: now,
disconnected_at: now,
disconnected_reason: reason,
status: :disconnected
})
|> Repo.update()
end

@doc """
Selects devices id's which has provided status in it's latest connection record.
"""
@spec query_devices_with_connection_status(String.t()) :: Query.t()
def query_devices_with_connection_status(status) do
(lr in subquery(latest_row_query()))
|> from()
|> where([lr], lr.rn == 1)
|> where(
[lr],
lr.status == ^String.to_existing_atom(status)
)
|> join(:inner, [lr], d in Device, on: lr.device_id == d.id)
|> select([lr, d], d.id)
end

@doc """
Generates a query to retrieve the most recent `DeviceConnection` for devices.
The query includes the row number (`rn`)
for each record, which is used to identify the most recent connection.

Returns an Ecto query.
"""
@spec latest_row_query() :: Query.t()
def latest_row_query() do
DeviceConnection
|> select([dc], %{
device_id: dc.device_id,
status: dc.status,
last_seen_at: dc.last_seen_at,
rn: row_number() |> over(partition_by: dc.device_id, order_by: [desc: dc.last_seen_at])
})
end

defp distinct_on_device() do
DeviceConnection
|> distinct(:device_id)
|> order_by([:device_id, desc: :last_seen_at])
end
end
25 changes: 19 additions & 6 deletions lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule NervesHub.Devices.Device do

alias NervesHub.Accounts.Org
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Deployments.Deployment
alias NervesHub.Firmwares.FirmwareMetadata
alias NervesHub.Products.Product
Expand All @@ -26,7 +27,9 @@ defmodule NervesHub.Devices.Device do
:connection_disconnected_at,
:connection_last_seen_at,
:connection_types,
:connection_metadata
:connection_metadata,
:status,
:first_seen_at
]
@required_params [:org_id, :product_id, :identifier]

Expand All @@ -36,6 +39,7 @@ defmodule NervesHub.Devices.Device do
belongs_to(:deployment, Deployment)
embeds_one(:firmware_metadata, FirmwareMetadata, on_replace: :update)
has_many(:device_certificates, DeviceCertificate, on_delete: :delete_all)
has_many(:device_connections, DeviceConnection, on_delete: :delete_all)

field(:identifier, :string)
field(:description, :string)
Expand All @@ -45,6 +49,20 @@ defmodule NervesHub.Devices.Device do
field(:update_attempts, {:array, :utc_datetime}, default: [])
field(:updates_blocked_until, :utc_datetime)

field(:status, Ecto.Enum,
values: [:registered, :provisioned],
default: :registered
)
joshk marked this conversation as resolved.
Show resolved Hide resolved

field(:first_seen_at, :utc_datetime)

field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi])
field(:connecting_code, :string)
field(:connection_metadata, :map, default: %{})

timestamps()

# Deprecated fields, replaced with device_connections table.
field(:connection_status, Ecto.Enum,
values: [:connected, :disconnected, :not_seen],
default: :not_seen
Expand All @@ -53,11 +71,6 @@ defmodule NervesHub.Devices.Device do
field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi])
field(:connecting_code, :string)
field(:connection_metadata, :map, default: %{})

timestamps()
end

def changeset(%Device{} = device, params) do
Expand Down
Loading
Loading