Skip to content

add support to handle ha notifications #3659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 269 additions & 5 deletions orchagent/dash/dashhaorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,44 @@
#include "taskworker.h"
#include "pbutils.h"

#include "chrono"

using namespace std;
using namespace swss;

extern sai_dash_ha_api_t* sai_dash_ha_api;
extern sai_dash_eni_api_t* sai_dash_eni_api;
extern sai_object_id_t gSwitchId;
extern sai_dash_ha_api_t* sai_dash_ha_api;
extern sai_dash_eni_api_t* sai_dash_eni_api;
extern sai_object_id_t gSwitchId;
extern sai_switch_api_t* sai_switch_api;

static const map<sai_ha_set_event_t, string> sai_ha_set_event_type_name =
{
{ SAI_HA_SET_EVENT_DP_CHANNEL_UP, "up" },
{ SAI_HA_SET_EVENT_DP_CHANNEL_DOWN, "down" }
};

static const map<sai_dash_ha_state_t, string> sai_ha_state_name = {
{ SAI_DASH_HA_STATE_DEAD, "HA_STATE_DEAD" },
{ SAI_DASH_HA_STATE_CONNECTING, "HA_STATE_CONNECTING" },
{ SAI_DASH_HA_STATE_CONNECTED, "HA_STATE_CONNECTED" },
{ SAI_DASH_HA_STATE_INITIALIZING_TO_ACTIVE, "HA_STATE_INITIALIZING_TO_ACTIVE" },
{ SAI_DASH_HA_STATE_INITIALIZING_TO_STANDBY, "HA_STATE_INITIALIZING_TO_STANDBY" },
{ SAI_DASH_HA_STATE_PENDING_STANDALONE_ACTIVATION, "HA_STATE_PENDING_STANDALONE_ACTIVATION" },
{ SAI_DASH_HA_STATE_PENDING_ACTIVE_ACTIVATION, "HA_STATE_PENDING_ACTIVE_ACTIVATION" },
{ SAI_DASH_HA_STATE_PENDING_STANDBY_ACTIVATION, "HA_STATE_PENDING_STANDBY_ACTIVATION" },
{ SAI_DASH_HA_STATE_STANDALONE, "HA_STATE_STANDALONE" },
{ SAI_DASH_HA_STATE_ACTIVE, "HA_STATE_ACTIVE" },
{ SAI_DASH_HA_STATE_STANDBY, "HA_STATE_STANDBY" },
{ SAI_DASH_HA_STATE_DESTROYING, "HA_STATE_DESTROYING" },
{ SAI_DASH_HA_STATE_SWITCHING_TO_STANDALONE, "HA_STATE_SWITCHING_TO_STANDALONE" },
};

static const map<sai_ha_scope_event_t, string> sai_ha_scope_event_type_name =
{
{ SAI_HA_SCOPE_EVENT_STATE_CHANGED, "state_changed" },
{ SAI_HA_SCOPE_EVENT_FLOW_RECONCILE_NEEDED, "flow_reconcile_needed" },
{ SAI_HA_SCOPE_EVENT_SPLIT_BRAIN_DETECTED, "split_brain_detected" }
};

DashHaOrch::DashHaOrch(DBConnector *db, const vector<string> &tables, DashOrch *dash_orch, DBConnector *app_state_db, ZmqServer *zmqServer) :
ZmqOrch(db, tables, zmqServer),
Expand All @@ -25,6 +57,125 @@ DashHaOrch::DashHaOrch(DBConnector *db, const vector<string> &tables, DashOrch *

dash_ha_set_result_table_ = make_unique<Table>(app_state_db, APP_DASH_HA_SET_TABLE_NAME);
dash_ha_scope_result_table_ = make_unique<Table>(app_state_db, APP_DASH_HA_SCOPE_TABLE_NAME);

m_dpuStateDbConnector = make_unique<DBConnector>("DPU_STATE_DB", 0);
m_dpuStateDbHaSetTable = make_unique<Table>(m_dpuStateDbConnector.get(), STATE_DASH_HA_SET_STATE_TABLE_NAME);
m_dpuStateDbHaScopeTable = make_unique<Table>(m_dpuStateDbConnector.get(), STATE_DASH_HA_SCOPE_STATE_TABLE_NAME);

DBConnector *notificationsDb = new DBConnector("ASIC_DB", 0);
m_haSetNotificationConsumer = new NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto haSetNotificatier = new Notifier(m_haSetNotificationConsumer, this, "HA_SET_NOTIFICATIONS");

m_haScopeNotificationConsumer = new NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto haScopeNotificatier = new Notifier(m_haScopeNotificationConsumer, this, "HA_SCOPE_NOTIFICATIONS");

Orch::addExecutor(haSetNotificatier);
Orch::addExecutor(haScopeNotificatier);

register_ha_set_notifier();
register_ha_scope_notifier();
}

