Classes
SecaggBiprimeContext
SecaggBiprimeContext(parties, secagg_id=None)
Bases: SecaggContext
Handles a Secure Aggregation biprime context element on the researcher side.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
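parties | List[str] | list of parties participating in the secagg context element setup, named by their unique id (`node_id`, `researcher_id`). There must be at least 3 parties, and the first party is this researcher | required |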
secagg_id | Union[str, None] | optional secagg context element ID to use for this element. Default is None, which means a unique element ID will be generated. | None |
Raises:
Type | Description |
---|---|
FedbiomedSecaggError | bad argument type or value |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def __init__(self, parties: List[str], secagg_id: Union[str, None] = None):
    """Build a biprime secagg context element.

    The parent constructor is always called with `None` as experiment ID: this
    element is not attached to a specific experiment.

    Args:
        parties: list of parties participating in the secagg context element setup, named
            by their unique id (`node_id`, `researcher_id`).
            There must be at least 3 parties, and the first party is this researcher
        secagg_id: optional secagg context element ID to use for this element.
            Default is None, which means a unique element ID will be generated.

    Raises:
        FedbiomedSecaggError: bad argument type or value
    """
    super().__init__(parties=parties, experiment_id=None, secagg_id=secagg_id)

    # Element type and manager specific to the biprime context
    self._element = SecaggElementTypes.BIPRIME
    self._secagg_manager = _BPrimeManager
SecaggContext
SecaggContext(parties, experiment_id, secagg_id=None)
Bases: ABC
Handles a Secure Aggregation context element on the researcher side.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
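parties | List[str] | list of parties participating in the secagg context element setup, named by their unique id (`node_id`, `researcher_id`). There must be at least 3 parties, and the first party is this researcher | required |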
experiment_id | Union[str, None] | ID of the experiment to which this secagg context element is attached. None means the element is not attached to a specific experiment | required |
secagg_id | Union[str, None] | optional secagg context element ID to use for this element. Default is None, which means a unique element ID will be generated. | None |
Raises:
Type | Description |
---|---|
FedbiomedSecaggError | bad argument type or value |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def __init__(self, parties: List[str], experiment_id: Union[str, None], secagg_id: Union[str, None] = None):
    """Constructor of the class.

    Validates the arguments, then initializes the state common to every secagg
    context element (IDs, parties, request handler, MPC controller).

    Args:
        parties: list of parties participating in the secagg context element setup, named
            by their unique id (`node_id`, `researcher_id`).
            There must be at least 3 parties, and the first party is this researcher
        experiment_id: ID of the experiment to which this secagg context element is attached.
            None means the element is not attached to a specific experiment
        secagg_id: optional secagg context element ID to use for this element.
            Default is None, which means a unique element ID will be generated.

    Raises:
        FedbiomedSecaggError: bad argument type or value
    """
    self._v = Validator()
    self._v.register("nonempty_str_or_none", self._check_secagg_id_type, override=True)

    try:
        self._v.validate(secagg_id, "nonempty_str_or_none")
    except ValidatorError as e:
        errmess = f'{ErrorNumbers.FB415.value}: bad parameter `secagg_id` must be a None or non-empty string: {e}'
        logger.error(errmess)
        # keep the validator error as explicit cause of the secagg error
        raise FedbiomedSecaggError(errmess) from e

    try:
        self._v.validate(parties, list)
        for p in parties:
            self._v.validate(p, str)
    except ValidatorError as e:
        errmess = f'{ErrorNumbers.FB415.value}: bad parameter `parties` must be a list of strings: {e}'
        logger.error(errmess)
        raise FedbiomedSecaggError(errmess) from e

    # This check also guarantees that `parties[0]` exists for the next test
    if len(parties) < 3:
        errmess = f'{ErrorNumbers.FB415.value}: bad parameter `parties` : {parties} : need ' \
            'at least 3 parties for secure aggregation'
        logger.error(errmess)
        raise FedbiomedSecaggError(errmess)

    if environ['ID'] != parties[0]:
        errmess = f'{ErrorNumbers.FB415.value}: researcher should be the first party.'
        # logged like every other validation failure of this constructor
        logger.error(errmess)
        raise FedbiomedSecaggError(errmess)

    self._secagg_id = secagg_id if secagg_id is not None else 'secagg_' + str(uuid.uuid4())
    self._parties = parties
    self._researcher_id = environ['ID']
    self._requests = Requests()
    self._status = False
    self._context = None
    self._experiment_id = None

    # set experiment ID using setter to validate
    self.set_experiment_id(experiment_id)

    # one controller per secagg object to prevent any file conflict
    self._MPC = MPCController(
        tmp_dir=environ["TMP_DIR"],
        component_type=ComponentType.RESEARCHER,
        component_id=environ["ID"]
    )

    # to be set in subclasses
    self._secagg_manager = None
Attributes
context property
context
experiment_id property
experiment_id
parties property
parties
Getter for secagg parties
Returns:
Type | Description |
---|---|
str | Parties that participate in secure aggregation |
secagg_id property
secagg_id
status property
status
Getter for secagg context element status
Returns:
Type | Description |
---|---|
bool |
|
Functions
delete
delete()
Delete secagg context element on defined parties.
Returns:
Type | Description |
---|---|
bool | True if secagg context element could be deleted for all parties, False if at least one of the parties could not delete context element. |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def delete(self) -> bool:
    """Request deletion of this secagg context element on the defined parties.

    The local status and context are reset before the request is sent.

    Returns:
        True if secagg context element could be deleted for all parties, False if at least
            one of the parties could not delete context element.
    """
    # Forget the local copy of the context before contacting the parties
    self._context = None
    self._status = False

    request = {
        'researcher_id': self._researcher_id,
        'secagg_id': self._secagg_id,
        'element': self._element.value,
        'experiment_id': self._experiment_id,
        'command': 'secagg-delete',
    }
    msg = ResearcherMessages.format_outgoing_message(request)

    return self._secagg_round(msg, False, self._delete_payload)
load_state_breakpoint staticmethod
load_state_breakpoint(state)
Method for loading secagg state from breakpoint state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | Dict[str, Any] | The state that will be loaded | required |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
@staticmethod
def load_state_breakpoint(
        state: Dict[str, Any]
) -> 'SecaggContext':
    """Method for loading secagg state from breakpoint state.

    The class to instantiate is looked up from `state["module"]` and `state["class"]`,
    built with `state["arguments"]`, then each entry of `state["attributes"]` is set
    on the new object. The `state` dictionary itself is not modified.

    Args:
        state: The state that will be loaded

    Returns:
        The secagg context element object created from the state
    """
    # Get class
    # NOTE(review): the module is imported by name from `state`; this assumes the
    # breakpoint content is trusted - confirm against the callers.
    cls = getattr(importlib.import_module(state["module"]), state["class"])

    # Work on a copy so that the caller's breakpoint state stays intact
    arguments = dict(state["arguments"])

    # Some constructors do not take an `experiment_id` argument: drop it for those,
    # tolerating a state where it is already absent
    if 'experiment_id' not in get_method_spec(cls):
        arguments.pop('experiment_id', None)

    secagg = cls(**arguments)

    for key, value in state["attributes"].items():
        setattr(secagg, key, value)

    return secagg
save_state_breakpoint
save_state_breakpoint()
Method for saving secagg state for saving breakpoints
Returns:
Type | Description |
---|---|
Dict[str, Any] | The state of the secagg |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def save_state_breakpoint(self) -> Dict[str, Any]:
    """Build the state of this secagg context element for a breakpoint.

    Returns:
        The state of the secagg
    """
    # `_v` and `_requests` are not saved: the constructor initializes them properly
    constructor_args = {
        "secagg_id": self._secagg_id,
        "parties": self._parties,
        "experiment_id": self._experiment_id,
    }
    saved_attributes = {
        "_status": self._status,
        "_context": self._context,
        "_researcher_id": self._researcher_id,
    }

    return {
        "class": type(self).__name__,
        "module": self.__module__,
        "arguments": constructor_args,
        "attributes": saved_attributes,
    }
set_experiment_id
set_experiment_id(experiment_id)
Setter for secagg context element experiment_id
Parameters:
Name | Type | Description | Default |
---|---|---|---|
experiment_id | Union[str, None] | ID of the experiment to which this secagg context element is attached. | required |
Raises:
Type | Description |
---|---|
FedbiomedSecaggError | bad argument type or value |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def set_experiment_id(self, experiment_id: Union[str, None]) -> None:
    """Validate and store the experiment ID of this secagg context element.

    Args:
        experiment_id: ID of the experiment to which this secagg context element is attached.

    Raises:
        FedbiomedSecaggError: bad argument type or value
    """
    # Accepted values: a string or None
    if experiment_id is None or isinstance(experiment_id, str):
        self._experiment_id = experiment_id
        return

    errmess = f'{ErrorNumbers.FB415.value}: bad parameter `experiment_id` must be a str or None if the ' \
              f'context is set for biprime.'
    logger.error(errmess)
    raise FedbiomedSecaggError(errmess)
setup
setup()
Setup secagg context element on defined parties.
Returns:
Type | Description |
---|---|
bool | True if secagg context element could be setup for all parties, False if at least one of the parties could not setup context element. |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def setup(self) -> bool:
    """Request setup of this secagg context element on the defined parties.

    Returns:
        True if secagg context element could be setup for all parties, False if at least
            one of the parties could not setup context element.
    """
    request = {
        'researcher_id': self._researcher_id,
        'secagg_id': self._secagg_id,
        'element': self._element.value,
        'experiment_id': self._experiment_id,
        'parties': self._parties,
        'command': 'secagg',
    }
    msg = ResearcherMessages.format_outgoing_message(request)

    return self._secagg_round(msg, True, self._payload)
SecaggServkeyContext
SecaggServkeyContext(parties, experiment_id, secagg_id=None)
Bases: SecaggContext
Handles a Secure Aggregation server key context element on the researcher side.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
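parties | List[str] | list of parties participating in the secagg context element setup, named by their unique id (`node_id`, `researcher_id`). There must be at least 3 parties, and the first party is this researcher | required |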
experiment_id | str | ID of the experiment to which this secagg context element is attached. | required |
secagg_id | Union[str, None] | optional secagg context element ID to use for this element. Default is None, which means a unique element ID will be generated. | None |
Raises:
Type | Description |
---|---|
FedbiomedSecaggError | bad argument type or value |
Source code in fedbiomed/researcher/secagg/_secagg_context.py
def __init__(self, parties: List[str], experiment_id: str, secagg_id: Union[str, None] = None):
    """Build a server key secagg context element.

    Args:
        parties: list of parties participating in the secagg context element setup, named
            by their unique id (`node_id`, `researcher_id`).
            There must be at least 3 parties, and the first party is this researcher
        experiment_id: ID of the experiment to which this secagg context element is attached.
        secagg_id: optional secagg context element ID to use for this element.
            Default is None, which means a unique element ID will be generated.

    Raises:
        FedbiomedSecaggError: bad argument type or value
    """
    super().__init__(parties=parties, experiment_id=experiment_id, secagg_id=secagg_id)

    # A falsy experiment ID (e.g. None or empty string) is refused for this element
    if not self._experiment_id:
        errmess = f'{ErrorNumbers.FB415.value}: bad parameter `experiment_id` must be non empty string'
        logger.error(errmess)
        raise FedbiomedSecaggError(errmess)

    # Element type and manager specific to the server key context
    self._element = SecaggElementTypes.SERVER_KEY
    self._secagg_manager = _SKManager
SecureAggregation
SecureAggregation(active=True, clipping_range=None)
Secure aggregation controller of researcher component.
This class is responsible for:
- setting up the context for Joye-Libert secure aggregation
- Applying secure aggregation after receiving encrypted model parameters from nodes
Attributes:
Name | Type | Description |
---|---|---|
clipping_range | Optional[int] | Clipping range that will be used for quantization of model parameters on the node side. |
_biprime | Optional[SecaggBiprimeContext] | Biprime-key context setup instance. |
_parties | Optional[List[str]] | Nodes and researcher that participate in federated training |
_experiment_id | Optional[str] | ID of the current experiment. |
_servkey | Optional[SecaggServkeyContext] | Server-key context setup instance. |
_secagg_crypter | SecaggCrypter | Secure aggregation encrypter and decrypter to decrypt encrypted model parameters. |
_secagg_random | Optional[float] | Random float generated to be sent to the nodes, used to validate secure aggregation after aggregating the encrypted parameters. |
Assigns default values for attributes
Parameters:
Name | Type | Description | Default |
---|---|---|---|
active | bool | True if secure aggregation is activated for the experiment | True |
clipping_range | Union[None, int] | Clipping range that will be used for quantization of model parameters on the node side. The default will be `VEParameters.CLIPPING_RANGE`. The default value will be automatically set on the node side. | None |
Raises:
Type | Description |
---|---|
FedbiomedSecureAggregationError | bad argument type |
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
def __init__(
        self,
        active: bool = True,
        clipping_range: Union[None, int] = None,
) -> None:
    """Class constructor

    Checks the argument types, then assigns default values for attributes.

    Args:
        active: True if secure aggregation is activated for the experiment
        clipping_range: Clipping range that will be used for quantization of model
            parameters on the node side. The default will be
            [`VEParameters.CLIPPING_RANGE`][fedbiomed.common.constants.VEParameters].
            The default value will be automatically set on the node side.

    Raises:
        FedbiomedSecureAggregationError: bad argument type
    """
    if not isinstance(active, bool):
        raise FedbiomedSecureAggregationError(
            f"{ErrorNumbers.FB417.value}: The argument `active` should be bool of type, "
            f"but got {type(active)} "
        )

    # `bool` is a subclass of `int`, so booleans are excluded explicitly
    is_plain_int = isinstance(clipping_range, int) and not isinstance(clipping_range, bool)
    if clipping_range is not None and not is_plain_int:
        raise FedbiomedSecureAggregationError(
            f"{ErrorNumbers.FB417.value}: Clipping range should be None or an integer, "
            f"but got not {type(clipping_range)}"
        )

    # Values given by the caller
    self._active: bool = active
    self.clipping_range: Optional[int] = clipping_range

    # Nothing is configured yet: these start as None
    self._experiment_id: Optional[str] = None
    self._parties: Optional[List[str]] = None
    self._biprime: Optional[SecaggBiprimeContext] = None
    self._servkey: Optional[SecaggServkeyContext] = None
    self._secagg_random: Optional[float] = None

    self._secagg_crypter: SecaggCrypter = SecaggCrypter()
Attributes
active property
active
biprime property
biprime
Gets biprime object
Returns:
Type | Description |
---|---|
Union[None, SecaggBiprimeContext] | Biprime object, None if biprime is not setup |
clipping_range instance-attribute
clipping_range = clipping_range
experiment_id property
experiment_id
parties property
parties
servkey property
servkey
Gets servkey object
Returns:
Type | Description |
---|---|
Union[None, SecaggServkeyContext] | Servkey object, None if servkey is not setup |
Functions
activate
activate(status)
Set activate status of secure aggregation
Returns:
Type | Description |
---|---|
bool | Status of secure aggregation True if it is activated |
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
def activate(self, status) -> bool:
    """Set the activation status of secure aggregation.

    Args:
        status: True to activate secure aggregation, False to deactivate it

    Returns:
        Status of secure aggregation True if it is activated

    Raises:
        FedbiomedSecureAggregationError: `status` is not a bool
    """
    if isinstance(status, bool):
        self._active = status
        return self._active

    raise FedbiomedSecureAggregationError(
        f"{ErrorNumbers.FB417.value}: The argument `status` for activation should be True or False, "
        f"but got {type(status)} "
    )
aggregate
aggregate(round_, total_sample_size, model_params, encryption_factors=None, num_expected_params=1)
Aggregates given model parameters
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_ | int | current training round number | required |
total_sample_size | int | sum of number of samples used by all nodes | required |
model_params | Dict[str, List[int]] | model parameters from the participating nodes | required |
encryption_factors | Union[Dict[str, List[int]], None] | encryption factors from the participating nodes | None |
num_expected_params | int | number of decrypted parameters to decode from the model parameters | 1 |
Returns:
Type | Description |
---|---|
List[float] | Aggregated parameters |
Raises:
Type | Description |
---|---|
FedbiomedSecureAggregationError | secure aggregation context not properly configured |
FedbiomedSecureAggregationError | secure aggregation computation error |
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
def aggregate(
        self,
        round_: int,
        total_sample_size: int,
        model_params: Dict[str, List[int]],
        encryption_factors: Union[Dict[str, List[int]], None] = None,
        num_expected_params: int = 1
) -> List[float]:
    """Aggregates given model parameters

    When a `secagg_random` value has been set, the encryption factors are aggregated
    first and the result is compared to that value before the model parameters are
    aggregated.

    Args:
        round_: current training round number
        total_sample_size: sum of number of samples used by all nodes
        model_params: model parameters from the participating nodes
        encryption_factors: encryption factors from the participating nodes
        num_expected_params: number of decrypted parameters to decode from the model parameters

    Returns:
        Aggregated parameters

    Raises:
        FedbiomedSecureAggregationError: secure aggregation context not properly configured
        FedbiomedSecureAggregationError: secure aggregation computation error
    """
    if self._biprime is None or self._servkey is None:
        raise FedbiomedSecureAggregationError(
            f"{ErrorNumbers.FB417.value}: Can not aggregate parameters, one of Biprime or Servkey context is "
            f"not configured. Please setup secure aggregation before the aggregation.")

    if not self._biprime.status or not self._servkey.status:
        raise FedbiomedSecureAggregationError(
            f"{ErrorNumbers.FB417.value}: Can not aggregate parameters, one of Biprime or Servkey context is "
            f"not set properly")

    biprime = self._biprime.context["context"]["biprime"]
    key = self._servkey.context["context"]["server_key"]

    # One entry of `model_params` per participating node
    num_nodes = len(model_params)

    # Same settings are used for the validation and for the actual aggregation
    aggregate = functools.partial(self._secagg_crypter.aggregate,
                                  current_round=round_,
                                  num_nodes=num_nodes,
                                  key=key,
                                  total_sample_size=total_sample_size,
                                  biprime=biprime,
                                  clipping_range=self.clipping_range)

    # Validate secure aggregation
    if self._secagg_random is not None:

        if encryption_factors is None:
            raise FedbiomedSecureAggregationError(
                f"{ErrorNumbers.FB417.value}: Secure aggregation random validation has been set but the encryption "
                f"factors are not provided. Please provide encrypted `secagg_random` values in different parties. "
                f"Or to not set/get `secagg_random()` before the aggregation.")

        logger.info("Validating secure aggregation results...")
        factors = list(encryption_factors.values())
        validation: List[float] = aggregate(params=factors, num_expected_params=1)

        # Exactly one value is expected, equal to `secagg_random` within an absolute tolerance of 0.03
        if len(validation) != 1 or not math.isclose(validation[0], self._secagg_random, abs_tol=0.03):
            raise FedbiomedSecureAggregationError(
                f"{ErrorNumbers.FB417.value}: Aggregation is failed due to incorrect decryption."
            )
        logger.info("Validation is completed.")

    elif encryption_factors is not None:
        logger.warning("Encryption factors are provided while secagg random is None. Please make sure secure "
                       "aggregation steps are applied correctly.")

    logger.info("Aggregating encrypted parameters. This process may take some time depending on model size.")

    # Aggregate parameters
    params = list(model_params.values())
    aggregated_params = aggregate(params=params, num_expected_params=num_expected_params)

    return aggregated_params
load_state_breakpoint classmethod
load_state_breakpoint(state)
Create a `SecureAggregation` object from a saved state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | Dict | saved state to restore in the created object | required |
Returns:
Type | Description |
---|---|
SecureAggregation | The created `SecureAggregation` object |
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
@classmethod
def load_state_breakpoint(
        cls,
        state: Dict
) -> 'SecureAggregation':
    """Create a `SecureAggregation` object from a saved state

    The object is built with `state["arguments"]`, then every entry of
    `state["attributes"]` is set on it. Saved biprime and server key states are
    first turned back into context objects. The `state` dictionary is not modified.

    Args:
        state: saved state to restore in the created object

    Returns:
        The created `SecureAggregation` object
    """
    secagg = cls(**state["arguments"])

    # Shallow copy: restored context objects must not replace the saved
    # dictionaries inside the caller's state
    attributes = dict(state["attributes"])

    if attributes.get("_biprime") is not None:
        attributes["_biprime"] = SecaggBiprimeContext. \
            load_state_breakpoint(state=attributes["_biprime"])

    if attributes.get("_servkey") is not None:
        attributes["_servkey"] = SecaggServkeyContext. \
            load_state_breakpoint(state=attributes["_servkey"])

    # Set attributes
    for name, val in attributes.items():
        setattr(secagg, name, val)

    return secagg
save_state_breakpoint
save_state_breakpoint()
Saves state of the secagg
Returns:
Type | Description |
---|---|
Dict[str, Any] | The secagg state to be saved |
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
def save_state_breakpoint(self) -> Dict[str, Any]:
    """Build the state of this secagg object for a breakpoint.

    Returns:
        The secagg state to be saved
    """
    # Context elements save their own state; a missing element is saved as None
    biprime_state = None if self._biprime is None else self._biprime.save_state_breakpoint()
    servkey_state = None if self._servkey is None else self._servkey.save_state_breakpoint()

    constructor_args = {
        'active': self._active,
        'clipping_range': self.clipping_range,
    }
    saved_attributes = {
        "_biprime": biprime_state,
        "_servkey": servkey_state,
        "_experiment_id": self._experiment_id,
        "_parties": self._parties,
    }

    return {
        "class": type(self).__name__,
        "module": self.__module__,
        "arguments": constructor_args,
        "attributes": saved_attributes,
    }
setup
setup(parties, experiment_id, force=False)
Setup secure aggregation instruments.
Requires setting `parties` and `experiment_id` if they are not set in previous secagg setups. It is possible to execute without any argument if SecureAggregation already has `parties` and `experiment_id` defined. This feature allows the researcher to execute `secagg.setup()` again if there is any connection issue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parties | List[str] | Parties that participate in secure aggregation | required |
experiment_id | str | The id of the experiment | required |
force | bool | Forces secagg setup even if the context already exists | False |
Returns:
Type | Description |
---|---|
Status of setup |
Raises FedbiomedSecureAggregationError: Invalid argument type
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
def setup(self,
parties: List[str],
experiment_id: str,
force: bool = False):
"""Setup secure aggregation instruments.
Requires setting `parties` and `experiment_id` if they are not set in previous secagg
setups. It is possible to execute without any argument if SecureAggregation
has already `parties` and `experiment_id` defined. This feature provides researcher
execute `secagg.setup()` if any connection issue
Args:
parties: Parties that participates secure aggregation
experiment_id: The id of the experiment
force: Forces secagg setup even context is already existing
Returns:
Status of setup
Raises
FedbiomedSecureAggregationError: Invalid argument type
"""
if not isinstance(parties, list):
raise FedbiomedSecureAggregationError(
f"{ErrorNumbers.FB417.value}: Expected argument `parties` list but got {type(parties)}"
)
if not isinstance(experiment_id, str):
raise FedbiomedSecureAggregationError(
f"{ErrorNumbers.FB417.value}: Expected argument `experiment_id` string but got {type(parties)}"
)
self._configure_round(parties, experiment_id)
if self._biprime is None or self._servkey is None:
raise FedbiomedSecureAggregationError(
f"{ErrorNumbers.FB417.value}: server key or biprime contexts is not fully configured."
)
if not self._biprime.status or force:
self._biprime.setup()
if not self._servkey.status or force:
self._servkey.setup()
return True
train_arguments
train_arguments()
Gets train arguments for secagg train request
Returns:
Type | Description |
---|---|
Dict | Arguments that are going to be attached to the experiment. |
Source code in fedbiomed/researcher/secagg/_secure_aggregation.py
def train_arguments(self) -> Dict:
    """Collect the secagg arguments for a train request.

    Returns:
        Arguments that are going to be attached to the experiment.
    """
    # IDs are None while the matching context element does not exist
    servkey_id = None if self._servkey is None else self._servkey.secagg_id
    biprime_id = None if self._biprime is None else self._biprime.secagg_id

    return {
        'secagg_servkey_id': servkey_id,
        'secagg_biprime_id': biprime_id,
        'secagg_random': self._secagg_random,
        'secagg_clipping_range': self.clipping_range,
    }