Skip to content

Add MDT Dialout Support #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ name = "pypi"
grpcio-tools = "*"
googleapis-common-protos = "*"
pylint = "*"
black = "*"
black = "==19.3b0"
twine = "*"

[packages]
502 changes: 267 additions & 235 deletions Pipfile.lock

Large diffs are not rendered by default.

157 changes: 157 additions & 0 deletions nxos_grpc/mdt_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Copyright 2019 Cisco Systems

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

"""NX-OS gRPC Python MDT dialout server."""

from concurrent import futures
from threading import Lock
import grpc
from . import proto


def create_mdt_server(
insecure_addresses=["[::]:50051"], secure_addresses=[], max_workers=10
):
"""Utility method to start up a gRPC server receiving NX-OS MDT.
Derived from gRPC Basics - Python documentation.
https://grpc.io/docs/tutorials/basic/python/

Parameters
----------
insecure_addresses : list, optional
A list of strings which are IP:port(s) to serve without security.
This or secure_addresses must be specified.
secure_addresses : list, optional
A list of tuples which are (address, server_credential) to serve with security.
This or insecure_addresses must be specified.
max_workers : int, optional
The number of concurrent workers to handle RPCs.

Returns
-------
grpc_server : gRPC Server
The gRPC server instance spun up for usage.
Used to start/stop server.
mdt_server : MDTServer
The MDTServer instance.
Used to add callbacks to handle messages.
"""
grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
mdt_server = MDTServer()
proto.add_gRPCMdtDialoutServicer_to_server(mdt_server, grpc_server)
for address in insecure_addresses:
grpc_server.add_insecure_port(address)
for address, credential in secure_addresses:
grpc_server.add_secure_port(address, credential)
grpc_server.start()
return grpc_server, mdt_server


class MDTServer(proto.gRPCMdtDialoutServicer):
"""NX-OS gRPC MDT dialout server implementation.

Data is propagated to interested parties via callbacks.

Methods
-------
add_callback(...)
Add a function signature to be called per message for handling.

Examples
--------
>>> from nxos_grpc import create_mdt_server
>>> grpc_server, mdt_server = create_mdt_server()
>>> def nothing_much(message):
... print(message)
>>> mdt_server.add_callback(nothing_much)
...
"""

def __init__(self):
"""Initializes the MDTServer object and internal state items.
"""
# Uncertain if we need a lock, but concurrency
self.__chunk_map_lock = Lock()
# Map to store all our chunks
self.__chunk_map = {}
# Callbacks we've been provided
self.__callbacks = []

def add_callback(self, func):
"""Add a callback to handle each MDT message.

Parameters
----------
func : function
The function signature to be called for callback.
"""
self.__callbacks.append(func)

def MdtDialout(self, request_iterator, context):
"""Services the MDT messages from NX-OS."""
for message_chunk in request_iterator:
telemetry_pb = self.__assemble_telemetry_pb(message_chunk)
if telemetry_pb is not None:
for callback in self.__callbacks:
callback(telemetry_pb)
yield proto.MdtDialoutArgs(ReqId=message_chunk.ReqId)

def __assemble_telemetry_pb(self, message_chunk):
"""Handles NX-OS telemetry chunking.
Stores chunked messages in self.__chunk_map and returns
fully assembled messages once they match the specified totalSize.
"""
message = None
if len(message_chunk.data) < message_chunk.totalSize:
# Need to begin storing to reassemble
with self.__chunk_map_lock:
if message_chunk.ReqId in self.__chunk_map:
self.__chunk_map[message.ReqId] += message_chunk.data
else:
self.__chunk_map[message.ReqId] = message_chunk.data
if (
len(self.__chunk_map[message_chunk.ReqId])
== message_chunk.totalSize
):
message = proto.Telemetry()
message.ParseFromString(self.__chunk_map.pop(message_chunk.ReqId))
elif (
len(self.__chunk_map[message_chunk.ReqId]) > message_chunk.totalSize
):
raise Exception(
"Message %s assembly (%i) is larger than totalSize (%i)!"
% (
str(message_chunk.ReqId),
len(self.__chunk_map[message_chunk.ReqId]),
message_chunk.totalSize,
)
)
elif (
message_chunk.totalSize
and len(message_chunk.data) > message_chunk.totalSize
):
raise Exception(
"Message %s chunk (%i) is larger than totalSize (%i)!"
% (
str(message_chunk.ReqId),
len(message_chunk.data),
message_chunk.totalSize,
)
)
else:
# message_chunk is a complete telemetry message
message = proto.Telemetry()
message.ParseFromString(message_chunk.data)
return message
8 changes: 7 additions & 1 deletion nxos_grpc/proto/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .nxos_grpc_pb2_grpc import gRPCConfigOperStub
from .telemetry_bis_pb2 import Telemetry
from .nxos_grpc_pb2 import (
GetOperArgs,
GetArgs,
@@ -8,3 +8,9 @@
CloseSessionArgs,
KillArgs,
)
from .nxos_grpc_pb2_grpc import gRPCConfigOperStub
from .mdt_dialout_pb2 import MdtDialoutArgs
from .mdt_dialout_pb2_grpc import (
gRPCMdtDialoutServicer,
add_gRPCMdtDialoutServicer_to_server,
)
14 changes: 14 additions & 0 deletions nxos_grpc/proto/mdt_dialout.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package mdt_dialout;