bool DashHaOrch::register_ha_set_notifier()
{
SWSS_LOG_ENTER();

sai_attribute_t attr;
sai_status_t status;
sai_attr_capability_t capability;

status = sai_query_attribute_capability(gSwitchId, SAI_OBJECT_TYPE_SWITCH,
SAI_SWITCH_ATTR_HA_SET_EVENT_NOTIFY,
&capability);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Unable to query the HA Set event notification capability");
return false;
}

if (!capability.set_implemented)
{
SWSS_LOG_INFO("HA Set event notification not supported");
return false;
}

attr.id = SAI_SWITCH_ATTR_HA_SET_EVENT_NOTIFY;
attr.value.ptr = (void *)on_ha_set_event;

status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Failed to register HA Set event notification");
return false;
}

return true;
}

bool DashHaOrch::register_ha_scope_notifier()
{
SWSS_LOG_ENTER();

sai_attribute_t attr;
sai_status_t status;
sai_attr_capability_t capability;

status = sai_query_attribute_capability(gSwitchId, SAI_OBJECT_TYPE_SWITCH,
SAI_SWITCH_ATTR_HA_SCOPE_EVENT_NOTIFY,
&capability);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Unable to query the HA Scope event notification capability");
return false;
}

if (!capability.set_implemented)
{
SWSS_LOG_INFO("HA Scope event notification not supported");
return false;
}

attr.id = SAI_SWITCH_ATTR_HA_SCOPE_EVENT_NOTIFY;
attr.value.ptr = (void *)on_ha_scope_event;

status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Failed to register HA Scope event notification");
return false;
}

return true;
}

std::string DashHaOrch::getHaSetObjectKey(const sai_object_id_t ha_set_oid)
{
SWSS_LOG_ENTER();

for (auto ha_set_entry : m_ha_set_entries)
{
if (ha_set_entry.second.ha_set_id == ha_set_oid)
{
return ha_set_entry.first;
}
}

return "";
}

std::string DashHaOrch::getHaScopeObjectKey(const sai_object_id_t ha_scope_oid)
{
SWSS_LOG_ENTER();

for (auto ha_scope_entry : m_ha_scope_entries)
{
if (ha_scope_entry.second.ha_scope_id == ha_scope_oid)
{
return ha_scope_entry.first;
}
}

return "";
}

bool DashHaOrch::addHaSetEntry(const std::string &key, const dash::ha_set::HaSet &entry)
Expand Down Expand Up @@ -262,7 +413,7 @@ bool DashHaOrch::addHaScopeEntry(const std::string &key, const dash::ha_scope::H
return parseHandleSaiStatusFailure(handle_status);
}
}
m_ha_scope_entries[key] = HaScopeEntry {sai_ha_scope_oid, entry};
m_ha_scope_entries[key] = HaScopeEntry {sai_ha_scope_oid, entry, getNowTime()};
SWSS_LOG_NOTICE("Created HA Scope object for %s", key.c_str());

// set HA Scope ID to ENI
Expand Down Expand Up @@ -328,7 +479,6 @@ bool DashHaOrch::setHaScopeHaRole(const std::string &key, const dash::ha_scope::
}
}

m_ha_scope_entries[key].metadata.set_ha_role(entry.ha_role());
SWSS_LOG_NOTICE("Set HA Scope role for %s to %s", key.c_str(), (dash::types::HaRole_Name(entry.ha_role())).c_str());

