# Source code for fabrictestbed_extensions.fablib.node

#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2020 FABRIC Testbed
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Author: Paul Ruth (pruth@renci.org)

"""
Methods to work with FABRIC `nodes`_.

.. _`nodes`: https://learn.fabric-testbed.net/knowledge-base/glossary/#node

You would add a node and operate on it like so::

    from fabrictestbed_extensions.fablib.fablib import FablibManager

    fablib = FablibManager()

    slice = fablib.new_slice(name="MySlice")
    node = slice.add_node(name="node1")
    slice.submit()

    node.execute('echo Hello, FABRIC from node `hostname -s`')

    slice.delete()
"""

from __future__ import annotations

import ipaddress
import json
import logging
import os
import re
import select
import threading
import time
import traceback
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union

import paramiko
from fabric_cf.orchestrator.orchestrator_proxy import Status
from fabrictestbed.external_api.orchestrator_client import SliverDTO
from fim.user import ComponentType, NodeType
from IPython.core.display_functions import display
from paramiko_expect import SSHClientInteraction
from tabulate import tabulate

from fabrictestbed_extensions.fablib.constants import Constants
from fabrictestbed_extensions.fablib.exceptions import (
    ResourceNotFoundError,
    SliceStateError,
    SSHError,
    ValidationError,
)
from fabrictestbed_extensions.fablib.network_service import NetworkService
from fabrictestbed_extensions.utils.utils import Utils

if TYPE_CHECKING:
    from fabrictestbed_extensions.fablib.slice import Slice

from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network, ip_address

from fabrictestbed.slice_editor import Capacities, CapacityHints, Labels
from fabrictestbed.slice_editor import Node as FimNode
from fabrictestbed.slice_editor import ServiceType, UserData
from fim.slivers.network_service import NSLayer

from fabrictestbed_extensions.fablib.component import Component
from fabrictestbed_extensions.fablib.interface import Interface
from fabrictestbed_extensions.fablib.template_mixin import TemplateMixin

log = logging.getLogger("fablib")


class Node(TemplateMixin):
    """
    A class for working with FABRIC nodes.
    """

    _default_skip = ["ssh_command"]
    _show_title = "Node"
    default_cores = 2
    default_ram = 8
    default_disk = 10
    default_image = "default_rocky_9"

    def __init__(
        self,
        slice: Slice,
        node: FimNode,
        validate: bool = False,
        raise_exception: bool = False,
    ):
        """
        Node constructor, usually invoked by ``Slice.add_node()``.

        :param slice: the fablib slice to have this node on
        :type slice: Slice
        :param node: the FIM node that this Node represents
        :type node: FimNode
        :param validate: Validate node can be allocated w.r.t available resources
        :type validate: bool
        :param raise_exception: Raise exception in case validation fails
        :type raise_exception: bool
        """
        super().__init__()
        self.fim_node = node
        self.slice = slice
        self.ip_addr_list_json = None
        self.validate = validate
        self.raise_exception = raise_exception
        self.node_type = NodeType.VM
        self.components = {}
        self.interfaces = {}
        self.sliver = None
        self.username = None
        self.reservation_id = None
        self.cores = None
        self.ram = None
        self.disk = None
        self.image = None
        self.image_type = None
        self.host = None
        self.site = None
        self.management_ip = None

        # V2 specific: cached FIM properties (None means not yet cached).
        # Must be initialized before set_username(), which calls get_image().
        self._cached_site: Optional[str] = None
        self._cached_management_ip: Optional[str] = None
        self._cached_image_type: Optional[str] = None
        self._cached_image_ref: Optional[str] = None
        self._cached_requested_disk: Optional[int] = None
        self._cached_allocated_disk: Optional[int] = None
        self._cached_requested_ram: Optional[int] = None
        self._cached_allocated_ram: Optional[int] = None
        self._cached_requested_cores: Optional[int] = None
        self._cached_allocated_cores: Optional[int] = None
        self._cached_instance_name: Optional[str] = None
        self._cached_type: Optional[NodeType] = None

        # Persistent network configuration backend: 'nmcli', 'netplan', or 'ip'
        self._net_config_backend: Optional[str] = None
        self._persistent_config: bool = True

        # SSH connection cache for reuse across execute/upload/download calls
        self._ssh_bastion: Optional[paramiko.SSHClient] = None
        self._ssh_client: Optional[paramiko.SSHClient] = None
        self._ssh_lock = threading.Lock()

        try:
            self.set_username()
        except Exception as e:
            log.debug(f"Could not set username during init: {e}")
            self.username = None

        try:
            if slice.isStable():
                self.sliver = slice.get_sliver(
                    reservation_id=self.get_reservation_id()
                )
        except Exception as e:
            # Best effort: the sliver is optional at construction time (e.g.
            # slice may be None); on failure self.sliver simply stays None.
            log.debug(f"Could not fetch sliver during init: {e}")

        logging.getLogger("paramiko").setLevel(logging.WARNING)

    def _invalidate_cache(self):
        """
        Invalidate all cached properties.

        Called when the FIM node is updated. Every ``_cached_*`` attribute
        initialized in ``__init__`` is reset so the next getter call re-reads
        it from FIM.
        """
        super()._invalidate_cache()
        self._cached_site = None
        self._cached_management_ip = None
        self._cached_image_type = None
        self._cached_image_ref = None
        self._cached_requested_disk = None
        self._cached_allocated_disk = None
        self._cached_requested_ram = None
        self._cached_allocated_ram = None
        self._cached_requested_cores = None
        self._cached_allocated_cores = None
        self._cached_instance_name = None
        # Previously missed: without this, get_type() kept a stale value.
        self._cached_type = None
[docs] def __str__(self): """ Creates a tabulated string describing the properties of the node. Intended for printing node information. :return: Tabulated string of node information :rtype: String """ table = [ ["ID", self.get_reservation_id()], ["Name", self.get_name()], ["Cores", self.get_cores()], ["RAM", self.get_ram()], ["Disk", self.get_disk()], ["Image", self.get_image()], ["Image Type", self.get_image_type()], ["Host", self.get_host()], ["Site", self.get_site()], ["Management IP", self.get_management_ip()], ["Reservation State", self.get_reservation_state()], ["Error Message", self.get_error_message()], ["SSH Command", self.get_ssh_command()], ] return tabulate(table) # , headers=["Property", "Value"])
[docs] def get_sliver(self) -> SliverDTO: """ Gets the node SM sliver. :note: Not intended as API call. :return: SM sliver for the node :rtype: Sliver """ return self.sliver
[docs] @staticmethod def new_node( slice: Slice = None, name: str = None, site: str = None, avoid: List[str] = None, validate: bool = False, raise_exception: bool = False, ) -> Node: """ Creates a new FABRIC node on the slice. Not intended for API use. Use slice.add_node() instead. :param slice: the fablib slice to build the new node on :type slice: SliceV2 :param name: the name of the new node :type name: str :param site: the name of the site to build the node on :type site: str :param avoid: a list of site names to avoid :type avoid: List[str] :param validate: Validate node can be allocated w.r.t available resources :type validate: bool :param raise_exception: Raise exception if validation fails :type raise_exception: bool :return: a new Node :rtype: Node """ if avoid is None: avoid = [] if site is None: [site] = slice.get_fablib_manager().get_random_sites(avoid=avoid) log.info(f"Adding node: {name}, slice: {slice.get_name()}, site: {site}") # Create FIM node and NodeV2 instance node = Node( slice, slice.topology.add_node(name=name, site=site), validate=validate, raise_exception=raise_exception, ) node.set_capacities( cores=Node.default_cores, ram=Node.default_ram, disk=Node.default_disk ) node.set_image(Node.default_image) node.init_fablib_data() return node
[docs] @staticmethod def get_node(slice: Slice = None, node=None): """ Returns a new fablib node using existing FABRIC resources. :note: Not intended for API call. :param slice: the fablib slice storing the existing node :type slice: Slice :param node: the FIM node stored in this fablib node :type node: Node :return: a new fablib node storing resources :rtype: Node """ return Node(slice=slice, node=node)
[docs] @staticmethod def get_pretty_name_dict(): """ Return mappings from non-pretty names to pretty names. Pretty names are in table headers. """ return { "id": "ID", "name": "Name", "cores": "Cores", "ram": "RAM", "disk": "Disk", "image": "Image", "image_type": "Image Type", "host": "Host", "site": "Site", "username": "Username", "management_ip": "Management IP", "state": "State", "error": "Error", "ssh_command": "SSH Command", "public_ssh_key_file": "Public SSH Key File", "private_ssh_key_file": "Private SSH Key File", }
[docs] def toDict(self, skip: Optional[List[str]] = None): """ Returns the node attributes as a dictionary. Results are cached. Cache is invalidated when ``_invalidate_cache()`` is called. :param skip: list of keys to exclude :type skip: List[str] :return: node attributes as dictionary :rtype: dict """ if skip is None: skip = [] if self._cached_dict is None: d = {} d["id"] = str(self.get_reservation_id()) d["name"] = str(self.get_name()) d["cores"] = str(self.get_cores()) d["ram"] = str(self.get_ram()) d["disk"] = str(self.get_disk()) d["image"] = str(self.get_image()) d["image_type"] = str(self.get_image_type()) d["host"] = str(self.get_host()) d["site"] = str(self.get_site()) d["username"] = str(self.get_username()) d["management_ip"] = ( str(self.get_management_ip()).strip() if str(self.get_reservation_state()) == "Active" and self.get_management_ip() else "" ) d["state"] = str(self.get_reservation_state()) d["error"] = str(self.get_error_message()) # ssh_command is deferred to avoid recursion: get_ssh_command() # calls render_template() which calls toDict() again. d["ssh_command"] = "" d["public_ssh_key_file"] = str(self.get_public_key_file()) d["private_ssh_key_file"] = str(self.get_private_key_file()) self._cached_dict = d # Now that _cached_dict exists, render_template can use it # without recursion. if str(self.get_reservation_state()) == "Active": self._cached_dict["ssh_command"] = str(self.get_ssh_command()) if not skip: return dict(self._cached_dict) return {k: v for k, v in self._cached_dict.items() if k not in skip}
[docs] def generate_template_context(self, skip: List[str] = None): """Build a Jinja2 template context dict for this node.""" if skip is None: skip = [] if "ssh_command" not in skip: skip.append("ssh_command") context = self.toDict(skip=skip) context["components"] = [] return context
[docs] def show( self, fields=None, output=None, quiet=False, colors=False, pretty_names=True ): """ Show a table containing the current node attributes. There are several output options: ``"text"``, ``"pandas"``, and ``"json"`` that determine the format of the output that is returned and (optionally) displayed/printed. :param output: output format. Options are: - ``"text"``: string formatted with tabular - ``"pandas"``: pandas dataframe - ``"json"``: string in json format :type output: str :param fields: List of fields to show. JSON output will include all available fields. :type fields: List[str] :param quiet: True to specify printing/display :type quiet: bool :param colors: True to specify state colors for pandas output :type colors: bool :return: table in format specified by output parameter :rtype: Object Here's an example of ``fields``:: fields=['Name','State'] """ data = self.toDict() # if fields == None: # fields = ["ID", "Name", "Cores", "RAM", "Disk", # "Image", "Image Type","Host", "Site", # "Management IP", "State", # "Error","SSH Command" # ] def state_color(val): if val == "Active": color = f"{Constants.SUCCESS_LIGHT_COLOR}" elif val == "Configuring": color = f"{Constants.IN_PROGRESS_LIGHT_COLOR}" elif val == "Closed": color = f"{Constants.ERROR_LIGHT_COLOR}" else: color = "" return "background-color: %s" % color if pretty_names: pretty_names_dict = self.get_pretty_name_dict() else: pretty_names_dict = {} if colors and Utils.is_jupyter_notebook(): table = Utils.show_table( data, fields=fields, title=self._show_title, output="pandas", quiet=True, pretty_names_dict=pretty_names_dict, ) table.applymap(state_color) if quiet is False: display(table) else: table = Utils.show_table( data, fields=fields, title=self._show_title, output=output, quiet=quiet, pretty_names_dict=pretty_names_dict, ) return table
[docs] def list_components( self, fields=None, output=None, quiet=False, filter_function=None, pretty_names=True, refresh: bool = False, ): """ Lists all the components in the node with their attributes. There are several output options: ``"text"``, ``"pandas"``, and ``"json"`` that determine the format of the output that is returned and (optionally) displayed/printed. :param output: output format. Output can be one of: - ``"text"``: string formatted with tabular - ``"pandas"``: pandas dataframe - ``"json"``: string in json format :type output: str :param fields: list of fields (table columns) to show. JSON output will include all available fields/columns. :type fields: List[str] :param quiet: True to specify printing/display :type quiet: bool :param filter_function: A lambda function to filter data by field values. :type filter_function: lambda :param refresh: Refresh the components with latest Fim info :type refresh: bool :return: table in format specified by output parameter :rtype: Object Here's an example of ``fields``:: fields=['Name','Model'] Here's an example of ``filter_function``:: filter_function=lambda s: s['Model'] == 'NIC_Basic' """ components = [] for component in self.get_components(refresh=refresh): components.append(component.get_name()) def combined_filter_function(x): if filter_function is None: if x["name"] in set(components): return True else: if filter_function(x) and x["name"] in set(components): return True return False if pretty_names and len(self.get_components(refresh=refresh)) > 0: pretty_names_dict = self.get_components(refresh=refresh)[ 0 ].get_pretty_name_dict() else: pretty_names_dict = {} return self.get_slice().list_components( fields=fields, output=output, quiet=quiet, filter_function=combined_filter_function, pretty_names=pretty_names_dict, refresh=refresh, )
[docs] def list_interfaces( self, fields=None, output=None, quiet=False, filter_function=None, pretty_names=True, refresh: bool = False, ): """ Lists all the interfaces in the node with their attributes. There are several output options: ``"text"``, ``"pandas"``, and ``"json"`` that determine the format of the output that is returned and (optionally) displayed/printed. :param output: Output format. Options are: - ``"text"``: string formatted with tabular - ``"pandas"``: pandas dataframe - ``"json"``: string in json format :type output: str :param fields: List of fields (table columns) to show. JSON output will include all available fields/columns. :type fields: List[str] :param quiet: True to specify printing/display :type quiet: bool :param filter_function: A lambda function to filter data by field values. :type filter_function: lambda :param refresh: Refresh the components with latest Fim info :type refresh: bool :return: table in format specified by output parameter :rtype: Object Example of ``fields``:: fields=['Name','MAC'] Example of ``filter_function``:: filter_function=lambda s: s['Node'] == 'Node1' """ if str(self.get_reservation_state()) != "Active": log.debug( f"Node {self.get_name()} is {self.get_reservation_state()}, Skipping get interfaces." ) return ifaces = list(self.get_interfaces(refresh=refresh, output="dict").keys()) def combined_filter_function(x): ifname = x["name"] if isinstance(x["name"], dict): ifname = x["name"]["value"] if filter_function is None: if ifname in set(ifaces): return True else: if filter_function(x) and ifname in set(ifaces): return True return False return self.get_slice().list_interfaces( fields=fields, output=output, quiet=quiet, filter_function=combined_filter_function, pretty_names=pretty_names, refresh=refresh, )
[docs] def list_networks( self, fields=None, output=None, quiet=False, filter_function=None, pretty_names=True, refresh: bool = False, ): """ Lists all the networks attached to the nodes with their attributes. There are several output options: ``"text"``, ``"pandas"``, and ``"json"`` that determine the format of the output that is returned and (optionally) displayed/printed. :param output: Output format. Options are: - ``"text"``: string formatted with tabular - ``"pandas"``: pandas dataframe - ``"json"``: string in JSON format :type output: str :param fields: List of fields (table columns) to show. JSON output will include all available fields/columns. :type fields: List[str] :param quiet: True to specify printing/display :type quiet: bool :param filter_function: A lambda function to filter data by field values. :type filter_function: lambda :param pretty_names: Use "nicer" names in column headers. Default is ``True``. :type pretty_names: bool :param refresh: Refresh the object with latest Fim info :type refresh: bool :return: table in format specified by output parameter :rtype: Object Example of ``fields``:: fields=['Name','Type'] Example of ``filter_function``:: filter_function=lambda s: s['Type'] == 'FABNetv4' """ interfaces = self.get_interfaces(refresh=refresh) networks = self.get_networks(refresh=refresh) networks = [] for iface in interfaces: networks.append(iface.get_network().get_name()) def combined_filter_function(x): if filter_function is None: if x["name"]["value"] in set(networks): return True else: if filter_function(x) and x["name"]["value"] in set(networks): return True return False return self.get_slice().list_networks( fields=fields, output=output, quiet=quiet, filter_function=combined_filter_function, pretty_names=pretty_names, )
[docs] def get_networks(self, refresh: bool = False): """ Get a list of networks attached to the node. :param refresh: Refresh the object with latest Fim info :type refresh: bool """ networks = [] for interface in self.get_interfaces(refresh=refresh): networks.append(interface.get_network()) return networks
[docs] def set_capacities(self, cores: int = 2, ram: int = 2, disk: int = 10): """ Sets the capacities of the FABRIC node. :param cores: the number of cores to set on this node :type cores: int :param ram: the amount of RAM to set on this node :type ram: int :param disk: the amount of disk space to set on this node :type disk: int """ cores = int(cores) ram = int(ram) disk = int(disk) cap = Capacities(core=cores, ram=ram, disk=disk) self.get_fim().set_properties(capacities=cap) self._invalidate_cache()
[docs] def set_instance_type(self, instance_type: str): """ Sets the instance type of this fablib node on the FABRIC node. :param instance_type: the name of the instance type to set :type instance_type: String """ self.get_fim().set_properties( capacity_hints=CapacityHints(instance_type=instance_type) ) self._invalidate_cache()
[docs] def set_username(self, username: str = None): """ Sets this fablib node's username :note: Not intended as an API call. :param username: Optional username parameter. The username likely should be picked to match the image type. """ try: if self.get_type() == NodeType.Switch and not username: self.username = Constants.FABRIC_USER return username = ( self.get_fablib_manager() .get_os_images() .get(self.get_image(), {}) .get("default_user") ) if username is not None: self.username = username elif self.get_image() is None: self.username = None elif "default_centos10_stream" == self.get_image(): self.username = "cloud-user" elif "default_centos9_stream" == self.get_image(): self.username = "cloud-user" elif "centos" in self.get_image(): self.username = "centos" elif "ubuntu" in self.get_image(): self.username = "ubuntu" elif "rocky" in self.get_image(): self.username = "rocky" elif "fedora" in self.get_image(): self.username = "fedora" elif "cirros" in self.get_image(): self.username = "cirros" elif "debian" in self.get_image(): self.username = "debian" elif "freebsd" in self.get_image(): self.username = "freebsd" elif "openbsd" in self.get_image(): self.username = "openbsd" elif "kali" in self.get_image(): self.username = "kali" else: self.username = None except Exception as e: log.error("Failed to set username parameter: %s", e) self.username = None
[docs] def set_image(self, image: str, username: str = None, image_type: str = "qcow2"): """ Sets the image information of this fablib node on the FABRIC node. :param image: the image reference to set :type image: String :param username: the username of this fablib node. Currently unused. :type username: String :param image_type: the image type to set :type image_type: String """ self.get_fim().set_properties(image_type=image_type, image_ref=image) self._invalidate_cache() self.set_username(username=username)
[docs] def set_host(self, host_name: str = None): """ Sets the hostname of this fablib node on the FABRIC node. :param host_name: the hostname. example: host_name='renc-w2.fabric-testbed.net' :type host_name: String """ # example: host_name='renc-w2.fabric-testbed.net' labels = Labels() labels.instance_parent = host_name self.get_fim().set_properties(labels=labels) self._invalidate_cache() # set an attribute used to get host before Submit self.host = host_name
[docs] def set_site(self, site): """ Sets the site of this fablib node on FABRIC. :param site: the site :type host_name: String """ # example: host_name='renc-w2.fabric-testbed.net' self.get_fim().site = site self._invalidate_cache()
[docs] def get_slice(self) -> Slice: """ Gets the fablib slice associated with this node. :return: the fablib slice on this node :rtype: Slice """ return self.slice
[docs] def get_instance_name(self) -> Optional[str]: """ Gets the instance name of the FABRIC node. :return: the instance name of the node :rtype: String """ if self._cached_instance_name is None: try: if self.get_fim(): label_allocations = self.get_fim().get_property( pname="label_allocations" ) if label_allocations: self._cached_instance_name = label_allocations.instance except Exception: self._cached_instance_name = None return self._cached_instance_name
[docs] def get_cores(self) -> Optional[int]: """ Gets the number of cores on the FABRIC node. :return: the number of cores on the node :rtype: int """ if self._cached_allocated_cores is None: try: if self.get_fim(): capacities = self.get_fim().get_property( pname="capacity_allocations" ) if capacities: self._cached_allocated_cores = capacities.core except Exception: self._cached_allocated_cores = None return self._cached_allocated_cores if self._cached_allocated_cores else 0
[docs] def get_requested_cores(self) -> Optional[int]: """ Gets the requested number of cores on the FABRIC node. :return: the requested number of cores on the node :rtype: int """ if self._cached_requested_cores is None: try: if self.get_fim(): capacities = self.get_fim().get_property(pname="capacities") if capacities: self._cached_requested_cores = capacities.core except Exception: self._cached_requested_cores = None return self._cached_requested_cores if self._cached_requested_cores else 0
[docs] def get_ram(self) -> Optional[int]: """ Gets the amount of RAM on the FABRIC node. :return: the amount of RAM on the node :rtype: int """ if self._cached_allocated_ram is None: try: if self.get_fim(): capacities = self.get_fim().get_property( pname="capacity_allocations" ) if capacities: self._cached_allocated_ram = capacities.ram except Exception: self._cached_allocated_ram = None return self._cached_allocated_ram if self._cached_allocated_ram else 0
[docs] def get_requested_ram(self) -> Optional[int]: """ Gets the requested amount of RAM on the FABRIC node. :return: the requested amount of RAM on the node :rtype: int """ if self._cached_requested_ram is None: try: if self.get_fim(): capacities = self.get_fim().get_property(pname="capacities") if capacities: self._cached_requested_ram = capacities.ram except Exception: self._cached_requested_ram = None return self._cached_requested_ram if self._cached_requested_ram else 0
[docs] def get_disk(self) -> Optional[int]: """ Gets the amount of disk space on the FABRIC node. :return: the amount of disk space on the node :rtype: int """ if self._cached_allocated_disk is None: try: if self.get_fim(): capacities = self.get_fim().get_property( pname="capacity_allocations" ) if capacities: self._cached_allocated_disk = capacities.disk except Exception: self._cached_allocated_disk = None return self._cached_allocated_disk if self._cached_allocated_disk else 0
[docs] def get_requested_disk(self) -> Optional[int]: """ Gets the amount of disk space on the FABRIC node. :return: the amount of disk space on the node :rtype: int """ if self._cached_requested_disk is None: try: if self.get_fim(): capacities = self.get_fim().get_property(pname="capacities") if capacities: self._cached_requested_disk = capacities.disk except Exception: self._cached_requested_disk = None return self._cached_requested_disk if self._cached_requested_disk else 0
[docs] def get_image(self) -> Optional[str]: """ Gets the image reference on the FABRIC node. :return: the image reference on the node :rtype: String """ if self._cached_image_ref is None: try: if self.get_fim(): self._cached_image_ref = self.get_fim().image_ref else: self._cached_image_ref = None except Exception: self._cached_image_ref = None return self._cached_image_ref
[docs] def get_image_type(self) -> Optional[str]: """ Gets the image type on the FABRIC node. :return: the image type on the node :rtype: String """ if self._cached_image_type is None: try: if self.get_fim(): self._cached_image_type = self.get_fim().image_type else: self._cached_image_type = None except Exception: self._cached_image_type = None return self._cached_image_type
[docs] def get_host(self) -> Optional[str]: """ Gets the hostname on the FABRIC node. :return: the hostname on the node :rtype: String """ if not self.host: try: label_allocations = self.get_fim().get_property( pname="label_allocations" ) if label_allocations: self.host = label_allocations.instance_parent else: labels = self.get_fim().get_property(pname="labels") if labels: self.host = labels.instance_parent except Exception: return None return self.host
[docs] def get_site(self) -> str: """ Gets the site this node is on. Results are cached for performance. :return: the site this node is on :rtype: String """ if self._cached_site is None: try: self._cached_site = self.fim_node.site except Exception: self._cached_site = None return self._cached_site if self._cached_site is not None else ""
[docs] def get_type(self) -> Optional[NodeType]: """ Gets the type of the network services. Results are cached for performance. :return: network service types :rtype: NodeType """ if self._cached_type is None: try: if self.get_fim(): node_type = self.get_fim().get_property("type") self._cached_type = node_type if node_type else None else: self._cached_type = None except Exception as e: log.warning(f"Failed to get type: {e}") self._cached_type = None return self._cached_type
[docs] def get_management_ip(self) -> str: """ Gets the management IP of the node (IPv6). First attempts to read from the FIM topology. If the topology does not have the IP populated, falls back to the sliver info returned by the orchestrator. Results are cached for performance. :return: management IP :rtype: String """ if self._cached_management_ip is None: # Try FIM topology first try: ip = self.fim_node.management_ip if ip is not None: self._cached_management_ip = str(ip) except Exception: pass # Fall back to sliver info from orchestrator if self._cached_management_ip is None: try: if self.sliver is not None and self.sliver.mgmt_ip: self._cached_management_ip = str(self.sliver.mgmt_ip) except Exception: pass return self._cached_management_ip
[docs] def get_username(self) -> str: """ Gets the username on this fablib node. :return: the username on this node :rtype: String """ return self.username
[docs] def get_public_key(self) -> str: """ Gets the public key on fablib node. Important! Slice key management is underdevelopment and this functionality will likely change going forward. :return: the public key on the node :rtype: String """ return self.get_slice().get_slice_public_key()
[docs] def get_public_key_file(self) -> str: """ Gets the public key file path on the fablib node. Important! Slice key management is underdevelopment and this functionality will likely change going forward. :return: the public key path :rtype: String """ return self.get_slice().get_slice_public_key_file()
[docs] def get_private_key(self) -> str: """ Gets the private key on the fablib node. Important! Slice key management is underdevelopment and this functionality will likely change going forward. :return: the private key on the node :rtype: String """ return self.get_slice().get_slice_private_key()
[docs] def get_private_key_file(self) -> str: """ Gets the private key file path on the fablib slice. Important! Slice key management is underdevelopment and this functionality will likely change going forward. :return: the private key path :rtype: String """ return self.get_slice().get_slice_private_key_file()
[docs] def get_private_key_passphrase(self) -> str: """ Gets the private key passphrase on the FABLIB slice. Important! Slice key management is underdevelopment and this functionality will likely change going forward. :return: the private key passphrase :rtype: String """ return self.get_slice().get_private_key_passphrase()
[docs] def add_component( self, model: str = None, name: str = None, user_data: dict = {} ) -> Component: """ Creates a new FABRIC component using this fablib node. Example models include: - NIC_Basic: A single port 100 Gbps SR-IOV Virtual Function on a Mellanox ConnectX-6 - NIC_ConnectX_5: A dual port 25 Gbps Mellanox ConnectX-5 - NIC_ConnectX_6: A dual port 100 Gbps Mellanox ConnectX-6 - NVME_P4510: NVMe Storage Device - GPU_TeslaT4: Tesla T4 GPU - GPU_RTX6000: RTX6000 GPU - GPU_A30: A30 GPU - GPU_A40: A40 GPU - FPGA_Xilinx_U280: Xilinx U280 FPGA card .. note:: Some component types (e.g. ``NVME_P4510``, GPUs, FPGAs) require specific project-level permissions. If your project lacks the necessary permissions, the slice submission will fail with an error from the orchestrator. :param model: the name of the component model to add :type model: String :param name: the name of the new component :type name: String :return: the new component :rtype: Component """ component = Component.new_component( node=self, model=model, name=name, user_data=user_data ) if self.validate: status, error = self.get_fablib_manager().validate_node(node=self) if not status: component.delete() component = None log.warning(error) if self.raise_exception: raise ValueError(error) if component: self.components[component.get_name()] = component self.interfaces.update(component.get_interfaces(output="dict")) return component
[docs] def get_components(self, refresh: bool = False) -> List[Component]: """ Gets a list of components associated with this node. Results are cached. Use refresh=True to force reload from FIM. :param refresh: Refresh the component objects with latest FIM info :type refresh: bool :return: a list of components on this node :rtype: List[Component] """ # Skip refresh if cache is valid if self.components and not refresh and not self._fim_dirty: return list(self.components.values()) # Get components from FIM (single access) fim_components = self.fim_node.components if refresh or not self.components: self.components.clear() # Update or create component objects for component_name, fim_component in fim_components.items(): if component_name not in self.components: self.components[component_name] = Component(self, fim_component) elif refresh: # Update existing component's FIM reference self.components[component_name].fim_component = fim_component # Remove components no longer in FIM current_names = set(fim_components.keys()) to_remove = [name for name in self.components if name not in current_names] for name in to_remove: del self.components[name] return list(self.components.values())
def get_component(self, name: str, refresh: bool = False) -> Component:
    """
    Retrieve a component associated with this node.

    Results are cached. Use refresh=True to force reload from FIM.

    The lookup tries the node-qualified name produced by
    ``Component.calculate_name`` first, then the plain ``name``.

    :param name: Name of the component to retrieve
    :type name: str
    :param refresh: Whether to refresh the component from the latest FIM data
    :type refresh: bool
    :return: The requested component
    :rtype: Component
    :raises ResourceNotFoundError: If the component is not found, or if
        looking it up fails unexpectedly (the original error is chained
        as ``__cause__``)
    """
    try:
        calculated_name = Component.calculate_name(node=self, name=name)

        # Serve from cache unless a refresh is forced or FIM changed.
        if not refresh and not self._fim_dirty:
            for key in (calculated_name, name):
                if key in self.components:
                    return self.components[key]

        fim_components = self.fim_node.components
        fim_comp = fim_components.get(calculated_name)
        if fim_comp is None:
            fim_comp = fim_components.get(name)
        if fim_comp is None:
            # Expected "not found" outcome: no error-level log or traceback.
            raise ResourceNotFoundError(f"Component not found: {name}")

        key = calculated_name if fim_comp.name == calculated_name else name
        component = Component(self, fim_comp)
        self.components[key] = component
        return component
    except ResourceNotFoundError:
        raise
    except Exception as e:
        log.error(f"Error retrieving component '{name}': {e}", exc_info=True)
        raise ResourceNotFoundError(f"Component not found: {name}") from e