service gRPCMdtDialout {
rpc MdtDialout(stream MdtDialoutArgs) returns(stream MdtDialoutArgs) {};
}

message MdtDialoutArgs {
int64 ReqId = 1;
bytes data = 2;
string errors = 3;
int32 totalSize = 4; // Set for messages that are chunked, it contains the original message size.
}
115 changes: 115 additions & 0 deletions nxos_grpc/proto/mdt_dialout_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions nxos_grpc/proto/mdt_dialout_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc

from nxos_grpc.proto import mdt_dialout_pb2 as nxos__grpc_dot_proto_dot_mdt__dialout__pb2


class gRPCMdtDialoutStub(object):
# missing associated documentation comment in .proto file
pass

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.MdtDialout = channel.stream_stream(
'/mdt_dialout.gRPCMdtDialout/MdtDialout',
request_serializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.SerializeToString,
response_deserializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.FromString,
)


class gRPCMdtDialoutServicer(object):
# missing associated documentation comment in .proto file
pass

def MdtDialout(self, request_iterator, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_gRPCMdtDialoutServicer_to_server(servicer, server):
rpc_method_handlers = {
'MdtDialout': grpc.stream_stream_rpc_method_handler(
servicer.MdtDialout,
request_deserializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.FromString,
response_serializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'mdt_dialout.gRPCMdtDialout', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
251 changes: 126 additions & 125 deletions nxos_grpc/proto/nxos_grpc_pb2.py

Large diffs are not rendered by default.

92 changes: 92 additions & 0 deletions nxos_grpc/proto/telemetry_bis.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* ----------------------------------------------------------------------------
* telemetry_bis.proto - Telemetry protobuf definitions
*
* August 2016
*
* Copyright (c) 2016 by Cisco Systems, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ----------------------------------------------------------------------------
*/

syntax = "proto3";

option go_package = "telemetry_bis";

/*
* Common message used as a header to both compact and self-describing
* telemetry messages.
*/

message Telemetry {
oneof node_id {
string node_id_str = 1;
// bytes node_id_uuid = 2; // not produced
}
oneof subscription {
string subscription_id_str = 3;
// uint32 subscription_id = 4; // not produced
}
// string sensor_path = 5; // not produced
string encoding_path = 6;
// string model_version = 7; // not produced
uint64 collection_id = 8;
uint64 collection_start_time = 9;
uint64 msg_timestamp = 10;
repeated TelemetryField data_gpbkv = 11;
TelemetryGPBTable data_gpb = 12;
uint64 collection_end_time = 13;
// uint64 heartbeat_sequence_number = 14; // not produced
}

/*
* Messages used to export content in GPB K/V form.
*
* The set of messages in this .proto are sufficient to decode all
* telemetry messages.
*/

message TelemetryField {
uint64 timestamp = 1;
string name = 2;
oneof value_by_type {
bytes bytes_value = 4;
string string_value = 5;
bool bool_value = 6;
uint32 uint32_value = 7;
uint64 uint64_value = 8;
sint32 sint32_value = 9;
sint64 sint64_value = 10;
double double_value = 11;
float float_value = 12;
}
repeated TelemetryField fields = 15;
}

/*
* Messages used to export content in compact GPB form
*
* Per encoding-path .proto files are required to decode keys/content
* pairs below.
*/

message TelemetryGPBTable {
repeated TelemetryRowGPB row = 1;
}

message TelemetryRowGPB {
uint64 timestamp = 1;
bytes keys = 10;
bytes content = 11;
}

381 changes: 381 additions & 0 deletions nxos_grpc/proto/telemetry_bis_pb2.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion nxos_grpc/response.py
Original file line number Diff line number Diff line change
@@ -55,7 +55,9 @@ def build_response(reqid, response_stream):
try:
response_obj.finalize()
except json.decoder.JSONDecodeError:
logging.exception('Error finalizing response JSON! Returning potentially un-finalized elements.')
logging.exception(
"Error finalizing response JSON! Returning potentially un-finalized elements."
)
return response_obj


4 changes: 0 additions & 4 deletions update_deps.sh

This file was deleted.

7 changes: 6 additions & 1 deletion update_protos.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
#!/usr/bin/env bash
pipenv run python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. nxos_grpc/proto/nxos_grpc.proto
PROTO_BASE_PATH=nxos_grpc/proto
for PROTO in "$PROTO_BASE_PATH"/*.proto
do
echo "Generating $PROTO"
pipenv run python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. $PROTO
done