Utils

Attributes

CACHE_DIR module-attribute

CACHE_DIR = join(VAR_DIR, CACHE_FOLDER_NAME)

CONFIG_DIR module-attribute

CONFIG_DIR = join(ROOT_DIR, CONFIG_FOLDER_NAME)

FBM_Component_Version module-attribute

FBM_Component_Version = Version

The Type of objects representing version numbers in Fed-BioMed

ROOT_DIR module-attribute

ROOT_DIR = _get_fedbiomed_root()

TMP_DIR module-attribute

TMP_DIR = join(VAR_DIR, TMP_FOLDER_NAME)

VAR_DIR module-attribute

VAR_DIR = join(ROOT_DIR, VAR_FOLDER_NAME)

Functions

compute_dot_product

compute_dot_product(model, params, device=None)

Compute the dot product between model and input parameters.

Parameters:

Name Type Description Default
model dict

OrderedDict representing model state

required
params dict

OrderedDict containing correction parameters

required

Returns:

Type Description
tensor

A tensor containing a single numerical value which is the dot product.

Source code in fedbiomed/common/utils/_utils.py
def compute_dot_product(model: dict, params: dict, device: Optional[str] = None) -> torch.tensor:
    """Compute the dot product between model and input parameters.

    Args:
        model: OrderedDict representing model state
        params: OrderedDict containing correction parameters

    Returns:
        A tensor containing a single numerical value which is the dot product.
    """
    model_p = model.values()
    correction_state = params.values()
    if device is None:
        if model_p:
            device = list(model_p)[0].device
        else:
            # if device is not found, set it to `cpu`
            device = 'cpu'
    dot_prod = sum([torch.sum(m * torch.tensor(p).float().to(device)) for m, p in zip(model_p, correction_state)])
    return dot_prod

convert_iterator_to_list_of_python_floats

convert_iterator_to_list_of_python_floats(iterator)

Converts numerical values of array-like object to float

Parameters:

Name Type Description Default
iterator Iterator

Array-like numeric object to convert numerics to float

required

Returns:

Type Description
List[float]

Numerical elements as converted to List of floats

Source code in fedbiomed/common/utils/_utils.py
def convert_iterator_to_list_of_python_floats(iterator: Iterator) -> List[float]:
    """Converts numerical values of array-like object to float

    Args:
        iterator: Array-like numeric object to convert numerics to float

    Returns:
        Numerical elements as converted to List of floats
    """

    if not isinstance(iterator, Iterable):
        raise FedbiomedError(f"{ErrorNumbers.FB627.value}: object {type(iterator)} is not iterable")

    list_of_floats = []
    if isinstance(iterator, dict):
        # specific processing for dictionaries
        for val in iterator.values():
            list_of_floats.append(convert_to_python_float(val))
    else:
        for it in iterator:
            list_of_floats.append(convert_to_python_float(it))
    return list_of_floats

convert_to_python_float

convert_to_python_float(value)

Convert numeric types to float

Parameters:

Name Type Description Default
value Union[Tensor, integer, floating, float, int]

value to convert python type float

required

Returns:

Type Description
float

Python float

Source code in fedbiomed/common/utils/_utils.py
def convert_to_python_float(value: Union[torch.Tensor, np.integer, np.floating, float, int]) -> float:
    """ Convert numeric types to float

    Args:
        value: value to convert python type float

    Returns:
        Python float
    """

    if not isinstance(value, (torch.Tensor, np.integer, np.floating, float, int)):
        raise FedbiomedError(
            f"{ErrorNumbers.FB627.value}: Converting {type(value)} to python to float is not supported.")

    # if the result is a tensor, convert it back to numpy
    if isinstance(value, torch.Tensor):
        value = value.numpy()

    if isinstance(value, Iterable) and value.size > 1:
        raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Can not convert array-type objects to float.")

    return float(value)

create_fedbiomed_setup_folders

create_fedbiomed_setup_folders(root)

Creates folders reequired by Fed-BioMed component setup

Parameters:

Name Type Description Default
root str

Root directory of Fed-BioMed component setup

