#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2020 FABRIC Testbed
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Author: Paul Ruth (pruth@renci.org)
"""
This module exports the :class:`FablibManager` class and the
:class:`fablib` class, which allow you to, among other
things:
- Query FABRIC testbed resources.
- Create, modify, and delete slices.
- Manage the SSH keys you use with FABRIC.
- etc.
In most cases you would need to create a :class:`FablibManager`
instance to interact with FABRIC testbed::
from fabrictestbed_extensions.fablib.fablib import FablibManager
fablib = FablibManager()
slice = fablib.new_slice(name="MySlice")
node = slice.add_node(name="node1")
slice.submit()
See FABRIC project's `Jupyter notebook examples <examples>`_ for more
complete code samples.
.. note::
Some configuration in the form of a configuration file, environment
variables, or :class:`FablibManager` constructor parameters is
required for the library to work. Please see the FABRIC project's
`documentation on getting started <learn>`_.
.. _learn: https://learn.fabric-testbed.net/article-categories/getting-started/
.. _examples: https://github.com/fabric-testbed/jupyter-examples/
"""
from __future__ import annotations
import datetime
import logging
import os
import random
import shutil
import sys
import tarfile
import threading
import warnings
from fabrictestbed.external_api.artifact_manager import Visibility
from fabrictestbed.fabric_manager_v2 import FabricManagerV2
from fss_utils.sshkey import FABRICSSHKey
from fabrictestbed_extensions.fablib.artifact import Artifact
from fabrictestbed_extensions.utils.ceph_fs_utils import CephFsUtils
warnings.filterwarnings("always", category=DeprecationWarning)
from concurrent.futures import ThreadPoolExecutor
from ipaddress import IPv4Network, IPv6Network
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import paramiko
from fabrictestbed_extensions.fablib.config.config import Config, ConfigException
from fabrictestbed_extensions.fablib.constants import Constants
from fabrictestbed_extensions.fablib.exceptions import SliceNotFoundError
from fabrictestbed_extensions.utils.utils import Utils
if TYPE_CHECKING:
from fabrictestbed_extensions.fablib.node import Node
from fabrictestbed.slice_manager import SliceState
from fabrictestbed_extensions.fablib.crease.crinkle import CrinkleSlice
from fabrictestbed_extensions.fablib.resources_v2 import ResourcesV2
from fabrictestbed_extensions.fablib.slice import Slice
# Module-level logger used by the methods in this file. It is looked up by the
# fixed name "fablib" rather than by __name__.
log = logging.getLogger("fablib")
[docs]
class FablibManager(Config):
"""
The main class to use when interacting with the testbed.
"""
# Network prefixes parsed once at class-definition time; presumably the address
# ranges of FABRIC's FABNet v4/v6 services and their external ("EXT") variants.
# NOTE(review): FABNETV6_SUBNET and FABNETV6EXT_SUBNET hold the same prefix --
# confirm that this is intentional.
FABNETV4_SUBNET = IPv4Network("10.128.0.0/10")
FABNETV6_SUBNET = IPv6Network("2602:FCFB:00::/40")
FABNETV4EXT_SUBNET = IPv4Network("23.134.232.0/22")
FABNETV6EXT_SUBNET = IPv6Network("2602:FCFB:00::/40")
# Class-level default; __init__ replaces it with a ThreadPoolExecutor on the
# instance when the manager is online and SSH is not disabled.
ssh_thread_pool_executor = None
def __init__(
self,
fabric_rc: str = None,
credmgr_host: str = None,
orchestrator_host: str = None,
core_api_host: str = None,
am_host: str = None,
ceph_mgr_host: str = None,
token_location: str = None,
id_token: str = None,
project_id: str = None,
bastion_username: str = None,
bastion_key_location: str = None,
log_level: str = Constants.DEFAULT_LOG_LEVEL,
log_file: str = Constants.DEFAULT_LOG_FILE,
log_propagate: bool = Constants.DEFAULT_LOG_PROPAGATE,
data_dir: str = Constants.DEFAULT_DATA_DIR,
execute_thread_pool_size: int = 64,
offline: bool = False,
auto_token_refresh: bool = True,
validate_config: bool = True,
no_ssh: bool = False,
raise_on_not_found: bool = False,
**kwargs,
):
"""
``FablibManager`` is the main interface to FABRIC services.
A ``FablibManager`` object is used to query FABRIC testbed for
available resources, create and configure slices, manage SSH
keys in nodes in slices and FABRIC's bastion host, etc. This
requires some configuration, which is gathered from:
- constructor parameters (highest priority)
- a configuration file (medium priority, optional)
- environment variables (low priority)
- defaults (lowest priority, if needed and when possible)
The configuration file is optional. You can provide configuration
entirely through constructor parameters and/or environment variables.
If using a config file, it typically would be located at
``"${HOME}/work/fabric_config/fabric_rc"``.
:param fabric_rc: Path to fablib configuration file. Optional.
Defaults to ``"${HOME}/work/fabric_config/fabric_rc"``, but
the file doesn't need to exist.
:param credmgr_host: Name of credential manager host.
:param orchestrator_host: Name of FABRIC orchestrator host.
:param core_api_host: Name of Core API host.
:param am_host: Name of Aggregate Manager host.
:param ceph_mgr_host: Name of ceph manager host.
:param token_location: Path to the file that contains your
FABRIC auth token.
:param id_token: ID token string to use directly (optional).
If provided, token_location file check is skipped and
auto_token_refresh is automatically disabled.
:param project_id: Your FABRIC project ID, obtained from
https://cm.fabric-testbed.net/, usually via FABRIC portal.
:param bastion_username: Your username on FABRIC bastion host,
obtained from FABRIC portal.
:param bastion_key_location: Path to your bastion SSH key.
:param log_level: Level of detail in the logs written.
Defaults to ``"DEBUG"``; other possible log levels are
``"INFO"``, ``"WARNING"``, ``"ERROR"``, and
``"CRITICAL"``, in reducing order of verbosity.
:param log_file: Path where fablib logs are written; defaults
to ``"/tmp/fablib/fablib.log"``.
:param log_propagate: Whether fablib logs propagate to the root logger.
Defaults to ``False`` to avoid duplicate logging in notebooks.
:param data_dir: directory for fablib to store temporary data.
:param output: Format of fablib output; can be either
``"pandas"`` or ``"text"``. Defaults to ``"pandas"`` in a
Jupyter notebook environment; ``"text"`` otherwise.
NOTE(review): ``output`` is not a named parameter of this
constructor; presumably it is accepted through ``**kwargs`` --
confirm against ``Config``.
:param execute_thread_pool_size: Number of worker threads in
the thread pool fablib uses to execute commands in nodes.
Defaults to 64.
:param offline: Avoid using FABRIC services when initializing.
This is ``False`` by default, and set to ``True`` only in
some unit tests.
:param auto_token_refresh: Auto refresh tokens (automatically disabled if id_token is provided)
:param validate_config: Whether to verify and persist configuration during initialization.
:param no_ssh: Disable SSH operations. When True, skips SSH-related initialization
(thread pool, bastion probing, SSH key validation) and prevents SSH operations
on nodes. Useful for API-only mode (e.g., MCP server). Can also be set via
the ``FABRIC_NO_SSH`` environment variable.
:param raise_on_not_found: Global default for singular getter methods
(``get_network``, ``get_interface``, etc.). When ``True``, these
methods raise ``ResourceNotFoundError`` if the resource is not found;
when ``False`` (default), they return ``None``. Individual calls
can override this via their ``raise_exception`` parameter.
:param kwargs: Any additional keyword arguments; forwarded unchanged
to the ``Config`` constructor.
"""
# If id_token is provided, disable auto_token_refresh
if id_token is not None:
auto_token_refresh = False
# Hand the configuration settings to Config. execute_thread_pool_size,
# auto_token_refresh, validate_config and raise_on_not_found are not
# forwarded; they are stored on this object below.
super().__init__(
fabric_rc=fabric_rc,
credmgr_host=credmgr_host,
orchestrator_host=orchestrator_host,
core_api_host=core_api_host,
am_host=am_host,
ceph_mgr_host=ceph_mgr_host,
token_location=token_location,
id_token=id_token,
project_id=project_id,
bastion_username=bastion_username,
bastion_key_location=bastion_key_location,
log_level=log_level,
log_file=log_file,
log_propagate=log_propagate,
data_dir=data_dir,
offline=offline,
no_ssh=no_ssh,
**kwargs,
)
# The service manager is not created here; __build_manager() assigns it.
self.manager = None
self.resources = None
self.auto_token_refresh = auto_token_refresh
self.last_resources_filtered_by_time = False
self.raise_on_not_found = raise_on_not_found
self.setup_logging()
self._offline = offline
self._validate_config = validate_config
self._manager_built = False
self._project_tags_cache: Optional[frozenset] = None
self._execute_thread_pool_size = execute_thread_pool_size
# Online start-up: create the SSH worker pool unless SSH is disabled,
# and run required_check().
if not offline:
if not self.get_no_ssh():
self.ssh_thread_pool_executor = ThreadPoolExecutor(
execute_thread_pool_size
)
self.required_check()
# Guards the slice cache dictionaries created below.
self.lock = threading.Lock()
# These dictionaries are maintained to keep cache of the slice objects created
# Use the same objects when user queries for slices
# This was added to address the concerns for
# https://github.com/fabric-testbed/fabrictestbed-extensions/issues/379
self.__slices_by_name = {}
self.__slices_by_id = {}
[docs]
def cache_slice(self, slice_object: Slice):
    """
    Store a Slice object in the manager's slice cache.

    The slice is indexed by its name and, when it already has an ID,
    by that ID as well. An entry cached earlier under the same name
    is replaced; if the earlier entry carried a different ID, its ID
    mapping is dropped so that both indexes keep pointing at the most
    recent object.

    :param slice_object: The Slice object to be cached.
    :type slice_object: Slice
    """
    with self.lock:
        key = slice_object.get_name()
        new_id = slice_object.get_slice_id()
        by_name = self.__slices_by_name
        by_id = self.__slices_by_id

        # A previous entry under this name may have left a stale ID mapping.
        if key in by_name:
            stale_id = by_name[key].get_slice_id()
            if stale_id and stale_id != new_id:
                by_id.pop(stale_id, None)

        by_name[key] = slice_object
        # Slices that have not been submitted yet have no ID to index by.
        if new_id:
            by_id[new_id] = slice_object
[docs]
def update_slice_cache_id(self, slice_object: Slice):
    """
    Register a slice under its ID once it has one (e.g., after submit).

    Intended to be called after a slice is submitted and receives its
    slice_id. An ID that is already present in the cache is left
    untouched.

    :param slice_object: The Slice object whose ID was updated.
    :type slice_object: Slice
    """
    with self.lock:
        assigned_id = slice_object.get_slice_id()
        if not assigned_id:
            return
        # setdefault() only inserts when the ID is not cached yet.
        self.__slices_by_id.setdefault(assigned_id, slice_object)
[docs]
def remove_slice_from_cache(self, slice_object: Slice):
    """
    Drop a Slice object from the slice cache.

    The slice is looked up by its ID and by its name; whichever of the
    two entries exists is removed. Missing entries are ignored.

    :param slice_object: The Slice object to be removed from the cache.
    :type slice_object: Slice
    """
    with self.lock:
        sid = slice_object.get_slice_id()
        if sid:
            self.__slices_by_id.pop(sid, None)

        label = slice_object.get_name()
        if label:
            self.__slices_by_name.pop(label, None)
def _get_slice_from_cache(
    self, slice_id: str = None, slice_name: str = None
) -> Optional[Slice]:
    """
    Look up a cached Slice object by ID or by name.

    The ID takes precedence: the name is consulted only when no ID is
    given.

    :param slice_id: The ID of the slice to retrieve.
    :param slice_name: The name of the slice to retrieve.
    :return: The cached Slice object, or None when nothing matches or
        neither argument is given.
    :rtype: Slice
    """
    with self.lock:
        if slice_id:
            return self.__slices_by_id.get(slice_id)
        if slice_name:
            return self.__slices_by_name.get(slice_name)
        return None
