Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions docs/source/backends/openwrt.rst
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,21 @@ The following *configuration dictionary*:
],
},
],
}
},
# Auto-generated interfaces for bridge-vlans (e.g. "br-lan.1", "br-lan.2")
# can be overridden by defining them explicitly in the configuration.
{
"type": "ethernet",
"name": "br-lan.2",
"mtu": 1500,
"mac": "61:4A:A0:D7:3F:0E",
"addresses": [
{
"proto": "dhcp",
"family": "ipv4",
}
],
},
]
}

Expand Down Expand Up @@ -805,13 +819,13 @@ Will be rendered as follows:
list ports 'lan3:u*'
option vlan '2'

config interface 'vlan_br_lan_1'
config interface 'br_lan_1'
option device 'br-lan.1'
option proto 'none'

config interface 'vlan_br_lan_2'
config interface 'br_lan_2'
option device 'br-lan.2'
option proto 'none'
option proto 'dhcp'

config interface 'br_lan'
option device 'br-lan'
Expand Down
248 changes: 193 additions & 55 deletions netjsonconfig/backends/openwrt/converters/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from copy import deepcopy
from ipaddress import ip_address, ip_interface

from ....utils import merge_list
from ..schema import schema
from .base import OpenWrtConverter

Expand Down Expand Up @@ -101,7 +102,14 @@ def to_intermediate_loop(self, block, result, index=None):
if address:
uci_interface.update(address)
result.setdefault("network", [])
result["network"].append(self.sorted_dict(uci_interface))
# Use merge_list instead of appending the interface directly
# to allow users to override the auto-generated interface
# (e.g., when using VLAN filtering on a bridge).
result["network"] = merge_list(
result["network"],
[self.sorted_dict(uci_interface)],
identifiers=[".name", ".type"],
)
i += 1
return result

Expand Down Expand Up @@ -262,11 +270,12 @@ def __intermediate_vlan(self, uci_name, interface, vlan):
"vlan": vid,
"device": interface["ifname"],
}
if uci_name == self._get_uci_name(interface["ifname"]):
uci_vlan[".name"] = "vlan_{}".format(uci_vlan[".name"])
uci_vlan[".name"] = "vlan_{}".format(uci_vlan[".name"])
uci_vlan_interface = {
".type": "interface",
".name": uci_vlan[".name"],
# To avoid conflicts, auto-generated interfaces are prefixed with "if"
# because UCI does not support multiple blocks with the same name.
".name": f"{uci_name}_{vid}",
"device": "{ifname}.{vid}".format(ifname=interface["ifname"], vid=vid),
"proto": "none",
}
Expand Down Expand Up @@ -492,12 +501,112 @@ def __intermediate_dns_search(self, uci, address):
if dns_search:
return " ".join(dns_search)

def to_netjson(self, remove_block=True):
"""
Override the base ``to_netjson`` method to correctly handle
OpenWrt ≥ 21 (DSA) configurations.

On OpenWrt < 21 (pre-DSA), each ``interface`` block contained a complete
description of that interface. Starting with OpenWrt 21 (DSA), key
settings are split across multiple blocks (``device``, ``bridge-vlan``,
and ``interface``). This means that individual blocks are no longer
self-contained and must be parsed in a specific order to produce a
valid and consistent NetJSON representation.

Parsing order:
1. Parse all ``device`` and ``bridge-vlan`` blocks.
2. Parse ``interface`` blocks not referencing VLAN interfaces.
3. Add fallback interfaces for any unconsumed ``device_config``.
4. Parse remaining ``interface`` blocks (including VLAN interfaces).
"""

result = OrderedDict()
# Parse device blocks
result = self.__process_blocks(
result,
remove_block,
self.__skip_non_device_block,
self.__process_device_block,
)
# Parse non VLAN interfaces
result = self.__process_blocks(result, remove_block, self.__skip_vlan_block)
# Add fallback interfaces before parsing VLAN interfaces.
# This ensures that the primary bridge/device interfaces are already present so
# subsequently parsed VLAN/interface blocks can correctly reference or
# override them. This preserves the required ordering for producing
# a consistent NetJSON -> UCI mapping.
result = self.__add_fallback_interfaces(result)
# Parse remaining interfaces
result = self.__process_blocks(result, remove_block, self.should_skip_block)