required
Source code in fedbiomed/common/utils/_config_utils.py
def create_fedbiomed_setup_folders(root: str):
    """Creates folders reequired by Fed-BioMed component setup

    Args:
        root: Root directory of Fed-BioMed component setup
    """

    etc_config_dir = os.path.join(root, CONFIG_FOLDER_NAME)
    var_dir = os.path.join(root, VAR_FOLDER_NAME)
    cache_dir = os.path.join(var_dir, CACHE_FOLDER_NAME)
    tmp_dir  = os.path.join(var_dir, TMP_FOLDER_NAME)

    for dir_ in [etc_config_dir, var_dir, cache_dir, tmp_dir]:
        if not os.path.isdir(dir_):
            os.makedirs(dir_)

divide

divide(xs, k)

Divides a list of integers by a constant

Parameters:

Name Type Description Default
xs List[int]

List of integers

required
k int

Constant to divide by

required

Returns:

Type Description
List[int]

List of divided integers

Source code in fedbiomed/common/utils/_secagg_utils.py
def divide(xs: List[int], k: int) -> List[int]:
    """
    Divides a list of integers by a constant

    Args:
        xs: List of integers
        k: Constant to divide by

    Returns:
        List of divided integers
    """
    # Quicker than converting to/from numpy
    return [e / k for e in xs]

get_all_existing_certificates

get_all_existing_certificates()

Gets all existing certificates from Fed-BioMed etc directory.

This method parse all available configs in etc directory.

Returns:

Type Description
List[Dict[str, str]]

List of certificate objects that contain component type as component, party id id, public key content

List[Dict[str, str]]

(not path) as certificate.

Source code in fedbiomed/common/utils/_config_utils.py
def get_all_existing_certificates() -> List[Dict[str, str]]:
    """Gets all existing certificates from Fed-BioMed `etc` directory.

    This method parse all available configs in `etc` directory.

    Returns:
        List of certificate objects that contain  component type as `component`, party id `id`, public key content
        (not path)  as `certificate`.
    """

    config_files = get_all_existing_config_files()

    certificates = []
    for config in config_files:
        certificates.append(get_component_certificate_from_config(config))

    return certificates

get_all_existing_config_files

get_all_existing_config_files()

Gets all existing config files from Fed-BioMed etc directory

Source code in fedbiomed/common/utils/_config_utils.py
def get_all_existing_config_files():
    """Gets all existing config files from Fed-BioMed `etc` directory"""
    etc = os.path.join(ROOT_DIR, CONFIG_FOLDER_NAME, '')
    return [file for file in glob.glob(f"{etc}*.ini")]

get_class_source

get_class_source(cls)

Get source of the class.

It uses different methods for getting the class source based on shell type; IPython,Notebook shells or Python shell

Parameters:

Name Type Description Default
cls Callable

The class to extract the source code from

required

Returns:

Name Type Description
str str

Source code of the given class

Raises:

Type Description
FedbiomedError

if argument is not a class

Source code in fedbiomed/common/utils/_utils.py
def get_class_source(cls: Callable) -> str:
    """Get source of the class.

    It uses different methods for getting the class source based on shell type; IPython,Notebook
    shells or Python shell

    Args:
        cls: The class to extract the source code from

    Returns:
        str: Source code of the given class

    Raises:
        FedbiomedError: if argument is not a class
    """

    if not inspect.isclass(cls):
        raise FedbiomedError(f'{ErrorNumbers.FB627.value}: The argument `cls` must be a python class')

    # Check ipython status
    status = is_ipython()

    if status:
        file = get_ipython_class_file(cls)
        codes = "".join(inspect.linecache.getlines(file))
        class_code = extract_symbols(codes, cls.__name__)[0][0]
        return class_code

    return inspect.getsource(cls)

get_component_certificate_from_config

get_component_certificate_from_config(config_path)

Gets component certificate, id and component type by given config file path.

Parameters:

Name Type Description Default
config_path str

Path where config file is located.

required

Returns:

Type Description
Dict[str, str]

Certificate object that contains component type as component, party id id, public key content (not path) as certificate

Raises:

Type Description
FedbiomedError
  • If config file does not contain node_id or researcher_id under default section.
  • If config file does not contain public_key under ssl section.
  • If certificate file is not found or not readable
