Skip to content

Replace priority queue with limited queue to prevent OOM polling issue #392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

Sarah-Singh16
Copy link

@Sarah-Singh16 Sarah-Singh16 commented May 13, 2025

Why I did it

fixes #26

How I did it

Replaced all priority queues with limited queues so that when limit is reached, a final end-process message is sent to the user and the process is stopped. The limited queue will check if the current queue size has reached the specified maximum queue size (Default 100MB). If the queue has reached its maximum size, it will return a Subscribe output queue exhausted error.

How to verify it

Connect telegraf to sonic device. Run telegraf on lower CPU limit ~20% and poll in SAMPLE mode.

UT: Shows Subscribe output queue exhausted error when item value queue size reaches the maximum output queue size (Default 100MB)

2025-07-25T19:04:12Z D! [inputs.gnmi] Connection to gNMI device 100.94.113.225:8080 closed
2025-07-25T19:04:12Z E! [inputs.gnmi] Error in plugin: aborted gNMI subscription: rpc error: code = InvalidArgument desc = Queue error:  rpc error: code = ResourceExhausted desc = Subscribe output queue exhausted

UT2: Shows Subscribe output queue exhausted error when item notification queue size reaches the maximum output queue size (Default 100MB)

2025-07-25T19:15:45Z D! [inputs.gnmi] Connection to gNMI device 100.94.113.225:8080 closed
2025-07-25T19:15:45Z E! [inputs.gnmi] Error in plugin: aborted gNMI subscription: rpc error: code = InvalidArgument desc = Subscribe output queue exhausted

UT3: Shows Maximum number of subscriptions reached error when the amount of subscription clients reaches the maximum client amount (Default 10)

2025-07-25T19:18:32Z D! [inputs.gnmi] Connection to gNMI device 100.94.113.225:8080 established
2025-07-25T19:18:33Z D! [inputs.gnmi] Connection to gNMI device 100.94.113.225:8080 closed
2025-07-25T19:18:33Z E! [inputs.gnmi] Error in plugin: aborted gNMI subscription: rpc error: code = ResourceExhausted desc = Maximum number of subscriptions reached

Which release branch to backport (provide reason below if selected)

  • 201811
  • 201911
  • 202006
  • 202012
  • 202106
  • 202111

Description for the changelog

Link to config_db schema for YANG module changes

A picture of a cute animal (not mandatory but encouraged)

Copy link

linux-foundation-easycla bot commented May 13, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

@mssonicbld
Copy link

/azp run

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@Sarah-Singh16
Copy link
Author

/azp run

Copy link

Commenter does not have sufficient privileges for PR 392 in repo sonic-net/sonic-gnmi

@mssonicbld
Copy link

/azp run

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@@ -15,6 +14,10 @@ import (
"sync"
)

var (
OutputQueSize uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No value is assigned to this global variable. So, NewLimitedQueue is always created with maxsize 0 (line 48). Does is mean unlimted size?

@@ -172,6 +176,56 @@ type DbClient struct {
errors int64
}

var queueLengthSum uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move all LimitedQueue related code to separate file? It is not a db_client specific utility


type LimitedQueue struct {
Q *queue.PriorityQueue
maxSize uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue size is global but the maxsize is per-subscribe value. What is the usecase?

func (q *LimitedQueue) EnqueueItem(item Value) error {
queueLengthLock.Lock()
defer queueLengthLock.Unlock()
ilen := uint64(len(item.Val.GetJsonIetfVal()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot assume the value will always be ietf json encoded. Translib client uses the scalar values as prescribed by the gNMI spec -- fills the Notification field. Other clients are filling string & proto_bytes too.

There is a proto.Size() API to calculate message size. But calling it here may be costly. Needs some benchmarking. Alternatively we can think of limiting by number of messages.

return q.Q.Put(item)
} else {
log.Error("Telemetry output queue full, discarding item!")
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue #26 talks about closing the subscription with RESOURCE_EXHAUSTED status.

@Sarah-Singh16 Sarah-Singh16 force-pushed the add_limited_queue_for_oom_fix branch from 7f5a572 to b475b8e Compare July 22, 2025 00:11
@mssonicbld
Copy link

/azp run

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@@ -173,6 +176,55 @@ type DbClient struct {
errors int64
}

type LimitedQueue struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving to a separate file, say queue.go

func (q *LimitedQueue) EnqueueItem(item Value) error {
q.queueLengthLock.Lock()
defer q.queueLengthLock.Unlock()
ilen := (uint64)(proto.Size(item.Notification))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all data clients populate Value.Notification field -- AFAIK only the TranslClient uses it. Queue limit checks will not be applied to subscriptions served by other data clients

putFatalMsg(c.q, msg)
}

func putFatalMsg(q *queue.PriorityQueue, msg string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generic putFatalMsg() function is better than having separate enqueueFatalMsgXYZ functions per data client. You can even consider defining the generic function as a member of LimitedQueue struct.


func putFatalMsg(q *queue.PriorityQueue, msg string) {
q.Put(Value{
log.Error(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use log.ErrorDepth(1, msg) to preserve caller's file/line info

@@ -330,7 +394,7 @@ func (c *DbClient) AppDBPollRun(q *queue.PriorityQueue, poll chan struct{}, w *s
},
},
}
c.q.Put(Value{spbv})
c.q.EnqueueItem(Value{spbv})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible "queue full" error is ignored

if st.Code() == codes.ResourceExhausted {
enqueFatalMsgTranslib(c, st.Message())
}
}
log.Warning(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate log.. enqueFatalMsgTranslib() logs it again

@@ -186,8 +193,16 @@ func (ts *translSubscriber) notify(v *translib.SubscribeResponse) error {
}

spbv := &spb.Value{Notification: msg}
ts.client.q.Put(Value{spbv})
ts.client.q.EnqueueItem(Value{spbv})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate EnqueueItem() call.. There is one more in line 198, which seems to be the correct one

@@ -175,6 +177,8 @@ func setupFlags(fs *flag.FlagSet) (*TelemetryConfig, *gnmi.Config, error) {
Vrf: fs.String("vrf", "", "VRF name, when zmq_address belong on a VRF, need VRF name to bind ZMQ."),
EnableCrl: fs.Bool("enable_crl", false, "Enable certificate revocation list"),
CrlExpireDuration: fs.Int("crl_expire_duration", 86400, "Certificate revocation list cache expire duration"),
OutputQueSz: fs.Uint64("output_queue_size", 100, "Output Queue Maximum Size (MB)"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now this limit is only applied to notification messages and not for Get response messages. Consider renaming this option as "notification_queue_limit"

@sachinholla
Copy link
Contributor

Hi @qiluo-msft, @ganglyu -- this change touches all db clients. Please take a look

@ganglyu ganglyu requested a review from zbud-msft July 23, 2025 07:54
return err
}
} else {
log.V(6).Infof("Added spbv #%v", spbv)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log is never reached -- status.FromError() returns ok=true for nil errors.
See https://pkg.go.dev/google.golang.org/grpc/status#FromError

@@ -164,6 +166,11 @@ func (ts *translSubscriber) processResponses(q *queue.PriorityQueue) {
}

if err := ts.notify(v); err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.ResourceExhausted {
enqueFatalMsgTranslib(c, st.Message())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop the loop after adding a fatal message

@mssonicbld
Copy link

/azp run

Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Memory grows until OOM with slow telemetry collector and lots of data.
3 participants