Federated Workflows

Classes

Experiment

Experiment(*args, aggregator=None, agg_optimizer=None, node_selection_strategy=None, round_limit=None, tensorboard=False, retain_full_history=True, **kwargs)

Bases: TrainingPlanWorkflow

A Federated Learning Experiment based on a Training Plan.

This class provides a comprehensive entry point for the management and orchestration of a FL experiment, including definition, execution, and interpretation of results.

Managing model parameters

The model parameters should be managed through the corresponding methods in the training_plan by accessing the experiment's training_plan() attribute and using the set_model_params and get_model_params functions, e.g.

exp.training_plan().set_model_params(params_dict)

Do not set the training plan attribute directly

Setting the training_plan attribute directly is not allowed. Instead, use the set_training_plan_class method to set the training plan type, and the underlying model will be correctly constructed and initialized.

Parameters:

Name Type Description Default
aggregator Optional[Aggregator]

object defining the method for aggregating local updates. Defaults to None (FedAverage is then used for aggregation)

None
agg_optimizer Optional[Optimizer]

Optimizer instance, to refine aggregated model updates prior to their application. If None, merely apply the aggregated updates.

None
node_selection_strategy Optional[Strategy]

object defining how nodes are sampled at each round for training, and how non-responding nodes are managed. Defaults to None:

  • use DefaultStrategy if training_data is initialized
  • else strategy is None (cannot be initialized), experiment cannot be launched yet

None
round_limit Union[int, None]

the maximum number of training rounds (nodes <-> central server) that should be executed for the experiment. None means that no limit is defined. Defaults to None.

None
tensorboard bool

whether to save scalar values for display in TensorBoard during training for each node. Currently, it is only used for loss values.

  • If True, the monitor instantiates a Monitor object that writes scalar logs into the ./runs directory.
  • If False, monitoring is stopped if it was active.

False
retain_full_history bool

whether to retain in memory the full history of node replies and aggregated params for the experiment. If False, only the last round's replies and aggregated params will be available. Defaults to True.

True
*args

Extra positional arguments from parent class TrainingPlanWorkflow

()
**kwargs

Arguments of parent class TrainingPlanWorkflow

{}
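The aggregator parameter defaults to FedAverage. As a rough illustration of what averaging local updates means, here is a minimal standalone sketch of a sample-size-weighted mean. The function name `weighted_average` and the use of plain floats as parameters are assumptions made for this example; it is not the library's implementation, and whether FedAverage weights nodes in exactly this way is not confirmed by this page.

```python
from typing import Dict, List


def weighted_average(
    node_params: List[Dict[str, float]],
    sample_sizes: List[int],
) -> Dict[str, float]:
    """Average per-node parameters, weighting each node by its sample count."""
    if not node_params or len(node_params) != len(sample_sizes):
        raise ValueError("need one sample size per node")
    total = sum(sample_sizes)
    if total <= 0:
        raise ValueError("total sample size must be positive")
    # Normalized weights sum to 1 (hypothetical weighting scheme).
    weights = [n / total for n in sample_sizes]
    return {
        name: sum(w * params[name] for w, params in zip(weights, node_params))
        for name in node_params[0]
    }


# Two nodes with scalar "parameters"; the second node holds 3x more samples.
aggregated = weighted_average(
    [{"w": 1.0, "b": 0.0}, {"w": 3.0, "b": 2.0}],
    sample_sizes=[100, 300],
)
```

Real model parameters are tensors rather than floats, but the weighting logic carries over element-wise.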
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def __init__(
    self,
    *args,
    aggregator: Optional[Aggregator] = None,
    agg_optimizer: Optional[Optimizer] = None,
    node_selection_strategy: Optional[Strategy] = None,
    round_limit: Union[int, None] = None,
    tensorboard: bool = False,
    retain_full_history: bool = True,
    **kwargs
) -> None:
    """Constructor of the class.

    Args:
        aggregator: object defining the method for aggregating
            local updates. Default to None (use
            [`FedAverage`][fedbiomed.researcher.aggregators.FedAverage] for aggregation)

        agg_optimizer: [`Optimizer`][fedbiomed.common.optimizers.Optimizer] instance,
            to refine aggregated model updates prior to their application. If None,
            merely apply the aggregated updates.

        node_selection_strategy: object defining how nodes are sampled at
            each round for training, and how non-responding nodes are managed.
            Defaults to None:
            - use [`DefaultStrategy`][fedbiomed.researcher.strategies.DefaultStrategy]
                if training_data is initialized
            - else strategy is None (cannot be initialized), experiment cannot be launched yet

        round_limit: the maximum number of training rounds (nodes <-> central server)
            that should be executed for the experiment. `None` means that no limit is
            defined. Defaults to None.

        tensorboard: whether to save scalar values  for displaying in Tensorboard
            during training for each node. Currently, it is only used for loss values.
            - If it is true, monitor instantiates a `Monitor` object
                that write scalar logs into `./runs` directory.
            - If it is False, it stops monitoring if it was active.

        retain_full_history: whether to retain in memory the full history
            of node replies and aggregated params for the experiment. If False, only the
            last round's replies and aggregated params will be available. Defaults to True.
        *args: Extra positional arguments from parent class
            [`TrainingPlanWorkflow`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow]
        **kwargs: Arguments of parent class
            [`TrainingPlanWorkflow`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow]
    """
    # define new members
    self._node_selection_strategy: Strategy = None
    self._round_limit = None
    self._monitor = None
    self._aggregator = None
    self._agg_optimizer = None
    self.aggregator_args = {}
    self._aggregated_params = {}
    self._training_replies: Dict = {}
    self._retain_full_history = None

    # initialize object
    super().__init__(*args, **kwargs)

    # set self._aggregator : type Aggregator
    self.set_aggregator(aggregator)

    # set self._agg_optimizer: type Optional[Optimizer]
    self.set_agg_optimizer(agg_optimizer)

    # set self._node_selection_strategy: type Union[Strategy, None]
    self.set_strategy(node_selection_strategy)

    # "current" means number of rounds already trained
    self._set_round_current(0)
    self.set_round_limit(round_limit)

    # always create a monitoring process
    self._monitor = Monitor()
    self._reqs.add_monitor_callback(self._monitor.on_message_handler)
    self.set_tensorboard(tensorboard)

    # whether to retain the full experiment history or not
    self.set_retain_full_history(retain_full_history)

Attributes

aggregator_args instance-attribute
aggregator_args = {}

Functions

agg_optimizer
agg_optimizer()

Retrieves the optional Optimizer used to refine aggregated model updates.

To set or update that optimizer: set_agg_optimizer.

Returns:

Type Description
Optional[Optimizer]

An Optimizer instance, or None.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def agg_optimizer(self) -> Optional[Optimizer]:
    """Retrieves the optional Optimizer used to refine aggregated model updates.

    To set or update that optimizer:
    [`set_agg_optimizer`][fedbiomed.researcher.federated_workflows.Experiment.set_agg_optimizer].

    Returns:
        An [Optimizer][fedbiomed.common.optimizers.Optimizer] instance,
        or None.
    """
    return self._agg_optimizer
aggregated_params
aggregated_params()

Retrieves the aggregated parameters of every training round.

Returns:

Type Description
dict

Dictionary of aggregated parameters, keyed by training round.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def aggregated_params(self) -> dict:
    """Retrieves all aggregated parameters of each round of training

    Returns:
        Dictionary of aggregated parameters keys stand for each round of training
    """

    return self._aggregated_params
aggregator
aggregator()

Retrieves the aggregator that will be used for aggregating model parameters.

To set or update aggregator: set_aggregator.

Returns:

Type Description
Aggregator

A class or an object that is an instance of Aggregator

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def aggregator(self) -> Aggregator:
    """Retrieves aggregator class that will be used for aggregating model parameters.

    To set or update aggregator:
    [`set_aggregator`][fedbiomed.researcher.federated_workflows.Experiment.set_aggregator].

    Returns:
        A class or an object that is an instance of [Aggregator][fedbiomed.researcher.aggregators.Aggregator]

    """
    return self._aggregator
breakpoint
breakpoint()

Saves a breakpoint with the state of the training at the current round.

The following Experiment attributes will be saved:

  • round_current
  • round_limit
  • aggregator
  • agg_optimizer
  • node_selection_strategy
  • aggregated_params
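
To make the shape of the saved state concrete, here is a minimal standalone sketch that gathers the six attributes listed above into one dictionary and refuses to do so before at least one round has run (the same precondition the source code checks). The helper `build_breakpoint_state` and its `components` argument are hypothetical names for this illustration, not part of the library.

```python
from typing import Any, Dict, Optional

SAVED_COMPONENTS = (
    "aggregator",
    "agg_optimizer",
    "node_selection_strategy",
    "aggregated_params",
)


def build_breakpoint_state(
    round_current: int,
    round_limit: Optional[int],
    components: Dict[str, Any],
) -> Dict[str, Any]:
    """Collect the listed attributes into one dictionary."""
    if round_current < 1:
        # No completed round means there is no training state to save yet.
        raise RuntimeError("need to run at least 1 round before saving a breakpoint")
    state: Dict[str, Any] = {
        "round_current": round_current,
        "round_limit": round_limit,
    }
    for key in SAVED_COMPONENTS:
        # Components that were never set are recorded as None.
        state[key] = components.get(key)
    return state


state = build_breakpoint_state(2, 5, {"aggregator": {"class": "FedAverage"}})
```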
Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def breakpoint(self) -> None:
    """
    Saves breakpoint with the state of the training at a current round.

    The following Experiment attributes will be saved:

      - round_current
      - round_limit
      - aggregator
      - agg_optimizer
      - node_selection_strategy
      - aggregated_params
    """
    # need to have run at least 1 round to save a breakpoint
    if self._round_current < 1:
        msg = ErrorNumbers.FB413.value + \
            ' - need to run at least 1 before saving a breakpoint'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    # conditions are met, save breakpoint
    breakpoint_path, breakpoint_file_name = \
        choose_bkpt_file(self._experimentation_folder, self._round_current - 1)

    # predefine several breakpoint states
    agg_bkpt = None
    agg_optim_bkpt = None
    strategy_bkpt = None
    training_replies_bkpt  = None
    if self._aggregator is not None:
        agg_bkpt = self._aggregator.save_state_breakpoint(breakpoint_path,
                                                          global_model=self.training_plan().after_training_params())
    if self._agg_optimizer is not None:
        # FIXME: harmonize naming of save_object
        agg_optim_bkpt = self.save_optimizer(breakpoint_path)
    if self._node_selection_strategy is not None:
        strategy_bkpt = self._node_selection_strategy.save_state_breakpoint()
    if self._training_replies is not None:
        training_replies_bkpt = self.save_training_replies()

    state = {
        'round_current': self._round_current,
        'round_limit': self._round_limit,
        'aggregator': agg_bkpt,
        'agg_optimizer': agg_optim_bkpt,
        'node_selection_strategy': strategy_bkpt,
        'aggregated_params': self.save_aggregated_params(
            self._aggregated_params, breakpoint_path),
        'training_replies': training_replies_bkpt,
    }

    super().breakpoint(state, self._round_current)
commit_experiment_history
commit_experiment_history(training_replies, aggregated_params)

Commits the experiment history to memory.

The experiment history is defined as
  • training replies
  • aggregated parameters

This function checks the retain_full_history flag: if it is True, it simply adds (or overwrites) the current round's entry for the training_replies and aggregated_params dictionary. If the flag is set to False, we simply store the last round's values in the same dictionary format.
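
The effect of the retain_full_history flag can be sketched in isolation. The function `commit_history` below is a hypothetical stand-in that works on a plain dictionary; it only mirrors the add-or-overwrite versus keep-last-only behaviour described above.

```python
from typing import Any, Dict


def commit_history(
    history: Dict[int, Any],
    round_current: int,
    entry: Any,
    retain_full_history: bool,
) -> Dict[int, Any]:
    """Return the history dictionary after recording `entry` for `round_current`."""
    if retain_full_history:
        # Add (or overwrite) the entry for this round, keeping earlier rounds.
        history[round_current] = entry
        return history
    # Keep only the latest round, in the same {round: entry} format.
    return {round_current: entry}


full: Dict[int, Any] = {}
last_only: Dict[int, Any] = {}
for r in range(3):
    full = commit_history(full, r, f"replies-{r}", retain_full_history=True)
    last_only = commit_history(last_only, r, f"replies-{r}", retain_full_history=False)

print(sorted(full))  # [0, 1, 2]
print(last_only)     # {2: 'replies-2'}
```

Keeping the same dictionary format in both modes means callers can look up the latest round the same way regardless of the flag.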

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
def commit_experiment_history(self,
                              training_replies: Dict[str, Dict[str, Any]],
                              aggregated_params: Dict[str, Any]) -> None:
    """Commits the experiment history to memory.

    The experiment history is defined as:
        - training replies
        - aggregated parameters

    This function checks the retain_full_history flag: if it is True, it simply adds
    (or overwrites) the current round's entry for the training_replies and aggregated_params
    dictionary. If the flag is set to False, we simply store the last round's values in the
    same dictionary format.
    """
    if self._retain_full_history:
        # append to history
        self._training_replies[self._round_current] = training_replies
        self._aggregated_params[self._round_current] = {'params': aggregated_params}
    else:
        # only store the last round's values
        self._training_replies = {self._round_current: training_replies}
        self._aggregated_params = {self._round_current: {'params': aggregated_params}}
info
info()

Prints out the information about the current status of the experiment.

Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.

Raises:

Type Description
FedbiomedExperimentError

Inconsistent experiment due to missing variables

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def info(self) -> Tuple[Dict[str, List[str]], str]:
    """Prints out the information about the current status of the experiment.

    Lists  all the parameters/arguments of the experiment and informs whether the experiment can be run.

    Raises:
        FedbiomedExperimentError: Inconsistent experiment due to missing variables
    """
    # at this point all attributes are initialized (in constructor)

    info = self._create_default_info_structure()

    info['Arguments'].extend([
        'Aggregator',
        'Strategy',
        'Aggregator Optimizer',
        'Rounds already run',
        'Rounds total',
        'Breakpoint State',
    ])
    info['Values'].extend(['\n'.join(findall('.{1,60}',
                                     str(e))) for e in [
        self._aggregator.aggregator_name if self._aggregator is not None else None,
        self._node_selection_strategy,
        self._agg_optimizer,
        self._round_current,
        self._round_limit,
        self._save_breakpoints,
    ]])

    missing = self._check_missing_objects()
    return super().info(info, missing)
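
The source above wraps each displayed value to at most 60 characters per line using a regular expression. A minimal standalone sketch of that wrapping follows; the helper name `wrap_value` is an assumption for this example.

```python
from re import findall


def wrap_value(value: object, width: int = 60) -> str:
    """Split the string form of `value` into chunks of at most `width` characters."""
    text = str(value)
    # '.{1,N}' greedily matches up to N characters that are not newlines,
    # so any newline already present in the text is dropped by this pattern.
    return "\n".join(findall(".{1,%d}" % width, text))


wrapped = wrap_value("a" * 130)
line_lengths = [len(line) for line in wrapped.split("\n")]
```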
load_breakpoint classmethod
load_breakpoint(breakpoint_folder_path=None)

Loads a breakpoint (provided one has been saved) so that the experiment can be resumed. Useful if training has crashed on the researcher side or if the user wants to resume a given experiment.

Parameters:

Name Type Description Default
cls Type[TExperiment]

Experiment class

required
breakpoint_folder_path Union[str, None]

path of the breakpoint folder. The path can be absolute or relative, e.g. "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest experiment. Defaults to None.

None

Returns:

Type Description
TExperiment

Reinitialized experiment object. The user can then call the .run() method on this object to continue model training.

Raises:

Type Description
FedbiomedExperimentError

bad argument type, error when reading the breakpoint, or bad (corrupted) breakpoint content

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@classmethod
@exp_exceptions
def load_breakpoint(cls: Type[TExperiment],
                    breakpoint_folder_path: Union[str, None] = None) -> TExperiment:
    """
    Loads breakpoint (provided a breakpoint has been saved)
    so experience can be resumed. Useful if training has crashed
    researcher side or if user wants to resume a given experiment.

    Args:
      cls: Experiment class
      breakpoint_folder_path: path of the breakpoint folder. Path can be absolute or relative eg:
        "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads latest breakpoint of the latest
        experiment. Defaults to None.

    Returns:
        Reinitialized experiment object. With given object, user can then use `.run()`
            method to pursue model training.

    Raises:
        FedbiomedExperimentError: bad argument type, error when reading breakpoint or bad loaded breakpoint
            content (corrupted)
    """
    loaded_exp, saved_state = super().load_breakpoint(breakpoint_folder_path)
    # retrieve breakpoint sampling strategy
    bkpt_sampling_strategy_args = saved_state.get("node_selection_strategy")
    bkpt_sampling_strategy = cls._create_object(bkpt_sampling_strategy_args)
    loaded_exp.set_strategy(bkpt_sampling_strategy)
    # retrieve breakpoint researcher optimizer
    bkpt_optim = Experiment._load_optimizer(saved_state.get("agg_optimizer"))
    loaded_exp.set_agg_optimizer(bkpt_optim)
    # changing `Experiment` attributes
    loaded_exp._set_round_current(saved_state.get('round_current'))
    loaded_exp._aggregated_params = loaded_exp._load_aggregated_params(
        saved_state.get('aggregated_params')
    )
    # retrieve and change aggregator
    bkpt_aggregator_args = saved_state.get("aggregator")
    bkpt_aggregator = cls._create_object(bkpt_aggregator_args, training_plan=loaded_exp.training_plan())
    loaded_exp.set_aggregator(bkpt_aggregator)
    # load training replies
    loaded_exp.load_training_replies(saved_state.get("training_replies"))
    logger.info(f"Experimentation reload from {breakpoint_folder_path} successful!")

    return loaded_exp
load_training_replies
load_training_replies(bkpt_training_replies)

Reads training replies from a formatted breakpoint file.

Builds the training replies data structure.

Parameters:

Name Type Description Default
bkpt_training_replies Dict[int, Dict[str, Dict[str, Any]]]

Extract from training replies saved in breakpoint

required

Returns:

Type Description
None

Nothing is returned. The training replies of already executed rounds are stored on the experiment.
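
The reload step can be sketched without the library: round keys are converted back to integers and each node's parameters are read again from the file named in its reply. In the sketch below, `restore_replies` and the `load_params` callable are hypothetical stand-ins (the real method uses the library's serializer), and the remark about string keys is an assumption about how breakpoints are stored.

```python
from typing import Any, Callable, Dict


def restore_replies(
    saved: Dict[Any, Dict[str, Dict[str, Any]]],
    load_params: Callable[[str], Any],
) -> Dict[int, Dict[str, Dict[str, Any]]]:
    """Rebuild replies with integer round keys and reloaded parameters."""
    restored: Dict[int, Dict[str, Dict[str, Any]]] = {}
    for round_key, node_replies in (saved or {}).items():
        reloaded = {}
        for node_id, reply in node_replies.items():
            # Reload the parameters from the path recorded in the reply.
            reloaded[node_id] = {**reply, "params": load_params(reply["params_path"])}
        # Round keys may come back as strings after serialization (assumption).
        restored[int(round_key)] = reloaded
    return restored


# Hypothetical in-memory "file store" standing in for parameter files on disk.
fake_files = {"round0_node1.mpk": {"w": 0.5}}
replies = restore_replies(
    {"0": {"node1": {"params_path": "round0_node1.mpk"}}},
    load_params=fake_files.__getitem__,
)
```

Unlike the source listing, this sketch builds a new dictionary instead of modifying its input, and treats a missing breakpoint entry as an empty history.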

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
def load_training_replies(
    self,
    bkpt_training_replies: Dict[int, Dict[str, Dict[str, Any]]]
) -> None:
    """Reads training replies from a formatted breakpoint file.

    Builds a job training replies data structure .

    Args:
        bkpt_training_replies: Extract from training replies saved in breakpoint

    Returns:
        Training replies of already executed rounds of the experiment
    """
    if not bkpt_training_replies:
        logger.warning("No Replies has been found in this breakpoint")

    rounds = set(bkpt_training_replies.keys())
    for round_ in rounds:
        # reload parameters from file params_path
        for node in bkpt_training_replies[round_].values():
            node["params"] = Serializer.load(node["params_path"])
        bkpt_training_replies[int(round_)] = bkpt_training_replies.pop(round_)

    self._training_replies = bkpt_training_replies
monitor
monitor()

Retrieves the monitor object

Monitor is responsible for receiving and parsing real-time training and validation feedback from each node participating in federated training. See Monitor.

Returns:

Type Description
Monitor

Monitor object that always exists with the experiment to retrieve feedback from the nodes.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def monitor(self) -> Monitor:
    """Retrieves the monitor object

    Monitor is responsible for receiving and parsing real-time training and validation feed-back from each node
    participate to federated training. See [`Monitor`][fedbiomed.researcher.monitor.Monitor]

    Returns:
        Monitor object that will always exist with experiment to retrieve feed-back from the nodes.
    """
    return self._monitor
retain_full_history
retain_full_history()

Retrieves the status of whether the full experiment history should be kept in memory.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def retain_full_history(self):
    """Retrieves the status of whether the full experiment history should be kept in memory."""
    return self._retain_full_history
round_current
round_current()

Retrieves the round the experiment is currently at.

Returns:

Type Description
int

Indicates the round number that the experiment will perform next.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def round_current(self) -> int:
    """Retrieves the round where the experiment is at.

    Returns:
        Indicates the round number that the experiment will perform next.
    """
    return self._round_current
round_limit
round_limit()

Retrieves the round limit from the experiment object.

Please see also set_round_limit to change or set round limit.

Returns:

Type Description
Union[int, None]

Maximum number of rounds that can be performed, or None if no limit has been declared yet.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def round_limit(self) -> Union[int, None]:
    """Retrieves the round limit from the experiment object.

    Please see  also [`set_round_limit`][fedbiomed.researcher.federated_workflows.Experiment.set_round_limit]
    to change or set round limit.

    Returns:
        Round limit that shows maximum number of rounds that can be performed. `None` if it isn't declared yet.
    """
    return self._round_limit
run
run(rounds=None, increase=False)

Run one or more rounds of an experiment, continuing from the point the experiment had reached.

Parameters:

Name Type Description Default
rounds Optional[int]

Number of experiment rounds to run in this call.

  • None means "run all the rounds remaining in the experiment", computed as the maximum number of rounds (round_limit for this experiment) minus the number of rounds already run (round_current for this experiment). It does nothing and issues a warning if round_limit is None (no round limit defined for the experiment).
  • int >= 1 means "run at most rounds rounds". If round_limit is None for the experiment, run exactly rounds rounds. If a round_limit is set for the experiment and the number of rounds would increase beyond the round_limit of the experiment:
      - if increase is True, increase the round_limit to (round_current + rounds) and run rounds rounds
      - if increase is False, run (round_limit - round_current) rounds, don't modify the maximum round_limit of the experiment and issue a warning.

None
increase bool

automatically increase the round_limit of the experiment for executing the specified number of rounds. Does nothing if round_limit is None or rounds is None. Defaults to False

False

Returns:

Type Description
int

Number of rounds that have been run

Raises:

Type Description
FedbiomedExperimentError

bad argument type or value
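
The interaction between rounds, round_limit and increase described above can be summarized as a small pure function. This is a standalone sketch for illustration; `plan_rounds` is a hypothetical name, and the warnings and argument validation performed by the real method are left out.

```python
from typing import Optional, Tuple


def plan_rounds(
    round_current: int,
    round_limit: Optional[int],
    rounds: Optional[int],
    increase: bool = False,
) -> Tuple[int, Optional[int]]:
    """Return (rounds to run in this call, round limit after the call)."""
    if rounds is None:
        if round_limit is None:
            # No request and no limit: nothing can be run.
            return 0, None
        # Run everything that remains before the limit.
        return round_limit - round_current, round_limit
    if round_limit is None or round_current + rounds <= round_limit:
        # The request fits: run exactly `rounds` rounds.
        return rounds, round_limit
    if increase:
        # Extend the limit so the full request can run.
        return rounds, round_current + rounds
    # Truncate the request to the rounds remaining before the limit.
    return round_limit - round_current, round_limit


print(plan_rounds(2, 5, None))                # (3, 5)
print(plan_rounds(4, 5, 3, increase=True))    # (3, 7)
print(plan_rounds(4, 5, 3, increase=False))   # (1, 5)
```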

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def run(self, rounds: Optional[int] = None, increase: bool = False) -> int:
    """Run one or more rounds of an experiment, continuing from the point the
    experiment had reached.

    Args:
        rounds: Number of experiment rounds to run in this call.
            * `None` means "run all the rounds remaining in the experiment" computed as
                maximum rounds (`round_limit` for this experiment) minus the number of
                rounds already run rounds (`round_current` for this experiment).
                It does nothing and issues a warning if `round_limit` is `None` (no
                round limit defined for the experiment)
            * `int` >= 1 means "run at most `rounds` rounds".
                If `round_limit` is `None` for the experiment, run exactly `rounds` rounds.
                If a `round_limit` is set for the experiment and the number or rounds would
            increase beyond the `round_limit` of the experiment:
            - if `increase` is True, increase the `round_limit` to
              (`round_current` + `rounds`) and run `rounds` rounds
            - if `increase` is False, run (`round_limit` - `round_current`)
              rounds, don't modify the maximum `round_limit` of the experiment
              and issue a warning.
        increase: automatically increase the `round_limit`
            of the experiment for executing the specified number of `rounds`.
            Does nothing if `round_limit` is `None` or `rounds` is None.
            Defaults to False

    Returns:
        Number of rounds have been run

    Raises:
        FedbiomedExperimentError: bad argument type or value
    """
    # check rounds is a >=1 integer or None
    if rounds is None:
        pass
    else:
        msg = ErrorNumbers.FB410.value + \
            f', in method `run` param `rounds` : value {rounds}'
        self._check_round_value_consistency(rounds, msg)

    # check increase is a boolean
    if not isinstance(increase, bool):
        msg = ErrorNumbers.FB410.value + \
            f', in method `run` param `increase` : type {type(increase)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    # compute number of rounds to run + updated rounds limit
    if rounds is None:
        if isinstance(self._round_limit, int):
            # run all remaining rounds in the experiment
            rounds = self._round_limit - self._round_current
            if rounds == 0:
                # limit already reached
                logger.warning(f'Round limit of {self._round_limit} already reached '
                               'for this experiment, do nothing.')
                return 0
        else:
            # cannot run if no number of rounds given and no round limit exists
            logger.warning('Cannot run, please specify a number of `rounds` to run or '
                           'set a `round_limit` to the experiment')
            return 0

    else:
        # at this point, rounds is an int >= 1
        if isinstance(self._round_limit, int):
            if (self._round_current + rounds) > self._round_limit:
                if increase:
                    # dont change rounds, but extend self._round_limit as necessary
                    logger.debug(f'Auto increasing total rounds for experiment from {self._round_limit} '
                                 f'to {self._round_current + rounds}')
                    self._round_limit = self._round_current + rounds
                else:
                    new_rounds = self._round_limit - self._round_current
                    if new_rounds == 0:
                        # limit already reached
                        logger.warning(f'Round limit of {self._round_limit} already reached '
                                       'for this experiment, do nothing.')
                        return 0
                    else:
                        # reduce the number of rounds to run in the experiment
                        logger.warning(f'Limit of {self._round_limit} rounds for the experiment '
                                       f'will be reached, reducing the number of rounds for this '
                                       f'run from {rounds} to {new_rounds}')
                        rounds = new_rounds

    # FIXME: should we print warning if both rounds and _round_limit are None?
    # At this point `rounds` is an int > 0 (not None)

    # run the rounds
    for _ in range(rounds):
        if isinstance(self._round_limit, int) and self._round_current == (self._round_limit - 1) \
                and self._training_args['test_on_global_updates'] is True:
            # Do "validation after a round" only if this a round limit is defined and we reached it
            # and validation is active on global params
            # When this condition is met, it also means we are running the last of
            # the `rounds` rounds in this function
            test_after = True
        else:
            test_after = False

        increment = self.run_once(increase=False, test_after=test_after)

        if increment == 0:
            # should not happen
            msg = ErrorNumbers.FB400.value + \
                f', in method `run` method `run_once` returns {increment}'
            logger.critical(msg)
            raise FedbiomedExperimentError(msg)

    return rounds
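
The loop above requests an extra validation pass only for the round that reaches the round limit. A minimal standalone sketch of that condition is shown below; the function name `validate_after_round` is an assumption for this example.

```python
from typing import Optional


def validate_after_round(
    round_current: int,
    round_limit: Optional[int],
    test_on_global_updates: bool,
) -> bool:
    """True only for the round that reaches the limit, when global validation is on."""
    return (
        isinstance(round_limit, int)
        and round_current == round_limit - 1
        and test_on_global_updates is True
    )


# With a limit of 5, only the round starting at round_current == 4 qualifies.
flags = [validate_after_round(r, 5, True) for r in range(5)]
```

Without a round limit there is no identifiable last round, so the extra validation pass is never triggered.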
run_once
run_once(increase=False, test_after=False)

Run at most one round of an experiment, continuing from the point the experiment had reached.

If round_limit is None for the experiment (no round limit defined), run one round. If round_limit is not None and the round_limit of the experiment is already reached:

  • if increase is False, do nothing and issue a warning
  • if increase is True, increment the total number of rounds round_limit and run one round

Parameters:

Name Type Description Default
increase bool

automatically increase the round_limit of the experiment if needed. Does nothing if round_limit is None. Defaults to False

False
test_after bool

if True, do a second request to the nodes after the round, only for validation on aggregated params. Intended to be used after the last training round of an experiment. Defaults to False.

False

Returns:

Type Description
int

Number of rounds actually run

Raises:

Type Description
FedbiomedExperimentError

bad argument type or value
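
The round-limit check that decides whether a single round may run can be sketched as follows. This is a hedged standalone illustration; `step_allowed` is a hypothetical helper, and the real method also validates its arguments and checks that all required objects are set.

```python
from typing import Optional, Tuple


def step_allowed(
    round_current: int,
    round_limit: Optional[int],
    increase: bool = False,
) -> Tuple[bool, Optional[int]]:
    """Return (whether one round may run, round limit after the decision)."""
    if round_limit is not None and round_current >= round_limit:
        if increase:
            # Make room for exactly one more round.
            return True, round_current + 1
        # Limit reached and not allowed to grow: do nothing.
        return False, round_limit
    # No limit, or the limit has not been reached yet.
    return True, round_limit


print(step_allowed(5, 5))                 # (False, 5)
print(step_allowed(5, 5, increase=True))  # (True, 6)
```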

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def run_once(self, increase: bool = False, test_after: bool = False) -> int:
    """Run at most one round of an experiment, continuing from the point the
    experiment had reached.

    If `round_limit` is `None` for the experiment (no round limit defined), run one round.
    If `round_limit` is not `None` and the `round_limit` of the experiment is already reached:
    * if `increase` is False, do nothing and issue a warning
    * if `increase` is True, increment total number of round `round_limit` and run one round

    Args:
        increase: automatically increase the `round_limit` of the experiment if needed. Does nothing if
            `round_limit` is `None`. Defaults to False
        test_after: if True, do a second request to the nodes after the round, only for validation on aggregated
            params. Intended to be used after the last training round of an experiment. Defaults to False.

    Returns:
        Number of rounds really run

    Raises:
        FedbiomedExperimentError: bad argument type or value
    """
    # check increase is a boolean
    if not isinstance(increase, bool):
        msg = ErrorNumbers.FB410.value + \
            f', in method `run_once` param `increase` : type {type(increase)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    # nota:  we should never have self._round_current > self._round_limit, only ==
    if self._round_limit is not None and self._round_current >= self._round_limit:
        if increase is True:
            logger.debug(f'Auto increasing total rounds for experiment from {self._round_limit} '
                         f'to {self._round_current + 1}')
            self._round_limit = self._round_current + 1
        else:
            logger.warning(f'Round limit of {self._round_limit} was reached, do nothing')
            return 0

    # check pre-requisites are met for running a round
    # From here, node_selection_strategy is never None
    # if self._node_selection_strategy is None:
    #     msg = ErrorNumbers.FB411.value + ', missing `node_selection_strategy`'
    #     logger.critical(msg)
    #     raise FedbiomedExperimentError(msg)

    missing = self._check_missing_objects()
    if missing:
        raise FedbiomedExperimentError(ErrorNumbers.FB411.value + ': missing one or several object needed for'
                                       ' starting the `Experiment`. Details:\n' + missing)
    # Sample nodes for training

    training_nodes = self._node_selection_strategy.sample_nodes(
        from_nodes=self.filtered_federation_nodes(),
        round_i=self._round_current
    )
    # Setup Secure Aggregation (it's a noop if not active)
    secagg_arguments = self.secagg_setup(training_nodes)

    # Setup aggregator
    self._aggregator.set_training_plan_type(self.training_plan().type())
    self._aggregator.check_values(n_updates=self._training_args.get('num_updates'),
                                  training_plan=self.training_plan())
    model_params_before_round = self.training_plan().after_training_params()
    aggregator_args = self._aggregator.create_aggregator_args(model_params_before_round,
                                                              training_nodes)

    # Collect auxiliary variables from the aggregator optimizer, if any.
    optim_aux_var = self._collect_optim_aux_var()

    # update node states when list of nodes has changed from one round to another
    self._update_nodes_states_agent(before_training=True)
    # TODO check node state agent
    nodes_state_ids = self._node_state_agent.get_last_node_states()

    # if fds is updated, aggregator should be updated too
    job = TrainingJob(nodes=training_nodes,
                      keep_files_dir=self.experimentation_path(),
                      experiment_id=self._experiment_id,
                      round_=self._round_current,
                      training_plan=self.training_plan(),
                      training_args=self._training_args,
                      model_args=self.model_args(),
                      data=self._fds,
                      nodes_state_ids=nodes_state_ids,
                      aggregator_args=aggregator_args,
                      do_training=True,
                      secagg_arguments=secagg_arguments,
                      optim_aux_var=optim_aux_var
                      )

    logger.info('Sampled nodes in round ' + str(self._round_current) + ' ' + str(job.nodes))

    training_replies, aux_vars = job.execute()

    # update node states with node answers + when used node list has changed during the round
    self._update_nodes_states_agent(before_training=False, training_replies=training_replies)

    # refining/normalizing model weights received from nodes
    model_params, weights, total_sample_size, encryption_factors = self._node_selection_strategy.refine(
        training_replies, self._round_current)

    if self._secagg.active:
        flatten_params = self._secagg.aggregate(
            round_=self._round_current,
            encryption_factors=encryption_factors,
            total_sample_size=total_sample_size,
            model_params=model_params,
            num_expected_params=len(self.training_plan().get_model_wrapper_class().flatten(
                exclude_buffers = not self.training_args()['share_persistent_buffers']))
        )
        # FIXME: Access TorchModel through non-private getter once it is implemented
        aggregated_params: Dict[str, Union[torch.Tensor, np.ndarray]] = (
            self.training_plan().get_model_wrapper_class().unflatten(
                flatten_params, exclude_buffers = not self.training_args()['share_persistent_buffers'])
        )

    else:
        # aggregate models from nodes to a global model
        aggregated_params = self._aggregator.aggregate(model_params,
                                                       weights,
                                                       global_model=model_params_before_round,
                                                       training_plan=self.training_plan(),
                                                       training_replies=training_replies,
                                                       node_ids=job.nodes,
                                                       n_updates=self._training_args.get('num_updates'),
                                                       n_round=self._round_current)

    # Optionally refine the aggregated updates using an Optimizer.
    self._process_optim_aux_var(aux_vars)
    aggregated_params = self._run_agg_optimizer(self.training_plan(),
                                                aggregated_params)

    # Update the training plan with the aggregated parameters
    self.training_plan().set_model_params(aggregated_params)

    # Update experiment's in-memory history
    self.commit_experiment_history(training_replies, aggregated_params)

    # Increase round number (should be incremented before call to `breakpoint`)
    self._set_round_current(self._round_current + 1)
    if self._save_breakpoints:
        self.breakpoint()

    # do final validation after saving breakpoint:
    # not saved in breakpoint for current round, but simpler
    if test_after:
        # FIXME: should we sample nodes here too?
        aggr_args = self._aggregator.create_aggregator_args(self.training_plan().after_training_params(),
                                                            training_nodes)

        job = TrainingJob(nodes=training_nodes,
                          keep_files_dir=self.experimentation_path(),
                          experiment_id=self._experiment_id,
                          round_=self._round_current,
                          training_plan=self.training_plan(),
                          training_args=self._training_args,
                          model_args=self.model_args(),
                          data=self._fds,
                          nodes_state_ids=nodes_state_ids,
                          aggregator_args=aggr_args,
                          do_training=False
                          )
        job.execute()


    return 1
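
The round-limit guard at the top of this listing can be isolated as a small pure function. The sketch below is illustrative only: `resolve_round_limit` is a hypothetical helper, not part of the library, and it mirrors nothing more than the branch logic visible in the listing.

```python
from typing import Optional, Tuple


def resolve_round_limit(
    round_current: int,
    round_limit: Optional[int],
    increase: bool,
) -> Tuple[bool, Optional[int]]:
    """Hypothetical helper: return (may_run, new_round_limit)."""
    # No limit, or limit not reached yet: the round can run unchanged.
    if round_limit is None or round_current < round_limit:
        return True, round_limit
    # Limit reached: either extend it by exactly one round or refuse to run.
    if increase:
        return True, round_current + 1
    return False, round_limit


print(resolve_round_limit(2, 5, increase=False))
print(resolve_round_limit(5, 5, increase=True))
print(resolve_round_limit(5, 5, increase=False))
```

In the listing, the refusing branch corresponds to returning 0 and every other path ends with returning 1.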
save_aggregated_params staticmethod
save_aggregated_params(aggregated_params_init, breakpoint_path)

Extract and format fields from aggregated_params that need to be saved in breakpoint.

Creates links to the params files under breakpoint_path and uses them to reference the params files.

Parameters:

Name Type Description Default
aggregated_params_init dict

aggregated parameters

required
breakpoint_path str

path to the directory where breakpoints files and links will be saved

required

Returns:

Type Description
Dict[int, dict]

Extract from aggregated_params

Raises:

Type Description
FedbiomedExperimentError

bad arguments type

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@staticmethod
@exp_exceptions
def save_aggregated_params(aggregated_params_init: dict, breakpoint_path: str) -> Dict[int, dict]:
    """Extract and format fields from aggregated_params that need to be saved in breakpoint.

    Creates links to the params files under `breakpoint_path` and uses them to reference the params files.

    Args:
        aggregated_params_init (dict): aggregated parameters
        breakpoint_path: path to the directory where breakpoints files and links will be saved

    Returns:
        Extract from `aggregated_params`

    Raises:
        FedbiomedExperimentError: bad arguments type
    """
    # check arguments type, though it should have been done before
    if not isinstance(aggregated_params_init, dict):
        msg = ErrorNumbers.FB413.value + ' - save failed. ' + \
            f'Bad type for aggregated params, should be `dict` not {type(aggregated_params_init)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)
    if not isinstance(breakpoint_path, str):
        msg = ErrorNumbers.FB413.value + ' - save failed. ' + \
            f'Bad type for breakpoint path, should be `str` not {type(breakpoint_path)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    aggregated_params = {}
    for round_, params_dict in aggregated_params_init.items():
        if not isinstance(params_dict, dict):
            msg = ErrorNumbers.FB413.value + ' - save failed. ' + \
                f'Bad type for aggregated params item {str(round_)}, ' + \
                f'should be `dict` not {type(params_dict)}'
            logger.critical(msg)
            raise FedbiomedExperimentError(msg)

        params_path = os.path.join(breakpoint_path, f"aggregated_params_{uuid.uuid4()}.mpk")
        Serializer.dump(params_dict['params'], params_path)
        aggregated_params[round_] = {'params_path': params_path}

    return aggregated_params
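
The pattern used here (one uniquely named file per round, with the returned dictionary holding only paths) can be tried out without the library. The following is a minimal sketch under stated assumptions: `save_params_per_round` is a hypothetical stand-in, JSON replaces the library's `Serializer`, and the sample parameters are invented for illustration.

```python
import json
import os
import tempfile
import uuid
from typing import Any, Dict


def save_params_per_round(
    params_by_round: Dict[int, Dict[str, Any]],
    breakpoint_path: str,
) -> Dict[int, Dict[str, str]]:
    """Write each round's 'params' entry to its own file and return the paths."""
    saved = {}
    for round_, entry in params_by_round.items():
        if not isinstance(entry, dict):
            raise TypeError(f"round {round_}: expected dict, got {type(entry)}")
        # A random UUID keeps file names unique across rounds and repeated saves.
        path = os.path.join(breakpoint_path, f"aggregated_params_{uuid.uuid4()}.json")
        with open(path, "w", encoding="utf-8") as handle:
            # Assumption: JSON stands in for the library's own serializer.
            json.dump(entry["params"], handle)
        saved[round_] = {"params_path": path}
    return saved


with tempfile.TemporaryDirectory() as tmp_dir:
    history = {0: {"params": {"w": [0.1, 0.2]}}, 1: {"params": {"w": [0.3, 0.4]}}}
    index = save_params_per_round(history, tmp_dir)
    with open(index[1]["params_path"], encoding="utf-8") as handle:
        restored = json.load(handle)

print(sorted(index), restored)
```

Keeping only paths in the returned dictionary means the breakpoint state itself stays small and JSON-friendly, while the potentially large parameter payloads live in separate files.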
save_optimizer
save_optimizer(breakpoint_path)

Save the researcher-side Optimizer attached to this Experiment.

Parameters:

Name Type Description Default
breakpoint_path str

Path to the breakpoint folder.

required

Returns:

Type Description
Optional[str]

Path to the optimizer's save file, or None if no Optimizer is used.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def save_optimizer(self, breakpoint_path: str) -> Optional[str]:
    """Save the researcher-side Optimizer attached to this Experiment.

    Args:
        breakpoint_path: Path to the breakpoint folder.

    Returns:
        Path to the optimizer's save file, or None if no Optimizer is used.
    """
    # Case when no researcher optimizer is used.
    if self._agg_optimizer is None:
        return None
    # Case when an Optimizer is used: save its state and return the path.
    state = self._agg_optimizer.get_state()
    path = os.path.join(breakpoint_path, f"optimizer_{uuid.uuid4()}.mpk")
    Serializer.dump(state, path)
    return path
save_training_replies
save_training_replies()

Extracts a copy of training_replies and prepares it for saving in breakpoint

  • strip unwanted fields
  • structure as list/dict, so it can be saved with JSON

Returns:

Type Description
Dict[int, Dict[str, Dict[str, Any]]]

Extract from training_replies formatted for breakpoint

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
def save_training_replies(self) -> Dict[int, Dict[str, Dict[str, Any]]]:
    """Extracts a copy of `training_replies` and prepares it for saving in breakpoint

    - strip unwanted fields
    - structure as list/dict, so it can be saved with JSON

    Returns:
        Extract from `training_replies` formatted for breakpoint
    """
    converted_training_replies = copy.deepcopy(self.training_replies())
    for training_reply in converted_training_replies.values():
        # we want to strip some fields for the breakpoint
        for reply in training_reply.values():
            reply.pop('params', None)
    return converted_training_replies
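
The copy-then-strip approach in this listing leaves the in-memory replies untouched. A self-contained sketch of the same idea follows; `strip_params` and the sample keys (`node_a`, `sample_size`) are hypothetical and only serve the illustration.

```python
import copy
from typing import Any, Dict

Replies = Dict[int, Dict[str, Dict[str, Any]]]


def strip_params(replies: Replies) -> Replies:
    """Return a deep copy of `replies` with each node's 'params' entry removed."""
    stripped = copy.deepcopy(replies)
    for round_replies in stripped.values():
        for reply in round_replies.values():
            # pop with a default avoids a KeyError when 'params' is absent
            reply.pop("params", None)
    return stripped


replies = {0: {"node_a": {"params": [1.0, 2.0], "sample_size": 10}}}
light = strip_params(replies)
print(light)
```

Because the copy is deep, removing `params` from the result does not affect the original replies still held in memory.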
set_agg_optimizer
set_agg_optimizer(agg_optimizer)

Sets the optional researcher optimizer.

Parameters:

Name Type Description Default
agg_optimizer Optional[Optimizer]

Optional fedbiomed Optimizer instance to be used so as to refine aggregated updates prior to applying them. If None, equivalent to using vanilla SGD with 1.0 learning rate.

required

Returns:

Type Description
Optional[Optimizer]

The optional researcher optimizer attached to this Experiment.

Raises:

Type Description
FedbiomedExperimentError

if agg_optimizer is of improper type.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_agg_optimizer(
    self,
    agg_optimizer: Optional[Optimizer],
) -> Optional[Optimizer]:
    """Sets the optional researcher optimizer.

    Args:
        agg_optimizer: Optional fedbiomed Optimizer instance to be
            used so as to refine aggregated updates prior to applying them.
            If None, equivalent to using vanilla SGD with 1.0 learning rate.

    Returns:
        The optional researcher optimizer attached to this Experiment.

    Raises:
        FedbiomedExperimentError: if `agg_optimizer` is of improper type.
    """
    if not (
        agg_optimizer is None or
        isinstance(agg_optimizer, Optimizer)
    ):
        raise FedbiomedExperimentError(
            f"{ErrorNumbers.FB410.value}: 'agg_optimizer' must be an "
            f"Optimizer instance or None, not {type(agg_optimizer)}."
        )
    self._agg_optimizer = agg_optimizer
    return self._agg_optimizer
set_aggregator
set_aggregator(aggregator=None)

Sets aggregator + verification on arguments type

Ensures consistency with the training data.

Parameters:

Name Type Description Default
aggregator Optional[Aggregator]

Object defining the method for aggregating local updates. Defaults to None (use FedAverage for aggregation)

None

Returns:

Type Description
Aggregator

aggregator (Aggregator)

Raises:

Type Description
FedbiomedExperimentError

bad aggregator type

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_aggregator(self, aggregator: Optional[Aggregator] = None) -> Aggregator:
    """Sets aggregator + verification on arguments type

    Ensures consistency with the training data.

    Args:
        aggregator: Object defining the method for aggregating local updates. Defaults to None
            (use `FedAverage` for aggregation)

    Returns:
        aggregator (Aggregator)

    Raises:
        FedbiomedExperimentError : bad aggregator type
    """

    if aggregator is None:
        # default aggregator
        self._aggregator = FedAverage()

    elif not isinstance(aggregator, Aggregator):

        msg = f"{ErrorNumbers.FB410.value}: aggregator is not an instance of Aggregator."
        logger.critical(msg)
        raise FedbiomedTypeError(msg)
    else:
        # at this point, `aggregator` is an instance of `Aggregator` or of one of its subclasses
        self._aggregator = aggregator
    self.aggregator_args["aggregator_name"] = self._aggregator.aggregator_name
    # ensure consistency with federated dataset
    self._aggregator.set_fds(self._fds)

    return self._aggregator
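
The default aggregator is named after federated averaging, which combines node updates as a weighted mean. The sketch below shows that idea on plain Python lists. It is not the library's implementation: `weighted_average`, the parameter layout, and the sample weights are all assumptions made for illustration.

```python
from typing import Dict, List


def weighted_average(
    node_params: List[Dict[str, List[float]]],
    weights: List[float],
) -> Dict[str, List[float]]:
    """Combine per-node parameter vectors into one weighted mean per parameter name."""
    if not node_params or len(node_params) != len(weights):
        raise ValueError("need one weight per node and at least one node")
    total = sum(weights)
    if total <= 0:
        raise ValueError("weights must sum to a positive value")
    # Normalize so that the weights sum to 1.
    normalized = [w / total for w in weights]
    averaged = {}
    for name, reference in node_params[0].items():
        averaged[name] = [
            sum(w * params[name][i] for w, params in zip(normalized, node_params))
            for i in range(len(reference))
        ]
    return averaged


updates = [{"w": [1.0, 2.0]}, {"w": [3.0, 4.0]}]
result = weighted_average(updates, weights=[1.0, 3.0])
print(result)
```

In practice the weights are typically proportional to each node's number of training samples, so that nodes with more data contribute more to the global model.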
set_retain_full_history
set_retain_full_history(retain_full_history_=True)

Sets the status of whether the full experiment history should be kept in memory.

Parameters:

Name Type Description Default
retain_full_history_ bool

whether to retain in memory the full history of node replies and aggregated params for the experiment. If False, only the last round's replies and aggregated params will be available. Defaults to True.

True

Returns:

Type Description

The status of whether the full experiment history should be kept in memory.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_retain_full_history(self, retain_full_history_: bool = True):
    """Sets the status of whether the full experiment history should be kept in memory.

    Args:
        retain_full_history_: whether to retain in memory the full history of node replies and aggregated params
            for the experiment. If False, only the last round's replies and aggregated params will be available.
            Defaults to True.

    Returns:
        The status of whether the full experiment history should be kept in memory.
    """
    if not isinstance(retain_full_history_, bool):
        msg = ErrorNumbers.FB410.value + f': retain_full_history should be a bool, instead got ' \
                                         f'{type(retain_full_history_)} '
        logger.critical(msg)
        raise FedbiomedTypeError(msg)
    self._retain_full_history = retain_full_history_
    return self._retain_full_history
set_round_limit
set_round_limit(round_limit)

Sets round_limit + verification on arguments type

Parameters:

Name Type Description Default
round_limit Union[int, None]

the maximum number of training rounds (nodes <-> central server) that should be executed for the experiment. None means that no limit is defined.

required

Returns:

Type Description
Union[int, None]

Round limit for experiment of federated learning

Raises:

Type Description
FedbiomedValueError

bad rounds type or value

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_round_limit(self, round_limit: Union[int, None]) -> Union[int, None]:
    """Sets `round_limit` + verification on arguments type

    Args:
        round_limit: the maximum number of training rounds (nodes <-> central server) that should be executed
            for the experiment. `None` means that no limit is defined.

    Returns:
        Round limit for experiment of federated learning

    Raises:
        FedbiomedValueError : bad rounds type or value
    """
    # at this point round_current exists and is an int >= 0

    if round_limit is None:
        # no limit for training rounds
        self._round_limit = None
    else:
        self._check_round_value_consistency(round_limit, "round_limit")
        if round_limit < self._round_current:
            # self._round_limit can't be less than current round
            msg = f'cannot set `round_limit` to less than the number of already run rounds ' \
                f'({self._round_current})'
            logger.critical(msg)
            raise FedbiomedValueError(msg)

        else:
            self._round_limit = round_limit

    # at this point self._round_limit is a Union[int, None]
    return self._round_limit
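
The rule enforced here is that the limit may be removed or raised but never set below the number of rounds already run. A standalone sketch of that rule follows. `validate_round_limit` is hypothetical, and the type and sign checks are assumptions about what a consistency check such as `_check_round_value_consistency` might cover, since its body is not shown.

```python
from typing import Optional


def validate_round_limit(round_limit: Optional[int], round_current: int) -> Optional[int]:
    """Return the accepted round limit, or raise if it is inconsistent."""
    if round_limit is None:
        # None means no limit on the number of rounds.
        return None
    # Assumption: bool is rejected explicitly because it is a subclass of int.
    if isinstance(round_limit, bool) or not isinstance(round_limit, int):
        raise TypeError(f"round_limit must be an int or None, not {type(round_limit)}")
    if round_limit < 0:
        raise ValueError("round_limit must be non-negative")
    if round_limit < round_current:
        raise ValueError(
            f"cannot set round_limit to less than the number of already run rounds ({round_current})"
        )
    return round_limit


print(validate_round_limit(None, 3))
print(validate_round_limit(5, 3))
```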
set_strategy
set_strategy(node_selection_strategy=None)

Sets node_selection_strategy + verification on arguments type

Parameters:

Name Type Description Default
node_selection_strategy Optional[Strategy]

object defining how nodes are sampled at each round for training, and how non-responding nodes are managed. Defaults to None:

  • use DefaultStrategy if training_data is initialized
  • else strategy is None (cannot be initialized), experiment cannot be launched yet

None

Returns:

Type Description
Union[Strategy, None]

node selection strategy class

Raises:

Type Description
FedbiomedExperimentError

bad strategy type

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_strategy(
    self,
    node_selection_strategy: Optional[Strategy] = None
) -> Union[Strategy, None]:
    """Sets `node_selection_strategy` + verification on arguments type

    Args:
        node_selection_strategy: object defining how nodes are sampled at each round for training, and
            how non-responding nodes are managed. Defaults to None:
            - use `DefaultStrategy` if training_data is initialized
            - else strategy is None (cannot be initialized), experiment cannot
              be launched yet

    Returns:
        node selection strategy class

    Raises:
        FedbiomedExperimentError : bad strategy type
    """
    if node_selection_strategy is None:
        # default node_selection_strategy
        self._node_selection_strategy = DefaultStrategy()
    elif not isinstance(node_selection_strategy, Strategy):

        msg = f"{ErrorNumbers.FB410.value}: wrong type for " \
              f"node_selection_strategy {type(node_selection_strategy)} " \
              "it should be an instance of Strategy"
        logger.critical(msg)
        raise FedbiomedTypeError(msg)
    else:
        self._node_selection_strategy = node_selection_strategy
    # at this point self._node_selection_strategy is a Union[Strategy, None]
    return self._node_selection_strategy
set_tensorboard
set_tensorboard(tensorboard)

Sets the tensorboard flag

Parameters:

Name Type Description Default
tensorboard bool

If True, tensorboard log files will be written after receiving training feedback

required

Returns:

Type Description
bool

Status of tensorboard

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_tensorboard(self, tensorboard: bool) -> bool:
    """
    Sets the tensorboard flag

    Args:
        tensorboard: If `True`, tensorboard log files will be written after receiving training feedback

    Returns:
        Status of tensorboard
    """

    if isinstance(tensorboard, bool):
        self._tensorboard = tensorboard
        self._monitor.set_tensorboard(tensorboard)
    else:
        msg = ErrorNumbers.FB410.value + f' `tensorboard` : {type(tensorboard)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    return self._tensorboard
set_test_metric
set_test_metric(metric, **metric_args)

Sets a metric for federated model validation

Parameters:

Name Type Description Default
metric Union[MetricTypes, str, None]

A metric given as an instance of MetricTypes, or a str naming one of the metrics provided as attributes of MetricTypes. None, if it isn't declared yet.

required
**metric_args dict

A dictionary that contains arguments for metric function. Arguments should be compatible with corresponding metrics in sklearn.metrics.

{}

Returns:

Type Description
Tuple[Union[str, None], Dict[str, Any]]

Metric and metric args as tuple

Raises:

Type Description
FedbiomedExperimentError

Invalid type for metric argument

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_metric(self, metric: Union[MetricTypes, str, None], **metric_args: dict) -> \
        Tuple[Union[str, None], Dict[str, Any]]:
    """ Sets a metric for federated model validation

    Args:
        metric: A metric given as an instance of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes], or a
            [`str`][str] naming one of the metrics provided as attributes of [`MetricTypes`]
            [fedbiomed.common.metrics.MetricTypes]. None, if it isn't declared yet.
        **metric_args: A dictionary that contains arguments for metric function. Arguments
            should be compatible with corresponding metrics in [`sklearn.metrics`][sklearn.metrics].

    Returns:
        Metric and  metric args as tuple

    Raises:
        FedbiomedExperimentError: Invalid type for `metric` argument
    """
    self._training_args['test_metric'] = metric

    # using **metric_args, we know `test_metric_args` is a Dict[str, Any]
    self._training_args['test_metric_args'] = metric_args
    return metric, metric_args
set_test_on_global_updates
set_test_on_global_updates(flag=True)

Setter for test_on_global_updates, which indicates whether nodes validate the aggregated (global) model parameters they receive before training the model locally.

Parameters:

Name Type Description Default
flag bool

whether to perform model validation on global updates. Defaults to True.

True

Returns:

Type Description
bool

Value of the flag test_on_global_updates.

Raises:

Type Description
FedbiomedExperimentError

bad flag type

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_on_global_updates(self, flag: bool = True) -> bool:
    """
    Setter for `test_on_global_updates`, which indicates whether nodes validate the aggregated (global) model
    parameters they receive before training the model locally.

    Args:
        flag (bool, optional): whether to perform model validation on global updates. Defaults to True.

    Returns:
        Value of the flag `test_on_global_updates`.

    Raises:
        FedbiomedExperimentError : bad flag type
    """
    self._training_args['test_on_global_updates'] = flag
    return self._training_args['test_on_global_updates']
set_test_on_local_updates
set_test_on_local_updates(flag=True)

Setter for test_on_local_updates, which indicates whether each node validates the model after its parameters have been updated by local training.

Parameters:

Name Type Description Default
flag bool

whether to perform model validation on local updates. Defaults to True.

True

Returns:

Type Description
bool

value of the flag test_on_local_updates

Raises:

Type Description
FedbiomedExperimentError

bad flag type

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_on_local_updates(self, flag: bool = True) -> bool:
    """
    Setter for `test_on_local_updates`, which indicates whether each node validates the model after its
    parameters have been updated by local training.

    Args:
        flag (bool, optional): whether to perform model validation on local updates. Defaults to True.

    Returns:
        value of the flag `test_on_local_updates`

    Raises:
        FedbiomedExperimentError: bad flag type
    """
    self._training_args['test_on_local_updates'] = flag
    return self._training_args['test_on_local_updates']
set_test_ratio
set_test_ratio(ratio)

Sets validation ratio for model validation.

When test_ratio is set, nodes allocate a (1 - test_ratio) fraction of the data for training and the remainder for validating the model. This is useful for validating the model once every round, as well as for controlling overfitting, early stopping, and so on.

Parameters:

Name Type Description Default
ratio float

validation ratio. Must be within interval [0,1].

required

Returns:

Type Description
float

Validation ratio that is set

Raises:

Type Description
FedbiomedExperimentError

bad data type

FedbiomedExperimentError

ratio is not within interval [0, 1]

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_test_ratio(self, ratio: float) -> float:
    """ Sets validation ratio for model validation.

    When `test_ratio` is set, nodes allocate a (1 - `test_ratio`) fraction of the data for training and the
    remainder for validating the model. This is useful for validating the model once every round, as well as
    for controlling overfitting, early stopping, and so on.

    Args:
        ratio: validation ratio. Must be within interval [0,1].

    Returns:
        Validation ratio that is set

    Raises:
        FedbiomedExperimentError: bad data type
        FedbiomedExperimentError: ratio is not within interval [0, 1]
    """
    self._training_args['test_ratio'] = ratio
    return ratio
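
The documented contract (a ratio within [0, 1], with the remaining fraction used for training) can be made concrete with a small sketch. `split_sizes` is a hypothetical helper; the explicit checks and the choice to round the validation size down are assumptions of this example, not behavior confirmed by the listing above, which stores `ratio` as given.

```python
from typing import Tuple


def split_sizes(n_samples: int, test_ratio: float) -> Tuple[int, int]:
    """Return (n_train, n_test) for a validation ratio within [0, 1]."""
    # Assumption: bool is rejected even though it is a subclass of int.
    if isinstance(test_ratio, bool) or not isinstance(test_ratio, (int, float)):
        raise TypeError(f"test_ratio must be a float, not {type(test_ratio)}")
    if not 0.0 <= test_ratio <= 1.0:
        raise ValueError(f"test_ratio must be within [0, 1], got {test_ratio}")
    # Assumption: the validation partition is rounded down.
    n_test = int(n_samples * test_ratio)
    return n_samples - n_test, n_test


print(split_sizes(100, 0.25))
print(split_sizes(100, 0.0))
```

A ratio of 0 keeps every sample for training, and a ratio of 1 leaves none, which is why values near the extremes are rarely useful in practice.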
set_training_data
set_training_data(training_data, from_tags=False)

Sets training data for federated training + verification on arguments type

See FederatedWorkflow.set_training_data for more information.

Ensures consistency also with the Experiment's aggregator and node state agent

Setting to None forfeits consistency checks

Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an inconsistent state.

Returns:

Type Description
Union[FederatedDataSet, None]

Dataset metadata

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def set_training_data(
        self,
        training_data: Union[FederatedDataSet, dict, None],
        from_tags: bool = False) -> \
        Union[FederatedDataSet, None]:
    """Sets training data for federated training + verification on arguments type

    See
    [`FederatedWorkflow.set_training_data`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_training_data]
    for more information.

    Ensures consistency also with the Experiment's aggregator and node state agent

    !!! warning "Setting to None forfeits consistency checks"
        Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an
        inconsistent state.

    Returns:
        Dataset metadata
    """
    super().set_training_data(training_data, from_tags)
    # Below: Experiment-specific operations for consistency
    if self._aggregator is not None and self._fds is not None:
        # update the aggregator's training data
        self._aggregator.set_fds(self._fds)
    if self._node_state_agent is not None and self._fds is not None:
        # update the node state agent (member of FederatedWorkflow)
        self._node_state_agent.update_node_states(self.all_federation_nodes())
    return self._fds
strategy
strategy()

Retrieves the class that represents the node selection strategy.

Please see also set_strategy to set or update node selection strategy.

Returns:

Type Description
Union[Strategy, None]

An instance of Strategy, or None if it is not declared yet, in which case the node selection strategy will be DefaultStrategy.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def strategy(self) -> Union[Strategy, None]:
    """Retrieves the class that represents the node selection strategy.

    Please see also [`set_strategy`][fedbiomed.researcher.federated_workflows.Experiment.set_strategy]
    to set or update node selection strategy.

    Returns:
        An instance of [`Strategy`][fedbiomed.researcher.strategies.Strategy], or `None` if it is not
            declared yet, in which case the node selection strategy will be
            [`DefaultStrategy`][fedbiomed.researcher.strategies.DefaultStrategy].
    """
    return self._node_selection_strategy
test_metric
test_metric()

Retrieves the metric for validation routine.

Please see also set_test_metric to change/set test_metric

Returns:

Type Description
Union[MetricTypes, str, None]

A metric given as an instance of MetricTypes, or a str naming one of the metrics provided as attributes of MetricTypes. None, if it isn't declared yet.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_metric(self) -> Union[MetricTypes, str, None]:
    """Retrieves the metric for validation routine.

    Please see also [`set_test_metric`][fedbiomed.researcher.federated_workflows.Experiment.set_test_metric]
        to change/set `test_metric`

    Returns:
        A metric given as an instance of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes], or a [`str`][str]
            naming one of the metrics provided as attributes of [`MetricTypes`][fedbiomed.common.metrics.MetricTypes].
            None, if it isn't declared yet.
    """

    return self._training_args['test_metric']
test_metric_args
test_metric_args()

Retrieves the metric argument for the metric function that is going to be used.

Please see also set_test_metric to change/set test_metric and get more information on the arguments that can be used.

Returns:

Type Description
Dict[str, Any]

A dictionary that contains arguments for metric function. See set_test_metric

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_metric_args(self) -> Dict[str, Any]:
    """Retrieves the metric argument for the metric function that is going to be used.

    Please see also [`set_test_metric`][fedbiomed.researcher.federated_workflows.Experiment.set_test_metric]
    to change/set `test_metric` and get more information on the arguments that can be used.

    Returns:
        A dictionary that contains arguments for metric function. See [`set_test_metric`]
            [fedbiomed.researcher.federated_workflows.Experiment.set_test_metric]
    """
    return self._training_args['test_metric_args']
test_on_global_updates
test_on_global_updates()

Retrieves the status of whether validation will be performed on globally updated (aggregated) parameters by the nodes at the beginning of each round.

Please see also set_test_on_global_updates.

Returns:

Type Description
bool

True if validation is active on globally updated (aggregated) parameters, False otherwise.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_on_global_updates(self) -> bool:
    """ Retrieves the status of whether validation will be performed on globally updated (aggregated)
    parameters by the nodes at the beginning of each round.

    Please see also [`set_test_on_global_updates`]
    [fedbiomed.researcher.federated_workflows.Experiment.set_test_on_global_updates].

    Returns:
        True if validation is active on globally updated (aggregated) parameters, False otherwise.
    """
    return self._training_args['test_on_global_updates']
test_on_local_updates
test_on_local_updates()

Retrieves the status of whether validation will be performed on locally updated parameters by the nodes at the end of each round.

Please see also set_test_on_local_updates.

Returns:

Type Description
bool

True if validation is active on locally updated parameters, False otherwise.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_on_local_updates(self) -> bool:
    """Retrieves the status of whether validation will be performed on locally updated parameters by
    the nodes at the end of each round.

    Please see also
        [`set_test_on_local_updates`][fedbiomed.researcher.federated_workflows.Experiment.set_test_on_local_updates].

    Returns:
        True if validation is active on locally updated parameters, False otherwise.
    """

    return self._training_args['test_on_local_updates']
test_ratio
test_ratio()

Retrieves the ratio for validation partition of entire dataset.

Please see also set_test_ratio to change/set test_ratio

Returns:

Type Description
float

The ratio for validation part, 1 - test_ratio is ratio for training set.

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def test_ratio(self) -> float:
    """Retrieves the ratio for validation partition of entire dataset.

    Please see also [`set_test_ratio`][fedbiomed.researcher.federated_workflows.Experiment.set_test_ratio] to
        change/set `test_ratio`

    Returns:
        The ratio of the validation partition; `1 - test_ratio` is the ratio of the training set.
    """

    return self._training_args['test_ratio']
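
To make the meaning of the ratio concrete, here is a minimal sketch, assuming a simple floor-based split. The helper `split_sizes` is hypothetical and not part of Fed-BioMed; how nodes actually partition their data is not shown in this section.

```python
from typing import Tuple


def split_sizes(n_samples: int, test_ratio: float) -> Tuple[int, int]:
    """Return (n_train, n_validation) for a dataset of n_samples (hypothetical helper)."""
    if not 0.0 <= test_ratio <= 1.0:
        raise ValueError("test_ratio must be between 0 and 1")
    # Assumption: the validation size is rounded down.
    n_validation = int(n_samples * test_ratio)
    return n_samples - n_validation, n_validation


n_train, n_validation = split_sizes(1000, 0.25)
print(n_train, n_validation)
```

With `test_ratio=0.25`, a quarter of the samples go to validation and the remaining `1 - test_ratio` share is kept for training.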
training_replies
training_replies()

Retrieves training replies of each round of training.

Training replies contain timing statistics and the file paths/URLs that have been received after each round.

Returns:

Type Description
Union[dict, None]

Dictionary of training replies with format {round (int) : replies (dict)}

Source code in fedbiomed/researcher/federated_workflows/_experiment.py
@exp_exceptions
def training_replies(self) -> Union[dict, None]:
    """Retrieves training replies of each round of training.

    Training replies contain timing statistics and the file paths/URLs that have been received after each round.

    Returns:
        Dictionary of training replies with format {round (int) : replies (dict)}
    """

    return self._training_replies

FederatedWorkflow

FederatedWorkflow(tags=None, nodes=None, training_data=None, experimentation_folder=None, secagg=False, save_breakpoints=False)

Bases: ABC

A FederatedWorkflow is the abstract entry point for the researcher to orchestrate both local and remote operations.

The FederatedWorkflow is an abstract base class from which the actual classes used by the researcher must inherit. It manages the life-cycle of:

  • the training arguments
  • secure aggregation
  • the node state agent

Additionally, it provides the basis for the breakpoint functionality, and manages some backend functionalities such as the temporary directory, the experiment ID, etc.

The attributes training_data and tags are co-dependent. Modifying one of them may have the side effect of modifying the other, according to the following rules:

  • modifying tags when training data is not None resets the training data based on the new tags
  • modifying the training data using a FederatedDataSet object or a dict sets tags to None
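
The two rules can be modelled with a toy class. This is only a sketch under stated assumptions: `ToyWorkflow`, its `catalog` argument and the `_search` lookup are hypothetical and do not reproduce how Fed-BioMed queries nodes.

```python
from typing import Dict, List, Optional


class ToyWorkflow:
    """Toy stand-in that mirrors only the tags/training_data rules described above."""

    def __init__(self, catalog: Dict[str, Dict[str, list]]) -> None:
        # Hypothetical catalog: tag -> {node_id: [dataset descriptions]}
        self._catalog = catalog
        self.tags: Optional[List[str]] = None
        self.training_data: Optional[Dict[str, list]] = None

    def set_tags(self, tags: List[str]) -> None:
        self.tags = list(tags)
        # Rule 1: new tags reset already existing training data.
        if self.training_data is not None:
            self.training_data = self._search(self.tags)

    def set_training_data(self, data: Dict[str, list]) -> None:
        # Rule 2: explicitly provided training data clears the tags.
        self.training_data = dict(data)
        self.tags = None

    def _search(self, tags: List[str]) -> Dict[str, list]:
        # Simplistic lookup, for illustration only.
        found: Dict[str, list] = {}
        for tag in tags:
            for node_id, datasets in self._catalog.get(tag, {}).items():
                found.setdefault(node_id, []).extend(datasets)
        return found


wf = ToyWorkflow({"mnist": {"node-1": [{"name": "MNIST"}]}})
wf.set_training_data({"node-2": [{"name": "custom"}]})
print(wf.tags)            # tags were cleared by set_training_data
wf.set_tags(["mnist"])
print(wf.training_data)   # training data was reset from the new tags
```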

Parameters:

Name Type Description Default
tags Optional[List[str] | str]

list of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search for training_data yet).

None
nodes Optional[List[str]]

list of node_ids to filter the nodes to be involved in the experiment. Defaults to None (no filtering).

None
training_data Union[FederatedDataSet, dict, None]
  • If it is a FederatedDataSet object, use this value as training_data.
  • Else if it is a dict, create a FederatedDataSet object from the dict and use it as training_data. The dict should use node ids as keys, the values being lists of dicts (each dict representing a dataset on a node).
  • Else if it is None (no training data provided):
      • if tags is not None, set training_data by searching for datasets with a query to the nodes using tags and nodes
      • if tags is None, set training_data to None (no training_data set yet, experiment is not fully initialized and cannot be launched)

Defaults to None (query nodes for datasets if tags is not None, otherwise set training_data to None).
None
save_breakpoints bool

whether to save breakpoints or not after each training round. Breakpoints can be used for resuming a crashed experiment.

False
experimentation_folder Union[str, None]

choose a specific name for the folder where experimentation result files and breakpoints are stored. This should contain only the folder name, not a path. The name is used as a subdirectory of environ[EXPERIMENTS_DIR]. Defaults to None (auto-choose a folder name).

  • Caveat: if using a specific name, this experimentation will not be automatically detected as the last experimentation by load_breakpoint.
  • Caveat: do not use an experimentation_folder name finishing with numbers ([0-9]+), as this would confuse the last experimentation detection heuristic of load_breakpoint.

None
secagg Union[bool, SecureAggregation]

whether to setup a secure aggregation context for this experiment, and use it to send encrypted updates from nodes to researcher. Defaults to False

False
Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def __init__(
    self,
    tags: Optional[List[str] | str] = None,
    nodes: Optional[List[str]] = None,
    training_data: Union[FederatedDataSet, dict, None] = None,
    experimentation_folder: Union[str, None] = None,
    secagg: Union[bool, SecureAggregation] = False,
    save_breakpoints: bool = False,
) -> None:
    """Constructor of the class.

    Args:
        tags: list of string with data tags or string with one data tag. Empty list of
            tags ([]) means any dataset is accepted, it is different from None
            (tags not set, cannot search for training_data yet).

        nodes: list of node_ids to filter the nodes to be involved in the experiment.
            Defaults to None (no filtering).

        training_data:
            * If it is a FederatedDataSet object, use this value as training_data.
            * else if it is a dict, create and use a FederatedDataSet object
                from the dict and use this value as training_data. The dict should use
                node ids as keys, values being list of dicts (each dict representing a
                dataset on a node).
            * else if it is None (no training data provided)
              - if `tags` is not None, set training_data by
                searching for datasets with a query to the nodes using `tags` and `nodes`
              - if `tags` is None, set training_data to None (no training_data set yet,
                experiment is not fully initialized and cannot be launched)
            Defaults to None (query nodes for dataset if `tags` is not None, set training_data
            to None else)
        save_breakpoints: whether to save breakpoints or not after each training
            round. Breakpoints can be used for resuming a crashed experiment.

        experimentation_folder: choose a specific name for the folder
            where experimentation result files and breakpoints are stored. This
            should just contain the name for the folder not a path. The name is used
            as a subdirectory of `environ[EXPERIMENTS_DIR]`. Defaults to None
            (auto-choose a folder name)
            - Caveat : if using a specific name this experimentation will not be
                automatically detected as the last experimentation by `load_breakpoint`
            - Caveat : do not use an `experimentation_folder` name finishing
                with numbers ([0-9]+) as this would confuse the last experimentation
                detection heuristic by `load_breakpoint`.
        secagg: whether to setup a secure aggregation context for this experiment, and
            use it to send encrypted updates from nodes to researcher.
            Defaults to `False`
    """
    # predefine all class variables, so no need to write try/except
    # block each time we use it
    self._fds: Optional[FederatedDataSet] = None  # dataset metadata from the full federation
    self._reqs: Requests = Requests()
    self._nodes_filter: Optional[List[str]] = None  # researcher-defined nodes filter
    self._tags: Optional[List[str]] = None
    self._experimentation_folder: Optional[str] = None
    self._secagg: Union[SecureAggregation, bool] = False
    self._save_breakpoints: Optional[bool] = None
    self._node_state_agent: Optional[NodeStateAgent] = None
    self._researcher_id: str = environ['RESEARCHER_ID']
    self._experiment_id: str = EXPERIMENT_PREFIX + str(uuid.uuid4())  # creating a unique experiment id

    # set internal members from constructor arguments
    self.set_secagg(secagg)

    # TODO: Manage tags within the FederatedDataset to avoid conflicts
    if training_data is not None and tags is not None:
        msg = f"{ErrorNumbers.FB410.value}: Can not set `training_data` and `tags` at the " \
            "same time. Please provide only `training_data`, or tags to search for " \
            "training data."
        logger.critical(msg)
        raise FedbiomedValueError(msg)

    # Set tags if tags is provided (non-empty)
    if tags:
        self.set_tags(tags)

    if training_data:
        self.set_training_data(training_data)

    self.set_nodes(nodes)
    self.set_save_breakpoints(save_breakpoints)

    self.set_experimentation_folder(experimentation_folder)
    self._node_state_agent = NodeStateAgent(list(self._fds.data().keys())
                                            if self._fds and self._fds.data() else [])

Attributes

id property
id

Retrieves the unique experiment identifier.

secagg property
secagg

Gets the SecureAggregation object.

Returns:

Type Description
SecureAggregation

Secure aggregation object.

Functions

all_federation_nodes
all_federation_nodes()

Returns all the node ids in the federation

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def all_federation_nodes(self) -> List[str]:
    """Returns all the node ids in the federation"""
    return list(self._fds.data().keys()) if self._fds is not None else []
breakpoint
breakpoint(state, bkpt_number)

Saves breakpoint with the state of the workflow.

The following attributes will be saved:

  • tags
  • experimentation_folder
  • training_data
  • training_args
  • secagg
  • node_state

Raises:

Type Description
FedbiomedExperimentError

experiment not fully defined, experiment did not run any round yet, or error when saving breakpoint

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def breakpoint(self,
               state: Dict,
               bkpt_number: int) -> None:
    """
    Saves breakpoint with the state of the workflow.

    The following attributes will be saved:

      - tags
      - experimentation_folder
      - training_data
      - training_args
      - secagg
      - node_state

    Raises:
        FedbiomedExperimentError: experiment not fully defined, experiment did not run any round
            yet, or error when saving breakpoint
    """
    state.update({
        'id': self._experiment_id,
        'breakpoint_version': str(__breakpoints_version__),
        'training_data': self._fds.data(),
        'experimentation_folder': self._experimentation_folder,
        'tags': self._tags,
        'nodes': self._nodes_filter,
        'secagg': self._secagg.save_state_breakpoint(),
        'node_state': self._node_state_agent.save_state_breakpoint()
    })

    # save state into a json file
    breakpoint_path, breakpoint_file_name = \
        choose_bkpt_file(self._experimentation_folder, bkpt_number - 1)
    breakpoint_file_path = os.path.join(breakpoint_path, breakpoint_file_name)
    try:
        with open(breakpoint_file_path, 'w', encoding="UTF-8") as bkpt:
            json.dump(state, bkpt)
        logger.info(f"breakpoint number {bkpt_number - 1} saved at " +
                    os.path.dirname(breakpoint_file_path))
    except (OSError, PermissionError, ValueError, TypeError, RecursionError) as e:
        # - OSError: heuristic for catching open() and write() errors
        # - see json.dump() documentation for documented errors for this call
        msg = ErrorNumbers.FB413.value + f' - save failed with message {str(e)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg) from e
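
The save step above follows a common pattern: dump a state dictionary to JSON and convert low-level failures into a single domain error. Below is a minimal standard-library sketch of that pattern. `BreakpointError` and `save_state` are hypothetical stand-ins, not Fed-BioMed names, and the sketch does not choose the breakpoint folder or file name.

```python
import json
import os
import tempfile


class BreakpointError(Exception):
    """Hypothetical stand-in for FedbiomedExperimentError."""


def save_state(state: dict, folder: str, file_name: str) -> str:
    """Write `state` as JSON into `folder` and return the file path."""
    path = os.path.join(folder, file_name)
    try:
        with open(path, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
    except (OSError, ValueError, TypeError, RecursionError) as exc:
        # OSError covers open()/write() failures; the other types are
        # raised by json.dump() for values it cannot serialize.
        raise BreakpointError(f"save failed with message {exc}") from exc
    return path


with tempfile.TemporaryDirectory() as folder:
    saved = save_state({"tags": ["demo"], "round": 1}, folder, "state.json")
    with open(saved, encoding="utf-8") as handle:
        print(json.load(handle))
```

Chaining with `from exc` keeps the original traceback available while callers only need to handle one exception type.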
experimentation_folder
experimentation_folder()

Retrieves the folder name where experiment data/result are saved.

Please see also set_experimentation_folder

Returns:

Type Description
str

Folder name where experiment related files are saved

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def experimentation_folder(self) -> str:
    """Retrieves the folder name where experiment data/result are saved.

    Please see also [`set_experimentation_folder`]
    [fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_experimentation_folder]

    Returns:
        Folder name where experiment related files are saved
    """

    return self._experimentation_folder
experimentation_path
experimentation_path()

Retrieves the file path where experimentation folder is located and experiment related files are saved.

Returns:

Type Description
str

Experiment directory where all experiment related files are saved

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def experimentation_path(self) -> str:
    """Retrieves the file path where experimentation folder is located and experiment related files are saved.

    Returns:
        Experiment directory where all experiment related files are saved
    """

    return os.path.join(environ['EXPERIMENTS_DIR'], self._experimentation_folder)
filtered_federation_nodes
filtered_federation_nodes()

Returns the node ids in the federation after filtering with the nodes filter

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def filtered_federation_nodes(self) -> List[str]:
    """Returns the node ids in the federation after filtering with the nodes filter"""
    if self._nodes_filter is not None:
        return [node for node in self.all_federation_nodes() if node in self._nodes_filter]
    else:
        return self.all_federation_nodes()
info
info(info=None, missing='')

Prints out the information about the current status of the experiment.

Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.

Parameters:

Name Type Description Default
info Dict[str, List[str]]

Dictionary of sub-classes relevant attributes status that will be completed with some additional attributes status defined in this class. Defaults to None (no entries of sub-classes available or of importance).

None
missing str

String describing the elements that are still missing to fully run the object. If non-empty, it is printed as a warning. Defaults to an empty string (nothing missing).

''

Returns:

Type Description
Tuple[Dict[str, List[str]], str]

Tuple of the info dictionary and the missing string. The dictionary has 2 entries: Arguments, mapping to the list of all argument names, and Values, mapping to the list of the corresponding values.

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def info(self,
         info: Dict[str, List[str]] = None,
         missing: str = '') -> Tuple[Dict[str, List[str]], str]:
    """Prints out the information about the current status of the experiment.

    Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.

    Args:
        info: Dictionary of sub-classes relevant attributes status that will be completed with some additional
            attributes status defined in this class. Defaults to None (no entries of sub-classes available or
            of importance).
        missing: String describing the elements that are still missing to fully run the object. If
            non-empty, it is printed as a warning. Defaults to an empty string (nothing missing).

    Returns:
        dictionary containing all pieces of information, with 2 entries: `Arguments` mapping a list
        of all argument, and `Values` mapping a list containing all the values.
    """
    if info is None:
        info = self._create_default_info_structure()
    info['Arguments'].extend([
        'Tags',
        'Nodes filter',
        'Training Data',
        'Experiment folder',
        'Experiment Path',
        'Secure Aggregation'
    ])

    info['Values'].extend(['\n'.join(findall('.{1,60}', str(e))) for e in [
        self._tags,
        self._nodes_filter,
        self._fds,
        self._experimentation_folder,
        self.experimentation_path(),
        f'- Using: {self._secagg}\n- Active: {self._secagg.active}'
    ]])

    # printing list of items set / not set yet
    print(tabulate.tabulate(info, headers='keys'))

    if missing:
        print("\nWarning: Object not fully defined, missing"
              f": \n{missing}")
    else:
        print(f"{self.__class__.__name__} can be run now (fully defined)")
    return info, missing
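
The `findall('.{1,60}', ...)` expression in the source above wraps each value so that the printed table stays narrow. A small sketch of that wrapping on its own; `wrap_60` is a hypothetical helper name used only for this illustration.

```python
from re import findall


def wrap_60(value: object) -> str:
    """Split str(value) into chunks of at most 60 characters, one chunk per line."""
    return "\n".join(findall(".{1,60}", str(value)))


wrapped = wrap_60("x" * 150)
print([len(line) for line in wrapped.split("\n")])
```

Because `.` does not match newline characters by default, line breaks already present in a value are dropped by this expression rather than preserved.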
load_breakpoint classmethod
load_breakpoint(breakpoint_folder_path=None)

Loads breakpoint (provided a breakpoint has been saved) so the workflow can be resumed.

Parameters:

Name Type Description Default
breakpoint_folder_path Optional[str]

path of the breakpoint folder. Path can be absolute or relative, e.g. "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest workflow. Defaults to None.

None

Returns:

Type Description
Tuple[TFederatedWorkflow, dict]

Tuple containing the reinitialized workflow object and the saved state as a dictionary

Raises:

Type Description
FedbiomedExperimentError

bad argument type, error when reading breakpoint or bad loaded breakpoint content (corrupted)

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@classmethod
@exp_exceptions
def load_breakpoint(
    cls,
    breakpoint_folder_path: Optional[str] = None
) -> Tuple[TFederatedWorkflow, dict]:
    """
    Loads breakpoint (provided a breakpoint has been saved)
    so the workflow can be resumed.

    Args:
      breakpoint_folder_path: path of the breakpoint folder. Path can be absolute
        or relative, e.g. "var/experiments/Experiment_xxxx/breakpoints_xxxx".
        If None, loads the latest breakpoint of the latest workflow. Defaults to None.

    Returns:
        Tuple containing the reinitialized workflow object and the saved state as a dictionary

    Raises:
        FedbiomedExperimentError: bad argument type, error when reading breakpoint or
            bad loaded breakpoint content (corrupted)
    """
    # check parameters type
    if not isinstance(breakpoint_folder_path, str) and breakpoint_folder_path is not None:
        msg = (
            f"{ErrorNumbers.FB413.value}: load failed, `breakpoint_folder_path`"
            f" has bad type {type(breakpoint_folder_path)}"
        )
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    # get breakpoint folder path (if it is None) and state file
    breakpoint_folder_path, state_file = find_breakpoint_path(breakpoint_folder_path)
    breakpoint_folder_path = os.path.abspath(breakpoint_folder_path)

    try:
        path = os.path.join(breakpoint_folder_path, state_file)
        with open(path, "r", encoding="utf-8") as file:
            saved_state = json.load(file)
    except (json.JSONDecodeError, OSError) as exc:
        # OSError: heuristic for catching file access issues
        msg = (
            f"{ErrorNumbers.FB413.value}: load failed,"
            f" reading breakpoint file failed with message {exc}"
        )
        logger.critical(msg)
        raise FedbiomedExperimentError(msg) from exc
    if not isinstance(saved_state, dict):
        msg = (
            f"{ErrorNumbers.FB413.value}: load failed, breakpoint file seems"
            f" corrupted. Type should be `dict` not {type(saved_state)}"
        )
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    # First, check version of breakpoints
    bkpt_version = saved_state.get('breakpoint_version', __default_version__)
    raise_for_version_compatibility(bkpt_version, __breakpoints_version__,
                                    f"{ErrorNumbers.FB413.value}: Breakpoint "
                                    "file was generated with version %s "
                                    f"which is incompatible with the current version %s.")

    # retrieve breakpoint training data
    bkpt_fds = saved_state.get('training_data')
    bkpt_fds = FederatedDataSet(bkpt_fds)

    # initializing experiment
    loaded_exp = cls()
    loaded_exp._experiment_id = saved_state.get('id')
    loaded_exp.set_training_data(bkpt_fds)
    loaded_exp._tags = saved_state.get('tags')
    loaded_exp.set_nodes(saved_state.get('nodes'))
    loaded_exp.set_experimentation_folder(saved_state.get('experimentation_folder'))
    saved_state_secagg = saved_state.get('secagg')
    loaded_exp.set_secagg(eval(saved_state_secagg['class']).load_state_breakpoint(saved_state_secagg))
    loaded_exp._node_state_agent.load_state_breakpoint(saved_state.get('node_state'))
    loaded_exp.set_save_breakpoints(True)

    return loaded_exp, saved_state
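
The first half of the method above reads the state file and checks that it holds a dictionary before anything is restored. A minimal standard-library sketch of that read-and-validate step; `load_state` is a hypothetical helper, `RuntimeError` stands in for the library's own exception, and the file name is an assumption made for this example.

```python
import json
import os
import tempfile


def load_state(folder: str, file_name: str) -> dict:
    """Read a JSON state file and check that it contains a dict (hypothetical helper)."""
    path = os.path.join(os.path.abspath(folder), file_name)
    try:
        with open(path, "r", encoding="utf-8") as handle:
            state = json.load(handle)
    except (json.JSONDecodeError, OSError) as exc:
        raise RuntimeError(f"reading breakpoint file failed: {exc}") from exc
    if not isinstance(state, dict):
        raise RuntimeError(f"breakpoint file seems corrupted, got {type(state)}")
    return state


with tempfile.TemporaryDirectory() as folder:
    with open(os.path.join(folder, "state.json"), "w", encoding="utf-8") as handle:
        json.dump({"id": "Experiment_demo", "tags": ["demo"]}, handle)
    state = load_state(folder, "state.json")
    # A missing key falls back to a default, as done for `breakpoint_version` above.
    print(state.get("breakpoint_version", "0"))
```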
nodes
nodes()

Retrieves the nodes filter for the execution of the workflow.

If nodes is None, then no filtering is applied, and all the nodes in the federation participate in the execution of the workflow. If nodes is not None, then the semantics of the nodes filter are as follows:

| node_id in nodes filter | node_id in training data | outcome |
| --- | --- | --- |
| yes | yes | this node is part of the federation, and will take part in the execution of the workflow |
| yes | no | ignored |
| no | yes | this node is part of the federation but will not be considered for executing the workflow |
| no | no | ignored |

Please see set_nodes to set nodes.

Returns:

Type Description
Union[List[str], None]

The list of nodes to keep for workflow execution, or None if no filtering is applied

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def nodes(self) -> Union[List[str], None]:
    """Retrieves the nodes filter for the execution of the workflow.

    If nodes is None, then no filtering is applied, and all the nodes in the federation participate in the
    execution of the workflow.
    If nodes is not None, then the semantics of the nodes filter are as follows:

    | node_id in nodes filter | node_id in training data | outcome |
    | --- | --- | --- |
    | yes | yes | this node is part of the federation, and will take part in the execution of the workflow |
    | yes | no | ignored |
    | no | yes | this node is part of the federation but will not be considered for executing the workflow |
    | no | no | ignored |

    Please see [`set_nodes`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_nodes] to set `nodes`.

    Returns:
        The list of nodes to keep for workflow execution, or None if no filtering is applied
    """
    return self._nodes_filter
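
The four cases of the table can be checked with a short standalone sketch that mirrors the list comprehension used by `filtered_federation_nodes`. The function name `apply_nodes_filter` and the node ids are hypothetical.

```python
from typing import List, Optional


def apply_nodes_filter(federation_nodes: List[str],
                       nodes_filter: Optional[List[str]]) -> List[str]:
    """Keep the federation nodes that pass the filter; None means no filtering."""
    if nodes_filter is None:
        return list(federation_nodes)
    return [node for node in federation_nodes if node in nodes_filter]


# Nodes that hold training data, i.e. the federation.
federation = ["node-a", "node-b"]

# "node-a": in filter and in data, so it is kept.
# "node-z": in filter but not in data, so it is ignored.
# "node-b": in data but not in filter, so it is left out.
selected = apply_nodes_filter(federation, ["node-a", "node-z"])
print(selected)
```

Note that an empty list is a valid filter and selects no node at all, which is different from `None`.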
run abstractmethod
run()

Run the experiment

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@abstractmethod
def run(self) -> int:
    """Run the experiment"""
save_breakpoints
save_breakpoints()

Retrieves the status of saving breakpoint after each round of training.

Returns:

Type Description
bool

True if saving breakpoints is active, False otherwise.

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def save_breakpoints(self) -> bool:
    """Retrieves the status of saving breakpoint after each round of training.

    Returns:
        `True` if saving breakpoints is active, `False` otherwise.
    """

    return self._save_breakpoints
secagg_setup
secagg_setup(sampled_nodes)

Retrieves the secagg arguments for setup.

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
def secagg_setup(self, sampled_nodes: List[str]) -> Dict:
    """Retrieves the secagg arguments for setup."""
    secagg_arguments = {}
    if self._secagg.active:

        if not self._secagg.setup(parties=[environ["ID"]] + sampled_nodes, experiment_id=self._experiment_id):
            msg = f"{ErrorNumbers.FB417.value}: Could not setup secure aggregation crypto context."
            logger.critical(msg)
            raise FedbiomedSecureAggregationError(msg)
        secagg_arguments = self._secagg.train_arguments()
    return secagg_arguments
set_experimentation_folder
set_experimentation_folder(experimentation_folder=None)

Sets experimentation_folder, the folder name where experiment data/result are saved.

Parameters:

Name Type Description Default
experimentation_folder Optional[str]

Folder name where experiment related files are saved

None

Returns:

Type Description
str

The path to experimentation folder.

Raises:

Type Description
FedbiomedExperimentError

bad experimentation_folder type

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_experimentation_folder(self, experimentation_folder: Optional[str] = None) -> str:
    """Sets `experimentation_folder`, the folder name where experiment data/result are saved.

    Args:
        experimentation_folder: Folder name where experiment related files are saved

    Returns:
        The path to experimentation folder.

    Raises:
        FedbiomedExperimentError : bad `experimentation_folder` type
    """
    if experimentation_folder is None:
        self._experimentation_folder = create_exp_folder()
    elif isinstance(experimentation_folder, str):
        sanitized_folder = sanitize_filename(experimentation_folder, platform='auto')
        self._experimentation_folder = create_exp_folder(sanitized_folder)
        if sanitized_folder != experimentation_folder:
            logger.warning(f'`experimentation_folder` was sanitized from '
                           f'{experimentation_folder} to {sanitized_folder}')
    else:
        msg = ErrorNumbers.FB410.value + \
            f' `experimentation_folder` : {type(experimentation_folder)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    # at this point self._experimentation_folder is a str valid for a foldername

    return self._experimentation_folder
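
The constructor documentation warns against folder names finishing with numbers (`[0-9]+`), since they confuse the last-experimentation detection of `load_breakpoint`. A small pre-check one might run before choosing a name is sketched below; `ends_with_digits` is a hypothetical helper and not part of Fed-BioMed.

```python
import re


def ends_with_digits(folder_name: str) -> bool:
    """Return True if the name finishes with one or more digits."""
    return re.search(r"[0-9]+\Z", folder_name) is not None


for name in ("my_experiment", "my_experiment_2024", "run7b"):
    print(name, ends_with_digits(name))
```

Only the second name would fall under the caveat; a digit in the middle of a name, as in `run7b`, is not affected.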
set_nodes
set_nodes(nodes)

Sets the nodes filter + verifications on argument type

Parameters:

Name Type Description Default
nodes Union[List[str], None]

List of node_ids to filter the nodes to be involved in the experiment.

required

Returns:

Type Description
Union[List[str], None]

List of nodes that are set. None, if the argument nodes is None.

Raises:

Type Description
FedbiomedTypeError

Bad nodes type

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_nodes(self, nodes: Union[List[str], None]) -> Union[List[str], None]:
    """Sets the nodes filter + verifications on argument type

    Args:
        nodes: List of node_ids to filter the nodes to be involved in the experiment.

    Returns:
        List of nodes that are set. None, if the argument `nodes` is None.

    Raises:
        FedbiomedTypeError : Bad nodes type
    """
    # immediately exit if setting nodes to None
    if nodes is None:
        self._nodes_filter = None
    # set nodes
    elif isinstance(nodes, list):
        if not all(map(lambda node: isinstance(node, str), nodes)):
            msg = ErrorNumbers.FB410.value + ' `nodes` argument must be a list of strings or None.'
            logger.critical(msg)
            raise FedbiomedTypeError(msg)
        self._nodes_filter = nodes
    else:
        msg = ErrorNumbers.FB410.value + ' `nodes` argument must be a list of strings or None.'
        logger.critical(msg)
        raise FedbiomedTypeError(msg)
    return self._nodes_filter
set_save_breakpoints
set_save_breakpoints(save_breakpoints)

Setter for save_breakpoints + verification on arguments type

Parameters:

Name Type Description Default
save_breakpoints bool

whether to save breakpoints or not after each training round. Breakpoints can be used for resuming a crashed experiment.

required

Returns:

Type Description
bool

Status of saving breakpoints

Raises:

Type Description
FedbiomedExperimentError

bad save_breakpoints type

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_save_breakpoints(self, save_breakpoints: bool) -> bool:
    """ Setter for save_breakpoints + verification on arguments type

    Args:
        save_breakpoints (bool): whether to save breakpoints or
            not after each training round. Breakpoints can be used for resuming
            a crashed experiment.

    Returns:
        Status of saving breakpoints

    Raises:
        FedbiomedExperimentError: bad save_breakpoints type
    """
    if isinstance(save_breakpoints, bool):
        self._save_breakpoints = save_breakpoints
        # no warning if done during experiment, we may change breakpoint policy at any time
    else:
        msg = ErrorNumbers.FB410.value + f' `save_breakpoints` : {type(save_breakpoints)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    return self._save_breakpoints
set_secagg
set_secagg(secagg, scheme=SecureAggregationSchemes.LOM)

Sets secure aggregation status and scheme

Build secure aggregation controller/instance or sets given secure aggregation class

Parameters:

Name Type Description Default
secagg Union[bool, SecureAggregation]

If a bool, builds a SecureAggregation instance with the given scheme; secure aggregation is active only when the value is True. If the argument is already an instance of SecureAggregation, it is assigned as is, and activation and configuration depend on the instance provided.

required
scheme SecureAggregationSchemes

Secure aggregation scheme to use. If a SecureAggregation object is provided, the argument is not used, as the scheme comes from the object. Default is SecureAggregationSchemes.LOM.

LOM

Returns:

Type Description

Secure aggregation controller instance.

Raises:

Type Description
FedbiomedExperimentError

bad argument type or value

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_secagg(
        self,
        secagg: Union[bool, SecureAggregation],
        scheme: SecureAggregationSchemes = SecureAggregationSchemes.LOM):
    """Sets secure aggregation status and scheme

    Build secure aggregation controller/instance or sets given
    secure aggregation class

    Args:
        secagg: If True activates training request with secure aggregation by building
            [`SecureAggregation`][fedbiomed.researcher.secagg.SecureAggregation] class
            with default arguments. Or if argument is an instance of `SecureAggregation`
            it does only assignment. Secure aggregation activation and configuration
            depends on the instance provided.
        scheme: Secure aggregation scheme to use. If a `SecureAggregation` object is provided,
            the argument is not used, as the scheme comes from the object. Default is
            SecureAggregationSchemes.LOM.

    Returns:
        Secure aggregation controller instance.

    Raises:
        FedbiomedExperimentError: bad argument type or value
    """
    if not isinstance(scheme, SecureAggregationSchemes):
        raise FedbiomedExperimentError(
            f"{ErrorNumbers.FB410.value}: Expected `scheme` argument "
            f"`SecureAggregationSchemes`, but got {type(scheme)}")

    if isinstance(secagg, bool):
        self._secagg = SecureAggregation(scheme=scheme, active=secagg)
    elif isinstance(secagg, SecureAggregation):
        self._secagg = secagg
    else:
        raise FedbiomedExperimentError(
            f"{ErrorNumbers.FB410.value}: Expected `secagg` argument bool or "
            f"`SecureAggregation` but got {type(secagg)}")

    return self._secagg
set_tags
set_tags(tags)

Sets tags and verification on argument type

Setting tags also updates the training data by executing the set_training_data method.

Parameters:

Name Type Description Default
tags Union[List[str], str]

List of string with data tags or string with one data tag. Empty list of tags ([]) means any dataset is accepted, it is different from None (tags not set, cannot search for training_data yet).

required

Returns:

Type Description
List[str]

List of tags that are set.

Raises:

Type Description
FedbiomedTypeError

Bad tags type

FedbiomedValueError

Some issue prevented resetting the training data after an inconsistency was detected

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_tags(
    self,
    tags: Union[List[str], str],
) -> List[str]:
    """Sets tags and verification on argument type

    Setting tags also updates the training data by executing the
    [`set_training_data`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_training_data]
    method.

    Args:
        tags: List of strings with data tags, or a string with one data tag. Must be
            non-empty: an empty list or an empty string is rejected with a
            `FedbiomedValueError`.
    Returns:
        List of tags that are set.

    Raises:
        FedbiomedTypeError: Bad tags type
        FedbiomedValueError: Some issue prevented resetting the training
            data after an inconsistency was detected
    """
    # preprocess the tags argument to correct typing
    if not tags:
        msg = f"{ErrorNumbers.FB410.value}: Invalid value for tags argument {tags}, tags " \
            "should be non-empty list of str or non-empty str."
        logger.critical(msg)
        raise FedbiomedValueError(msg)

    if isinstance(tags, list):
        if not all(map(lambda tag: isinstance(tag, str), tags)):
            msg = f"{ErrorNumbers.FB410.value}: `tags` must be a non-empty str or " \
                "a non-empty list of str."
            logger.critical(msg)
            raise FedbiomedTypeError(msg)

        # `tags` is a non-empty list of str at this point
        tags_to_set = tags

    elif isinstance(tags, str):
        tags_to_set = [tags]
    else:
        msg = f"{ErrorNumbers.FB410.value}: `tags` must be a non-empty str or " \
            "a non-empty list of str."
        logger.critical(msg)
        raise FedbiomedTypeError(msg)

    self._tags = tags_to_set

    # Set training data
    logger.info(
        "Updating training data. This action will update FederatedDataset, "
        "and the nodes that will participate in the experiment.")

    self.set_training_data(None, from_tags=True)

    return self._tags
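The validation order in `set_tags` above (emptiness first, then type) can be sketched as a standalone helper. `normalize_tags` is a hypothetical name used only for this illustration, and built-in exceptions stand in for `FedbiomedValueError` and `FedbiomedTypeError`.

```python
from typing import List, Union


def normalize_tags(tags: Union[List[str], str]) -> List[str]:
    """Return tags as a list of str, following the checks in set_tags."""
    # Empty values (None, "", []) are rejected before any type check.
    if not tags:
        raise ValueError("tags should be a non-empty list of str or a non-empty str")
    if isinstance(tags, str):
        # A single tag is wrapped into a one-element list.
        return [tags]
    if isinstance(tags, list) and all(isinstance(tag, str) for tag in tags):
        return tags
    raise TypeError("`tags` must be a non-empty str or a non-empty list of str")
```

Under these assumptions an empty list raises a value error rather than meaning "any dataset", and a tuple of strings is rejected as a bad type.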
set_training_data
set_training_data(training_data, from_tags=False)

Sets training data for federated training + verification on arguments type

The full expected behaviour when changing training data is given in the table below:

| New value of training_data | from_tags | Outcome |
| --- | --- | --- |
| dict or FederatedDataSet | True | fail because the user is attempting to set from tags while also providing a training_data argument |
| dict or FederatedDataSet | False | set fds attribute, set tags to None |
| None | True | fail if tags are not set, else set fds attribute based on tags |
| None | False | fail because training data cannot be set to None |

Setting to None forfeits consistency checks

Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an inconsistent state.

Parameters:

Name Type Description Default
training_data Union[FederatedDataSet, dict, None]
  • If it is a FederatedDataSet object, use this value as training_data.
  • else if it is a dict, create and use a FederatedDataSet object from the dict and use this value as training_data. The dict should use node ids as keys, values being list of dicts (each dict representing a dataset on a node).
  • else if it is None (no training data provided)
  • if from_tags is True and tags is not None, set training_data by searching for datasets with a query to the nodes using tags and nodes
  • if from_tags is False or tags is None, set training_data to None (no training_data set yet, experiment is not fully initialized and cannot be launched)
required
from_tags bool

If True, query nodes for datasets when no training_data is provided. Not used when training_data is provided.

False

Returns:

Type Description
Union[FederatedDataSet, None]

FederatedDataSet metadata

Raises:

Type Description
FedbiomedTypeError

bad training_data or from_tags type.

FedbiomedValueError

Invalid value for the arguments training_data or from_tags.

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def set_training_data(
        self,
        training_data: Union[FederatedDataSet, dict, None],
        from_tags: bool = False) -> \
        Union[FederatedDataSet, None]:
    """Sets training data for federated training + verification on arguments type


    The full expected behaviour when changing training data is given in the table below:

    | New value of `training_data` | `from_tags` | Outcome |
    | --- | --- | --- |
    | dict or FederatedDataSet | True  | fail because the user is attempting to set from tags while also providing a training_data argument |
    | dict or FederatedDataSet | False | set fds attribute, set tags to None |
    | None | True | fail if tags are not set, else set fds attribute based on tags |
    | None | False | fail because training data cannot be set to None |

    !!! warning "Setting to None forfeits consistency checks"
        Setting training_data to None does not trigger consistency checks, and may therefore leave the class in an
        inconsistent state.

    Args:
        training_data:
            * If it is a FederatedDataSet object, use this value as training_data.
            * else if it is a dict, create and use a FederatedDataSet object from the dict
              and use this value as training_data. The dict should use node ids as keys,
              values being list of dicts (each dict representing a dataset on a node).
            * else if it is None (no training data provided)
              - if `from_tags` is True and `tags` is not None, set training_data by
                searching for datasets with a query to the nodes using `tags` and `nodes`
              - if `from_tags` is False or `tags` is None, set training_data to None (no training_data set yet,
                experiment is not fully initialized and cannot be launched)
        from_tags: If True, query nodes for datasets when no `training_data` is provided.
            Not used when `training_data` is provided.

    Returns:
        FederatedDataSet metadata

    Raises:
        FedbiomedTypeError: bad training_data or from_tags type.
        FedbiomedValueError: Invalid value for the arguments `training_data` or `from_tags`.
    """

    if not isinstance(from_tags, bool):
        msg = ErrorNumbers.FB410.value + \
            f' `from_tags` : got {type(from_tags)} but expected a boolean'
        logger.critical(msg)
        raise FedbiomedTypeError(msg)
    if from_tags and training_data is not None:
        msg = ErrorNumbers.FB410.value + \
            ' set_training_data: cannot specify a training_data argument if ' \
            'from_tags is True'
        logger.critical(msg)
        raise FedbiomedValueError(msg)

    # case where no training data are passed
    if training_data is None:
        if from_tags is True:
            if not self._tags:
                msg = f"{ErrorNumbers.FB410.value}: attempting to " \
                    "set training data from undefined tags. Please set tags first " \
                    "using the set_tags method of the experiment."
                logger.critical(msg)
                raise FedbiomedValueError(msg)
            training_data = self._reqs.search(self._tags, self._nodes_filter)
        else:
            msg = f"{ErrorNumbers.FB410.value}: Cannot set training data to `None`. " \
                "Please set from_tags=True or provide valid training data"
            logger.critical(msg)
            raise FedbiomedValueError(msg)

    if isinstance(training_data, FederatedDataSet):
        self._fds = training_data
    elif isinstance(training_data, dict):
        self._fds = FederatedDataSet(training_data)
    else:
        msg = ErrorNumbers.FB410.value + \
            f' `training_data` has incorrect type: {type(training_data)}'
        logger.critical(msg)
        raise FedbiomedTypeError(msg)

    # check and ensure consistency
    self._tags = self._tags if from_tags else None

    # return the new value
    return self._fds
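As a reading aid, the argument handling in the `set_training_data` source above can be condensed into a pure function. `plan_training_data_update` is a hypothetical helper for this sketch. It accepts only a dict as explicit data (the real method also accepts a `FederatedDataSet`) and uses built-in exceptions in place of the Fed-BioMed ones.

```python
from typing import List, Optional, Tuple


def plan_training_data_update(
    training_data: Optional[dict],
    from_tags: bool,
    current_tags: Optional[List[str]],
) -> Tuple[str, Optional[List[str]]]:
    """Return (data source, tags kept afterwards) or raise, as in the source."""
    if not isinstance(from_tags, bool):
        raise TypeError(f"`from_tags`: got {type(from_tags)} but expected a boolean")
    if from_tags and training_data is not None:
        raise ValueError("cannot specify training_data if from_tags is True")
    if training_data is None:
        if not from_tags:
            raise ValueError("cannot set training data to None")
        if not current_tags:
            raise ValueError("cannot set training data from undefined tags")
        # Datasets would be searched on the nodes; the tags are kept.
        return "search_by_tags", current_tags
    if not isinstance(training_data, dict):
        raise TypeError(f"`training_data` has incorrect type: {type(training_data)}")
    # Explicit data replaces the dataset and clears the tags.
    return "use_provided_data", None
```

The sketch makes one point explicit: providing data directly always resets the tags to `None`, since the tags may no longer describe the dataset in use.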
tags
tags()

Retrieves the tags from the experiment object.

Please see set_tags to set tags.

Returns:

Type Description
Union[List[str], None]

List of tags that have been set. None if tags are not declared yet.

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def tags(self) -> Union[List[str], None]:
    """Retrieves the tags from the experiment object.

    Please see [`set_tags`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_tags] to set tags.

    Returns:
        List of tags that have been set. `None` if tags are not declared yet.
    """
    return self._tags
training_data
training_data()

Retrieves the training data, which is an instance of FederatedDataSet

This represents the dataset metadata available for the full federation.

Please see set_training_data to set or update training data.

Returns:

Type Description
Union[FederatedDataSet, None]

Object that contains metadata for the datasets of each node. None if it isn't set yet.

Source code in fedbiomed/researcher/federated_workflows/_federated_workflow.py
@exp_exceptions
def training_data(self) -> Union[FederatedDataSet, None]:
    """Retrieves the training data which is an instance of
    [`FederatedDataSet`][fedbiomed.researcher.datasets.FederatedDataSet]

    This represents the dataset metadata available for the full federation.

    Please see [`set_training_data`][fedbiomed.researcher.federated_workflows.FederatedWorkflow.set_training_data]
    to set or update training data.

    Returns:
        Object that contains metadata for the datasets of each node. `None` if it isn't set yet.
    """
    return self._fds

TrainingPlanWorkflow

TrainingPlanWorkflow(*args, training_plan_class=None, training_args=None, model_args=None, **kwargs)

Bases: FederatedWorkflow, ABC

A TrainingPlanWorkflow is an abstract entry point to orchestrate an experiment which uses a training plan.

In addition to the functionalities provided by FederatedWorkflow, the TrainingPlanWorkflow also manages the life-cycle of the training plan.

Use set_training_plan_class to manage the training plan

Please only ever use the set_training_plan_class function to manage the training plan. Do not set the training plan or training plan class directly!

Parameters:

Name Type Description Default
training_plan_class Optional[TrainingPlanT]

training plan class to be used for training. For the experiment to be properly and fully defined, training_plan_class needs to be a TrainingPlanT. Defaults to None (no training plan class defined yet).

None
model_args Optional[Dict]

contains model arguments passed to the constructor of the training plan when instantiating it : output and input feature dimension, etc.

None
training_args Optional[Union[TrainingArgs, dict]]

contains training arguments passed to the training_routine of the training plan when launching it: lr, epochs, batch_size...

None
*args

Extra positional arguments from parent class FederatedWorkflow

()
**kwargs

Arguments of parent class FederatedWorkflow

{}
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def __init__(
    self,
    *args,
    training_plan_class: Optional[TrainingPlanT] = None,
    training_args: Optional[Union[TrainingArgs, dict]] = None,
    model_args: Optional[Dict] = None,
    **kwargs,
) -> None:
    """Constructor of the class.

    Args:
        training_plan_class: training plan class to be used for training.
            For the experiment to be properly and fully defined, `training_plan_class`
            needs to be a `TrainingPlanT`. Defaults to None (no training plan class
            defined yet).
        model_args: contains model arguments passed to the constructor
            of the training plan when instantiating it :
            output and input feature dimension, etc.
        training_args: contains training arguments passed to the `training_routine`
            of the training plan when launching it: lr, epochs, batch_size...
        *args: Extra positional arguments from parent class
            [`FederatedWorkflow`][fedbiomed.researcher.federated_workflows.FederatedWorkflow]
        **kwargs: Arguments of parent class
            [`FederatedWorkflow`][fedbiomed.researcher.federated_workflows.FederatedWorkflow]
    """
    # Check arguments
    if training_plan_class is not None and not inspect.isclass(training_plan_class):
        raise FedbiomedTypeError(
            f"{ErrorNumbers.FB410.value}: bad type for argument "
            f"`training_plan_class` {type(training_plan_class)}")

    if training_plan_class is not None and \
            not issubclass(training_plan_class, TRAINING_PLAN_TYPES):

        raise FedbiomedTypeError(
            f"{ErrorNumbers.FB410.value}: bad type for argument `training_plan_class`."
            f" It is not subclass of supported training plans {TRAINING_PLAN_TYPES}")

    # _training_plan_class determines the life-cycle of the training plan:
    # if training_plan_class changes, then the training plan must be reinitialized
    self._training_plan_class = None
    # model args is also tied to the life-cycle of training plan:
    # if model_args changes, the training plan must be reinitialized
    self._model_args = None
    # The _training_plan attribute represents the *actual instance*
    # of a _training_plan_class that is currently
    # being used in the workflow. The training plan cannot be modified by the user.
    self._training_plan = None
    self._training_args: Optional[TrainingArgs] = None  # FIXME: is it ok to have this here?
    # The _training_plan_file attribute represents the path of the file where the training plan is saved.
    # It cannot be modified by the user
    self._training_plan_file = None

    # initialize object
    super().__init__(*args, **kwargs)

    self.set_training_args(training_args)
    self.set_model_args(model_args)
    self.set_training_plan_class(training_plan_class)

Functions

breakpoint
breakpoint(state, bkpt_number)

Saves breakpoint with the state of the workflow.

The following attributes will be saved:

  • training_args
  • training_plan_class
  • model_args
Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def breakpoint(self,
               state,
               bkpt_number) -> None:
    """
    Saves breakpoint with the state of the workflow.

    The following attributes will be saved:

      - training_args
      - training_plan_class
      - model_args
    """
    # save training plan to file
    training_plan_module = 'model_' + str(uuid.uuid4())
    training_plan_file = os.path.join(self.experimentation_path(), training_plan_module + '.py')
    self.training_plan().save_code(training_plan_file)

    state.update({
        'model_args': self._model_args,
        'training_plan_class_name': self._training_plan_class.__name__,
        'training_args': self._training_args.get_state_breakpoint(),
    })

    breakpoint_path, breakpoint_file_name = \
        choose_bkpt_file(self._experimentation_folder, bkpt_number - 1)

    # rewrite paths in breakpoint : use the links in breakpoint directory
    state['training_plan_path'] = create_unique_link(
        breakpoint_path,
        # - Need a file with a restricted characters set in name to be able to import as module
        'model_' + str("{:04d}".format(bkpt_number - 1)), '.py',
        # - Prefer relative path, eg for using experiment result after
        # experiment in a different tree
        os.path.join('..', os.path.basename(training_plan_file))
    )
    params_path = os.path.join(breakpoint_path, f"model_params_{uuid.uuid4()}.mpk")
    Serializer.dump(self.training_plan().get_model_wrapper_class().get_weights(
        only_trainable = False, exclude_buffers = False), params_path)
    state['model_weights_path'] = params_path

    super().breakpoint(state, bkpt_number)
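The naming scheme used for the training plan link in `breakpoint` above can be checked in isolation. `breakpoint_link_parts` is a hypothetical helper that only reproduces the two string computations from the source (the zero-padded module name and the relative target). It does not create any link, and the behaviour of `create_unique_link` itself is not modelled here.

```python
import os
from typing import Tuple


def breakpoint_link_parts(training_plan_file: str, bkpt_number: int) -> Tuple[str, str]:
    """Compute the link stem and relative target as done in breakpoint()."""
    # Restricted character set so the file can later be imported as a module.
    link_stem = "model_{:04d}".format(bkpt_number - 1)
    # Relative target, so the experiment folder can be moved to another tree.
    link_target = os.path.join("..", os.path.basename(training_plan_file))
    return link_stem, link_target
```

With `bkpt_number=1` the stem is `model_0000`, and the target refers to the saved training plan file one directory above the breakpoint folder.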
check_training_plan_status
check_training_plan_status()

Method for checking training plan status, i.e. whether it is approved or not by the nodes

Raises:

Type Description
FedbiomedExperimentError

if the training data is not defined.

Returns:

Type Description
Dict

Training plan status for answering nodes

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def check_training_plan_status(self) -> Dict:
    """ Method for checking training plan status, i.e. whether it is approved or not by the nodes

    Raises:
        FedbiomedExperimentError: if the training data is not defined.

    Returns:
        Training plan status for answering nodes
    """
    if self.training_data() is None:
        msg = f"{ErrorNumbers.FB410.value}. Cannot check training plan status: training data is not defined. " \
              "Please either use the `set_tags` or `set_training_data` method to fix this."
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    job = TrainingPlanCheckJob(
        nodes=self.training_data().node_ids(),
        keep_files_dir=self.experimentation_path(),
        experiment_id=self._experiment_id,
        training_plan=self.training_plan()
    )
    responses = job.execute()
    return responses
info
info(info=None, missing='')

Prints out the information about the current status of the experiment.

Lists all the parameters/arguments of the experiment and informs whether the experiment can be run.

Parameters:

Name Type Description Default
info Optional[Dict]

Dictionary of sub-classes relevant attributes status that will be completed with some additional attributes status defined in this class. Defaults to None (no entries of sub-classes available or of importance).

None
missing_object_to_check

dictionary mapping sub-classes attributes to attribute names, that may be needed to fully run the object. Defaults to None (no check will be performed).

required

Returns:

Type Description
Tuple[Dict[str, List[str]], str]

dictionary containing all pieces of information, with 2 entries: Arguments mapping a list of all arguments, and Values mapping a list containing all the values.

Raises:

Type Description
KeyError

if Arguments or Values entry is missing in passing argument info

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def info(
    self,
    info: Optional[Dict] = None,
    missing: str = ''
) -> Tuple[Dict[str, List[str]], str]:
    """Prints out the information about the current status of the experiment.

    Lists  all the parameters/arguments of the experiment and informs whether
    the experiment can be run.

    Args:
        info: Dictionary of sub-classes relevant attributes status that will be
            completed with some additional attributes status defined in this class.
            Defaults to None (no entries of sub-classes available or of importance).
        missing_object_to_check: dictionary mapping sub-classes attributes to
            attribute names, that may be needed to fully run the object. Defaults
            to None (no check will be performed).

    Returns:
        dictionary containing all pieces of information, with 2 entries:
            `Arguments` mapping a list of all arguments, and `Values` mapping
            a list containing all the values.

    Raises:
        KeyError: if `Arguments` or `Values` entry is missing in passing argument `info`
    """
    # at this point all attributes are initialized (in constructor)
    if info is None:
        info = self._create_default_info_structure()
    info['Arguments'].extend([
        'Training Plan Class',
        'Model Arguments',
        'Training Arguments'
    ])
    info['Values'].extend(['\n'.join(findall('.{1,60}',
                                     str(e))) for e in [
        self._training_plan_class,
        self._model_args,
        self._training_args
    ]])

    return super().info(info, missing)
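The `findall('.{1,60}', str(e))` expression in `info` above wraps each value at 60 characters for display. A minimal standalone version, where `wrap_value` is a hypothetical name and the width is made a parameter for illustration:

```python
from re import findall


def wrap_value(value, width: int = 60) -> str:
    """Split str(value) into chunks of at most `width` characters, one per line."""
    # '.' does not match newlines, so existing line breaks also act as split points.
    chunks = findall(".{1,%d}" % width, str(value))
    return "\n".join(chunks)
```

For example, a 130-character string is rendered as three lines of 60, 60 and 10 characters, while short values such as `{'lr': 0.01}` are returned on a single line.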
load_breakpoint classmethod
load_breakpoint(breakpoint_folder_path=None)

Loads breakpoint (provided a breakpoint has been saved) so the workflow can be resumed.

Parameters:

Name Type Description Default
breakpoint_folder_path Optional[str]

path of the breakpoint folder. Path can be absolute or relative eg: "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest workflow. Defaults to None.

None

Returns:

Type Description
Tuple[TrainingPlanWorkflowT, dict]

Tuple of the reinitialized workflow object and the saved state dictionary.

Raises:

Type Description
FedbiomedExperimentError

bad argument type, error when reading breakpoint or bad loaded breakpoint content (corrupted)

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@classmethod
@exp_exceptions
def load_breakpoint(cls,
                    breakpoint_folder_path: Optional[str] = None) -> Tuple[TrainingPlanWorkflowT, dict]:
    """
    Loads breakpoint (provided a breakpoint has been saved)
    so the workflow can be resumed.

    Args:
      breakpoint_folder_path: path of the breakpoint folder. Path can be absolute or relative eg:
        "var/experiments/Experiment_xxxx/breakpoints_xxxx". If None, loads the latest breakpoint of the latest
        workflow. Defaults to None.

    Returns:
        Tuple of the reinitialized workflow object and the saved state dictionary.

    Raises:
        FedbiomedExperimentError: bad argument type, error when reading breakpoint or bad loaded breakpoint
            content (corrupted)
    """
    loaded_exp, saved_state = super().load_breakpoint(breakpoint_folder_path)

    # Define type for pylint
    loaded_exp: TrainingPlanWorkflow

    # Import TP class
    _, tp_class = import_class_from_file(
        module_path=saved_state.get("training_plan_path"),
        class_name=saved_state.get("training_plan_class_name")
    )

    loaded_exp.set_model_args(saved_state["model_args"])
    loaded_exp.set_training_plan_class(tp_class)
    loaded_exp.set_training_args(
        TrainingArgs.load_state_breakpoint(
            saved_state.get('training_args')))
    training_plan = loaded_exp.training_plan()
    if training_plan is None:
        msg = ErrorNumbers.FB413.value + ' - load failed, ' + \
            'breakpoint file seems corrupted, `training_plan` is None'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)
    param_path = saved_state['model_weights_path']
    params = Serializer.load(param_path)
    loaded_exp.training_plan().get_model_wrapper_class().set_weights(params)

    return loaded_exp, saved_state
model_args
model_args()

Retrieves model arguments.

Please see also set_model_args

Returns:

Type Description
dict

The arguments that are going to be passed to the init_model function of the training plan during initialization of the model instance

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def model_args(self) -> dict:
    """Retrieves model arguments.

    Please see also [`set_model_args`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow.set_model_args]

    Returns:
        The arguments that are going to be passed to the `init_model` function of the training plan during
        initialization of the model instance
    """
    return self._model_args
set_model_args
set_model_args(model_args, keep_weights=True)

Sets model_args + verification on arguments type

Resets the training plan

This function has an important (and intended!) side-effect: it resets the training_plan attribute. By default, it tries to keep the same weights as the current training plan, if available.

Parameters:

Name Type Description Default
model_args dict

contains model arguments passed to the constructor of the training plan when instantiating it : output and input feature dimension, etc.

required
keep_weights bool

try to keep the same weights as the current training plan

True

Returns:

Type Description
dict

Model arguments that have been set.

Raises:

Type Description
FedbiomedExperimentError

bad model_args type

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def set_model_args(self,
                   model_args: dict,
                   keep_weights: bool = True) -> dict:
    """Sets `model_args` + verification on arguments type

    !!! warning "Resets the training plan"
        This function has an important (and intended!) side-effect: it resets the `training_plan` attribute.
        By default, it tries to keep the same weights as the current training plan, if available.

    Args:
        model_args (dict): contains model arguments passed to the constructor
            of the training plan when instantiating it : output and input feature
            dimension, etc.
        keep_weights: try to keep the same weights as the current training plan

    Returns:
        Model arguments that have been set.

    Raises:
        FedbiomedExperimentError : bad model_args type
    """
    if model_args is None or isinstance(model_args, dict):
        self._model_args = model_args
    else:
        # bad type
        msg = ErrorNumbers.FB410.value + f' `model_args` : {type(model_args)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)
    # self._model_args always exist at this point

    self._update_training_plan(keep_weights)  # resets the training plan attribute

    return self._model_args
set_training_args
set_training_args(training_args)

Sets training_args + verification on arguments type

Parameters:

Name Type Description Default
training_args Union[dict, TrainingArgs, None]

contains training arguments passed to the training plan's training_routine such as lr, epochs, batch_size...

required

Returns:

Type Description
Union[dict, None]

Training arguments

Raises:

Type Description
FedbiomedExperimentError

bad training_args type

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def set_training_args(
    self,
    training_args: Union[dict, TrainingArgs, None]
) -> Union[dict, None]:
    """ Sets `training_args` + verification on arguments type

    Args:
        training_args: contains training arguments passed to the
            training plan's `training_routine` such as lr, epochs, batch_size...

    Returns:
        Training arguments

    Raises:
        FedbiomedExperimentError : bad training_args type
    """

    if isinstance(training_args, TrainingArgs):
        self._training_args = deepcopy(training_args)
    elif isinstance(training_args, dict) or training_args is None:
        self._training_args = TrainingArgs(training_args, only_required=False)
    else:
        msg = f"{ErrorNumbers.FB410.value} in function `set_training_args`. " \
              "Expected type TrainingArgs, dict, or " \
              f"None, got {type(training_args)} instead."
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    return self._training_args.dict()
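One consequence of the `deepcopy` branch in `set_training_args` above is that later changes to the caller's object do not leak into the workflow. The sketch below demonstrates this with `Args`, a hypothetical and much simplified stand-in for `TrainingArgs`; `store_training_args` is likewise an illustrative name.

```python
from copy import deepcopy
from typing import Optional, Union


class Args:
    """Hypothetical, minimal stand-in for TrainingArgs."""

    def __init__(self, values: Optional[dict] = None):
        self.values = dict(values or {})

    def dict(self) -> dict:
        return dict(self.values)


def store_training_args(training_args: Union[Args, dict, None]) -> Args:
    """Follow the three branches of set_training_args."""
    if isinstance(training_args, Args):
        # Copy, so the stored object is independent of the caller's instance.
        return deepcopy(training_args)
    if isinstance(training_args, dict) or training_args is None:
        return Args(training_args)
    raise TypeError(f"Expected Args, dict, or None, got {type(training_args)}")
```

In this sketch, mutating the original `Args` after the call leaves the stored copy unchanged, and passing `None` yields an empty set of arguments.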
set_training_plan_class
set_training_plan_class(training_plan_class, keep_weights=True)

Sets the training plan type + verification on arguments type

Resets the training plan

This function has an important (and intended!) side-effect: it resets the training_plan attribute. By default, it tries to keep the same weights as the current training plan, if available.

Parameters:

Name Type Description Default
training_plan_class Union[TrainingPlanT, None]

training plan class to be used for training. For the experiment to be properly and fully defined, training_plan_class needs to be a TrainingPlanT. Defaults to None (no training plan class defined yet).

required
keep_weights bool

try to keep the same weights as the current training plan

True

Returns:

Type Description
Union[TrainingPlanT, None]

training_plan_class that is set for experiment

Raises:

Type Description
FedbiomedExperimentError

bad training_plan_class type

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def set_training_plan_class(self,
                            training_plan_class: Union[TrainingPlanT, None],
                            keep_weights: bool = True
                            ) -> Union[TrainingPlanT, None]:
    """Sets  the training plan type + verification on arguments type

    !!! warning "Resets the training plan"
        This function has an important (and intended!) side-effect: it resets the `training_plan` attribute.
        By default, it tries to keep the same weights as the current training plan, if available.

    Args:
        training_plan_class: training plan class to be used for training.
            For the experiment to be properly and fully defined, `training_plan_class` needs to be a `TrainingPlanT`.
            Defaults to None (no training plan class defined yet).
        keep_weights: try to keep the same weights as the current training plan

    Returns:
        `training_plan_class` that is set for experiment

    Raises:
        FedbiomedExperimentError : bad training_plan_class type
    """
    if training_plan_class is None:
        self._training_plan_class = None
    elif inspect.isclass(training_plan_class):
        # training_plan_class must be a subclass of a valid training plan
        if issubclass(training_plan_class, TRAINING_PLAN_TYPES):
            # valid class
            self._training_plan_class = training_plan_class
        else:
            # bad class
            msg = ErrorNumbers.FB410.value + f' `training_plan_class` : {training_plan_class} class'
            logger.critical(msg)
            raise FedbiomedExperimentError(msg)
    else:
        # bad type
        msg = ErrorNumbers.FB410.value + f' `training_plan_class` of type: {type(training_plan_class)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    self._update_training_plan(keep_weights)  # resets the training plan attribute

    return self._training_plan_class
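The class check in `set_training_plan_class` above relies on `inspect.isclass` followed by `issubclass` against a tuple of supported base classes. A standalone sketch, in which `BaseTorchPlan`, `BaseSklearnPlan`, `SUPPORTED_PLAN_TYPES` and `validate_plan_class` are hypothetical names standing in for the real training plan types and `TRAINING_PLAN_TYPES`:

```python
import inspect


class BaseTorchPlan:
    """Hypothetical supported base class."""


class BaseSklearnPlan:
    """Hypothetical supported base class."""


SUPPORTED_PLAN_TYPES = (BaseTorchPlan, BaseSklearnPlan)


def validate_plan_class(plan_class):
    """Accept None or a subclass of a supported base; reject everything else."""
    if plan_class is None:
        return None
    # isclass must come first: issubclass raises TypeError on non-class input.
    if not inspect.isclass(plan_class):
        raise TypeError(f"`training_plan_class` of type: {type(plan_class)}")
    if not issubclass(plan_class, SUPPORTED_PLAN_TYPES):
        raise TypeError(f"`training_plan_class`: {plan_class} is not a supported class")
    return plan_class
```

A common mistake this check catches is passing an instance (`MyPlan()`) instead of the class itself (`MyPlan`).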
training_args
training_args()

Retrieves training arguments.

Please see also set_training_args.

Returns:

Type Description
dict

The arguments that are going to be passed to the training plan's training_routine to perform training on the node side. An example training routine: TorchTrainingPlan.training_routine

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_args(self) -> dict:
    """Retrieves training arguments.

    Please see also
    [`set_training_args`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow.set_training_args]

    Returns:
        The arguments that are going to be passed to the training plan's
            `training_routine` to perform training on the node side. An example
            training routine: [`TorchTrainingPlan.training_routine`]
            [fedbiomed.common.training_plans.TorchTrainingPlan.training_routine]
    """

    return self._training_args.dict()
training_plan
training_plan()

Retrieves the training plan instance currently being used in the federated workflow.

Returns:

Type Description
Optional[TrainingPlan]

training plan: the training plan instance

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan(self) -> Optional[TrainingPlan]:
    """Retrieves the training plan instance currently being used in the federated workflow.

    Returns:
        training plan: the training plan instance
    """
    return self._training_plan
training_plan_approve
training_plan_approve(description='no description provided')

Send a training plan and an ApprovalRequest message to node(s).

This is a simple redirect to the Requests.training_plan_approve() method.

If a list of node id(s) is provided, the message will be individually sent to all nodes of the list. If the node id(s) list is None (default), the message is broadcast to all nodes.

Parameters:

Name Type Description Default
description str

Description for training plan approve request

'no description provided'

Returns:

Type Description
dict

A dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side.

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan_approve(self,
                          description: str = "no description provided") -> dict:
    """Send a training plan and an ApprovalRequest message to node(s).

    This method builds a TrainingPlanApproveJob and executes it.

    The request is sent to every node listed in the experiment's training data
    (`training_data().node_ids()`); the method itself takes no node list argument.

    Args:
        description: Description for training plan approve request

    Returns:
        a dictionary of pairs (node_id: status), where status indicates to the researcher
        that the training plan has been correctly downloaded on the node side.
        Warning: status does not mean that the training plan is approved, only that it has been added
        to the "approval queue" on the node side.
    """
    job = TrainingPlanApproveJob(
        nodes=self.training_data().node_ids(),
        keep_files_dir=self.experimentation_path(),
        training_plan=self.training_plan(),
        description=description,
    )
    responses = job.execute()
    return responses
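
The returned dictionary only reports delivery, so it can be useful to separate the nodes that received the plan from those that did not before waiting for approval. The following is a minimal sketch, not part of Fed-BioMed: the helper name split_by_status and the sample reply are hypothetical, and it assumes each status value can be read as a boolean, which the documentation above does not guarantee.

```python
from typing import Dict, List, Tuple


def split_by_status(responses: Dict[str, bool]) -> Tuple[List[str], List[str]]:
    """Split node ids by whether their status is truthy.

    Hypothetical helper. Assumes `status` behaves like a boolean.
    """
    delivered = sorted(node for node, ok in responses.items() if ok)
    failed = sorted(node for node, ok in responses.items() if not ok)
    return delivered, failed


# Made-up reply, shaped like the documented (node_id: status) pairs.
example_responses = {"node_a": True, "node_b": False}
delivered, failed = split_by_status(example_responses)
```

A truthy status here still means "queued for approval on the node", not "approved".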
training_plan_class
training_plan_class()

Retrieves the type of the training plan that is created for training.

Please see also set_training_plan_class.

Returns:

Name Type Description
training_plan_class Optional[TrainingPlanT]

the class type of the training plan.

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan_class(self) -> Optional[TrainingPlanT]:
    """Retrieves the type of the training plan that is created for training.

    Please see also
    [`set_training_plan_class`][fedbiomed.researcher.federated_workflows.TrainingPlanWorkflow.set_training_plan_class].

    Returns:
        training_plan_class: the class type of the training plan.
    """

    return self._training_plan_class
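
The difference between training_plan_class() (the type) and training_plan() (the instance) can be illustrated with a toy stand-in. This is only a sketch: MiniWorkflow and MyPlan are hypothetical names, and building the instance inside set_training_plan_class is an assumption based on the class-level note that the setter constructs the underlying model.

```python
from typing import Optional, Type


class MyPlan:
    """Hypothetical stand-in for a user-defined training plan."""


class MiniWorkflow:
    """Toy stand-in for the workflow; not the Fed-BioMed class."""

    def __init__(self) -> None:
        self._training_plan_class: Optional[Type[MyPlan]] = None
        self._training_plan: Optional[MyPlan] = None

    def set_training_plan_class(self, cls: Optional[Type[MyPlan]]) -> None:
        # Assumption: setting the class also (re)builds the instance.
        self._training_plan_class = cls
        self._training_plan = cls() if cls is not None else None

    def training_plan_class(self) -> Optional[Type[MyPlan]]:
        return self._training_plan_class

    def training_plan(self) -> Optional[MyPlan]:
        return self._training_plan


wf = MiniWorkflow()
before = (wf.training_plan_class(), wf.training_plan())  # both None
wf.set_training_plan_class(MyPlan)
```

After the setter call, training_plan_class() returns the MyPlan type itself, while training_plan() returns an object built from it.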
training_plan_file
training_plan_file(display=True)

Retrieves the path of the file where the training plan is saved, and optionally displays it.

Parameters:

Name Type Description Default
display bool

If True, prints the content of the training plan file. Default is True

True

Returns:

Type Description
str

Path to the training plan file

Raises:

Type Description
FedbiomedExperimentError

bad argument type, or cannot read training plan file content

Source code in fedbiomed/researcher/federated_workflows/_training_plan_workflow.py
@exp_exceptions
def training_plan_file(self, display: bool = True) -> str:
    """Retrieves the path of the file where the training plan is saved, and optionally displays it.

    Args:
        display: If `True`, prints the content of the training plan file. Default is `True`

    Returns:
        Path to the training plan file

    Raises:
        FedbiomedExperimentError: bad argument type, or cannot read training plan file content
    """
    if not isinstance(display, bool):
        # bad type
        msg = ErrorNumbers.FB410.value + \
            f', in method `training_plan_file` param `display` : type {type(display)}'
        logger.critical(msg)
        raise FedbiomedExperimentError(msg)

    if display and self._training_plan_file is not None:
        try:
            with open(self._training_plan_file) as file:
                # the `with` block closes the file on exit, no explicit close() needed
                content = file.read()
                print(content)
        except OSError as e:
            # cannot read training plan file content
            msg = ErrorNumbers.FB412.value + \
                f', in method `training_plan_file` : error when reading training plan file - {e}'
            logger.critical(msg)
            raise FedbiomedExperimentError(msg)

    return self._training_plan_file
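
The validate, optionally display, then return pattern used above can be tried outside Fed-BioMed. The sketch below is an illustration under stated assumptions: show_plan_file and PlanFileError are hypothetical stand-ins for the method and for FedbiomedExperimentError, and the error messages are simplified.

```python
import os
import tempfile
from typing import Optional


class PlanFileError(Exception):
    """Hypothetical stand-in for FedbiomedExperimentError."""


def show_plan_file(path: Optional[str], display: bool = True) -> Optional[str]:
    """Optionally print the file content, then return the path unchanged."""
    if not isinstance(display, bool):
        raise PlanFileError(f"param `display` has bad type {type(display)}")
    if display and path is not None:
        try:
            with open(path) as file:
                print(file.read())
        except OSError as e:
            # Wrap I/O failures in the domain-specific error.
            raise PlanFileError(f"error when reading training plan file - {e}") from e
    return path


with tempfile.TemporaryDirectory() as tmp:
    plan_path = os.path.join(tmp, "my_plan.py")
    with open(plan_path, "w") as f:
        f.write("# training plan source\n")
    # display=False skips printing and only returns the path.
    same_path = show_plan_file(plan_path, display=False) == plan_path
```

Passing display=False is the quiet way to obtain the path, for instance when the file content is long.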