Source code in fedbiomed/common/utils/_config_utils.py
def get_component_certificate_from_config(
        config_path: str
) -> Dict[str, str]:
    """Gets component certificate, id and component type by given config file path.

    Args:
        config_path: Path where config file is located.

    Returns:
        Certificate object that contains  component type as `component`, party id `id`, public key content
            (not path)  as `certificate`

    Raises:
        FedbiomedError:
            - If config file does not contain `node_id` or `researcher_id` under `default` section.
            - If config file does not contain `public_key` under `ssl` section.
            - If certificate file is not found or not readable
    """

    config = get_component_config(config_path)
    component_id = config.get("default", "id")
    component_type = config.get("default", "component")

    ip = config.get("mpspdz", "mpspdz_ip")
    port = config.get("mpspdz", "mpspdz_port")
    certificate_path = os.path.join(os.path.dirname(config_path), config.get("mpspdz", "public_key"))

    if not os.path.isfile(certificate_path):
        raise FedbiomedError(f"The certificate for component '{component_id}' not found in {certificate_path}")

    try:
        with open(certificate_path, 'r') as file:
            certificate = file.read()
    except Exception as e:
        raise FedbiomedError(f"Error while reading certificate -> {certificate_path}. Error: {e}")

    return {
        "party_id": component_id,
        "certificate": certificate,
        "ip": ip,
        "port": port,
        "component": component_type
    }

get_component_config

get_component_config(config_path)

Gets config object from given config path.

Parameters:

Name Type Description Default
config_path str

The path where config file is stored.

required

Returns:

Type Description
ConfigParser

Configuration object.

Raises:

Type Description
FedbiomedError

If config file is not readable or not existing.

Source code in fedbiomed/common/utils/_config_utils.py
def get_component_config(
        config_path: str
) -> configparser.ConfigParser:
    """Gets config object from given config path.

    Args:
        config_path: The path where config file is stored.

    Returns:
        Configuration object.

    Raises:
        FedbiomedError: If config file is not readable or not existing.
    """
    config = configparser.ConfigParser()

    try:
        config.read(config_path)
    except Exception:
        raise FedbiomedError(f"Can not read config file. Please make sure it is existing or it has valid format. "
                             f"{config_path}")

    return config

get_existing_component_db_names

get_existing_component_db_names()

Gets DB_PATHs of all existing components in Fed-BioMed root

Source code in fedbiomed/common/utils/_config_utils.py
def get_existing_component_db_names():
    """Gets DB_PATHs of all existing components in Fed-BioMed root"""

    config_files = get_all_existing_config_files()
    db_names = {}

    for _config in config_files:
        config = get_component_config(_config)
        component_id = config['default']['id']

        db_name = f"{DB_PREFIX}{component_id}"
        db_names = {**db_names, component_id: db_name}

    return db_names

get_ipython_class_file

get_ipython_class_file(cls)

Get source file/cell-id of the class which is defined in ZMQInteractiveShell or TerminalInteractiveShell

Parameters:

Name Type Description Default
cls Callable

Python class defined on the IPython kernel

required

Returns:

Type Description
str

File path or id of Jupyter cell. On IPython's interactive shell, it returns cell ID

Source code in fedbiomed/common/utils/_utils.py
def get_ipython_class_file(cls: Callable) -> str:
    """Get source file/cell-id of the class which is defined in ZMQInteractiveShell or TerminalInteractiveShell

    Args:
        cls: Python class defined on the IPython kernel

    Returns:
        File path or id of Jupyter cell. On IPython's interactive shell, it returns cell ID
    """

    # Lookup by parent module
    if hasattr(cls, '__module__'):
        object_ = sys.modules.get(cls.__module__)
        # If module has `__file__` attribute
        if hasattr(object_, '__file__'):
            return object_.__file__

        # If parent module is __main__
        for name, member in inspect.getmembers(cls):
            if inspect.isfunction(member) and cls.__qualname__ + '.' + member.__name__ == member.__qualname__:
                return inspect.getfile(member)
    else:
        raise FedbiomedError(f'{ErrorNumbers.FB627.value}: {cls} has no attribute `__module__`, source is not found.')