return true;
Expand Down Expand Up @@ -512,3 +662,117 @@ void DashHaOrch::doTask(ConsumerBase &consumer)
SWSS_LOG_ERROR("Unknown table: %s", consumer.getTableName().c_str());
}
}

void DashHaOrch::doTask(NotificationConsumer &consumer)
{
SWSS_LOG_ENTER();;

std::string op;
std::string data;
std::vector<swss::FieldValueTuple> values;

consumer.pop(op, data, values);
Copy link
Contributor

@vivekrnv vivekrnv Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use pops api and process multiple notifications at once.

Also, Make sure doTask(NotificationConsumer &consumer) is called during the empty doTask() Eg:

o->doTask();

DashHaOrch::doTask()
{
    # Finish pending tasks
   doTask(*m_haSetNotificationConsumer);
   doTask(*m_haScopeNotificationConsumer);
}

if not there is a possibility we might miss some notifications

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use pops api and process multiple notifications at once.

Updated.

Also, Make sure doTask(NotificationConsumer &consumer) is called during the empty doTask()

That's a great tip! I added the executors to m_consumerMap in DashHaOrch constructor

Orch::addExecutor(haSetNotificatier);
Orch::addExecutor(haScopeNotificatier);

So it should have been taken care of:

void Orch::doTask()
{
for (auto &it : m_consumerMap)
{
it.second->drain();
}
}

Don't think I need to override the method in DashHaOrch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, i see. We should be good then.


if (op == "ha_set_event")
{
std::time_t now_time = getNowTime();

uint32_t count;
sai_ha_set_event_data_t *ha_set_event = nullptr;

sai_deserialize_ha_set_event_ntf(data, count, &ha_set_event);

for (uint32_t i = 0; i < count; i++)
{
sai_object_id_t ha_set_id = ha_set_event[i].ha_set_id;
sai_ha_set_event_t event_type = ha_set_event[i].event_type;

SWSS_LOG_INFO("Get HA Set event notification id:%" PRIx64 " event: Data plane channel goes %s", ha_set_id, sai_ha_set_event_type_name.at(event_type).c_str());

auto key = getHaSetObjectKey(ha_set_id);
if (key.empty())
{
SWSS_LOG_ERROR("HA Set object not found for ID: %" PRIx64, ha_set_id);
continue;
}
std::vector<FieldValueTuple> fvs = {
{"last_updated_time", to_string(now_time)},
{"dp_channel_is_alive", sai_ha_set_event_type_name.at(event_type)}
};
m_dpuStateDbHaSetTable->set(key, fvs);
}
sai_deserialize_free_ha_set_event_ntf(count, ha_set_event);
}

if (op == "ha_scope_event")
{
std::time_t now_time = getNowTime();

uint32_t count;
sai_ha_scope_event_data_t *ha_scope_event = nullptr;

sai_deserialize_ha_scope_event_ntf(data, count, &ha_scope_event);

for (uint32_t i = 0; i < count; i++)
{
sai_ha_scope_event_t event_type = ha_scope_event[i].event_type;
sai_object_id_t ha_scope_id = ha_scope_event[i].ha_scope_id;

SWSS_LOG_INFO("Get HA Scope event notification id:%" PRIx64 " event: %s", ha_scope_id, sai_ha_scope_event_type_name.at(event_type).c_str());

auto key = getHaScopeObjectKey(ha_scope_id);
if (key.empty())
{
SWSS_LOG_ERROR("HA Scope object not found for ID: %" PRIx64, ha_scope_id);
continue;
}

std::vector<FieldValueTuple> fvs = {
{"last_updated_time", to_string(now_time)}
};

auto ha_role = to_pb(ha_scope_event[i].ha_role);
std::time_t role_start_time = now_time;

if (m_ha_scope_entries[key].metadata.ha_role() != ha_role)
{
m_ha_scope_entries[key].metadata.set_ha_role(ha_role);
m_ha_scope_entries[key].last_role_start_time = now_time;
SWSS_LOG_NOTICE("HA Scope role changed for %s to %s", key.c_str(), dash::types::HaRole_Name(ha_role).c_str());
} else
{
role_start_time = m_ha_scope_entries[key].last_role_start_time;
}

fvs.push_back({"ha_role", dash::types::HaRole_Name(ha_role)});
fvs.push_back({"ha_role_start_time ", to_string(role_start_time)});

switch (event_type)
{
case SAI_HA_SCOPE_EVENT_FLOW_RECONCILE_NEEDED:
fvs.push_back({"flow_reconcile_pending", "true"});
break;
case SAI_HA_SCOPE_EVENT_SPLIT_BRAIN_DETECTED:
fvs.push_back({"brainsplit_recover_pending", "true"});
break;
case SAI_HA_SCOPE_EVENT_STATE_CHANGED:
if (in(ha_scope_event[i].ha_state, {SAI_DASH_HA_STATE_PENDING_STANDALONE_ACTIVATION,
SAI_DASH_HA_STATE_PENDING_ACTIVE_ACTIVATION,
SAI_DASH_HA_STATE_PENDING_STANDBY_ACTIVATION}))
{
fvs.push_back({"activate_role_pending", "true"});
SWSS_LOG_NOTICE("DPU is pending on role activation for %s", key.c_str());
}

fvs.push_back({"ha_state", sai_ha_state_name.at(ha_scope_event[i].ha_state)});
break;
default:
SWSS_LOG_ERROR("Unknown HA Scope event type %d for %s", event_type, key.c_str());
}

m_dpuStateDbHaScopeTable->set(key, fvs);

}
sai_deserialize_free_ha_scope_event_ntf(count, ha_scope_event);
}
}
28 changes: 27 additions & 1 deletion orchagent/dash/dashhaorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include "zmqorch.h"
#include "zmqserver.h"
#include "saitypes.h"
#include "notifier.h"
#include "directory.h"
#include "sai_serialize.h"
#include "notifications.h"