def get_interfaces(
    self, include_subs: bool = True, refresh: bool = False, output: str = "list"
) -> Union[Dict[str, Interface], List[Interface]]:
    """
    Gets a list of the interfaces associated with the FABRIC node.

    Results are cached in ``self.interfaces``. The cache always holds the
    full view (sub-interfaces included). Requests with
    ``include_subs=False`` are computed from the components without
    reading or overwriting that cache, so they never see, or leave behind,
    a mismatched view.

    :param include_subs: Flag indicating if sub interfaces should be included
    :type include_subs: bool
    :param refresh: Refresh the interface objects with latest FIM info
    :type refresh: bool
    :param output: Return type - 'list' or 'dict'
    :type output: str
    :return: interfaces on the node
    :rtype: Union[Dict[str, Interface], List[Interface]]
    """
    cache_valid = bool(self.interfaces) and not refresh and not self._fim_dirty

    if include_subs and cache_valid:
        result = self.interfaces
    else:
        collected = {}
        for component in self.get_components(refresh=refresh):
            collected.update(
                component.get_interfaces(
                    include_subs=include_subs, refresh=refresh, output="dict"
                )
            )
        if include_subs:
            # Replace the cache only once the rebuild has fully succeeded.
            self.interfaces.clear()
            self.interfaces.update(collected)
            result = self.interfaces
        else:
            result = collected

    if output == "dict":
        return result
    return list(result.values())
def get_interface(
    self,
    name: str = None,
    network_name: str = None,
    refresh: bool = False,
    raise_exception: bool = None,
) -> Optional[Interface]:
    """
    Gets a particular interface associated with a FABRIC node.

    Accepts either the interface name or a network_name. If a network name
    is used, returns the interface connected to that network. If both name
    and network_name are provided, name takes precedence (the network is
    not searched even when the name is not found).

    :param name: interface name to search for
    :type name: str
    :param network_name: network name to search for
    :type network_name: str
    :param refresh: Refresh interface objects with latest FIM info
    :type refresh: bool
    :param raise_exception: if True, raise ResourceNotFoundError when the
        interface is not found; if False, return None. When None (default),
        falls back to the global ``FablibManager.raise_on_not_found`` setting.
    :type raise_exception: bool
    :return: an interface on the node or None
    :rtype: Optional[Interface]
    :raises ResourceNotFoundError: if the interface is not found and raising
        is enabled
    """
    interfaces = self.get_interfaces(refresh=refresh, output="dict")

    if name is not None:
        interface = interfaces.get(name)
        if interface is not None:
            return interface
    elif network_name is not None:
        for interface in interfaces.values():
            if interface is None:
                continue
            # Resolve the network once per interface rather than twice.
            network = interface.get_network()
            if network is not None and network.get_name() == network_name:
                return interface

    if raise_exception is not None:
        should_raise = raise_exception
    else:
        fablib_manager = self.get_fablib_manager()
        should_raise = fablib_manager.raise_on_not_found if fablib_manager else False

    if should_raise:
        raise ResourceNotFoundError(f"Interface not found: {name or network_name}")
    return None
def get_ssh_command(self) -> str:
    """
    Gets an SSH command used to access this node from a terminal.

    The fablib SSH command-line template is rendered with this node's
    attributes. If the configured bastion SSH config file does not exist,
    a warning is appended to the command. If rendering fails, the
    unrendered template is returned and the failure is logged.

    :return: the SSH command to access this node
    :rtype: str
    """
    fablib_manager = self.get_fablib_manager()
    try:
        ssh_cmd = self.render_template(
            fablib_manager.get_ssh_command_line(),
            skip=[
                "ssh_command",
                "interfaces",
                "state",
                "error",
                "components",
                "networks",
            ],
        )
        bastion_config = fablib_manager.get_bastion_ssh_config_file()
        if bastion_config and not os.path.exists(bastion_config):
            ssh_cmd += (
                f" (WARNING: SSH config file '{bastion_config}' "
                f"not found. Run fablib.create_ssh_config() first.)"
            )
        return ssh_cmd
    except Exception as e:
        # Best-effort fallback, but don't hide why the command is unrendered.
        log.warning(f"get_ssh_command: failed to render SSH command template: {e}")
        return fablib_manager.get_ssh_command_line()
def validIPAddress(self, IP: str) -> str:
    """
    Classify a string as an IPv4 address, an IPv6 address, or neither.

    :param IP: the candidate address
    :type IP: String
    :return: ``"IPv4"``, ``"IPv6"``, or ``"Invalid"`` when it cannot be parsed
    :rtype: String
    """
    try:
        parsed = ip_address(IP)
    except ValueError:
        return "Invalid"
    return "IPv4" if parsed.version == 4 else "IPv6"
def get_paramiko_key(
    self, private_key_file: str = None, get_private_key_passphrase: str = None
) -> paramiko.PKey:
    """
    Load the node's SSH private key as a paramiko key object (internal use).

    RSA is tried first, then ECDSA.

    :param private_key_file: path to the private key; defaults to
        ``self.get_private_key_file()`` when not given
    :type private_key_file: str
    :param get_private_key_passphrase: passphrase for the key; when falsy the
        key is loaded without a passphrase
    :type get_private_key_passphrase: str
    :return: the loaded private key
    :rtype: paramiko.PKey
    :raises ValidationError: if the key cannot be loaded as RSA or ECDSA (the
        last loader error is chained as ``__cause__``)
    """
    # Honor caller overrides; they were previously ignored in favour of
    # the node's configured key file and passphrase.
    key_file = private_key_file or self.get_private_key_file()
    password = get_private_key_passphrase or None

    # TODO: detect the key type instead of relying on loader exceptions.
    last_error = None
    for key_class in (paramiko.RSAKey, paramiko.ecdsakey.ECDSAKey):
        try:
            return key_class.from_private_key_file(key_file, password=password)
        except Exception as e:
            log.debug(f"Could not load {key_file} as {key_class.__name__}: {e}")
            last_error = e

    raise ValidationError(
        "ssh key invalid: FABRIC requires RSA or ECDSA keys"
    ) from last_error