[docs]
def close(self):
    """
    Clean up resources held by this manager.

    In order, this:

    - closes cached SSH connections on every slice held in the slice
      cache (best effort; a failure on one slice is logged and does
      not stop the clean-up),
    - shuts down the SSH thread pool executor without waiting for
      running tasks, and
    - closes and detaches the handlers of ``self.log`` to avoid
      ResourceWarning from unclosed file handles.

    Call this when done using the FablibManager instance, or use
    the context manager protocol instead::

        with FablibManager() as fablib:
            # use fablib
        # resources automatically cleaned up

    Calling ``close()`` more than once is harmless.
    """
    # Snapshot the cached slices under the lock. The same Slice object is
    # normally present in both indexes, so de-duplicate by identity to avoid
    # closing it twice.
    with self.lock:
        cached = {
            id(slice_obj): slice_obj
            for slice_obj in (
                *self.__slices_by_name.values(),
                *self.__slices_by_id.values(),
            )
        }
    # Also honour a ``slice_cache`` mapping if some other code attached one;
    # this is the only attribute the previous implementation consulted.
    for slice_obj in (getattr(self, "slice_cache", None) or {}).values():
        cached.setdefault(id(slice_obj), slice_obj)

    for slice_obj in cached.values():
        try:
            slice_obj.close_ssh()
        except Exception as e:
            # Best effort: one failing slice must not block the clean-up.
            log.debug(f"Failed to close SSH connections on slice: {e}")

    if self.ssh_thread_pool_executor:
        self.ssh_thread_pool_executor.shutdown(wait=False)
        self.ssh_thread_pool_executor = None

    # Iterate over a copy because handlers are removed while looping.
    for handler in self.log.handlers[:]:
        try:
            handler.close()
            self.log.removeHandler(handler)
        except Exception as e:
            log.debug(f"Failed to close log handler {handler}: {e}")
def __enter__(self):
    """
    Enter a ``with`` block.

    :return: this manager, so it can be bound with ``as``.
    """
    manager = self
    return manager
def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Leave a ``with`` block and release resources via :meth:`close`.

    :return: always ``False``, so an exception raised inside the
        block is not suppressed.
    """
    suppress_exception = False
    self.close()
    return suppress_exception
[docs]
def get_image_names(self) -> dict[str, dict]:
    """
    Get the available image names.

    Delegates to :meth:`Config.get_image_names`.

    :return: Dictionary of images with default user and description
    :rtype: dict[str, dict]
    """
    images = Config.get_image_names()
    return images
[docs]
def get_default_slice_key(self) -> dict[str, str]:
    """
    Collect the current default slice key settings in one dictionary.

    The dictionary holds the public and private slice keys together
    with the paths of the files they are stored in.

    Important! Slice key management is under development and this
    functionality will likely change going forward.

    :return: default_slice_key dictionary
    :rtype: dict[str, str]
    """
    default_slice_key = {}
    default_slice_key["slice_public_key"] = self.get_default_slice_public_key()
    default_slice_key["slice_private_key"] = self.get_default_slice_private_key()
    default_slice_key["slice_public_key_file"] = (
        self.get_default_slice_public_key_file()
    )
    default_slice_key["slice_private_key_file"] = (
        self.get_default_slice_private_key_file()
    )
    return default_slice_key
[docs]
def validate_config(self):
    """
    Deprecated alias of :meth:`verify_and_configure`.

    Validate and create Fablib config - checks if all the required
    configuration exists for slice provisioning to work
    successfully:

    - Checks Credential Manager Host is configured properly
    - Checks Orchestrator Host is configured properly
    - Checks Core API Host is configured properly
    - Checks Bastion Host is configured properly
    - Check Sliver keys exist; create sliver keys if they do
      not exist
    - Check Bastion keys exist and are not expired;
      update/create bastion keys if expired or do not exist
    - Check Bastion Username is configured
    - Check Project Id is configured

    .. deprecated:: 1.6.5 Use `verify_and_configure()` instead.

    :raises Exception: if the configuration is invalid
    """
    deprecation_notice = (
        "This function is deprecated and will be removed in future releases, "
        "please use 'verify_and_configure' instead."
    )
    # stacklevel=2 attributes the warning to the caller of this method.
    warnings.warn(deprecation_notice, category=DeprecationWarning, stacklevel=2)
    self.verify_and_configure()
[docs]
def get_user_info(self) -> dict:
    """
    Fetch the user's information through the service manager.

    :return: returns a dictionary containing User's Information
    :rtype: dict
    """
    manager = self.get_manager()
    return manager.get_user_info()
[docs]
def determine_bastion_username(self):
    """
    Make sure a bastion username is set.

    Nothing happens when a bastion username is already configured.
    Otherwise the user's information is fetched and the bastion login
    found there is stored as the bastion username.
    """
    if self.get_bastion_username() is None:
        log.info("Fetching User's information")
        profile = self.get_user_info()
        log.debug("Updating Bastion User Name")
        bastion_login = profile.get(Constants.BASTION_LOGIN)
        self.set_bastion_username(bastion_username=bastion_login)
[docs]
def create_ssh_tunnel_config(self, overwrite: bool = False):
    """
    Create a ``fabric_ssh_tunnel_tools`` bundle for setting up SSH tunnels.

    A ``fabric_ssh_tunnel_tools`` directory is created next to the
    bastion SSH config file. The bastion key pair, the slice key pair
    and a generated SSH config file are placed in it, and the
    directory is archived as ``fabric_ssh_tunnel_tools.tgz``. Usage
    instructions are printed at the end.

    All inputs are validated before anything is written, so a missing
    key does not leave a half-built directory behind.

    :param overwrite: overwrite the configuration if True, return otherwise
    :type overwrite: bool
    :raises ConfigException: if the bastion SSH config location is not
        set, or a bastion/slice key file is not set or does not exist
    :raises Exception: if the bundle directory cannot be created
    """
    bastion_ssh_config_file = self.get_bastion_ssh_config_file()
    if bastion_ssh_config_file is None:
        raise ConfigException("Bastion SSH Config File location not specified")

    bastion_key_file = self.get_bastion_key_location()
    if bastion_key_file is None or not os.path.exists(bastion_key_file):
        raise ConfigException(
            "Bastion SSH Key File location not specified or the file does not exist"
        )

    file_name = os.path.basename(bastion_ssh_config_file)
    dir_path = os.path.join(
        os.path.dirname(bastion_ssh_config_file), "fabric_ssh_tunnel_tools"
    )
    bastion_key_file_name = os.path.basename(bastion_key_file)

    if os.path.exists(dir_path) and not overwrite:
        log.info(
            f"{dir_path} already exists and overwrite=False. Skipping creation."
        )
        return

    # Validate the slice keys before touching the file system.
    slice_pub_key_file = self.get_default_slice_public_key_file()
    if slice_pub_key_file is None or not os.path.exists(slice_pub_key_file):
        raise ConfigException(
            "Slice public key file is not specified or does not exist"
        )

    # The private key normally sits next to the public key, without the
    # ".pub" suffix. Fall back to the configured private key file when the
    # public key does not follow that naming convention.
    if slice_pub_key_file.endswith(".pub"):
        slice_key_file = slice_pub_key_file[: -len(".pub")]
    else:
        slice_key_file = self.get_default_slice_private_key_file()
    if not slice_key_file or not os.path.exists(slice_key_file):
        raise ConfigException("Slice private key file does not exist")

    try:
        os.makedirs(dir_path, exist_ok=True)
    except OSError as e:
        msg = f"Failed to create directory {dir_path}: {e}"
        log.error(msg)
        raise Exception(msg) from e

    # Copy the bastion private key, and its public key if it exists.
    shutil.copy2(bastion_key_file, os.path.join(dir_path, bastion_key_file_name))
    public_key_file = f"{bastion_key_file}.pub"
    if os.path.exists(public_key_file):
        shutil.copy2(
            public_key_file,
            os.path.join(dir_path, os.path.basename(public_key_file)),
        )

    # Copy slice public and private keys
    shutil.copy2(
        slice_key_file, os.path.join(dir_path, os.path.basename(slice_key_file))
    )
    shutil.copy2(
        slice_pub_key_file,
        os.path.join(dir_path, os.path.basename(slice_pub_key_file)),
    )

    # Write SSH config. IdentityFile is the bare file name because the key
    # is copied into the same directory as this config file.
    ssh_config_path = os.path.join(dir_path, file_name)
    with open(ssh_config_path, "w") as f:
        f.write(
            f"""UserKnownHostsFile /dev/null
StrictHostKeyChecking no
ServerAliveInterval 120
Host bastion.fabric-testbed.net
User {self.get_bastion_username()}
ForwardAgent yes
Hostname %h
IdentityFile {bastion_key_file_name}
IdentitiesOnly yes
Host * !bastion.fabric-testbed.net
ProxyJump {self.get_bastion_username()}@bastion.fabric-testbed.net:22
"""
        )

    # Tar the directory
    tgz_path = f"{dir_path}.tgz"
    with tarfile.open(tgz_path, "w:gz") as tar:
        tar.add(dir_path, arcname=os.path.basename(dir_path))

    # Usage instructions
    msg = f"""
SSH tunnel config created and zipped at: {tgz_path}
Download Instructions:
Download your custom `fabric_ssh_tunnel_tools.tgz` file from the `fabric_config` folder.
Usage Instructions:
1. Unzip the archive and place the resulting `fabric_ssh_tunnel_tools/` folder somewhere accessible from your terminal.
2. Open a terminal window (on Windows, use PowerShell).
3. Use `cd` to navigate into the `fabric_ssh_tunnel_tools` folder.
4. In your terminal, run the SSH tunnel command generated by the next notebook cell.
"""
    print(msg)
    log.info(f"SSH tunnel config created at {tgz_path}")
[docs]
def create_ssh_config(self, overwrite: bool = False):
    """
    Create the SSH config file used to reach nodes through the bastion.

    The file is written to the configured bastion SSH config location;
    missing parent directories are created first. A config location
    without a directory part is written to the current working
    directory.

    :param overwrite: overwrite the configuration if True, return otherwise
    :type overwrite: bool
    :raises ConfigException: if the bastion SSH config location is not set
    :raises Exception: if the parent directory cannot be created
    """
    bastion_ssh_config_file = self.get_bastion_ssh_config_file()
    if bastion_ssh_config_file is None:
        raise ConfigException("Bastion SSH Config File location not specified")

    if os.path.exists(bastion_ssh_config_file) and not overwrite:
        print("Bastion SSH Config file already exists, not making updates!")
        return

    dir_path = os.path.dirname(bastion_ssh_config_file)
    # dirname() is "" for a bare file name; there is nothing to create then,
    # and os.makedirs("") would fail.
    if dir_path:
        try:
            os.makedirs(dir_path, exist_ok=True)
        except OSError as e:
            msg = (
                f"Directory {dir_path} does not exist, Failed to create directory {dir_path}: {e}, "
                f"can not create ssh_config file!"
            )
            print(msg)
            log.error(msg)
            raise Exception(msg) from e

    with open(bastion_ssh_config_file, "w") as f:
        f.write(
            f"""UserKnownHostsFile /dev/null
