Skip to content

OpenStack: Don't modify port dict when collecting additional information #10192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@
LOG = log.getLogger(__name__)


# A lightweight class to hold all of the additional information that we gather
# when translating from a Neutron port to a Calico WorkloadEndpoint.
class PortExtra(object):
def __init__(self):
self.fixed_ips = None
self.floating_ips = None
self.interface_name = None
self.network_name = None
self.network_qos_policy_id = None
self.project_data = None
self.qos = None
self.security_groups = None
self.security_group_names = {}


# The Calico WorkloadEndpoint that represents an OpenStack VM gets a pair of
# labels to indicate the project (aka tenant) that the VM belongs to. The
# label names are as follows, and the label values are the actual project ID
Expand All @@ -64,11 +79,6 @@
# alphanumeric character. If a project name does not already meet that, we
# substitute problem characters so that it does.

# Calico-specific keys in the port dict for storing information that
# we want to include in WorkloadEndpoints.
PORT_KEY_PROJ_DATA = 'calico-project-data'
PORT_KEY_SG_NAMES = 'calico-sg-names'


class WorkloadEndpointSyncer(ResourceSyncer):

Expand Down Expand Up @@ -145,9 +155,9 @@ def neutron_to_etcd_write_data(self, port, context, reread=False):
port = self.db.get_port(context, port['id'])
except n_exc.PortNotFound:
raise ResourceGone()
port = self.add_extra_port_information(context, port)
return (endpoint_spec(port),
endpoint_labels(port, self.namespace),
port_extra = self.get_extra_port_information(context, port)
return (endpoint_spec(port, port_extra),
endpoint_labels(port, self.namespace, port_extra),
endpoint_annotations(port))

def write_endpoint(self, port, context, must_update=False):
Expand All @@ -156,10 +166,11 @@ def write_endpoint(self, port, context, must_update=False):
port = self.db.get_port(context, port['id'])

# Fill out other information we need on the port.
port = self.add_extra_port_information(context, port)
port_extra = self.get_extra_port_information(context, port)

# Write the security policies for this port.
self.policy_syncer.write_sgs_to_etcd(port['security_groups'], context)
self.policy_syncer.write_sgs_to_etcd(port_extra.security_groups,
context)

# Implementation note: we could arguably avoid holding the transaction
# for this length and instead release it here, then use atomic CAS. The
Expand All @@ -170,8 +181,8 @@ def write_endpoint(self, port, context, must_update=False):
datamodel_v3.put("WorkloadEndpoint",
self.namespace,
endpoint_name(port),
endpoint_spec(port),
labels=endpoint_labels(port, self.namespace),
endpoint_spec(port, port_extra),
labels=endpoint_labels(port, self.namespace, port_extra),
annotations=endpoint_annotations(port),
mod_revision=mod_revision)

Expand All @@ -180,8 +191,8 @@ def delete_endpoint(self, port):
self.namespace,
endpoint_name(port))

def add_port_interface_name(self, port):
port['interface_name'] = 'tap' + port['id'][:11]
def add_port_interface_name(self, port, port_extra):
port_extra.interface_name = 'tap' + port['id'][:11]