def _get_ssh_connection(
    self,
    username: str = None,
    private_key_file: str = None,
    private_key_passphrase: str = None,
) -> tuple:
    """Get or create a cached SSH connection to this node via bastion.

    Returns a ``(bastion, client)`` tuple. Access is serialized by
    ``self._ssh_lock``. The cached pair is reused only while both
    transports are active AND it was opened with the same resolved
    (username, key file, passphrase) as this call. Otherwise the cached
    pair is closed and a fresh one is opened and cached.

    :param username: SSH username override
    :type username: str
    :param private_key_file: path to private key file override
    :type private_key_file: str
    :param private_key_passphrase: passphrase override
    :type private_key_passphrase: str
    :return: (bastion, client) tuple of paramiko.SSHClient
    :rtype: tuple
    :raises ValidationError: if the management IP or the key is invalid
    """
    with self._ssh_lock:
        node_username = username or self.username
        node_key_file = private_key_file or self.get_private_key_file()
        node_key_passphrase = (
            private_key_passphrase or self.get_private_key_passphrase()
        )
        credentials = (node_username, node_key_file, node_key_passphrase)

        # Reuse only a live connection opened with the same credentials;
        # otherwise an override (e.g. another username) would be ignored.
        if (
            self._ssh_client
            and self._ssh_client.get_transport()
            and self._ssh_client.get_transport().is_active()
            and self._ssh_bastion
            and self._ssh_bastion.get_transport()
            and self._ssh_bastion.get_transport().is_active()
            and getattr(self, "_ssh_credentials", None) == credentials
        ):
            return self._ssh_bastion, self._ssh_client

        # Close any stale (or differently-authenticated) connections
        self._close_ssh_connections()

        fablib_manager = self.get_fablib_manager()
        management_ip = str(self.get_management_ip())

        ip_type = self.validIPAddress(management_ip)
        if ip_type == "IPv4":
            src_addr = ("0.0.0.0", 22)
        elif ip_type == "IPv6":
            src_addr = ("::", 22)
        else:
            raise ValidationError(f"Invalid management IP: {management_ip}")
        dest_addr = (management_ip, 22)

        key = self.get_paramiko_key(
            private_key_file=node_key_file,
            get_private_key_passphrase=node_key_passphrase,
        )

        bastion = paramiko.SSHClient()
        client = None
        try:
            # Connect to bastion
            bastion.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            bastion.connect(
                fablib_manager.get_bastion_host(),
                username=fablib_manager.get_bastion_username(),
                key_filename=fablib_manager.get_bastion_key_location(),
                passphrase=fablib_manager.get_bastion_key_passphrase(),
            )
            bastion_channel = bastion.get_transport().open_channel(
                "direct-tcpip", dest_addr, src_addr
            )

            # Connect to node via bastion tunnel
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(
                management_ip,
                username=node_username,
                pkey=key,
                sock=bastion_channel,
            )
        except Exception:
            # Nothing is cached on failure, so close what was opened here
            # instead of leaking the bastion session / tunnel.
            for conn in (client, bastion):
                if conn is not None:
                    try:
                        conn.close()
                    except Exception as close_error:
                        log.debug(f"Exception closing SSH connection: {close_error}")
            raise

        # Cache the connections together with the credentials they use
        self._ssh_bastion = bastion
        self._ssh_client = client
        self._ssh_credentials = credentials
        return bastion, client


def _close_ssh_connections(self):
    """Close cached SSH connections without acquiring the lock.

    Callers must already hold ``self._ssh_lock``. Close errors are logged
    at debug level and ignored.
    """
    for conn in (self._ssh_client, self._ssh_bastion):
        if conn:
            try:
                conn.close()
            except Exception as e:
                log.debug(f"Exception closing SSH connection: {e}")
    self._ssh_client = None
    self._ssh_bastion = None
    self._ssh_credentials = None
def close_ssh(self):
    """Release this node's cached bastion and node SSH connections.

    The node lock is held while the cached connections are closed and
    cleared, so calling this repeatedly is harmless. The next
    execute/upload/download call reopens connections on demand.
    """
    lock = self._ssh_lock
    with lock:
        self._close_ssh_connections()
def execute_thread(
    self,
    command: str,
    retry: int = 3,
    retry_interval: int = 10,
    username: str = None,
    private_key_file: str = None,
    private_key_passphrase: str = None,
    output_file: str = None,
) -> threading.Thread:
    """
    Runs ``node.execute()`` asynchronously on the fablib SSH thread pool.

    The command is submitted to the executor returned by
    ``get_ssh_thread_pool_executor()`` with ``quiet=True``. Results
    (``stdout``, ``stderr``) are retrieved by calling ``.result()`` on the
    returned object. For a standard ``concurrent.futures`` executor this is
    a ``Future``, not a ``threading.Thread``.

    :param command: the command to run
    :type command: str
    :param retry: the number of times to retry SSH upon failure
    :type retry: int
    :param retry_interval: the number of seconds to wait before retrying
        SSH upon failure
    :type retry_interval: int
    :param username: username
    :type username: str
    :param private_key_file: path to private key file
    :type private_key_file: str
    :param private_key_passphrase: pass phrase
    :type private_key_passphrase: str
    :param output_file: path to a file where the stdout/stderr will be
        appended. None for no file output
    :type output_file: str
    :return: the executor's handle for the submitted ``node.execute()`` call
    :raise Exception: any error from ``node.execute()`` is re-raised by
        ``.result()``
    """
    return (
        self.get_fablib_manager()
        .get_ssh_thread_pool_executor()
        .submit(
            self.execute,
            command,
            retry=retry,
            retry_interval=retry_interval,
            username=username,
            private_key_file=private_key_file,
            private_key_passphrase=private_key_passphrase,
            output_file=output_file,
            quiet=True,
        )
    )
def execute(
    self,
    command: (
        str | list[str] | list[tuple[str, str, int]]
    ),  # Supports single command, list of commands, or interactive tuples
    retry: int = 3,
    retry_interval: int = 10,
    username: str = None,
    private_key_file: str = None,
    private_key_passphrase: str = None,
    quiet: bool = False,
    read_timeout: int = 10,
    timeout=None,
    output_file: str = None,
    display: bool = True,  # Show interactive execution output
):
    """
    Runs one or more commands on the FABRIC node using SSH, supporting both
    standard and interactive execution.

    :param command: Commands to execute. Can be:
        - A string (single command)
        - A list of strings (joined with `` && `` and run as one command)
        - A list of (command, prompt, timeout) tuples (interactive commands
          with expected prompts)
    :type command: str | list[str] | list[tuple[str, str, int]]
    :param retry: Number of attempts (at least one is always made).
    :type retry: int
    :param retry_interval: Time interval (seconds) between retries.
    :type retry_interval: int
    :param username: SSH username.
    :type username: str
    :param private_key_file: Path to the private key file.
    :type private_key_file: str
    :param private_key_passphrase: Passphrase for private key.
    :type private_key_passphrase: str
    :param quiet: Suppress printing of output if True.
    :type quiet: bool
    :param read_timeout: Seconds to wait in each select() for output.
    :type read_timeout: int
    :param timeout: Command timeout in seconds (non-interactive only; the
        command is wrapped in ``sudo timeout``).
    :type timeout: int
    :param output_file: File path that output is appended to.
    :type output_file: str
    :param display: Show interactive execution output if True.
    :type display: bool
    :return: A tuple (stdout, stderr). Bytes that are not valid UTF-8 are
        replaced with U+FFFD.
    :rtype: Tuple[str, str]
    :raises SSHError: If the SSH connection/authentication fails or all
        attempts fail.
    :raises SliceStateError: If the node has no management IP or is not
        Active.
    :raises ValidationError: If the management IP is malformed.
    :raises ValueError: If ``command`` is a list mixing strings and tuples.
    :raises RuntimeError: If no_ssh mode is enabled.
    """
    if self.get_fablib_manager().get_no_ssh():
        raise RuntimeError(
            "SSH operations are disabled (no_ssh=True). "
            "This fablib instance is configured for API-only operations."
        )

    log.debug(
        f"Executing on node: {self.get_name()}, IP: {self.get_management_ip()}, Command: {command}"
    )

    management_ip = self.get_management_ip()
    if not management_ip:
        raise SliceStateError(f"Node {self.get_name()} has no valid management IP.")

    if self.get_reservation_state() != "Active":
        raise SliceStateError(
            f"Node {self.get_name()} is in state {self.get_reservation_state()}, cannot execute command."
        )

    fablib_manager = self.get_fablib_manager()
    log_debug = fablib_manager.get_log_level() == logging.DEBUG
    start_time = time.time()

    node_username = username or self.username
    node_key_file = private_key_file or self.get_private_key_file()
    node_key_passphrase = (
        private_key_passphrase or self.get_private_key_passphrase()
    )

    # Fail fast on a malformed IP instead of burning every retry on it.
    if self.validIPAddress(management_ip) == "Invalid":
        raise ValidationError(f"Invalid management IP: {management_ip}")

    interactive_mode, command = self._normalize_execute_command(command)

    # Wrap once, outside the retry loop; wrapping inside it nested another
    # "sudo timeout ..." prefix onto the command on every retry.
    if timeout and not interactive_mode:
        command = f"sudo timeout --foreground -k 10 {timeout} {command}\n"

    max_attempts = max(1, retry)
    for attempt in range(1, max_attempts + 1):
        try:
            _bastion, client = self._get_ssh_connection(
                username=node_username,
                private_key_file=node_key_file,
                private_key_passphrase=node_key_passphrase,
            )

            if interactive_mode:
                return self._interactive_execute(
                    client=client,
                    commands=command,
                    quiet=quiet,
                    display=display,
                    output_file=output_file,
                )

            stdin, stdout, stderr = client.exec_command(command)
            stdin.close()
            try:
                rtn_stdout, rtn_stderr = self._read_exec_output(
                    stdout,
                    stderr,
                    quiet=quiet,
                    output_file=output_file,
                    read_timeout=read_timeout,
                )
            finally:
                stdout.close()
                stderr.close()

            if log_debug:
                elapsed_time = time.time() - start_time
                log.debug(
                    f"Command executed in {elapsed_time:.2f}s, stdout: {rtn_stdout}, stderr: {rtn_stderr}"
                )

            return rtn_stdout, rtn_stderr
        except paramiko.ssh_exception.PasswordRequiredException as e:
            self.close_ssh()
            raise SSHError(
                f"SSH key is encrypted and no passphrase was provided. "
                f"Set the private key passphrase in your fablib configuration "
                f"or use an unencrypted key. Original error: {e}"
            ) from e
        except paramiko.AuthenticationException as e:
            self.close_ssh()
            raise SSHError(
                f"SSH authentication failed for node {self.get_name()} "
                f"({management_ip}): {e}"
            ) from e
        except Exception as e:
            log.warning(
                f"SSH attempt {attempt}/{max_attempts} failed for node "
                f"{self.get_name()} ({management_ip}): {e}"
            )
            self.close_ssh()
            if attempt >= max_attempts:
                raise SSHError(
                    f"SSH connection to node {self.get_name()} ({management_ip}) "
                    f"failed after {max_attempts} attempts. Last error: {e}"
                ) from e
            time.sleep(retry_interval)

    raise SSHError("ssh failed: Should not get here")


@staticmethod
def _normalize_execute_command(command):
    """Classify an ``execute()`` command and flatten string lists.

    :return: ``(interactive, command)``. Lists of tuples are returned
        unchanged as interactive; lists of strings are joined with `` && ``;
        any non-list value is passed through as a standard command.
    :raises ValueError: if a list mixes strings and tuples (or other types)
    """
    if not isinstance(command, list):
        return False, command
    # An empty list counts as interactive, matching the original check order.
    if all(isinstance(cmd, tuple) for cmd in command):
        return True, command
    if all(isinstance(cmd, str) for cmd in command):
        return False, " && ".join(command)
    raise ValueError(
        "command list must contain only strings or only "
        "(command, prompt, timeout) tuples"
    )


@staticmethod
def _read_exec_output(stdout, stderr, quiet, output_file, read_timeout):
    """Drain an ``exec_command`` channel until it closes and is empty.

    Output is echoed (stderr in red) unless ``quiet``, and appended to
    ``output_file`` when given. Incremental UTF-8 decoders are used so a
    multi-byte character split across two reads is not a decode error,
    which previously triggered a retry and re-ran the command.

    :return: ``(stdout_text, stderr_text)``
    """
    import codecs

    stdout_chunks, stderr_chunks = [], []
    echo = (not quiet) or bool(output_file)
    out_decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    err_decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def emit(text, is_stderr):
        if not text:
            return
        if not quiet:
            if is_stderr:
                print(f"\x1b[31m{text}\x1b[0m", end="")
            else:
                print(text, end="")
        if output_file:
            with open(output_file, "a") as f:
                f.write(text)

    channel = stdout.channel
    while (
        not channel.closed
        or channel.recv_ready()
        or stderr.channel.recv_stderr_ready()
    ):
        readable, _, _ = select.select([channel], [], [], read_timeout)
        for chan in readable:
            if chan.recv_ready():
                data = chan.recv(len(chan.in_buffer))
                stdout_chunks.append(data)
                if echo:
                    emit(out_decoder.decode(data), False)
            if chan.recv_stderr_ready():
                data = chan.recv_stderr(len(chan.in_stderr_buffer))
                stderr_chunks.append(data)
                if echo:
                    emit(err_decoder.decode(data), True)

    if echo:
        # Flush any trailing incomplete sequence.
        emit(out_decoder.decode(b"", final=True), False)
        emit(err_decoder.decode(b"", final=True), True)

    return (
        b"".join(stdout_chunks).decode(errors="replace"),
        b"".join(stderr_chunks).decode(errors="replace"),
    )
def _interactive_execute(
    self,
    client: paramiko.SSHClient,
    commands: list[tuple[str, str, int]],
    quiet: bool,
    display: bool,
    output_file: str,
):
    """
    Drive an interactive SSH session through a scripted command sequence.

    Each ``(command, prompt, timeout)`` step sends ``command`` and waits up
    to ``timeout`` seconds for ``prompt``. The cleaned session output that
    is current after the last step is returned as stdout. Stderr is always
    the empty string.

    :param client: Active SSH client.
    :type client: paramiko.SSHClient
    :param commands: Interactive (command, prompt, timeout) steps.
    :type commands: list[tuple]
    :param quiet: Suppress printing of the output if True.
    :type quiet: bool
    :param display: Passed to the interaction object to echo the session.
    :type display: bool
    :param output_file: Path that the output is appended to, if given.
    :type output_file: str
    :return: A tuple (stdout, stderr).
    :rtype: Tuple[str, str]
    """
    with SSHClientInteraction(client, timeout=10, display=display) as session:
        for step_command, expected_prompt, step_timeout in commands:
            session.send(step_command)
            session.expect(expected_prompt, timeout=step_timeout)

        # Turn literal "\n" escape sequences into real newlines.
        out_text = session.current_output_clean.replace("\\n", "\n")
        err_text = ""

        if not quiet:
            print(out_text, err_text)

        if output_file:
            with open(output_file, "a") as sink:
                sink.write(out_text + err_text)

        return out_text, err_text