StrictHostKeyChecking no
ServerAliveInterval 120
Host bastion.fabric-testbed.net
User {self.get_bastion_username()}
ForwardAgent yes
Hostname %h
IdentityFile {self.get_bastion_key_location()}
IdentitiesOnly yes
Host * !bastion.fabric-testbed.net
ProxyJump {self.get_bastion_username()}@bastion.fabric-testbed.net:22
"""
        )
[docs]
def validate_and_update_bastion_keys(self, validate_only: bool = False):
    """
    Validate the bastion key; optionally create a new one if it is not valid.

    The locally configured bastion public key is considered valid when
    its fingerprint matches one of the user's registered keys that is
    not expired and is not of a non-bastion key type. If it is not
    valid, new bastion keys are created (overwriting the existing key
    file) unless ``validate_only`` is set.

    :param validate_only: flag to specify to only do config validation
    :type validate_only: bool
    """
    log.info("Fetching User's information")
    user_info = self.get_user_info()
    log.debug("Validating Bastion Keys")

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    # Build a filtered list instead of removing entries from the list held
    # by user_info; tolerate a user record without any registered keys.
    usable_keys = []
    for key in user_info.get(Constants.SSH_KEYS) or []:
        # Keys without a type are kept; typed keys must be bastion keys.
        key_type = key.get(Constants.FABRIC_KEY_TYPE)
        if key_type and key_type != Constants.KEY_TYPE_BASTION:
            continue

        expires_on = key.get(Constants.EXPIRES_ON)
        if not expires_on:
            # Validity cannot be established without an expiry timestamp.
            log.warning("Ignoring SSH key without an expiry timestamp")
            continue
        expires_on_dt = datetime.datetime.fromisoformat(expires_on)
        if expires_on_dt.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC so that
            # they can be compared with the aware `now` -- confirm.
            expires_on_dt = expires_on_dt.replace(tzinfo=datetime.timezone.utc)
        if now > expires_on_dt:
            continue

        usable_keys.append(key)

    current_bastion_key = self.get_bastion_public_key()
    found = False
    if current_bastion_key:
        fingerprint = FABRICSSHKey(current_bastion_key).get_fingerprint()
        found = any(item["fingerprint"] == fingerprint for item in usable_keys)

    if found:
        log.info(f"User: {user_info.get(Constants.EMAIL)} bastion key is valid!")
        print(f"User: {user_info.get(Constants.EMAIL)} bastion key is valid!")
        return

    msg = f"User: {user_info.get(Constants.EMAIL)} bastion keys do not exist or are expired."
    log.info(msg)
    print(msg)
    if not validate_only:
        self.create_bastion_keys(overwrite=True)
    else:
        print("Please call `verify_and_configure` to renew your bastion keys!")
[docs]
def create_bastion_keys(
    self,
    *,
    bastion_key_location: str = None,
    store_pubkey: bool = True,
    overwrite: bool = False,
):
    """
    Create bastion keys and save them to disk.

    Nothing is done when a key file already exists at the target
    location and ``overwrite`` is False.

    :param bastion_key_location: bastion key location; defaults to the
        configured bastion key location
    :type bastion_key_location: str
    :param store_pubkey: flag indicating if the public key should be saved
    :type store_pubkey: bool
    :param overwrite: overwrite the bastion key file if it exists already
    :type overwrite: bool
    :raises ConfigException: if no bastion key location is given or configured
    """
    if bastion_key_location is None:
        bastion_key_location = self.get_bastion_key_location()
    # Fail with a clear error instead of a TypeError from os.path.exists(None).
    if bastion_key_location is None:
        raise ConfigException("Bastion SSH Key File location not specified")

    key_exists = os.path.exists(bastion_key_location)
    if key_exists and not overwrite:
        log.info(
            f"Bastion keys already exist at the location: {bastion_key_location}"
        )
        print(f"Bastion keys already exist at the location: {bastion_key_location}")
        return

    if key_exists:
        log.info("Overwriting the existing bastion key with a new bastion key!")
    else:
        log.info("Bastion Key does not exist, creating a bastion key!")
    self.__create_and_save_key(
        private_file_path=bastion_key_location,
        description="Bastion Key Fablib",
        key_type=Constants.KEY_TYPE_BASTION,
        store_pubkey=store_pubkey,
    )
    log.info(f"Bastion Key saved at location: {bastion_key_location}")
    print(f"Bastion Key saved at location: {bastion_key_location}")
[docs]
def create_sliver_keys(
    self,
    *,
    sliver_priv_key_location: str = None,
    store_pubkey: bool = True,
    overwrite: bool = False,
):
    """
    Create sliver keys and save them to disk.

    Nothing is done when a key file already exists at the target
    location and ``overwrite`` is False.

    :param sliver_priv_key_location: sliver private key location;
        defaults to the configured default slice private key file
    :type sliver_priv_key_location: str
    :param store_pubkey: flag indicating if the public key should be saved
    :type store_pubkey: bool
    :param overwrite: overwrite the sliver key file if it exists already
    :type overwrite: bool
    :raises ConfigException: if no sliver key location is given or configured
    """
    if sliver_priv_key_location is None:
        sliver_priv_key_location = self.get_default_slice_private_key_file()
    # Fail with a clear error instead of a TypeError from os.path.exists(None).
    if sliver_priv_key_location is None:
        raise ConfigException("Slice private key file location not specified")

    if os.path.exists(sliver_priv_key_location) and not overwrite:
        log.info(
            f"Sliver keys already exist at the location: {sliver_priv_key_location}"
        )
        print(
            f"Sliver keys already exist at the location: {sliver_priv_key_location}"
        )
        return

    log.info("Creating sliver key!")
    self.__create_and_save_key(
        private_file_path=sliver_priv_key_location,
        description="Sliver Key Fablib",
        key_type=Constants.KEY_TYPE_SLIVER,
        store_pubkey=store_pubkey,
    )
    log.info(f"Sliver Keys saved at location: {sliver_priv_key_location}")
    print(f"Sliver Keys saved at location: {sliver_priv_key_location}")
def __create_and_save_key(
    self,
    private_file_path: str,
    description: str,
    key_type: str,
    public_file_path: str = None,
    store_pubkey: bool = False,
):
    """
    Request a new SSH key pair from the service manager and save it.

    The private key is written to ``private_file_path`` (mode 0600)
    and the public key to ``public_file_path`` (mode 0644). Missing
    parent directories of the private key file are created.

    :param private_file_path: private key location
    :type private_file_path: str
    :param description: description
    :type description: str
    :param key_type: key type bastion or sliver
    :type key_type: str
    :param public_file_path: public key location; defaults to
        ``private_file_path`` with ``.pub`` appended
    :type public_file_path: str
    :param store_pubkey: flag indicating if the public key should be saved
    :type store_pubkey: bool
    :raises Exception: if the directory cannot be created or no key is
        returned by the service
    """
    dir_path = os.path.dirname(private_file_path)
    # dirname() is "" for a bare file name; there is nothing to create then,
    # and os.makedirs("") would fail.
    if dir_path:
        try:
            os.makedirs(dir_path, exist_ok=True)
        except OSError as e:
            msg = (
                f"Directory {dir_path} does not exist! Failed to create directory {dir_path}: {e}, "
                f"cannot create {key_type} keys!"
            )
            print(msg)
            log.error(msg)
            raise Exception(msg) from e

    # The key file name doubles as the key comment.
    comment = os.path.basename(private_file_path)
    ssh_keys = self.get_manager().create_ssh_keys(
        key_type=key_type,
        description=description,
        comment=comment,
        store_pubkey=store_pubkey,
    )
    if not ssh_keys:
        # Give a meaningful error instead of an IndexError on ssh_keys[0].
        msg = f"No {key_type} key was returned, cannot create {key_type} keys!"
        log.error(msg)
        raise Exception(msg)
    key_pair = ssh_keys[0]

    if public_file_path is None:
        public_file_path = f"{private_file_path}.pub"

    Utils.save_to_file(
        file_path=private_file_path, data=key_pair.get(Constants.PRIVATE_OPENSSH)
    )
    Utils.save_to_file(
        file_path=public_file_path, data=key_pair.get(Constants.PUBLIC_OPENSSH)
    )

    # Private key: owner read/write only. Public key: world readable.
    os.chmod(private_file_path, 0o600)
    os.chmod(public_file_path, 0o644)
[docs]
def get_ssh_thread_pool_executor(self) -> ThreadPoolExecutor:
    """
    Return the :py:class:`ThreadPoolExecutor` that runs SSH commands.

    The value is whatever ``ssh_thread_pool_executor`` currently
    holds, which is ``None`` when no executor has been created or
    after :meth:`close`.
    """
    executor = self.ssh_thread_pool_executor
    return executor
def __build_manager(self) -> FabricManagerV2:
    """
    Not a user facing API call.

    Builds a new FabricManagerV2 and stores it in ``self.manager``.
    The credential manager, orchestrator and Core API hosts are
    checked with ``Utils.is_reachable`` first. When an ID token is
    configured it is used directly, with auto refresh off and
    ``no_write=True``; otherwise the token file location is used.
    Afterwards the project ID and project name reported by the new
    manager are stored, and the bastion username is determined.

    Any failure is logged with its traceback and re-raised.

    :return: a new FabricManagerV2
    :rtype: FabricManagerV2
    """
    try:
        log.info(
            f"orchestrator_host={self.get_orchestrator_host()},"
            f"credmgr_host={self.get_credmgr_host()},"
            f"core_api_host={self.get_core_api_host()},"
            f"am_host={self.get_am_host()},"
            f"project_id={self.get_project_id()},"
            f"token_location={self.get_token_location()},"
            f"initialize=True,"
            f"scope='all'"
        )

        for host in (
            self.get_credmgr_host(),
            self.get_orchestrator_host(),
            self.get_core_api_host(),
        ):
            Utils.is_reachable(hostname=host)

        # Arguments shared by both ways of constructing the manager.
        common_args = dict(
            credmgr_host=self.get_credmgr_host(),
            orchestrator_host=self.get_orchestrator_host(),
            core_api_host=self.get_core_api_host(),
            am_host=self.get_am_host(),
            project_id=self.get_project_id(),
        )

        # Use id_token if provided, otherwise use token_location
        id_token = self.get_id_token()
        if id_token:
            self.manager = FabricManagerV2(
                id_token=id_token,
                auto_refresh=False,
                no_write=True,
                **common_args,
            )
        else:
            self.manager = FabricManagerV2(
                token_location=self.get_token_location(),
                auto_refresh=self.auto_token_refresh,
                **common_args,
            )
        log.debug("Fabric manager initialized!")

        # Update Project ID to be same as in Slice Manager
        self.set_project_id(project_id=self.manager.get_project_id())
        project_name = self.manager.get_project_name()
        self.runtime_config[Constants.PROJECT_NAME] = project_name
        self.determine_bastion_username()
    except Exception as e:
        log.error(e, exc_info=True)
        raise e
    return self.manager
[docs]
def get_site_names(self) -> List[str]:
    """
    Get the names of all available sites.

    :return: list of site names as strings
    :rtype: list[str]
    """
    resources = self.get_resources()
    return resources.get_site_names()
[docs]
def list_sites(
    self,
    output: Optional[str] = None,
    fields: Optional[List[str]] = None,
    quiet: Optional[bool] = False,
    filter_function=None,
    update: Optional[bool] = False,
    pretty_names: Optional[bool] = True,
    force_refresh: Optional[bool] = False,
    start: Optional[datetime.datetime] = None,
    end: Optional[datetime.datetime] = None,
    avoid: Optional[List[str]] = None,
    includes: Optional[List[str]] = None,
) -> object:
    """
    Lists all the sites and their attributes.

    There are several output options: "text", "pandas", and "json" that determine the format of the
    output that is returned and (optionally) displayed/printed.

    output: 'text': string formatted with tabular
      'pandas': pandas dataframe
      'json': string in json format

    fields: json output will include all available fields/columns.

    Example: fields=['Name','ConnectX-5 Available', 'NVMe Total']

    filter_function: A lambda function to filter data by field values.

    Example: filter_function=lambda s: s['ConnectX-5 Available'] > 3 and s['NVMe Available'] <= 10

    ``update``, ``force_refresh``, ``start``, ``end``, ``avoid`` and
    ``includes`` are forwarded to :meth:`get_resources`; the remaining
    arguments are forwarded to the ``list_sites`` method of the object
    it returns.

    :param output: output format
    :type output: str
    :param fields: list of fields (table columns) to show
    :type fields: List[str]
    :param quiet: True to specify printing/display
    :type quiet: bool
    :param filter_function: lambda function
    :type filter_function: lambda
    :param update: forwarded to ``get_resources``
    :type update: bool
    :param pretty_names: forwarded to the table listing
    :type pretty_names: bool
    :param force_refresh: forwarded to ``get_resources``
    :type force_refresh: bool
    :param start: start time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type start: datetime
    :param end: end time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type end: datetime
    :param avoid: list of sites to avoid
    :type avoid: list of string
    :param includes: list of sites to include
    :type includes: list of string
    :return: table in format specified by output parameter
    :rtype: Object
    """
    resources = self.get_resources(
        update=update,
        force_refresh=force_refresh,
        start=start,
        end=end,
        avoid=avoid,
        includes=includes,
    )
    return resources.list_sites(
        output=output,
        fields=fields,
        quiet=quiet,
        filter_function=filter_function,
        pretty_names=pretty_names,
    )
[docs]
def list_hosts(
    self,
    output: Optional[str] = None,
    fields: Optional[List[str]] = None,
    quiet: Optional[bool] = False,
    filter_function=None,
    update: Optional[bool] = False,
    pretty_names: Optional[bool] = True,
    force_refresh: Optional[bool] = False,
    start: Optional[datetime.datetime] = None,
    end: Optional[datetime.datetime] = None,
    avoid: Optional[List[str]] = None,
    includes: Optional[List[str]] = None,
) -> object:
    """
    Lists all the hosts and their attributes.

    There are several output options: "text", "pandas", and "json" that determine the format of the
    output that is returned and (optionally) displayed/printed.

    output: 'text': string formatted with tabular
      'pandas': pandas dataframe
      'json': string in json format

    fields: json output will include all available fields/columns.

    Example: fields=['Name','ConnectX-5 Available', 'NVMe Total']

    filter_function: A lambda function to filter data by field values.

    Example: filter_function=lambda s: s['ConnectX-5 Available'] > 3 and s['NVMe Available'] <= 10

    ``update``, ``force_refresh``, ``start``, ``end``, ``avoid`` and
    ``includes`` are forwarded to :meth:`get_resources`; the remaining
    arguments are forwarded to the ``list_hosts`` method of the object
    it returns.

    :param output: output format
    :type output: str
    :param fields: list of fields (table columns) to show
    :type fields: List[str]
    :param quiet: True to specify printing/display
    :type quiet: bool
    :param filter_function: lambda function
    :type filter_function: lambda
    :param update: forwarded to ``get_resources``
    :type update: bool
    :param pretty_names: forwarded to the table listing
    :type pretty_names: bool
    :param force_refresh: forwarded to ``get_resources``
    :type force_refresh: bool
    :param start: start time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type start: datetime
    :param end: end time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type end: datetime
    :param avoid: list of sites to avoid
    :type avoid: list of string
    :param includes: list of sites to include
    :type includes: list of string
    :return: table in format specified by output parameter
    :rtype: Object
    """
    resources = self.get_resources(
        update=update,
        force_refresh=force_refresh,
        start=start,
        end=end,
        avoid=avoid,
        includes=includes,
    )
    return resources.list_hosts(
        output=output,
        fields=fields,
        quiet=quiet,
        filter_function=filter_function,
        pretty_names=pretty_names,
    )
[docs]
def list_links(
    self,
    output: Optional[str] = None,
    fields: Optional[List[str]] = None,
    quiet: Optional[bool] = False,
    filter_function=None,
    update: Optional[bool] = True,
    pretty_names: Optional[bool] = True,
) -> object:
    """
    Lists all the links and their attributes.

    There are several output options: "text", "pandas", and "json" that determine the format of the
    output that is returned and (optionally) displayed/printed.

    output: 'text': string formatted with tabular
      'pandas': pandas dataframe
      'json': string in json format

    fields: json output will include all available fields/columns.

    filter_function: A lambda function to filter data by field values;
    it receives one row and returns True to keep it.

    ``update`` is forwarded to :meth:`get_links`; the remaining
    arguments are forwarded to the ``list_links`` method of the object
    it returns. Note that ``update`` defaults to True here.

    :param output: output format
    :type output: str
    :param fields: list of fields (table columns) to show
    :type fields: List[str]
    :param quiet: True to specify printing/display
    :type quiet: bool
    :param filter_function: lambda function
    :type filter_function: lambda
    :param update: forwarded to ``get_links``
    :type update: bool
    :param pretty_names: forwarded to the table listing
    :type pretty_names: bool
    :return: table in format specified by output parameter
    :rtype: Object
    """
    links = self.get_links(update=update)
    return links.list_links(
        output=output,
        fields=fields,
        quiet=quiet,
        filter_function=filter_function,
        pretty_names=pretty_names,
    )
[docs]
def list_facility_ports(
    self,
    output: Optional[str] = None,
    fields: Optional[List[str]] = None,
    quiet: Optional[bool] = False,
    filter_function=None,
    update: Optional[bool] = False,
    pretty_names: Optional[bool] = True,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
) -> object:
    """
    List all the facility ports and their attributes.

    The work is delegated to the object returned by
    :meth:`get_facility_ports`, whose ``list_facility_ports()`` does
    the formatting.

    There are several output options: "text", "pandas", and "json"
    that determine the format of the output that is returned and
    (optionally) displayed/printed.

    output: 'text': string formatted with tabular
            'pandas': pandas dataframe
            'json': string in json format

    fields: json output will include all available fields/columns.

    filter_function: A lambda function to filter data by field values.

    :param output: output format
    :type output: str
    :param fields: list of fields (table columns) to show
    :type fields: List[str]
    :param quiet: True to suppress printing/display
    :type quiet: bool
    :param filter_function: lambda function used to filter rows
    :type filter_function: lambda
    :param update: forwarded to :meth:`get_facility_ports`; whether to
        refresh the resource information before listing
    :type update: bool
    :param pretty_names: forwarded to the underlying
        ``list_facility_ports()``
    :type pretty_names: bool
    :param start: start time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type start: datetime
    :param end: end time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type end: datetime
    :return: table in format specified by output parameter
    :rtype: Object
    """
    ports_source = self.get_facility_ports(update=update, start=start, end=end)
    return ports_source.list_facility_ports(
        output=output,
        fields=fields,
        quiet=quiet,
        filter_function=filter_function,
        pretty_names=pretty_names,
    )
[docs]
def show_config(
    self,
    output: Optional[str] = None,
    fields: list[str] = None,
    quiet: Optional[bool] = False,
    pretty_names: Optional[bool] = True,
):
    """
    Show a table containing the current FABlib configuration parameters.

    The table is rendered by ``Utils.show_table`` from the dictionary
    returned by :meth:`get_config`.

    There are several output options: "text", "pandas", and "json"
    that determine the format of the output that is returned and
    (optionally) displayed/printed.

    output: 'text': string formatted with tabular
            'pandas': pandas dataframe
            'json': string in json format

    fields: json output will include all available fields.

    Example: fields=['credmgr_host','project_id', 'fablib_log_file']

    :param output: output format
    :type output: str
    :param fields: list of fields to show
    :type fields: List[str]
    :param quiet: True to suppress printing/display
    :type quiet: bool
    :param pretty_names: when true, the mapping from
        :meth:`get_config_pretty_names_dict` is handed to the table
        renderer; otherwise an empty mapping is used
    :type pretty_names: bool
    :return: table in format specified by output parameter
    :rtype: Object
    """
    name_map = self.get_config_pretty_names_dict() if pretty_names else {}
    config_values = self.get_config()
    return Utils.show_table(
        config_values,
        fields=fields,
        title="FABlib Config",
        output=output,
        quiet=quiet,
        pretty_names_dict=name_map,
    )
[docs]
def show_site(
    self,
    site_name: str,
    output: Optional[str] = None,
    fields: Optional[list[str]] = None,
    quiet: Optional[bool] = False,
    pretty_names: Optional[bool] = True,
):
    """
    Show a table with all the properties of a specific site.

    The table is produced by ``show_site()`` on the object returned
    by :meth:`get_resources`; its result is converted with ``str()``
    before being returned.

    There are several output options: "text", "pandas", and "json"
    that determine the format of the output that is returned and
    (optionally) displayed/printed.

    output: 'text': string formatted with tabular
            'pandas': pandas dataframe
            'json': string in json format

    fields: json output will include all available fields.

    :param site_name: the name of a site
    :type site_name: str
    :param output: output format
    :type output: str
    :param fields: list of fields to show
    :type fields: List[str]
    :param quiet: True to suppress printing/display
    :type quiet: bool
    :param pretty_names: forwarded to the underlying ``show_site()``
    :type pretty_names: bool
    :return: string form of the table produced for the site
    :rtype: str
    """
    site_table = self.get_resources().show_site(
        site_name,
        fields=fields,
        output=output,
        quiet=quiet,
        pretty_names=pretty_names,
    )
    return str(site_table)
[docs]
def get_links(self, update: bool = True) -> ResourcesV2:
    """
    Return the resources object, which is also the source of link
    information.

    This is a thin alias for :meth:`get_resources`.

    :param update: whether to refresh resources
    :type update: bool
    :return: the object returned by :meth:`get_resources`
    :rtype: ResourcesV2
    """
    resources = self.get_resources(update=update)
    return resources
[docs]
def get_facility_ports(
    self,
    update: Optional[bool] = False,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
) -> ResourcesV2:
    """
    Get the resources object (which includes facility ports).

    A refresh is forced, even when ``update`` is false, if a time
    window is requested or if the previously fetched resources were
    restricted to a time window.  ``last_resources_filtered_by_time``
    is updated on every refresh, including explicit ``update=True``
    calls, so it always describes the data that is about to be cached.

    :param update: whether to refresh resources
    :type update: bool
    :param start: start time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type start: datetime
    :param end: end time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type end: datetime
    :return: the object returned by :meth:`get_resources`
    :rtype: ResourcesV2
    """
    time_filtered = bool(start or end)

    # Cached data is unusable if the caller wants a time window, or if the
    # cache still holds a time-windowed snapshot from an earlier call.
    if not update and (time_filtered or self.last_resources_filtered_by_time):
        update = True

    # Record what the refreshed cache will contain.  Doing this for every
    # refresh (not only the forced ones) keeps the flag accurate when the
    # caller passes update=True together with start/end.
    if update:
        self.last_resources_filtered_by_time = time_filtered

    return self.get_resources(update=update, start=start, end=end)
[docs]
def get_resources(
    self,
    update: Optional[bool] = False,
    force_refresh: Optional[bool] = False,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    avoid: Optional[List[str]] = None,
    includes: Optional[List[str]] = None,
) -> ResourcesV2:
    """
    Get a reference to the resources object. The resources object
    is used to query for available resources and capacities.

    A refresh is forced, even when ``update`` is false, if a time
    window is requested or if the previously fetched resources were
    restricted to a time window.  ``last_resources_filtered_by_time``
    is updated on every refresh, including explicit ``update=True``
    calls, so it always describes the data that is about to be cached.

    :param update: whether to refresh resources
    :type update: bool
    :param force_refresh: forwarded to :meth:`get_available_resources`
    :type force_refresh: bool
    :param start: start time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type start: datetime
    :param end: end time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type end: datetime
    :param avoid: list of sites to avoid
    :type avoid: list of string
    :param includes: list of sites to include
    :type includes: list of string
    :return: the resources object
    :rtype: ResourcesV2
    """
    time_filtered = bool(start or end)

    # Cached data is unusable if the caller wants a time window, or if the
    # cache still holds a time-windowed snapshot from an earlier call.
    if not update and (time_filtered or self.last_resources_filtered_by_time):
        update = True

    # Record what the refreshed cache will contain.  Doing this for every
    # refresh (not only the forced ones) keeps the flag accurate when the
    # caller passes update=True together with start/end.
    if update:
        self.last_resources_filtered_by_time = time_filtered

    return self.get_available_resources(
        update=update,
        force_refresh=force_refresh,
        start=start,
        end=end,
        avoid=avoid,
        includes=includes,
    )
@staticmethod
def _slice_to_resources(slice_object: "Slice") -> List[Dict[str, Any]]:
    """
    Convert an unsubmitted Slice object into the resource requirement
    list expected by the find-slot API.

    The result contains, in this order:

    - one ``"compute"`` entry per site, summing the requested cores,
      RAM and disk of every node at that site and counting components
      under ``"{type}-{model}"`` keys (the ``"components"`` key is
      omitted when a site has none);
    - one ``"link"`` entry for each L2PTP/L2STS network whose
      interfaces span exactly two distinct sites;
    - one ``"facility_port"`` entry per facility, with ``"vlans"`` set
      to its number of interfaces.

    :param slice_object: a fablib Slice built with add_node/add_l2network/etc.
    :return: list of resource dicts
    """
    resources: List[Dict[str, Any]] = []

    # --- Compute nodes: aggregate by site ---
    site_compute: Dict[str, Dict[str, Any]] = {}
    for node in slice_object.get_nodes():
        site = node.get_site()
        entry = site_compute.setdefault(
            site,
            {
                "type": "compute",
                "site": site,
                "cores": 0,
                "ram": 0,
                "disk": 0,
                "components": {},
            },
        )
        # A node may not have a value requested; treat that as zero.
        entry["cores"] += node.get_requested_cores() or 0
        entry["ram"] += node.get_requested_ram() or 0
        entry["disk"] += node.get_requested_disk() or 0

        for comp in node.get_components():
            comp_type = comp.get_type()
            fim_model = comp.get_fim_model()
            if comp_type and fim_model:
                # Build key matching the orchestrator summary format:
                # "{ComponentType}-{model}" e.g. "SmartNIC-ConnectX-5"
                comp_key = f"{comp_type}-{fim_model}"
                entry["components"][comp_key] = (
                    entry["components"].get(comp_key, 0) + 1
                )

    for entry in site_compute.values():
        if not entry["components"]:
            del entry["components"]
        resources.append(entry)

    # --- L2 inter-site links ---
    for net in slice_object.get_l2networks():
        if net.get_type() not in ("L2PTP", "L2STS"):
            continue
        # De-duplicate sites while keeping interface order, so site_a and
        # site_b are the same on every run (a set would not guarantee that).
        sites = list(dict.fromkeys(iface.get_site() for iface in net.get_interfaces()))
        if len(sites) == 2:
            resources.append(
                {
                    "type": "link",
                    "site_a": sites[0],
                    "site_b": sites[1],
                    "bandwidth": net.get_bandwidth(),
                }
            )

    # --- Facility ports ---
    for fp in slice_object.get_facilities():
        resources.append(
            {
                "type": "facility_port",
                "name": fp.get_name(),
                "site": fp.get_site(),
                "vlans": len(fp.get_interfaces()),
            }
        )

    return resources
@staticmethod
def _normalize_component_keys(
    resources: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """
    Normalize component keys in resource dicts to match the format used
    by the reports DB: ``"{ComponentType}-{Model}"`` (e.g.
    ``"SmartNIC-ConnectX-5"``).

    Accepts both the DB format and the fablib model name format
    (e.g. ``"NIC_ConnectX_5"``). Unknown keys are passed through
    unchanged.  When two keys of the same entry normalize to the same
    DB key (for instance one in each format), their counts are added
    together instead of one overwriting the other.

    Only entries whose ``"type"`` is ``"compute"`` and that carry a
    non-empty ``"components"`` dict are rewritten; those are replaced
    by shallow copies, so the input dicts are not modified.

    :param resources: list of resource requirement dicts
    :return: new list with component keys normalized
    """
    from fim.slivers.component_catalog import ComponentModelTypeMap

    from fabrictestbed_extensions.fablib.component import Component

    # Build the fablib-name -> DB-key mapping from the component catalog.
    mapping: Dict[str, str] = {}
    for fablib_name, model_type in Component.component_model_map.items():
        catalog_entry = ComponentModelTypeMap.get(model_type)
        if catalog_entry:
            mapping[fablib_name] = (
                f"{catalog_entry['Type']}-{catalog_entry['Model']}"
            )

    result = []
    for r in resources:
        if r.get("type") == "compute" and r.get("components"):
            normalized: Dict[str, Any] = {}
            for key, count in r["components"].items():
                db_key = mapping.get(key, key)
                if db_key in normalized:
                    # Same component given under two spellings: merge.
                    normalized[db_key] += count
                else:
                    normalized[db_key] = count
            r = {**r, "components": normalized}
        result.append(r)
    return result
[docs]
def find_resource_slot(
    self,
    start: datetime.datetime,
    end: datetime.datetime,
    duration: int,
    slice: Optional["Slice"] = None,
    resources: Optional[List[Dict[str, Any]]] = None,
    max_results: int = 1,
    use_live_data: bool = False,
) -> Dict[str, Any]:
    """
    Find time windows where requested resources are simultaneously available.

    Resources can be specified either by passing an unsubmitted Slice object
    (built with ``add_node()``, ``add_l2network()``, etc.) or by providing a
    raw list of resource requirement dicts.

    When providing ``resources`` directly, component keys in the
    ``"components"`` dict can use either the fablib model name
    (e.g. ``"NIC_ConnectX_5"``) or the calendar/DB format
    (e.g. ``"SmartNIC-ConnectX-5"``); they will be normalized
    automatically.

    :param start: start of the search window (UTC)
    :param end: end of the search window (UTC)
    :param duration: required slot length in hours
    :param slice: an unsubmitted Slice whose topology will be converted
        to resource requirements automatically
    :param resources: list of resource requirement dicts (advanced usage)
    :param max_results: maximum number of slots to return
    :param use_live_data: when True, use live orchestrator data instead of
        Reports API for more accurate, real-time availability
    :return: dict with slot results
    :raises ValueError: if both or neither of ``slice`` and ``resources``
        are provided
    :raises Exception: if the search window is shorter than 60 minutes
        or longer than 30 days
    """
    if (slice is None) == (resources is None):
        raise ValueError("Exactly one of 'slice' or 'resources' must be provided.")

    if slice is not None:
        resources = self._slice_to_resources(slice)
    resources = self._normalize_component_keys(resources)

    if start and end:
        window = end - start
        if window < datetime.timedelta(minutes=60):
            raise Exception("Time range should be at least 60 minutes long!")
        # Compare whole timedeltas: ``window.days`` truncates, which would
        # let a window of e.g. 30 days 23 hours slip through.
        if window > datetime.timedelta(days=30):
            raise Exception("Search range must not exceed 30 days")

    return self.get_manager().find_resource_slot(
        start=start,
        end=end,
        duration=duration,
        resources=resources,
        max_results=max_results,
        use_live_data=use_live_data,
    )
[docs]
def resources_calendar(
    self,
    start: datetime.datetime,
    end: datetime.datetime,
    interval: str = "day",
    site: Optional[List[str]] = None,
    host: Optional[List[str]] = None,
    exclude_site: Optional[List[str]] = None,
    exclude_host: Optional[List[str]] = None,
    show: str = "all",
    output: Optional[str] = None,
    fields: Optional[List[str]] = None,
    quiet: bool = False,
    filter_function=None,
):
    """
    Fetch and display resource availability calendar over time slots.

    The calendar data is fetched through the manager returned by
    :meth:`get_manager` and rendered by ``Utils.show_calendar``.

    There are several output options: ``"text"``, ``"pandas"``, and
    ``"json"`` that determine the format of the output that is
    returned and (optionally) displayed/printed.

    output: ``'text'``: string formatted with tabulate
            ``'pandas'``: pandas dataframe (default in Jupyter)
            ``'json'``: string in json format
            ``'list'``: raw list of dicts

    fields: list of columns to include in the output table.

    Example: ``fields=['Date', 'Name', 'Cores (avail/cap)', 'RAM GB (avail/cap)']``

    filter_function: A lambda to filter the flattened rows.

    Example: ``filter_function=lambda r: r['Name'] == 'RENC'``

    :param start: start of the calendar window (UTC)
    :type start: datetime.datetime
    :param end: end of the calendar window (UTC)
    :type end: datetime.datetime
    :param interval: time slot granularity: hour, day, or week
    :type interval: str
    :param site: list of site names to include
    :type site: list[str]
    :param host: list of host names to include
    :type host: list[str]
    :param exclude_site: list of site names to exclude
    :type exclude_site: list[str]
    :param exclude_host: list of host names to exclude
    :type exclude_host: list[str]
    :param show: which resource types to display: ``"sites"``,
        ``"hosts"``, or ``"all"`` (default)
    :type show: str
    :param output: output format
    :type output: str
    :param fields: list of fields (table columns) to show
    :type fields: list[str]
    :param quiet: True to suppress printing/display
    :type quiet: bool
    :param filter_function: lambda function to filter rows
    :type filter_function: callable
    :return: table in format specified by output parameter
    :raises ValueError: if ``start`` is not before ``end``
    :raises Exception: if the window is shorter than 60 minutes or
        longer than 30 days
    """
    if start and end:
        if start >= end:
            raise ValueError("start must be before end")
        window = end - start
        if window < datetime.timedelta(minutes=60):
            raise Exception("Time range should be at least 60 minutes long!")
        # Compare whole timedeltas: ``window.days`` truncates, which would
        # let a window of e.g. 30 days 23 hours slip through.
        if window > datetime.timedelta(days=30):
            raise Exception("Search range must not exceed 30 days")

    calendar_data = self.get_manager().resources_calendar(
        start=start,
        end=end,
        interval=interval,
        site=site,
        host=host,
        exclude_site=exclude_site,
        exclude_host=exclude_host,
    )
    return Utils.show_calendar(
        calendar_data,
        show=show,
        fields=fields,
        output=output,
        quiet=quiet,
        filter_function=filter_function,
    )
[docs]
def get_random_site(
    self,
    avoid: Optional[List[str]] = None,
    filter_function=None,
    update: Optional[bool] = True,
) -> str:
    """
    Get a random site.

    Asks :meth:`get_random_sites` for a single site and returns the
    first (only) element of its result.

    :param avoid: list of site names to avoid choosing
    :type avoid: List[String]
    :param filter_function: filter_function
    :type filter_function:
    :param update: flag indicating if fetch latest availability information
    :type update: bool
    :return: one site name
    :rtype: String
    """
    picked = self.get_random_sites(
        count=1,
        avoid=avoid,
        filter_function=filter_function,
        update=update,
    )
    return picked[0]
[docs]
def get_random_sites(
    self,
    count: Optional[int] = 1,
    avoid: Optional[List[str]] = None,
    filter_function=None,
    update: Optional[bool] = True,
) -> List[str]:
    """
    Get a list of random sites names. Each site will be included at most once.

    Candidate sites come from :meth:`list_sites`, restricted to sites
    whose ``state`` is ``"Active"`` and that report more than zero
    ``hosts`` (plus ``filter_function``, when given).  Sites named in
    ``avoid`` or returned by :meth:`get_avoid` are then removed.  If
    fewer than ``count`` candidates remain, the result is padded with
    ``None`` entries.

    :param count: number of sites to return.
    :type count: int
    :param avoid: list of site names to avoid choosing
    :type avoid: List[String]
    :param filter_function: filter_function
    :type filter_function:
    :param update: flag indicating if fetch latest availability information
    :type update: bool
    :return: list of random site names.
    :rtype: List[String]
    """
    # Work on a private copy so the caller's list is never modified.
    avoid = [] if avoid is None else list(avoid)

    def combined_filter_function(site):
        """
        Filter out "impossible" sites.

        Always filter out sites in maintenance and sites that
        can't support any VMs.
        """
        if filter_function is not None and not filter_function(site):
            return False
        return bool(site["state"] == "Active" and site["hosts"] > 0)

    # Merge in the sites returned by get_avoid(), skipping duplicates.
    for configured in self.get_avoid():
        if configured in avoid:
            continue
        avoid.append(configured)

    site_list = self.list_sites(
        output="list",
        quiet=True,
        filter_function=combined_filter_function,
        update=update,
    )
    candidates = [entry["name"] for entry in site_list]

    for unwanted in avoid:
        try:
            candidates.remove(unwanted)
        except ValueError:
            # Not among the candidates; nothing to drop.
            pass

    chosen = []
    for _ in range(count):
        if not candidates:
            # Ran out of sites: pad so the result still has `count` items.
            chosen.append(None)
            continue
        pick = random.choice(candidates)
        candidates.remove(pick)
        chosen.append(pick)
    return chosen
[docs]
def probe_bastion_host(self) -> Optional[bool]:
    """
    See if bastion will admit us with our configuration.

    Bastion hosts are configured to block hosts that attempts to
    use it with too many repeated authentication failures.  We
    want to avoid that.

    The connection is attempted with the configured bastion host,
    username, key file and key passphrase only (no SSH agent, no
    key discovery), and is always closed afterwards.

    Returns ``True`` if connection attempt succeeds.  Raises an
    error in the event of failure.

    :raises paramiko.SSHException: on SSH-level failures; a hint about
        checking the bastion key is attached to the exception
    :raises Exception: any other error raised while connecting
    """
    bastion_client = paramiko.SSHClient()
    bastion_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)

    bastion_host = self.get_bastion_host()
    bastion_username = self.get_bastion_username()
    bastion_key_path = self.get_bastion_key_location()
    bastion_key_passphrase = self.get_bastion_key_passphrase()

    try:
        # Never log the passphrase itself, only whether one is set.
        log.info(
            f"Probing bastion host {bastion_host} with "
            f"username: {bastion_username}, key: {bastion_key_path}, "
            f"key passphrase: {'hidden' if bastion_key_passphrase else None}"
        )

        result = bastion_client.connect(
            hostname=bastion_host,
            username=bastion_username,
            key_filename=bastion_key_path,
            passphrase=bastion_key_passphrase,
            allow_agent=False,
            look_for_keys=False,
        )

        # Things should be fine if we are here.
        if result is None:
            log.info(f"Connection with {bastion_host} appears to be working")
            return True

    except paramiko.SSHException as e:
        note = "Hint: check your bastion key. Is it valid? Is it expired?"
        log.error(f"Error connecting to bastion host {bastion_host}: {e} ({note})")

        # Since Python 3.11, we have BaseException.add_note(),
        # which is a nicer way of adding some extra information to
        # the exception.
        #
        # https://docs.python.org/3.11/whatsnew/3.11.html#pep-678-exceptions-can-be-enriched-with-notes
        #
        # With Python versions prior to that, we just append a
        # hint to BaseException.args tuple.
        #
        # Compare the whole version tuple: checking only ``minor`` would
        # give the wrong answer for any major version other than 3.
        if sys.version_info >= (3, 11):
            e.add_note(note)
        else:
            e.args = e.args + (note,)

        # Bare ``raise`` re-raises the active exception unchanged.
        raise
    except Exception as e:
        log.error(f"Error connecting to bastion host {bastion_host}: {e}")
        raise
    finally:
        bastion_client.close()
[docs]
def get_manager(self) -> FabricManagerV2:
    """Gets the manager of this fablib object.

    The manager is built on first access rather than at
    construction time.  Nothing is built when the instance is
    offline or when the manager has already been built.  Right after
    the first build, if configuration validation is enabled and SSH
    is not disabled, ``verify_and_configure(validate_only=True)`` is
    run.

    :return: the manager on this fablib object
    :rtype: FabricManagerV2
    """
    if self._manager_built or self._offline:
        return self.manager

    self.__build_manager()
    # Mark as built before validating, so the validation step sees the
    # manager as already built.
    self._manager_built = True

    if self._validate_config and not self.get_no_ssh():
        self.verify_and_configure(validate_only=True)

    return self.manager
[docs]
def new_slice(
    self,
    name: str,
    storage: bool = False,
    storage_cluster: str = None,
) -> Slice:
    """
    Creates a new slice with the given name.

    This is a thin wrapper around ``Slice.new_slice``, passing this
    manager as its first argument.

    :param name: the name to give the slice
    :type name: String
    :param storage: Enable CephFS storage on every node added to
        this slice. Default: False
    :type storage: bool
    :param storage_cluster: Ceph cluster name. When ``None``, the
        first available cluster is auto-discovered at post-boot time.
    :type storage_cluster: str
    :return: a new slice
    :rtype: Slice
    """
    return Slice.new_slice(
        self,
        name=name,
        storage=storage,
        storage_cluster=storage_cluster,
    )
[docs]
def new_crinkle_slice(
    self,
    name: str,
    pcaps_dir: str = ".query_analysis_pcaps",
    name_prefix: str = "C",
) -> CrinkleSlice:
    """
    Creates a new Crinkle slice with the given name.

    This is a thin wrapper around ``CrinkleSlice.new_slice``, passing
    this manager as its first argument.

    :param name: the name to give the slice
    :type name: String
    :param pcaps_dir: forwarded to ``CrinkleSlice.new_slice``
    :type pcaps_dir: String
    :param name_prefix: forwarded to ``CrinkleSlice.new_slice``
    :type name_prefix: String
    :return: A new Crinkle slice
    :rtype: CrinkleSlice
    """
    return CrinkleSlice.new_slice(
        self,
        name=name,
        pcaps_dir=pcaps_dir,
        name_prefix=name_prefix,
    )
[docs]
def get_available_resources(
    self,
    update: Optional[bool] = False,
    force_refresh: Optional[bool] = False,
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    avoid: Optional[List[str]] = None,
    includes: Optional[List[str]] = None,
) -> ResourcesV2:
    """
    Get the available resources.

    The resources object is created on the first call.  On later
    calls it is refreshed in place only when ``update`` is true;
    otherwise the existing object is returned as is.

    :param update: refresh the existing resources object
    :type update: bool
    :param force_refresh: forwarded to the resources object
    :type force_refresh: bool
    :param start: start time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type start: datetime
    :param end: end time in UTC format: %Y-%m-%d %H:%M:%S %z
    :type end: datetime
    :param avoid: list of sites to avoid
    :type avoid: list of string
    :param includes: list of sites to include
    :type includes: list of string
    :return: Available ResourcesV2Wrapper object
    :raises Exception: if both ``start`` and ``end`` are given and are
        less than 60 minutes apart
    """
    window_too_short = (
        start and end and (end - start) < datetime.timedelta(minutes=60)
    )
    if window_too_short:
        raise Exception("Time range should be at least 60 minutes long!")

    # Both the constructor and update() take the same query arguments.
    query_args = {
        "force_refresh": force_refresh,
        "start": start,
        "end": end,
        "avoid": avoid,
        "includes": includes,
    }

    if self.resources is None:
        self.resources = ResourcesV2(self, **query_args)
    elif update:
        self.resources.update(**query_args)

    return self.resources
[docs]
def list_slices(
    self,
    excludes: Optional[list[SliceState]] = None,
    output: Optional[str] = None,
    fields: Optional[list[str]] = None,
    quiet: Optional[bool] = False,
    filter_function=None,
    pretty_names: Optional[bool] = True,
    user_only: Optional[bool] = True,
    show_un_submitted: Optional[bool] = False,
):
    """
    Lists the slices returned by :meth:`get_slices` as a table.

    Each slice contributes one row (its ``toDict()``); the table is
    rendered by ``Utils.list_table``.

    There are several output options: "text", "pandas", and "json"
    that determine the format of the output that is returned and
    (optionally) displayed/printed.

    output: 'text': string formatted with tabular
            'pandas': pandas dataframe
            'json': string in json format

    fields: json output will include all available fields/columns.

    Example: fields=['Name','State']

    filter_function: A lambda function to filter data by field values.

    Example: filter_function=lambda s: s['State'] == 'Configuring'

    :param excludes: slice states to exclude. Defaults to [SliceState.Dead, SliceState.Closing].
    :type excludes: list[SliceState]
    :param output: output format
    :type output: str
    :param fields: list of fields (table columns) to show
    :type fields: List[str]
    :param quiet: True to suppress printing/display
    :type quiet: bool
    :param filter_function: lambda function
    :type filter_function: lambda
    :param pretty_names: when true, ``Slice.get_pretty_names_dict()``
        is handed to the table renderer; otherwise an empty mapping
    :type pretty_names: bool
    :param user_only: True indicates return own slices; False indicates return project slices
    :type user_only: bool
    :param show_un_submitted: True indicates to also show unsubmitted slices
    :type show_un_submitted: bool
    :return: table in format specified by output parameter
    :rtype: Object
    """
    slice_objects = self.get_slices(
        excludes=excludes,
        user_only=user_only,
        show_un_submitted=show_un_submitted,
    )
    rows = [entry.toDict() for entry in slice_objects]

    name_map = Slice.get_pretty_names_dict() if pretty_names else {}

    return Utils.list_table(
        rows,
        fields=fields,
        title="Slices",
        output=output,
        quiet=quiet,
        filter_function=filter_function,
        pretty_names_dict=name_map,
    )
[docs]
def show_slice(
    self,
    name: Optional[str] = None,
    id: Optional[str] = None,
    output: Optional[str] = None,
    fields: Optional[list[str]] = None,
    quiet: Optional[bool] = False,
    pretty_names: Optional[bool] = True,
    user_only: Optional[bool] = True,
    show_un_submitted: Optional[bool] = False,
):
    """
    Show a table with all the properties of a specific slice.

    The slice is looked up with :meth:`get_slice` and its own
    ``show()`` produces the table.

    There are several output options: "text", "pandas", and "json"
    that determine the format of the output that is returned and
    (optionally) displayed/printed.

    output: 'text': string formatted with tabular
            'pandas': pandas dataframe
            'json': string in json format

    fields: json output will include all available fields.

    Example: fields=['Name','State']

    :param name: the name of a slice
    :type name: str
    :param id: the slice id
    :type id: str
    :param output: output format
    :type output: str
    :param fields: list of fields to show
    :type fields: List[str]
    :param quiet: True to suppress printing/display
    :type quiet: bool
    :param pretty_names: pretty_names
    :type pretty_names: bool
    :param user_only: True indicates return own slices; False indicates return project slices
    :type user_only: bool
    :param show_un_submitted: True indicates to also show unsubmitted slices
    :type show_un_submitted: bool
    :return: table in format specified by output parameter
    :rtype: Object
    """
    target = self.get_slice(
        name=name,
        slice_id=id,
        user_only=user_only,
        show_un_submitted=show_un_submitted,
    )
    return target.show(
        output=output,
        fields=fields,
        quiet=quiet,
        pretty_names=pretty_names,
    )
[docs]
def get_slices(
    self,
    excludes: Optional[List[SliceState]] = None,
    slice_name: Optional[str] = None,
    slice_id: Optional[str] = None,
    user_only: Optional[bool] = True,
    show_un_submitted: Optional[bool] = False,
) -> List[Slice]:
    """
    Gets a list of slices from the slice manager.

    By default this method ignores Dead and Closing slices. Optional,
    parameter allows excluding a different list of slice states. Pass
    an empty list (i.e. excludes=[]) to get a list of all slices.

    If the local cache holds a slice matching ``slice_id`` /
    ``slice_name`` that either has a slice id or is allowed by
    ``show_un_submitted``, that slice is updated and returned alone,
    without querying the manager.

    :param excludes: A list of slice states to exclude from the output list.
        Defaults to [SliceState.Dead, SliceState.Closing].
    :type excludes: List[SliceState]
    :param slice_name: Filter by slice name
    :type slice_name: str
    :param slice_id: Filter by slice ID
    :type slice_id: str
    :param user_only: True indicates return own slices; False indicates return project slices
    :type user_only: bool
    :param show_un_submitted: Show unsubmitted slices
    :type show_un_submitted: bool
    :return: a list of slices
    :rtype: List[Slice]
    """
    import time

    # None stands for the default exclusion list.
    if excludes is None:
        excludes = [SliceState.Dead, SliceState.Closing]

    # Timing is only collected when debug logging is on.
    if self.get_log_level() == logging.DEBUG:
        start = time.time()

    cached = self._get_slice_from_cache(slice_id=slice_id, slice_name=slice_name)
    if cached and (cached.get_slice_id() or show_un_submitted):
        cached.update()
        return [cached]

    # The manager takes the excluded states as strings.
    state_names = [str(state) for state in excludes]
    sm_slices = self.get_manager().list_slices(
        exclude_states=state_names,
        name=slice_name,
        slice_id=slice_id,
        limit=200,
        as_self=user_only,
        return_fmt="dto",
    )

    if self.get_log_level() == logging.DEBUG:
        end = time.time()
        log.debug(
            f"Running self.get_slice_manager().slices(): elapsed time: {end - start} seconds"
        )

    return [
        Slice.get_slice(self, sm_slice=sm_slice, user_only=user_only)
        for sm_slice in sm_slices
    ]
[docs]
def get_slice(
    self,
    name: str = None,
    slice_id: str = None,
    user_only: bool = True,
    show_un_submitted: bool = False,
) -> Slice:
    """
    Gets a slice by name or slice_id. Dead and Closing slices may have
    non-unique names and must be queried by slice_id. Slices in all other
    states are guaranteed to have unique names and can be queried by name.

    If both a name and slice_id are provided, the slice matching the
    slice_id will be returned.

    :param name: The name of the desired slice
    :type name: String
    :param slice_id: The ID of the desired slice
    :type slice_id: String
    :param user_only: True indicates return own slices; False indicates return project slices
    :type user_only: bool
    :param show_un_submitted: Show unsubmitted slices
    :type show_un_submitted: bool
    :raises SliceNotFoundError: if no slice matches the given name or
        slice id
    :raises Exception: if slice name or slice id are not inputted, or
        if more than one slice matches the slice id
    :return: the slice, if found
    :rtype: Slice
    """
    if slice_id:
        # if getting by slice_id consider all slices
        slices = self.get_slices(
            excludes=[],
            slice_id=slice_id,
            user_only=user_only,
            show_un_submitted=show_un_submitted,
        )
        if len(slices) == 1:
            return slices[0]
        if not slices:
            # An empty result is "not found", not "more than one".
            raise SliceNotFoundError(
                f'Unable to find slice with slice_id "{slice_id}" for this project. '
                "Check slice id and project id."
            )
        raise Exception(f"More than 1 slice found with slice_id: {slice_id}")

    if name:
        # if getting by name then only consider active slices
        slices = self.get_slices(
            excludes=[SliceState.Dead, SliceState.Closing],
            slice_name=name,
            user_only=user_only,
            show_un_submitted=show_un_submitted,
        )
        if len(slices) > 0:
            return slices[0]
        raise SliceNotFoundError(
            f'Unable to find slice "{name}" for this project. Check slice name spelling and project id.'
        )

    raise Exception("get_slice requires slice name (name) or slice id (slice_id)")
[docs]
def get_crinkle_slices(
    self,
    excludes: Optional[List[SliceState]] = None,
    slice_name: str = None,
    slice_id: str = None,
    user_only: bool = True,
    show_un_submitted: bool = False,
    pcaps_dir: str = ".query_analysis_pcaps",
    name_prefix: str = "C",
) -> List[CrinkleSlice]:
    """
    Gets a list of Crinkle slices from the slice manager.

    By default this method ignores Dead and Closing slices. Optional,
    parameter allows excluding a different list of slice states. Pass
    an empty list (i.e. excludes=[]) to get a list of all slices.

    If the local cache holds a slice matching ``slice_id`` /
    ``slice_name`` that either has a slice id or is allowed by
    ``show_un_submitted``, that slice is updated and returned alone,
    without querying the manager.

    :param excludes: A list of slice states to exclude from the output list.
        Defaults to [SliceState.Dead, SliceState.Closing].
    :type excludes: List[SliceState]
    :param slice_name: Filter by slice name
    :type slice_name: str
    :param slice_id: Filter by slice ID
    :type slice_id: str
    :param user_only: True indicates return own slices; False indicates return project slices
    :type user_only: bool
    :param show_un_submitted: Show unsubmitted slices;
    :type show_un_submitted: bool
    :param pcaps_dir: The working directory for .pcap file transfers
    :type pcaps_dir: String
    :param name_prefix: The name prefix to use for autogenerated Crinkle objects. Must match what was used in slice creation
    :type name_prefix: String
    :return: a list of slices
    :rtype: List[CrinkleSlice]
    """
    import time

    # None stands for the default exclusion list; a list literal in the
    # signature would be one object shared by every call.
    if excludes is None:
        excludes = [SliceState.Dead, SliceState.Closing]

    # Timing is only collected when debug logging is on.
    if self.get_log_level() == logging.DEBUG:
        start = time.time()

    existing_slice = self._get_slice_from_cache(
        slice_id=slice_id, slice_name=slice_name
    )
    if existing_slice and (existing_slice.get_slice_id() or show_un_submitted):
        existing_slice.update()
        return [existing_slice]

    # The manager takes the excluded states as strings.
    excludes_states = [str(exclude) for exclude in excludes]

    slices = self.get_manager().list_slices(
        exclude_states=excludes_states,
        name=slice_name,
        slice_id=slice_id,
        limit=200,
        as_self=user_only,
        return_fmt="dto",
    )

    if self.get_log_level() == logging.DEBUG:
        end = time.time()
        log.debug(
            f"Running self.get_slice_manager().slices(): elapsed time: {end - start} seconds"
        )

    return [
        CrinkleSlice.get_slice(
            self,
            sm_slice=sm_slice,
            user_only=user_only,
            pcaps_dir=pcaps_dir,
            name_prefix=name_prefix,
        )
        for sm_slice in slices
    ]
[docs]
def get_crinkle_slice(
    self,
    name: str = None,
    slice_id: str = None,
    user_only: bool = True,
    show_un_submitted: bool = False,
    pcaps_dir: str = ".query_analysis_pcaps",
    name_prefix: str = "C",
) -> CrinkleSlice:
    """
    Gets a Crinkle slice by name or slice_id. Dead and Closing slices may have
    non-unique names and must be queried by slice_id. Slices in all other
    states are guaranteed to have unique names and can be queried by name.

    If both a name and slice_id are provided, the slice matching the
    slice_id will be returned.

    :param name: The name of the desired slice
    :type name: String
    :param slice_id: The ID of the desired slice
    :type slice_id: String
    :param user_only: True indicates return own slices; False indicates return project slices
    :type user_only: bool
    :param show_un_submitted: Show unsubmitted slices
    :type show_un_submitted: bool
    :param pcaps_dir: The working directory for .pcap file transfers
    :type pcaps_dir: String
    :param name_prefix: The name prefix to use for autogenerated Crinkle objects. Must match what was used in slice creation
    :type name_prefix: String
    :raises SliceNotFoundError: if no slice matches the given name or
        slice id
    :raises Exception: if slice name or slice id are not inputted, or
        if more than one slice matches the slice id
    :return: the slice, if found
    :rtype: CrinkleSlice
    """
    if slice_id:
        # if getting by slice_id consider all slices
        slices = self.get_crinkle_slices(
            excludes=[],
            slice_id=slice_id,
            user_only=user_only,
            show_un_submitted=show_un_submitted,
            pcaps_dir=pcaps_dir,
            name_prefix=name_prefix,
        )
        if len(slices) == 1:
            return slices[0]
        if not slices:
            # An empty result is "not found", not "more than one".
            raise SliceNotFoundError(
                f'Unable to find slice with slice_id "{slice_id}" for this project. '
                "Check slice id and project id."
            )
        raise Exception(f"More than 1 slice found with slice_id: {slice_id}")

    if name:
        # if getting by name then only consider active slices
        slices = self.get_crinkle_slices(
            excludes=[SliceState.Dead, SliceState.Closing],
            slice_name=name,
            user_only=user_only,
            show_un_submitted=show_un_submitted,
            pcaps_dir=pcaps_dir,
            name_prefix=name_prefix,
        )
        if len(slices) > 0:
            return slices[0]
        raise SliceNotFoundError(
            f'Unable to find slice "{name}" for this project. Check slice name spelling and project id.'
        )

    raise Exception(
        "get_crinkle_slice requires slice name (name) or slice id (slice_id)"
    )
[docs]
def delete_slice(self, slice_name: str = None):
    """
    Look up a slice by name and delete it.

    The slice is resolved through :meth:`get_slice` with un-submitted
    slices included in the lookup. Once deleted, the slice is also
    dropped from this manager's slice cache.

    :param slice_name: the name of the slice to delete
    :type slice_name: String
    """
    target = self.get_slice(slice_name, show_un_submitted=True)
    target.delete()
    # Keep the local cache in step with what was just deleted.
    self.remove_slice_from_cache(target)
[docs]
def delete_all(self, progress: bool = True):
    """
    Deletes all slices on the slice manager.

    Deletion is best-effort: a failure on one slice never stops the
    remaining slices from being processed and is never re-raised.
    Each failure is written to the module log, so it stays diagnosable
    even when ``progress`` is False and nothing is printed.

    :param progress: optional progress printing to stdout
    :type progress: Bool
    """
    for slice_obj in self.get_slices(show_un_submitted=True):
        try:
            if progress:
                print(f"Deleting slice {slice_obj.get_name()}", end="")
            slice_obj.delete()
            self.remove_slice_from_cache(slice_obj)
            if progress:
                print(", Success!")
        except Exception as e:
            if progress:
                print(", Failed!")
            # Do not abort the sweep, but leave a trace of what went
            # wrong instead of silently discarding the exception.
            log.error(f"delete_all: failed to delete a slice: {e}", exc_info=True)
[docs]
def validate_node(self, node: Node, allocated: dict = None) -> Tuple[bool, str]:
    """
    Check a node against the resources available on a site before submission.

    The check itself is performed by ``NodeValidator.validate_node``; this
    method only gathers the inputs it needs from the manager (the current
    resources and the project tags) and forwards them together with
    ``node`` and ``allocated``.

    This method must remain on ``FablibManager`` because
    ``Node.add_component()`` calls ``self.get_fablib_manager().validate_node()``.

    :param node: the node to validate
    :type node: Node
    :param allocated: passed through unchanged to the validator
    :type allocated: dict
    :return: Tuple indicating status for validation and error message in case of failure
    :rtype: Tuple[bool, str]
    """
    # Imported at call time rather than at module import time.
    from fabrictestbed_extensions.fablib.validator import NodeValidator

    site_resources = self.get_resources()
    tags = self.get_project_tags()
    outcome = NodeValidator.validate_node(
        node=node,
        resources=site_resources,
        allocated=allocated,
        project_tags=tags,
    )
    return outcome
[docs]
def create_artifact(
    self,
    artifact_title: str,
    description_short: str,
    description_long: str,
    authors: List[str],
    tags: List[str],
    visibility: Visibility = Visibility.Author,
    update_existing: bool = True,
) -> Artifact:
    """
    Create a new artifact or update an existing one.

    All arguments are forwarded unchanged to the underlying manager's
    ``create_artifact``; whatever it returns is wrapped in an
    :class:`Artifact` bound to this manager.

    :param artifact_title: Title of the artifact
    :param description_short: Short description of the artifact
    :param description_long: Long description of the artifact
    :param authors: List of authors associated with the artifact
    :param tags: List of tags associated with the artifact
    :param visibility: Visibility level of the artifact
    :param update_existing: Flag indicating whether to update an existing artifact
    :return: The created (or updated) artifact
    :rtype: Artifact
    :raises FabricManagerV2Exception: If there is an error in creating or updating the artifact.
    """
    manager = self.get_manager()
    created = manager.create_artifact(
        artifact_title=artifact_title,
        description_short=description_short,
        description_long=description_long,
        authors=authors,
        tags=tags,
        visibility=visibility,
        update_existing=update_existing,
    )
    return Artifact(artifact_info=created, fablib_manager=self)
[docs]
def get_artifacts(
    self,
    artifact_title: str = None,
    artifact_id: str = None,
    tag: str = None,
) -> List[Artifact]:
    """
    Fetch artifacts from the manager and wrap each one in an :class:`Artifact`.

    At most one filter is applied, chosen in this order of precedence:
    ``artifact_id``, then ``artifact_title``, then ``tag``. Title and tag
    are both handed to the manager as its ``search`` argument. With no
    filter at all, every artifact the manager lists is returned.

    When the log level is DEBUG, the time spent in the manager call is
    written to the log.

    :param artifact_title: title to search for
    :param artifact_id: id of a specific artifact
    :param tag: tag to search for
    :return: a list of Artifacts
    :rtype: List[Artifact]
    """
    import time

    if self.get_log_level() == logging.DEBUG:
        start = time.time()

    # Translate the first filter that is set into manager keyword arguments.
    if artifact_id:
        query = {"artifact_id": artifact_id}
    elif artifact_title:
        query = {"search": artifact_title}
    elif tag:
        query = {"search": tag}
    else:
        query = {}
    artifacts = self.get_manager().list_artifacts(**query)

    if self.get_log_level() == logging.DEBUG:
        end = time.time()
        log.debug(
            f"Running self.get_manager().list_artifacts(): elapsed time: {end - start} seconds"
        )

    return [Artifact(artifact_info=info, fablib_manager=self) for info in artifacts]
[docs]
def list_artifacts(
    self,
    output=None,
    fields=None,
    quiet=False,
    filter_function=None,
    pretty_names=True,
) -> object:
    """
    Render all artifacts as a table.

    Every artifact returned by :meth:`get_artifacts` (called without
    filters) is converted with ``to_dict()`` and the resulting rows are
    formatted by ``Utils.list_table`` under the title "Artifacts".

    :param output: Output format - 'text', 'pandas', 'json'
    :param fields: List of fields (table columns) to show
    :param quiet: True to suppress printing/display
    :param filter_function: Lambda function to filter data by field values
    :param pretty_names: Whether to use pretty names for fields
    :return: Table in format specified by output parameter
    :raises FabricManagerV2Exception: If there is an error in listing the artifacts.
    """
    rows = [artifact.to_dict() for artifact in self.get_artifacts()]

    # Column-name mapping is only supplied when pretty names were requested.
    name_map = Artifact.pretty_names if pretty_names else None

    return Utils.list_table(
        rows,
        fields=fields,
        title="Artifacts",
        output=output,
        quiet=quiet,
        filter_function=filter_function,
        pretty_names_dict=name_map,
    )
[docs]
def delete_artifact(self, artifact_id: str = None, artifact_title: str = None):
    """
    Delete an artifact by its ID or title.

    If ``artifact_id`` is given it is used directly and ``artifact_title``
    is ignored. Otherwise the artifact is looked up through
    :meth:`get_artifacts` using ``artifact_title``; the lookup must yield
    exactly one artifact, whose ``uuid`` is then deleted.

    :param artifact_id: The unique identifier of the artifact to be deleted.
    :param artifact_title: The title of the artifact to be deleted.
    :raises ValueError: If neither ``artifact_id`` nor ``artifact_title`` is
        provided, if no artifact matches ``artifact_title``, or if more than
        one artifact matches it.
    :raises FabricManagerV2Exception: If an error occurs during the deletion process.
    """
    if artifact_id:
        self.get_manager().delete_artifact(artifact_id=artifact_id)
        return

    if not artifact_title:
        raise ValueError("artifact_title or artifact_id must be provided")

    matches = self.get_artifacts(artifact_title=artifact_title)
    # An empty result and an ambiguous result are different problems;
    # report each one for what it is.
    if not matches:
        raise ValueError(f'No artifact found with title "{artifact_title}"')
    if len(matches) > 1:
        raise ValueError("More than one artifact found")

    self.get_manager().delete_artifact(
        artifact_id=matches[0].to_dict().get("uuid")
    )
[docs]
def upload_file_to_artifact(
    self, file_to_upload: str, artifact_id: str = None, artifact_title: str = None
) -> dict:
    """
    Upload a file to an existing artifact.

    This is a pass-through to the underlying manager's
    ``upload_file_to_artifact``: the three arguments are forwarded as
    keyword arguments and the manager's result is returned as is. No
    validation or title lookup is performed here; resolving the artifact
    from ``artifact_id`` or ``artifact_title`` is left to the manager.

    :param file_to_upload: The path to the file that should be uploaded.
    :param artifact_id: The unique identifier of the artifact to which the file will be uploaded.
    :param artifact_title: The title of the artifact to which the file will be uploaded.
    :return: A dictionary containing the details of the uploaded file.
    :raises ValueError: If neither `artifact_id` nor `artifact_title` is provided
        (raised by the manager, not by this method).
    :raises FabricManagerV2Exception: If an error occurs during the upload process.
    """
    request = {
        "file_to_upload": file_to_upload,
        "artifact_id": artifact_id,
        "artifact_title": artifact_title,
    }
    return self.get_manager().upload_file_to_artifact(**request)
[docs]
def download_artifact(
    self,
    download_dir: str,
    artifact_id: str = None,
    artifact_title: str = None,
    version: str = None,
    version_urn: str = None,
) -> str:
    """
    Download an artifact to a specified directory.

    This is a pass-through to the underlying manager's
    ``download_artifact``: every argument is forwarded as a keyword
    argument and the manager's result is returned as is. No validation
    or title lookup is performed here; resolving the artifact from
    ``artifact_id`` or ``artifact_title`` is left to the manager.

    :param download_dir: The directory where the artifact will be downloaded.
    :param artifact_id: The unique identifier of the artifact to download.
    :param artifact_title: The title of the artifact to download.
    :param version: The specific version of the artifact to download (optional).
    :param version_urn: Version urn for the artifact
    :return: The path to the downloaded artifact.
    :raises ValueError: If neither `artifact_id` nor `artifact_title` is provided
        (raised by the manager, not by this method).
    :raises FabricManagerV2Exception: If an error occurs during the download process.
    """
    manager = self.get_manager()
    saved_to = manager.download_artifact(
        download_dir=download_dir,
        artifact_id=artifact_id,
        artifact_title=artifact_title,
        version=version,
        version_urn=version_urn,
    )
    return saved_to
def _fetch_all_pages(
    self,
    fetch_func,
    offset: int = 0,
    limit: int = 200,
) -> List:
    """
    Collect every page from a paginated fetch function.

    ``fetch_func`` is called repeatedly with ``offset`` and ``limit``
    keyword arguments. The offset advances by the number of items
    actually received. Fetching stops at the first empty (falsy) page
    or at the first page shorter than ``limit``.

    :param fetch_func: Function that takes offset and limit kwargs and returns a list
    :param offset: Starting offset (default: 0)
    :param limit: Page size (default: 200)
    :return: Combined list of all results
    :rtype: List
    """
    collected = []
    position = offset
    while True:
        batch = fetch_func(offset=position, limit=limit)
        if batch:
            collected.extend(batch)
            position += len(batch)
        # An empty page or a short page both mean there is nothing left.
        if not batch or len(batch) < limit:
            return collected
[docs]
def list_storage(
    self,
    offset: int = 0,
    limit: int = 200,
    fetch_all: bool = False,
    output=None,
    fields=None,
    quiet=False,
    filter_function=None,
) -> object:
    """
    List storage volumes available for the project.

    Either a single page is requested from the manager, or, with
    ``fetch_all`` set, pages are requested repeatedly starting at
    ``offset`` until the listing is exhausted. The records are then
    formatted by ``Utils.list_table`` under the title "Storage Volumes".

    :param offset: Pagination offset (default: 0).
    :type offset: int
    :param limit: Maximum number of records to fetch per page (default: 200).
    :type limit: int
    :param fetch_all: If True, automatically fetch all storage volumes across all pages (default: False).
    :type fetch_all: bool
    :param output: Output format - 'text', 'pandas', 'json'
    :param fields: List of fields (table columns) to show
    :param quiet: True to suppress printing/display
    :param filter_function: Lambda function to filter data by field values
    :return: Table in format specified by output parameter
    :raises FabricManagerV2Exception: If there is an error in listing storage volumes.
    """
    list_page = self.get_manager().list_storage

    if fetch_all:
        volumes = self._fetch_all_pages(list_page, offset=offset, limit=limit)
    else:
        volumes = list_page(offset=offset, limit=limit)

    return Utils.list_table(
        volumes,
        fields=fields,
        title="Storage Volumes",
        output=output,
        quiet=quiet,
        filter_function=filter_function,
    )
[docs]
def get_storage(self, uuid: str) -> list:
    """
    Fetch one storage volume by UUID.

    The request is handed to the underlying manager's ``get_storage``
    and its result is returned unchanged.

    :param uuid: Storage volume UUID.
    :type uuid: str
    :return: Storage volume details.
    :rtype: list
    :raises FabricManagerV2Exception: If there is an error retrieving the storage volume.
    """
    manager = self.get_manager()
    return manager.get_storage(uuid=uuid)
[docs]
def discover_ceph_clusters(self, verify: bool = True) -> list:
    """
    Discover Ceph clusters via the Ceph Manager API.

    Delegates to :py:meth:`CephFsUtils.list_clusters_from_api`, passing
    this object's Ceph Manager host as the base URL and its token
    location as the token file.

    :param bool verify: Verify TLS certificates when calling the API.
        Defaults to ``True``.
    :return: List of cluster names
    :rtype: list
    :raises RuntimeError: If the API call fails.
    """
    api_url = self.get_ceph_mgr_host()
    token_path = self.get_token_location()
    return CephFsUtils.list_clusters_from_api(
        base_url=api_url,
        token_file=token_path,
        verify=verify,
    )
[docs]
def discover_user_ceph_clusters(self, verify: bool = True) -> list:
    """
    Discover Ceph clusters where the current user has a CephX keyring.

    The user's entity name is derived from the bastion username (the
    username is determined first if it is not yet known) and prefixed
    with ``client.`` when it does not already carry that prefix. The
    lookup itself is delegated to ``CephFsUtils.discover_user_clusters``.

    :param bool verify: Verify TLS certificates when calling the API.
    :return: List of cluster names where the user has credentials.
    :rtype: list
    :raises ValueError: If the bastion username is still empty after
        trying to determine it.
    """
    login = self.get_bastion_username()
    if not login:
        # Not known yet: try to determine it once, then re-read it.
        self.determine_bastion_username()
        login = self.get_bastion_username()
        if not login:
            raise ValueError("User/bastion login is empty.")

    entity = login if login.startswith("client.") else f"client.{login}"

    api_url = self.get_ceph_mgr_host()
    token_path = self.get_token_location()
    return CephFsUtils.discover_user_clusters(
        user_entity=entity,
        base_url=api_url,
        token_file=token_path,
        verify=verify,
    )
[docs]
def generate_ceph_bundle(
    self,
    cluster: str,
    out_base: str = "~/.ceph",
    mount_root: str = "/mnt/cephfs",
    verify: bool = True,
) -> dict:
    """
    Generate a local bundle for mounting CephFS paths for the current user
    on a specific cluster.

    The work is delegated to ``CephFsUtils.build_for_user_from_api``, which:

    1. Fetches the minimal ``ceph.conf`` for ``cluster``.
    2. Exports the user's keyring from that cluster.
    3. Writes files under ``out_base/<cluster>/``:
       ``ceph.conf``, ``ceph.client.<user>.secret``,
       ``ceph.client.<user>.keyring``, and a mount script
       ``mount_<user>.sh`` that mounts every path found in the MDS caps.

    The user's entity name is derived from the bastion username (the
    username is determined first if it is not yet known) and prefixed
    with ``client.`` when it does not already carry that prefix.

    :param str cluster: Target cluster name (e.g., ``"europe"``).
    :param str out_base: Output root directory for artifacts
        (default: ``"~/.ceph"``).
    :param str mount_root: Mount prefix used by the generated script
        (default: ``"/mnt/cephfs"``).
    :param bool verify: Verify TLS certificates when calling the API.
        Defaults to ``True``.
    :return: Details of the generated bundle, for example::

        {
            "cluster_dir": "~/.ceph/europe",
            "ceph_conf": "~/.ceph/europe/ceph.conf",
            "secret_file": "~/.ceph/europe/ceph.client.alice.secret",
            "keyring_file": "~/.ceph/europe/ceph.client.alice.keyring",
            "mount_script": "~/.ceph/europe/mount_alice.sh",
            "entity": "client.alice",
            "user": "alice",
            "mounts": [
                {
                    "fsname": "CEPH-FS-01",
                    "path": "/volumes/_nogroup/alice/....",
                    "mount_point": "/mnt/cephfs/europe/alice/volumes__nogroup_alice____"
                },
                ...
            ]
        }

    :rtype: dict
    :raises ValueError: If ``cluster`` is unknown or the user name is empty.
    :raises RuntimeError: If no clusters are discovered or keyring is unavailable.
    """
    # The accessor may hand back either "alice" or "client.alice".
    login = self.get_bastion_username()
    if not login:
        # Not known yet: try to determine it once, then re-read it.
        self.determine_bastion_username()
        login = self.get_bastion_username()
        if not login:
            raise ValueError("User/bastion login is empty.")

    entity = login if login.startswith("client.") else f"client.{login}"

    api_url = self.get_ceph_mgr_host()
    token_path = self.get_token_location()
    return CephFsUtils.build_for_user_from_api(
        base_url=api_url,
        user_entity=entity,
        cluster=cluster,
        token_file=token_path,
        verify=verify,
        out_base=out_base,
        mount_root_default=mount_root,
    )