def get_security_groups_for_port(self, context, port):
"""Checks which security groups apply for a given port.
Expand Down Expand Up @@ -225,51 +236,52 @@ def get_floating_ips_for_port(self, context, port):
)
]

def get_network_properties_for_port(self, context, port):
def get_network_properties_for_port(self, context, port, port_extra):
network = context.session.query(
models_v2.Network
).filter_by(
id=port['network_id']
).first()

try:
port['network_name'] = datamodel_v3.sanitize_label_name_value(
port_extra.network_name = datamodel_v3.sanitize_label_name_value(
network['name'],
NETWORK_NAME_MAX_LENGTH,
)
except Exception:
LOG.warning(f"Failed to find network name for port {port['id']}")

if 'qos_policy_id' in network:
port['network_qos_policy_id'] = network['qos_policy_id']
port_extra.network_qos_policy_id = network['qos_policy_id']

def add_extra_port_information(self, context, port):
"""add_extra_port_information
def get_extra_port_information(self, context, port):
"""get_extra_port_information

Gets extra information for a port that is needed before sending it to
etcd.
"""
LOG.debug("port = %r", port)
port['fixed_ips'] = self.get_fixed_ips_for_port(
port_extra = PortExtra()
port_extra.fixed_ips = self.get_fixed_ips_for_port(
context, port
)
port['floating_ips'] = self.get_floating_ips_for_port(
port_extra.floating_ips = self.get_floating_ips_for_port(
context, port
)
port['security_groups'] = self.get_security_groups_for_port(
port_extra.security_groups = self.get_security_groups_for_port(
context, port
)
self.get_network_properties_for_port(context, port)
self.get_network_properties_for_port(context, port, port_extra)

self.add_port_gateways(port, context)
self.add_port_interface_name(port)
self.add_port_project_data(port, context)
self.add_port_sg_names(port, context)
self.add_port_qos(port, context)
self.add_port_gateways(context, port_extra)
self.add_port_interface_name(port, port_extra)
self.add_port_project_data(port, context, port_extra)
self.add_port_sg_names(context, port_extra)
self.add_port_qos(port, context, port_extra)

return port
return port_extra

def add_port_gateways(self, port, context):
def add_port_gateways(self, context, port_extra):
"""add_port_gateways

Determine the gateway IP addresses for a given port's IP addresses, and
Expand All @@ -278,11 +290,11 @@ def add_port_gateways(self, port, context):
This method assumes it's being called from within a database
transaction and does not take out another one.
"""
for ip in port['fixed_ips']:
for ip in port_extra.fixed_ips:
subnet = self.db.get_subnet(context, ip['subnet_id'])
ip['gateway'] = subnet['gateway_ip']

def add_port_sg_names(self, port, context):
def add_port_sg_names(self, context, port_extra):
"""add_port_sg_names

Determine and store the name of each security group that a port uses.
Expand All @@ -296,17 +308,16 @@ def add_port_sg_names(self, port, context):
# race with multiple servers or threads trying to do this at the same
# time. Adding "default_sg=True" here suppresses that creation
# attempt.
port[PORT_KEY_SG_NAMES] = {}
filters = {'id': port['security_groups']}
filters = {'id': port_extra.security_groups}
for sg in self.db.get_security_groups(context, filters=filters,
default_sg=True):
sg_name = datamodel_v3.sanitize_label_name_value(
sg['name'],
SG_NAME_MAX_LENGTH
)
port[PORT_KEY_SG_NAMES][sg['id']] = sg_name
port_extra.security_group_names[sg['id']] = sg_name

def add_port_qos(self, port, context):
def add_port_qos(self, port, context, port_extra):
"""add_port_qos

Determine and store QoS parameters for a port.
Expand All @@ -316,7 +327,7 @@ def add_port_qos(self, port, context):
"""
qos = {}

qos_policy_id = port.get('qos_policy_id') or port.get('network_qos_policy_id')
qos_policy_id = port.get('qos_policy_id') or port_extra.network_qos_policy_id
LOG.debug("QoS Policy ID = %r", qos_policy_id)
if qos_policy_id:
rules = context.session.query(
Expand Down Expand Up @@ -357,14 +368,13 @@ def add_port_qos(self, port, context):
if cfg.CONF.calico.max_egress_connections_per_port != 0:
qos['egressMaxConnections'] = cfg.CONF.calico.max_egress_connections_per_port

port['qos'] = qos
port_extra.qos = qos

def add_port_project_data(self, port, context):
def add_port_project_data(self, port, context, port_extra):
"""add_port_project_data

Determine the OpenStack project name and parent ID for a given
port's project/tenant ID, and add it as
port['calico-project-data'].
port's project/tenant ID, and add it as port_extra.project_data.
"""
proj_id = port.get('project_id', port.get('tenant_id'))
if proj_id is None:
Expand All @@ -375,7 +385,7 @@ def add_port_project_data(self, port, context):
proj_data = self.proj_data_cache.get(proj_id)
if proj_data is not None:
LOG.debug("Project data %r was cached", proj_data)
port[PORT_KEY_PROJ_DATA] = proj_data
port_extra.project_data = proj_data
return

# Not cached, so look up the port's project in the Keystone DB.
Expand All @@ -385,7 +395,7 @@ def add_port_project_data(self, port, context):
LOG.warning("Unable to find project data for port: %r", port)
return

port[PORT_KEY_PROJ_DATA] = proj_data
port_extra.project_data = proj_data

def cache_port_project_data(self):
"""cache_port_project_data
Expand Down Expand Up @@ -418,10 +428,10 @@ def escape_dashes(s):
)


def endpoint_labels(port, namespace):
def endpoint_labels(port, namespace, port_extra):
labels = {}
for sg_id in port['security_groups']:
sg_name = port.get(PORT_KEY_SG_NAMES, {}).get(sg_id, '')
for sg_id in port_extra.security_groups:
sg_name = port_extra.security_group_names.get(sg_id, '')
labels[SG_LABEL_PREFIX + sg_id] = sg_name
if sg_name:
labels[SG_NAME_LABEL_PREFIX + sg_name] = sg_id
Expand All @@ -431,19 +441,19 @@ def endpoint_labels(port, namespace):
proj_id = port.get('project_id', port.get('tenant_id'))
if proj_id is not None:
labels[PROJECT_ID_LABEL_NAME] = proj_id
if PORT_KEY_PROJ_DATA in port:
name, parent_id = port[PORT_KEY_PROJ_DATA]
if port_extra.project_data:
name, parent_id = port_extra.project_data
labels[PROJECT_NAME_LABEL_NAME] = name
labels[PROJECT_PARENT_ID_LABEL_NAME] = parent_id

network_name = port.get('network_name')
network_name = port_extra.network_name
if network_name is not None:
labels[NETWORK_NAME_LABEL_NAME] = network_name
return labels


# Represent a Neutron port as a Calico v3 WorkloadEndpoint spec.
def endpoint_spec(port):
def endpoint_spec(port, port_extra):
"""endpoint_spec

Generate JSON WorkloadEndpointSpec for the given Neutron port.
Expand All @@ -455,15 +465,15 @@ def endpoint_spec(port):
'workload': port['device_id'],
'node': port['binding:host_id'],
'endpoint': port['id'],
'interfaceName': port['interface_name'],
'interfaceName': port_extra.interface_name,
'mac': port['mac_address'],
}

# Collect IPv4 and IPv6 addresses. On the way, also set the corresponding
# gateway fields. If there is more than one IPv4 or IPv6 gateway, the last
# one (in port['fixed_ips']) wins.
# one (in port_extra.fixed_ips) wins.
ip_nets = []
for ip in port['fixed_ips']:
for ip in port_extra.fixed_ips:
if ':' in ip['ip_address']:
ip_nets.append(ip['ip_address'] + '/128')
if ip['gateway'] is not None:
Expand All @@ -490,16 +500,16 @@ def endpoint_spec(port):
data['allowedIps'] = allowed_ips

ip_nats = []
for ip in port['floating_ips']:
for ip in port_extra.floating_ips:
ip_nats.append({
'internalIP': ip['int_ip'],
'externalIP': ip['ext_ip'],
})
if ip_nats:
data['ipNATs'] = ip_nats

if port['qos']:
data['qosControls'] = port['qos']
if port_extra.qos:
data['qosControls'] = port_extra.qos

# Return that data.
return data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,6 @@ def test_start_two_ports(self):
# Remove the QoS policy from the network again.
_log.debug("Retest after removing all QoS policy")
del self.osdb_networks[1]['qos_policy_id']
del self.osdb_ports[0]['network_qos_policy_id']
self.driver.update_port_postcommit(context)

# Expected changes
Expand Down