Attributes
CACHE_DIR module-attribute
CACHE_DIR = join(VAR_DIR, CACHE_FOLDER_NAME)
CONFIG_DIR module-attribute
CONFIG_DIR = join(ROOT_DIR, CONFIG_FOLDER_NAME)
FBM_Component_Version module-attribute
FBM_Component_Version = Version
The Type of objects representing version numbers in Fed-BioMed
ROOT_DIR module-attribute
ROOT_DIR = _get_fedbiomed_root()
TMP_DIR module-attribute
TMP_DIR = join(VAR_DIR, TMP_FOLDER_NAME)
VAR_DIR module-attribute
VAR_DIR = join(ROOT_DIR, VAR_FOLDER_NAME)
Functions
compute_dot_product
compute_dot_product(model, params, device=None)
Compute the dot product between model and input parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model | dict | OrderedDict representing model state | required |
params | dict | OrderedDict containing correction parameters | required |
Returns:
Type | Description |
---|---|
tensor | A tensor containing a single numerical value which is the dot product. |
Source code in fedbiomed/common/utils/_utils.py
def compute_dot_product(model: dict, params: dict, device: Optional[str] = None) -> torch.tensor:
"""Compute the dot product between model and input parameters.
Args:
model: OrderedDict representing model state
params: OrderedDict containing correction parameters
Returns:
A tensor containing a single numerical value which is the dot product.
"""
model_p = model.values()
correction_state = params.values()
if device is None:
if model_p:
device = list(model_p)[0].device
else:
# if device is not found, set it to `cpu`
device = 'cpu'
dot_prod = sum([torch.sum(m * torch.tensor(p).float().to(device)) for m, p in zip(model_p, correction_state)])
return dot_prod
convert_iterator_to_list_of_python_floats
convert_iterator_to_list_of_python_floats(iterator)
Converts numerical values of array-like object to float
Parameters:
Name | Type | Description | Default |
---|---|---|---|
iterator | Iterator | Array-like numeric object to convert numerics to float | required |
Returns:
Type | Description |
---|---|
List[float] | Numerical elements as converted to List of floats |
Source code in fedbiomed/common/utils/_utils.py
def convert_iterator_to_list_of_python_floats(iterator: Iterator) -> List[float]:
"""Converts numerical values of array-like object to float
Args:
iterator: Array-like numeric object to convert numerics to float
Returns:
Numerical elements as converted to List of floats
"""
if not isinstance(iterator, Iterable):
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: object {type(iterator)} is not iterable")
list_of_floats = []
if isinstance(iterator, dict):
# specific processing for dictionaries
for val in iterator.values():
list_of_floats.append(convert_to_python_float(val))
else:
for it in iterator:
list_of_floats.append(convert_to_python_float(it))
return list_of_floats
convert_to_python_float
convert_to_python_float(value)
Convert numeric types to float
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Union[Tensor, integer, floating, float, int] | value to convert python type float | required |
Returns:
Type | Description |
---|---|
float | Python float |
Source code in fedbiomed/common/utils/_utils.py
def convert_to_python_float(value: Union[torch.Tensor, np.integer, np.floating, float, int]) -> float:
""" Convert numeric types to float
Args:
value: value to convert python type float
Returns:
Python float
"""
if not isinstance(value, (torch.Tensor, np.integer, np.floating, float, int)):
raise FedbiomedError(
f"{ErrorNumbers.FB627.value}: Converting {type(value)} to python to float is not supported.")
# if the result is a tensor, convert it back to numpy
if isinstance(value, torch.Tensor):
value = value.numpy()
if isinstance(value, Iterable) and value.size > 1:
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Can not convert array-type objects to float.")
return float(value)
create_fedbiomed_setup_folders
create_fedbiomed_setup_folders(root)
Creates folders reequired by Fed-BioMed component setup
Parameters:
Name | Type | Description | Default |
---|---|---|---|
root | str | Root directory of Fed-BioMed component setup | required |
Source code in fedbiomed/common/utils/_config_utils.py
def create_fedbiomed_setup_folders(root: str):
"""Creates folders reequired by Fed-BioMed component setup
Args:
root: Root directory of Fed-BioMed component setup
"""
etc_config_dir = os.path.join(root, CONFIG_FOLDER_NAME)
var_dir = os.path.join(root, VAR_FOLDER_NAME)
cache_dir = os.path.join(var_dir, CACHE_FOLDER_NAME)
tmp_dir = os.path.join(var_dir, TMP_FOLDER_NAME)
for dir_ in [etc_config_dir, var_dir, cache_dir, tmp_dir]:
if not os.path.isdir(dir_):
os.makedirs(dir_)
divide
divide(xs, k)
Divides a list of integers by a constant
Parameters:
Name | Type | Description | Default |
---|---|---|---|
xs | List[int] | List of integers | required |
k | int | Constant to divide by | required |
Returns:
Type | Description |
---|---|
List[int] | List of divided integers |
Source code in fedbiomed/common/utils/_secagg_utils.py
def divide(xs: List[int], k: int) -> List[int]:
"""
Divides a list of integers by a constant
Args:
xs: List of integers
k: Constant to divide by
Returns:
List of divided integers
"""
# Quicker than converting to/from numpy
return [e / k for e in xs]
get_all_existing_certificates
get_all_existing_certificates()
Gets all existing certificates from Fed-BioMed etc
directory.
This method parse all available configs in etc
directory.
Returns:
Type | Description |
---|---|
List[Dict[str, str]] | List of certificate objects that contain component type as |
List[Dict[str, str]] | (not path) as |
Source code in fedbiomed/common/utils/_config_utils.py
def get_all_existing_certificates() -> List[Dict[str, str]]:
"""Gets all existing certificates from Fed-BioMed `etc` directory.
This method parse all available configs in `etc` directory.
Returns:
List of certificate objects that contain component type as `component`, party id `id`, public key content
(not path) as `certificate`.
"""
config_files = get_all_existing_config_files()
certificates = []
for config in config_files:
certificates.append(get_component_certificate_from_config(config))
return certificates
get_all_existing_config_files
get_all_existing_config_files()
Gets all existing config files from Fed-BioMed etc
directory
Source code in fedbiomed/common/utils/_config_utils.py
def get_all_existing_config_files():
"""Gets all existing config files from Fed-BioMed `etc` directory"""
etc = os.path.join(ROOT_DIR, CONFIG_FOLDER_NAME, '')
return [file for file in glob.glob(f"{etc}*.ini")]
get_class_source
get_class_source(cls)
Get source of the class.
It uses different methods for getting the class source based on shell type; IPython,Notebook shells or Python shell
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Callable | The class to extract the source code from | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | Source code of the given class |
Raises:
Type | Description |
---|---|
FedbiomedError | if argument is not a class |
Source code in fedbiomed/common/utils/_utils.py
def get_class_source(cls: Callable) -> str:
"""Get source of the class.
It uses different methods for getting the class source based on shell type; IPython,Notebook
shells or Python shell
Args:
cls: The class to extract the source code from
Returns:
str: Source code of the given class
Raises:
FedbiomedError: if argument is not a class
"""
if not inspect.isclass(cls):
raise FedbiomedError(f'{ErrorNumbers.FB627.value}: The argument `cls` must be a python class')
# Check ipython status
status = is_ipython()
if status:
file = get_ipython_class_file(cls)
codes = "".join(inspect.linecache.getlines(file))
class_code = extract_symbols(codes, cls.__name__)[0][0]
return class_code
return inspect.getsource(cls)
get_component_certificate_from_config
get_component_certificate_from_config(config_path)
Gets component certificate, id and component type by given config file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path | str | Path where config file is located. | required |
Returns:
Type | Description |
---|---|
Dict[str, str] | Certificate object that contains component type as |
Raises:
Type | Description |
---|---|
FedbiomedError |
|
Source code in fedbiomed/common/utils/_config_utils.py
def get_component_certificate_from_config(
config_path: str
) -> Dict[str, str]:
"""Gets component certificate, id and component type by given config file path.
Args:
config_path: Path where config file is located.
Returns:
Certificate object that contains component type as `component`, party id `id`, public key content
(not path) as `certificate`
Raises:
FedbiomedError:
- If config file does not contain `node_id` or `researcher_id` under `default` section.
- If config file does not contain `public_key` under `ssl` section.
- If certificate file is not found or not readable
"""
config = get_component_config(config_path)
component_id = config.get("default", "id")
component_type = config.get("default", "component")
ip = config.get("mpspdz", "mpspdz_ip")
port = config.get("mpspdz", "mpspdz_port")
certificate_path = os.path.join(os.path.dirname(config_path), config.get("mpspdz", "public_key"))
if not os.path.isfile(certificate_path):
raise FedbiomedError(f"The certificate for component '{component_id}' not found in {certificate_path}")
try:
with open(certificate_path, 'r') as file:
certificate = file.read()
except Exception as e:
raise FedbiomedError(f"Error while reading certificate -> {certificate_path}. Error: {e}")
return {
"party_id": component_id,
"certificate": certificate,
"ip": ip,
"port": port,
"component": component_type
}
get_component_config
get_component_config(config_path)
Gets config object from given config path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path | str | The path where config file is stored. | required |
Returns:
Type | Description |
---|---|
ConfigParser | Configuration object. |
Raises:
Type | Description |
---|---|
FedbiomedError | If config file is not readable or not existing. |
Source code in fedbiomed/common/utils/_config_utils.py
def get_component_config(
config_path: str
) -> configparser.ConfigParser:
"""Gets config object from given config path.
Args:
config_path: The path where config file is stored.
Returns:
Configuration object.
Raises:
FedbiomedError: If config file is not readable or not existing.
"""
config = configparser.ConfigParser()
try:
config.read(config_path)
except Exception:
raise FedbiomedError(f"Can not read config file. Please make sure it is existing or it has valid format. "
f"{config_path}")
return config
get_existing_component_db_names
get_existing_component_db_names()
Gets DB_PATHs of all existing components in Fed-BioMed root
Source code in fedbiomed/common/utils/_config_utils.py
def get_existing_component_db_names():
"""Gets DB_PATHs of all existing components in Fed-BioMed root"""
config_files = get_all_existing_config_files()
db_names = {}
for _config in config_files:
config = get_component_config(_config)
component_id = config['default']['id']
db_name = f"{DB_PREFIX}{component_id}"
db_names = {**db_names, component_id: db_name}
return db_names
get_ipython_class_file
get_ipython_class_file(cls)
Get source file/cell-id of the class which is defined in ZMQInteractiveShell or TerminalInteractiveShell
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Callable | Python class defined on the IPython kernel | required |
Returns:
Type | Description |
---|---|
str | File path or id of Jupyter cell. On IPython's interactive shell, it returns cell ID |
Source code in fedbiomed/common/utils/_utils.py
def get_ipython_class_file(cls: Callable) -> str:
"""Get source file/cell-id of the class which is defined in ZMQInteractiveShell or TerminalInteractiveShell
Args:
cls: Python class defined on the IPython kernel
Returns:
File path or id of Jupyter cell. On IPython's interactive shell, it returns cell ID
"""
# Lookup by parent module
if hasattr(cls, '__module__'):
object_ = sys.modules.get(cls.__module__)
# If module has `__file__` attribute
if hasattr(object_, '__file__'):
return object_.__file__
# If parent module is __main__
for name, member in inspect.getmembers(cls):
if inspect.isfunction(member) and cls.__qualname__ + '.' + member.__name__ == member.__qualname__:
return inspect.getfile(member)
else:
raise FedbiomedError(f'{ErrorNumbers.FB627.value}: {cls} has no attribute `__module__`, source is not found.')
get_method_spec
get_method_spec(method)
Helper to get argument specification
Parameters:
Name | Type | Description | Default |
---|---|---|---|
method | Callable | The function/method to extract argument specification from | required |
Returns:
Type | Description |
---|---|
dict | Specification of the method |
Source code in fedbiomed/common/utils/_utils.py
def get_method_spec(method: Callable) -> dict:
""" Helper to get argument specification
Args:
method: The function/method to extract argument specification from
Returns:
Specification of the method
"""
method_spec = {}
parameters = inspect.signature(method).parameters
for (key, val) in parameters.items():
method_spec[key] = {
'name': val.name,
'default': None if val.default is inspect._empty else val.default,
'annotation': None if val.default is inspect._empty else val.default
}
return method_spec
import_class_from_file
import_class_from_file(module_path, class_name)
Import a module from a file and return a specified class of the module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module_path | str | path to python module file | required |
class_name | str | name of the class | required |
Returns:
Type | Description |
---|---|
Tuple[Any, Any] | Tuple of the module created and the training plan class loaded |
Raises:
Type | Description |
---|---|
FedbiomedError | bad argument type |
FedbiomedError | cannot load module or class |
Source code in fedbiomed/common/utils/_utils.py
def import_class_from_file(module_path: str, class_name: str) -> Tuple[Any, Any]:
"""Import a module from a file and return a specified class of the module.
Args:
module_path: path to python module file
class_name: name of the class
Returns:
Tuple of the module created and the training plan class loaded
Raises:
FedbiomedError: bad argument type
FedbiomedError: cannot load module or class
"""
if not os.path.isfile(module_path):
raise FedbiomedError(f"{ErrorNumbers.FB627}: Given path for importing {class_name} is not existing")
module_base_name = os.path.basename(module_path)
pattern = re.compile("(.*).py$")
match = pattern.match(module_base_name)
if not match:
raise FedbiomedError(f"{ErrorNumbers.FB627}: File is not a python file.")
module = match.group(1)
sys.path.insert(0, os.path.dirname(module_path))
try:
module = importlib.import_module(module)
except ModuleNotFoundError as exp:
raise FedbiomedError(f"Specified module is not existing. {exp}") from exp
try:
class_ = getattr(module, class_name)
except AttributeError as exp:
raise FedbiomedError(f"{ErrorNumbers.FB627}, Attribute error while loading the class "
f"{class_name} from {module_path}. Error: {exp}") from exp
sys.path.pop(0)
return module, class_
import_class_from_spec
import_class_from_spec(code, class_name)
Import a module from a code and extract the code of a specified class of the module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code | str | code of the module | required |
class_name | str | name of the class | required |
Returns:
Type | Description |
---|---|
Tuple[Any, Any] | Tuple of the module created and the extracted class |
Raises:
Type | Description |
---|---|
FedbiomedError | bad argument type |
FedbiomedError | cannot load module or extract clas |
Source code in fedbiomed/common/utils/_utils.py
def import_class_from_spec(code: str, class_name: str) -> Tuple[Any, Any] :
"""Import a module from a code and extract the code of a specified class of the module.
Args:
code: code of the module
class_name: name of the class
Returns:
Tuple of the module created and the extracted class
Raises:
FedbiomedError: bad argument type
FedbiomedError: cannot load module or extract clas
"""
for arg in [code, class_name]:
if not isinstance(arg, str):
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Expected argument type is string but got '{type(arg)}'")
try:
spec = importlib.util.spec_from_loader("module_", loader=None)
module = importlib.util.module_from_spec(spec)
exec(code, module.__dict__)
except Exception as e:
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Can not load module from given code: {e}")
try:
class_ = getattr(module, class_name)
except AttributeError:
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Can not import {class_name} from given code")
return module, class_
import_class_object_from_file
import_class_object_from_file(module_path, class_name)
Import a module from a file and create an instance of a specified class of the module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module_path | str | path to python module file | required |
class_name | str | name of the class | required |
Returns:
Type | Description |
---|---|
Tuple[Any, Any] | Tuple of the module created and the training plan object created |
Raises:
Type | Description |
---|---|
FedbiomedError | bad argument type |
FedbiomedError | cannot instantiate object |
Source code in fedbiomed/common/utils/_utils.py
def import_class_object_from_file(module_path: str, class_name: str) -> Tuple[Any, Any]:
"""Import a module from a file and create an instance of a specified class of the module.
Args:
module_path: path to python module file
class_name: name of the class
Returns:
Tuple of the module created and the training plan object created
Raises:
FedbiomedError: bad argument type
FedbiomedError: cannot instantiate object
"""
for arg in [module_path, class_name]:
if not isinstance(arg, str):
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Expected argument type is string but got '{type(arg)}'")
module, train_class = import_class_from_file(module_path, class_name)
try:
train_class_instance = train_class()
except Exception as e:
raise FedbiomedError(f"{ErrorNumbers.FB627.value}: Cannot instantiate training plan object: {e}")
return module, train_class_instance
matching_parties_biprime
matching_parties_biprime(context, parties)
Check if parties of given context are compatible with the parties of a secagg biprime element.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | dict | context to be compared with the secagg biprime element parties | required |
parties | list | the secagg biprime element parties | required |
Returns:
Type | Description |
---|---|
bool | True if this context can be used with this element, False if not. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_biprime(context: dict, parties: list) -> bool:
"""Check if parties of given context are compatible with the parties
of a secagg biprime element.
Args:
context: context to be compared with the secagg biprime element parties
parties: the secagg biprime element parties
Returns:
True if this context can be used with this element, False if not.
"""
# Need to ensure that:
# - either the existing element is not attached to specific parties (None)
# - or existing element was established for the same parties or a superset of the parties
# (order can differ)
return (
# Commented tests can be assumed from calling functions
#
# isinstance(context, dict) and
# 'parties' in context and
# isinstance(parties, list) and
(
context['parties'] is None or (
# isinstance(context['parties'], list) and
set(parties).issubset(set(context['parties']))
)))
matching_parties_dh
matching_parties_dh(context, parties)
Check if parties of given context are compatible with the parties of a secagg Diffie Hellman element.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | dict | context to be compared with the secagg servkey element parties | required |
parties | list | the secagg servkey element parties | required |
Returns:
Type | Description |
---|---|
bool | True if this context can be used with this element, False if not. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_dh(context: dict, parties: list) -> bool:
"""Check if parties of given context are compatible with the parties
of a secagg Diffie Hellman element.
Args:
context: context to be compared with the secagg servkey element parties
parties: the secagg servkey element parties
Returns:
True if this context can be used with this element, False if not.
"""
# Need to ensure that:
# - no check on first party (no cryptographic material attached to the researcher).
# The context is established for a given experiment, thus a given researcher but this should
# be tested prior to this call.
# - existing element was established for the same node parties or a superset of the node parties
# (order can differ, as nodes are ordered by the cipher code)
#
# eg: [ 'un', 'deux', 'trois' ] parties compatible with [ 'un', 'trois', 'deux' ] context
# but not with [ 'deux', 'un', 'trois' ]
# eg: [ 'un', 'deux', 'trois' ] parties compatible with [ 'un', 'trois', 'quatre', 'deux' ] context
# but not with [ 'un', 'deux', 'quatre' ]
return (
# Commented tests can be assumed from calling functions
#
# isinstance(context, dict) and
# 'parties' in context and
# isinstance(context['parties'], list) and
# len(context['parties']) >= 1 and
# isinstance(parties, list) and
set(parties[1:]).issubset(set(context['parties'][1:])))
matching_parties_servkey
matching_parties_servkey(context, parties)
Check if parties of given context are compatible with the parties of a secagg servkey element.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | dict | context to be compared with the secagg servkey element parties | required |
parties | list | the secagg servkey element parties | required |
Returns:
Type | Description |
---|---|
bool | True if this context can be used with this element, False if not. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_servkey(context: dict, parties: list) -> bool:
"""Check if parties of given context are compatible with the parties
of a secagg servkey element.
Args:
context: context to be compared with the secagg servkey element parties
parties: the secagg servkey element parties
Returns:
True if this context can be used with this element, False if not.
"""
# Need to ensure that:
# - existing element was established for the same parties
# - first party needs to be the same for both
# - set of other parties needs to be the same for both (order can differ)
#
# eg: [ 'un', 'deux', 'trois' ] compatible with [ 'un', 'trois', 'deux' ]
# but not with [ 'deux', 'un', 'trois' ]
return (
# Commented tests can be assumed from calling functions
#
# isinstance(context, dict) and
# 'parties' in context and
# isinstance(context['parties'], list) and
# len(context['parties']) >= 1 and
# isinstance(parties, list) and
parties[0] == context['parties'][0] and
set(parties[1:]) == set(context['parties'][1:]))
multiply
multiply(xs, k)
Multiplies a list of integers by a constant
Parameters:
Name | Type | Description | Default |
---|---|---|---|
xs | List[int] | List of integers | required |
k | int | Constant to multiply by | required |
Returns:
Type | Description |
---|---|
List[int] | List of multiplied integers |
Source code in fedbiomed/common/utils/_secagg_utils.py
def multiply(xs: List[int], k: int) -> List[int]:
"""
Multiplies a list of integers by a constant
Args:
xs: List of integers
k: Constant to multiply by
Returns:
List of multiplied integers
"""
# Quicker than converting to/from numpy
return [e * k for e in xs]
quantize
quantize(weights, clipping_range=None, target_range=SAParameters.TARGET_RANGE)
Quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776
This function returns a vector in the range [0, target_range-1].
Parameters:
Name | Type | Description | Default |
---|---|---|---|
weights | List[float] | List of model weight values | required |
clipping_range | Union[int, None] | Clipping range | None |
target_range | int | Target range | TARGET_RANGE |
Returns:
Type | Description |
---|---|
List[int] | Quantized model weights as numpy array. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def quantize(
weights: List[float],
clipping_range: Union[int, None] = None,
target_range: int = SAParameters.TARGET_RANGE,
) -> List[int]:
"""Quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776
This function returns a vector in the range [0, target_range-1].
Args:
weights: List of model weight values
clipping_range: Clipping range
target_range: Target range
Returns:
Quantized model weights as numpy array.
"""
if clipping_range is None:
clipping_range = SAParameters.CLIPPING_RANGE
_check_clipping_range(weights, clipping_range)
# CAVEAT: ensure to be converting from `float` to `uint64`` (no intermediate `int64`)
# Process ensures an to compute an `int`` in the range [0, target_range -1]
# This enables to use at most 2**64 as target_range (max value of `uint` - 1)
f = np.vectorize(
lambda x: min(
target_range - 1,
(sorted((-clipping_range, x, clipping_range))[1] +
clipping_range) *
target_range / (2 * clipping_range),
),
otypes=[np.uint64]
)
quantized_list = f(weights)
return quantized_list.tolist()
raise_for_version_compatibility
raise_for_version_compatibility(their_version, our_version, error_msg=None)
Check version compatibility and behave accordingly.
Raises an exception if the versions are incompatible, otherwise outputs a warning or info message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
their_version | Union[FBM_Component_Version, str] | the version that we detected in the component | required |
our_version | Union[FBM_Component_Version, str] | the version of the component within the current runtime | required |
error_msg | Optional[str] | an optional error message. It may contain two %s placeholders which will be substituted with the values of their_version and our_version. | None |
Raises:
Type | Description |
---|---|
FedbiomedVersionError | if the versions are incompatible |
Source code in fedbiomed/common/utils/_versions.py
def raise_for_version_compatibility(their_version: Union[FBM_Component_Version, str],
our_version: Union[FBM_Component_Version, str],
error_msg: Optional[str] = None) -> None:
"""Check version compatibility and behave accordingly.
Raises an exception if the versions are incompatible, otherwise outputs a warning or info message.
Args:
their_version: the version that we detected in the component
our_version: the version of the component within the current runtime
error_msg: an optional error message. It may contain two %s placeholders which will be substituted with
the values of their_version and our_version.
Raises:
FedbiomedVersionError: if the versions are incompatible
"""
if isinstance(our_version, str):
our_version = FBM_Component_Version(our_version)
if isinstance(their_version, str):
their_version = FBM_Component_Version(their_version)
if not isinstance(our_version, FBM_Component_Version):
msg = f"{ErrorNumbers.FB625.value}: Component version has incorrect type `our_version` type={str(type(our_version))} value={our_version}"
logger.critical(msg)
raise FedbiomedVersionError(msg)
if not isinstance(their_version, FBM_Component_Version):
msg = f"{ErrorNumbers.FB625.value}: Component version has incorrect type `their_version` type={str(type(their_version))} value={their_version}"
logger.critical(msg)
raise FedbiomedVersionError(msg)
if our_version != their_version:
# note: the checks below rely on the short-circuiting behaviour of the or operator
# (e.g. when checking our_version.minor < their_version.minor we have the guarantee that
# our_version.major == their_version.major
if our_version.major != their_version.major or \
our_version.minor < their_version.minor or \
(our_version.minor == their_version.minor and our_version.micro < their_version.micro):
msg = _create_msg_for_version_check(
f"{ErrorNumbers.FB625.value}: Found incompatible version %s, expected version %s" if error_msg is None else error_msg,
their_version,
our_version
)
logger.critical(msg)
raise FedbiomedVersionError(msg)
else:
msg = _create_msg_for_version_check(
"Found version %s, expected version %s",
their_version,
our_version
)
logger.warning(msg)
read_file
read_file(path)
Read given file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path to file to be read | required |
Raises:
Type | Description |
---|---|
FedbiomedError | If the file is not existing or readable. |
Source code in fedbiomed/common/utils/_utils.py
def read_file(path):
"""Read given file
Args:
path: Path to file to be read
Raises:
FedbiomedError: If the file is not existing or readable.
"""
try:
with open(path, "r") as file:
content = file.read()
file.close()
except Exception as e:
raise FedbiomedError(
f"{ErrorNumbers.FB627.value}: Can not read file {path}. Error: {e}"
)
else:
return content
reverse_quantize
reverse_quantize(weights, clipping_range=None, target_range=SAParameters.TARGET_RANGE)
Reverse quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776
Args: weights: List of quantized model weights clipping_range: Clipping range used for quantization target_range: Target range used for quantization
Returns:
Type | Description |
---|---|
List[float] | Reversed quantized model weights as numpy array. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def reverse_quantize(
weights: List[int],
clipping_range: Union[int, None] = None,
target_range: int = SAParameters.TARGET_RANGE,
) -> List[float]:
"""Reverse quantization step implemented by: https://dl.acm.org/doi/pdf/10.1145/3488659.3493776
Args:
weights: List of quantized model weights
clipping_range: Clipping range used for quantization
target_range: Target range used for quantization
Returns:
Reversed quantized model weights as numpy array.
"""
if clipping_range is None:
clipping_range = SAParameters.CLIPPING_RANGE
# CAVEAT: there should not be any weight received that does not fit in `uint64`
max_val = np.iinfo(np.uint64).max
if any([v > max_val or v < 0 for v in weights]):
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: Cannot reverse quantize, received values exceed maximum number"
)
max_range = clipping_range
min_range = -clipping_range
step_size = (max_range - min_range) / (target_range - 1)
# Compute as input type (`np.uint64` then convert to `np.float64`)
f = np.vectorize(
lambda x: (min_range + step_size * x),
otypes=[np.float64]
)
# TODO: we could check that received values are in the range
weights = np.array(weights, dtype=np.uint64)
reverse_quantized_list = f(weights)
return reverse_quantized_list.tolist()