get_method_spec

get_method_spec(method)

Helper to get argument specification

Parameters:

Name Type Description Default
method Callable

The function/method to extract argument specification from

required

Returns:

Type Description
dict

Specification of the method

Source code in fedbiomed/common/utils/_utils.py
def get_method_spec(method: Callable) -> dict:
    """ Helper to get argument specification

    Args:
        method: The function/method to extract argument specification from

    Returns:
         Specification of the method
    """

    method_spec = {}
    parameters = inspect.signature(method).parameters
    for (key, val) in parameters.items():
        method_spec[key] = {
            'name': val.name,
            'default': None if val.default is inspect._empty else val.default,
            'annotation': None if val.default is inspect._empty else val.default
        }

    return method_spec

import_class_from_file

import_class_from_file(module_path, class_name)

Import a module from a file and return a specified class of the module.

Parameters:

Name Type Description Default
module_path str

path to python module file

required
class_name str

name of the class

required

Returns:

Type Description
Tuple[Any, Any]

Tuple of the module created and the training plan class loaded

Raises:

Type Description
FedbiomedError

bad argument type

FedbiomedError

cannot load module or class

Source code in fedbiomed/common/utils/_utils.py
def import_class_from_file(module_path: str, class_name: str) -> Tuple[Any, Any]:
    """Import a module from a file and return a specified class of the module.

    Args:
        module_path: path to python module file
        class_name: name of the class

    Returns:
        Tuple of the module created and the training plan class loaded

    Raises:
        FedbiomedError: bad argument type
        FedbiomedError: cannot load module or class
    """
    if not os.path.isfile(module_path):
        raise FedbiomedError(f"{ErrorNumbers.FB627}: Given path for importing {class_name} is not existing")

    module_base_name = os.path.basename(module_path)
    pattern = re.compile("(.*).py$")

    match = pattern.match(module_base_name)
    if not match:
        raise FedbiomedError(f"{ErrorNumbers.FB627}: File is not a python file.")

    module = match.group(1)
    sys.path.insert(0, os.path.dirname(module_path))

    try:
        module = importlib.import_module(module)
    except ModuleNotFoundError as exp:
        raise FedbiomedError(f"Specified module is not existing. {exp}") from exp

    try:
        class_ = getattr(module, class_name)
    except AttributeError as exp:
        raise FedbiomedError(f"{ErrorNumbers.FB627}, Attribute error while loading the class "
                             f"{class_name} from {module_path}. Error: {exp}") from exp
    sys.path.pop(0)

    return module, class_

import_class_from_spec

import_class_from_spec(code, class_name)

Import a module from a code and extract the code of a specified class of the module.

Parameters:

Name Type Description Default
code str

code of the module

required
class_name str

name of the class

required

Returns:

Type Description
Tuple[Any, Any]

Tuple of the module created and the extracted class

Raises:

Type Description
FedbiomedError

bad argument type

FedbiomedError

cannot load module or extract clas

Source code in fedbiomed/common/utils/_utils.py
def import_class_from_spec(code: str, class_name: str) -> Tuple[Any, Any] :
    """Import a module from a code and extract the code of a specified class of the module.

    Args:
        code: code of the module
        class_name: name of the class

    Returns:
        Tuple of the module created and the extracted class

    Raises:
        FedbiomedError: bad argument type
        FedbiomedError: cannot load module or extract clas
    """

    for arg in [code, class_name]:
        if not isinstance(arg, str):
            raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Expected argument type is string but got '{type(arg)}'")

    try:
        spec = importlib.util.spec_from_loader("module_", loader=None)
        module = importlib.util.module_from_spec(spec)
        exec(code, module.__dict__)
    except Exception as e:
        raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Can not load module from given code: {e}")

    try:
        class_ = getattr(module, class_name)
    except AttributeError:
        raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Can not import {class_name} from given code")

    return module, class_

import_class_object_from_file

import_class_object_from_file(module_path, class_name)

Import a module from a file and create an instance of a specified class of the module.

Parameters:

Name Type Description Default
module_path str