return result

def __is_device_config(self, interface):
"""
determines if the configuration is a device from NetJSON
"""
return interface.get("type", None) == "device"

def __skip_non_device_block(self, block):
return self.should_skip_block(block) or (
not block.get("bridge_21", None) and not self.__is_device_config(block)
)

def __skip_vlan_block(self, block):
return self.should_skip_block(block) or (
block.get("device")
and "." in block["device"]
and block["device"].split(".")[0] in self._device_config
)

def __process_blocks(self, result, remove_block, skip_fn, handler_fn=None):
intermediate_data = self.to_netjson_clean(
self.intermediate_data[self.intermediate_key]
)
handler_fn = handler_fn or self.to_netjson_loop
for index, block in enumerate(list(intermediate_data), start=1):
if skip_fn(block):
continue
if remove_block:
self.intermediate_data[self.intermediate_key].remove(block)
result = handler_fn(block, result, index)
return result

def __process_device_block(self, block, result, index):
if block.get("type") == "bridge-vlan":
device_name = block.get("device")
if device_name and device_name not in self._device_config:
self._device_config[device_name] = {}
self.__netjson_vlan(block, self._device_config[device_name])
else:
self.__netjson_device(block)
return result

def __add_fallback_interfaces(self, result):
"""Add fallback interfaces for any unconsumed device configs."""

def make_fallback_interface(name, config):
interface_name = config.get(".name", name)
if interface_name.startswith("device_"):
interface_name = interface_name[7:] # len("device_") = 7
return OrderedDict(
{
".type": "interface",
".name": interface_name,
"device": name,
"proto": "none",
}
)

index = len(result) + 1
for name, device_config in self._device_config.copy().items():
if device_config.get("consumed", False):
continue
interface = make_fallback_interface(name, device_config)
result = self.to_netjson_loop(interface, result, index)
index += 1
return result

def to_netjson_loop(self, block, result, index):
_type = block.get(".type")
if _type == "globals":
Expand Down Expand Up @@ -551,72 +660,97 @@ def __get_device_config_for_interface(self, interface):
device = interface.get("device", "")
name = interface.get("name")
device_config = self._device_config.get(device, self._device_config.get(name))
if not device_config and "." in device:
cleaned_device, _, _ = device.rpartition(".")
device_config = self._device_config.get(cleaned_device)
if not device_config:
if "." in device:
cleaned_device, _, _ = device.rpartition(".")
device_config = self._device_config.get(cleaned_device)
if not device_config:
return device_config
if interface.get("type") == "bridge-vlan":
return device_config
# ifname has been renamed to device in OpenWrt 21.02
interface["ifname"] = interface.pop("device")
return device_config

def __update_interface_device_config(self, interface, device_config):
if interface.get("type") == "bridge-vlan":
return self.__netjson_vlan(interface, device_config)
interface = self._handle_bridge_vlan(interface, device_config)
if not interface:
return
if device_config.pop("bridge_21", None):
def __add_options_from_device_config(self, interface, device_config):
if device_config.get("bridge_21", None) and interface.get(
"ifname"
) != device_config.get("name"):
interface[".name"] = self._get_uci_name(interface["ifname"])
return interface

if device_config.get("consumed", False):
return interface

if device_config.get("bridge_21", None):
for option in device_config:
if option == "bridge_21":
continue
# ifname has been renamed to ports in OpenWrt 21.02 bridge
if option == "ports":
interface["ifname"] = " ".join(device_config[option])
else:
interface[option] = device_config[option]

# Merging L2 options to interface
for options in (
self._bridge_interface_options["all"]
+ self._bridge_interface_options["stp"]
+ self._bridge_interface_options["igmp_snooping"]
):
if options in device_config:
interface[options] = device_config.pop(options)
interface[options] = device_config.get(options)
if device_config.get("type", "").startswith("8021"):
interface["ifname"] = "".join(device_config["name"].split(".")[:-1])
device_config["consumed"] = True
return interface

def _handle_bridge_vlan(self, interface, device_config):
if "." in interface.get("ifname", ""):
_, _, vlan_id = interface["ifname"].rpartition(".")
if device_config.get("vlan_filtering", []):
for vlan in device_config["vlan_filtering"]:
if vlan["vlan"] == int(vlan_id):
return
def _handle_bridge_vlan_interface(self, interface, device_config):
ifname = interface.get("ifname", "")
if "." not in ifname:
# no VLAN suffix, nothing to do
return interface

