Attributes
Functions
compute_dot_product
compute_dot_product(model, params, device=None)
Compute the dot product between model and input parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model | dict | OrderedDict representing model state | required |
params | dict | OrderedDict containing correction parameters | required |
Returns:
Type | Description |
---|---|
tensor | A tensor containing a single numerical value which is the dot product. |
Source code in fedbiomed/common/utils/_utils.py
def compute_dot_product(model: dict, params: dict, device: Optional[str] = None) -> torch.tensor:
"""Compute the dot product between model and input parameters.
Args:
model: OrderedDict representing model state
params: OrderedDict containing correction parameters
Returns:
A tensor containing a single numerical value which is the dot product.
"""
model_p = model.values()
correction_state = params.values()
if device is None:
if model_p:
device = list(model_p)[0].device
else:
# if device is not found, set it to `cpu`
device = 'cpu'
dot_prod = sum([torch.sum(m * torch.tensor(p).float().to(device)) for m, p in zip(model_p, correction_state)])
return dot_prod
convert_iterator_to_list_of_python_floats
convert_iterator_to_list_of_python_floats(iterator)
Converts numerical values of array-like object to float
Parameters:
Name | Type | Description | Default |
---|---|---|---|
iterator | Iterator | Array-like numeric object to convert numerics to float | required |
Returns:
Type | Description |
---|---|
List[float] | Numerical elements as converted to List of floats |
Source code in fedbiomed/common/utils/_utils.py
def convert_iterator_to_list_of_python_floats(iterator: Iterator) -> List[float]:
"""Converts numerical values of array-like object to float
Args:
iterator: Array-like numeric object to convert numerics to float
Returns:
Numerical elements as converted to List of floats
"""
if not isinstance(iterator, Iterable):
raise FedbiomedError(f"object {type(iterator)} is not iterable")
list_of_floats = []
if isinstance(iterator, dict):
# specific processing for dictionaries
for val in iterator.values():
list_of_floats.append(convert_to_python_float(val))
else:
for it in iterator:
list_of_floats.append(convert_to_python_float(it))
return list_of_floats
convert_to_python_float
convert_to_python_float(value)
Convert numeric types to float
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Union[Tensor, integer, floating, float, int] | value to convert python type float | required |
Returns:
Type | Description |
---|---|
float | Python float |
Source code in fedbiomed/common/utils/_utils.py
def convert_to_python_float(value: Union[torch.Tensor, np.integer, np.floating, float, int]) -> float:
""" Convert numeric types to float
Args:
value: value to convert python type float
Returns:
Python float
"""
if not isinstance(value, (torch.Tensor, np.integer, np.floating, float, int)):
raise FedbiomedError(f"Converting {type(value)} to python to float is not supported.")
# if the result is a tensor, convert it back to numpy
if isinstance(value, torch.Tensor):
value = value.numpy()
if isinstance(value, Iterable) and value.size > 1:
raise FedbiomedError("Can not convert array-type objects to float.")
return float(value)
get_all_existing_certificates
get_all_existing_certificates()
Gets all existing certificates from Fed-BioMed etc
directory.
This method parse all available configs in etc
directory.
Returns:
Type | Description |
---|---|
List[Dict[str, str]] | List of certificate objects that contain component type as |
List[Dict[str, str]] | (not path) as |
Source code in fedbiomed/common/utils/_config_utils.py
def get_all_existing_certificates() -> List[Dict[str, str]]:
"""Gets all existing certificates from Fed-BioMed `etc` directory.
This method parse all available configs in `etc` directory.
Returns:
List of certificate objects that contain component type as `component`, party id `id`, public key content
(not path) as `certificate`.
"""
config_files = get_all_existing_config_files()
certificates = []
for config in config_files:
certificates.append(get_component_certificate_from_config(config))
return certificates
get_all_existing_config_files
get_all_existing_config_files()
Gets all existing config files from Fed-BioMed etc
directory
Source code in fedbiomed/common/utils/_config_utils.py
def get_all_existing_config_files():
"""Gets all existing config files from Fed-BioMed `etc` directory"""
etc = os.path.join(ROOT_DIR, CONFIG_FOLDER_NAME, '')
return [file for file in glob.glob(f"{etc}*.ini")]
get_class_source
get_class_source(cls)
Get source of the class.
It uses different methods for getting the class source based on shell type; IPython,Notebook shells or Python shell
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Callable | The class to extract the source code from | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | Source code of the given class |
Raises:
Type | Description |
---|---|
FedbiomedError | if argument is not a class |
Source code in fedbiomed/common/utils/_utils.py
def get_class_source(cls: Callable) -> str:
"""Get source of the class.
It uses different methods for getting the class source based on shell type; IPython,Notebook
shells or Python shell
Args:
cls: The class to extract the source code from
Returns:
str: Source code of the given class
Raises:
FedbiomedError: if argument is not a class
"""
if not inspect.isclass(cls):
raise FedbiomedError('The argument `cls` must be a python class')
# Check ipython status
status = is_ipython()
if status:
file = get_ipython_class_file(cls)
codes = "".join(inspect.linecache.getlines(file))
class_code = extract_symbols(codes, cls.__name__)[0][0]
return class_code
else:
return inspect.getsource(cls)
get_component_certificate_from_config
get_component_certificate_from_config(config_path)
Gets component certificate, id and component type by given config file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path | str | Path where config file is located. | required |
Returns:
Type | Description |
---|---|
Dict[str, str] | Certificate object that contains component type as |
Raises:
Type | Description |
---|---|
FedbiomedError |
|
Source code in fedbiomed/common/utils/_config_utils.py
def get_component_certificate_from_config(
config_path: str
) -> Dict[str, str]:
"""Gets component certificate, id and component type by given config file path.
Args:
config_path: Path where config file is located.
Returns:
Certificate object that contains component type as `component`, party id `id`, public key content
(not path) as `certificate`
Raises:
FedbiomedError:
- If config file does not contain `node_id` or `researcher_id` under `default` section.
- If config file does not contain `public_key` under `ssl` section.
- If certificate file is not found or not readable
"""
config = get_component_config(config_path)
component_id = config.get("default", "id")
component_type = config.get("default", "component")
ip = config.get("mpspdz", "mpspdz_ip")
port = config.get("mpspdz", "mpspdz_port")
certificate_path = os.path.join(os.path.dirname(config_path), config.get("mpspdz", "public_key"))
if not os.path.isfile(certificate_path):
raise FedbiomedError(f"The certificate for component '{component_id}' not found in {certificate_path}")
try:
with open(certificate_path, 'r') as file:
certificate = file.read()
except Exception as e:
raise FedbiomedError(f"Error while reading certificate -> {certificate_path}. Error: {e}")
return {
"party_id": component_id,
"certificate": certificate,
"ip": ip,
"port": port,
"component": component_type
}
get_component_config
get_component_config(config_path)
Gets config object from given config path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path | str | The path where config file is stored. | required |
Returns:
Type | Description |
---|---|
ConfigParser | Configuration object. |
Raises:
Type | Description |
---|---|
FedbiomedError | If config file is not readable or not existing. |
Source code in fedbiomed/common/utils/_config_utils.py
def get_component_config(
config_path: str
) -> configparser.ConfigParser:
"""Gets config object from given config path.
Args:
config_path: The path where config file is stored.
Returns:
Configuration object.
Raises:
FedbiomedError: If config file is not readable or not existing.
"""
config = configparser.ConfigParser()
try:
config.read(config_path)
except Exception:
raise FedbiomedError(f"Can not read config file. Please make sure it is existing or it has valid format. "
f"{config_path}")
return config
get_existing_component_db_names
get_existing_component_db_names()
Gets DB_PATHs of all existing components in Fed-BioMed root
Source code in fedbiomed/common/utils/_config_utils.py
def get_existing_component_db_names():
"""Gets DB_PATHs of all existing components in Fed-BioMed root"""
config_files = get_all_existing_config_files()
db_names = {}
for _config in config_files:
config = get_component_config(_config)
component_id = config['default']['id']
db_name = f"{DB_PREFIX}{component_id}"
db_names = {**db_names, component_id: db_name}
return db_names
get_ipython_class_file
get_ipython_class_file(cls)
Get source file/cell-id of the class which is defined in ZMQInteractiveShell or TerminalInteractiveShell
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Callable | Python class defined on the IPython kernel | required |
Returns:
Type | Description |
---|---|
str | File path or id of Jupyter cell. On IPython's interactive shell, it returns cell ID |
Source code in fedbiomed/common/utils/_utils.py
def get_ipython_class_file(cls: Callable) -> str:
"""Get source file/cell-id of the class which is defined in ZMQInteractiveShell or TerminalInteractiveShell
Args:
cls: Python class defined on the IPython kernel
Returns:
File path or id of Jupyter cell. On IPython's interactive shell, it returns cell ID
"""
# Lookup by parent module
if hasattr(cls, '__module__'):
object_ = sys.modules.get(cls.__module__)
# If module has `__file__` attribute
if hasattr(object_, '__file__'):
return object_.__file__
# If parent module is __main__
for name, member in inspect.getmembers(cls):
if inspect.isfunction(member) and cls.__qualname__ + '.' + member.__name__ == member.__qualname__:
return inspect.getfile(member)
else:
raise FedbiomedError(f'{cls} has no attribute `__module__`, source is not found.')
get_method_spec
get_method_spec(method)
Helper to get argument specification
Parameters:
Name | Type | Description | Default |
---|---|---|---|
method | Callable | The function/method to extract argument specification from | required |
Returns:
Type | Description |
---|---|
dict | Specification of the method |
Source code in fedbiomed/common/utils/_utils.py
def get_method_spec(method: Callable) -> dict:
""" Helper to get argument specification
Args:
method: The function/method to extract argument specification from
Returns:
Specification of the method
"""
method_spec = {}
parameters = inspect.signature(method).parameters
for (key, val) in parameters.items():
method_spec[key] = {
'name': val.name,
'default': None if val.default is inspect._empty else val.default,
'annotation': None if val.default is inspect._empty else val.default
}
return method_spec
is_ipython
is_ipython()
Function that checks whether the codes (function itself) is executed in ipython kernel or not
Returns:
Type | Description |
---|---|
bool | True, if python interpreter is IPython |
Source code in fedbiomed/common/utils/_utils.py
def is_ipython() -> bool:
"""
Function that checks whether the codes (function itself) is executed in ipython kernel or not
Returns:
True, if python interpreter is IPython
"""
ipython_shells = ['ZMQInteractiveShell', 'TerminalInteractiveShell']
try:
shell = get_ipython().__class__.__name__
if shell in ipython_shells:
return True
else:
return False
except NameError:
return False
matching_parties_biprime
matching_parties_biprime(context, parties)
Check if parties of given context are compatible with the parties of a secagg biprime element.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | dict | context to be compared with the secagg biprime element parties | required |
parties | list | the secagg biprime element parties | required |
Returns:
Type | Description |
---|---|
bool | True if this context can be used with this element, False if not. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_biprime(context: dict, parties: list) -> bool:
"""Check if parties of given context are compatible with the parties
of a secagg biprime element.
Args:
context: context to be compared with the secagg biprime element parties
parties: the secagg biprime element parties
Returns:
True if this context can be used with this element, False if not.
"""
# Need to ensure that:
# - either the existing element is not attached to specific parties (None)
# - or existing element was established for the same parties or a superset of the parties
return (
# Commented tests can be assumed from calling functions
#
# isinstance(context, dict) and
# 'parties' in context and
# isinstance(parties, list) and
(
context['parties'] is None or (
# isinstance(context['parties'], list) and
set(parties).issubset(set(context['parties']))
)))
matching_parties_servkey
matching_parties_servkey(context, parties)
Check if parties of given context are compatible with the parties of a secagg servkey element.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | dict | context to be compared with the secagg servkey element parties | required |
parties | list | the secagg servkey element parties | required |
Returns:
Type | Description |
---|---|
bool | True if this context can be used with this element, False if not. |
Source code in fedbiomed/common/utils/_secagg_utils.py
def matching_parties_servkey(context: dict, parties: list) -> bool:
"""Check if parties of given context are compatible with the parties
of a secagg servkey element.
Args:
context: context to be compared with the secagg servkey element parties
parties: the secagg servkey element parties
Returns:
True if this context can be used with this element, False if not.
"""
# Need to ensure that:
# - existing element was established for the same parties
# - first party needs to be the same for both
# - set of other parties needs to be the same for both (order can differ)
#
# eg: [ 'un', 'deux', 'trois' ] compatible with [ 'un', 'trois', 'deux' ]
# but not with [ 'deux', 'un', 'trois' ]
return (
# Commented tests can be assumed from calling functions
#
# isinstance(context, dict) and
# 'parties' in context and
# isinstance(context['parties'], list) and
# len(context['parties']) >= 1 and
# isinstance(parties, list) and
parties[0] == context['parties'][0] and
set(parties[1:]) == set(context['parties'][1:]))
raise_for_version_compatibility
raise_for_version_compatibility(their_version, our_version, error_msg=None)
Check version compatibility and behave accordingly.
Raises an exception if the versions are incompatible, otherwise outputs a warning or info message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
their_version | Union[FBM_Component_Version, str] | the version that we detected in the component | required |
our_version | Union[FBM_Component_Version, str] | the version of the component within the current runtime | required |
error_msg | Optional[str] | an optional error message. It may contain two %s placeholders which will be substituted with the values of their_version and our_version. | None |
Raises:
Type | Description |
---|---|
FedbiomedVersionError | if the versions are incompatible |
Source code in fedbiomed/common/utils/_versions.py
def raise_for_version_compatibility(their_version: Union[FBM_Component_Version, str],
our_version: Union[FBM_Component_Version, str],
error_msg: Optional[str] = None) -> None:
"""Check version compatibility and behave accordingly.
Raises an exception if the versions are incompatible, otherwise outputs a warning or info message.
Args:
their_version: the version that we detected in the component
our_version: the version of the component within the current runtime
error_msg: an optional error message. It may contain two %s placeholders which will be substituted with
the values of their_version and our_version.
Raises:
FedbiomedVersionError: if the versions are incompatible
"""
if isinstance(our_version, str):
our_version = FBM_Component_Version(our_version)
if isinstance(their_version, str):
their_version = FBM_Component_Version(their_version)
if not isinstance(our_version, FBM_Component_Version):
msg = f"{ErrorNumbers.FB625.value}: Component version has incorrect type `our_version` type={str(type(our_version))} value={our_version}"
logger.critical(msg)
raise FedbiomedVersionError(msg)
if not isinstance(their_version, FBM_Component_Version):
msg = f"{ErrorNumbers.FB625.value}: Component version has incorrect type `their_version` type={str(type(their_version))} value={their_version}"
logger.critical(msg)
raise FedbiomedVersionError(msg)
if our_version != their_version:
# note: the checks below rely on the short-circuiting behaviour of the or operator
# (e.g. when checking our_version.minor < their_version.minor we have the guarantee that
# our_version.major == their_version.major
if our_version.major != their_version.major or \
our_version.minor < their_version.minor or \
(our_version.minor == their_version.minor and our_version.micro < their_version.micro):
msg = _create_msg_for_version_check(
f"{ErrorNumbers.FB625.value}: Found incompatible version %s, expected version %s" if error_msg is None else error_msg,
their_version,
our_version
)
logger.critical(msg)
raise FedbiomedVersionError(msg)
else:
msg = _create_msg_for_version_check(
"Found version %s, expected version %s",
their_version,
our_version
)
logger.warning(msg)
read_file
read_file(path)
Read given file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path to file to be read | required |
Raises:
Type | Description |
---|---|
FedbiomedError | If the file is not existing or readable. |
Source code in fedbiomed/common/utils/_utils.py
def read_file(path):
"""Read given file
Args:
path: Path to file to be read
Raises:
FedbiomedError: If the file is not existing or readable.
"""
try:
with open(path, "r") as file:
content = file.read()
file.close()
except Exception as e:
raise FedbiomedError(
f"Can not read file {path}. Error: {e}"
)
else:
return content