path to python module file

required
class_name str

name of the class

required

Returns:

Type Description
Tuple[Any, Any]

Tuple of the module created and the training plan object created

Raises:

Type Description
FedbiomedError

bad argument type

FedbiomedError

cannot instantiate object

Source code in fedbiomed/common/utils/_utils.py
def import_class_object_from_file(module_path: str, class_name: str) -> Tuple[Any, Any]:
    """Import a module from a file and create an instance of a specified class of the module.

    Args:
        module_path: path to python module file
        class_name: name of the class

    Returns:
        Tuple of the module created and the training plan object created

    Raises:
        FedbiomedError: bad argument type
        FedbiomedError: cannot instantiate object
    """
    for arg in [module_path, class_name]:
        if not isinstance(arg, str):
            raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Expected argument type is string but got '{type(arg)}'")

    module, train_class = import_class_from_file(module_path, class_name)

    try:
        train_class_instance = train_class()
    except Exception as e:
        raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Cannot instantiate training plan object: {e}")

    return module, train_class_instance

matching_parties_biprime

matching_parties_biprime(context, parties)

Check if parties of given context are compatible with the parties of a secagg biprime element.

Parameters:

Name Type Description Default
context dict

context to be compared with the secagg biprime element parties

required
parties list

the secagg biprime element parties

required

Returns:

Type Description
bool

True if this context can be used with this element, False if not.

Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_biprime(context: dict, parties: list) -> bool:
    """Check if parties of given context are compatible with the parties
        of a secagg biprime element.

        Args:
            context: context to be compared with the secagg biprime element parties
            parties: the secagg biprime element parties

        Returns:
            True if this context can be used with this element, False if not.
    """
    # Need to ensure that:
    # - either the existing element is not attached to specific parties (None)
    # - or existing element was established for the same parties or a superset of the parties
    #   (order can differ)
    return (
        # Commented tests can be assumed from calling functions
        #
        # isinstance(context, dict) and
        # 'parties' in context and
        # isinstance(parties, list) and
        (
            context['parties'] is None or (
                # isinstance(context['parties'], list) and
                set(parties).issubset(set(context['parties']))
            )))

matching_parties_dh

matching_parties_dh(context, parties)

Check if parties of given context are compatible with the parties of a secagg Diffie Hellman element.

Parameters:

Name Type Description Default
context dict

context to be compared with the secagg servkey element parties

required
parties list

the secagg servkey element parties

required

Returns:

Type Description
bool

True if this context can be used with this element, False if not.

Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_dh(context: dict, parties: list) -> bool:
    """Check if parties of given context are compatible with the parties
        of a secagg Diffie Hellman element.

        Args:
            context: context to be compared with the secagg servkey element parties
            parties: the secagg servkey element parties

        Returns:
            True if this context can be used with this element, False if not.
        """
    # Need to ensure that:
    # - no check on first party (no cryptographic material attached to the researcher).
    #   The context is established for a given experiment, thus a given researcher but this should
    #   be tested prior to this call.
    # - existing element was established for the same node parties or a superset of the node parties
    #   (order can differ, as nodes are ordered by the cipher code)
    #
    # eg: [ 'un', 'deux', 'trois' ] parties compatible with [ 'un', 'trois', 'deux' ] context
    # but not with [ 'deux', 'un', 'trois' ]
    # eg: [ 'un', 'deux', 'trois' ] parties compatible with [ 'un', 'trois', 'quatre', 'deux' ] context
    # but not with [ 'un', 'deux', 'quatre' ]
    return (
        # Commented tests can be assumed from calling functions
        #
        # isinstance(context, dict) and
        # 'parties' in context and
        # isinstance(context['parties'], list) and
        # len(context['parties']) >= 1 and
        # isinstance(parties, list) and
        set(parties[1:]).issubset(set(context['parties'][1:])))

matching_parties_servkey

matching_parties_servkey(context, parties)

Check if parties of given context are compatible with the parties of a secagg servkey element.

Parameters:

Name Type Description Default
context dict

context to be compared with the secagg servkey element parties

required
parties list

the secagg servkey element parties

required

Returns:

Type Description
bool

