Classes
Experiment
Experiment(*args, aggregator=None, agg_optimizer=None, node_selection_strategy=None, round_limit=None, tensorboard=False, retain_full_history=True, **kwargs)
Bases: TrainingPlanWorkflow
A Federated Learning Experiment based on a Training Plan.
This class provides a comprehensive entry point for the management and orchestration of a FL experiment, including definition, execution, and interpretation of results.
Managing model parameters
The model parameters should be managed through the training plan: access it via the experiment's training_plan() attribute and use its set_model_params and get_model_params methods, e.g.
exp.training_plan().set_model_params(params_dict)
Do not set the training plan attribute directly
Setting the training_plan attribute directly is not allowed. Instead, use the set_training_plan_class method to set the training plan type; the underlying model will then be correctly constructed and initialized.
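For illustration, a minimal sketch of this workflow, assuming a user-defined training plan class `MyTrainingPlan` and an already constructed `exp` instance:

```python
# Hypothetical names: MyTrainingPlan is a user-defined training plan class,
# exp is an already constructed Experiment instance.
exp.set_training_plan_class(MyTrainingPlan)      # builds and initializes the underlying model

params = exp.training_plan().get_model_params()  # read the current model parameters
exp.training_plan().set_model_params(params)     # write back (possibly modified) parameters
```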
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aggregator | Optional[Aggregator] | object defining the method for aggregating local updates. Defaults to None (use FedAverage for aggregation). | None |
agg_optimizer | Optional[Optimizer] | Optimizer instance, used to refine aggregated model updates prior to their application. If None, merely apply the aggregated updates. | None |
node_selection_strategy | Optional[Strategy] | object defining how nodes are sampled at each round for training, and how non-responding nodes are managed. Defaults to None: use DefaultStrategy if training_data is initialized, else strategy is None (cannot be initialized) and the experiment cannot be launched yet. | None |
round_limit | Union[int, None] | the maximum number of training rounds (nodes <-> central server) that should be executed for the experiment. None means that no limit is defined. Defaults to None. | None |
tensorboard | bool | whether to save scalar values for displaying in Tensorboard during training for each node. Currently, it is only used for loss values. If True, a Monitor object is instantiated that writes scalar logs into the ./runs directory. If False, monitoring is stopped if it was active. | False |
retain_full_history | bool | whether to retain in memory the full history of node replies and aggregated params for the experiment. If False, only the last round's replies and aggregated params will be available. Defaults to True. | True |
*args | | Extra positional arguments from parent class TrainingPlanWorkflow | () |
**kwargs | | Arguments of parent class TrainingPlanWorkflow | {} |
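A minimal construction sketch, using the import paths referenced on this page; arguments related to the training plan and data (inherited from TrainingPlanWorkflow) are omitted, and the tag value is purely illustrative:

```python
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage

exp = Experiment(
    tags=['#MNIST', '#dataset'],   # illustrative tag list (parent-class argument)
    aggregator=FedAverage(),       # default behaviour when None is passed
    round_limit=3,                 # run at most 3 training rounds
    tensorboard=True,              # write scalar logs to ./runs
    retain_full_history=True,      # keep replies/params of every round in memory
)
```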
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def __init__(
self,
*args,
aggregator: Optional[Aggregator] = None,
agg_optimizer: Optional[Optimizer] = None,
node_selection_strategy: Optional[Strategy] = None,
round_limit: Union[int, None] = None,
tensorboard: bool = False,
retain_full_history: bool = True,
**kwargs
) -> None:
"""Constructor of the class.
Args:
aggregator: object defining the method for aggregating
local updates. Default to None (use
[`FedAverage`][fedbiomed.researcher.aggregators.FedAverage] for aggregation)
agg_optimizer: [`Optimizer`][fedbiomed.common.optimizers.Optimizer] instance,
to refine aggregated model updates prior to their application. If None,
merely apply the aggregated updates.
node_selection_strategy: object defining how nodes are sampled at
each round for training, and how non-responding nodes are managed.
Defaults to None:
- use [`DefaultStrategy`][fedbiomed.researcher.strategies.DefaultStrategy]
if training_data is initialized
- else strategy is None (cannot be initialized), experiment cannot be launched yet
round_limit: the maximum number of training rounds (nodes <-> central server)
that should be executed for the experiment. `None` means that no limit is
defined. Defaults to None.
tensorboard: whether to save scalar values for displaying in Tensorboard
during training for each node. Currently, it is only used for loss values.
- If it is true, monitor instantiates a `Monitor` object
that write scalar logs into `./runs` directory.
- If it is False, it stops monitoring if it was active.
retain_full_history: whether to retain in memory the full history
of node replies and aggregated params for the experiment. If False, only the
last round's replies and aggregated params will be available. Defaults to True.
*args: Extra positional arguments from parent class
[`TrainingPlanWorkflow`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow]
**kwargs: Arguments of parent class
[`TrainingPlanWorkflow`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow]
"""
# define new members
self._node_selection_strategy: Strategy = None
self._round_limit = None
self._monitor = None
self._aggregator = None
self._agg_optimizer = None
self.aggregator_args = {}
self._aggregated_params = {}
self._training_replies: Dict = {}
self._retain_full_history = None
# initialize object
super().__init__(*args, **kwargs)
# set self._aggregator : type Aggregator
self.set_aggregator(aggregator)
# set self._agg_optimizer: type Optional[Optimizer]
self.set_agg_optimizer(agg_optimizer)
# set self._node_selection_strategy: type Union[Strategy, None]
self.set_strategy(node_selection_strategy)
# "current" means number of rounds already trained
self._set_round_current(0)
self.set_round_limit(round_limit)
# always create a monitoring process
self._monitor = Monitor()
self._reqs.add_monitor_callback(self._monitor.on_message_handler)
self.set_tensorboard(tensorboard)
# whether to retain the full experiment history or not
self.set_retain_full_history(retain_full_history)
Attributes
aggregator_args instance-attribute
aggregator_args = {}
Functions
agg_optimizer
agg_optimizer()
Retrieves the optional Optimizer used to refine aggregated model updates.
To set or update that optimizer, see set_agg_optimizer.
Returns:
Type | Description |
---|---|
Optional[Optimizer] | An Optimizer instance, or None. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def agg_optimizer(self) -> Optional[Optimizer]:
"""Retrieves the optional Optimizer used to refine aggregated model updates.
To set or update that optimizer:
[`set_agg_optimizer`][fedbiomed.researcher.federated_workflows.Experiment.set_agg_optimizer].
Returns:
An [Optimizer][fedbiomed.common.optimizers.Optimizer] instance,
or None.
"""
return self._agg_optimizer
aggregated_params
aggregated_params()
Retrieves all aggregated parameters of each round of training
Returns:
Type | Description |
---|---|
dict | Dictionary of aggregated parameters, keyed by training round |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def aggregated_params(self) -> dict:
"""Retrieves all aggregated parameters of each round of training
Returns:
Dictionary of aggregated parameters keys stand for each round of training
"""
return self._aggregated_params
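For illustration, a sketch of how the returned dictionary (keyed by round number, with a 'params' entry per round) might be inspected after training:

```python
history = exp.aggregated_params()      # e.g. {0: {'params': ...}, 1: {'params': ...}}
if history:
    last_round = max(history)          # highest round number recorded
    last_params = history[last_round]['params']
```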
aggregator
aggregator()
Retrieves the aggregator that will be used for aggregating model parameters.
To set or update the aggregator, see set_aggregator.
Returns:
Type | Description |
---|---|
Aggregator | A class or an object that is an instance of Aggregator |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def aggregator(self) -> Aggregator:
"""Retrieves aggregator class that will be used for aggregating model parameters.
To set or update aggregator:
[`set_aggregator`][fedbiomed.researcher.federated_workflows.Experiment.set_aggregator].
Returns:
A class or an object that is an instance of [Aggregator][fedbiomed.researcher.aggregators.Aggregator]
"""
return self._aggregator
breakpoint
breakpoint()
Saves a breakpoint with the state of the training at the current round.
The following Experiment attributes will be saved:
- round_current
- round_limit
- aggregator
- agg_optimizer
- node_selection_strategy
- aggregated_params
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def breakpoint(self) -> None:
"""
Saves breakpoint with the state of the training at a current round.
The following Experiment attributes will be saved:
- round_current
- round_limit
- aggregator
- agg_optimizer
- node_selection_strategy
- aggregated_params
"""
# need to have run at least 1 round to save a breakpoint
if self._round_current < 1:
msg = ErrorNumbers.FB413.value + \
' - need to run at least 1 before saving a breakpoint'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# conditions are met, save breakpoint
breakpoint_path, breakpoint_file_name = \
choose_bkpt_file(self._experimentation_folder, self._round_current - 1)
# predefine several breakpoint states
agg_bkpt = None
agg_optim_bkpt = None
strategy_bkpt = None
training_replies_bkpt = None
if self._aggregator is not None:
agg_bkpt = self._aggregator.save_state_breakpoint(breakpoint_path,
global_model=self.training_plan().after_training_params())
if self._agg_optimizer is not None:
# FIXME: harmonize naming of save_object
agg_optim_bkpt = self.save_optimizer(breakpoint_path)
if self._node_selection_strategy is not None:
strategy_bkpt = self._node_selection_strategy.save_state_breakpoint()
if self._training_replies is not None:
training_replies_bkpt = self.save_training_replies()
state = {
'round_current': self._round_current,
'round_limit': self._round_limit,
'aggregator': agg_bkpt,
'agg_optimizer': agg_optim_bkpt,
'node_selection_strategy': strategy_bkpt,
'aggregated_params': self.save_aggregated_params(
self._aggregated_params, breakpoint_path),
'training_replies': training_replies_bkpt,
}
super().breakpoint(state, self._round_current)
commit_experiment_history
commit_experiment_history(training_replies, aggregated_params)
Commits the experiment history to memory.
The experiment history is defined as
- training replies
- aggregated parameters
This function checks the retain_full_history flag: if it is True, it simply adds (or overwrites) the current round's entry for the training_replies and aggregated_params dictionary. If the flag is set to False, we simply store the last round's values in the same dictionary format.
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
def commit_experiment_history(self,
training_replies: Dict[str, Dict[str, Any]],
aggregated_params: Dict[str, Any]) -> None:
"""Commits the experiment history to memory.
The experiment history is defined as:
- training replies
- aggregated parameters
This function checks the retain_full_history flag: if it is True, it simply adds
(or overwrites) the current round's entry for the training_replies and aggregated_params
dictionary. If the flag is set to False, we simply store the last round's values in the
same dictionary format.
"""
if self._retain_full_history:
# append to history
self._training_replies[self._round_current] = training_replies
self._aggregated_params[self._round_current] = {'params': aggregated_params}
else:
# only store the last round's values
self._training_replies = {self._round_current: training_replies}
self._aggregated_params = {self._round_current: {'params': aggregated_params}}
info
info()
Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Inconsistent experiment due to missing variables |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def info(self) -> Tuple[Dict[str, List[str]], str]:
"""Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Raises:
FedbiomedExperimentError: Inconsistent experiment due to missing variables
"""
# at this point all attributes are initialized (in constructor)
info = self._create_default_info_structure()
info['Arguments'].extend([
'Aggregator',
'Strategy',
'Aggregator Optimizer',
'Rounds already run',
'Rounds total',
'Breakpoint State',
])
info['Values'].extend(['\n'.join(findall('.{1,60}',
str(e))) for e in [
self._aggregator.aggregator_name if self._aggregator is not None else None,
self._node_selection_strategy,
self._agg_optimizer,
self._round_current,
self._round_limit,
self._save_breakpoints,
]])
missing = self._check_missing_objects()
return super().info(info, missing)
load_breakpoint classmethod
load_breakpoint(breakpoint_folder_path=None)
Loads a breakpoint (provided one has been saved) so the experiment can be resumed. Useful if training has crashed on the researcher side or if the user wants to resume a given experiment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Type[TExperiment] | Experiment class | required |
breakpoint_folder_path | Union[str, None] | path of the breakpoint folder. Path can be absolute or relative eg: "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads latest breakpoint of the latest experiment. Defaults to None. | None |
Returns:
Type | Description |
---|---|
TExperiment | Reinitialized experiment object. With this object, the user can then use the .run() method to pursue model training. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type, error when reading breakpoint or bad loaded breakpoint content (corrupted) |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@classmethod
@exp_exceptions
def load_breakpoint(cls: Type[TExperiment],
breakpoint_folder_path: Union[str, None] = None) -> TExperiment:
"""
Loads breakpoint (provided a breakpoint has been saved)
so experience can be resumed. Useful if training has crashed
researcher side or if user wants to resume a given experiment.
Args:
cls: Experiment class
breakpoint_folder_path: path of the breakpoint folder. Path can be absolute or relative eg:
"var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads latest breakpoint of the latest
experiment. Defaults to None.
Returns:
Reinitialized experiment object. With given object, user can then use `.run()`
method to pursue model training.
Raises:
FedbiomedExperimentError: bad argument type, error when reading breakpoint or bad loaded breakpoint
content (corrupted)
"""
loaded_exp, saved_state = super().load_breakpoint(breakpoint_folder_path)
# retrieve breakpoint sampling strategy
bkpt_sampling_strategy_args = saved_state.get("node_selection_strategy")
bkpt_sampling_strategy = cls._create_object(bkpt_sampling_strategy_args)
loaded_exp.set_strategy(bkpt_sampling_strategy)
# retrieve breakpoint researcher optimizer
bkpt_optim = Experiment._load_optimizer(saved_state.get("agg_optimizer"))
loaded_exp.set_agg_optimizer(bkpt_optim)
# changing `Experiment` attributes
loaded_exp._set_round_current(saved_state.get('round_current'))
loaded_exp._aggregated_params = loaded_exp._load_aggregated_params(
saved_state.get('aggregated_params')
)
# retrieve and change aggregator
bkpt_aggregator_args = saved_state.get("aggregator")
bkpt_aggregator = cls._create_object(bkpt_aggregator_args, training_plan=loaded_exp.training_plan())
loaded_exp.set_aggregator(bkpt_aggregator)
# load training replies
loaded_exp.load_training_replies(saved_state.get("training_replies"))
logger.info(f"Experimentation reload from {breakpoint_folder_path} successful!")
return loaded_exp
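A usage sketch, assuming breakpoints were saved (e.g. the experiment was created with save_breakpoints=True):

```python
from fedbiomed.researcher.federated_workflows import Experiment

# With no argument, the latest breakpoint of the latest experiment is loaded
loaded_exp = Experiment.load_breakpoint()
loaded_exp.run(rounds=2, increase=True)   # resume training for two more rounds
```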
load_training_replies
load_training_replies(bkpt_training_replies)
Reads training replies from a formatted breakpoint file.
Builds the training replies data structure for the experiment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bkpt_training_replies | Dict[int, Dict[str, Dict[str, Any]]] | Extract from training replies saved in breakpoint | required |
Returns:
Type | Description |
---|---|
None | This method returns None: it loads the training replies of already executed rounds into the experiment. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
def load_training_replies(
self,
bkpt_training_replies: Dict[int, Dict[str, Dict[str, Any]]]
) -> None:
"""Reads training replies from a formatted breakpoint file.
Builds a job training replies data structure .
Args:
bkpt_training_replies: Extract from training replies saved in breakpoint
Returns:
Training replies of already executed rounds of the experiment
"""
if not bkpt_training_replies:
logger.warning("No Replies has been found in this breakpoint")
rounds = set(bkpt_training_replies.keys())
for round_ in rounds:
# reload parameters from file params_path
for node in bkpt_training_replies[round_].values():
node["params"] = Serializer.load(node["params_path"])
bkpt_training_replies[int(round_)] = bkpt_training_replies.pop(round_)
self._training_replies = bkpt_training_replies
monitor
monitor()
Retrieves the monitor object
Monitor is responsible for receiving and parsing real-time training and validation feedback from each node participating in federated training. See Monitor
Returns:
Type | Description |
---|---|
Monitor | Monitor object, which always exists within the experiment, used to retrieve feedback from the nodes. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def monitor(self) -> Monitor:
"""Retrieves the monitor object
Monitor is responsible for receiving and parsing real-time training and validation feed-back from each node
participate to federated training. See [`Monitor`][fedbiomed.researcher.monitor.Monitor]
Returns:
Monitor object that will always exist with experiment to retrieve feed-back from the nodes.
"""
return self._monitor
retain_full_history
retain_full_history()
Retrieves the status of whether the full experiment history should be kept in memory.
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def retain_full_history(self):
"""Retrieves the status of whether the full experiment history should be kept in memory."""
return self._retain_full_history
round_current
round_current()
Retrieves the round that the experiment is currently at.
Returns:
Type | Description |
---|---|
int | Indicates the round number that the experiment will perform next. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def round_current(self) -> int:
"""Retrieves the round where the experiment is at.
Returns:
Indicates the round number that the experiment will perform next.
"""
return self._round_current
round_limit
round_limit()
Retrieves the round limit from the experiment object.
Please see also set_round_limit to change or set the round limit.
Returns:
Type | Description |
---|---|
Union[int, None] | Round limit that shows the maximum number of rounds that can be performed. None if it isn't declared yet. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def round_limit(self) -> Union[int, None]:
"""Retrieves the round limit from the experiment object.
Please see also [`set_round_limit`][fedbiomed.researcher.federated_workflows.Experiment.set_round_limit]
to change or set round limit.
Returns:
Round limit that shows maximum number of rounds that can be performed. `None` if it isn't declared yet.
"""
return self._round_limit
run
run(rounds=None, increase=False)
Run one or more rounds of an experiment, continuing from the point the experiment had reached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rounds | Optional[int] | Number of experiment rounds to run in this call. None means "run all the rounds remaining in the experiment", computed as round_limit minus round_current; it does nothing and issues a warning if round_limit is None. An int >= 1 means "run at most rounds rounds": if round_limit is None, run exactly rounds rounds; if running rounds rounds would exceed round_limit, either raise the limit and run rounds rounds (when increase is True), or run only round_limit - round_current rounds and issue a warning (when increase is False). | None |
increase | bool | automatically increase the round_limit of the experiment to execute the specified number of rounds. Does nothing if round_limit is None or rounds is None. Defaults to False. | False |
Returns:
Type | Description |
---|---|
int | Number of rounds that have been run |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type or value |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def run(self, rounds: Optional[int] = None, increase: bool = False) -> int:
"""Run one or more rounds of an experiment, continuing from the point the
experiment had reached.
Args:
rounds: Number of experiment rounds to run in this call.
* `None` means "run all the rounds remaining in the experiment" computed as
maximum rounds (`round_limit` for this experiment) minus the number of
rounds already run rounds (`round_current` for this experiment).
It does nothing and issues a warning if `round_limit` is `None` (no
round limit defined for the experiment)
* `int` >= 1 means "run at most `rounds` rounds".
If `round_limit` is `None` for the experiment, run exactly `rounds` rounds.
If a `round_limit` is set for the experiment and the number or rounds would
increase beyond the `round_limit` of the experiment:
- if `increase` is True, increase the `round_limit` to
(`round_current` + `rounds`) and run `rounds` rounds
- if `increase` is False, run (`round_limit` - `round_current`)
rounds, don't modify the maximum `round_limit` of the experiment
and issue a warning.
increase: automatically increase the `round_limit`
of the experiment for executing the specified number of `rounds`.
Does nothing if `round_limit` is `None` or `rounds` is None.
Defaults to False
Returns:
Number of rounds have been run
Raises:
FedbiomedExperimentError: bad argument type or value
"""
# check rounds is a >=1 integer or None
if rounds is None:
pass
else:
msg = ErrorNumbers.FB410.value + \
f', in method `run` param `rounds` : value {rounds}'
self._check_round_value_consistency(rounds, msg)
# check increase is a boolean
if not isinstance(increase, bool):
msg = ErrorNumbers.FB410.value + \
f', in method `run` param `increase` : type {type(increase)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# compute number of rounds to run + updated rounds limit
if rounds is None:
if isinstance(self._round_limit, int):
# run all remaining rounds in the experiment
rounds = self._round_limit - self._round_current
if rounds == 0:
# limit already reached
logger.warning(f'Round limit of {self._round_limit} already reached '
'for this experiment, do nothing.')
return 0
else:
# cannot run if no number of rounds given and no round limit exists
logger.warning('Cannot run, please specify a number of `rounds` to run or '
'set a `round_limit` to the experiment')
return 0
else:
# at this point, rounds is an int >= 1
if isinstance(self._round_limit, int):
if (self._round_current + rounds) > self._round_limit:
if increase:
# dont change rounds, but extend self._round_limit as necessary
logger.debug(f'Auto increasing total rounds for experiment from {self._round_limit} '
f'to {self._round_current + rounds}')
self._round_limit = self._round_current + rounds
else:
new_rounds = self._round_limit - self._round_current
if new_rounds == 0:
# limit already reached
logger.warning(f'Round limit of {self._round_limit} already reached '
'for this experiment, do nothing.')
return 0
else:
# reduce the number of rounds to run in the experiment
logger.warning(f'Limit of {self._round_limit} rounds for the experiment '
f'will be reached, reducing the number of rounds for this '
f'run from {rounds} to {new_rounds}')
rounds = new_rounds
# FIXME: should we print warning if both rounds and _round_limit are None?
# At this point `rounds` is an int > 0 (not None)
# run the rounds
for _ in range(rounds):
if isinstance(self._round_limit, int) and self._round_current == (self._round_limit - 1) \
and self._training_args['test_on_global_updates'] is True:
# Do "validation after a round" only if this a round limit is defined and we reached it
# and validation is active on global params
# When this condition is met, it also means we are running the last of
# the `rounds` rounds in this function
test_after = True
else:
test_after = False
increment = self.run_once(increase=False, test_after=test_after)
if increment == 0:
# should not happen
msg = ErrorNumbers.FB400.value + \
f', in method `run` method `run_once` returns {increment}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return rounds
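Three alternative ways this method might be called, each assuming an experiment created with round_limit=5 that has not run any round yet:

```python
exp.run()                          # run all rounds remaining up to round_limit
exp.run(rounds=2)                  # run at most 2 rounds (capped by round_limit)
exp.run(rounds=10, increase=True)  # raise round_limit as needed and run 10 rounds
```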
run_once
run_once(increase=False, test_after=False)
Run at most one round of an experiment, continuing from the point the experiment had reached.
If round_limit is None for the experiment (no round limit defined), run one round. If round_limit is not None and the round_limit of the experiment is already reached:
- if increase is False, do nothing and issue a warning
- if increase is True, increment the total number of rounds (round_limit) and run one round
Parameters:
Name | Type | Description | Default |
---|---|---|---|
increase | bool | automatically increase the round_limit of the experiment if needed. Does nothing if round_limit is None. Defaults to False. | False |
test_after | bool | if True, do a second request to the nodes after the round, only for validation on aggregated params. Intended to be used after the last training round of an experiment. Defaults to False. | False |
Returns:
Type | Description |
---|---|
int | Number of rounds actually run |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type or value |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def run_once(self, increase: bool = False, test_after: bool = False) -> int:
"""Run at most one round of an experiment, continuing from the point the
experiment had reached.
If `round_limit` is `None` for the experiment (no round limit defined), run one round.
If `round_limit` is not `None` and the `round_limit` of the experiment is already reached:
* if `increase` is False, do nothing and issue a warning
* if `increase` is True, increment total number of round `round_limit` and run one round
Args:
increase: automatically increase the `round_limit` of the experiment if needed. Does nothing if
`round_limit` is `None`. Defaults to False
test_after: if True, do a second request to the nodes after the round, only for validation on aggregated
params. Intended to be used after the last training round of an experiment. Defaults to False.
Returns:
Number of rounds really run
Raises:
FedbiomedExperimentError: bad argument type or value
"""
# check increase is a boolean
if not isinstance(increase, bool):
msg = ErrorNumbers.FB410.value + \
f', in method `run_once` param `increase` : type {type(increase)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# nota: we should never have self._round_current > self._round_limit, only ==
if self._round_limit is not None and self._round_current >= self._round_limit:
if increase is True:
logger.debug(f'Auto increasing total rounds for experiment from {self._round_limit} '
f'to {self._round_current + 1}')
self._round_limit = self._round_current + 1
else:
logger.warning(f'Round limit of {self._round_limit} was reached, do nothing')
return 0
# check pre-requisites are met for running a round
# From here, node_selection_strategy is never None
# if self._node_selection_strategy is None:
# msg = ErrorNumbers.FB411.value + ', missing `node_selection_strategy`'
# logger.critical(msg)
# raise FedbiomedExperimentError(msg)
missing = self._check_missing_objects()
if missing:
raise FedbiomedExperimentError(ErrorNumbers.FB411.value + ': missing one or several object needed for'
' starting the `Experiment`. Details:\n' + missing)
# Sample nodes for training
training_nodes = self._node_selection_strategy.sample_nodes(
from_nodes=self.filtered_federation_nodes(),
round_i=self._round_current
)
# Setup Secure Aggregation (it's a noop if not active)
secagg_arguments = self.secagg_setup(training_nodes)
# Setup aggregator
self._aggregator.set_training_plan_type(self.training_plan().type())
self._aggregator.check_values(n_updates=self._training_args.get('num_updates'),
training_plan=self.training_plan())
model_params_before_round = self.training_plan().after_training_params()
aggregator_args = self._aggregator.create_aggregator_args(model_params_before_round,
training_nodes)
# Collect auxiliary variables from the aggregator optimizer, if any.
optim_aux_var = self._collect_optim_aux_var()
# update node states when list of nodes has changed from one round to another
self._update_nodes_states_agent(before_training=True)
# TODO check node state agent
nodes_state_ids = self._node_state_agent.get_last_node_states()
# if fds is updated, aggregator should be updated too
job = TrainingJob(nodes=training_nodes,
keep_files_dir=self.experimentation_path(),
experiment_id=self._experiment_id,
round_=self._round_current,
training_plan=self.training_plan(),
training_args=self._training_args,
model_args=self.model_args(),
data=self._fds,
nodes_state_ids=nodes_state_ids,
aggregator_args=aggregator_args,
do_training=True,
secagg_arguments=secagg_arguments,
optim_aux_var=optim_aux_var
)
logger.info('Sampled nodes in round ' + str(self._round_current) + ' ' + str(job.nodes))
training_replies, aux_vars = job.execute()
# update node states with node answers + when used node list has changed during the round
self._update_nodes_states_agent(before_training=False, training_replies=training_replies)
# refining/normalizing model weights received from nodes
model_params, weights, total_sample_size, encryption_factors = self._node_selection_strategy.refine(
training_replies, self._round_current)
if self._secagg.active:
flatten_params = self._secagg.aggregate(
round_=self._round_current,
encryption_factors=encryption_factors,
total_sample_size=total_sample_size,
model_params=model_params,
num_expected_params=len(self.training_plan().get_model_wrapper_class().flatten(
exclude_buffers = not self.training_args()['share_persistent_buffers']))
)
# FIXME: Access TorchModel through non-private getter once it is implemented
aggregated_params: Dict[str, Union[torch.tensor, np.ndarray]] = (
self.training_plan().get_model_wrapper_class().unflatten(
flatten_params, exclude_buffers = not self.training_args()['share_persistent_buffers'])
)
else:
# aggregate models from nodes to a global model
aggregated_params = self._aggregator.aggregate(model_params,
weights,
global_model=model_params_before_round,
training_plan=self.training_plan(),
training_replies=training_replies,
node_ids=job.nodes,
n_updates=self._training_args.get('num_updates'),
n_round=self._round_current)
# Optionally refine the aggregated updates using an Optimizer.
self._process_optim_aux_var(aux_vars)
aggregated_params = self._run_agg_optimizer(self.training_plan(),
aggregated_params)
# Update the training plan with the aggregated parameters
self.training_plan().set_model_params(aggregated_params)
# Update experiment's in-memory history
self.commit_experiment_history(training_replies, aggregated_params)
# Increase round number (should be incremented before call to `breakpoint`)
self._set_round_current(self._round_current + 1)
if self._save_breakpoints:
self.breakpoint()
# do final validation after saving breakpoint :
# not saved in breakpoint for current round, but more simple
if test_after:
# FIXME: should we sample nodes here too?
aggr_args = self._aggregator.create_aggregator_args(self.training_plan().after_training_params(),
training_nodes)
job = TrainingJob(nodes=training_nodes,
keep_files_dir=self.experimentation_path(),
experiment_id=self._experiment_id,
round_=self._round_current,
training_plan=self.training_plan(),
training_args=self._training_args,
model_args=self.model_args(),
data=self._fds,
nodes_state_ids=nodes_state_ids,
aggregator_args=aggr_args,
do_training=False
)
job.execute()
return 1
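A short sketch: force one extra round even after the round limit has been reached, requesting validation on the aggregated parameters afterwards:

```python
exp.run_once(increase=True, test_after=True)
```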
save_aggregated_params staticmethod
save_aggregated_params(aggregated_params_init, breakpoint_path)
Extract and format fields from aggregated_params that need to be saved in breakpoint.
Creates links to the params files from the breakpoint_path and uses them to reference the params files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aggregated_params_init | dict | aggregated parameters | required |
breakpoint_path | str | path to the directory where breakpoints files and links will be saved | required |
Returns:
Type | Description |
---|---|
Dict[int, dict] | Extract from aggregated_params formatted for saving in the breakpoint |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad arguments type |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@staticmethod
@exp_exceptions
def save_aggregated_params(aggregated_params_init: dict, breakpoint_path: str) -> Dict[int, dict]:
"""Extract and format fields from aggregated_params that need to be saved in breakpoint.
Creates link to the params file from the `breakpoint_path` and use them to reference the params files.
Args:
aggregated_params_init (dict): aggregated parameters
breakpoint_path: path to the directory where breakpoints files and links will be saved
Returns:
Extract from `aggregated_params`
Raises:
FedbiomedExperimentError: bad arguments type
"""
# check arguments type, though is should have been done before
if not isinstance(aggregated_params_init, dict):
msg = ErrorNumbers.FB413.value + ' - save failed. ' + \
f'Bad type for aggregated params, should be `dict` not {type(aggregated_params_init)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
if not isinstance(breakpoint_path, str):
msg = ErrorNumbers.FB413.value + ' - save failed. ' + \
f'Bad type for breakpoint path, should be `str` not {type(breakpoint_path)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
aggregated_params = {}
for round_, params_dict in aggregated_params_init.items():
if not isinstance(params_dict, dict):
msg = ErrorNumbers.FB413.value + ' - save failed. ' + \
f'Bad type for aggregated params item {str(round_)}, ' + \
f'should be `dict` not {type(params_dict)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
params_path = os.path.join(breakpoint_path, f"aggregated_params_{uuid.uuid4()}.mpk")
Serializer.dump(params_dict['params'], params_path)
aggregated_params[round_] = {'params_path': params_path}
return aggregated_params
save_optimizer
save_optimizer(breakpoint_path)
Save the researcher-side Optimizer attached to this Experiment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
breakpoint_path | str | Path to the breakpoint folder. | required |
Returns:
Type | Description |
---|---|
Optional[str] | Path to the optimizer's save file, or None if no Optimizer is used. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def save_optimizer(self, breakpoint_path: str) -> Optional[str]:
"""Save the researcher-side Optimizer attached to this Experiment.
Args:
breakpoint_path: Path to the breakpoint folder.
Returns:
Path to the optimizer's save file, or None if no Optimizer is used.
"""
# Case when no researcher optimizer is used.
if self._agg_optimizer is None:
return None
# Case when an Optimizer is used: save its state and return the path.
state = self._agg_optimizer.get_state()
path = os.path.join(breakpoint_path, f"optimizer_{uuid.uuid4()}.mpk")
Serializer.dump(state, path)
return path
save_training_replies
save_training_replies()
Extracts a copy of training_replies and prepares it for saving in a breakpoint:
- strip unwanted fields
- structure as list/dict, so it can be saved with JSON
Returns:
Type | Description |
---|---|
Dict[int, Dict[str, Dict[str, Any]]] | Extract from training_replies formatted for breakpoint |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
def save_training_replies(self) -> Dict[int, Dict[str, Dict[str, Any]]]:
"""Extracts a copy of `training_replies` and prepares it for saving in breakpoint
- strip unwanted fields
- structure as list/dict, so it can be saved with JSON
Returns:
Extract from `training_replies` formatted for breakpoint
"""
converted_training_replies = copy.deepcopy(self.training_replies())
for training_reply in converted_training_replies.values():
# we want to strip some fields for the breakpoint
for reply in training_reply.values():
reply.pop('params', None)
return converted_training_replies
set_agg_optimizer
set_agg_optimizer(agg_optimizer)
Sets the optional researcher optimizer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agg_optimizer | Optional[Optimizer] | Optional fedbiomed Optimizer instance to be used so as to refine aggregated updates prior to applying them. If None, equivalent to using vanilla SGD with 1.0 learning rate. | required |
Returns:
Type | Description |
---|---|
Optional[Optimizer] | The optional researcher optimizer attached to this Experiment. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | if agg_optimizer is of improper type |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_agg_optimizer(
self,
agg_optimizer: Optional[Optimizer],
) -> Optional[Optimizer]:
"""Sets the optional researcher optimizer.
Args:
agg_optimizer: Optional fedbiomed Optimizer instance to be
used so as to refine aggregated updates prior to applying them.
If None, equivalent to using vanilla SGD with 1.0 learning rate.
Returns:
The optional researcher optimizer attached to this Experiment.
Raises:
FedbiomedExperimentError: if `optimizer` is of unproper type.
"""
if not (
agg_optimizer is None or
isinstance(agg_optimizer, Optimizer)
):
raise FedbiomedExperimentError(
f"{ErrorNumbers.FB410.value}: 'agg_optimizer' must be an "
f"Optimizer instance or None, not {type(agg_optimizer)}."
)
self._agg_optimizer = agg_optimizer
return self._agg_optimizer
set_aggregator
set_aggregator(aggregator=None)
Sets aggregator + verification on arguments type
Ensures consistency with the training data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aggregator | Optional[Aggregator] | Object defining the method for aggregating local updates. Defaults to None (use FedAverage for aggregation). | None |
Returns:
Type | Description |
---|---|
Aggregator | The aggregator (an Aggregator instance) that has been set |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad aggregator type |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_aggregator(self, aggregator: Optional[Aggregator] = None) -> Aggregator:
"""Sets aggregator + verification on arguments type
Ensures consistency with the training data.
Args:
aggregator: Object defining the method for aggregating local updates. Default to None
(use `FedAverage` for aggregation)
Returns:
aggregator (Aggregator)
Raises:
FedbiomedExperimentError : bad aggregator type
"""
if aggregator is None:
# default aggregator
self._aggregator = FedAverage()
elif not isinstance(aggregator, Aggregator):
msg = f"{ErrorNumbers.FB410.value}: aggregator is not an instance of Aggregator."
logger.critical(msg)
raise FedbiomedTypeError(msg)
else:
# at this point, `agregator` is an instance / inheriting of `Aggregator`
self._aggregator = aggregator
self.aggregator_args["aggregator_name"] = self._aggregator.aggregator_name
# ensure consistency with federated dataset
self._aggregator.set_fds(self._fds)
return self._aggregator
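Usage sketch, using the FedAverage aggregator referenced above (passing None has the same effect):

```python
from fedbiomed.researcher.aggregators import FedAverage

exp.set_aggregator(FedAverage())   # equivalent to exp.set_aggregator(None)
```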
set_retain_full_history
set_retain_full_history(retain_full_history_=True)
Sets the status of whether the full experiment history should be kept in memory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
retain_full_history_ | bool | whether to retain in memory the full history of node replies and aggregated params for the experiment. If False, only the last round's replies and aggregated params will be available. Defaults to True. | True |
Returns:
Type | Description |
---|---|
bool | The status of whether the full experiment history should be kept in memory. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_retain_full_history(self, retain_full_history_: bool = True):
"""Sets the status of whether the full experiment history should be kept in memory.
Args:
retain_full_history_: whether to retain in memory the full history of node replies and aggregated params
for the experiment. If False, only the last round's replies and aggregated params will be available.
Defaults to True.
Returns:
The status of whether the full experiment history should be kept in memory.
"""
if not isinstance(retain_full_history_, bool):
msg = ErrorNumbers.FB410.value + f': retain_full_history should be a bool, instead got ' \
f'{type(retain_full_history_)} '
logger.critical(msg)
raise FedbiomedTypeError(msg)
self._retain_full_history = retain_full_history_
return self._retain_full_history
set_round_limit
set_round_limit(round_limit)
Sets round_limit + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_limit | Union[int, None] | the maximum number of training rounds (nodes <-> central server) that should be executed for the experiment. None means that no limit is defined. | required |
Returns:
Type | Description |
---|---|
Union[int, None] | Round limit for the federated learning experiment |
Raises:
Type | Description |
---|---|
FedbiomedValueError | bad rounds type or value |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_round_limit(self, round_limit: Union[int, None]) -> Union[int, None]:
"""Sets `round_limit` + verification on arguments type
Args:
round_limit: the maximum number of training rounds (nodes <-> central server) that should be executed
for the experiment. `None` means that no limit is defined.
Returns:
Round limit for experiment of federated learning
Raises:
FedbiomedValueError : bad rounds type or value
"""
# at this point round_current exists and is an int >= 0
if round_limit is None:
# no limit for training rounds
self._round_limit = None
else:
self._check_round_value_consistency(round_limit, "round_limit")
if round_limit < self._round_current:
# self._round_limit can't be less than current round
msg = f'cannot set `round_limit` to less than the number of already run rounds ' \
f'({self._round_current})'
logger.critical(msg)
raise FedbiomedValueError(msg)
else:
self._round_limit = round_limit
# at this point self._round_limit is a Union[int, None]
return self._round_limit
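Usage sketch:

```python
exp.set_round_limit(10)     # must not be lower than the number of rounds already run
exp.set_round_limit(None)   # remove the limit entirely
```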
set_strategy
set_strategy(node_selection_strategy=None)
Sets node_selection_strategy + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_selection_strategy | Optional[Strategy] | object defining how nodes are sampled at each round for training, and how non-responding nodes are managed. Defaults to None: use DefaultStrategy if training_data is initialized, else strategy is None (cannot be initialized) and the experiment cannot be launched yet. | None |
Returns:
Type | Description |
---|---|
Union[Strategy, None] | The node selection strategy that has been set |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad strategy type |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_strategy(
self,
node_selection_strategy: Optional[Strategy] = None
) -> Union[Strategy, None]:
"""Sets for `node_selection_strategy` + verification on arguments type
Args:
node_selection_strategy: object defining how nodes are sampled at each round for training, and
how non-responding nodes are managed. Defaults to None:
- use `DefaultStrategy` if training_data is initialized
- else strategy is None (cannot be initialized), experiment cannot
be launched yet
Returns:
node selection strategy class
Raises:
FedbiomedExperimentError : bad strategy type
"""
if node_selection_strategy is None:
# default node_selection_strategy
self._node_selection_strategy = DefaultStrategy()
elif not isinstance(node_selection_strategy, Strategy):
msg = f"{ErrorNumbers.FB410.value}: wrong type for " \
"node_selection_strategy {type(node_selection_strategy)} " \
"it should be an instance of Strategy"
logger.critical(msg)
raise FedbiomedTypeError(msg)
else:
self._node_selection_strategy = node_selection_strategy
# at this point self._node_selection_strategy is a Union[Strategy, None]
return self._node_selection_strategy
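Usage sketch, assuming DefaultStrategy can be imported from the path referenced on this page:

```python
from fedbiomed.researcher.strategies import DefaultStrategy

exp.set_strategy(DefaultStrategy())   # same behaviour as passing None
```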
set_tensorboard
set_tensorboard(tensorboard)
Sets the tensorboard flag
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tensorboard | bool | If True, tensorboard log files will be written after receiving training feedback. | required |
Returns:
Type | Description |
---|---|
bool | Status of tensorboard |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_tensorboard(self, tensorboard: bool) -> bool:
"""
Sets the tensorboard flag
Args:
tensorboard: If `True` tensorboard log files will be writen after receiving training feedbacks
Returns:
Status of tensorboard
"""
if isinstance(tensorboard, bool):
self._tensorboard = tensorboard
self._monitor.set_tensorboard(tensorboard)
else:
msg = ErrorNumbers.FB410.value + f' `tensorboard` : {type(tensorboard)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._tensorboard
set_test_metric
set_test_metric(metric, **metric_args)
Sets a metric for federated model validation
Parameters:
Name | Type | Description | Default |
---|---|---|---|
metric | Union[MetricTypes, str, None] | An instance of MetricTypes, or a str referring to one of the metrics provided as attributes of MetricTypes. None if it isn't declared yet. | required |
**metric_args | dict | A dictionary that contains arguments for the metric function. Arguments should be compatible with the corresponding metrics in sklearn.metrics. | {} |
Returns:
Type | Description |
---|---|
Tuple[Union[str, None], Dict[str, Any]] | Metric and metric args as tuple |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | Invalid type for the metric argument |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_metric(self, metric: Union[MetricTypes, str, None], **metric_args: dict) -> \
Tuple[Union[str, None], Dict[str, Any]]:
""" Sets a metric for federated model validation
Args:
metric: A class as an instance of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes]. [`str`][str] for
referring one of metric which provided as attributes in [`MetricTypes`]
[fedbiomed.common.metrics.MetricTypes]. None, if it isn't declared yet.
**metric_args: A dictionary that contains arguments for metric function. Arguments
should be compatible with corresponding metrics in [`sklearn.metrics`][sklearn.metrics].
Returns:
Metric and metric args as tuple
Raises:
FedbiomedExperimentError: Invalid type for `metric` argument
"""
self._training_args['test_metric'] = metric
# using **metric_args, we know `test_metric_args` is a Dict[str, Any]
self._training_args['test_metric_args'] = metric_args
return metric, metric_args
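Usage sketch; 'ACCURACY' is assumed here to be one of the metric names provided by MetricTypes:

```python
from fedbiomed.common.metrics import MetricTypes

exp.set_test_metric(MetricTypes.ACCURACY)   # pass a MetricTypes member...
exp.set_test_metric('ACCURACY')             # ...or its string name
```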
set_test_on_global_updates
set_test_on_global_updates(flag=True)
Setter for test_on_global_updates, which indicates whether nodes should validate the federated model on the globally updated (aggregated) parameters they receive, before training the model locally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flag | bool | whether to perform model validation on global updates. Defaults to True. | True |
Returns:
Type | Description |
---|---|
bool | Value of the flag test_on_global_updates |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad flag type |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_on_global_updates(self, flag: bool = True) -> bool:
"""
Setter for test_on_global_updates, that indicates whether to perform a validation on the federated model
updates on the node side before training model locally where aggregated model parameters are received.
Args:
flag (bool, optional): whether to perform model validation on global updates. Defaults to True.
Returns:
Value of the flag `test_on_global_updates`.
Raises:
FedbiomedExperimentError : bad flag type
"""
self._training_args['test_on_global_updates'] = flag
return self._training_args['test_on_global_updates']
set_test_on_local_updates
set_test_on_local_updates(flag=True)
Setter for test_on_local_updates, which indicates whether nodes should validate the federated model on their locally updated parameters, after local training has completed on each node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flag | bool | whether to perform model validation on local updates. Defaults to True. | True |
Returns:
Type | Description |
---|---|
bool | Value of the flag test_on_local_updates |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad flag type |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_on_local_updates(self, flag: bool = True) -> bool:
"""
Setter for `test_on_local_updates`, that indicates whether to perform a validation on the federated model on the
node side where model parameters are updated locally after training in each node.
Args:
flag (bool, optional): whether to perform model validation on local updates. Defaults to True.
Returns:
value of the flag `test_on_local_updates`
Raises:
FedbiomedExperimentError: bad flag type
"""
self._training_args['test_on_local_updates'] = flag
return self._training_args['test_on_local_updates']
set_test_ratio
set_test_ratio(ratio)
Sets validation ratio for model validation.
When setting test_ratio, nodes will allocate a (1 - test_ratio) fraction of their data for training and the remainder for validating the model. This can be useful for validating the model once every round, as well as for controlling overfitting or doing early stopping.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ratio | float | validation ratio. Must be within interval [0,1]. | required |
Returns:
Type | Description |
---|---|
float | Validation ratio that is set |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad data type |
FedbiomedExperimentError | ratio is not within interval [0, 1] |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_ratio(self, ratio: float) -> float:
""" Sets validation ratio for model validation.
When setting test_ratio, nodes will allocate (1 - `test_ratio`) fraction of data for training and the
remaining for validating model. This could be useful for validating the model, once every round, as well as
controlling overfitting, doing early stopping, ....
Args:
ratio: validation ratio. Must be within interval [0,1].
Returns:
Validation ratio that is set
Raises:
FedbiomedExperimentError: bad data type
FedbiomedExperimentError: ratio is not within interval [0, 1]
"""
self._training_args['test_ratio'] = ratio
return ratio
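Usage sketch combining the validation-related setters documented on this page:

```python
exp.set_test_ratio(0.1)                 # 10% of each node's data reserved for validation
exp.set_test_on_global_updates(True)    # validate aggregated params at the start of a round
exp.set_test_on_local_updates(True)     # validate locally updated params at the end of a round
```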
set_training_data
set_training_data(training_data, from_tags=False)
Sets training data for federated training + verification on arguments type
See FederatedWorkflow.set_training_data for more information.
Ensures consistency also with the Experiment's aggregator and node state agent
Setting to None forfeits consistency checks
Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an inconsistent state.
Returns:
Type | Description |
---|---|
Union[FederatedDataSet, None] | Dataset metadata |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_training_data(
self,
training_data: Union[FederatedDataSet, dict, None],
from_tags: bool = False) -> \
Union[FederatedDataSet, None]:
"""Sets training data for federated training + verification on arguments type
See
[`FederatedWorkflow.set_training_data`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_training_data]
for more information.
Ensures consistency also with the Experiment's aggregator and node state agent
!!! warning "Setting to None forfeits consistency checks"
Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an
inconsistent state.
Returns:
Dataset metadata
"""
super().set_training_data(training_data, from_tags)
# Below: Experiment-specific operations for consistency
if self._aggregator is not None and self._fds is not None:
# update the aggregator's training data
self._aggregator.set_fds(self._fds)
if self._node_state_agent is not None and self._fds is not None:
# update the node state agent (member of FederatedWorkflow)
self._node_state_agent.update_node_states(self.all_federation_nodes())
return self._fds
strategy
strategy()
Retrieves the class that represents the node selection strategy.
Please see also set_strategy to set or update the node selection strategy.
Returns:
Type | Description |
---|---|
Union[Strategy, None] | An object that is an instance of Strategy. None if it is not declared yet, which means that the node selection strategy will be DefaultStrategy. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def strategy(self) -> Union[Strategy, None]:
"""Retrieves the class that represents the node selection strategy.
Please see also [`set_strategy`][fedbiomed.researcher.federated_workflows.Experiment.set_strategy]
to set or update node selection strategy.
Returns:
A class or object as an instance of [`Strategy`][fedbiomed.researcher.strategies.Strategy]. `None` if
it is not declared yet. It means that node selection strategy will be
[`DefaultStrategy`][fedbiomed.researcher.strategies.DefaultStrategy].
"""
return self._node_selection_strategy
test_metric
test_metric()
Retrieves the metric for validation routine.
Please see also set_test_metric to change or set test_metric.
Returns:
Type | Description |
---|---|
Union[MetricTypes, str, None] | An instance of MetricTypes, or a str referring to one of the metrics provided as attributes of MetricTypes. None if it isn't declared yet. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_metric(self) -> Union[MetricTypes, str, None]:
"""Retrieves the metric for validation routine.
Please see also [`set_test_metric`][fedbiomed.researcher.federated_workflows.Experiment.set_test_metric]
to change/set `test_metric`
Returns:
A class as an instance of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes]. [`str`][str] for referring
one of metric which provided as attributes in [`MetricTypes`][fedbiomed.common.metrics.MetricTypes].
None, if it isn't declared yet.
"""
return self._training_args['test_metric']
test_metric_args
test_metric_args()
Retrieves the metric argument for the metric function that is going to be used.
Please see also set_test_metric to change or set test_metric and for more information on the arguments that can be used.
Returns:
Type | Description |
---|---|
Dict[str, Any] | A dictionary that contains arguments for the metric function. See set_test_metric. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_metric_args(self) -> Dict[str, Any]:
"""Retrieves the metric argument for the metric function that is going to be used.
Please see also [`set_test_metric`][fedbiomed.researcher.federated_workflows.Experiment.set_test_metric]
to change/set `test_metric` and get more information on the arguments can be used.
Returns:
A dictionary that contains arguments for metric function. See [`set_test_metric`]
[fedbiomed.researcher.federated_workflows.Experiment.set_test_metric]
"""
return self._training_args['test_metric_args']
test_on_global_updates
test_on_global_updates()
Retrieves the status of whether validation will be performed on globally updated (aggregated) parameters by the nodes at the beginning of each round.
Please see also set_test_on_global_updates.
Returns:
Type | Description |
---|---|
bool | True if validation is active on globally updated (aggregated) parameters, False otherwise. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_on_global_updates(self) -> bool:
""" Retrieves the status of whether validation will be performed on globally updated (aggregated)
parameters by the nodes at the beginning of each round.
Please see also [`set_test_on_global_updates`]
[fedbiomed.researcher.federated_workflows.Experiment.set_test_on_global_updates].
Returns:
True, if validation is active on globally updated (aggregated) parameters. False for vice versa.
"""
return self._training_args['test_on_global_updates']
test_on_local_updates
test_on_local_updates()
Retrieves the status of whether validation will be performed on locally updated parameters by the nodes at the end of each round.
Please see also set_test_on_local_updates.
Returns:
Type | Description |
---|---|
bool | True if validation is active on locally updated parameters, False otherwise. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_on_local_updates(self) -> bool:
"""Retrieves the status of whether validation will be performed on locally updated parameters by
the nodes at the end of each round.
Please see also
[`set_test_on_local_updates`][fedbiomed.researcher.federated_workflows.Experiment.set_test_on_local_updates].
Returns:
True, if validation is active on locally updated parameters. False for vice versa.
"""
return self._training_args['test_on_local_updates']
test_ratio
test_ratio()
Retrieves the ratio for the validation partition of the entire dataset.
Please see also set_test_ratio to change or set test_ratio.
Returns:
Type | Description |
---|---|
float | The ratio for the validation part; 1 - test_ratio is the ratio for the training set. |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_ratio(self) -> float:
"""Retrieves the ratio for validation partition of entire dataset.
Please see also [`set_test_ratio`][fedbiomed.researcher.federated_workflows.Experiment.set_test_ratio] to
change/set `test_ratio`
Returns:
The ratio for validation part, `1 - test_ratio` is ratio for training set.
"""
return self._training_args['test_ratio']
training_replies
training_replies()
Retrieves training replies of each round of training.
Training replies contain timing statistics and the file paths/URLs that have been received after each round.
Returns:
Type | Description |
---|---|
Union[dict, None] | Dictionary of training replies with format {round (int) : replies (dict)} |
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def training_replies(self) -> Union[dict, None]:
"""Retrieves training replies of each round of training.
Training replies contains timing statistics and the files parth/URLs that has been received after each round.
Returns:
Dictionary of training replies with format {round (int) : replies (dict)}
"""
return self._training_replies
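For illustration, a sketch iterating over the replies of every completed round (each round maps to a dictionary of per-node replies):

```python
for round_i, replies in exp.training_replies().items():
    print(f"Round {round_i}: received {len(replies)} node replies")
```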
FederatedWorkflow
FederatedWorkflow(tags=None, nodes=None, training_data=None, experimentation_folder=None, secagg=False, save_breakpoints=False)
Bases: ABC
A FederatedWorkflow is the abstract entry point for the researcher to orchestrate both local and remote operations.
The FederatedWorkflow is an abstract base class from which the actual classes used by the researcher must inherit. It manages the life-cycle of:
- the training arguments
- secure aggregation
- the node state agent
Additionally, it provides the basis for the breakpoint functionality, and manages some backend functionalities such as the temporary directory, the experiment ID, etc...
The attributes training_data and tags are co-dependent. Attempting to modify one of them may have side effects on the other, according to the following rules:
- modifying tags when the training data is not None resets the training data based on the new tags
- modifying the training data using a FederatedDataSet object or a dict sets tags to None
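FederatedWorkflow itself is abstract; below is a minimal sketch of these side effects using the concrete Experiment subclass, assuming a configured researcher environment and nodes sharing datasets under the hypothetical tags shown.

```python
# Assumptions: a configured researcher environment, and nodes sharing datasets
# registered under the illustrative tag '#demo-data'.
from fedbiomed.researcher.federated_workflows import Experiment

exp = Experiment(tags='#demo-data')          # training data is queried from the nodes using the tag

exp.set_tags(['#other-demo-data'])           # resets the training data based on the new tags
exp.set_training_data(exp.training_data())   # setting training data explicitly resets tags to None
print(exp.tags())                            # -> None
```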
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tags | Optional[List[str] | str] | list of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search for training_data yet). | None |
nodes | Optional[List[str]] | list of node_ids to filter the nodes to be involved in the experiment. Defaults to None (no filtering). | None |
training_data | Union[FederatedDataSet, dict, None] | FederatedDataSet object to use as training data, or dict from which a FederatedDataSet is created (node ids as keys, values being lists of dicts, each dict describing a dataset on a node). If None and `tags` is not None, training data is set by querying the nodes using `tags` and `nodes`; if both are None, training data is left unset and the experiment cannot be launched yet. | None |
save_breakpoints | bool | whether to save breakpoints or not after each training round. Breakpoints can be used for resuming a crashed experiment. | False |
experimentation_folder | Union[str, None] | choose a specific name for the folder where experimentation result files and breakpoints are stored. This should just contain the name for the folder, not a path. The name is used as a subdirectory of `environ[EXPERIMENTS_DIR]`. Defaults to None (auto-choose a folder name). | None |
secagg | Union[bool, SecureAggregation] | whether to setup a secure aggregation context for this experiment, and use it to send encrypted updates from nodes to researcher. Defaults to `False`. | False |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def __init__(
self,
tags: Optional[List[str] | str] = None,
nodes: Optional[List[str]] = None,
training_data: Union[FederatedDataSet, dict, None] = None,
experimentation_folder: Union[str, None] = None,
secagg: Union[bool, SecureAggregation] = False,
save_breakpoints: bool = False,
) -> None:
"""Constructor of the class.
Args:
tags: list of string with data tags or string with one data tag. Empty list of
tags ([]) means any dataset is accepted, it is different from None
(tags not set, cannot search for training_data yet).
nodes: list of node_ids to filter the nodes to be involved in the experiment.
Defaults to None (no filtering).
training_data:
* If it is a FederatedDataSet object, use this value as training_data.
* else if it is a dict, create and use a FederatedDataSet object
from the dict and use this value as training_data. The dict should use
node ids as keys, values being list of dicts (each dict representing a
dataset on a node).
* else if it is None (no training data provided)
- if `tags` is not None, set training_data by
searching for datasets with a query to the nodes using `tags` and `nodes`
- if `tags` is None, set training_data to None (no training_data set yet,
experiment is not fully initialized and cannot be launched)
Defaults to None (query nodes for dataset if `tags` is not None, set training_data
to None else)
save_breakpoints: whether to save breakpoints or not after each training
round. Breakpoints can be used for resuming a crashed experiment.
experimentation_folder: choose a specific name for the folder
where experimentation result files and breakpoints are stored. This
should just contain the name for the folder not a path. The name is used
as a subdirectory of `environ[EXPERIMENTS_DIR])`. Defaults to None
(auto-choose a folder name)
- Caveat : if using a specific name this experimentation will not be
automatically detected as the last experimentation by `load_breakpoint`
- Caveat : do not use a `experimentation_folder` name finishing
with numbers ([0-9]+) as this would confuse the last experimentation
detection heuristic by `load_breakpoint`.
secagg: whether to setup a secure aggregation context for this experiment, and
use it to send encrypted updates from nodes to researcher.
Defaults to `False`
"""
# predefine all class variables, so no need to write try/except
# block each time we use it
self._fds: Optional[FederatedDataSet] = None # dataset metadata from the full federation
self._reqs: Requests = Requests()
self._nodes_filter: Optional[List[str]] = None # researcher-defined nodes filter
self._tags: Optional[List[str]] = None
self._experimentation_folder: Optional[str] = None
self._secagg: Union[SecureAggregation, bool] = False
self._save_breakpoints: Optional[bool] = None
self._node_state_agent: Optional[NodeStateAgent] = None
self._researcher_id: str = environ['RESEARCHER_ID']
self._experiment_id: str = EXPERIMENT_PREFIX + str(uuid.uuid4()) # creating a unique experiment id
# set internal members from constructor arguments
self.set_secagg(secagg)
# TODO: Manage tags within the FederatedDataset to avoid conflicts
if training_data is not None and tags is not None:
msg = f"{ErrorNumbers.FB410.value}: Can not set `training_data` and `tags` at the " \
"same time. Please provide only `training_data`, or tags to search for " \
"training data."
logger.critical(msg)
raise FedbiomedValueError(msg)
# Set tags if it tags is not None
if tags:
self.set_tags(tags)
if training_data:
self.set_training_data(training_data)
self.set_nodes(nodes)
self.set_save_breakpoints(save_breakpoints)
self.set_experimentation_folder(experimentation_folder)
self._node_state_agent = NodeStateAgent(list(self._fds.data().keys())
if self._fds and self._fds.data() else [])
Attributes
id property
id
Retrieves the unique experiment identifier.
secagg property
secagg
Gets the secagg object (SecureAggregation).
Returns:
Type | Description |
---|---|
SecureAggregation | Secure aggregation object. |
Functions
all_federation_nodes
all_federation_nodes()
Returns all the node ids in the federation
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def all_federation_nodes(self) -> List[str]:
"""Returns all the node ids in the federation"""
return list(self._fds.data().keys()) if self._fds is not None else []
breakpoint
breakpoint(state, bkpt_number)
Saves breakpoint with the state of the workflow.
The following attributes will be saved:
- tags
- experimentation_folder
- training_data
- training_args
- secagg
- node_state
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | experiment not fully defined, experiment did not run any round yet, or error when saving breakpoint |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def breakpoint(self,
state: Dict,
bkpt_number: int) -> None:
"""
Saves breakpoint with the state of the workflow.
The following attributes will be saved:
- tags
- experimentation_folder
- training_data
- training_args
- secagg
- node_state
Raises:
FedbiomedExperimentError: experiment not fully defined, experiment did not run any round
yet, or error when saving breakpoint
"""
state.update({
'id': self._experiment_id,
'breakpoint_version': str(__breakpoints_version__),
'training_data': self._fds.data(),
'experimentation_folder': self._experimentation_folder,
'tags': self._tags,
'nodes': self._nodes_filter,
'secagg': self._secagg.save_state_breakpoint(),
'node_state': self._node_state_agent.save_state_breakpoint()
})
# save state into a json file
breakpoint_path, breakpoint_file_name = \
choose_bkpt_file(self._experimentation_folder, bkpt_number - 1)
breakpoint_file_path = os.path.join(breakpoint_path, breakpoint_file_name)
try:
with open(breakpoint_file_path, 'w', encoding="UTF-8") as bkpt:
json.dump(state, bkpt)
logger.info(f"breakpoint number {bkpt_number - 1} saved at " +
os.path.dirname(breakpoint_file_path))
except (OSError, PermissionError, ValueError, TypeError, RecursionError) as e:
# - OSError: heuristic for catching open() and write() errors
# - see json.dump() documentation for documented errors for this call
msg = ErrorNumbers.FB413.value + f' - save failed with message {str(e)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg) from e
experimentation_folder
experimentation_folder()
Retrieves the folder name where experiment data/result are saved.
Please see also set_experimentation_folder
Returns:
Type | Description |
---|---|
str | Folder name where experiment related files are saved |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def experimentation_folder(self) -> str:
"""Retrieves the folder name where experiment data/result are saved.
Please see also [`set_experimentation_folder`]
[fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_experimentation_folder]
Returns:
File name where experiment related files are saved
"""
return self._experimentation_folder
experimentation_path
experimentation_path()
Retrieves the file path where experimentation folder is located and experiment related files are saved.
Returns:
Type | Description |
---|---|
str | Experiment directory where all experiment related files are saved |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def experimentation_path(self) -> str:
"""Retrieves the file path where experimentation folder is located and experiment related files are saved.
Returns:
Experiment directory where all experiment related files are saved
"""
return os.path.join(environ['EXPERIMENTS_DIR'], self._experimentation_folder)
filtered_federation_nodes
filtered_federation_nodes()
Returns the node ids in the federation after filtering with the nodes filter
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def filtered_federation_nodes(self) -> List[str]:
"""Returns the node ids in the federation after filtering with the nodes filter"""
if self._nodes_filter is not None:
return [node for node in self.all_federation_nodes() if node in self._nodes_filter]
else:
return self.all_federation_nodes()
info
info(info=None, missing='')
Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info | Dict[str, List[str]] | Dictionary of sub-classes relevant attributes status that will be completed with some additional attributes status defined in this class. Defaults to None (no entries of sub-classes available or of importance). | None |
missing | str | string listing attributes that are not yet set and may be needed to fully run the object; if non-empty, it is printed as a warning. Defaults to '' (no warning). | '' |
Returns:
Type | Description |
---|---|
Dict[str, List[str]] | dictionary containing all pieces of information, with 2 entries: `Arguments` mapping a list of all arguments, and `Values` mapping a list containing all the values |
str | the string describing missing attributes, returned unchanged |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def info(self,
info: Dict[str, List[str]] = None,
missing: str = '') -> Tuple[Dict[str, List[str]], str]:
"""Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Args:
info: Dictionary of sub-classes relevant attributes status that will be completed with some additional
attributes status defined in this class. Defaults to None (no entries of sub-classes available or
of importance).
missing_object_to_check: dictionary mapping sub-classes attributes to attribute names, that may be
needed to fully run the object. Defaults to None (no check will be performed).
Returns:
dictionary containing all pieces of information, with 2 entries: `Arguments` mapping a list
of all argument, and `Values` mapping a list containing all the values.
"""
if info is None:
info = self._create_default_info_structure()
info['Arguments'].extend([
'Tags',
'Nodes filter',
'Training Data',
'Experiment folder',
'Experiment Path',
'Secure Aggregation'
])
info['Values'].extend(['\n'.join(findall('.{1,60}', str(e))) for e in [
self._tags,
self._nodes_filter,
self._fds,
self._experimentation_folder,
self.experimentation_path(),
f'- Using: {self._secagg}\n- Active: {self._secagg.active}'
]])
# printing list of items set / not set yet
print(tabulate.tabulate(info, headers='keys'))
if missing:
print("\nWarning: Object not fully defined, missing"
f": \n{missing}")
else:
print(f"{self.__class__.__name__} can be run now (fully defined)")
return info, missing
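A minimal sketch, assuming `exp` is an already-constructed workflow (for example an Experiment): calling `info()` prints a tabulated summary and also returns it.

```python
# Assumption: `exp` is an Experiment (or other concrete FederatedWorkflow subclass).
info, missing = exp.info()      # prints a tabulated summary of the experiment status
print(info['Arguments'])        # names of the listed attributes
print(info['Values'])           # their current values, rendered as strings
```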
load_breakpoint classmethod
load_breakpoint(breakpoint_folder_path=None)
Loads breakpoint (provided a breakpoint has been saved) so the workflow can be resumed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
breakpoint_folder_path | Optional[str] | path of the breakpoint folder. Path can be absolute or relative eg: "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest workflow. Defaults to None. | None |
Returns:
Type | Description |
---|---|
Tuple[TFederatedWorkflow, dict] | Tuple containing the reinitialized workflow object and the saved state as a dictionary |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type, error when reading breakpoint or bad loaded breakpoint content (corrupted) |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@classmethod
@exp_exceptions
def load_breakpoint(
cls,
breakpoint_folder_path: Optional[str] = None
) -> Tuple[TFederatedWorkflow, dict]:
"""
Loads breakpoint (provided a breakpoint has been saved)
so the workflow can be resumed.
Args:
breakpoint_folder_path: path of the breakpoint folder. Path can be absolute
or relative eg: "var/experiments/Experiment_xxxx/breakpoints_xxxx".
If None, loads the latest breakpoint of the latest workflow. Defaults to None.
Returns:
Tuple contaning reinitialized workflow object and the saved state as a dictionary
Raises:
FedbiomedExperimentError: bad argument type, error when reading breakpoint or
bad loaded breakpoint content (corrupted)
"""
# check parameters type
if not isinstance(breakpoint_folder_path, str) and breakpoint_folder_path is not None:
msg = (
f"{ErrorNumbers.FB413.value}: load failed, `breakpoint_folder_path`"
f" has bad type {type(breakpoint_folder_path)}"
)
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# get breakpoint folder path (if it is None) and state file
breakpoint_folder_path, state_file = find_breakpoint_path(breakpoint_folder_path)
breakpoint_folder_path = os.path.abspath(breakpoint_folder_path)
try:
path = os.path.join(breakpoint_folder_path, state_file)
with open(path, "r", encoding="utf-8") as file:
saved_state = json.load(file)
except (json.JSONDecodeError, OSError) as exc:
# OSError: heuristic for catching file access issues
msg = (
f"{ErrorNumbers.FB413.value}: load failed,"
f" reading breakpoint file failed with message {exc}"
)
logger.critical(msg)
raise FedbiomedExperimentError(msg) from exc
if not isinstance(saved_state, dict):
msg = (
f"{ErrorNumbers.FB413.value}: load failed, breakpoint file seems"
f" corrupted. Type should be `dict` not {type(saved_state)}"
)
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# First, check version of breakpoints
bkpt_version = saved_state.get('breakpoint_version', __default_version__)
raise_for_version_compatibility(bkpt_version, __breakpoints_version__,
f"{ErrorNumbers.FB413.value}: Breakpoint "
"file was generated with version %s "
f"which is incompatible with the current version %s.")
# retrieve breakpoint training data
bkpt_fds = saved_state.get('training_data')
bkpt_fds = FederatedDataSet(bkpt_fds)
# initializing experiment
loaded_exp = cls()
loaded_exp._experiment_id = saved_state.get('id')
loaded_exp.set_training_data(bkpt_fds)
loaded_exp._tags = saved_state.get('tags')
loaded_exp.set_nodes(saved_state.get('nodes'))
loaded_exp.set_experimentation_folder(saved_state.get('experimentation_folder'))
secagg = SecureAggregation.load_state_breakpoint(saved_state.get('secagg'))
loaded_exp.set_secagg(secagg)
loaded_exp._node_state_agent.load_state_breakpoint(saved_state.get('node_state'))
loaded_exp.set_save_breakpoints(True)
return loaded_exp, saved_state
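A hedged sketch of resuming from a breakpoint, assuming breakpoints were saved during a previous run of the concrete Experiment subclass.

```python
# Assumptions: breakpoints were enabled (set_save_breakpoints(True)) during a previous run;
# with no argument, the latest breakpoint of the latest experiment is loaded.
from fedbiomed.researcher.federated_workflows import Experiment

loaded_exp, saved_state = Experiment.load_breakpoint()

print(saved_state['breakpoint_version'])
# Training can then typically be resumed, e.g. loaded_exp.run()
```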
nodes
nodes()
Retrieves the nodes filter for the execution of the workflow.
If nodes is None, then no filtering is applied, and all the nodes in the federation participate in the execution of the workflow. If nodes is not None, then the semantics of the nodes filter are as follows:
node_id in nodes filter | node_id in training data | outcome |
---|---|---|
yes | yes | this node is part of the federation, and will take part in the execution of the workflow |
yes | no | ignored |
no | yes | this node is part of the federation but will not be considered for executing the workflow |
no | no | ignored |
Please see set_nodes to set nodes.
Returns:
Type | Description |
---|---|
Union[List[str], None] | The list of nodes to keep for workflow execution, or None if no filtering is applied |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def nodes(self) -> Union[List[str], None]:
"""Retrieves the nodes filter for the execution of the workflow.
If nodes is None, then no filtering is applied, and all the nodes in the federation participate in the
execution of the workflow.
If nodes is not None, then the semantics of the nodes filter are as follows:
| node_id in nodes filter | node_id in training data | outcome |
| --- | --- | --- |
| yes | yes | this node is part of the federation, and will take part in the execution the workflow |
| yes | no | ignored |
| no | yes | this node is part of the federation but will not be considered for executing the workflow |
| no | no | ignored |
Please see [`set_nodes`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_nodes] to set `nodes`.
Returns:
The list of nodes to keep for workflow execution, or None if no filtering is applied
"""
return self._nodes_filter
run abstractmethod
run()
Run the experiment
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@abstractmethod
def run(self) -> int:
"""Run the experiment"""
save_breakpoints
save_breakpoints()
Retrieves the status of saving breakpoint after each round of training.
Returns:
Type | Description |
---|---|
bool | `True` if saving breakpoints is active, `False` otherwise. |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def save_breakpoints(self) -> bool:
"""Retrieves the status of saving breakpoint after each round of training.
Returns:
`True`, If saving breakpoint is active. `False`, vice versa.
"""
return self._save_breakpoints
secagg_setup
secagg_setup(sampled_nodes)
Retrieves the secagg arguments for setup.
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
def secagg_setup(self, sampled_nodes: List[str]) -> Dict:
"""Retrieves the secagg arguments for setup."""
secagg_arguments = {}
if self._secagg.active:
if not self._secagg.setup(
parties=sampled_nodes,
experiment_id=self._experiment_id,
):
raise FedbiomedSecureAggregationError(
f"{ErrorNumbers.FB417.value}: Could not setup secure aggregation crypto "
"context."
)
secagg_arguments = self._secagg.train_arguments()
return secagg_arguments
set_experimentation_folder
set_experimentation_folder(experimentation_folder=None)
Sets experimentation_folder, the folder name where experiment data/result are saved.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
experimentation_folder | Optional[str] | Name of the folder where experiment related files are saved | None |
Returns:
Type | Description |
---|---|
str | The path to experimentation folder. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad `experimentation_folder` type |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_experimentation_folder(self, experimentation_folder: Optional[str] = None) -> str:
"""Sets `experimentation_folder`, the folder name where experiment data/result are saved.
Args:
experimentation_folder: File name where experiment related files are saved
Returns:
The path to experimentation folder.
Raises:
FedbiomedExperimentError : bad `experimentation_folder` type
"""
if experimentation_folder is None:
self._experimentation_folder = create_exp_folder()
elif isinstance(experimentation_folder, str):
sanitized_folder = sanitize_filename(experimentation_folder, platform='auto')
self._experimentation_folder = create_exp_folder(sanitized_folder)
if sanitized_folder != experimentation_folder:
logger.warning(f'`experimentation_folder` was sanitized from '
f'{experimentation_folder} to {sanitized_folder}')
else:
msg = ErrorNumbers.FB410.value + \
f' `experimentation_folder` : {type(experimentation_folder)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# at this point self._experimentation_folder is a str valid for a foldername
return self._experimentation_folder
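A minimal sketch of choosing a custom folder name; the name below is illustrative.

```python
# Illustrative folder name: a plain name (not a path); avoid names ending in digits
# so that load_breakpoint can still auto-detect the latest experiment.
exp.set_experimentation_folder('demo-experiment')
print(exp.experimentation_folder())   # the (possibly sanitized) folder name
print(exp.experimentation_path())     # full path under the experiments directory
```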
set_nodes
set_nodes(nodes)
Sets the nodes filter + verifications on argument type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | Union[List[str], None] | List of node_ids to filter the nodes to be involved in the experiment. | required |
Returns:
Type | Description |
---|---|
Union[List[str], None] | List of nodes that are set. None, if the argument `nodes` is None. |
Raises:
Type | Description |
---|---|
FedbiomedTypeError | Bad nodes type |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_nodes(self, nodes: Union[List[str], None]) -> Union[List[str], None]:
"""Sets the nodes filter + verifications on argument type
Args:
nodes: List of node_ids to filter the nodes to be involved in the experiment.
Returns:
List of nodes that are set. None, if the argument `nodes` is None.
Raises:
FedbiomedTypeError : Bad nodes type
"""
# immediately exit if setting nodes to None
if nodes is None:
self._nodes_filter = None
# set nodes
elif isinstance(nodes, list):
if not all(map(lambda node: isinstance(node, str), nodes)):
msg = ErrorNumbers.FB410.value + ' `nodes` argument must be a list of strings or None.'
logger.critical(msg)
raise FedbiomedTypeError(msg)
self._nodes_filter = nodes
else:
msg = ErrorNumbers.FB410.value + ' `nodes` argument must be a list of strings or None.'
logger.critical(msg)
raise FedbiomedTypeError(msg)
return self._nodes_filter
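A minimal sketch of restricting a workflow to a subset of nodes, assuming `exp` already has training data; the node ids below are hypothetical.

```python
# Assumptions: `exp` already has training data, and the node ids are illustrative.
print(exp.all_federation_nodes())          # every node id in the federation

exp.set_nodes(['node-hospital-1', 'node-hospital-2'])   # keep only these nodes
print(exp.filtered_federation_nodes())     # federation nodes that pass the filter

exp.set_nodes(None)                        # remove the filter: all federation nodes participate
```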
set_save_breakpoints
set_save_breakpoints(save_breakpoints)
Setter for save_breakpoints + verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
save_breakpoints | bool | whether to save breakpoints or not after each training round. Breakpoints can be used for resuming a crashed experiment. | required |
Returns:
Type | Description |
---|---|
bool | Status of saving breakpoints |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad save_breakpoints type |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_save_breakpoints(self, save_breakpoints: bool) -> bool:
""" Setter for save_breakpoints + verification on arguments type
Args:
save_breakpoints (bool): whether to save breakpoints or
not after each training round. Breakpoints can be used for resuming
a crashed experiment.
Returns:
Status of saving breakpoints
Raises:
FedbiomedExperimentError: bad save_breakpoints type
"""
if isinstance(save_breakpoints, bool):
self._save_breakpoints = save_breakpoints
# no warning if done during experiment, we may change breakpoint policy at any time
else:
msg = ErrorNumbers.FB410.value + f' `save_breakpoints` : {type(save_breakpoints)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._save_breakpoints
set_secagg
set_secagg(secagg, scheme=SecureAggregationSchemes.LOM)
Sets secure aggregation status and scheme
Builds a secure aggregation controller/instance, or sets the given secure aggregation instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secagg | Union[bool, SecureAggregation] | If True, activates training requests with secure aggregation by building a `SecureAggregation` instance with default arguments. If the argument is a `SecureAggregation` instance, it is assigned as-is; secure aggregation activation and configuration then depend on the instance provided. | required |
scheme | SecureAggregationSchemes | Secure aggregation scheme to use. If a `SecureAggregation` object is provided, the argument is not used, as the scheme comes from the object. Defaults to SecureAggregationSchemes.LOM. | LOM |
Returns:
Type | Description |
---|---|
SecureAggregation | Secure aggregation controller instance. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type or value |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_secagg(
self,
secagg: Union[bool, SecureAggregation],
scheme: SecureAggregationSchemes = SecureAggregationSchemes.LOM):
"""Sets secure aggregation status and scheme
Build secure aggregation controller/instance or sets given
secure aggregation class
Args:
secagg: If True activates training request with secure aggregation by building
[`SecureAggregation`][fedbiomed.researcher.secagg.SecureAggregation] class
with default arguments. Or if argument is an instance of `SecureAggregation`
it does only assignment. Secure aggregation activation and configuration
depends on the instance provided.
scheme: Secure aggregation scheme to use. Ig a `SecureAggregation` object is provided,
the argument is not used, as the scheme comes from the object. Defaults is
SecureAggregationSchemes.LOM.
Returns:
Secure aggregation controller instance.
Raises:
FedbiomedExperimentError: bad argument type or value
"""
if not isinstance(scheme, SecureAggregationSchemes):
raise FedbiomedExperimentError(
f"{ErrorNumbers.FB410.value}: Expected `scheme` argument "
"`SecureAggregationSchemes`, but got {type(scheme)}")
if isinstance(secagg, bool):
self._secagg = SecureAggregation(scheme=scheme, active=secagg)
elif isinstance(secagg, SecureAggregation):
self._secagg = secagg
else:
raise FedbiomedExperimentError(
f"{ErrorNumbers.FB410.value}: Expected `secagg` argument bool or "
f"`SecureAggregation` but got {type(secagg)}")
return self._secagg
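A hedged sketch of enabling secure aggregation, assuming a secagg-capable deployment; the `SecureAggregation` import path follows the reference above, and the default scheme is assumed.

```python
# Assumptions: the deployment supports secure aggregation and `exp` is an Experiment.
exp.set_secagg(True)              # build a SecureAggregation controller with default arguments
print(exp.secagg.active)          # -> True

# Alternatively, pass a preconfigured controller (import path per the reference above);
# assuming the default scheme is acceptable here.
from fedbiomed.researcher.secagg import SecureAggregation
exp.set_secagg(SecureAggregation(active=True))
```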
set_tags
set_tags(tags)
Sets tags and verification on argument type
Setting tags also updates the training data by executing the set_training_data method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tags | Union[List[str], str] | List of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search for training_data yet). | required |
Returns: List of tags that are set.
Raises:
Type | Description |
---|---|
FedbiomedTypeError | Bad tags type |
FedbiomedValueError | Some issue prevented resetting the training data after an inconsistency was detected |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_tags(
self,
tags: Union[List[str], str],
) -> List[str]:
"""Sets tags and verification on argument type
Setting tags updates also training data by executing
[`set_training_data`].[fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_training_data]
method.
Args:
tags: List of string with data tags or string with one data tag. Empty list
of tags ([]) means any dataset is accepted, it is different from None
(tags not set, cannot search for training_data yet).
Returns:
List of tags that are set.
Raises:
FedbiomedTypeError: Bad tags type
FedbiomedValueError: Some issue prevented resetting the training
data after an inconsistency was detected
"""
# preprocess the tags argument to correct typing
if not tags:
msg = f"{ErrorNumbers.FB410.value}: Invalid value for tags argument {tags}, tags " \
"should be non-empty list of str or non-empty str."
logger.critical(msg)
raise FedbiomedValueError(msg)
if isinstance(tags, list):
if not all(map(lambda tag: isinstance(tag, str), tags)):
msg = f"{ErrorNumbers.FB410.value}: `tags` must be a non-empty str or " \
"a non-empty list of str."
logger.critical(msg)
raise FedbiomedTypeError(msg)
# If it is empty list
tags_to_set = tags
elif isinstance(tags, str):
tags_to_set = [tags]
else:
msg = f"{ErrorNumbers.FB410.value} `tags` must be a non-empty str, " \
"a non-empty list of str"
logger.critical(msg)
raise FedbiomedTypeError(msg)
self._tags = tags_to_set
# Set training data
logger.info(
"Updating training data. This action will update FederatedDataset, "
"and the nodes that will participate to the experiment.")
self.set_training_data(None, from_tags=True)
return self._tags
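A minimal sketch of setting tags, assuming nodes are online and share datasets registered under the hypothetical tags below; note that setting tags triggers a dataset search on the nodes and therefore refreshes the training data.

```python
# Assumptions: nodes are online and share datasets under these illustrative tags.
exp.set_tags('#mnist')                 # a single tag as a string is also accepted
exp.set_tags(['#mnist', '#processed']) # a list of tags is also accepted

print(exp.tags())            # -> ['#mnist', '#processed']
print(exp.training_data())   # FederatedDataSet refreshed from the node search
```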
set_training_data
set_training_data(training_data, from_tags=False)
Sets training data for federated training + verification on arguments type
The full expected behaviour when changing training data is given in the table below:
New value of training_data | from_tags | Outcome |
---|---|---|
dict or FederatedDataset | True | fail because user is attempting to set from tags but also providing a training_data argument |
dict or FederatedDataset | False | set fds attribute, set tags to None |
None | True | fail if tags are not set, else set fds attribute based on tags |
None | False | set tags to None and keep same value and tags |
Setting to None forfeits consistency checks
Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an inconsistent state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_data | Union[FederatedDataSet, dict, None] | FederatedDataSet object to use as training data, or dict from which a FederatedDataSet is created (node ids as keys, values being lists of dicts, each dict representing a dataset on a node). If None and `from_tags` is True, training data is set by querying the nodes using `tags` and `nodes`. | required |
from_tags | bool | If True, query nodes for datasets when no `training_data` is provided. Not used when `training_data` is provided. | False |
Returns:
Type | Description |
---|---|
Union[FederatedDataSet, None] | FederatedDataSet metadata |
Raises:
Type | Description |
---|---|
FedbiomedTypeError | bad training_data or from_tags type. |
FedbiomedValueError | Invalid value for the arguments |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_training_data(
self,
training_data: Union[FederatedDataSet, dict, None],
from_tags: bool = False) -> \
Union[FederatedDataSet, None]:
"""Sets training data for federated training + verification on arguments type
The full expected behaviour when changing training data is given in the table below:
| New value of `training_data` | `from_tags` | Outcome |
| --- | --- | --- |
| dict or FederatedDataset | True | fail because user is attempting to set from tags but also providing a training_data argument|
| dict or FederatedDataset | False | set fds attribute, set tags to None |
| None | True | fail if tags are not set, else set fds attribute based tags |
| None | False | set tags to None and keep same value and tags |
!!! warning "Setting to None forfeits consistency checks"
Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an
inconsistent state.
Args:
training_data:
* If it is a FederatedDataSet object, use this value as training_data.
* else if it is a dict, create and use a FederatedDataSet object from the dict
and use this value as training_data. The dict should use node ids as keys,
values being list of dicts (each dict representing a dataset on a node).
* else if it is None (no training data provided)
- if `from_tags` is True and `tags` is not None, set training_data by
searching for datasets with a query to the nodes using `tags` and `nodes`
- if `from_tags` is False or `tags` is None, set training_data to None (no training_data set yet,
experiment is not fully initialized and cannot be launched)
from_tags: If True, query nodes for datasets when no `training_data` is provided.
Not used when `training_data` is provided.
Returns:
FederatedDataSet metadata
Raises:
FedbiomedTypeError: bad training_data or from_tags type.
FedbiomedValueError: Invalid value for the arguments `training_data` or `from_tags`.
"""
if not isinstance(from_tags, bool):
msg = ErrorNumbers.FB410.value + \
f' `from_tags` : got {type(from_tags)} but expected a boolean'
logger.critical(msg)
raise FedbiomedTypeError(msg)
if from_tags and training_data is not None:
msg = ErrorNumbers.FB410.value + \
' set_training_data: cannot specify a training_data argument if ' \
'from_tags is True'
logger.critical(msg)
raise FedbiomedValueError(msg)
# case where no training data are passed
if training_data is None:
if from_tags is True:
if not self._tags:
msg = f"{ErrorNumbers.FB410.value}: attempting to " \
"set training data from undefined tags. Please consider set tags before " \
"using set_tags method of the experiment."
logger.critical(msg)
raise FedbiomedValueError(msg)
training_data = self._reqs.search(self._tags, self._nodes_filter)
else:
msg = f"{ErrorNumbers.FB410.value}: Can not set training data to `None`. " \
"Please set from_tags=True or provide a valid training data"
logger.critical(msg)
raise FedbiomedValueError(msg)
if isinstance(training_data, FederatedDataSet):
self._fds = training_data
elif isinstance(training_data, dict):
self._fds = FederatedDataSet(training_data)
else:
msg = ErrorNumbers.FB410.value + \
f' `training_data` has incorrect type: {type(training_data)}'
logger.critical(msg)
raise FedbiomedTypeError(msg)
# check and ensure consistency
self._tags = self._tags if from_tags else None
# return the new value
return self._fds
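A hedged sketch of providing training data explicitly as a dict; the node id and dataset metadata keys below are purely illustrative, since the exact metadata fields come from the dataset registration on the nodes.

```python
# Hypothetical node id and dataset metadata, shown only to illustrate the expected shape:
# node ids as keys, each value being a list of dataset description dicts.
training_data = {
    'node-1234': [
        {'dataset_id': 'dataset-abcd', 'shape': [1000, 20], 'tags': ['#demo-data']},
    ],
}

exp.set_training_data(training_data)   # builds a FederatedDataSet from the dict, resets tags to None
print(exp.tags())                      # -> None
```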
tags
tags()
Retrieves the tags from the experiment object.
Please see set_tags to set tags.
Returns:
Type | Description |
---|---|
Union[List[str], None] | List of tags that have been set. `None` if tags are not set yet. |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def tags(self) -> Union[List[str], None]:
"""Retrieves the tags from the experiment object.
Please see [`set_tags`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_tags] to set tags.
Returns:
List of tags that has been set. `None` if it isn't declare yet.
"""
return self._tags
training_data
training_data()
Retrieves the training data which is an instance of FederatedDataset
This represents the dataset metadata available for the full federation.
Please see set_training_data to set or update training data.
Returns:
Type | Description |
---|---|
Union[FederatedDataSet, None] | Object that contains metadata for the datasets of each node. `None` if it isn't set yet. |
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def training_data(self) -> Union[FederatedDataSet, None]:
"""Retrieves the training data which is an instance of
[`FederatedDataset`][fedbiomed.researcher.datasets.FederatedDataSet]
This represents the dataset metadata available for the full federation.
Please see [`set_training_data`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_training_data]
to set or update training data.
Returns:
Object that contains metadata for the datasets of each node. `None` if it isn't set yet.
"""
return self._fds
TrainingPlanWorkflow
TrainingPlanWorkflow(*args, training_plan_class=None, training_args=None, model_args=None, **kwargs)
Bases: FederatedWorkflow
, ABC
A TrainingPlanWorkflow
is an abstract entry point to orchestrate an experiment which uses a training plan.
In addition to the functionalities provided by FederatedWorkflow
, the TrainingPlanWorkflow
also manages the life-cycle of the training plan.
Use set_training_plan_class
to manage the training plan
Please only ever use the set_training_plan_class
function to manage the training plan. Do not set the training plan or training plan class directly!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan_class | Optional[TrainingPlanT] | training plan class to be used for training. For the experiment to be properly and fully defined, `training_plan_class` needs to be a `TrainingPlanT`. Defaults to None (no training plan class defined yet). | None |
model_args | Optional[Dict] | contains model arguments passed to the constructor of the training plan when instantiating it : output and input feature dimension, etc. | None |
training_args | Optional[Union[TrainingArgs, dict]] | contains training arguments passed to the `training_routine` of the training plan when launching it: lr, epochs, batch_size... | None |
*args | Extra positional arguments from parent class | () | |
**kwargs | Arguments of parent class | {} |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def __init__(
self,
*args,
training_plan_class: Optional[TrainingPlanT] = None,
training_args: Optional[Union[TrainingArgs, dict]] = None,
model_args: Optional[Dict] = None,
**kwargs,
) -> None:
"""Constructor of the class.
Args:
training_plan_class: training plan class to be used for training.
For experiment to be properly and fully defined `training_plan_class`
needs to be a `TrainingPlanT` Defaults to None (no training plan class
defined yet.
model_args: contains model arguments passed to the constructor
of the training plan when instantiating it :
output and input feature dimension, etc.
training_args: contains training arguments passed to the `training_routine`
of the training plan when launching it: lr, epochs, batch_size...
*args: Extra positional arguments from parent class
[`FederatedWorkflow`][fedbiomed.researcher.federated_workflows.FederatedWorkflow]
**kwargs: Arguments of parent class
[`FederatedWorkflow`][fedbiomed.researcher.federated_workflows.FederatedWorkflow]
"""
# Check arguments
if training_plan_class is not None and not inspect.isclass(training_plan_class):
raise FedbiomedTypeError(
f"{ErrorNumbers.FB410.value}: bad type for argument "
f"`training_plan_class` {type(training_plan_class)}")
if training_plan_class is not None and \
not issubclass(training_plan_class, TRAINING_PLAN_TYPES):
raise FedbiomedTypeError(
f"{ErrorNumbers.FB410.value}: bad type for argument `training_plan_class`."
f" It is not subclass of supported training plans {TRAINING_PLAN_TYPES}")
# _training_plan_class determines the life-cycle of the training plan:
# if training_plass_class changes, then the training plan must be reinitialized
self._training_plan_class = None
# model args is also tied to the life-cycle of training plan:
# if model_args changes, the training plan must be reinitialized
self._model_args = None
# The _training_plan attribute represents the *actual instance*
# of a _training_plan_class that is currently
# being used in the workflow. The training plan cannot be modified by the user.
self._training_plan = None
self._training_args: Optional[TrainingArgs] = None # FIXME: is it ok to have this here?
# The _training_plan_file attribute represents the path of the file where the training plan is saved.
# It cannot be modified by the user
self._training_plan_file = None
# initialize object
super().__init__(*args, **kwargs)
self.set_training_args(training_args)
self.set_model_args(model_args)
self.set_training_plan_class(training_plan_class)
Functions
breakpoint
breakpoint(state, bkpt_number)
Saves breakpoint with the state of the workflow.
The following attributes will be saved:
- training_args
- training_plan_class
- model_args
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def breakpoint(self,
state,
bkpt_number) -> None:
"""
Saves breakpoint with the state of the workflow.
The following attributes will be saved:
- training_args
- training_plan_class
- model_args
"""
# save training plan to file
training_plan_module = 'model_' + str(uuid.uuid4())
training_plan_file = os.path.join(self.experimentation_path(), training_plan_module + '.py')
self.training_plan().save_code(training_plan_file)
state.update({
'model_args': self._model_args,
'training_plan_class_name': self._training_plan_class.__name__,
'training_args': self._training_args.get_state_breakpoint(),
})
breakpoint_path, breakpoint_file_name = \
choose_bkpt_file(self._experimentation_folder, bkpt_number - 1)
# rewrite paths in breakpoint : use the links in breakpoint directory
state['training_plan_path'] = create_unique_link(
breakpoint_path,
# - Need a file with a restricted characters set in name to be able to import as module
'model_' + str("{:04d}".format(bkpt_number - 1)), '.py',
# - Prefer relative path, eg for using experiment result after
# experiment in a different tree
os.path.join('..', os.path.basename(training_plan_file))
)
params_path = os.path.join(breakpoint_path, f"model_params_{uuid.uuid4()}.mpk")
Serializer.dump(self.training_plan().get_model_wrapper_class().get_weights(
only_trainable = False, exclude_buffers = False), params_path)
state['model_weights_path'] = params_path
super().breakpoint(state, bkpt_number)
check_training_plan_status
check_training_plan_status()
Method for checking the training plan status, i.e. whether it is approved or not by the nodes.
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | if the training data is not defined. |
Returns:
Type | Description |
---|---|
Dict | Training plan status for answering nodes |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def check_training_plan_status(self) -> Dict:
""" Method for checking training plan status, ie whether it is approved or not by the nodes
Raises:
FedbiomedExperimentError: if the training data is not defined.
Returns:
Training plan status for answering nodes
"""
if self.training_data() is None:
msg = f"{ErrorNumbers.FB410.value}. Cannot check training plan status: training data is not defined." \
f"Please either use the `set_tags` or `set_training_data` method to fix this."
logger.critical(msg)
raise FedbiomedExperimentError(msg)
job = TrainingPlanCheckJob(
nodes=self.training_data().node_ids(),
keep_files_dir=self.experimentation_path(),
experiment_id=self._experiment_id,
training_plan=self.training_plan()
)
responses = job.execute()
return responses
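A minimal sketch, assuming `exp` has both training data and a training plan defined; the reply structure depends on the node responses and is only iterated over here.

```python
# Assumption: `exp` has training data and a training plan class already set.
statuses = exp.check_training_plan_status()   # ask each node whether the plan is approved

for key, reply in statuses.items():
    print(key, reply)   # per-node approval information, as returned by the nodes
```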
info
info(info=None, missing='')
Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info | Optional[Dict] | Dictionary of sub-classes relevant attributes status that will be completed with some additional attributes status defined in this class. Defaults to None (no entries of sub-classes available or of importance). | None |
missing | str | string listing attributes that are not yet set and may be needed to fully run the object; if non-empty, it is printed as a warning. Defaults to '' (no warning). | '' |
Returns:
Type | Description |
---|---|
Tuple[Dict[str, List[str]], str] | dictionary containing all pieces of information, with 2 entries: `Arguments` mapping a list of all arguments, and `Values` mapping a list containing all the values, together with the string describing missing attributes |
Raises:
Type | Description |
---|---|
KeyError | if `Arguments` or `Values` entry is missing in the passed argument `info` |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def info(
self,
info: Optional[Dict] = None,
missing: str = ''
) -> Tuple[Dict[str, List[str]], str]:
"""Prints out the information about the current status of the experiment.
Lists all the parameters/arguments of the experiment and informs whether
the experiment can be run.
Args:
info: Dictionary of sub-classes relevant attributes status that will be
completed with some additional attributes status defined in this class.
Defaults to None (no entries of sub-classes available or of importance).
missing_object_to_check: dictionary mapping sub-classes attributes to
attribute names, that may be needed to fully run the object. Defaults
to None (no check will be performed).
Returns:
dictionary containing all pieces of information, with 2 entries:
`Arguments` mapping a list of all argument, and `Values` mapping
a list copntaining all the values.
Raises:
KeyError: if `Arguments` or `Values` entry is missing in passing argument `info`
"""
# at this point all attributes are initialized (in constructor)
if info is None:
info = self._create_default_info_structure()
info['Arguments'].extend([
'Training Plan Class',
'Model Arguments',
'Training Arguments'
])
info['Values'].extend(['\n'.join(findall('.{1,60}',
str(e))) for e in [
self._training_plan_class,
self._model_args,
self._training_args
]])
return super().info(info, missing)
load_breakpoint classmethod
load_breakpoint(breakpoint_folder_path=None)
Loads breakpoint (provided a breakpoint has been saved) so the workflow can be resumed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
breakpoint_folder_path | Optional[str] | path of the breakpoint folder. Path can be absolute or relative eg: "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest workflow. Defaults to None. | None |
Returns:
Type | Description |
---|---|
Tuple[TrainingPlanWorkflowT, dict] | Reinitialized workflow object. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type, error when reading breakpoint or bad loaded breakpoint content (corrupted) |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@classmethod
@exp_exceptions
def load_breakpoint(cls,
breakpoint_folder_path: Optional[str] = None) -> Tuple[TrainingPlanWorkflowT, dict]:
"""
Loads breakpoint (provided a breakpoint has been saved)
so the workflow can be resumed.
Args:
breakpoint_folder_path: path of the breakpoint folder. Path can be absolute or relative eg:
"var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest
workflow. Defaults to None.
Returns:
Reinitialized workflow object.
Raises:
FedbiomedExperimentError: bad argument type, error when reading breakpoint or bad loaded breakpoint
content (corrupted)
"""
loaded_exp, saved_state = super().load_breakpoint(breakpoint_folder_path)
# Define type for pylint
loaded_exp: TrainingPlanWorkflow
# Import TP class
_, tp_class = import_class_from_file(
module_path=saved_state.get("training_plan_path"),
class_name=saved_state.get("training_plan_class_name")
)
loaded_exp.set_model_args(saved_state["model_args"])
loaded_exp.set_training_plan_class(tp_class)
loaded_exp.set_training_args(
TrainingArgs.load_state_breakpoint(
saved_state.get('training_args')))
training_plan = loaded_exp.training_plan()
if training_plan is None:
msg = ErrorNumbers.FB413.value + ' - load failed, ' + \
'breakpoint file seems corrupted, `training_plan` is None'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
param_path = saved_state['model_weights_path']
params = Serializer.load(param_path)
loaded_exp.training_plan().get_model_wrapper_class().set_weights(params)
return loaded_exp, saved_state
model_args
model_args()
Retrieves model arguments.
Please see also set_model_args
Returns:
Type | Description |
---|---|
dict | The arguments that are going to be passed to the `init_model` function of the training plan during initialization of the model instance |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def model_args(self) -> dict:
"""Retrieves model arguments.
Please see also [`set_model_args`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow.set_model_args]
Returns:
The arguments that are going to be passed to the `init_model` function of the training plan during
initialization of the model instance
"""
return self._model_args
set_model_args
set_model_args(model_args, keep_weights=True)
Sets model_args
+ verification on arguments type
Resets the training plan
This function has an important (and intended!) side-effect: it resets the training_plan
attribute. By default, it tries to keep the same weights as the current training plan, if available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_args | dict | contains model arguments passed to the constructor of the training plan when instantiating it : output and input feature dimension, etc. | required |
keep_weights | bool | try to keep the same weights as the current training plan | True |
Returns:
Type | Description |
---|---|
dict | Model arguments that have been set. |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad model_args type |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def set_model_args(self,
model_args: dict,
keep_weights: bool = True) -> dict:
"""Sets `model_args` + verification on arguments type
!!! warning "Resets the training plan"
This function has an important (and intended!) side-effect: it resets the `training_plan` attribute.
By default, it tries to keep the same weights as the current training plan, if available.
Args:
model_args (dict): contains model arguments passed to the constructor
of the training plan when instantiating it : output and input feature
dimension, etc.
keep_weights: try to keep the same weights as the current training plan
Returns:
Model arguments that have been set.
Raises:
FedbiomedExperimentError : bad model_args type
"""
if model_args is None or isinstance(model_args, dict):
self._model_args = model_args
else:
# bad type
msg = ErrorNumbers.FB410.value + f' `model_args` : {type(model_args)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
# self._model_args always exist at this point
self._update_training_plan(keep_weights) # resets the training plan attribute
return self._model_args
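A hedged sketch of updating model arguments; the argument names (`in_features`, `out_features`) are hypothetical and depend on what the training plan's model constructor expects. Note that this resets the training plan instance, keeping its weights by default.

```python
# Hypothetical model arguments; the keys depend on the model built by the training plan.
exp.set_model_args({'in_features': 20, 'out_features': 2})

# Changing model arguments rebuilds the training plan; pass keep_weights=False
# to reinitialize the model weights instead of carrying them over.
exp.set_model_args({'in_features': 20, 'out_features': 4}, keep_weights=False)

print(exp.model_args())
```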
set_training_args
set_training_args(training_args)
Sets training_args
+ verification on arguments type
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_args | Union[dict, TrainingArgs, None] | contains training arguments passed to the training plan's `training_routine` such as lr, epochs, batch_size... | required |
Returns:
Type | Description |
---|---|
Union[dict, None] | Training arguments |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad training_args type |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def set_training_args(
self,
training_args: Union[dict, TrainingArgs, None]
) -> Union[dict, None]:
""" Sets `training_args` + verification on arguments type
Args:
training_args: contains training arguments passed to the
training plan's `training_routine` such as lr, epochs, batch_size...
Returns:
Training arguments
Raises:
FedbiomedExperimentError : bad training_args type
"""
if isinstance(training_args, TrainingArgs):
self._training_args = deepcopy(training_args)
elif isinstance(training_args, dict) or training_args is None:
self._training_args = TrainingArgs(training_args, only_required=False)
else:
msg = f"{ErrorNumbers.FB410.value} in function `set_training_args`. " \
"Expected type TrainingArgs, dict, or " \
f"None, got {type(training_args)} instead."
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._training_args.dict()
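A hedged sketch of setting training arguments as a plain dict; the accepted schema is defined by TrainingArgs and may differ between Fed-BioMed versions, so the keys below (loader and optimizer arguments, epochs) are illustrative only.

```python
# Illustrative keys only -- the accepted schema is defined by TrainingArgs and may differ
# between versions (the docstring mentions lr, epochs, batch_size as examples).
exp.set_training_args({
    'loader_args': {'batch_size': 32},   # assumption: batch size passed through loader arguments
    'optimizer_args': {'lr': 1e-3},      # assumption: learning rate passed through optimizer arguments
    'epochs': 2,                         # number of local epochs per round
})

print(exp.training_args())   # returns the validated arguments as a dict
```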
set_training_plan_class
set_training_plan_class(training_plan_class, keep_weights=True)
Sets the training plan type + verification on arguments type
Resets the training plan
This function has an important (and intended!) side-effect: it resets the training_plan
attribute. By default, it tries to keep the same weights as the current training plan, if available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan_class | Union[TrainingPlanT, None] | training plan class to be used for training. For the experiment to be properly and fully defined, `training_plan_class` needs to be a `TrainingPlanT`. Defaults to None (no training plan class defined yet). | required |
keep_weights | bool | try to keep the same weights as the current training plan | True |
Returns:
Type | Description |
---|---|
Union[TrainingPlanT, None] | `training_plan_class` that is set for the experiment |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad training_plan_class type |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def set_training_plan_class(self,
training_plan_class: Union[TrainingPlanT, None],
keep_weights: bool = True
) -> Union[TrainingPlanT, None]:
"""Sets the training plan type + verification on arguments type
!!! warning "Resets the training plan"
This function has an important (and intended!) side-effect: it resets the `training_plan` attribute.
By default, it tries to keep the same weights as the current training plan, if available.
Args:
training_plan_class: training plan class to be used for training.
For experiment to be properly and fully defined `training_plan_class` needs to be a `TrainingPlanT`
Defaults to None (no training plan class defined yet)
keep_weights: try to keep the same weights as the current training plan
Returns:
`training_plan_class` that is set for experiment
Raises:
FedbiomedExperimentError : bad training_plan_class type
"""
if training_plan_class is None:
self._training_plan_class = None
elif inspect.isclass(training_plan_class):
# training_plan_class must be a subclass of a valid training plan
if issubclass(training_plan_class, TRAINING_PLAN_TYPES):
# valid class
self._training_plan_class = training_plan_class
else:
# bad class
msg = ErrorNumbers.FB410.value + f' `training_plan_class` : {training_plan_class} class'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
else:
# bad type
msg = ErrorNumbers.FB410.value + f' `training_plan_class` of type: {type(training_plan_class)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
self._update_training_plan(keep_weights) # resets the training plan attribute
return self._training_plan_class
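A hedged sketch, assuming a user-defined training plan class `MyTrainingPlan` that subclasses one of the supported training plan types (for instance TorchTrainingPlan, referenced below); its definition is omitted here.

```python
# Assumption: MyTrainingPlan is a user-defined subclass of a supported training plan type
# (e.g. fedbiomed.common.training_plans.TorchTrainingPlan) defined elsewhere.
exp.set_training_plan_class(MyTrainingPlan)

print(exp.training_plan_class())   # -> <class 'MyTrainingPlan'>
print(exp.training_plan())         # the instantiated training plan, rebuilt by the setter
```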
training_args
training_args()
Retrieves training arguments.
Please see also set_training_args.
Returns:
Type | Description |
---|---|
dict | The arguments that are going to be passed to the training plan's `training_routine` to perform training on the node side |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_args(self) -> dict:
"""Retrieves training arguments.
Please see also [`set_training_args`][fedbiomed.researcher.\
federated_workflows.FederatedWorkflow.set_training_args]
Returns:
The arguments that are going to be passed to the training plan's
`training_routine` to perfom training on the node side. An example
training routine: [`TorchTrainingPlan.training_routine`]
[fedbiomed.common.training_plans.TorchTrainingPlan.training_routine]
"""
return self._training_args.dict()
training_plan
training_plan()
Retrieves the training plan instance currently being used in the federated workflow.
Returns:
Type | Description |
---|---|
Optional[TrainingPlan] | training plan: the training plan instance |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan(self) -> Optional[TrainingPlan]:
"""Retrieves the training plan instance currently being used in the federated workflow.
Returns:
training plan: the training plan instance
"""
return self._training_plan
training_plan_approve
training_plan_approve(description='no description provided')
Send a training plan and a ApprovalRequest message to node(s).
This is a simple redirect to the Requests.training_plan_approve() method.
If a list of node id(s) is provided, the message will be individually sent to all nodes of the list. If the node id(s) list is None (default), the message is broadcast to all nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
description | str | Description for training plan approve request | 'no description provided' |
Returns:
Type | Description |
---|---|
dict | a dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side. |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan_approve(self,
description: str = "no description provided") -> dict:
"""Send a training plan and a ApprovalRequest message to node(s).
This is a simple redirect to the Requests.training_plan_approve() method.
If a list of node id(s) is provided, the message will be individually sent
to all nodes of the list.
If the node id(s) list is None (default), the message is broadcast to all nodes.
Args:
description: Description for training plan approve request
Returns:
a dictionary of pairs (node_id: status), where status indicates to the researcher
that the training plan has been correctly downloaded on the node side.
Warning: status does not mean that the training plan is approved, only that it has been added
to the "approval queue" on the node side.
"""
job = TrainingPlanApproveJob(
nodes=self.training_data().node_ids(),
keep_files_dir=self.experimentation_path(),
training_plan=self.training_plan(),
description=description,
)
responses = job.execute()
return responses
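A minimal sketch, assuming `exp` has training data and a training plan set; the statuses returned only indicate that the plan reached each node's approval queue.

```python
# Assumption: `exp` has training data and a training plan class already set.
statuses = exp.training_plan_approve(description="Logistic regression plan for demo data")

for node_id, status in statuses.items():
    # status only means the plan was received and queued for approval on that node,
    # not that it has actually been approved.
    print(node_id, status)
```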
training_plan_class
training_plan_class()
Retrieves the type of the training plan that is created for training.
Please see also set_training_plan_class.
Returns:
Name | Type | Description |
---|---|---|
training_plan_class | Optional[TrainingPlanT] | the class type of the training plan. |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan_class(self) -> Optional[TrainingPlanT]:
"""Retrieves the type of the training plan that is created for training.
Please see also
[`set_training_plan_class`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow.set_training_plan_class].
Returns:
training_plan_class: the class type of the training plan.
"""
return self._training_plan_class
training_plan_file
training_plan_file(display=True)
Retrieves the path of the file where the training plan is saved, and optionally displays it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
display | bool | If `True`, prints the content of the training plan file. Default is `True`. | True |
Returns:
Type | Description |
---|---|
str | Path to the training plan file |
Raises:
Type | Description |
---|---|
FedbiomedExperimentError | bad argument type, or cannot read training plan file content |
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan_file(self, display: bool = True) -> str:
"""Retrieves the path of the file where the training plan is saved, and optionally displays it.
Args:
display: If `True`, prints the content of the training plan file. Default is `True`
Returns:
Path to the training plan file
Raises:
FedbiomedExperimentError: bad argument type, or cannot read training plan file content
"""
if not isinstance(display, bool):
# bad type
msg = ErrorNumbers.FB410.value + \
f', in method `training_plan_file` param `display` : type {type(display)}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
if display and self._training_plan_file is not None:
try:
with open(self._training_plan_file) as file:
content = file.read()
file.close()
print(content)
except OSError as e:
# cannot read training plan file content
msg = ErrorNumbers.FB412.value + \
f', in method `training_plan_file` : error when reading training plan file - {e}'
logger.critical(msg)
raise FedbiomedExperimentError(msg)
return self._training_plan_file
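A minimal sketch of retrieving the saved training plan file path without printing its content, assuming a training plan has already been set.

```python
# Assumption: a training plan class has been set, so the plan has been saved to a file.
path = exp.training_plan_file(display=False)   # just return the path, do not print the code
print(path)
```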