#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2020 FABRIC Testbed
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Author: Paul Ruth (pruth@renci.org)
"""
Methods to work with FABRIC `network services`_.
.. _`network services`: https://learn.fabric-testbed.net/knowledge-base/glossary/#network_service
"""
from __future__ import annotations
import logging
import threading
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from fabrictestbed.external_api.orchestrator_client import SliverDTO
from fim.slivers.path_info import Path
from fim.user import ERO, Gateway
from tabulate import tabulate
from fabrictestbed_extensions.fablib.exceptions import ResourceNotFoundError
from fabrictestbed_extensions.utils.utils import Utils
if TYPE_CHECKING:
from fabrictestbed_extensions.fablib.slice import Slice
from fabrictestbed_extensions.fablib.interface import Interface
import ipaddress
import json
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from fabrictestbed.slice_editor import Capacities, Labels
from fabrictestbed.slice_editor import NetworkService as FimNetworkService
from fabrictestbed.slice_editor import ServiceType, UserData
from fim.slivers.network_service import NetworkServiceSliver, NSLayer, ServiceType
from fim.user.network_service import MirrorDirection
from fabrictestbed_extensions.fablib.template_mixin import TemplateMixin
log = logging.getLogger("fablib")
[docs]
class NetworkService(TemplateMixin):
"""
A class for working with FABRIC network services.
"""
_show_title = "Network"
network_service_map = {
"L2Bridge": ServiceType.L2Bridge,
"L2PTP": ServiceType.L2PTP,
"L2STS": ServiceType.L2STS,
"PortMirror": ServiceType.PortMirror,
"FABNetv4": ServiceType.FABNetv4,
"FABNetv6": ServiceType.FABNetv6,
"FABNetv4Ext": ServiceType.FABNetv4Ext,
"FABNetv6Ext": ServiceType.FABNetv6Ext,
"L3VPN": ServiceType.L3VPN,
}
# Type names used in fim network services
fim_l2network_service_types = ["L2Bridge", "L2PTP", "L2STS"]
fim_l3network_service_types = [
"FABNetv4",
"FABNetv6",
"FABNetv4Ext",
"FABNetv6Ext",
"L3VPN",
]
fim_special_service_types = ["PortMirror"]
@staticmethod
def __get_fim_l2network_service_types() -> List[str]:
"""
Not intended for API use. Returns a list of FIM L2 network service types.
:return: List of FIM L2 network service types.
:rtype: List[str]
"""
return NetworkService.fim_l2network_service_types
@staticmethod
def __get_fim_l3network_service_types() -> List[str]:
"""
Not intended for API use. Returns a list of FIM L3 network service types.
:return: List of FIM L3 network service types.
:rtype: List[str]
"""
return NetworkService.fim_l3network_service_types
@staticmethod
def __get_fim_special_service_types() -> List[str]:
"""
Not intended for API use. Returns a list of FIM special service types.
:return: List of FIM special service types.
:rtype: List[str]
"""
return NetworkService.fim_special_service_types
[docs]
@staticmethod
def get_fim_network_service_types() -> List[str]:
"""
Not intended for API use. Returns a list of all FIM network service types.
:return: List of all FIM network service types.
:rtype: List[str]
"""
return (
NetworkService.__get_fim_l2network_service_types()
+ NetworkService.__get_fim_l3network_service_types()
+ NetworkService.__get_fim_special_service_types()
)
@staticmethod
def __calculate_l2_nstype(
interfaces: List[Interface] = None, ero_enabled: bool = False
) -> ServiceType:
"""
Not inteded for API use
Determines the L2 network service type based on the number of interfaces inputted.
:param interfaces: a list of interfaces
:type interfaces: list[Interface]
:param ero_enabled: Flag indicating if ERO is specified
:type ero_enabled: bool
:raises Exception: if no network service type is not appropriate for the number of interfaces
:return: the network service type
:rtype: ServiceType
"""
from fabrictestbed_extensions.fablib.facility_port import FacilityPort
# if there is a basic NIC, WAN must be STS
basic_nic_count = 0
sites = set([])
includes_facility_port = False
facility_port_interfaces = 0
for interface in interfaces:
sites.add(interface.get_site())
if isinstance(interface.get_node(), FacilityPort):
includes_facility_port = True
facility_port_interfaces += 1
if interface.get_model() == "NIC_Basic":
basic_nic_count += 1
rtn_nstype = None
if 1 >= len(sites) >= 0:
rtn_nstype = NetworkService.network_service_map["L2Bridge"]
# elif basic_nic_count == 0 and len(sites) == 2 and len(interfaces) == 2:
# #TODO: remove this when STS works on all links.
# rtn_nstype = NetworkService.network_service_map['L2PTP']
elif len(sites) == 2:
# Use L2STS when connecting two facility ports instead of L2PTP
# L2PTP limitation for Facility Ports:
# basically the layer-2 point-to-point server template applied is not popping
# vlan tags over the MPLS tunnel between two facility ports.
if (
(includes_facility_port and facility_port_interfaces < 2) or ero_enabled
) and not basic_nic_count:
# For now WAN FacilityPorts require L2PTP
rtn_nstype = NetworkService.network_service_map["L2PTP"]
elif len(interfaces) >= 2:
rtn_nstype = NetworkService.network_service_map["L2STS"]
else:
raise Exception(
f"Invalid Network Service: Networks are limited to 2 unique sites. Site requested: {sites}"
)
return rtn_nstype
@staticmethod
def __validate_nstype(type, interfaces):
"""
Not intended for API use
Verifies the network service type against the number of interfaces.
:param type: the network service type to check
:type type: ServiceType
:param interfaces: the list of interfaces to check
:type interfaces: list[Interface]
:raises Exception: if the network service type is invalid based on the number of interfaces
:return: true if the network service type is valid based on the number of interfaces
:rtype: bool
"""
# Just an empty network created; NS type would be validated when add_interface is invoked.
if not len(interfaces):
return True
from fabrictestbed_extensions.fablib.facility_port import FacilityPort
sites = set([])
nics = set([])
nodes = set([])
for interface in interfaces:
try:
sites.add(interface.get_site())
nics.add(interface.get_model())
nodes.add(interface.get_node())
except Exception as e:
log.info(
f"validate_nstype: skipping interface {interface.get_name()}, likely its a facility port"
)
# models: 'NIC_Basic', 'NIC_ConnectX_6', 'NIC_ConnectX_5'
if type == NetworkService.network_service_map["L2Bridge"]:
if len(sites) > 1:
raise Exception(
f"Network type {type} must be empty or include interfaces from exactly one site. {len(sites)} "
f"sites requested: {sites}"
)
elif type == NetworkService.network_service_map["L2PTP"]:
if not len(sites) == 2:
raise Exception(
f"Network type {type} must include interfaces from exactly two sites. {len(sites)} sites "
f"requested: {sites}"
)
if "NIC_Basic" in nics:
raise Exception(
f"Network type {type} does not support interfaces of type 'NIC_Basic'"
)
elif type == NetworkService.network_service_map["L2STS"]:
exception_list = []
if len(sites) != 2:
exception_list.append(
f"Network type {type} must include interfaces from exactly two sites. {len(sites)} sites "
f"requested: {sites}"
)
if len(interfaces) > 2:
hosts = set([])
nodes_per_site = {}
for interface in interfaces:
node = interface.get_node()
if node.get_site() not in nodes_per_site:
nodes_per_site[node.get_site()] = 0
if isinstance(node, FacilityPort):
continue
nodes_per_site[node.get_site()] += 1
for interface in interfaces:
node = interface.get_node()
if (
interface.get_model() == "NIC_Basic"
and nodes_per_site[node.get_site()] > 1
):
if node.get_host() is None:
exception_list.append(
f"Network type {type} does not support multiple NIC_Basic interfaces on VMs "
f"residing on the same host. Please see Node.set_host(host_name) to explicitly "
f"bind a nodes to a specific host. Node {node.get_name()} is unbound."
)
elif node.get_host() in hosts:
exception_list.append(
f"Network type {type} does not support multiple NIC_Basic interfaces on VMs residing "
f"on the same host. Please see Node.set_host(host_name) to explicitly bind a nodes "
f"to a specific host. Multiple nodes bound to {node.get_host()}."
)
else:
hosts.add(node.get_host())
if len(exception_list) > 0:
raise Exception(f"{exception_list}")
else:
raise Exception(f"Unknown network type {type}")
return True
[docs]
@staticmethod
def new_portmirror_service(
slice: Slice = None,
name: str = None,
mirror_interface_name: str = None,
mirror_interface_vlan: str = None,
receive_interface: Optional[Interface] = None,
mirror_direction: str = "both",
) -> NetworkService:
"""
Instantiate a new PortMirror service.
``mirror_direction`` can be ``"rx"``, ``"tx"`` or ``"both"``
(non-case-sensitive)
"""
# decode the direction
if not isinstance(mirror_interface_name, str):
raise Exception(
f"When creating a PortMirror service mirror_interface is specified by name"
)
if not isinstance(mirror_direction, str):
raise Exception(
f'When creating a PortMirror service mirror_direction is a string "rx", "tx" or "both"'
f'defaulting to "both"'
)
if not receive_interface:
raise Exception(
f"For PortMirror service the receiving interface must be specified upfront"
)
direction = MirrorDirection.Both
# enable below when we are officially off python 3.9 and into 3.10 or higher
# match mirror_direction.lower():
# case ['rx']:
# direction = MirrorDirection.RX_Only
# case ['tx']:
# direction = MirrorDirection.TX_Only
# case ['both']:
# direction = MirrorDirection.Both
# case _:
# raise Exception(f'Unknown direction specifier "{mirror_direction}" when creating PortMirror'
# f'service {name}')
no_case_direction = mirror_direction.lower()
if no_case_direction == "rx":
direction = MirrorDirection.RX_Only
elif no_case_direction == "tx":
direction = MirrorDirection.TX_Only
elif no_case_direction == "both":
pass
else:
raise Exception(
f'Unknown direction specifier "{mirror_direction}" when creating PortMirror'
f"service {name}"
)
log.info(
f"Create PortMirror Service: Slice: {slice.get_name()}, Network Name: {name} listening on "
f"{mirror_interface_name} with direction {direction}"
)
fim_network_service = slice.topology.add_port_mirror_service(
name=name,
from_interface_name=mirror_interface_name,
from_interface_vlan=mirror_interface_vlan,
to_interface=receive_interface.fim_interface,
direction=direction,
)
network_service = NetworkService(
slice=slice, fim_network_service=fim_network_service
)
network_service.init_fablib_data()
return network_service
[docs]
@staticmethod
def new_l3network(
slice: Slice = None,
name: str = None,
interfaces: List[Interface] = [],
type: str = None,
user_data: dict = {},
technology: str = None,
subnet: Optional[ipaddress.ip_network] = None,
site: str = None,
):
"""
Not inteded for API use. See slice.add_l3network
"""
if type == "IPv6":
nstype = ServiceType.FABNetv6
elif type == "IPv4":
nstype = ServiceType.FABNetv4
elif type == "IPv4Ext":
nstype = ServiceType.FABNetv4Ext
elif type == "IPv6Ext":
nstype = ServiceType.FABNetv6Ext
elif type == "L3VPN":
nstype = ServiceType.L3VPN
else:
raise Exception(
"Invalid L3 Network Type: Allowed values [IPv4, IPv4Ext, IPv6, IPv6Ext, L3VPN]"
)
# TODO: need a fabnet version of this
# validate nstype and interface List
# NetworkService.validate_nstype(nstype, interfaces)
return NetworkService.__new_network_service(
slice=slice,
name=name,
nstype=nstype,
interfaces=interfaces,
user_data=user_data,
technology=technology,
subnet=subnet,
site=site,
)
[docs]
@staticmethod
def new_l2network(
slice: Slice = None,
name: str = None,
interfaces: List[Interface] = [],
type: str = None,
user_data: dict = {},
):
"""
Not inteded for API use. See slice.add_l2network
Creates a new L2 network service.
:param slice: the fablib slice to build this network on
:type slice: Slice
:param name: the name of the new network
:type name: str
:param interfaces: a list of interfaces to build the network service on
:type interfaces: list[Interface]
:param type: the type of network service to build (optional)
:tyep type: str
:return: the new L2 network service
:rtype: NetworkService
"""
if type is None:
nstype = NetworkService.__calculate_l2_nstype(interfaces=interfaces)
else:
if type in NetworkService.__get_fim_l2network_service_types():
nstype = NetworkService.network_service_map[type]
else:
raise Exception(
f"Invalid l2 network type: {type}. Please choose from "
f"{NetworkService.__get_fim_l2network_service_types()} or None for automatic selection"
)
# validate nstype and interface List
NetworkService.__validate_nstype(nstype, interfaces)
# Set default VLANs for P2P networks that did not pass in VLANs
if nstype == ServiceType.L2PTP and len(
interfaces
): # or nstype == ServiceType.L2STS:
vlan1 = interfaces[0].get_vlan()
vlan2 = interfaces[1].get_vlan()
if vlan1 is None and vlan2 is None:
# TODO: Long term we might have multiple vlan on one property
# and will need to make sure they are unique. For now this okay
interfaces[0].set_vlan("100")
interfaces[1].set_vlan("100")
elif vlan1 is None and vlan2 is not None:
# Match VLANs if one is set.
interfaces[0].set_vlan(vlan2)
elif vlan1 is not None and vlan2 is None:
# Match VLANs if one is set.
interfaces[1].set_vlan(vlan1)
# for interface in interfaces:
# if interface.get_model() != 'NIC_Basic' and not interface.get_vlan():
#
# interface.set_vlan("100")
network_service = NetworkService.__new_network_service(
slice=slice,
name=name,
nstype=nstype,
interfaces=interfaces,
user_data=user_data,
)
return network_service
@staticmethod
def __new_network_service(
slice: Slice = None,
name: str = None,
nstype: ServiceType = None,
interfaces: List[Interface] = [],
user_data: dict = {},
technology: str = None,
subnet: Optional[ipaddress.ip_network] = None,
site: str = None,
):
"""
Not intended for API use. See slice.add_l2network
Creates a new FABRIC network service and returns the fablib instance.
:param slice: the fabric slice to build the network service with
:type slice: Slice
:param name: the name of the new network service
:type name: str
:param nstype: the type of network service to create
:type nstype: ServiceType
:param interfaces: a list of interfaces to
:type interfaces: List
:param technology: Specify the technology used should be set to AL2S when using for AL2S peering; otherwise None
:type technology: str
:param subnet: Request a specific subnet for FabNetv4, FabNetv6 or FabNetv6Ext services.
It's ignored for any other services.
:type ipaddress.ip_network
:param site: Site for L3 networks
:type site: str
:return: the new fablib network service
:rtype: NetworkService
"""
fim_interfaces = []
for interface in interfaces:
fim_interfaces.append(interface.get_fim())
log.info(
f"Create Network Service: Slice: {slice.get_name()}, Network Name: {name}, Type: {nstype}"
)
fim_network_service = slice.topology.add_network_service(
name=name,
nstype=nstype,
interfaces=fim_interfaces,
technology=technology,
)
if (
site is not None
and nstype in NetworkService.__get_fim_l3network_service_types()
):
fim_network_service.site = site
if subnet:
if nstype == ServiceType.FABNetv4:
fim_network_service.gateway = Gateway(
lab=Labels(
ipv4_subnet=subnet.with_prefixlen,
ipv4=str(next(subnet.hosts())),
)
)
elif nstype in [ServiceType.FABNetv6, ServiceType.FABNetv6Ext]:
fim_network_service.gateway = Gateway(
lab=Labels(
ipv6_subnet=subnet.with_prefixlen,
ipv6=str(next(subnet.hosts())),
)
)
network_service = NetworkService(
slice=slice, fim_network_service=fim_network_service
)
network_service.set_user_data(user_data)
network_service.init_fablib_data()
return network_service
[docs]
@staticmethod
def get_l3network_services(slice: Slice = None) -> list:
"""
Gets all L3 networks services in this slice
:return: List of all network services in this slice
:rtype: List[NetworkService]
"""
topology = slice.get_fim_topology()
rtn_network_services = []
fim_network_service = None
log.debug(
f"NetworkService.get_fim_l3network_service_types(): {NetworkService.__get_fim_l3network_service_types()}"
)
for net_name, net in topology.network_services.items():
log.debug(f"scanning network: {net_name}, net: {net}")
if (
str(net.get_property("type"))
in NetworkService.__get_fim_l3network_service_types()
):
log.debug(f"returning network: {net_name}, net: {net}")
rtn_network_services.append(
NetworkService(slice=slice, fim_network_service=net)
)
return rtn_network_services
[docs]
@staticmethod
def get_l3network_service(slice: Slice = None, name: str = None):
"""
Gets a particular L3 network service from this slice.
:param slice: the fabric slice to build this network on
:type slice: Slice
:param name: Name network
:type name: String
:return: network services on this slice
:rtype: list[NetworkService]
"""
for net in NetworkService.get_l3network_services(slice=slice):
if net.get_name() == name:
return net
raise Exception(f"Network not found. Slice {slice.slice_name}, network {name}")
[docs]
@staticmethod
def get_l2network_services(slice: Slice = None) -> list:
"""
Not inteded for API use.
Gets a list of L2 network services on a fablib slice.
:param slice: the fablib slice from which to get the network services
:type slice: Slice
:return: a list of network services on slice
:rtype: list[NetworkService]
"""
topology = slice.get_fim_topology()
rtn_network_services = []
for net_name, net in topology.network_services.items():
if (
str(net.get_property("type"))
in NetworkService.__get_fim_l2network_service_types()
):
rtn_network_services.append(
NetworkService(slice=slice, fim_network_service=net)
)
return rtn_network_services
[docs]
@staticmethod
def get_l2network_service(slice: Slice = None, name: str = None):
"""
Not inteded for API use.
Gets a particular network service on a fablib slice.
:param slice: the fablib slice from which to get the network service
:type slice: Slice
:param name: the name of the network service to get
:type name: str
:raises Exception: if the network is not found
:return: the particular network service
:rtype: NetworkService
"""
for net in NetworkService.get_l2network_services(slice=slice):
if net.get_name() == name:
return net
raise Exception(f"Network not found. Slice {slice.slice_name}, network {name}")
[docs]
@staticmethod
def get_network_services(
slice: Slice = None, output: str = "list"
) -> dict[str, NetworkService]:
"""
Gets all network services (L2 and L3) in this slice
:return: Dict of all network services in this slice
:rtype: dict[str, NetworkService]
"""
topology = slice.get_fim_topology()
rtn_network_services = {}
for net_name, net in topology.network_services.items():
if (
str(net.get_property("type"))
in NetworkService.get_fim_network_service_types()
):
rtn_network_services[net_name] = NetworkService(
slice=slice, fim_network_service=net
)
if output == "dict":
return rtn_network_services
else:
return list(rtn_network_services.values())
[docs]
@staticmethod
def get_network_service(slice: Slice = None, name: str = None):
"""
Gest a particular network service from this slice.
:param slice: the fablib slice from which to get the network services
:type slice: Slice
:param name: the name of the network service to search for
:type name: str
:return: a particular network service
:rtype: NetworkService
"""
for net in NetworkService.get_network_services(slice=slice):
if net.get_name() == name:
return net
raise Exception(f"Network not found. Slice {slice.slice_name}, network {name}")
def __init__(
self,
slice: Slice = None,
fim_network_service: FimNetworkService = None,
name: str = None,
):
"""
.. note::
Not inteded for API use.
:param slice: the fablib slice to set as instance state
:type slice: Slice
:param fim_network_service: the FIM network service to set as
instance state
:type fim_network_service: FimNetworkService
:param name: the name of the network service
:type name: str
"""
super().__init__()
self.fim_network_service = fim_network_service
self.slice = slice
self.interfaces = None
self.sliver = None
try:
if self.slice.isStable():
self.sliver = self.slice.get_sliver(
reservation_id=self.get_reservation_id()
)
except Exception:
pass
self.lock = threading.Lock()
# Caching support
self._cached_type: Optional[str] = None
self._cached_layer: Optional[str] = None
self._cached_subnet: Optional[Union[IPv4Network, IPv6Network]] = None
self._cached_gateway: Optional[Union[IPv4Address, IPv6Address]] = None
self._interfaces_cache: Dict[str, Interface] = {}
def _invalidate_cache(self):
"""Invalidate all cached properties."""
super(NetworkService, self)._invalidate_cache()
self._cached_type = None
self._cached_layer = None
self._cached_subnet = None
self._cached_gateway = None
self._interfaces_cache = {}
self.interfaces = None
[docs]
def update(self, fim_network_service: FimNetworkService = None):
"""
Update the network service with new FIM data.
:param fim_network_service: The new FIM network service data
:type fim_network_service: FimNetworkService
"""
if fim_network_service:
self.fim_network_service = fim_network_service
self._invalidate_cache()
self._fim_dirty = False
[docs]
def __str__(self):
"""
Creates a tabulated string describing the properties of the network service.
Intended for printing network service information.
:return: Tabulated string of network service information
:rtype: String
"""
table = [
["ID", self.get_reservation_id()],
["Name", self.get_name()],
["Layer", self.get_layer()],
["Type", self.get_type()],
["Site", self.get_site()],
["Gateway", self.get_gateway()],
["Subnet", self.get_subnet()],
["State", self.get_reservation_state()],
["Error", self.get_error_message()],
]
return tabulate(table) # , headers=["Property", "Value"])
[docs]
@staticmethod
def get_pretty_name_dict():
"""
Return mappings from non-pretty names to pretty names.
Pretty names are used when rendering table headers.
"""
return {
"id": "ID",
"name": "Name",
"layer": "Layer",
"type": "Type",
"site": "Site",
"gateway": "Gateway",
"subnet": "Subnet",
"state": "State",
"error": "Error",
}
[docs]
def toDict(self, skip: List[str] = None):
"""
Returns the network attributes as a dictionary.
Results are cached. Cache is invalidated when ``_invalidate_cache()``
is called.
:param skip: list of keys to skip
:type skip: List[str]
:return: network attributes as dictionary
:rtype: dict
"""
if skip is None:
skip = []
if self._cached_dict is None:
d = {}
d["id"] = str(self.get_reservation_id())
d["name"] = str(self.get_name())
d["layer"] = str(self.get_layer())
d["type"] = str(self.get_type())
d["site"] = str(self.get_site())
d["subnet"] = str(self.get_subnet())
d["gateway"] = str(self.get_gateway())
d["state"] = str(self.get_reservation_state())
d["error"] = str(self.get_error_message())
self._cached_dict = d
if not skip:
return dict(self._cached_dict)
return {k: v for k, v in self._cached_dict.items() if k not in skip}
[docs]
def generate_template_context(self, skip: List[str] = None):
"""Build a Jinja2 template context dict for this network service."""
context = self.toDict(skip=skip)
context["interfaces"] = []
return context
def _configure_template_environment(self, environment):
environment.json_encoder = json.JSONEncoder(ensure_ascii=False)
[docs]
def get_slice(self) -> Slice:
"""
Gets the fablib slice this network service is built on.
:return: the slice this network is on
:rtype: Slice
"""
return self.slice
[docs]
def get_site(self) -> Optional[str]:
"""
Gets site name on network service.
"""
sliver = self.get_sliver()
if sliver:
if isinstance(sliver, SliverDTO):
return sliver.site
elif isinstance(sliver, NetworkServiceSliver):
return sliver.site
return None
else:
return None
[docs]
def get_layer(self) -> Optional[str]:
"""
Gets the layer of the network services (L2 or L3).
Results are cached for performance.
:return: L2 or L3
:rtype: String
"""
if self._cached_layer is None:
try:
if self.get_fim():
layer = self.get_fim().get_property(pname="layer")
self._cached_layer = layer if layer else None
else:
self._cached_layer = None
except Exception as e:
log.warning(f"Failed to get layer: {e}")
self._cached_layer = None
return self._cached_layer
[docs]
def get_type(self):
"""
Gets the type of the network services.
Results are cached for performance.
:return: network service types
:rtype: String
"""
if self._cached_type is None:
try:
if self.get_fim():
ns_type = self.get_fim().get_property("type")
self._cached_type = ns_type if ns_type else None
else:
self._cached_type = None
except Exception as e:
log.warning(f"Failed to get type: {e}")
self._cached_type = None
return self._cached_type
[docs]
def get_sliver(self) -> SliverDTO:
"""
Gets the sliver.
"""
if not self.sliver and self.slice.isStable():
self.sliver = self.slice.get_sliver(
reservation_id=self.get_reservation_id()
)
return self.sliver
[docs]
def get_gateway(self) -> Optional[Union[IPv4Address, IPv6Address]]:
"""
Gets the assigned gateway for a FABnetv L3 IPv6 or IPv4 network.
Results are cached for performance. Cache is invalidated when
``set_gateway()`` or ``set_instantiated()`` is called.
:return: gateway IP
:rtype: IPv4Address or IPv6Network
"""
if self._cached_gateway is not None:
return self._cached_gateway
try:
gateway = None
if self.is_instantiated():
if self.get_layer() == NSLayer.L3:
sliver = self.get_sliver()
if isinstance(sliver, SliverDTO):
# V2 path: SliverDTO has .gateway as a dict
if sliver.sliver.get("Type") in ("FABNetv4", "FABNetv4Ext"):
gateway = IPv4Address(sliver.gateway.get("ipv4"))
else:
gateway = IPv6Address(sliver.gateway.get("ipv6"))
elif hasattr(sliver, "fim_sliver") and sliver.fim_sliver:
# V1 path: OrchestratorSliver has .fim_sliver.gateway
gateway = ipaddress.ip_address(
sliver.fim_sliver.gateway.gateway
)
else:
log.warning("Unknown sliver type in get_gateway")
else:
# L2 Network
fablib_data = self.get_fablib_data()
try:
gateway = ipaddress.ip_address(fablib_data["subnet"]["gateway"])
except Exception as e:
gateway = None
else:
# Not yet instantiated — check if gateway was explicitly
# set via set_gateway() (common for L2 networks in modify)
fablib_data = self.get_fablib_data()
try:
gw_str = fablib_data.get("subnet", {}).get("gateway")
if gw_str:
gateway = ipaddress.ip_address(gw_str)
else:
gateway = f"{self.get_name()}.gateway"
except Exception:
gateway = f"{self.get_name()}.gateway"
if gateway is not None:
self._cached_gateway = gateway
return gateway
except Exception as e:
log.warning(f"Failed to get gateway: {e}")
[docs]
def get_available_ips(
self, count: int = 256
) -> Optional[List[IPv4Address or IPv6Address]]:
"""
Gets the IPs available for a FABnet L3 network
Note: large IPv6 address spaces take considerable time to build this
list. By default this will return the first 256 addresses. If you needed
more addresses, set the count parameter.
:param count: number of addresse to include
:type slice: Slice
:return: gateway IP
:rtype: List[IPv4Address]
"""
try:
ip_list = []
gateway = self.get_gateway()
for i in range(count):
log.debug(f"adding IP {i}")
ip_list.append(gateway + i + 1)
return ip_list
except Exception as e:
log.warning(f"Failed to get_available_ips: {e}")
[docs]
def get_public_ips(self) -> Optional[Union[List[IPv4Address] or List[IPv6Address]]]:
"""
Get list of public IPs assigned to the FabNetv*Ext service
:return: List of Public IPs
:rtype: List[IPv4Address] or List[IPv6Address] or None
"""
if self.get_fim().labels is not None:
if self.get_fim().labels.ipv4 is not None:
result = []
for x in self.get_fim().labels.ipv4:
result.append(IPv4Address(x))
return result
elif self.get_fim().labels.ipv6 is not None:
result = []
for x in self.get_fim().labels.ipv6:
result.append(IPv6Address(x))
return result
return None
[docs]
def get_subnet(self) -> Optional[Union[IPv4Network, IPv6Network]]:
"""
Gets the assigned subnet for a FABnet L3 IPv6 or IPv4 network.
Results are cached for performance. Cache is invalidated when
``set_subnet()`` or ``set_instantiated()`` is called.
:return: subnet
:rtype: IPv4Network or IPv6Network
"""
if self._cached_subnet is not None:
return self._cached_subnet
try:
subnet = None
if self.is_instantiated():
if self.get_layer() == NSLayer.L3:
sliver = self.get_sliver()
if isinstance(sliver, SliverDTO):
# V2 path: SliverDTO has .gateway as a dict
if sliver.sliver.get("Type") in ("FABNetv4", "FABNetv4Ext"):
subnet_key = "ipv4_subnet"
else:
subnet_key = "ipv6_subnet"
gateway = sliver.gateway
if gateway:
subnet_str = gateway.get(subnet_key)
if subnet_str:
subnet = ipaddress.ip_network(subnet_str)
elif hasattr(sliver, "fim_sliver") and sliver.fim_sliver:
# V1 path: OrchestratorSliver has .fim_sliver.gateway
subnet = ipaddress.ip_network(sliver.fim_sliver.gateway.subnet)
else:
log.warning("Unknown sliver type in get_subnet")
else:
# L2 Network
fablib_data = self.get_fablib_data()
if fablib_data.get("subnet") and fablib_data.get("subnet").get(
"subnet"
):
try:
subnet = ipaddress.ip_network(
fablib_data["subnet"]["subnet"]
)
except Exception as e:
log.warning(f"Failed to get L2 subnet: {e}")
else:
# Not yet instantiated — check if subnet was explicitly
# set via set_subnet() (common for L2 networks in modify)
fablib_data = self.get_fablib_data()
if fablib_data.get("subnet") and fablib_data.get("subnet").get(
"subnet"
):
try:
subnet = ipaddress.ip_network(fablib_data["subnet"]["subnet"])
except Exception:
subnet = f"{self.get_name()}.subnet"
else:
subnet = f"{self.get_name()}.subnet"
if subnet is not None:
self._cached_subnet = subnet
return subnet
except Exception as e:
log.warning(f"Failed to get subnet: {e}")
[docs]
def get_interfaces(self, refresh: bool = False) -> List[Interface]:
"""
Gets the interfaces on this network service.
Results are cached. Use refresh=True to force reload.
:param refresh: force refresh from FIM
:type refresh: bool
:return: the interfaces on this network service
:rtype: List[Interfaces]
"""
if self._interfaces_cache and not refresh and not self._fim_dirty:
return list(self._interfaces_cache.values())
self._interfaces_cache = {}
self.interfaces = []
for interface in self.get_fim().interface_list:
log.debug(f"interface: {interface}")
try:
iface = self.get_slice().get_interface(name=interface.name)
self.interfaces.append(iface)
self._interfaces_cache[interface.name] = iface
except Exception:
log.warning(f"interface not found: {interface.name}")
return self.interfaces
[docs]
def get_interface(
self,
name: str = None,
refresh: bool = False,
raise_exception: bool = None,
) -> Optional[Interface]:
"""
Gets a particular interface on this network service.
:param name: the name of the interface to search for
:type name: str
:param refresh: force refresh from FIM
:type refresh: bool
:param raise_exception: if True, raise ResourceNotFoundError
when the interface is not found; if False, return None.
When None (default), falls back to the global
``FablibManager.raise_on_not_found`` setting.
:type raise_exception: bool
:return: the particular interface or None
:rtype: Optional[Interface]
:raises ResourceNotFoundError: if the interface is not found and
raising is enabled
"""
# Ensure cache is populated
if not self._interfaces_cache or refresh or self._fim_dirty:
self.get_interfaces(refresh=refresh)
# Check cache first (exact match)
if name in self._interfaces_cache:
return self._interfaces_cache[name]
# Fall back to substring match for compatibility
for iface_name, iface in self._interfaces_cache.items():
if name in iface_name:
return iface
should_raise = (
raise_exception
if raise_exception is not None
else (
self.get_fablib_manager().raise_on_not_found
if self.get_fablib_manager()
else False
)
)
if should_raise:
raise ResourceNotFoundError(f"Interface not found: {name}")
return None
[docs]
def has_interface(self, interface: Interface) -> bool:
"""
Determines whether this network service has a particular interface.
:param interface: the fablib interface to search for
:type interface: Interface
:return: whether this network service has interface
:rtype: bool
"""
for fim_interface in self.get_fim().interface_list:
if fim_interface.name.endswith(interface.get_name()):
return True
return False
[docs]
def get_fim(self) -> FimNetworkService:
"""
Gets the FABRIC information model (FIM) object.
"""
return self.fim_network_service
def __replace_network_service(self, nstype: ServiceType):
fim_interfaces = []
name = self.get_name()
# Use cache if available — get_interfaces() may fail to resolve
# FIM connection-point names back to fablib Interface objects for
# pre-submit networks (e.g. facility ports).
if self._interfaces_cache:
current_interfaces = list(self._interfaces_cache.values())
else:
current_interfaces = self.get_interfaces()
for interface in current_interfaces:
fim_interfaces.append(interface.get_fim())
self.get_fim().disconnect_interface(interface=interface.get_fim())
user_data = self.get_user_data()
saved_cache = dict(self._interfaces_cache) if self._interfaces_cache else {}
self.get_slice().topology.remove_network_service(name=self.get_name())
self.fim_network_service = self.get_slice().topology.add_network_service(
name=name, nstype=nstype, interfaces=fim_interfaces
)
# Restore the interface cache so that subsequent add_interface()
# calls still see all previously connected interfaces
self._interfaces_cache = saved_cache
self.interfaces = current_interfaces
# Invalidate type/layer caches since the FIM service was replaced
# with a different nstype (e.g. L2Bridge → L2STS). Do NOT call
# _invalidate_cache() here as it would also clear _interfaces_cache.
self._cached_type = None
self._cached_layer = None
self.set_user_data(user_data)
[docs]
def add_interface(self, interface: Interface):
"""
Add an :py:class:`.Interface` to the network service.
"""
if self.get_type() == ServiceType.PortMirror:
raise Exception(
"Interfaces cannot be attached to PortMirror service - they can only"
"be specified at service creation"
)
iface_fablib_data = interface.get_fablib_data()
# Build new_interfaces from cache if populated (reliable for
# pre-submit), falling back to FIM-based resolution. The FIM
# resolution path (get_interfaces) can silently lose interfaces
# whose FIM connection-point names don't match fablib names
# (e.g. facility ports), so the cache is the source of truth
# when available.
if self._interfaces_cache:
new_interfaces = list(self._interfaces_cache.values())
else:
new_interfaces = self.get_interfaces()
if interface not in new_interfaces:
new_interfaces.append(interface)
curr_nstype = self.get_type()
if self.get_layer() == NSLayer.L2:
ero_enabled = True if self.get_fim().ero else False
new_nstype = NetworkService.__calculate_l2_nstype(
interfaces=new_interfaces, ero_enabled=ero_enabled
)
if curr_nstype != new_nstype:
self.__replace_network_service(new_nstype)
self.get_fim().connect_interface(interface=interface.get_fim())
# Add to cache so subsequent add_interface calls see all
# previously connected interfaces
self._interfaces_cache[interface.get_name()] = interface
elif self.get_layer() == NSLayer.L3 and self.is_instantiated():
if interface.get_site() != self.get_site():
raise Exception("L3 networks can only include nodes from one site")
if "addr" in iface_fablib_data:
addr = iface_fablib_data["addr"]
else:
addr = None
if "auto" in iface_fablib_data:
auto = iface_fablib_data["auto"]
else:
auto = False
if self.get_subnet():
if addr:
iface_fablib_data["addr"] = str(self.allocate_ip(addr))
elif auto:
iface_fablib_data["addr"] = str(self.allocate_ip())
interface.set_fablib_data(iface_fablib_data)
if self.get_layer() == NSLayer.L3:
self.get_fim().connect_interface(interface=interface.get_fim())
# Add to cache after L3 connect as well
self._interfaces_cache[interface.get_name()] = interface
[docs]
def remove_interface(self, interface: Interface):
"""
Remove an :py:class:`.Interface` from the network service.
"""
iface_fablib_data = interface.get_fablib_data()
self.free_ip(interface.get_ip_addr())
# Use cache if available for accurate interface tracking
if self._interfaces_cache:
interfaces = list(self._interfaces_cache.values())
else:
interfaces = self.get_interfaces()
if interface in interfaces:
interfaces.remove(interface)
curr_nstype = self.get_type()
if self.get_layer() == NSLayer.L2:
ero_enabled = True if self.get_fim().ero else False
new_nstype = NetworkService.__calculate_l2_nstype(
interfaces=interfaces, ero_enabled=ero_enabled
)
if curr_nstype != new_nstype:
self.__replace_network_service(new_nstype)
interface.set_fablib_data(iface_fablib_data)
self.get_fim().disconnect_interface(interface=interface.get_fim())
# Remove from cache instead of clearing entirely
iface_name = interface.get_name()
self._interfaces_cache.pop(iface_name, None)
# Also remove by iterating in case the key doesn't match exactly
self._interfaces_cache = {
k: v for k, v in self._interfaces_cache.items() if v is not interface
}
[docs]
def delete(self):
"""
Delete the network service.
"""
for ifs in self.get_interfaces():
ifs.network = None
self.get_slice().get_fim_topology().remove_network_service(name=self.get_name())
# Mark slice topology as dirty so cached network/interface lists
# get refreshed on next access
self.get_slice()._topology_dirty = True
[docs]
def set_subnet(self, subnet: Union[IPv4Network, IPv6Network]):
"""
Add subnet info to the network service.
"""
fablib_data = self.get_fablib_data()
if "subnet" not in fablib_data:
fablib_data["subnet"] = {}
fablib_data["subnet"]["subnet"] = str(subnet)
fablib_data["subnet"]["allocated_ips"] = []
self.set_fablib_data(fablib_data)
self._cached_subnet = None
[docs]
def set_gateway(self, gateway: Union[IPv4Address, IPv6Address]):
"""
Add gateway info to the network service.
"""
fablib_data = self.get_fablib_data()
if "subnet" not in fablib_data:
fablib_data["subnet"] = {}
fablib_data["subnet"]["gateway"] = str(gateway)
self.set_fablib_data(fablib_data)
self._cached_gateway = None
[docs]
def get_allocated_ips(self):
"""
Get the list of IP addesses allocated for the network service.
"""
try:
allocated_ips = []
for addr in self.get_fablib_data()["subnet"]["allocated_ips"]:
allocated_ips.append(ipaddress.ip_address(addr))
return allocated_ips
except Exception as e:
return []
[docs]
def set_allocated_ip(self, addr: Optional[Union[IPv4Address, IPv6Address]] = None):
"""
Add ``addr`` to the list of allocated IPs.
"""
fablib_data = self.get_fablib_data()
if "subnet" not in fablib_data:
fablib_data["subnet"] = {}
allocated_ips = fablib_data["subnet"]["allocated_ips"]
allocated_ips.append(str(addr))
self.set_fablib_data(fablib_data)
[docs]
def allocate_ip(self, addr: Optional[Union[IPv4Address, IPv6Address]] = None):
"""
Allocate an IP for the network service.
"""
try:
self.lock.acquire()
subnet = self.get_subnet()
allocated_ips = self.get_allocated_ips()
if addr:
# if addr != subnet.network_address and addr not in allocated_ips:
if addr not in allocated_ips:
self.set_allocated_ip(addr)
return addr
elif (
type(subnet) == ipaddress.IPv4Network
or type(subnet) == ipaddress.IPv6Network
):
for host in subnet:
if host != subnet.network_address and host not in allocated_ips:
self.set_allocated_ip(host)
return host
return None
finally:
self.lock.release()
[docs]
def set_allocated_ips(self, allocated_ips: list[Union[IPv4Address, IPv6Address]]):
"""
Set a list of IPs to be "allocated IPs".
"""
fablib_data = self.get_fablib_data()
allocated_ips_strs = []
for ip in allocated_ips:
allocated_ips_strs.append(str(ip))
if "subnet" not in fablib_data:
fablib_data["subnet"] = {}
fablib_data["subnet"]["allocated_ips"] = allocated_ips_strs
self.set_fablib_data(fablib_data)
[docs]
def free_ip(self, addr: Union[IPv4Address, IPv6Address]):
"""
Remove an IP from the list of allocated IPs.
"""
try:
self.lock.acquire()
allocated_ips = self.get_allocated_ips()
if addr in allocated_ips:
allocated_ips.remove(addr)
self.set_allocated_ips(allocated_ips)
finally:
self.lock.release()
[docs]
def make_ip_publicly_routable(self, ipv6: list[str] = None, ipv4: list[str] = None):
"""
Mark a list of IPs as publicly routable.
"""
labels = self.get_fim().labels
if labels is None:
labels = Labels()
if self.fim_network_service.type == ServiceType.FABNetv4Ext:
labels = Labels.update(labels, ipv4=ipv4)
elif self.fim_network_service.type == ServiceType.FABNetv6Ext:
labels = Labels.update(labels, ipv6=ipv6)
self.fim_network_service.set_properties(labels=labels)
self._invalidate_cache()
[docs]
def init_fablib_data(self):
"""
Initialize fablib data.
"""
fablib_data = {"instantiated": "False", "mode": "manual"}
self.set_fablib_data(fablib_data)
[docs]
def is_instantiated(self):
"""
Return ``True`` if network service has been instantiated.
"""
fablib_data = self.get_fablib_data()
if "instantiated" in fablib_data and fablib_data["instantiated"] == "True":
return True
else:
return False
[docs]
def set_instantiated(self, instantiated: bool = True):
"""
Set instantiated flag in the fablib_data saved in UserData
blob in the FIM model.
:param instantiated: flag indicating if the service has been
instantiated or not
:type instantiated: bool
"""
fablib_data = self.get_fablib_data()
fablib_data["instantiated"] = str(instantiated)
self.set_fablib_data(fablib_data)
# Invalidate subnet/gateway caches since instantiation state
# affects which code path is used to compute them
self._cached_subnet = None
self._cached_gateway = None
[docs]
def config(self):
"""
Sets up the meta data for the Network Service
- For layer3 services, Subnet, gateway and allocated IPs
are updated/maintained fablib_data saved in UserData
blob in the FIM model
- For layer2 services, no action is taken
"""
if not self.is_instantiated():
self.set_instantiated(True)
# init
if self.get_layer() == NSLayer.L3:
# init fablib data for fabnet networks
self.set_subnet(self.get_subnet())
self.set_gateway(self.get_gateway())
allocated_ips = self.get_allocated_ips()
if not allocated_ips:
allocated_ips = []
if self.get_gateway() not in allocated_ips:
allocated_ips.append(self.get_gateway())
self.set_allocated_ip(self.get_gateway())
[docs]
def peer(
self,
other: NetworkService,
labels: Labels,
peer_labels: Labels,
capacities: Capacities,
):
"""
Peer a network service; used for AL2S peering between FABRIC Networks and Cloud Networks
Peer this network service to another. A few constraints are enforced like services being
of the same type. Both services will have ServicePort interfaces facing each other over a link.
It typically requires labels and capacities to put on the interface facing the other service
:param other: network service to be peered
:type other: NetworkService
:param labels: labels
:type labels: Labels
:param peer_labels: peer labels
:type peer_labels: Labels
:param capacities: capacities
:type capacities: Capacities
"""
# Peer Cloud L3VPN with FABRIC L3VPN
self.get_fim().peer(
other.get_fim(),
labels=labels,
peer_labels=peer_labels,
capacities=capacities,
)
[docs]
def set_l2_route_hops(self, hops: List[str]):
"""
Define the sequence of sites or hops to be used for a layer 2 connection.
Maps site names to corresponding layer 2 loopback IPs used in Explicit Route Options (ERO)
in the Network Service configuration.
:param hops: A list of site names to be used as hops.
:type hops: List[str]
"""
if not hops:
return # Skip if no hops provided
interfaces = self.get_interfaces()
if len(interfaces) != 2 or self.get_type() not in {
ServiceType.L2STS,
ServiceType.L2PTP,
}:
raise Exception(
"ERO can only be set for a Point-to-Point Layer2 connection."
)
src_site, dst_site = (ifs.get_site() for ifs in interfaces)
resources = self.get_fablib_manager().get_resources()
resources.validate_requested_ero_path(source=src_site, end=dst_site, hops=hops)
path = Path()
path.set_symmetric(hops)
ero = ERO()
ero.set(payload=path)
ns_type = self.__calculate_l2_nstype(interfaces=interfaces, ero_enabled=True)
self.get_fim().set_properties(type=ns_type, ero=ero)
self._invalidate_cache()
[docs]
def set_bandwidth(self, bw: int):
"""
Set the bandwidth for an L2PTP Network Service.
This sets the bandwidth uniformly for the service and both connected interfaces.
:param bw: Bandwidth in Gbps
:type bw: int
"""
if not bw:
return self # Return early if bw is 0 or None
fim = self.get_fim()
fim.capacities = Capacities(bw=bw)
self._invalidate_cache()
for interface in self.get_interfaces():
interface.set_bandwidth(bw=bw)
return self
[docs]
def get_bandwidth(self) -> int:
"""
Get the bandwidth of the network service (L2PTP only).
:return: Bandwidth in Gbps
:rtype: int
"""
fim = self.get_fim()
return getattr(fim.capacities, "bw", 1) if fim and fim.capacities else 1