True if this context can be used with this element, False if not.

Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_servkey(context: dict, parties: list) -> bool:
    """Check if parties of given context are compatible with the parties
        of a secagg servkey element.

        Args:
            context: context to be compared with the secagg servkey element parties
            parties: the secagg servkey element parties

        Returns:
            True if this context can be used with this element, False if not.
        """
    # Need to ensure that:
    # - existing element was established for the same parties
    # - first party needs to be the same for both
    # - set of other parties needs to be the same for both (order can differ)
    #
    # eg: [ 'un', 'deux', 'trois' ] compatible with [ 'un', 'trois', 'deux' ]
    # but not with [ 'deux', 'un', 'trois' ]
    return (
        # Commented tests can be assumed from calling functions
        #
        # isinstance(context, dict) and
        # 'parties' in context and
        # isinstance(context['parties'], list) and
        # len(context['parties']) >= 1 and
        # isinstance(parties, list) and
        parties[0] == context['parties'][0] and
        set(parties[1:]) == set(context['parties'][1:]))

multiply

multiply(xs, k)

Multiplies a list of integers by a constant

Parameters:

Name Type Description Default
xs List[int]

List of integers

required
k int

Constant to multiply by

required

Returns:

Type Description
List[int]

List of multiplied integers

Source code in fedbiomed/common/utils/_secagg_utils.py
def multiply(xs: List[int], k: int) -> List[int]:
    """
    Multiplies a list of integers by a constant

    Args:
        xs: List of integers
        k: Constant to multiply by

    Returns:
        List of multiplied integers
    """
    # Quicker than converting to/from numpy
    return [e * k for e in xs]

quantize

quantize(weights, clipping_range=None, target_range=SAParameters.TARGET_RANGE)

Quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776

This function returns a vector in the range [0, target_range-1].

Parameters:

Name Type Description Default
weights List[float]

List of model weight values

required
clipping_range Union[int, None]

Clipping range

None
target_range int

Target range

TARGET_RANGE

Returns:

Type Description
List[int]

Quantized model weights as numpy array.

Source code in fedbiomed/common/utils/_secagg_utils.py
def quantize(
    weights: List[float],
    clipping_range: Union[int, None] = None,
    target_range: int = SAParameters.TARGET_RANGE,
) -> List[int]:
    """Quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776

    This function returns a vector in the range [0, target_range-1].

    Args:
        weights: List of model weight values
        clipping_range: Clipping range
        target_range: Target range

    Returns:
        Quantized model weights as numpy array.

    """
    if clipping_range is None:
        clipping_range = SAParameters.CLIPPING_RANGE

    _check_clipping_range(weights, clipping_range)

    # CAVEAT: ensure to be converting from `float` to `uint64`` (no intermediate `int64`)
    # Process ensures an to compute an `int`` in the range [0, target_range -1]
    # This enables to use at most 2**64 as target_range (max value of `uint` - 1)
    f = np.vectorize(
        lambda x: min(
            target_range - 1,
            (sorted((-clipping_range, x, clipping_range))[1] +
            clipping_range) *
            target_range / (2 * clipping_range),
        ),
        otypes=[np.uint64]
    )
    quantized_list = f(weights)

    return quantized_list.tolist()

raise_for_version_compatibility

raise_for_version_compatibility(their_version, our_version, error_msg=None)

Check version compatibility and behave accordingly.

Raises an exception if the versions are incompatible, otherwise outputs a warning or info message.

Parameters:

Name Type Description Default
their_version Union[FBM_Component_Version, str]

the version that we detected in the component

required
our_version Union[FBM_Component_Version, str]

the version of the component within the current runtime

required
error_msg Optional[str]

an optional error message. It may contain two %s placeholders which will be substituted with the values of their_version and our_version.

None

Raises:

Type Description
FedbiomedVersionError

if the versions are incompatible