#include "dash_api/ha_set.pb.h"
#include "dash_api/ha_scope.pb.h"
Expand All @@ -23,23 +27,30 @@ struct HaScopeEntry
{
sai_object_id_t ha_scope_id;
dash::ha_scope::HaScope metadata;
std::time_t last_role_start_time;
};

typedef std::map<std::string, HaSetEntry> HaSetTable;
typedef std::map<std::string, HaScopeEntry> HaScopeTable;

template <typename T>
bool in(T value, std::initializer_list<T> list) {
return std::find(list.begin(), list.end(), value) != list.end();
}

class DashHaOrch : public ZmqOrch
{
public:
DashHaOrch(swss::DBConnector *db, const std::vector<std::string> &tableNames, DashOrch *dash_orch, swss::DBConnector *app_state_db, swss::ZmqServer *zmqServer);

private:
protected:
HaSetTable m_ha_set_entries;
HaScopeTable m_ha_scope_entries;

DashOrch *m_dash_orch;

void doTask(ConsumerBase &consumer);
void doTask(swss::NotificationConsumer &consumer);
void doTaskEniTable(ConsumerBase &consumer);
void doTaskHaSetTable(ConsumerBase &consumer);
void doTaskHaScopeTable(ConsumerBase &consumer);
Expand All @@ -52,9 +63,24 @@ class DashHaOrch : public ZmqOrch
bool setHaScopeFlowReconcileRequest(const std::string &key);
bool setHaScopeActivateRoleRequest(const std::string &key);
bool setEniHaScopeId(const sai_object_id_t eni_id, const sai_object_id_t ha_scope_id);
bool register_ha_set_notifier();
bool register_ha_scope_notifier();

std::string getHaSetObjectKey(const sai_object_id_t ha_set_id);
std::string getHaScopeObjectKey(const sai_object_id_t ha_scope_id);
std::time_t getNowTime(){
return std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
};

std::unique_ptr<swss::Table> dash_ha_set_result_table_;
std::unique_ptr<swss::Table> dash_ha_scope_result_table_;

std::unique_ptr<swss::DBConnector> m_dpuStateDbConnector;
std::unique_ptr<swss::Table> m_dpuStateDbHaSetTable;
std::unique_ptr<swss::Table> m_dpuStateDbHaScopeTable;

swss::NotificationConsumer* m_haSetNotificationConsumer;
swss::NotificationConsumer* m_haScopeNotificationConsumer;

public:
const HaSetTable& getHaSetEntries() const { return m_ha_set_entries; };
Expand Down
Loading
Loading