Round

Implementation of the Round class of the node component.

Attributes

Classes

Round

Round(root_dir, db, node_id, training_plan, training_plan_class, model_kwargs, training_kwargs, training, dataset, params, experiment_id, researcher_id, history_monitor, aggregator_args, node_args, tp_security_manager, round_number=0, dlp_and_loading_block_metadata=None, aux_vars=None)

This class represents the training part executed by a node in a given round

Parameters:

Name Type Description Default
root_dir str

Root fedbiomed directory where node instance files will be stored.

required
db str

Path to node database file.

required
node_id str

Node id

required
training_plan str

code of the training plan for this round

required
training_plan_class str

class name of the training plan

required
model_kwargs dict

contains model arguments

required
training_kwargs dict

contains training arguments

required
training bool

whether to perform model training or just a validation check (model inference)

required
dataset dict

dataset details to use in this round. It contains the dataset name, dataset id, data path, its shape, its description, etc.

required
params str

parameters of the model

required
experiment_id str

experiment id

required
researcher_id str

researcher id

required
history_monitor HistoryMonitor

Sends real-time feedback to the end user during training

required
aggregator_args Dict[str, Any]

Arguments managed by and shared with the researcher-side aggregator.

required
node_args Dict

command line arguments for the node. Can include:

- gpu (bool): propose using a GPU device if any is available.
- gpu_num (Union[int, None]): if not None, use the specified GPU device instead of the default GPU device, if it is available.
- gpu_only (bool): force use of a GPU device if any is available, even if the researcher doesn't request using a GPU.

required
tp_security_manager TrainingPlanSecurityManager

Training plan security manager instance.

required
dlp_and_loading_block_metadata Optional[Tuple[dict, List[dict]]]

Data loading plan to apply, or None if no DLP for this round.

None
round_number int

number of the iteration for this experiment

0
aux_vars Optional[Dict[str, AuxVar]]

Optional optimizer auxiliary variables.

None
Source code in fedbiomed/node/round.py
def __init__(
    self,
    root_dir: str,
    db: str,
    node_id: str,
    training_plan: str,
    training_plan_class: str,
    model_kwargs: dict,
    training_kwargs: dict,
    training: bool,
    dataset: dict,
    params: str,
    experiment_id: str,
    researcher_id: str,
    history_monitor: HistoryMonitor,
    aggregator_args: Dict[str, Any],
    node_args: Dict,
    tp_security_manager: TrainingPlanSecurityManager,
    round_number: int = 0,
    dlp_and_loading_block_metadata: Optional[Tuple[dict, List[dict]]] = None,
    aux_vars: Optional[Dict[str, AuxVar]] = None,
) -> None:
    """Constructor of the class

    Args:
        root_dir: Root fedbiomed directory where node instance files will be stored.
        db: Path to node database file.
        node_id: Node id
        training_plan: code of the training plan for this round
        training_plan_class: class name of the training plan
        model_kwargs: contains model arguments
        training_kwargs: contains training arguments
        training: whether to perform model training or just a validation check (model inference)
        dataset: dataset details to use in this round. It contains the dataset name, dataset id,
            data path, its shape, its description, etc.
        params: parameters of the model
        experiment_id: experiment id
        researcher_id: researcher id
        history_monitor: Sends real-time feedback to the end user during training
        aggregator_args: Arguments managed by and shared with the
            researcher-side aggregator.
        node_args: command line arguments for the node. Can include:
            - `gpu (bool)`: propose using a GPU device if any is available.
            - `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of the
                default GPU device, if it is available.
            - `gpu_only (bool)`: force use of a GPU device if any is available, even if the researcher
                doesn't request using a GPU.
        tp_security_manager: Training plan security manager instance.
        dlp_and_loading_block_metadata: Data loading plan to apply, or None if no DLP for this round.
        round_number: number of the iteration for this experiment
        aux_vars: Optional optimizer auxiliary variables.
    """
    self._node_id = node_id
    self._db = db
    self._dir = root_dir

    self.dataset = dataset
    self.training_plan_source = training_plan
    self.training_plan_class = training_plan_class
    self.params = params
    self.experiment_id = experiment_id
    self.researcher_id = researcher_id
    self.history_monitor = history_monitor
    self.aggregator_args = aggregator_args
    self.aux_vars = aux_vars or {}
    self.node_args = node_args
    self.training = training
    self._dlp_and_loading_block_metadata = dlp_and_loading_block_metadata
    self.training_kwargs = training_kwargs
    self.model_arguments = model_kwargs

    # Class attributes
    self.tp_security_manager = tp_security_manager
    self.training_plan = None
    self.testing_arguments = None
    self.loader_arguments = None
    self.training_arguments = None
    self._secure_aggregation = None
    self._round = round_number
    self._node_state_manager: NodeStateManager = NodeStateManager(
        self._dir, self._node_id, self._db
    )
    self._temp_dir = tempfile.TemporaryDirectory()
    self._keep_files_dir = self._temp_dir.name
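
Below is a minimal usage sketch of this constructor, under stated assumptions: every path, ID, and value is a hypothetical placeholder, and monitor, tp_manager, training_plan_source and serialized_params stand for a HistoryMonitor instance, a TrainingPlanSecurityManager instance, the training plan source code and the serialized model parameters, all built elsewhere.

from fedbiomed.node.round import Round

# A hedged sketch, not taken from Fed-BioMed: all values are placeholders.
round_ = Round(
    root_dir="/path/to/fedbiomed",            # hypothetical node root directory
    db="/path/to/fedbiomed/db.json",          # hypothetical node database file
    node_id="node_1234",
    training_plan=training_plan_source,       # str: source code of the training plan
    training_plan_class="MyTrainingPlan",     # class name defined in that source
    model_kwargs={"in_features": 15, "out_features": 1},  # hypothetical model args
    training_kwargs={"epochs": 1},            # hypothetical training args
    training=True,                            # train, not just validate
    dataset={"dataset_id": "dataset_abcd", "path": "/path/to/data.csv"},
    params=serialized_params,                 # str: model parameters from the researcher
    experiment_id="experiment_5678",
    researcher_id="researcher_9012",
    history_monitor=monitor,                  # assumed HistoryMonitor instance
    aggregator_args={},
    node_args={"gpu": False},
    tp_security_manager=tp_manager,           # assumed TrainingPlanSecurityManager instance
    round_number=0,
)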

Attributes

aggregator_args instance-attribute
aggregator_args = aggregator_args
aux_vars instance-attribute
aux_vars = aux_vars or {}
dataset instance-attribute
dataset = dataset
experiment_id instance-attribute
experiment_id = experiment_id
history_monitor instance-attribute
history_monitor = history_monitor
loader_arguments instance-attribute
loader_arguments = None
model_arguments instance-attribute
model_arguments = model_kwargs
node_args instance-attribute
node_args = node_args
params instance-attribute
params = params
researcher_id instance-attribute
researcher_id = researcher_id
testing_arguments instance-attribute
testing_arguments = None
tp_security_manager instance-attribute
tp_security_manager = tp_security_manager
training instance-attribute
training = training
training_arguments instance-attribute
training_arguments = None
training_kwargs instance-attribute
training_kwargs = training_kwargs
training_plan instance-attribute
training_plan = None
training_plan_class instance-attribute
training_plan_class = training_plan_class
training_plan_source instance-attribute
training_plan_source = training_plan

Functions

collect_optim_aux_var
collect_optim_aux_var()

Collect auxiliary variables from the wrapped Optimizer, if any.

If the TrainingPlan does not use a Fed-BioMed Optimizer, return an empty dict. If it does not hold any BaseOptimizer, however, raise a FedbiomedRoundError.

Returns:

Type Description
Dict[str, AuxVar]

Auxiliary variables, as a {module_name: module_auxvar} dict.

Source code in fedbiomed/node/round.py
def collect_optim_aux_var(
    self,
) -> Dict[str, AuxVar]:
    """Collect auxiliary variables from the wrapped Optimizer, if any.

    If the TrainingPlan does not use a Fed-BioMed Optimizer, return an
    empty dict. If it does not hold any BaseOptimizer, however, raise a
    FedbiomedRoundError.

    Returns:
        Auxiliary variables, as a `{module_name: module_auxvar}` dict.
    """
    optimizer = self._get_base_optimizer()
    if isinstance(optimizer.optimizer, Optimizer):
        return optimizer.optimizer.get_aux()
    return {}
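
A short usage sketch (hypothetical caller code; round_ is the Round built above, whose training plan has been instantiated, e.g. within run_model_training):

aux_var = round_.collect_optim_aux_var()
if aux_var:
    # auxiliary variables keyed by module name, e.g. {"scaffold": ...}
    print(f"Collected auxiliary variables for modules: {list(aux_var)}")
else:
    # the plan's optimizer is not a Fed-BioMed Optimizer: nothing to share
    print("No auxiliary variables to send back to the researcher.")
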
initialize_arguments
initialize_arguments(previous_state_id=None)

Initializes arguments for training and testing and the NodeStateManager, the latter handling Node state loading and saving.

Parameters:

Name Type Description Default
previous_state_id Optional[str]

previous Node state id. Defaults to None (which is the state_id default value for the first Round).

None

Returns:

Type Description
Optional[Dict[str, Any]]

A dictionary containing the error message if an error is triggered while parsing training and testing arguments, None otherwise.

Note: If secure aggregation is activated, model weights will be encrypted, as well as the optimizer's auxiliary variables (only if the optimizer used is a DeclearnOptimizer).

Source code in fedbiomed/node/round.py
def initialize_arguments(self,
                         previous_state_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Initializes arguments for training and testing and the NodeStateManager, the latter handling
    Node state loading and saving.

    Args:
        previous_state_id: previous Node state id. Defaults to None (which is the state_id default value for the first Round).

    Returns:
        A dictionary containing the error message if an error is triggered while parsing training and testing
        arguments, None otherwise.

    !!! note "Note"
        If secure aggregation is activated, model weights will be encrypted as well as the
        optimizer's auxiliary variables (only if the optimizer used is a `DeclearnOptimizer`).
    """
    # initialize Node State Manager
    self._node_state_manager.initialize(previous_state_id=previous_state_id,
                                        testing=not self.training)
    return self._initialize_validate_training_arguments()
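
A hypothetical call sequence: the first round of an experiment passes previous_state_id=None, while subsequent rounds would pass the state id produced by the previous round.

# First round: no previous node state to restore.
error = round_.initialize_arguments(previous_state_id=None)
if error is not None:
    # a dict carrying the error message built while parsing the
    # training/testing arguments
    print("Failed to initialize round arguments:", error)
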
process_optim_aux_var
process_optim_aux_var()

Process researcher-emitted Optimizer auxiliary variables, if any.

Returns:

Type Description
Optional[str]

Error message, or None if the operation was successful.

Source code in fedbiomed/node/round.py
def process_optim_aux_var(self) -> Optional[str]:
    """Process researcher-emitted Optimizer auxiliary variables, if any.

    Returns:
        Error message, or None if the operation was successful.
    """
    # Early-exit if there are no auxiliary variables to process.
    if not any(self.aux_vars):
        return None
    # Fetch the training plan's BaseOptimizer.
    try:
        optimizer = self._get_base_optimizer()
    except FedbiomedRoundError as exc:
        return str(exc)
    # Verify that the BaseOptimizer wraps an Optimizer.
    if not isinstance(optimizer.optimizer, Optimizer):
        return (
            "Received Optimizer auxiliary variables, but the "
            "TrainingPlan does not manage a compatible Optimizer."
        )
    # Pass auxiliary variables to the Optimizer.
    try:
        optimizer.optimizer.set_aux(self.aux_vars)
    except FedbiomedOptimizerError as exc:
        return (
            "TrainingPlan Optimizer failed to ingest the provided "
            f"auxiliary variables: {repr(exc)}"
        )
    # Early stop if secagg is activated and the optimizer has more than one
    # module that accepts auxiliary variables
    if optimizer.count_nb_auxvar() > 1 and self._secure_aggregation.use_secagg:
        return (
            "Can not parse more than one `declearn` module requiring auxiliary variables while"
            " Secure Aggregation activated. Aborting..."
        )
    return None
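
A sketch of the error-string contract (hypothetical caller code; aux_vars were supplied to the constructor):

error_message = round_.process_optim_aux_var()
if error_message:
    # a non-None return signals failure, e.g. an optimizer mismatch
    print("Aborting round:", error_message)
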
run_model_training
run_model_training(tp_approval, secagg_insecure_validation, secagg_active, force_secagg, secagg_arguments=None)

Runs one round of model training

Parameters:

Name Type Description Default
tp_approval bool

True if training plan approval by the node is requested

required
secagg_insecure_validation bool

True if (potentially insecure) consistency check is enabled

required
secagg_active bool

True if secure aggregation is enabled on the node

required
force_secagg bool

True if secure aggregation is mandatory on the node

required
secagg_arguments Union[Dict[str, Any], None]

arguments for secure aggregation, some are specific to the scheme

None

Returns:

Type Description
TrainReply

The corresponding node message, as a TrainReply instance

Source code in fedbiomed/node/round.py
def run_model_training(
    self,
    tp_approval: bool,
    secagg_insecure_validation: bool,
    secagg_active: bool,
    force_secagg: bool,
    secagg_arguments: Union[Dict[str, Any], None] = None,
) -> TrainReply:
    """Runs one round of model training

    Args:
        tp_approval: True if training plan approval by the node is requested
        secagg_insecure_validation: True if (potentially insecure) consistency check is enabled
        secagg_active: True if secure aggregation is enabled on the node
        force_secagg: True if secure aggregation is mandatory on the node
        secagg_arguments: arguments for secure aggregation, some are specific to the scheme

    Returns:
        The corresponding node message, as a TrainReply instance
    """
    # Validate secagg status. Raises error if the training request is not compatible with
    # secure aggregation settings

    try:
        self._secure_aggregation = SecaggRound(
            db=self._db,
            node_id=self._node_id,
            secagg_arguments=secagg_arguments,
            secagg_active=secagg_active,
            force_secagg=force_secagg,
            experiment_id=self.experiment_id
        )
    except FedbiomedSecureAggregationError as e:
        logger.error(str(e))
        return self._send_round_reply(
            success=False,
            message='Could not configure secure aggregation on node')

    # Validate and load training plan
    if tp_approval:
        approved, training_plan_ = self.tp_security_manager.\
            check_training_plan_status(
                self.training_plan_source,
                TrainingPlanApprovalStatus.APPROVED)

        if not approved:
            return self._send_round_reply(
                False,
                f'Requested training plan is not approved by the node: {self._node_id}')
        else:
            logger.info(f'Training plan {training_plan_["name"]} has been approved by the node',
                        researcher_id=self.researcher_id)

    # Import training plan, save to file, reload, instantiate a training plan
    try:
        CurrentTPModule, CurrentTrainingPlan = utils.import_class_from_spec(
            code=self.training_plan_source, class_name=self.training_plan_class)
        self.training_plan = CurrentTrainingPlan()
    except Exception:
        error_message = "Cannot instantiate training plan object."
        return self._send_round_reply(success=False, message=error_message)

    # save and load training plan to a file to be sure
    # 1. a file is associated to training plan so we can read its source, etc.
    # 2. all dependencies are applied
    training_plan_module = 'model_' + str(uuid.uuid4())
    training_plan_file = os.path.join(self._keep_files_dir, training_plan_module + '.py')
    try:
        self.training_plan.save_code(training_plan_file, from_code=self.training_plan_source)
    except Exception as e:
        error_message = "Cannot save the training plan to a local tmp dir"
        logger.error(f"Cannot save the training plan to a local tmp dir : {e}")
        return self._send_round_reply(success=False, message=error_message)

    del CurrentTrainingPlan
    del CurrentTPModule

    try:
        CurrentTPModule, self.training_plan = utils.import_class_object_from_file(
            training_plan_file, self.training_plan_class)
    except Exception:
        error_message = "Cannot load training plan object from file."
        return self._send_round_reply(success=False, message=error_message)

    try:
        self.training_plan.post_init(model_args=self.model_arguments,
                                     training_args=self.training_arguments,
                                     aggregator_args=self.aggregator_args)
    except Exception:
        error_message = "Can't initialize training plan with the arguments."
        return self._send_round_reply(success=False, message=error_message)

    # load node state
    previous_state_id = self._node_state_manager.previous_state_id
    if previous_state_id is not None:
        try:
            self._load_round_state(previous_state_id)
        except Exception:
            # don't send error details
            return self._send_round_reply(success=False, message="Can't read previous node state.")

    # Load model parameters received from researcher
    try:
        self.training_plan.set_model_params(self.params)
    except Exception:
        error_message = "Cannot initialize model parameters."
        return self._send_round_reply(success=False, message=error_message)
    # ---------------------------------------------------------------------

    # Process Optimizer auxiliary variables, if any.
    error_message = self.process_optim_aux_var()
    if error_message:
        return self._send_round_reply(success=False, message=error_message)

    # Split training and validation data -------------------------------------
    try:
        self._set_training_testing_data_loaders()
    except FedbiomedError as fe:
        error_message = f"Can not create validation/train data: {repr(fe)}"
        return self._send_round_reply(success=False, message=error_message)
    except Exception as e:
        error_message = f"Undetermined error while creating data for training/validation. Can not create " \
                        f"validation/train data: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)
    # ------------------------------------------------------------------------


    # Validation Before Training
    if self.testing_arguments.get('test_on_global_updates', False) is not False:

        # Last control to make sure validation data loader is set.
        if self.training_plan.testing_data_loader is not None:
            try:
                self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                   metric_args=self.testing_arguments.get('test_metric_args', {}),
                                                   history_monitor=self.history_monitor,
                                                   before_train=True)
            except FedbiomedError as e:
                logger.error(f"{ErrorNumbers.FB314}: During the validation phase on global parameter updates; "
                             f"{repr(e)}", researcher_id=self.researcher_id)
            except Exception as e:
                logger.error(f"Undetermined error during the testing phase on global parameter updates: "
                             f"{repr(e)}", researcher_id=self.researcher_id)
        else:
            logger.error(f"{ErrorNumbers.FB314}: Can not execute validation routine due to missing testing dataset. "
                         f"Please make sure that `test_ratio` has been set correctly",
                         researcher_id=self.researcher_id)

    # If training is activated.
    if self.training:
        results = {}  # type: Dict[str, Any]

        # Perform the training round.
        if self.training_plan.training_data_loader is not None:
            try:
                rtime_before = time.perf_counter()
                ptime_before = time.process_time()
                self.training_plan.training_routine(history_monitor=self.history_monitor,
                                                    node_args=self.node_args)
                rtime_after = time.perf_counter()
                ptime_after = time.process_time()
            except Exception as exc:
                error_message = f"Cannot train model in round: {repr(exc)}"
                return self._send_round_reply(success=False, message=error_message)

        # Collect Optimizer auxiliary variables, if any.

        try:
            results['optim_aux_var'] = self.collect_optim_aux_var()
        except (FedbiomedOptimizerError, FedbiomedRoundError) as exc:
            error_message = f"Cannot collect Optimizer auxiliary variables: {repr(exc)}"
            return self._send_round_reply(success=False, message=error_message)

        # Validation after training
        if self.testing_arguments.get('test_on_local_updates', False) is not False:

            if self.training_plan.testing_data_loader is not None:
                try:
                    self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                       metric_args=self.testing_arguments.get('test_metric_args',
                                                                                              {}),
                                                       history_monitor=self.history_monitor,
                                                       before_train=False)
                except FedbiomedError as e:
                    logger.error(
                        f"{ErrorNumbers.FB314.value}: During the validation phase on local parameter updates; "
                        f"{repr(e)}", researcher_id=self.researcher_id)
                except Exception as e:
                    logger.error(f"Undetermined error during the validation phase on local parameter updates: "
                                 f"{repr(e)}", researcher_id=self.researcher_id)
            else:
                logger.error(
                    f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing "
                    f"dataset. Please make sure that `test_ratio` has been set correctly",
                    researcher_id=self.researcher_id)

        # FIXME: this will fail if `self.training_plan.training_data_loader = None` (see issue )
        results["sample_size"] = len(self.training_plan.training_data_loader.dataset)

        results["encrypted"] = False
        model_weights = self.training_plan.after_training_params(flatten=self._secure_aggregation.use_secagg)

        if self._secure_aggregation.use_secagg:
            model_weights, enc_factor, aux_var = self._encrypt_weights_and_auxvar(
                model_weights=model_weights,
                optim_aux_var=results["optim_aux_var"],
                sample_size=results["sample_size"],
                secagg_insecure_validation=secagg_insecure_validation,
            )
            results["encrypted"] = True
            results["encryption_factor"] = enc_factor
            if aux_var is not None:
                results["optim_aux_var"] = aux_var.to_dict()
        results['params'] = model_weights
        results['optimizer_args'] = self.training_plan.optimizer_args()
        results['state_id'] = self._node_state_manager.state_id

        try:
            self._save_round_state()
        except Exception:
            # don't send details to researcher
            return self._send_round_reply(success=False, message="Can't save new node state.")

        # end : clean the namespace
        try:
            del self.training_plan
            del CurrentTPModule
        except Exception:
            logger.debug('Exception raised while deleting training plan instance')

        return self._send_round_reply(success=True,
                                      timing={'rtime_training': rtime_after - rtime_before,
                                              'ptime_training': ptime_after - ptime_before},
                                      extend_with=results)
    else:
        # Only for validation
        return self._send_round_reply(success=True)
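
Putting the pieces together, a hedged end-to-end sketch of a node-side round without training plan approval or secure aggregation (round_ as constructed above):

if round_.initialize_arguments(previous_state_id=None) is None:
    reply = round_.run_model_training(
        tp_approval=False,                 # skip training plan approval
        secagg_insecure_validation=False,
        secagg_active=False,
        force_secagg=False,
        secagg_arguments=None,
    )
    # reply is a TrainReply; on success it carries the updated model
    # parameters, timing information and the new node state id.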

Functions