def upload_file_thread(
    self,
    local_file_path: str,
    remote_file_path: str = ".",
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Runs ``node.upload_file()`` asynchronously on the fablib SSH thread pool.

    The result of ``upload_file()`` (or its exception) is obtained by
    calling ``.result()`` on the returned object.

    :param local_file_path: the path to the file to upload
    :type local_file_path: str
    :param remote_file_path: the destination path of the file on the node
    :type remote_file_path: str
    :param retry: how many times to retry SCP upon failure
    :type retry: int
    :param retry_interval: how often to retry SCP on failure
    :type retry_interval: int
    :return: the executor's handle for the submitted ``node.upload_file()``
        call (a ``concurrent.futures.Future`` for a standard executor)
    :raise Exception: any error from ``node.upload_file()`` is re-raised by
        ``.result()``
    """
    return (
        self.get_fablib_manager()
        .get_ssh_thread_pool_executor()
        .submit(
            self.upload_file,
            local_file_path,
            remote_file_path,
            retry=retry,
            retry_interval=retry_interval,
        )
    )
def upload_file(
    self,
    local_file_path: str,
    remote_file_path: str = ".",
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Upload a local file to a remote location on the node.

    If ``remote_file_path`` is ``""``, ``"."`` or ends with ``/``, it is
    treated as a directory and the local file's basename is appended. An
    SFTP ``put`` directly onto a directory path cannot succeed, so the
    ``"."`` default now means "home directory, same file name".

    :param local_file_path: the path to the file to upload
    :type local_file_path: str
    :param remote_file_path: the destination path of the file on the node
    :type remote_file_path: str
    :param retry: how many attempts to make (at least one is always made)
    :type retry: int
    :param retry_interval: seconds to wait between attempts
    :type retry_interval: int
    :return: the value returned by the SFTP ``put`` call
    :raise Exception: the last error if every attempt fails
    :raises RuntimeError: if no_ssh mode is enabled
    """
    fablib_manager = self.get_fablib_manager()
    if fablib_manager.get_no_ssh():
        raise RuntimeError(
            "SSH operations are disabled (no_ssh=True). "
            "This fablib instance is configured for API-only operations."
        )

    log.debug(f"upload node: {self.get_name()}, local_file_path: {local_file_path}")

    if remote_file_path in ("", ".") or remote_file_path.endswith("/"):
        remote_dir = (remote_file_path or ".").rstrip("/")
        remote_file_path = remote_dir + "/" + os.path.basename(local_file_path)

    start = time.time()
    # Consistent with execute(): retry <= 0 still makes one attempt.
    max_attempts = max(1, int(retry))
    for attempt in range(1, max_attempts + 1):
        ftp_client = None
        try:
            _bastion, client = self._get_ssh_connection()
            ftp_client = client.open_sftp()
            file_attributes = ftp_client.put(local_file_path, remote_file_path)

            if fablib_manager.get_log_level() == logging.DEBUG:
                log.debug(
                    f"Running node.upload_file(): file: {local_file_path}, "
                    f"elapsed time: {time.time() - start} seconds"
                )

            return file_attributes
        except Exception as e:
            log.warning(
                f"Exception on upload_file() attempt #{attempt} of {max_attempts}: {e}"
            )
            self.close_ssh()
            if attempt >= max_attempts:
                raise
            time.sleep(retry_interval)
        finally:
            if ftp_client:
                try:
                    ftp_client.close()
                except Exception as e:
                    log.debug(f"Exception in ftp_client.close(): {e}")

    raise SSHError("scp upload failed")
def download_file_thread(
    self,
    local_file_path: str,
    remote_file_path: str,
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Schedule ``node.download_file()`` on the fablib SSH thread pool.

    Call ``.result()`` on the returned handle to obtain the download result
    (or re-raise its error).

    :param local_file_path: where the downloaded file is written locally
    :type local_file_path: str
    :param remote_file_path: the file on the node to fetch
    :type remote_file_path: str
    :param retry: how many times to retry SCP upon failure
    :type retry: int
    :param retry_interval: how often to retry SCP upon failure
    :type retry_interval: int
    :return: the executor's handle for the submitted download
    :rtype: Thread
    :raise Exception: any error from ``node.download_file()`` surfaces via
        ``.result()``
    """
    executor = self.get_fablib_manager().get_ssh_thread_pool_executor()
    return executor.submit(
        self.download_file,
        local_file_path,
        remote_file_path,
        retry=retry,
        retry_interval=retry_interval,
    )
def download_file(
    self,
    local_file_path: str,
    remote_file_path: str,
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Download a remote file from the node to a local destination.

    :param local_file_path: the destination path for the remote file
    :type local_file_path: str
    :param remote_file_path: the path to the remote file to download
    :type remote_file_path: str
    :param retry: how many attempts to make (at least one is always made)
    :type retry: int
    :param retry_interval: seconds to wait between attempts
    :type retry_interval: int
    :return: the value returned by the SFTP ``get`` call
    :raise Exception: the last error if every attempt fails
    :raises RuntimeError: if no_ssh mode is enabled
    """
    fablib_manager = self.get_fablib_manager()
    if fablib_manager.get_no_ssh():
        raise RuntimeError(
            "SSH operations are disabled (no_ssh=True). "
            "This fablib instance is configured for API-only operations."
        )

    log.debug(
        f"download node: {self.get_name()}, remote_file_path: {remote_file_path}"
    )

    start = time.time()
    # Consistent with execute(): retry <= 0 still makes one attempt.
    max_attempts = max(1, int(retry))
    for attempt in range(1, max_attempts + 1):
        ftp_client = None
        try:
            _bastion, client = self._get_ssh_connection()
            ftp_client = client.open_sftp()
            file_attributes = ftp_client.get(remote_file_path, local_file_path)

            if fablib_manager.get_log_level() == logging.DEBUG:
                log.debug(
                    f"Running node.download(): file: {remote_file_path}, "
                    f"elapsed time: {time.time() - start} seconds"
                )

            return file_attributes
        except Exception as e:
            log.warning(
                f"Exception in download_file() (attempt #{attempt} of {max_attempts}): {e}"
            )
            self.close_ssh()
            if attempt >= max_attempts:
                raise
            time.sleep(retry_interval)
        finally:
            if ftp_client:
                try:
                    ftp_client.close()
                except Exception as e:
                    log.debug(f"Exception in ftp_client.close(): {e}")

    raise SSHError("scp download failed")
def upload_directory_thread(
    self,
    local_directory_path: str,
    remote_directory_path: str,
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Schedule ``Node.upload_directory()`` on the fablib SSH thread pool.

    Call ``.result()`` on the returned handle to obtain the upload result
    (or re-raise its error).

    :param local_directory_path: the local directory to upload
    :type local_directory_path: str
    :param remote_directory_path: where the directory is unpacked on the node
    :type remote_directory_path: str
    :param retry: how many times to retry SCP upon failure
    :type retry: int
    :param retry_interval: how often to retry SCP on failure
    :type retry_interval: int
    :return: the executor's handle for the submitted ``upload_directory()``
    :rtype: Thread
    :raise Exception: any error from ``upload_directory()`` surfaces via
        ``.result()``
    """
    executor = self.get_fablib_manager().get_ssh_thread_pool_executor()
    return executor.submit(
        self.upload_directory,
        local_directory_path,
        remote_directory_path,
        retry=retry,
        retry_interval=retry_interval,
    )
def upload_directory(
    self,
    local_directory_path: str,
    remote_directory_path: str,
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Upload a directory to remote location on the node.

    Makes a gzipped tarball of a directory and uploads it to a node. Then
    unzips and untars the directory at the ``remote_directory_path``.
    Archive members are named relative to the parent of
    ``local_directory_path``, so the directory itself is recreated under
    the remote path.

    :param local_directory_path: the path to the directory to upload
    :type local_directory_path: str
    :param remote_directory_path: the destination path of the directory
        on the node
    :type remote_directory_path: str
    :param retry: how many times to retry SCP upon failure
    :type retry: int
    :param retry_interval: how often to retry SCP on failure
    :type retry_interval: int
    :return: ``"success"``
    :raise Exception: if management IP is invalid
    :raises RuntimeError: if no_ssh mode is enabled
    """
    if self.get_fablib_manager().get_no_ssh():
        raise RuntimeError(
            "SSH operations are disabled (no_ssh=True). "
            "This fablib instance is configured for API-only operations."
        )

    import os
    import tarfile
    import tempfile

    log.debug(
        f"upload node: {self.get_name()}, local_directory_path: {local_directory_path}"
    )

    output_filename = local_directory_path.split("/")[-1]
    root_size = len(local_directory_path) - len(output_filename)

    # Use the public tempfile API for a unique local archive; the remote
    # copy reuses its random basename under /tmp.
    fd, local_tar = tempfile.mkstemp(prefix="fablib_upload_", suffix=".tar.gz")
    os.close(fd)
    remote_tar = "/tmp/" + os.path.basename(local_tar)

    try:
        with tarfile.open(local_tar, "w:gz") as tar_handle:
            for root, dirs, files in os.walk(local_directory_path):
                # recursive=False: os.walk already visits every descendant.
                # Recursive adds here archived nested files repeatedly.
                for file in files:
                    path = os.path.join(root, file)
                    tar_handle.add(path, arcname=path[root_size:], recursive=False)
                for directory in dirs:
                    path = os.path.join(root, directory)
                    tar_handle.add(path, arcname=path[root_size:], recursive=False)

        self.upload_file(local_tar, remote_tar, retry, retry_interval)
    finally:
        if os.path.exists(local_tar):
            os.remove(local_tar)

    self.execute(
        "mkdir -p "
        + remote_directory_path
        + "; tar -xf "
        + remote_tar
        + " -C "
        + remote_directory_path
        + "; rm "
        + remote_tar,
        retry,
        retry_interval,
        quiet=True,
    )
    return "success"
def download_directory_thread(
    self,
    local_directory_path: str,
    remote_directory_path: str,
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Runs ``node.download_directory()`` asynchronously on the fablib SSH
    thread pool.

    The result (or exception) of ``download_directory()`` is obtained by
    calling ``.result()`` on the returned object.

    :param local_directory_path: the local directory the downloaded archive
        is extracted into
    :type local_directory_path: str
    :param remote_directory_path: the directory on the node to download
    :type remote_directory_path: str
    :param retry: how many times to retry SCP upon failure
    :type retry: int
    :param retry_interval: how often to retry SCP on failure
    :type retry_interval: int
    :return: the executor's handle for the submitted
        ``node.download_directory()`` call
    :raise Exception: any error from ``node.download_directory()`` is
        re-raised by ``.result()``
    """
    return (
        self.get_fablib_manager()
        .get_ssh_thread_pool_executor()
        .submit(
            self.download_directory,
            local_directory_path,
            remote_directory_path,
            retry=retry,
            retry_interval=retry_interval,
        )
    )
def download_directory(
    self,
    local_directory_path: str,
    remote_directory_path: str,
    retry: int = 3,
    retry_interval: int = 10,
):
    """
    Downloads a directory from remote location on the node.

    Makes a gzipped tarball of the remote directory, downloads it, and
    extracts it into ``local_directory_path``. Each call uses a uniquely
    named archive, so concurrent downloads (e.g. via
    ``download_directory_thread``) do not clobber each other. Both the
    remote and local archives are removed even if the download or
    extraction fails. Where the running Python supports tarfile
    extraction filters, the ``"data"`` filter is applied. It rejects
    members that would land outside ``local_directory_path``.

    :param local_directory_path: the local directory to extract into
    :type local_directory_path: str
    :param remote_directory_path: the directory on the node to download
    :type remote_directory_path: str
    :param retry: how many times to retry SCP upon failure
    :type retry: int
    :param retry_interval: how often to retry SCP on failure
    :type retry_interval: int
    :return: ``"success"``
    :raise Exception: if management IP is invalid
    :raises RuntimeError: if no_ssh mode is enabled
    """
    if self.get_fablib_manager().get_no_ssh():
        raise RuntimeError(
            "SSH operations are disabled (no_ssh=True). "
            "This fablib instance is configured for API-only operations."
        )

    import os
    import tarfile
    import tempfile
    import uuid

    log.debug(
        f"download node: {self.get_name()}, remote_directory_path: {remote_directory_path}"
    )

    archive_name = f"fablib_download_{uuid.uuid4().hex}.tar.gz"
    remote_tar = "/tmp/" + archive_name
    local_tar = os.path.join(tempfile.gettempdir(), archive_name)

    self.execute(
        "tar -czf " + remote_tar + " " + remote_directory_path,
        retry,
        retry_interval,
        quiet=True,
    )
    try:
        self.download_file(local_tar, remote_tar, retry, retry_interval)
        # Context manager closes the archive (it was previously leaked).
        with tarfile.open(local_tar) as tar_file:
            if hasattr(tarfile, "data_filter"):
                tar_file.extractall(local_directory_path, filter="data")
            else:
                tar_file.extractall(local_directory_path)
    finally:
        try:
            self.execute("rm -f " + remote_tar, retry, retry_interval, quiet=True)
        except Exception as e:
            log.warning(f"Failed to remove remote archive {remote_tar}: {e}")
        if os.path.exists(local_tar):
            os.remove(local_tar)

    return "success"
def test_ssh(self) -> bool:
    """
    Test whether SSH is functional on the node.

    Runs a trivial ``echo`` with a single attempt. A node without a
    management IP is reported as failing immediately, without attempting
    SSH.

    :return: true if SSH is working, false otherwise (always False if no_ssh)
    :rtype: bool
    """
    if self.get_fablib_manager().get_no_ssh():
        return False

    log.debug(f"test_ssh: node {self.get_name()}")
    try:
        if self.get_management_ip() is None:
            log.debug(
                f"Node: {self.get_name()} failed test_ssh because management_ip == None"
            )
            # Previously fell through to execute(), which was bound to fail.
            return False

        self.execute(
            f"echo test_ssh from {self.get_name()}",
            retry=1,
            retry_interval=10,
            quiet=True,
        )
    except Exception as e:
        log.debug(e, exc_info=True)
        return False
    return True
def get_management_os_interface(self) -> Optional[str]:
    """
    Find the OS-level name of the interface that carries the default route.

    The IPv4 routing table is consulted first and the IPv6 table only if
    IPv4 yields nothing.

    :return: the interface name, or None (with a warning logged) if neither
        table has a usable default route
    :rtype: Optional[str]
    """
    log.debug(f"{self.get_name()}->get_management_os_interface")

    # Lazily evaluated: "ip -6" is only queried when "ip" finds nothing.
    candidates = (self._get_default_interface(tool) for tool in ("ip", "ip -6"))
    found = next((iface for iface in candidates if iface), None)
    if found:
        return found

    log.warning(
        f"{self.get_name()}->get_management_os_interface: No management interface found"
    )
    return None
def _get_default_interface(self, ip_version: str) -> Optional[str]:
    """
    Helper method to get the default interface for a given IP version.

    Runs ``sudo <ip_version> -j route list`` on the node and returns the
    device of the first default route that names one, either directly
    (``dev``) or through one of its ``nexthops``. Default routes that name
    no device are skipped rather than ending the search.

    :param ip_version: 'ip' for IPv4 or 'ip -6' for IPv6
    :return: Name of the default interface or None if not found (errors
        are logged, not raised)
    """
    command = f"sudo {ip_version} -j route list"
    log.debug(f"Executing: {command}")
    try:
        stdout, _stderr = self.execute(command, quiet=True)
        for route in json.loads(stdout):
            if route.get("dst") != "default":
                continue
            interface = route.get("dev")
            if interface:
                log.debug(
                    f"Found default route on {ip_version} with interface: {interface}"
                )
                return interface
            # Multipath default route: use the first nexthop with a device.
            for nexthop in route.get("nexthops") or []:
                log.debug(f"nexthop : {nexthop.get('gateway')}")
                interface = nexthop.get("dev")
                if interface:
                    return interface
    except (json.JSONDecodeError, KeyError) as e:
        log.error(f"Failed to parse route list for {ip_version}: {e}")
    except Exception as e:
        log.error(f"Error executing command '{command}': {e}")
    return None
def get_dataplane_os_interfaces(self) -> List[dict]:
    """
    Gets the dataplane interfaces seen by the node's operating system.

    Every interface reported by ``ip -j addr list`` except the loopback
    (``lo``) and the management interface is considered dataplane.

    :return: one dict per interface with keys ``"ifname"`` (OS interface
        name) and ``"mac"`` (link-layer address, or None when the interface
        reports none)
    :rtype: List[dict]
    """
    management_dev = self.get_management_os_interface()
    stdout, _stderr = self.execute("sudo ip -j addr list", quiet=True)

    excluded = {"lo", management_dev}
    return [
        # Some link types (e.g. tunnels) carry no "address"; don't crash.
        {"ifname": iface["ifname"], "mac": iface.get("address")}
        for iface in json.loads(stdout)
        if iface["ifname"] not in excluded
    ]
# =========================================================================
# Persistent network configuration backend detection and helpers
# =========================================================================


def _detect_net_config_backend(self) -> str:
    """
    Detect the available network configuration backend.

    Probes the node, in order, for ``nmcli`` and then ``netplan`` using
    ``command -v``; if neither is found, falls back to raw ``ip`` commands.
    The result is cached in ``self._net_config_backend`` so the probe runs
    at most once per node object.

    NOTE: only the presence of the ``nmcli`` binary is checked, not whether
    NetworkManager is actually running.

    :return: 'nmcli', 'netplan', or 'ip'
    :rtype: str
    """
    if self._net_config_backend is not None:
        return self._net_config_backend

    for candidate in ("nmcli", "netplan"):
        try:
            stdout, stderr = self.execute(f"command -v {candidate}", quiet=True)
        except Exception as e:
            # Detection is best effort: a failed probe means "not available".
            log.debug(f"{self.get_name()}: probe for {candidate} failed: {e}")
            continue
        if stdout and stdout.strip():
            self._net_config_backend = candidate
            log.info(f"{self.get_name()}: detected {candidate} backend")
            return self._net_config_backend

    self._net_config_backend = "ip"
    log.info(f"{self.get_name()}: falling back to ip backend")
    return self._net_config_backend


def _get_effective_backend(self, persistent: Optional[bool] = None) -> str:
    """
    Return the effective backend considering the persistent flag.

    :param persistent: Override for self._persistent_config. If False,
        forces legacy 'ip' backend.
    :return: 'nmcli', 'netplan', or 'ip'
    """
    use_persistent = (
        persistent if persistent is not None else self._persistent_config
    )
    if not use_persistent:
        return "ip"
    return self._detect_net_config_backend()


def _unmanage_mgmt_interface(self):
    """
    Mark the management interface as unmanaged by NetworkManager.

    Intended for the nmcli backend so NetworkManager does not interfere with
    the management interface. The device is identified by looking up which
    interface carries the node's management IP address; IPv6 RA acceptance
    is then preserved on it via ``_preserve_mgmt_ipv6_ra``. Failures are
    logged as warnings and never raised.
    """
    mgmt_ip = self.get_management_ip()
    if not mgmt_ip:
        log.warning(f"{self.get_name()}: no management IP, skipping unmanage")
        return

    try:
        # Find the device carrying the management IP. ``grep -F`` makes the
        # dots in the address literal, and the leading space anchors the
        # match so that e.g. 10.0.0.1 does not also match 110.0.0.1.
        stdout, stderr = self.execute(
            f"ip -o addr show | grep -F ' {mgmt_ip}/' | awk '{{print $2}}' | head -1",
            quiet=True,
        )
        mgmt_dev = (stdout or "").strip()
        if not mgmt_dev:
            log.warning(
                f"{self.get_name()}: could not find device for "
                f"management IP {mgmt_ip}"
            )
            return

        self.execute(
            f"sudo nmcli device set {mgmt_dev} managed no",
            quiet=True,
        )
        log.info(
            f"{self.get_name()}: set management interface "
            f"{mgmt_dev} ({mgmt_ip}) to unmanaged"
        )
        # Preserve IPv6 RA acceptance on the management interface
        self._preserve_mgmt_ipv6_ra(mgmt_dev)
    except Exception as e:
        log.warning(
            f"{self.get_name()}: failed to unmanage management interface: {e}"
        )


def _preserve_mgmt_ipv6_ra(self, mgmt_dev: str):
    """
    Ensure the management interface continues to accept IPv6 Router
    Advertisements even when IPv6 forwarding is enabled (e.g. by Docker).

    By default the Linux kernel stops processing RAs when
    net.ipv6.conf.<dev>.forwarding=1. Setting accept_ra=2 overrides this
    behaviour so SLAAC addresses and the IPv6 default route are maintained.
    Failures are logged as warnings and never raised.

    :param mgmt_dev: management network device name (e.g. 'eth0')
    """
    try:
        self.execute(
            f"sudo sysctl -w net.ipv6.conf.{mgmt_dev}.accept_ra=2 > /dev/null 2>&1",
            quiet=True,
        )
        log.info(
            f"{self.get_name()}: set accept_ra=2 on {mgmt_dev} "
            f"to preserve IPv6 RA under forwarding"
        )
    except Exception as e:
        log.warning(
            f"{self.get_name()}: failed to set accept_ra=2 on {mgmt_dev}: {e}"
        )


@staticmethod
def _nm_conn_name(device_name: str) -> str:
    """
    Return a deterministic NetworkManager connection name for a device.

    :param device_name: OS device name (e.g. 'enp7s0', 'enp7s0.100')
    :return: connection name like 'fabric-enp7s0'
    """
    return f"fabric-{device_name}"


def _detect_ip_version_for_interface(self, interface: Interface) -> str:
    """
    Detect the IP version ('ipv4' or 'ipv6') for a given interface based on
    its network service type.

    Only FABNetv6/FABNetv6Ext networks map to 'ipv6'; anything else,
    including a missing network or a lookup error, maps to 'ipv4'.

    :param interface: the fablib interface
    :return: 'ipv4' or 'ipv6'
    """
    try:
        network = interface.get_network()
        if network and network.get_type() in [
            ServiceType.FABNetv6,
            ServiceType.FABNetv6Ext,
        ]:
            return "ipv6"
    except Exception:
        pass
    return "ipv4"


# ---- nmcli helpers ----


def _nmcli_ensure_connection(
    self,
    conn_name: str,
    ifname: str,
    ip_version: str,
    addresses: str,
    conn_type: str = "ethernet",
    vlan_id: Optional[str] = None,
    vlan_parent: Optional[str] = None,
    mtu: Optional[str] = None,
):
    """
    Create or modify an nmcli connection.

    A new connection is created with ``nmcli c add``; an existing one
    (matched by exact name) is updated with ``nmcli c mod``. In both cases
    the address method is set to manual with ``addresses`` and autoconnect
    is enabled.

    :param conn_name: NM connection name (e.g. 'fabric-enp7s0')
    :param ifname: OS interface name
    :param ip_version: 'ipv4' or 'ipv6'
    :param addresses: CIDR address (e.g. '10.0.0.1/24')
    :param conn_type: 'ethernet' or 'vlan'
    :param vlan_id: VLAN ID when conn_type is 'vlan'
    :param vlan_parent: parent device when conn_type is 'vlan'
    :param mtu: optional MTU value
    """
    # Check if connection already exists (grep -Fx: exact, literal line match)
    stdout, stderr = self.execute(
        f"sudo nmcli -t -f NAME c show 2>/dev/null | grep -Fx '{conn_name}' || true",
        quiet=True,
    )
    exists = conn_name in (stdout or "").strip()

    if exists:
        cmd = f"sudo nmcli c mod {conn_name}"
    else:
        cmd = f"sudo nmcli c add type {conn_type} ifname {ifname} con-name {conn_name}"
        # VLAN parent/id can only be given when the connection is created.
        if conn_type == "vlan" and vlan_id and vlan_parent:
            cmd += f" dev {vlan_parent} id {vlan_id}"
    cmd += (
        f" {ip_version}.method manual"
        f" {ip_version}.addresses {addresses}"
        f" connection.autoconnect yes"
    )
    if mtu:
        cmd += f" 802-3-ethernet.mtu {mtu}"
    self.execute(cmd, quiet=True)


def _nmcli_up(self, conn_name: str):
    """
    Bring up an nmcli connection.

    The activation is attempted a second time (with stderr visible) if the
    first quiet attempt fails.

    :param conn_name: NM connection name
    """
    self.execute(
        f"sudo nmcli c up {conn_name} 2>/dev/null || sudo nmcli c up {conn_name}",
        quiet=True,
    )


def _nmcli_down(self, conn_name: str):
    """
    Bring down an nmcli connection; failures are ignored on the node.

    :param conn_name: NM connection name
    """
    self.execute(f"sudo nmcli c down {conn_name} 2>/dev/null || true", quiet=True)


def _nmcli_route_add(
    self,
    conn_name: str,
    subnet: str,
    gateway: str,
    ip_version: str,
):
    """
    Add a route to an nmcli connection.

    The route is added to the connection profile only; the caller must
    re-activate the connection for it to take effect.

    :param conn_name: NM connection name
    :param subnet: destination subnet (e.g. '10.128.0.0/10')
    :param gateway: next-hop gateway IP
    :param ip_version: 'ipv4' or 'ipv6'
    """
    self.execute(
        f'sudo nmcli c mod {conn_name} +{ip_version}.routes "{subnet} {gateway}"',
        quiet=True,
    )


def _nmcli_delete(self, conn_name: str):
    """
    Delete an nmcli connection; failures are ignored on the node.

    :param conn_name: NM connection name
    """
    self.execute(f"sudo nmcli c delete {conn_name} 2>/dev/null || true", quiet=True)


def _nmcli_configure_pbr(
    self,
    conn_name: str,
    ip_version: str,
    addr: str,
    prefix: str,
    gateway: str,
    subnet: str,
    pbr_table: int = 100,
    pbr_priority: int = 1000,
    route_metric: int = 200,
):
    """
    Configure Policy-Based Routing on an nmcli connection.

    Used for FabNetv4Ext/FabNetv6Ext networks. If the node already has a
    default route of the same IP version (assumed to be the management
    network), traffic sourced from ``addr/prefix`` is steered into a
    dedicated routing table instead of replacing that default route.
    Otherwise the connection simply provides the default route.

    :param conn_name: NM connection name
    :param ip_version: 'ipv4' or 'ipv6'
    :param addr: IP address (without prefix)
    :param prefix: prefix length
    :param gateway: next-hop gateway IP
    :param subnet: interface subnet in CIDR notation
    :param pbr_table: routing table number
    :param pbr_priority: routing rule priority
    :param route_metric: route metric
    """
    ip_vaddr = "::" if ip_version == "ipv6" else "0.0.0.0"
    default_route = "::/0" if ip_version == "ipv6" else "0.0.0.0/0"

    # Check if management network uses same IP version
    ip_flag = "-6" if ip_version == "ipv6" else "-4"
    stdout, stderr = self.execute(
        f"ip {ip_flag} route show default | awk '{{print $3; exit}}'",
        quiet=True,
    )
    mgmt_has_same_version = bool((stdout or "").strip())

    if mgmt_has_same_version:
        # PBR mode: never-default + route-table + routing-rules
        modifications = [
            f"{ip_version}.never-default yes",
            f"{ip_version}.route-table {pbr_table}",
            f'+{ip_version}.routes "{subnet} {ip_vaddr}"',
            f'+{ip_version}.routes "{default_route} {gateway}"',
            f'+{ip_version}.routing-rules "priority {pbr_priority} from {addr}/{prefix} table {pbr_table}"',
            f"{ip_version}.route-metric {route_metric}",
        ]
    else:
        # Standard default route (different IP version for management)
        modifications = [
            f"{ip_version}.never-default no",
            f'+{ip_version}.routes "{default_route} {gateway}"',
        ]

    for modification in modifications:
        self.execute(f"sudo nmcli c mod {conn_name} {modification}", quiet=True)


def _nmcli_configure_fabnet_routes(
    self,
    conn_name: str,
    ip_version: str,
    gateway: str,
    network_type,
):
    """
    Configure routes for FabNetv4/FabNetv6 networks (non-Ext).

    Adds never-default and a route to the FABRIC supernet.

    :param conn_name: NM connection name
    :param ip_version: 'ipv4' or 'ipv6'
    :param gateway: next-hop gateway IP
    :param network_type: the ServiceType
    """
    if network_type == ServiceType.FABNetv4:
        fabric_route = "10.128.0.0/10"
    else:
        fabric_route = "2602:FCFB:00::/40"

    self.execute(
        f"sudo nmcli c mod {conn_name} {ip_version}.never-default yes",
        quiet=True,
    )
    self.execute(
        f'sudo nmcli c mod {conn_name} +{ip_version}.routes "{fabric_route} {gateway}"',
        quiet=True,
    )


# ---- netplan helpers ----


@staticmethod
def _netplan_config_path(ifname: str) -> str:
    """
    Return the netplan file path used for a FABRIC interface.

    Dots (VLAN separators) are replaced with dashes in the file name.

    :param ifname: OS interface name (e.g. ``enp7s0.100``)
    :return: path like ``/etc/netplan/90-fabric-enp7s0-100.yaml``
    """
    safe_name = ifname.replace(".", "-")
    return f"/etc/netplan/90-fabric-{safe_name}.yaml"


def _netplan_write_file(self, config_file: str, content: dict):
    """
    Serialize ``content`` as YAML and write it to ``config_file`` on the node
    with mode 600.

    :param config_file: destination path on the node
    :param content: netplan document to write
    """
    import yaml

    yaml_str = yaml.dump(content, default_flow_style=False)
    # The YAML is wrapped in single quotes for the remote shell, so embedded
    # single quotes are closed, escaped, and reopened.
    yaml_str_escaped = yaml_str.replace("'", "'\\''")
    self.execute(
        f"echo '{yaml_str_escaped}' | sudo tee {config_file} > /dev/null && sudo chmod 600 {config_file}",
        quiet=True,
    )


def _netplan_write_config(
    self,
    ifname: str,
    addresses: List[str],
    routes: Optional[List[dict]] = None,
    routing_policy: Optional[List[dict]] = None,
    is_vlan: bool = False,
    vlan_id: Optional[str] = None,
    vlan_link: Optional[str] = None,
    mtu: Optional[str] = None,
):
    """
    Write a netplan YAML config for a FABRIC interface.

    Any existing config for the interface is overwritten. Does NOT apply it;
    call ``_netplan_apply()`` afterwards.

    :param ifname: OS interface name
    :param addresses: list of CIDR addresses
    :param routes: optional list of route dicts with 'to' and 'via' keys
    :param routing_policy: optional list of routing-policy dicts
    :param is_vlan: whether this is a VLAN interface
    :param vlan_id: VLAN ID
    :param vlan_link: parent device for VLAN
    :param mtu: optional MTU value
    """
    if is_vlan and vlan_id and vlan_link:
        section = "vlans"
        iface_config = {"id": int(vlan_id), "link": vlan_link}
    else:
        section = "ethernets"
        iface_config = {"dhcp4": False, "dhcp6": False}

    iface_config["addresses"] = addresses
    if routes:
        iface_config["routes"] = routes
    if routing_policy:
        iface_config["routing-policy"] = routing_policy
    if mtu:
        iface_config["mtu"] = int(mtu)

    content = {
        "network": {
            "version": 2,
            "renderer": "networkd",
            section: {ifname: iface_config},
        }
    }
    self._netplan_write_file(self._netplan_config_path(ifname), content)


def _netplan_apply(self):
    """
    Apply netplan configuration.
    """
    self.execute("sudo netplan apply", quiet=True)


def _netplan_add_route(self, ifname: str, subnet: str, gateway: str):
    """
    Add a route to an existing netplan config for a FABRIC interface.

    Reads the current netplan YAML, merges the new route entry, and
    rewrites the file. Idempotent — skips if the exact route already
    exists. Does NOT call ``_netplan_apply()``; the caller is responsible
    (to allow batching multiple routes).

    :param ifname: OS interface name (e.g. ``enp7s0`` or ``enp7s0.100``)
    :param subnet: destination network in CIDR notation (e.g. ``10.128.0.0/10``)
    :param gateway: next-hop gateway address (e.g. ``10.130.1.1``)
    :raises RuntimeError: if no config for ``ifname`` exists
    """
    import yaml

    config_file = self._netplan_config_path(ifname)

    # Read existing config (safe_load: never construct arbitrary objects)
    stdout, stderr = self.execute(f"sudo cat {config_file}", quiet=True)
    config = yaml.safe_load(stdout or "")
    if not config or "network" not in config:
        raise RuntimeError(
            f"No valid netplan config found at {config_file} for {ifname}"
        )

    network = config["network"]

    # Find interface config block in ethernets or vlans
    iface_config = None
    for section in ("ethernets", "vlans"):
        entries = network.get(section) or {}
        if ifname in entries:
            iface_config = entries[ifname]
            break
    if iface_config is None:
        raise RuntimeError(f"Interface {ifname} not found in {config_file}")

    # A bare "routes:" key loads as None; treat it as an empty list.
    routes = iface_config.get("routes") or []
    iface_config["routes"] = routes

    # Skip if exact route already present (idempotent)
    if any(r.get("to") == subnet and r.get("via") == gateway for r in routes):
        return

    routes.append({"to": subnet, "via": gateway})
    self._netplan_write_file(config_file, config)


def _netplan_remove_config(self, ifname: str):
    """
    Remove the netplan config file for a FABRIC interface (no apply).

    :param ifname: OS interface name
    """
    config_file = self._netplan_config_path(ifname)
    self.execute(f"sudo rm -f {config_file}", quiet=True)


# =========================================================================
# End persistent network configuration helpers
# =========================================================================
def flush_all_os_interfaces(self):
    """
    Flush the configuration of every dataplane interface on the node.

    Each interface returned by ``get_dataplane_os_interfaces()`` is passed,
    by name, to ``flush_os_interface()``.
    """
    dataplane = self.get_dataplane_os_interfaces()
    for dev in dataplane:
        ifname = dev["ifname"]
        self.flush_os_interface(ifname)
def flush_os_interface(self, os_iface: str, persistent: Optional[bool] = None):
    """
    Flush the configuration of an interface in the node.

    With a persistent backend, the interface's persistent configuration is
    removed first: for nmcli its ``fabric-<iface>`` connection is deleted;
    for netplan its config file is removed and netplan is re-applied. The
    runtime IPv4 and IPv6 addresses are then always flushed with ``ip``.

    :param os_iface: the name of the interface to flush
    :type os_iface: String
    :param persistent: override for the persistent-config setting; when
        falsy only the runtime flush is performed
    :type persistent: bool
    """
    backend = self._get_effective_backend(persistent)

    if backend == "netplan":
        self._netplan_remove_config(os_iface)
        self._netplan_apply()
    elif backend == "nmcli":
        self._nmcli_delete(self._nm_conn_name(os_iface))

    # Immediate runtime cleanup happens regardless of backend.
    for ip_cmd in ("ip", "ip -6"):
        self.execute(f"sudo {ip_cmd} addr flush dev {os_iface}", quiet=True)
def ip_addr_list(self, output="json", update=False):
    """
    Return the list of IP addresses associated with this node.

    Only the JSON result is cached (in ``self.ip_addr_list_json``); the
    text form is always fetched fresh, so asking for text never returns the
    cached JSON.

    :param output: Output format; ``"json"`` by default.
    :param update: Setting this to ``True`` will force-update the cached
        list of IP addresses; default is ``False``.
    :returns: When ``output`` is set to ``"json"`` (which is the default),
        the result of running ``ip -j[son] addr list``, converted to a
        Python object.  Otherwise the result of ``ip addr list``.
    :raises Exception: re-raises any error from running or parsing the
        command, after logging it
    """
    try:
        if output == "json":
            if self.ip_addr_list_json is None or update:
                stdout, _stderr = self.execute("sudo ip -j addr list", quiet=True)
                self.ip_addr_list_json = json.loads(stdout)
            return self.ip_addr_list_json

        stdout, _stderr = self.execute("sudo ip addr list", quiet=True)
        return stdout
    except Exception as e:
        log.debug(f"Failed to get ip addr list: {e}")
        raise
def ip_route_add(
    self,
    subnet: Union[IPv4Network, IPv6Network],
    gateway: Union[IPv4Address, IPv6Address],
    interface: Interface = None,
    persistent: Optional[bool] = None,
):
    """
    Add a route on the node.

    When no interface is given, one is looked up from the gateway. With the
    nmcli or netplan backend and a known interface, the route is added to
    that interface's persistent configuration and activated; any failure
    there falls back to a plain ``ip route add``.

    :param subnet: The destination subnet
    :type subnet: IPv4Network or IPv6Network
    :param gateway: The next hop gateway.
    :type gateway: IPv4Address or IPv6Address
    :param interface: Optional interface for nmcli connection lookup
    :type interface: Interface
    :param persistent: Override persistent config setting
    :type persistent: bool
    """
    backend = self._get_effective_backend(persistent)
    ip_version = "ipv4"
    if type(subnet) == IPv6Network:
        ip_version = "ipv6"

    if interface is None:
        interface = self._find_interface_for_gateway(gateway)

    if interface is not None and backend in ("nmcli", "netplan"):
        try:
            device_name = interface.get_device_name()
            if backend == "nmcli":
                conn_name = self._nm_conn_name(device_name)
                self._nmcli_route_add(conn_name, str(subnet), str(gateway), ip_version)
                self._nmcli_up(conn_name)
            else:
                self._netplan_add_route(device_name, str(subnet), str(gateway))
                self._netplan_apply()
            return
        except Exception as e:
            log.warning(f"{backend} route add failed, falling back to ip: {e}")

    self._ip_route_add_legacy(subnet, gateway)
def _ip_route_add_legacy(
    self,
    subnet: Union[IPv4Network, IPv6Network],
    gateway: Union[IPv4Address, IPv6Address],
):
    """
    Legacy ip route add using raw ip commands.

    ``subnet`` may be an ``IPv4Network``/``IPv6Network`` or anything whose
    string form ``ipaddress.ip_network`` accepts (e.g. ``"10.0.0.0/24"``).
    The address family of the ``ip`` command is derived from it, so an
    unrecognised type no longer produces a malformed command.

    :raises ValueError: if ``subnet`` cannot be parsed as a network
    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    if isinstance(subnet, (IPv4Network, IPv6Network)):
        network = subnet
    else:
        network = ipaddress.ip_network(str(subnet), strict=False)
    ip_command = "sudo ip -6" if network.version == 6 else "sudo ip"

    try:
        self.execute(f"{ip_command} route add {network} via {gateway}", quiet=True)
    except Exception as e:
        log.warning(f"Failed to add route: {e}")
        raise
def network_manager_stop(self):
    """
    Stop the NetworkManager service on the node.

    Runs ``sudo systemctl stop NetworkManager`` and logs its output.

    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    command = "sudo systemctl stop NetworkManager"
    try:
        out, err = self.execute(command, quiet=True)
        log.info(
            f"Stopped NetworkManager with '{command}': stdout: {out}\nstderr: {err}"
        )
    except Exception as exc:
        log.warning(f"Failed to stop network manager: {exc}")
        raise
def network_manager_start(self):
    """
    (Re)start the NetworkManager service on the node.

    Runs ``sudo systemctl restart NetworkManager``, which starts a stopped
    service or restarts a running one, and logs the exact command and its
    output.

    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    command = "sudo systemctl restart NetworkManager"
    try:
        stdout, stderr = self.execute(command, quiet=True)
        # Log the command that actually ran (it is "restart", not "start").
        log.info(
            f"Started NetworkManager with '{command}': stdout: {stdout}\nstderr: {stderr}"
        )
    except Exception as e:
        log.warning(f"Failed to start network manager: {e}")
        raise
def get_ip_routes(self):
    """
    Get a list of routes from the node.

    :return: the parsed JSON output of ``ip -j route list``, or None if the
        command or parsing fails (the error is logged as a warning)
    """
    try:
        raw, _err = self.execute("ip -j route list", quiet=True)
        routes = json.loads(raw)
    except Exception as e:
        log.warning(f"Exception: {e}")
        return None
    return routes
# fablib.Node.get_ip_addrs()
def get_ip_addrs(self):
    """
    Get a list of ip address info from the node.

    :return: the parsed JSON output of ``ip -j addr list``, or None if the
        command or parsing fails (the error is logged as a warning)
    """
    try:
        raw, _err = self.execute("ip -j addr list", quiet=True)
        return json.loads(raw)
    except Exception as e:
        log.warning(f"Exception: {e}")
    return None
def ip_route_del(
    self,
    subnet: Union[IPv4Network, IPv6Network],
    gateway: Union[IPv4Address, IPv6Address],
):
    """
    Delete a route on the node.

    ``subnet`` may be an ``IPv4Network``/``IPv6Network`` or anything whose
    string form ``ipaddress.ip_network`` accepts; the address family of the
    ``ip`` command is derived from it.

    :param subnet: The destination subnet
    :type subnet: IPv4Network or IPv6Network
    :param gateway: The next hop gateway.
    :type gateway: IPv4Address or IPv6Address
    :raises ValueError: if ``subnet`` cannot be parsed as a network
    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    if isinstance(subnet, (IPv4Network, IPv6Network)):
        network = subnet
    else:
        network = ipaddress.ip_network(str(subnet), strict=False)
    ip_command = "sudo ip -6" if network.version == 6 else "sudo ip"

    try:
        self.execute(f"{ip_command} route del {network} via {gateway}", quiet=True)
    except Exception as e:
        log.warning(f"Failed to del route: {e}")
        raise
def ip_addr_add(
    self,
    addr: Union[IPv4Address, IPv6Address],
    subnet: Union[IPv4Network, IPv6Network],
    interface: Interface,
    persistent: Optional[bool] = None,
):
    """
    Add an IP to an interface on the node.

    With the nmcli backend a ``fabric-<device>`` connection is created or
    updated and brought up; with netplan the interface's config file is
    (re)written with this single address and applied. If that fails, or
    for the plain ``ip`` backend, ``ip addr add`` is used instead.

    :param addr: IP address
    :type addr: IPv4Address or IPv6Address
    :param subnet: subnet.
    :type subnet: IPv4Network or IPv6Network
    :param interface: the FABlib interface.
    :type interface: Interface
    :param persistent: Override persistent config setting
    :type persistent: bool
    """
    backend = self._get_effective_backend(persistent)
    device_name = interface.get_device_name()
    cidr = f"{addr}/{subnet.prefixlen}"
    ip_version = "ipv4"
    if type(subnet) == IPv6Network:
        ip_version = "ipv6"

    try:
        if backend == "nmcli":
            conn_name = self._nm_conn_name(device_name)
            self._nmcli_ensure_connection(
                conn_name=conn_name,
                ifname=device_name,
                ip_version=ip_version,
                addresses=cidr,
            )
            self._nmcli_up(conn_name)
            return
        if backend == "netplan":
            self._netplan_write_config(ifname=device_name, addresses=[cidr])
            self._netplan_apply()
            return
    except Exception as e:
        log.warning(f"{backend} ip_addr_add failed, falling back to ip: {e}")

    self._ip_addr_add_legacy(addr, subnet, interface)
def _ip_addr_add_legacy(
    self,
    addr: Union[IPv4Address, IPv6Address],
    subnet: Union[IPv4Network, IPv6Network],
    interface: Interface,
):
    """
    Legacy ip addr add using raw ip commands.

    ``subnet`` may be an ``IPv4Network``/``IPv6Network`` or anything whose
    string form ``ipaddress.ip_network`` accepts; its prefix length and
    address family determine the command.

    :raises ValueError: if ``subnet`` cannot be parsed as a network
    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    if isinstance(subnet, (IPv4Network, IPv6Network)):
        network = subnet
    else:
        network = ipaddress.ip_network(str(subnet), strict=False)
    ip_command = "sudo ip -6" if network.version == 6 else "sudo ip"

    try:
        self.execute(
            f"{ip_command} addr add {addr}/{network.prefixlen} dev {interface.get_device_name()}",
            quiet=True,
        )
    except Exception as e:
        log.warning(f"Failed to add addr: {e}")
        raise
def ip_addr_del(
    self,
    addr: Union[IPv4Address, IPv6Address],
    subnet: Union[IPv4Network, IPv6Network],
    interface: Interface,
):
    """
    Delete an IP from an interface on the node.

    ``subnet`` may be an ``IPv4Network``/``IPv6Network`` or anything whose
    string form ``ipaddress.ip_network`` accepts; its prefix length and
    address family determine the command.

    :param addr: IP address
    :type addr: IPv4Address or IPv6Address
    :param subnet: subnet.
    :type subnet: IPv4Network or IPv6Network
    :param interface: the FABlib interface.
    :type interface: Interface
    :raises ValueError: if ``subnet`` cannot be parsed as a network
    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    if isinstance(subnet, (IPv4Network, IPv6Network)):
        network = subnet
    else:
        network = ipaddress.ip_network(str(subnet), strict=False)
    ip_command = "sudo ip -6" if network.version == 6 else "sudo ip"

    try:
        self.execute(
            f"{ip_command} addr del {addr}/{network.prefixlen} dev {interface.get_device_name()}",
            quiet=True,
        )
    except Exception as e:
        log.warning(f"Failed to del addr: {e}")
        raise
def un_manage_interface(self, interface: Interface):
    """
    Mark an interface unmanaged by Network Manager.

    Does nothing when ``interface`` is None or when the effective backend
    is nmcli (there, NetworkManager is expected to manage the interface).
    Otherwise runs ``nmcli dev set <physical iface> managed no``; failures
    are logged as warnings and not raised.

    :param interface: the FABlib interface.
    :type interface: Interface
    """
    if interface is None or self._get_effective_backend() == "nmcli":
        return

    try:
        physical = interface.get_physical_os_interface_name()
        self.execute(f"sudo nmcli dev set {physical} managed no", quiet=True)
    except Exception as exc:
        log.warning(f"Failed to mark interface as unmanaged: {exc}")
def _ip_link_up_legacy(
    self, subnet: Union[IPv4Network, IPv6Network], interface: Interface
):
    """
    Legacy ip link up using raw ip commands.

    Brings up the interface's physical OS device and then its (possibly
    VLAN) device. Returns silently when ``interface`` or its network is
    missing. ``ip -6`` is used for FABNetv6/FABNetv6Ext L3 networks and
    plain ``ip`` for everything else (``subnet`` is not used).

    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    if not interface:
        return

    try:
        network = interface.get_network()
        if not network:
            return
        # Default to plain "ip" so an L3 network type not listed below
        # (e.g. a non-FABNet L3 service) never yields a "None ..." command.
        ip_command = "sudo ip"
        if network.get_layer() == NSLayer.L3 and network.get_type() in [
            ServiceType.FABNetv6,
            ServiceType.FABNetv6Ext,
        ]:
            ip_command = "sudo ip -6"
    except Exception as e:
        log.warning(f"Failed to up link: {e}")
        return

    for device in (
        interface.get_physical_os_interface_name(),
        interface.get_device_name(),
    ):
        try:
            self.execute(f"{ip_command} link set dev {device} up", quiet=True)
        except Exception as e:
            log.warning(f"Failed to up link: {e}")
            raise


def _ip_link_down_legacy(
    self, subnet: Union[IPv4Network, IPv6Network], interface: Interface
):
    """
    Legacy ip link down using raw ip commands.

    Brings down the interface's device. Returns silently when the interface
    or its network information is unavailable. ``ip -6`` is used for
    FABNetv6/FABNetv6Ext L3 networks and plain ``ip`` for everything else
    (``subnet`` is not used).

    :raises Exception: re-raises any error from ``execute`` after logging it
    """
    try:
        network = interface.get_network()
        # Same default as the "up" path: never leave ip_command unset.
        ip_command = "sudo ip"
        if network.get_layer() == NSLayer.L3 and network.get_type() in [
            ServiceType.FABNetv6,
            ServiceType.FABNetv6Ext,
        ]:
            ip_command = "sudo ip -6"
    except Exception:
        # No interface/network information: nothing to bring down.
        return

    try:
        self.execute(
            f"{ip_command} link set dev {interface.get_device_name()} down",
            quiet=True,
        )
    except Exception as e:
        log.warning(f"Failed to down link: {e}")
        raise
def set_ip_os_interface(
    self,
    os_iface: str = None,
    vlan: str = None,
    ip: str = None,
    cidr: str = None,
    mtu: str = None,
    persistent: Optional[bool] = None,
):
    """
    Configure IP Address on network interface as seen inside the VM

    With the nmcli backend and both ``ip`` and ``cidr`` given, a
    ``fabric-<iface>`` connection (a VLAN connection ``<os_iface>.<vlan>``
    when ``vlan`` is set) is created or updated and brought up. In every
    other case, or if that fails, raw ``ip`` commands are used.

    :param os_iface: Interface name as seen by the OS such as eth1 etc.
    :type os_iface: String
    :param vlan: Vlan tag
    :type vlan: String
    :param ip: IP address to be assigned to the tagged interface
    :type ip: String
    :param cidr: CIDR associated with IP address
    :type ip: String
    :param mtu: MTU size
    :type mtu: String
    :param persistent: Override persistent config setting
    :type persistent: bool

    NOTE: This does not add the IP information in the fablib_data
    """
    if cidr:
        cidr = str(cidr)
    if mtu:
        mtu = str(mtu)

    backend = self._get_effective_backend(persistent)
    ip_version = "ipv6" if self.validIPAddress(ip) == "IPv6" else "ipv4"

    if backend == "nmcli" and ip is not None and cidr is not None:
        try:
            target_iface = os_iface
            vlan_kwargs = {}
            if vlan is not None:
                vlan = str(vlan)
                target_iface = f"{os_iface}.{vlan}"
                vlan_kwargs = {
                    "conn_type": "vlan",
                    "vlan_id": vlan,
                    "vlan_parent": os_iface,
                }
            conn_name = self._nm_conn_name(target_iface)
            self._nmcli_ensure_connection(
                conn_name=conn_name,
                ifname=target_iface,
                ip_version=ip_version,
                addresses=f"{ip}/{cidr}",
                mtu=mtu,
                **vlan_kwargs,
            )
            self._nmcli_up(conn_name)
            return
        except Exception as e:
            log.warning(
                f"nmcli set_ip_os_interface failed, falling back to ip: {e}"
            )

    self._set_ip_os_interface_legacy(os_iface, vlan, ip, cidr, mtu)
def _set_ip_os_interface_legacy(
    self,
    os_iface: str = None,
    vlan: str = None,
    ip: str = None,
    cidr: str = None,
    mtu: str = None,
):
    """
    Legacy set_ip_os_interface using raw ip commands.

    Brings ``os_iface`` up (with ``mtu`` if given). If ``vlan`` is set,
    creates ``<os_iface>.<vlan>`` and brings it up. Finally, if both ``ip``
    and ``cidr`` are given, adds the address once to the resulting interface.

    :raises ValidationError: if ``ip`` is not a valid IPv4/IPv6 string
    """
    ip_type = self.validIPAddress(ip)
    if ip_type == "IPv4":
        ip_command = "sudo ip"
    elif ip_type == "IPv6":
        ip_command = "sudo ip -6"
    else:
        raise ValidationError(
            f"Invalid IP {ip}. IP must be vaild IPv4 or IPv6 string."
        )

    log.debug(
        f"{self.get_name()}->set_ip_os_interface: os_iface {os_iface}, vlan {vlan}, ip {ip}, cidr {cidr}, mtu {mtu}"
    )

    # Bring up base iface
    command = f"{ip_command} link set dev {os_iface} up"
    if mtu is not None:
        command += f" mtu {mtu}"
    self.execute(command, quiet=True)

    # config vlan iface
    if vlan is not None:
        # create vlan iface
        command = f"{ip_command} link add link {os_iface} name {os_iface}.{vlan} type vlan id {vlan}"
        self.execute(command, quiet=True)

        # bring up vlan iface
        os_iface = f"{os_iface}.{vlan}"
        command = f"{ip_command} link set dev {os_iface} up"
        if mtu is not None:
            command += f" mtu {mtu}"
        self.execute(command, quiet=True)

    if ip is not None and cidr is not None:
        # Set ip (exactly once; a second "addr add" only fails with EEXIST)
        command = f"{ip_command} addr add {ip}/{cidr} dev {os_iface}"
        self.execute(command, quiet=True)
def clear_all_ifaces(self):
    """
    Flush all interfaces and delete VLAN OS interfaces.

    VLAN interfaces are removed first (``remove_all_vlan_os_interfaces``),
    then every remaining dataplane interface is flushed
    (``flush_all_os_interfaces``).
    """
    for step in (self.remove_all_vlan_os_interfaces, self.flush_all_os_interfaces):
        step()
def remove_all_vlan_os_interfaces(self):
    """
    Delete all VLAN OS interfaces.

    Lists interfaces with ``ip -j addr list`` and calls
    ``remove_vlan_os_interface`` for every interface that reports a
    ``link`` (parent device), skipping ``lo`` and the management interface.
    """
    management_os_iface = self.get_management_os_interface()
    stdout, _stderr = self.execute("sudo ip -j addr list", quiet=True)

    # Skip (don't remove from the list being iterated): removing during
    # iteration made the loop jump over the following interface.
    for iface in json.loads(stdout):
        ifname = iface["ifname"]
        if ifname in ("lo", management_os_iface):
            continue
        # Interfaces stacked on a parent device carry a "link" key and are
        # treated as VLAN interfaces.
        if "link" in iface:
            self.remove_vlan_os_interface(os_iface=ifname)
def remove_vlan_os_interface(
    self, os_iface: str = None, persistent: Optional[bool] = None
):
    """
    Remove one VLAN OS interface

    The persistent configuration is dropped first, so the VLAN is not
    recreated later: the ``fabric-<iface>`` connection for nmcli, or the
    interface's netplan file for netplan (without re-applying netplan). The
    kernel device is then deleted with raw ``ip`` commands.

    :param os_iface: the name of the VLAN interface to remove
    :type os_iface: String
    :param persistent: Override persistent config setting
    :type persistent: bool
    """
    backend = self._get_effective_backend(persistent)
    if backend == "nmcli":
        self._nmcli_delete(self._nm_conn_name(os_iface))
    elif backend == "netplan":
        # Removing the file is enough to stop the VLAN reappearing on the
        # next "netplan apply"/reboot; the device itself is deleted below.
        self._netplan_remove_config(os_iface)

    # Also do legacy removal to clean up the kernel device
    self._remove_vlan_os_interface_legacy(os_iface)
def _remove_vlan_os_interface_legacy(self, os_iface: str = None):
    """
    Legacy remove_vlan_os_interface using raw ip commands.

    Looks up the interface with ``ip -j addr show`` to find its parent
    (``link``) and deletes it with ``ip link del``.

    :raises Exception: if the ``ip -j addr show`` output is not a one-element
        JSON list (logged with stdout/stderr before re-raising)
    """
    out, err = self.execute(f"sudo ip -j addr show {os_iface}", quiet=True)
    try:
        [details] = json.loads(out)
    except Exception as exc:
        log.warning(f"os_iface: {os_iface}, stdout: {out}, stderr: {err}")
        raise exc

    parent = details["link"]
    self.execute(f"sudo ip link del link {parent} name {os_iface}", quiet=True)
def add_vlan_os_interface(
    self,
    os_iface: str = None,
    vlan: str = None,
    ip: str = None,
    cidr: str = None,
    mtu: str = None,
    interface: Interface = None,
    persistent: Optional[bool] = None,
):
    """
    Add VLAN tagged interface for a given interface and set IP address on it

    When both ``ip`` and ``cidr`` are given and the backend is nmcli or
    netplan, the VLAN ``<os_iface>.<vlan>`` is configured persistently
    through that backend. Without an address (e.g. when only the VLAN
    device must exist), or if the persistent path fails, raw ``ip``
    commands are used.

    :param os_iface: Interface name as seen by the OS such as eth1 etc.
    :type os_iface: String
    :param vlan: Vlan tag
    :type vlan: String
    :param ip: IP address to be assigned to the tagged interface
    :type ip: String
    :param cidr: CIDR associated with IP address
    :type ip: String
    :param mtu: MTU size
    :type mtu: String
    :param interface: Interface for which tagged interface has to be added
    :type interface: Interface
    :param persistent: Override persistent config setting
    :type persistent: bool
    """
    if vlan:
        vlan = str(vlan)
    if cidr:
        cidr = str(cidr)
    if mtu:
        mtu = str(mtu)

    backend = self._get_effective_backend(persistent)
    if interface:
        ip_version = self._detect_ip_version_for_interface(interface)
    else:
        ip_version = "ipv4"

    # Persistent backends are only used when there is an address to
    # configure; a bare VLAN device is created with legacy ip commands and
    # the persistent config follows once an IP is added.
    if ip and cidr and backend in ("nmcli", "netplan"):
        vlan_iface = f"{os_iface}.{vlan}"
        try:
            if backend == "nmcli":
                conn_name = self._nm_conn_name(vlan_iface)
                self._nmcli_ensure_connection(
                    conn_name=conn_name,
                    ifname=vlan_iface,
                    ip_version=ip_version,
                    addresses=f"{ip}/{cidr}",
                    conn_type="vlan",
                    vlan_id=vlan,
                    vlan_parent=os_iface,
                    mtu=mtu,
                )
                self._nmcli_up(conn_name)
            else:
                self._netplan_write_config(
                    ifname=vlan_iface,
                    addresses=[f"{ip}/{cidr}"],
                    is_vlan=True,
                    vlan_id=vlan,
                    vlan_link=os_iface,
                    mtu=mtu,
                )
                self._netplan_apply()
            return
        except Exception as e:
            log.warning(
                f"{backend} add_vlan_os_interface failed, falling back to ip: {e}"
            )

    self._add_vlan_os_interface_legacy(os_iface, vlan, ip, cidr, mtu, interface)
def _add_vlan_os_interface_legacy( self, os_iface: str = None, vlan: str = None, ip: str = None, cidr: str = None, mtu: str = None, interface: Interface = None, ): """Legacy add_vlan_os_interface using raw ip commands.""" ip_command = "sudo ip" try: if interface and interface.get_network().get_layer() == NSLayer.L3: if interface.get_network().get_type() in [ ServiceType.FABNetv6, ServiceType.FABNetv6Ext, ]: ip_command = "sudo ip -6" elif interface.get_network().get_type() in [ ServiceType.FABNetv4, ServiceType.FABNetv4Ext, ]: ip_command = "sudo ip" else: ip_command = "sudo ip" except Exception as e: log.warning(f"Failed to get network layer and/or type: {e}") ip_command = "sudo ip" command = f"{ip_command} link add link {os_iface} name {os_iface}.{vlan} type vlan id {vlan}" stdout, stderr = self.execute(command, quiet=True) command = f"{ip_command} link set dev {os_iface} up" stdout, stderr = self.execute(command, quiet=True) command = f"{ip_command} link set dev {os_iface}.{vlan} up" stdout, stderr = self.execute(command, quiet=True) if ip and cidr: self._set_ip_os_interface_legacy( os_iface=f"{os_iface}.{vlan}", ip=ip, cidr=cidr, mtu=mtu )
def ping_test(self, dst_ip: str) -> bool:
    """
    Test a ping from the node to a destination IP.

    A single echo request is sent from the node; only the marker
    ``Success`` is printed to stdout when ``ping`` exits successfully.

    :param dst_ip: destination IP String.
    :type dst_ip: String
    :return: ``True`` if the ping succeeded, ``False`` otherwise.
    :rtype: bool
    """
    log.debug(f"ping_test: node {self.get_name()}")
    # Redirect stdout to /dev/null first, then stderr onto it, so that all
    # ping output is discarded. The reverse order (``2>&1 > /dev/null``)
    # routes ping's stderr into the captured stdout, so any warning made a
    # successful ping compare unequal to "Success".
    command = f"ping -c 1 {dst_ip} > /dev/null 2>&1 && echo Success"
    stdout, stderr = self.execute(command, quiet=True)
    return (stdout or "").strip() == "Success"
def get_storage(self, name: str) -> Component:
    """
    Gets a particular storage associated with this node.

    :param name: the name of the storage
    :type name: String
    :raise ResourceNotFoundError: if storage not found by name; the
        underlying lookup error is chained as ``__cause__``.
    :return: the storage on the FABRIC node
    :rtype: Component
    """
    try:
        return Component(self, self.get_fim().components[name])
    except Exception as e:
        log.error(e, exc_info=True)
        # Chain the original error so the real cause is not lost.
        raise ResourceNotFoundError(f"Storage not found: {name}") from e
def add_storage(self, name: str, auto_mount: bool = False) -> Component:
    """
    Creates a new FABRIC NAS Storage component and attaches it to the Node.

    ``auto_mount`` is handed to the orchestrator via FIM. Should the
    orchestrator ignore it, mount the volume manually once the slice
    reaches ``StableOK``.

    For CephFS persistent storage mounted automatically during post-boot
    configuration, use :meth:`enable_storage` instead.

    :param name: Name of the Storage volume created for the project outside
        the scope of the Slice
    :type name: str
    :param auto_mount: Request the orchestrator to mount the storage volume
        automatically.
    :type auto_mount: bool
    :return: the newly created storage component
    :rtype: Component
    """
    storage_component = Component.new_storage(
        node=self, name=name, auto_mount=auto_mount
    )
    return storage_component
def get_fim(self):
    """
    Return the FABRIC Information Model (FIM) node object backing this node.

    :return: the value of ``self.fim_node``
    """
    backing_node = self.fim_node
    return backing_node
def get_fim_node(self):
    """
    Return the FABRIC Information Model (FIM) node object.

    Kept as an alias of :meth:`get_fim` for backward compatibility; both
    return ``self.fim_node``.
    """
    fim = self.fim_node
    return fim
def delete(self):
    """
    Remove the node from the slice.

    Every component of the node (fetched with ``refresh=True``) is deleted
    first, then the node itself is removed from the slice's FIM topology.
    """
    components = self.get_components(refresh=True)
    for comp in components:
        comp.delete()

    owning_slice = self.get_slice()
    owning_slice.get_fim_topology().remove_node(name=self.get_name())
    # Flag the topology as changed so cached node/interface lists are
    # rebuilt the next time they are accessed.
    owning_slice._topology_dirty = True
def init_fablib_data(self):
    """
    Initialize fablib data. Called by :py:meth:`new_node()`.

    Stores the default flags (both ``"False"`` strings) and empty
    post-boot/post-update command lists.
    """
    self.set_fablib_data(
        dict(
            instantiated="False",
            run_update_commands="False",
            post_boot_commands=[],
            post_update_commands=[],
        )
    )
def enable_storage(self, cluster: str = None):
    """
    Enable CephFS storage on this node.

    Adds a FABNetv4 network named ``CEPH_STORAGE`` and flags the node so
    that :py:meth:`Slice.post_boot_config` will automatically generate
    credentials, upload them, and mount the CephFS filesystem. If storage is
    already enabled, nothing changes (a new ``cluster`` is not applied).

    :param cluster: Ceph cluster name (e.g. ``"europe"``). When ``None``
        the first available cluster is auto-discovered at post-boot time.
    :type cluster: str
    """
    if not self.has_storage():
        self.add_fabnet(name="CEPH_STORAGE")
        data = self.get_fablib_data()
        data["storage"] = True
        if cluster:
            data["storage_cluster"] = cluster
        self.set_fablib_data(data)
def has_storage(self) -> bool:
    """
    Report whether CephFS storage has been enabled on this node.

    :return: ``True`` when the ``storage`` flag set by
        :py:meth:`enable_storage` is truthy, otherwise ``False``.
    :rtype: bool
    """
    storage_flag = self.get_fablib_data().get("storage", False)
    return True if storage_flag else False
def get_storage_cluster(self) -> Optional[str]:
    """
    Look up the Ceph cluster name recorded for this node.

    :return: the stored ``storage_cluster`` value, or ``None`` when unset.
    :rtype: str or None
    """
    data = self.get_fablib_data()
    cluster_name = data.get("storage_cluster", None)
    return cluster_name
def _set_storage_cluster(self, cluster: str): """ Set the Ceph cluster name in fablib_data (internal use). :param cluster: Ceph cluster name :type cluster: str """ fablib_data = self.get_fablib_data() fablib_data["storage_cluster"] = cluster self.set_fablib_data(fablib_data)
def add_route(
    self,
    subnet: Union[IPv4Network, IPv6Network],
    next_hop: Union[IPv4Address, IPv6Address, NetworkService],
):
    """
    Add a route.

    The route is recorded (as strings) in the node's fablib data under
    ``"routes"``; :py:meth:`config_routes` applies it later.

    :param subnet: destination IPv4 or IPv6 network.
    :type subnet: IPv4Network or IPv6Network
    :param next_hop: a gateway address (IPv4Address or IPv6Address) or a
        NetworkService. For a NetworkService, its name is stored and
        :py:meth:`config_routes` resolves it to the network's gateway.
    :type next_hop: IPv4Address or IPv6Address or NetworkService
    """
    # isinstance (not an exact type() match) so subclasses are also
    # recognised as network services.
    if isinstance(next_hop, NetworkService):
        next_hop = next_hop.get_name()

    fablib_data = self.get_fablib_data()
    fablib_data.setdefault("routes", []).append(
        {"subnet": str(subnet), "next_hop": str(next_hop)}
    )
    self.set_fablib_data(fablib_data)
def add_post_update_command(self, command: str):
    """
    Register a command to run after a slice update.

    The command is appended to ``post_update_commands`` in fablib data; it
    is not run here. :py:meth:`config` executes the registered commands via
    :py:meth:`run_post_update_commands` when the ``run_update_commands``
    flag is set.

    :param command: shell command to execute on the node.
    :type command: str
    """
    fablib_data = self.get_fablib_data()
    fablib_data.setdefault("post_update_commands", []).append(command)
    self.set_fablib_data(fablib_data)
def get_post_update_commands(self):
    """
    Get the commands registered with :py:meth:`add_post_update_command`.

    :return: the stored ``post_update_commands`` list, or an empty list.
    """
    return self.get_fablib_data().get("post_update_commands", [])
def add_post_boot_upload_directory(
    self, local_directory_path: str, remote_directory_path: str = "."
):
    """
    Schedule a directory upload to the node after boot.

    Appends an ``("upload_directory", local, remote)`` task to
    ``post_boot_tasks`` in fablib data.

    :param local_directory_path: local directory.
    :type local_directory_path: str
    :param remote_directory_path: directory on the node.
    :type remote_directory_path: str
    """
    data = self.get_fablib_data()
    tasks = data.setdefault("post_boot_tasks", [])
    tasks.append(("upload_directory", local_directory_path, remote_directory_path))
    self.set_fablib_data(data)
def add_post_boot_upload_file(
    self, local_file_path: str, remote_file_path: str = "."
):
    """
    Schedule a file upload to the node after boot.

    Appends an ``("upload_file", local, remote)`` task to
    ``post_boot_tasks`` in fablib data.

    :param local_file_path: path to file on local filesystem.
    :type local_file_path: str
    :param remote_file_path: path to file on the node.
    :type remote_file_path: str
    """
    data = self.get_fablib_data()
    tasks = data.setdefault("post_boot_tasks", [])
    tasks.append(("upload_file", local_file_path, remote_file_path))
    self.set_fablib_data(data)
def add_post_boot_execute(self, command: str):
    """
    Schedule a command to be executed on the node after boot.

    Appends an ``("execute", command)`` task to ``post_boot_tasks`` in
    fablib data.

    :param command: command to be executed on the node.
    :type command: str
    """
    data = self.get_fablib_data()
    data.setdefault("post_boot_tasks", []).append(("execute", command))
    self.set_fablib_data(data)
def post_boot_tasks(self):
    """
    Get the list of tasks to be performed on this node after boot.

    :return: the stored ``post_boot_tasks`` list, or an empty list.
    """
    return self.get_fablib_data().get("post_boot_tasks", [])
def get_routes(self):
    """
    Return the routes recorded by :py:meth:`add_route`.

    .. warning::

        This method is for fablib internal use, and will be made private
        in the future.

    :return: the stored ``routes`` list; an empty list if it is missing
        or the fablib data cannot be read.
    """
    try:
        routes = self.get_fablib_data()["routes"]
    except Exception:
        routes = []
    return routes
def _find_interface_for_gateway(self, gateway) -> Optional[Interface]: """ Find the interface whose network gateway matches the given gateway. :param gateway: gateway IP address :return: matching Interface or None """ for iface in self.get_interfaces(): try: network = iface.get_network() if ( network and network.get_gateway() and str(network.get_gateway()) == str(gateway) ): return iface except Exception: continue return None
def config_routes(self):
    """
    Apply the routes recorded with :py:meth:`add_route` to the node.

    .. warning::

        This method is for fablib internal use, and will be made private
        in the future.

    Each stored route has ``"subnet"`` and ``"next_hop"`` strings. A value
    that does not parse as an IP network/address is treated as a name:
    the text before the first ``"."`` is looked up as a network in the
    slice, and that network's subnet/gateway is used.

    With the ``netplan`` backend, routes whose gateway belongs to one of
    this node's interfaces are written to the netplan config and applied
    once at the end. All other routes, and netplan failures, are added
    with :py:meth:`ip_route_add`.
    """
    routes = self.get_routes()
    backend = self._get_effective_backend()
    needs_netplan_apply = False

    for route in routes:
        try:
            next_hop = ipaddress.ip_address(route["next_hop"])
        except ValueError:
            # Not a literal address: a network name (add_route stores a
            # NetworkService by name).
            net_name = route["next_hop"].split(".")[0]
            next_hop = (
                self.get_slice().get_network(name=str(net_name)).get_gateway()
            )

        try:
            subnet = ipaddress.ip_network(route["subnet"])
        except ValueError:
            # Not a literal network: resolve via the named slice network.
            net_name = route["subnet"].split(".")[0]
            subnet = self.get_slice().get_network(name=str(net_name)).get_subnet()

        # Find the interface for this gateway so the route can be added
        # to the correct connection/config
        target_iface = self._find_interface_for_gateway(next_hop)

        if backend == "netplan" and target_iface is not None:
            try:
                device_name = target_iface.get_device_name()
                self._netplan_add_route(device_name, str(subnet), str(next_hop))
                needs_netplan_apply = True
                continue
            except Exception as e:
                log.warning(
                    f"netplan route add failed for {subnet}, falling back: {e}"
                )

        self.ip_route_add(
            subnet=ipaddress.ip_network(subnet),
            gateway=next_hop,
            interface=target_iface,
        )

    # Apply netplan once, after all routes have been written.
    if needs_netplan_apply:
        self._netplan_apply()
def run_post_boot_tasks(self, log_dir: str = "."):
    """
    Run post-boot tasks. Called by :py:meth:`config()`.

    Tasks come from ``post_boot_tasks`` in fablib data and are processed in
    order:

    - ``("execute", cmd)``: ``cmd`` is rendered with
      :py:meth:`render_template` and executed with ``output_file`` set to
      ``<log_dir>/<node name>.log``.
    - ``("upload_file", local, remote)``: :py:meth:`upload_file`.
    - ``("upload_directory", local, remote)``: :py:meth:`upload_directory`.

    Any other task is logged as an error and skipped.
    """
    log.debug(f"run_post_boot_tasks: {self.get_name()}")

    tasks = self.get_fablib_data().get("post_boot_tasks", [])
    log.debug(f"run_post_boot_tasks: commands: {tasks}")

    for task in tasks:
        log.debug(f"run_post_boot_tasks: command: {task}")
        action = task[0]
        if action == "execute":
            self.execute(
                self.render_template(task[1]),
                quiet=True,
                output_file=f"{log_dir}/{self.get_name()}.log",
            )
        elif action == "upload_file":
            log.debug(f"run_post_boot_tasks: upload_file: {task}")
            outcome = self.upload_file(task[1], task[2])
            log.debug(f"run_post_boot_tasks: upload_file rtnval: {outcome}")
        elif action == "upload_directory":
            log.debug(f"run_post_boot_tasks: upload_directory: {task}")
            outcome = self.upload_directory(task[1], task[2])
            log.debug(f"run_post_boot_tasks: upload_directory rtnval: {outcome}")
        else:
            log.error(f"Invalid post boot command: {task}")
def run_post_update_commands(self, log_dir: str = "."):
    """
    Run post-update commands. Called by :py:meth:`config()`.

    Each command in ``post_update_commands`` (fablib data) is executed
    quietly, with ``output_file`` set to ``<log_dir>/<node name>.log``.
    """
    pending = self.get_fablib_data().get("post_update_commands", [])
    for cmd in pending:
        log_file = f"{log_dir}/{self.get_name()}.log"
        self.execute(cmd, quiet=True, output_file=log_file)
def is_instantiated(self):
    """
    Report whether the node has been marked instantiated.

    :return: ``True`` only when fablib data holds ``instantiated == "True"``;
        a missing key counts as ``False``.
    """
    data = self.get_fablib_data()
    node_name = self.get_name()

    if "instantiated" not in data:
        log.debug(
            f"is_instantiated False, {node_name}, fablib_data['instantiated']: does not exist"
        )
        return False

    flag = data["instantiated"]
    result = flag == "True"
    log.debug(
        f"is_instantiated {result}, {node_name}, fablib_data['instantiated']: {flag}"
    )
    return result
def set_instantiated(self, instantiated: bool = True):
    """
    Mark node as instantiated. Called by :py:meth:`config()`.

    The flag is stored as the string form of ``instantiated``.
    """
    data = self.get_fablib_data()
    data.update(instantiated=str(instantiated))
    self.set_fablib_data(data)
def run_update_commands(self) -> bool:
    """
    Returns `True` if `run_update_commands` flag is set.

    A missing flag (e.g. fablib data that was never initialised with
    :py:meth:`init_fablib_data`) is treated as unset instead of raising
    ``KeyError``, consistent with :py:meth:`is_instantiated`.

    :rtype: bool
    """
    return self.get_fablib_data().get("run_update_commands") == "True"
def set_run_update_commands(self, run_update_commands: bool = True):
    """
    Set `run_update_commands` flag.

    The flag is stored as the string form of ``run_update_commands``.
    """
    data = self.get_fablib_data()
    data.update(run_update_commands=str(run_update_commands))
    self.set_fablib_data(data)
def config(self, log_dir=".", refresh: bool = False):
    """
    Run configuration tasks for this node.

    :param log_dir: Directory for the per-node log file
        (``<log_dir>/<node name>.log``) used by post-boot and post-update
        commands.
    :type log_dir: str
    :param refresh: Refresh the object with latest Fim info
    :type refresh: bool
    :return: ``"Done"``

    .. note ::

        Use this method in order to re-apply configuration to a rebooted
        node. Normally this method is invoked by ``Slice.submit()`` or
        ``Slice.modify()``.

    Configuration tasks include:

        - Setting hostname.
        - Configuring interfaces.
        - Configuring routes.
        - Running post-boot tasks added by ``add_post_boot_execute()``,
          ``add_post_boot_upload_file()``, and
          ``add_post_boot_upload_directory()`` (only the first time, while
          the node is not yet marked instantiated).
        - Running post-update commands added by
          ``add_post_update_command()`` (when the ``run_update_commands``
          flag is set).
    """
    self.execute(f"sudo hostnamectl set-hostname '{self.get_name()}'", quiet=True)

    for iface in self.get_interfaces(refresh=refresh):
        iface.config()

    self.config_routes()

    # The instantiated flag guards post-boot tasks so they run only once.
    if not self.is_instantiated():
        self.set_instantiated(True)
        self.run_post_boot_tasks(log_dir=log_dir)

    if self.run_update_commands():
        self.run_post_update_commands(log_dir=log_dir)

    return "Done"
def add_fabnet(
    self,
    name="FABNET",
    net_type="IPv4",
    nic_type="NIC_Basic",
    routes=None,
    subnet: Optional[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]] = None,
):
    """
    Add a simple layer 3 network to this node.

    The network is named ``<name>_<net_type>_<site>`` and is reused if the
    slice already has one with that name. A new NIC is added to the node,
    its first interface joins the network in ``auto`` mode, and routes via
    the network's gateway are recorded.

    :param name: a name for the network. Default is ``"FABNET"``.
    :param net_type: Network type, ``"IPv4"`` or ``"IPv6"``.
    :param nic_type: a NIC type. Default is ``"NIC_Basic"``.
    :param routes: subnets to route via the network's gateway. When empty
        or ``None``, a single route to the fablib manager's
        ``FABNETV4_SUBNET`` (IPv4) or ``FABNETV6_SUBNET`` (IPv6) is added.
    :param subnet: Request a specific subnet for FabNetv4, FabNetv6 or
        FabNetv6Ext services. It's ignored for any other services.
    :type subnet: ipaddress.IPv4Network or ipaddress.IPv6Network
    """
    site = self.get_site()
    net_name = f"{name}_{net_type}_{site}"

    net = self.get_slice().get_network(net_name)
    if not net:
        net = self.get_slice().add_l3network(
            name=net_name, type=net_type, subnet=subnet, site=site
        )

    # Add a dedicated NIC on this node and attach its first interface to
    # the layer 3 network.
    iface = self.add_component(
        model=nic_type, name=f"{net_name}_nic"
    ).get_interfaces()[0]
    net.add_interface(iface)
    iface.set_mode("auto")

    if routes:
        for route in routes:
            self.add_route(subnet=route, next_hop=net.get_gateway())
    elif net_type == "IPv4":
        self.add_route(
            subnet=self.get_fablib_manager().FABNETV4_SUBNET,
            next_hop=net.get_gateway(),
        )
    elif net_type == "IPv6":
        self.add_route(
            subnet=self.get_fablib_manager().FABNETV6_SUBNET,
            next_hop=net.get_gateway(),
        )
def poa(
    self,
    operation: str,
    vcpu_cpu_map: List[Dict[str, str]] = None,
    node_set: List[str] = None,
    keys: List[Dict[str, str]] = None,
    bdf: List[str] = None,
) -> Union[Dict, str]:
    """
    Perform operation action on a VM; an action which is triggered by CF via the Aggregate.

    The request is sent through the fablib manager's underlying manager,
    using this node's reservation id as the sliver id.

    :param operation: operation to be performed
    :param vcpu_cpu_map: map virtual cpu to host cpu map
    :param node_set: list of numa nodes
    :param keys: list of ssh keys
    :param bdf: list of PCI Ids
    :raise Exception: in case of failure
    :return: State of POA or Dictionary containing the info, in case of INFO POAs
    """
    orchestrator = self.get_fablib_manager().get_manager()
    outcome = orchestrator.poa(
        sliver_id=self.get_reservation_id(),
        operation=operation,
        vcpu_cpu_map=vcpu_cpu_map,
        node_set=node_set,
        keys=keys,
        bdf=bdf,
    )
    log.info(
        f"POA {operation} completed for {self.get_reservation_id()}/{self.get_name()}: {type(outcome)}"
    )
    return outcome
def get_cpu_info(self) -> dict:
    """
    Get CPU Information for the Node and the host on which the VM is running.

    The data comes from the ``cpuinfo`` POA. The returned dictionary is
    keyed by host name (:py:meth:`get_host`) and by VM instance name
    (:py:meth:`get_instance_name`).

    The host entry is consumed by :py:meth:`pin_cpu`, which expects it to
    carry a ``"pinned_cpus"`` list and ``"NUMA node<N> CPU(s):"`` entries
    holding comma-separated CPU ranges.

    The VM entry lists one record per VCPU. Without CPU pinning,
    ``CPU Affinity`` lists all host CPUs; after pinning it shows only the
    pinned CPU. Example (affinity lists truncated)::

        [{'CPU': '116', 'CPU Affinity': '0,1,2,...,127',
          'CPU time': '20.2s', 'State': 'running', 'VCPU': '0'},
         {'CPU': '118', 'CPU Affinity': '0,1,2,...,127',
          'CPU time': '9.0s', 'State': 'running', 'VCPU': '1'},
         ...]

    :return: cpu info dict
    :rtype: dict
    :raises SliceStateError: if the POA reports failure.
    """
    # Get CPU Info for the VM and Host on which VM resides
    cpu_info = self.poa(operation="cpuinfo")

    # Check for failure before using the result: the POA signals failure
    # with the string "Failed", which has no ``.get``.
    if cpu_info == "Failed":
        raise SliceStateError("POA Failed to get CPU INFO")

    log.info(f"HOST CPU INFO: {cpu_info.get(self.get_host())}")
    log.info(f"Instance CPU INFO: {cpu_info.get(self.get_instance_name())}")
    return cpu_info
def get_numa_info(self) -> dict:
    """
    Get Numa Information for the Node and the host on which the VM is running.

    The data comes from the ``numainfo`` POA. The returned dictionary is
    keyed by host name (:py:meth:`get_host`) and by VM instance name
    (:py:meth:`get_instance_name`).

    Host INFO looks like::

        {'available': '8 nodes (0-7)',
         'node 0': {'cpus': '0 1 2 3 4 5 6 7 64 65 66 67 68 69 70 71', 'free': '18366 MB', 'size': '63794 MB'},
         'node 1': {'cpus': '8 9 10 11 12 13 14 15 72 73 74 75 76 77 78 79', 'free': '61574 MB', 'size': '64466 MB'},
         'node 2': {'cpus': '16 17 18 19 20 21 22 23 80 81 82 83 84 85 86 87', 'free': '654 MB', 'size': '64507 MB'},
         'node 3': {'cpus': '24 25 26 27 28 29 30 31 88 89 90 91 92 93 94 95', 'free': '350 MB', 'size': '64495 MB'},
         'node 4': {'cpus': '32 33 34 35 36 37 38 39 96 97 98 99 100 101 102 103', 'free': '43491 MB', 'size': '64507 MB'},
         'node 5': {'cpus': '40 41 42 43 44 45 46 47 104 105 106 107 108 109 110 111', 'free': '46958 MB', 'size': '64507 MB'},
         'node 6': {'cpus': '48 49 50 51 52 53 54 55 112 113 114 115 116 117 118 119', 'free': '10348 MB', 'size': '64507 MB'},
         'node 7': {'cpus': '56 57 58 59 60 61 62 63 120 121 122 123 124 125 126 127', 'free': '63374 MB', 'size': '64506 MB'}}

    VM INFO looks like::

        {'Node 0': {'Heap': '0', 'Huge': '0', 'Private': '0', 'Stack': '0', 'Total': '0'},
         'Node 1': {'Heap': '0', 'Huge': '0', 'Private': '0', 'Stack': '0', 'Total': '0'},
         'Node 2': {'Heap': '0', 'Huge': '0', 'Private': '0', 'Stack': '0', 'Total': '0'},
         'Node 3': {'Heap': '0', 'Huge': '0', 'Private': '0', 'Stack': '0', 'Total': '0'},
         'Node 4': {'Heap': '0', 'Huge': '0', 'Private': '1', 'Stack': '0', 'Total': '1'},
         'Node 5': {'Heap': '0', 'Huge': '0', 'Private': '0', 'Stack': '0', 'Total': '0'},
         'Node 6': {'Heap': '6', 'Huge': '0', 'Private': '32812', 'Stack': '0', 'Total': '32817'},
         'Node 7': {'Heap': '0', 'Huge': '0', 'Private': '0', 'Stack': '0', 'Total': '0'},
         'Total': {'Heap': '6', 'Huge': '0', 'Private': '32813', 'Stack': '0', 'Total': '32818'}}

    :return: numa info dict
    :rtype: dict
    :raises SliceStateError: if the POA reports failure.
    """
    # Get Numa Info for the VM and Host on which VM resides
    numa_info = self.poa(operation="numainfo")

    # Check for failure before using the result: the POA signals failure
    # with the string "Failed", which has no ``.get``.
    if numa_info == "Failed":
        raise SliceStateError("POA Failed to get Numa INFO")

    log.info(f"HOST Numa INFO: {numa_info.get(self.get_host())}")
    log.info(f"Instance Numa INFO: {numa_info.get(self.get_instance_name())}")
    return numa_info
def pin_cpu(self, component_name: str, cpu_range_to_pin: str = None):
    """
    Pin the cpus for the VM to the numa node associated with the component.

    VM CPUs are mapped one-to-one, in request order, onto host CPUs of the
    component's NUMA node that are not already pinned, lowest-numbered
    first. If fewer host CPUs are available than requested, only that many
    VM CPUs are pinned and a warning is logged.

    :param component_name: Component Name
    :param cpu_range_to_pin: VM CPUs to pin: a single CPU (``"0"``), a
        range (``"0-1"``) or a comma-separated combination (``"0-1,3"``).
        When ``None`` all cores allocated to the node are pinned.
    :raises ValidationError: if the request is empty or outside the cores
        allocated to the node, the NUMA node's CPU list is missing, or no
        host CPUs are available.
    :raises SliceStateError: if the ``cpupin`` POA fails.
    """

    def _expand_cpu_list(spec) -> List[int]:
        # Expand "0-3,8,10-11" into [0, 1, 2, 3, 8, 10, 11], keeping the
        # first occurrence order and dropping duplicates.
        cpus = []
        for part in str(spec).split(","):
            part = part.strip()
            if not part:
                continue
            if "-" in part:
                start, end = (int(p) for p in part.split("-", 1))
                cpus.extend(range(start, end + 1))
            else:
                cpus.append(int(part))
        return list(dict.fromkeys(cpus))

    try:
        allocated_cpu_list = list(range(0, self.get_cores()))
        if cpu_range_to_pin is None:
            result_list = allocated_cpu_list
        else:
            result_list = _expand_cpu_list(cpu_range_to_pin)

        if not result_list:
            raise ValidationError(f"No CPUs requested to pin: {cpu_range_to_pin}")

        set_cpu = set(allocated_cpu_list)
        if any(item not in set_cpu for item in result_list):
            raise ValidationError(
                f"Requested CPU range outside the Cores allocated {self.get_cores()} to Node"
            )

        # Get CPU Info for the VM and Host on which VM resides
        cpu_info = self.get_cpu_info()
        host_cpu_info = cpu_info.get(self.get_host())
        # Host CPUs already pinned; treated as none when the key is absent.
        # Compared as strings on both sides.
        pinned_cpus = {str(cpu) for cpu in (host_cpu_info.get("pinned_cpus") or [])}

        # Find Numa Node for the NIC
        numa_node = self.get_component(name=component_name).get_numa_node()

        # Find CPUs assigned to the numa node
        numa_cpu_range_str = host_cpu_info.get(f"NUMA node{numa_node} CPU(s):")
        if not numa_cpu_range_str:
            raise ValidationError(
                f"No CPU information found for Numa Node: {numa_node} "
                f"on host: {self.get_host()}"
            )

        # Host CPUs on the NUMA node minus the pinned ones, sorted so the
        # resulting mapping is deterministic.
        available_cpus = [
            str(cpu)
            for cpu in sorted(_expand_cpu_list(numa_cpu_range_str))
            if str(cpu) not in pinned_cpus
        ]

        number_of_cpus_to_pin = len(result_list)
        number_of_available_cpus = len(available_cpus)

        # Verify Requested CPUs do not exceed Available CPUs
        if number_of_cpus_to_pin > number_of_available_cpus:
            msg = (
                f"Not enough Host CPUs available to pin! Requested CPUs: {number_of_cpus_to_pin} "
                f"Available CPUs: {number_of_available_cpus}"
            )
            log.warning(msg)
            number_of_cpus_to_pin = number_of_available_cpus
            if not number_of_cpus_to_pin:
                raise ValidationError(msg)

        # Build the VCPU to CPU Mapping; zip stops at the shorter list, i.e.
        # after number_of_cpus_to_pin entries.
        vcpu_cpu_map = [
            {"vcpu": str(vcpu), "cpu": cpu}
            for vcpu, cpu in zip(result_list, available_cpus)
        ]

        msg = (
            f"Pinning Node: {self.get_name()} CPUs for component: {component_name} to "
            f"Numa Node: {numa_node}"
        )
        log.info(f"{msg} CPU Map: {vcpu_cpu_map}")
        print(msg)

        # Issue POA
        status = self.poa(operation="cpupin", vcpu_cpu_map=vcpu_cpu_map)
        if status == "Failed":
            raise SliceStateError("POA Failed")

        log.info(f"CPU Pinning complete for node: {self.get_name()}")
    except Exception as e:
        log.error(traceback.format_exc())
        log.error(f"Failed to Pin CPU for node: {self.get_name()} e: {e}")
        raise
def rescan_pci(self, component_name: str = None):
    """
    Rescan PCI devices for a specific component or all components.

    Shared NICs and components without a PCI address are skipped; the
    remaining PCI addresses are sent in a single ``rescan`` POA.

    :param component_name: Name of the component to rescan. If None, rescans all components.
    :raises ValueError: If the named component is not found.
    :raises RuntimeError: If no components or PCI devices are found, or if
        the rescan operation fails.
    """
    try:
        # Retrieve list of PCI addresses to rescan
        if component_name:
            targets = [self.get_component(component_name)]
        else:
            targets = self.get_components()

        if not targets or any(t is None for t in targets):
            if component_name:
                raise ValueError(f"Component '{component_name}' not found.")
            raise RuntimeError("No components found.")

        bdfs = []
        for target in targets:
            pci_addr = target.get_pci_addr()
            if not pci_addr:
                continue
            # Skip Shared NICs
            if target.get_type() == str(ComponentType.SharedNIC):
                continue
            bdfs.extend(pci_addr if isinstance(pci_addr, list) else [pci_addr])

        if not bdfs:
            raise RuntimeError("No PCI devices available to rescan on the node.")

        # Perform the PCI rescan
        status = self.poa(operation="rescan", bdf=bdfs)
        failed = isinstance(status, str) and status.lower() == "failed"
        if failed:
            raise RuntimeError("PCI rescan operation (POA) failed.")

        log.info(f"PCI rescan completed successfully for node: {self.get_name()}")
    except Exception as e:
        log.error(f"Failed PCI rescan for node {self.get_name()}: {e}")
        log.debug(traceback.format_exc())
        raise
def os_reboot(self):
    """
    Request Openstack to reboot the VM.

    NOTE: This is not the same as rebooting the VM via ``reboot`` or
    ``init 6`` inside the guest; it is like ``openstack server reboot``
    and is issued as a ``reboot`` POA.

    :raises SliceStateError: if the POA reports failure.
    """
    if self.poa(operation="reboot") == "Failed":
        raise SliceStateError("Failed to reboot the server")
    log.info(f"Node: {self.get_name()} rebooted!")
def numa_tune(self):
    """
    Pin the memory for the VM to the numa node(s) associated with the components.

    For each distinct NUMA node reported by the node's components, the
    usable memory is the host's free memory on that NUMA node plus the
    memory the VM already holds there. If the total over those NUMA nodes
    is at least the VM's RAM, a ``numatune`` POA is issued for them.
    Components that report no NUMA node are skipped. A failed POA is
    logged, not raised.

    :raises ValidationError: if the requested VM memory exceeds the memory
        available on the selected NUMA nodes.
    """
    try:
        # Get Numa Info for the VM and Host on which VM resides
        numa_info = self.get_numa_info()
        host_numa_info = numa_info.get(self.get_host())
        vm_numa_info = numa_info.get(self.get_instance_name())

        total_available_memory = 0
        numa_nodes = []
        for c in self.get_components():
            # Find Numa Node for the component
            numa_node = c.get_numa_node()

            # A component with no NUMA node cannot be tuned (its lookup
            # would be "node None"); also skip NUMA nodes already counted.
            if numa_node is None or numa_node in numa_nodes:
                continue

            log.info(f"Numa Node {numa_node} for component: {c.get_name()}")

            # Free Memory for the Numa Node, e.g. "18366 MB"
            numa_memory_free_str = host_numa_info.get(f"node {numa_node}").get("free")
            log.info(f"Numa Node {numa_node} free memory: {numa_memory_free_str}")
            numa_memory_free = int(re.search(r"\d+", numa_memory_free_str).group())
            log.info(f"Numa Node {numa_node} free memory: {numa_memory_free}")

            # Memory allocated to VM on the Numa Node
            vm_mem = vm_numa_info.get(f"Node {numa_node}").get("Total")
            log.info(f"VM memory: {vm_mem}")

            # Count the VM's existing allocation on this NUMA node as
            # available in addition to the node's free memory (presumably
            # because that memory stays usable by the VM after tuning).
            available_memory_on_node = numa_memory_free + int(vm_mem)
            log.info(f"Available memory: {available_memory_on_node}")
            if available_memory_on_node <= 0:
                continue

            numa_nodes.append(numa_node)
            # Compute the total available Memory
            total_available_memory += available_memory_on_node

        # NOTE(review): assumes get_ram() is in GB and the NUMA figures in
        # MB, hence the * 1024 — confirm.
        requested_vm_memory = self.get_ram() * 1024
        if requested_vm_memory > total_available_memory:
            raise ValidationError(
                f"Cannot numatune VM to Numa Nodes {numa_nodes}; requested memory "
                f"{requested_vm_memory} exceeds available: {total_available_memory}"
            )

        msg = f"Numa tune Node: {self.get_name()} Memory to Numa Nodes: {numa_nodes}"
        log.info(msg)
        print(msg)

        # Issue POA
        status = self.poa(operation="numatune", node_set=numa_nodes)
        if status == "Failed":
            log.error(f"Numa tune failed for node: {self.get_name()}")
        else:
            log.info(f"Numa tune complete for node: {self.get_name()}")
    except Exception as e:
        log.error(traceback.format_exc())
        log.error(f"Failed to Numa tune for node: {self.get_name()} e: {e}")
        raise
def add_public_key(
    self,
    *,
    sliver_key_name: str = None,
    email: str = None,
    sliver_public_key: str = None,
):
    """
    Add a public key to a node.

    The key is one of:

    - the user's portal public key named ``sliver_key_name``;
    - the portal public key named ``sliver_key_name`` belonging to the user
      identified by ``email``;
    - the literal key given in ``sliver_public_key``.

    :param sliver_key_name: Sliver Key Name on the Portal
    :type sliver_key_name: str
    :param email: Email
    :type email: str
    :param sliver_public_key: Public sliver key
    :type sliver_public_key: str
    :raises Exception: in case of errors
    """
    key_args = dict(
        sliver_key_name=sliver_key_name,
        sliver_public_key=sliver_public_key,
        email=email,
    )
    self.__ssh_key_helper(**key_args)
def remove_public_key(
    self,
    *,
    sliver_key_name: str = None,
    email: str = None,
    sliver_public_key: str = None,
):
    """
    Remove a public key from a node.

    The key is one of:

    - the user's portal public key named ``sliver_key_name``;
    - the portal public key named ``sliver_key_name`` belonging to the user
      identified by ``email``;
    - the literal key given in ``sliver_public_key``.

    :param sliver_key_name: Sliver Key Name on the Portal
    :type sliver_key_name: str
    :param email: Email
    :type email: str
    :param sliver_public_key: Public sliver key
    :type sliver_public_key: str
    :raises Exception: in case of errors
    """
    key_args = dict(
        sliver_key_name=sliver_key_name,
        email=email,
        sliver_public_key=sliver_public_key,
        remove=True,
    )
    self.__ssh_key_helper(**key_args)
def __ssh_key_helper(
    self,
    *,
    sliver_key_name: str = None,
    email: str = None,
    sliver_public_key: str = None,
    remove: bool = False,
):
    """
    Add or remove a public SSH key on this node by issuing a POA.

    The key is taken from one of:

    - the portal sliver key named ``sliver_key_name`` (looked up among the
      keys returned for ``email``, or for the current user when ``email`` is
      None);
    - the literal ``sliver_public_key`` string.

    If both ``sliver_key_name`` and ``sliver_public_key`` are given, the
    portal key looked up by name is used and ``sliver_public_key`` is ignored.

    :param sliver_key_name: Sliver Key Name on the Portal (matched against the
        key's ``comment`` field)
    :type sliver_key_name: str
    :param email: Email of the user whose portal keys are searched
    :type email: str
    :param sliver_public_key: Public sliver key text
    :type sliver_public_key: str
    :param remove: True to remove the key, False to add it
    :type remove: bool
    :raises ValueError: if neither sliver_key_name nor sliver_public_key is given
    :raises ResourceNotFoundError: if no portal key matches sliver_key_name
    :raises SliceStateError: if the POA reports "Failed"
    """
    if sliver_key_name is None and sliver_public_key is None:
        raise ValueError(
            f"Either sliver_key_name: {sliver_key_name} or "
            f"sliver_public_key: {sliver_public_key} must be specified!"
        )

    # Resolve the key text from the portal when a key name was given.
    if sliver_key_name is not None:
        ssh_keys = self.get_fablib_manager().get_manager().get_ssh_keys(email=email)
        # Portal keys are identified by their "comment" field.
        found = next(
            (item for item in ssh_keys or [] if sliver_key_name == item["comment"]),
            None,
        )
        if not found:
            raise ResourceNotFoundError(f"Sliver key: {sliver_key_name} not found!")
        sliver_public_key = f"{found['ssh_key_type']} {found['public_key']}"

    operation = "removekey" if remove else "addkey"
    key_dict = {"key": sliver_public_key, "comment": f"{operation}-by-poa-fablib"}
    status = self.poa(operation=operation, keys=[key_dict])
    if status == "Failed":
        raise SliceStateError(f"Failed to {operation} the node")

    msg = f"{operation} to the node {self.get_name()} successful!"
    log.info(msg)
    print(msg)
def update(self, fim_node: FimNode):
    """
    Update this node with a new FIM node and refresh components/interfaces.

    When ``fim_node`` is truthy it replaces ``self.fim_node``, the node's
    caches are invalidated, and components and interfaces are re-queried with
    ``refresh=True``. Errors raised while refreshing are logged at debug level
    and suppressed. When ``fim_node`` is falsy, no replacement or refresh
    happens.

    NOTE(review): the original indentation was lost in extraction. The nesting
    below, with ``self._fim_dirty = False`` inside ``if fim_node:``, is a
    reconstruction. Confirm against upstream whether the reset should also
    happen when ``fim_node`` is falsy.

    :param fim_node: replacement FIM node; ignored when falsy
    :type fim_node: FimNode
    """
    if fim_node:
        self.fim_node = fim_node
        # Invalidate state cached from the previous FIM node.
        self._invalidate_cache()
        try:
            # Eagerly rebuild component/interface caches from the new FIM node.
            self.get_components(refresh=True)
            self.get_interfaces(refresh=True)
        except Exception as e:
            # Component/interface queries may fail if the topology is in
            # a transitional state (e.g. mid-modify). The FIM reference
            # is already updated; caches will rebuild on next access.
            log.debug(
                f"Node {self.get_name()}: error refreshing caches "
                f"during update: {e}"
            )
        # Presumably marks the local FIM state as in sync -- confirm.
        self._fim_dirty = False