Source code in fedbiomed/common/utils/_versions.py
def raise_for_version_compatibility(their_version: Union[FBM_Component_Version, str],
                                    our_version: Union[FBM_Component_Version, str],
                                    error_msg: Optional[str] = None) -> None:
    """Check version compatibility and behave accordingly.

    Raises an exception if the versions are incompatible, otherwise outputs a warning or info message.

    Args:
        their_version: the version that we detected in the component
        our_version: the version of the component within the current runtime
        error_msg: an optional error message. It may contain two %s placeholders which will be substituted with
            the values of their_version and our_version.

    Raises:
        FedbiomedVersionError: if the versions are incompatible
    """
    if isinstance(our_version, str):
        our_version = FBM_Component_Version(our_version)
    if isinstance(their_version, str):
        their_version = FBM_Component_Version(their_version)
    if not isinstance(our_version, FBM_Component_Version):
        msg = f"{ErrorNumbers.FB625.value}: Component version has incorrect type `our_version` type={str(type(our_version))} value={our_version}"
        logger.critical(msg)
        raise FedbiomedVersionError(msg)
    if not isinstance(their_version, FBM_Component_Version):
        msg = f"{ErrorNumbers.FB625.value}: Component version has incorrect type `their_version` type={str(type(their_version))} value={their_version}"
        logger.critical(msg)
        raise FedbiomedVersionError(msg)
    if our_version != their_version:
        # note: the checks below rely on the short-circuiting behaviour of the or operator
        # (e.g. when checking our_version.minor < their_version.minor we have the guarantee that
        # our_version.major == their_version.major
        if our_version.major != their_version.major or \
                our_version.minor < their_version.minor or \
                (our_version.minor == their_version.minor and our_version.micro < their_version.micro):
            msg = _create_msg_for_version_check(
                f"{ErrorNumbers.FB625.value}: Found incompatible version %s, expected version %s" if error_msg is None else error_msg,
                their_version,
                our_version
            )
            logger.critical(msg)
            raise FedbiomedVersionError(msg)
        else:
            msg = _create_msg_for_version_check(
                "Found version %s, expected version %s",
                their_version,
                our_version
            )
            logger.warning(msg)

read_file

read_file(path)

Read given file

Parameters:

Name Type Description Default
path

Path to file to be read

required

Raises:

Type Description
FedbiomedError

If the file is not existing or readable.

Source code in fedbiomed/common/utils/_utils.py
def read_file(path):
    """Read given file

    Args:
        path: Path to file to be read

    Raises:
        FedbiomedError: If the file is not existing or readable.
    """
    try:
        with open(path, "r") as file:
            content = file.read()
            file.close()
    except Exception as e:
        raise FedbiomedError(
            f"{ErrorNumbers.FB627.value}: Can not read file {path}. Error: {e}"
        )
    else:
        return content

reverse_quantize

reverse_quantize(weights, clipping_range=None, target_range=SAParameters.TARGET_RANGE)

Reverse quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776

Args: weights: List of quantized model weights clipping_range: Clipping range used for quantization target_range: Target range used for quantization

Returns:

Type Description
List[float]

Reversed quantized model weights as numpy array.

Source code in fedbiomed/common/utils/_secagg_utils.py
def reverse_quantize(
    weights: List[int],
    clipping_range: Union[int, None] = None,
    target_range: int = SAParameters.TARGET_RANGE,
) -> List[float]:
    """Reverse quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776

     Args:
        weights: List of quantized model weights
        clipping_range: Clipping range used for quantization
        target_range: Target range used for quantization

    Returns:
        Reversed quantized model weights as numpy array.
    """

    if clipping_range is None:
        clipping_range = SAParameters.CLIPPING_RANGE

    # CAVEAT: there should not be any weight received that does not fit in `uint64`
    max_val = np.iinfo(np.uint64).max
    if any([v > max_val or v < 0 for v in weights]):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Cannot reverse quantize, received values exceed maximum number"
        )

    max_range = clipping_range
    min_range = -clipping_range
    step_size = (max_range - min_range) / (target_range - 1)
    # Compute as input type (`np.uint64` then convert to `np.float64`)
    f = np.vectorize(
        lambda x: (min_range + step_size * x),
        otypes=[np.float64]
    )

    # TODO: we could check that received values are in the range
    weights = np.array(weights, dtype=np.uint64)
    reverse_quantized_list = f(weights)

    return reverse_quantized_list.tolist()