Skip to content

Commit

Permalink
Adding encryption support to netmiko CLI tools (#3505)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktbyers authored Oct 7, 2024
1 parent 3e0623e commit 9211005
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 3 deletions.
2 changes: 2 additions & 0 deletions netmiko/cli_tools/fix.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
management api http-commands
no protocol https ssl profile
10 changes: 8 additions & 2 deletions netmiko/cli_tools/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any, Dict

from netmiko import ConnectHandler
from netmiko.utilities import load_devices, obtain_all_devices
from netmiko.utilities import obtain_all_devices, load_netmiko_yml
from netmiko.encryption_handling import decrypt_config, get_encryption_key
from netmiko.cli_tools import ERROR_PATTERN


Expand All @@ -25,7 +26,12 @@ def obtain_devices(device_or_group: str) -> Dict[str, Dict[str, Any]]:
a device-name. A group-name will be a list of device-names. A device-name
will just be a dictionary of device parameters (ConnectHandler **kwargs).
"""
my_devices = load_devices()
config_params, my_devices = load_netmiko_yml()
use_encryption = config_params.get("encryption", False)
encryption_type = config_params.get("encryption_type", "fernet")
if use_encryption:
key = get_encryption_key()
my_devices = decrypt_config(my_devices, key, encryption_type)
if device_or_group == "all":
devices = obtain_all_devices(my_devices)
else:
Expand Down
61 changes: 61 additions & 0 deletions netmiko/cli_tools/netmiko_encrypt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
import os
import argparse
from getpass import getpass

from netmiko.utilities import load_netmiko_yml
from netmiko.encryption_handling import encrypt_value


def main():
parser = argparse.ArgumentParser(
description="Encrypt data using Netmiko's encryption."
)
parser.add_argument("data", help="The data to encrypt", nargs="?")
parser.add_argument(
"--key",
help="The encryption key (if not provided, will use NETMIKO_TOOLS_KEY env variable)",
)
parser.add_argument(
"--type",
choices=["fernet", "aes128"],
help="Encryption type (if not provided, will read from .netmiko.yml)",
)

args = parser.parse_args()

if args.data:
data = args.data
else:
data = getpass("Enter the data to encrypt: ")

# Get encryption key
if args.key:
key = args.key.encode()
else:
key = os.environ.get("NETMIKO_TOOLS_KEY")
if not key:
msg = """Encryption key not provided.
Use --key or set NETMIKO_TOOLS_KEY environment variable."""
raise ValueError(msg)
key = key.encode()

# Get encryption type
if args.type:
encryption_type = args.type
else:
config_params, my_devices = load_netmiko_yml()
encryption_type = config_params.get("encryption_type", "fernet")

if not encryption_type:
msg = """Encryption type not provided.
Use --type or set 'encryption_type' in .netmiko.yml in the '__meta__' section."""
raise ValueError(msg)

# Encrypt the password
encrypted_data = encrypt_value(data, key, encryption_type)
print(f"\nEncrypted data: {encrypted_data}\n")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions netmiko/cli_tools/netmiko_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

COMMAND = "netmiko-show"

# FIX: --list-devices currently fails due to missing 'device/group'


def main_ep():
sys.exit(main(sys.argv[1:]))
Expand Down
95 changes: 95 additions & 0 deletions netmiko/encryption_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
import base64
from typing import Dict, Any, Union

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


ENCRYPTION_PREFIX: str = "__encrypt__"


def get_encryption_key() -> bytes:
key: Union[str, None] = os.environ.get("NETMIKO_TOOLS_KEY")
if not key:
raise ValueError(
"Encryption key not found. Set the 'NETMIKO_TOOLS_KEY' environment variable."
)
return key.encode()


def decrypt_value(encrypted_value: str, key: bytes, encryption_type: str) -> str:
# Remove the encryption prefix
encrypted_value = encrypted_value.replace(ENCRYPTION_PREFIX, "", 1)

# Extract salt and ciphertext
salt_str, ciphertext_str = encrypted_value.split(":", 1)
salt = base64.b64decode(salt_str)
ciphertext = base64.b64decode(ciphertext_str)

kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
derived_key: bytes = kdf.derive(key)

if encryption_type == "fernet":
f = Fernet(base64.urlsafe_b64encode(derived_key))
return f.decrypt(ciphertext).decode()
elif encryption_type == "aes128":
iv = ciphertext[:16]
ciphertext = ciphertext[16:]
cipher = Cipher(algorithms.AES(derived_key[:16]), modes.CBC(iv))
decryptor = cipher.decryptor()
padded: bytes = decryptor.update(ciphertext) + decryptor.finalize()
unpadded: bytes = padded[: -padded[-1]]
return unpadded.decode()
else:
raise ValueError(f"Unsupported encryption type: {encryption_type}")


def decrypt_config(
config: Dict[str, Any], key: bytes, encryption_type: str
) -> Dict[str, Any]:
for device, params in config.items():
if isinstance(params, dict):
for field, value in params.items():
if isinstance(value, str) and value.startswith(ENCRYPTION_PREFIX):
len_prefix = len(ENCRYPTION_PREFIX)
data: str = value[len_prefix:]
params[field] = decrypt_value(data, key, encryption_type)
return config


def encrypt_value(value: str, key: bytes, encryption_type: str) -> str:
salt: bytes = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
derived_key: bytes = kdf.derive(key)

if encryption_type == "fernet":
f = Fernet(base64.urlsafe_b64encode(derived_key))
fernet_encrypted: bytes = f.encrypt(value.encode())
encrypted_data = fernet_encrypted
elif encryption_type == "aes128":
iv: bytes = os.urandom(16)
cipher = Cipher(algorithms.AES(derived_key[:16]), modes.CBC(iv))
encryptor = cipher.encryptor()
padded: bytes = value.encode() + b"\0" * (16 - len(value) % 16)
aes_encrypted: bytes = iv + encryptor.update(padded) + encryptor.finalize()
encrypted_data = aes_encrypted
else:
raise ValueError(f"Unsupported encryption type: {encryption_type}")

# Combine salt and encrypted data, and add prefix
b64_salt: str = base64.b64encode(salt).decode()
b64_encrypted: str = base64.b64encode(encrypted_data).decode()
return f"{ENCRYPTION_PREFIX}{b64_salt}:{b64_encrypted}"
16 changes: 15 additions & 1 deletion netmiko/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ def load_yaml_file(yaml_file: Union[str, bytes, "PathLike[Any]"]) -> Any:
sys.exit("Unable to open YAML file")


def load_netmiko_yml(file_name: Union[str, bytes, "PathLike[Any]", None] = None) -> Any:
"""
Load and parse the .netmiko.yml as determined by 'find_cfg_file'.
Parsing:
Retrieve and extract 'config' parameters: __meta__ field
Determine if encryption is being used and decrypt any encrypted fields
"""
yaml_devices_file = find_cfg_file(file_name)
netmiko_yaml_data = load_yaml_file(yaml_devices_file)
config_params = netmiko_yaml_data.pop("__meta__", {})
return config_params, netmiko_yaml_data


def load_devices(file_name: Union[str, bytes, "PathLike[Any]", None] = None) -> Any:
"""Find and load .netmiko.yml file."""
yaml_devices_file = find_cfg_file(file_name)
Expand Down Expand Up @@ -146,7 +160,7 @@ def find_cfg_file(
if files:
return files[0]
raise IOError(
".netmiko.yml file not found in NETMIKO_TOOLS environment variable directory,"
".netmiko.yml file not found in NETMIKO_TOOLS_CFG environment variable directory,"
" current directory, or home directory."
)

Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ ignore_errors = True

[mypy-netmiko.cli_tools.argument_handling]
ignore_errors = True

[mypy-netmiko.cli_tools.netmiko_encrypt]
ignore_errors = True

0 comments on commit 9211005

Please sign in to comment.