_, _, vlan_id = interface["ifname"].rpartition(".")
for vlan in device_config.get("vlan_filtering", []):
if vlan["vlan"] == int(vlan_id):
if interface.get("proto") == "none" and interface.keys() == {
".type",
".name",
"ifname",
"proto",
}:
# Return None to ignore this auto-generated interface.
return
# Auto-generated interface is being overridden by user.
# Override the ".name" to avoid setting "network" field
# in NetJSON output.
interface[".name"] = self._get_uci_name(interface["ifname"])
break
return interface

def __netjson_dsa_interface(self, interface):
if self.__is_device_config(interface) or interface.get("bridge_21", None):
self.__netjson_device(interface)
else:
device_config = self.__get_device_config_for_interface(interface)
if device_config:
interface = self.__update_interface_device_config(
interface, device_config
)
# if device_config is empty but the interface references it
elif "device" in interface and "ifname" not in interface:
# .name may have '.' substituted with _,
# which will yield unexpected results
# for this reason we use the name stored
# in the device property before removing it
interface["ifname"] = interface.pop("device")
# Device configs are now handled in the first pass and removed,
# so we only process actual interface blocks here
device_config = self.__get_device_config_for_interface(interface)
if device_config:
interface = self._handle_bridge_vlan_interface(interface, device_config)
if not interface:
return
interface = self.__add_options_from_device_config(interface, device_config)
# if device_config is empty but the interface references it
elif "device" in interface and "ifname" not in interface:
# .name may have '.' substituted with _,
# which will yield unexpected results
# for this reason we use the name stored
# in the device property before removing it
interface["ifname"] = interface.pop("device")
return interface

def __netjson_device(self, interface):
interface["network"] = interface.pop(".name").lstrip("device_")
name = interface.pop(".name")
# Remove "device_" prefix if present
if name.startswith("device_"):
interface["network"] = name[7:] # len("device_") = 7
else:
interface["network"] = name
for option in [
"txqueuelen",
"neighreachabletime",
Expand Down Expand Up @@ -647,27 +781,31 @@ def __netjson_device(self, interface):
except KeyError:
continue
name = interface.get("name")
self._device_config[name] = interface
try:
self._device_config[name].update(interface)
except KeyError:
self._device_config[name] = interface

def __netjson_vlan(self, vlan, device_config):
# Clean up VLAN filtering option from the native config
if device_config.get("vlan_filtering") == "1":
device_config.pop("vlan_filtering")
netjson_vlan = {"vlan": int(vlan["vlan"]), "ports": []}
for port in vlan.get("ports", []):
port_config = port.split(":")
port = {"ifname": port_config[0]}
tagging = port_config[1][0]
pvid = False
if len(port_config[1]) > 1:
pvid = True
port.update(
{
"tagging": tagging,
"primary_vid": pvid,
}
)
port = {
"ifname": port_config[0],
"tagging": "u",
"primary_vid": False,
}
if len(port_config) > 1:
port["tagging"] = port_config[1][0]
if len(port_config[1]) > 1:
port["primary_vid"] = True
netjson_vlan["ports"].append(port)
if isinstance(device_config["vlan_filtering"], list):
try:
device_config["vlan_filtering"].append(netjson_vlan)
else:
except KeyError:
device_config["vlan_filtering"] = [netjson_vlan]
return

Expand Down
7 changes: 5 additions & 2 deletions netjsonconfig/backends/openwrt/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,12 @@ def _set_uci_block_type(self, block):
# process bridges using the interface converter,
# therefore we need to update block type here.
if block[".type"] in ["device", "bridge-vlan"]:
if block.get("type") in ["bridge", "8021q", "8021ad"]:
if (
block.get("type") in ["bridge", "8021q", "8021ad"]
or block[".type"] == "bridge-vlan"
):
block["bridge_21"] = True
elif block[".type"] == "bridge-vlan":
if block[".type"] == "bridge-vlan":
block["type"] = "bridge-vlan"
elif not block.get("type", None):
block["type"] = "device"
Expand Down
Loading