Round

Implementation of the Round class of the node component.

Classes

Round

Round(training_plan, training_plan_class, model_kwargs, training_kwargs, training, dataset, params, job_id, researcher_id, history_monitor, aggregator_args, node_args, round_number=0, dlp_and_loading_block_metadata=None, aux_vars=None)

This class represents the training part executed by a node in a given round.

Parameters:

training_plan (str, required)
    Code of the training plan for this round.

training_plan_class (str, required)
    Class name of the training plan.

model_kwargs (dict, required)
    Contains model args. Defaults to None.

training_kwargs (dict, required)
    Contains training arguments. Defaults to None.

training (bool, required)
    Whether to perform a model training or just a validation check (model inference).

dataset (dict, required)
    Dataset details to use in this round. It contains the dataset name, dataset's id, data path, its shape, its description, etc. Defaults to None.

params (str, required)
    Parameters of the model.

job_id (str, required)
    Job id.

researcher_id (str, required)
    Researcher id.

history_monitor (HistoryMonitor, required)
    Sends real-time feedback to the end-user during training.

aggregator_args (Dict[str, Any], required)
    Arguments managed by and shared with the researcher-side aggregator.

node_args (Dict, required)
    Command line arguments for the node (see the sketch below). Can include:
    - gpu (bool): propose using a GPU device if any is available.
    - gpu_num (Union[int, None]): if not None, use the specified GPU device instead of the default GPU device, if this GPU device is available.
    - gpu_only (bool): force use of a GPU device if any is available, even if the researcher doesn't request using a GPU.

dlp_and_loading_block_metadata (Optional[Tuple[dict, List[dict]]], default None)
    Data loading plan to apply, or None if no DLP for this round.

round_number (int, default 0)
    Number of the iteration for this experiment.

aux_vars (Optional[List[str]], default None)
    Auxiliary variables of the model.
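
For illustration, a node_args dictionary using the keys documented above could look like the following. This is a minimal sketch; the values are hypothetical placeholders, not taken from the Fed-BioMed codebase.

# Hypothetical sketch of a node_args dictionary, using only the keys documented above.
node_args = {
    'gpu': True,        # propose using a GPU device if one is available
    'gpu_num': 0,       # if available, use GPU device 0 instead of the default device
    'gpu_only': False,  # do not force GPU usage when the researcher did not request it
}
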
Source code in fedbiomed/node/round.py
def __init__(
    self,
    training_plan: str,
    training_plan_class: str,
    model_kwargs: dict,
    training_kwargs: dict,
    training: bool,
    dataset: dict,
    params: str,
    job_id: str,
    researcher_id: str,
    history_monitor: HistoryMonitor,
    aggregator_args: Dict[str, Any],
    node_args: Dict,
    round_number: int = 0,
    dlp_and_loading_block_metadata: Optional[Tuple[dict, List[dict]]] = None,
    aux_vars: Optional[List[str]] = None,
) -> None:
    """Constructor of the class

    Args:
        training_plan: code of the training plan for this round
        training_plan_class: class name of the training plan
        model_kwargs: contains model args. Defaults to None.
        training_kwargs: contains training arguments. Defaults to None.
        training: whether to perform a model training or just a validation check (model inference)
        dataset: dataset details to use in this round. It contains the dataset name, dataset's id,
            data path, its shape, its description, etc. Defaults to None.
        params: parameters of the model
        job_id: job id
        researcher_id: researcher id
        history_monitor: Sends real-time feedback to the end-user during training
        aggregator_args: Arguments managed by and shared with the
            researcher-side aggregator.
        node_args: command line arguments for node. Can include:
            - `gpu (bool)`: propose using a GPU device if any is available.
            - `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of the default
                GPU device if this GPU device is available.
            - `gpu_only (bool)`: force use of a GPU device if any is available, even if the researcher
                doesn't request using a GPU.
        dlp_and_loading_block_metadata: Data loading plan to apply, or None if no DLP for this round.
        round_number: number of the iteration for this experiment
        aux_vars: auxiliary variables of the model.
    """

    self._use_secagg: bool = False
    self.dataset = dataset
    self.training_plan_source = training_plan
    self.training_plan_class = training_plan_class
    self.params = params
    self.job_id = job_id
    self.researcher_id = researcher_id
    self.history_monitor = history_monitor
    self.aggregator_args = aggregator_args
    self.aux_vars = aux_vars or []
    self.node_args = node_args
    self.training = training
    self._dlp_and_loading_block_metadata = dlp_and_loading_block_metadata
    self.training_kwargs = training_kwargs
    self.model_arguments = model_kwargs

    # Class attributes
    self.tp_security_manager = TrainingPlanSecurityManager()
    self.training_plan = None
    self.testing_arguments = None
    self.loader_arguments = None
    self.training_arguments = None
    self._secagg_crypter = SecaggCrypter()
    self._secagg_clipping_range = None
    self._round = round_number
    self._biprime = None
    self._servkey = None
    self._node_state_manager: NodeStateManager = NodeStateManager(environ['DB_PATH'])

    self._keep_files_dir = tempfile.mkdtemp(prefix=environ['TMP_DIR'])
    atexit.register(lambda: shutil.rmtree(self._keep_files_dir))  # remove directory
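
For illustration, a Round could be instantiated roughly as follows. This is a minimal sketch: every value is a placeholder, and the shapes of model_kwargs and training_kwargs are assumptions rather than requirements of the class.

# Hypothetical sketch: all values below are placeholders.
round_ = Round(
    training_plan=training_plan_source,    # str: source code of the training plan
    training_plan_class='MyTrainingPlan',  # name of the class defined in that source
    model_kwargs={'in_features': 20},      # assumed model arguments
    training_kwargs={'epochs': 1},         # assumed training arguments
    training=True,                         # train, rather than only validate
    dataset={'dataset_id': 'dataset_1234', 'path': '/path/to/data'},
    params=model_parameters,               # model parameters sent by the researcher
    job_id='job_1234',
    researcher_id='researcher_1234',
    history_monitor=history_monitor,       # a HistoryMonitor instance
    aggregator_args={},
    node_args={'gpu': False},
    round_number=0,
)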

Attributes

aggregator_args (instance-attribute) = aggregator_args
aux_vars (instance-attribute) = aux_vars or []
dataset (instance-attribute) = dataset
history_monitor (instance-attribute) = history_monitor
job_id (instance-attribute) = job_id
loader_arguments (instance-attribute) = None
model_arguments (instance-attribute) = model_kwargs
node_args (instance-attribute) = node_args
params (instance-attribute) = params
researcher_id (instance-attribute) = researcher_id
testing_arguments (instance-attribute) = None
tp_security_manager (instance-attribute) = TrainingPlanSecurityManager()
training (instance-attribute) = training
training_arguments (instance-attribute) = None
training_kwargs (instance-attribute) = training_kwargs
training_plan (instance-attribute) = None
training_plan_class (instance-attribute) = training_plan_class
training_plan_source (instance-attribute) = training_plan

Functions

collect_optim_aux_var
collect_optim_aux_var()

Collect auxiliary variables from the wrapped Optimizer, if any.

If the TrainingPlan does not use a Fed-BioMed Optimizer, return an empty dict. If it does not hold any BaseOptimizer, however, raise a FedbiomedRoundError.

Returns:

Dict[str, Any]
    Auxiliary variables.

Source code in fedbiomed/node/round.py
def collect_optim_aux_var(self) -> Dict[str, Any]:
    """Collect auxiliary variables from the wrapped Optimizer, if any.

    If the TrainingPlan does not use a Fed-BioMed Optimizer, return an
    empty dict. If it does not hold any BaseOptimizer however, raise a
    FedbiomedRoundError.

    Returns:
        Auxiliary variables
    """
    optimizer = self._get_base_optimizer()
    if isinstance(optimizer.optimizer, Optimizer):
        aux_var = optimizer.optimizer.get_aux()

        if aux_var and self._use_secagg:
            # TODO: remove the following warning when secagg compatibility has been fixed
            # if secagg is used, raise a warning that encryption is not working with auxiliary variable
            logger.warning(f'Node {environ["NODE_ID"]} optimizer is sending auxiliary variables to the '
                           'Researcher, but those are not encrypted with SecAgg. '
                           'Auxiliary variables may contain sensitive information about the Nodes. '
                           'This issue will be fixed in a future version of Fed-BioMed.',
                           researcher_id=self.researcher_id)
        return aux_var
    return {}
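
The calling contract can be summarized with the following sketch, where round_ is a hypothetical Round instance whose training plan has already been initialized.

# Sketch of the calling contract; round_ is a hypothetical, initialized Round.
try:
    aux_var = round_.collect_optim_aux_var()
except FedbiomedRoundError:
    # the training plan holds no BaseOptimizer at all
    raise
if not aux_var:
    # empty dict: either the wrapped optimizer is not a Fed-BioMed Optimizer,
    # or it has no auxiliary variables to share this round
    pass
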
initialize_arguments
initialize_arguments(previous_state_id=None)

Initializes the training and testing arguments, as well as the NodeStateManager, which handles Node state loading and saving.

Parameters:

previous_state_id (Optional[str], default None)
    Previous Node state id. Defaults to None (which is the state_id default value for the first Round).

Returns:

Optional[Dict[str, Any]]
    A dictionary containing the error message if an error is triggered while parsing training and testing arguments, None otherwise.

Source code in fedbiomed/node/round.py
def initialize_arguments(self,
                         previous_state_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Initializes arguments for training and testing and the NodeStateManager, the latter handling
    Node state loading and saving.

    Args:
        previous_state_id: previous Node state id. Defaults to None (which is the state_id default value for the first Round).

    Returns:
        A dictionary containing the error message if an error is triggered while parsing training and testing
        arguments, None otherwise.
    """
    # initialize Node State Manager
    self._node_state_manager.initialize(previous_state_id=previous_state_id,
                                        testing=not self.training)
    return self._initialize_validate_training_arguments()
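
In use, the returned value acts as an error signal, as in the following sketch (round_ and the handling of the error are hypothetical).

# Sketch; round_ is a hypothetical Round instance.
error = round_.initialize_arguments(previous_state_id=None)  # first round: no previous state
if error is not None:
    # a dict describing the failure while parsing training/testing arguments;
    # the round should not proceed
    print(error)
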
process_optim_aux_var
process_optim_aux_var()

Process researcher-emitted Optimizer auxiliary variables, if any.

Returns:

Optional[str]
    Error message, None if the operation was successful.

Source code in fedbiomed/node/round.py
def process_optim_aux_var(self) -> Optional[str]:
    """Process researcher-emitted Optimizer auxiliary variables, if any.

    Returns:
        Error message, None if the operation was successful.
    """
    # Early-exit if there are no auxiliary variables to process.
    if not any(self.aux_vars):
        return

    aux_vars = {}
    aux_vars.update(self.aux_vars[0])
    aux_vars.update(self.aux_vars[1])

    # Fetch the training plan's BaseOptimizer.
    try:
        optimizer = self._get_base_optimizer()
    except FedbiomedRoundError as exc:
        return str(exc)

    # Verify that the BaseOptimizer wraps an Optimizer.
    if not isinstance(optimizer.optimizer, Optimizer):
        return (
            "Received Optimizer auxiliary variables, but the "
            "TrainingPlan does not manage a compatible Optimizer."
        )
    # Pass auxiliary variables to the Optimizer.
    try:
        optimizer.optimizer.set_aux(aux_vars)
    except FedbiomedOptimizerError as exc:
        return (
            "TrainingPlan Optimizer failed to ingest the provided "
            f"auxiliary variables: {repr(exc)}"
        )

    return
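
As the merging step above suggests, self.aux_vars is expected to hold two mappings of auxiliary variables that are flattened into a single dict, the second taking precedence on key collisions. The sketch below illustrates this under that assumption; the 'scaffold' key and both payloads are hypothetical.

# Hypothetical sketch, following the merging logic above.
round_.aux_vars = [
    {'scaffold': shared_aux_var},  # e.g. auxiliary variables shared across nodes
    {'scaffold': node_aux_var},    # e.g. node-specific variables, overriding shared ones
]
error = round_.process_optim_aux_var()
if error:
    print(error)  # the training plan's Optimizer could not ingest the variables
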
run_model_training
run_model_training(secagg_arguments=None)

Runs one round of model training

Parameters:

secagg_arguments (Union[Dict, None], default None)
    - secagg_servkey_id: Secure aggregation Servkey context id. None means that the parameters are not going to be encrypted.
    - secagg_biprime_id: Secure aggregation Biprime context ID.
    - secagg_random: Float value to validate secure aggregation on the researcher side.

Returns:

Dict[str, Any]
    The corresponding node message, a training reply instance.

Source code in fedbiomed/node/round.py
def run_model_training(
        self,
        secagg_arguments: Union[Dict, None] = None,
) -> Dict[str, Any]:
    """Runs one round of model training

    Args:
        secagg_arguments:
            - secagg_servkey_id: Secure aggregation Servkey context id. None means that the parameters
                are not going to be encrypted
            - secagg_biprime_id: Secure aggregation Biprime context ID.
            - secagg_random: Float value to validate secure aggregation on the researcher side

    Returns:
        The corresponding node message, a training reply instance
    """
    # Validate secagg status. Raises error if the training request is not compatible with
    # secure aggregation settings
    try:
        secagg_arguments = {} if secagg_arguments is None else secagg_arguments
        self._use_secagg = self._configure_secagg(
            secagg_servkey_id=secagg_arguments.get('secagg_servkey_id'),
            secagg_biprime_id=secagg_arguments.get('secagg_biprime_id'),
            secagg_random=secagg_arguments.get('secagg_random')
        )
    except FedbiomedRoundError as e:
        return self._send_round_reply(success=False, message=str(e))

    # Validate and load training plan
    if environ["TRAINING_PLAN_APPROVAL"]:
        approved, training_plan_ = self.tp_security_manager.\
            check_training_plan_status(
                self.training_plan_source,
                TrainingPlanApprovalStatus.APPROVED)

        if not approved:
            return self._send_round_reply(
                False,
                f'Requested training plan is not approved by the node: {environ["NODE_ID"]}')
        else:
            logger.info(f'Training plan "{training_plan_["name"]}" has been approved by the node',
                        researcher_id=self.researcher_id)

    # Import training plan, save to file, reload, instantiate a training plan
    try:
        CurrentTPModule, CurrentTrainingPlan = utils.import_class_from_spec(
            code=self.training_plan_source, class_name=self.training_plan_class)
        self.training_plan = CurrentTrainingPlan()
    except Exception as e:
        error_message = "Cannot instantiate training plan object."
        return self._send_round_reply(success=False, message=error_message)

    # save and load training plan to a file to be sure
    # 1. a file is associated to training plan so we can read its source, etc.
    # 2. all dependencies are applied
    training_plan_module = 'model_' + str(uuid.uuid4())
    training_plan_file = os.path.join(self._keep_files_dir, training_plan_module + '.py')
    try:
        self.training_plan.save_code(training_plan_file, from_code=self.training_plan_source)
    except Exception as e:
        error_message = "Cannot save the training plan to a local tmp dir"
        logger.error(f"Cannot save the training plan to a local tmp dir : {e}")
        return self._send_round_reply(success=False, message=error_message)

    del CurrentTrainingPlan
    del CurrentTPModule

    try:
        CurrentTPModule, self.training_plan = utils.import_class_object_from_file(
            training_plan_file, self.training_plan_class)
    except Exception as e:
        error_message = "Cannot load training plan object from file."
        return self._send_round_reply(success=False, message=error_message)

    try:
        self.training_plan.post_init(model_args=self.model_arguments,
                                     training_args=self.training_arguments,
                                     aggregator_args=self.aggregator_args)
    except Exception as e:
        error_message = "Can't initialize training plan with the arguments."
        return self._send_round_reply(success=False, message=error_message)

    # load node state
    previous_state_id = self._node_state_manager.previous_state_id
    if previous_state_id is not None:
        try:
            self._load_round_state(previous_state_id)
        except Exception:
            # don't send error details
            return self._send_round_reply(success=False, message="Can't read previous node state.")

    # Load model parameters received from researcher
    try:
        self.training_plan.set_model_params(self.params)
    except Exception as e:
        error_message = "Cannot initialize model parameters."
        return self._send_round_reply(success=False, message=error_message)
    # ---------------------------------------------------------------------

    # Process Optimizer auxiliary variables, if any.
    error_message = self.process_optim_aux_var()
    if error_message:
        return self._send_round_reply(success=False, message=error_message)

    # Split training and validation data -------------------------------------
    try:
        self._set_training_testing_data_loaders()
    except FedbiomedError as fe:
        error_message = f"Cannot create validation/train data: {repr(fe)}"
        return self._send_round_reply(success=False, message=error_message)
    except Exception as e:
        error_message = f"Undetermined error while creating data for training/validation. Cannot create " \
                        f"validation/train data: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)
    # ------------------------------------------------------------------------

    # Validation Before Training
    if self.testing_arguments.get('test_on_global_updates', False) is not False:

        # Last control to make sure validation data loader is set.
        if self.training_plan.testing_data_loader is not None:
            try:
                self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                   metric_args=self.testing_arguments.get('test_metric_args', {}),
                                                   history_monitor=self.history_monitor,
                                                   before_train=True)
            except FedbiomedError as e:
                logger.error(f"{ErrorNumbers.FB314}: During the validation phase on global parameter updates; "
                             f"{repr(e)}", researcher_id=self.researcher_id)
            except Exception as e:
                logger.error(f"Undetermined error during the testing phase on global parameter updates: "
                             f"{repr(e)}", researcher_id=self.researcher_id)
        else:
            logger.error(f"{ErrorNumbers.FB314}: Can not execute validation routine due to missing testing dataset"
                         f"Please make sure that `test_ratio` has been set correctly",
                         researcher_id=self.researcher_id)

    # If training is activated.
    if self.training:
        results = {}

        # Perform the training round.
        if self.training_plan.training_data_loader is not None:
            try:
                rtime_before = time.perf_counter()
                ptime_before = time.process_time()
                self.training_plan.training_routine(history_monitor=self.history_monitor,
                                                    node_args=self.node_args)
                rtime_after = time.perf_counter()
                ptime_after = time.process_time()
            except Exception as exc:
                error_message = f"Cannot train model in round: {repr(exc)}"
                return self._send_round_reply(success=False, message=error_message)

        # Collect Optimizer auxiliary variables, if any.
        try:
            results['optim_aux_var'] = self.collect_optim_aux_var()
        except (FedbiomedOptimizerError, FedbiomedRoundError) as exc:
            error_message = f"Cannot collect Optimizer auxiliary variables: {repr(exc)}"
            return self._send_round_reply(success=False, message=error_message)

        # Validation after training
        if self.testing_arguments.get('test_on_local_updates', False) is not False:

            if self.training_plan.testing_data_loader is not None:
                try:
                    self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                       metric_args=self.testing_arguments.get('test_metric_args',
                                                                                              {}),
                                                       history_monitor=self.history_monitor,
                                                       before_train=False)
                except FedbiomedError as e:
                    logger.error(
                        f"{ErrorNumbers.FB314.value}: During the validation phase on local parameter updates; "
                        f"{repr(e)}", researcher_id=self.researcher_id)
                except Exception as e:
                    logger.error(f"Undetermined error during the validation phase on local parameter updates"
                                 f"{repr(e)}", researcher_id=self.researcher_id)
            else:
                logger.error(
                    f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing "
                    f"dataset. Please make sure that `test_ratio` has been set correctly",
                    researcher_id=self.researcher_id)

        # FIXME: this will fail if `self.training_plan.training_data_loader = None` (see issue )
        results["sample_size"] = len(self.training_plan.training_data_loader.dataset)

        results["encrypted"] = False
        model_weights = self.training_plan.after_training_params(flatten=self._use_secagg)

        if self._use_secagg:

            logger.info("Encrypting model parameters. This process can take some time depending on model size.",
                        researcher_id=self.researcher_id)

            encrypt = functools.partial(
                self._secagg_crypter.encrypt,
                num_nodes=len(self._servkey["parties"]) - 1,  # -1: don't count researcher
                current_round=self._round,
                key=self._servkey["context"]["server_key"],
                biprime=self._biprime["context"]["biprime"],
                weight=results["sample_size"],
                clipping_range=secagg_arguments.get('secagg_clipping_range')
            )
            model_weights = encrypt(params=model_weights)
            results["encrypted"] = True
            results["encryption_factor"] = encrypt(params=[secagg_arguments["secagg_random"]])
            logger.info("Encryption is completed!",
                        researcher_id=self.researcher_id)

        results['params'] = model_weights
        results['optimizer_args'] = self.training_plan.optimizer_args()
        results['state_id'] = self._node_state_manager.state_id

        try:
            self._save_round_state()
        except Exception:
            # don't send details to researcher
            return self._send_round_reply(success=False, message="Can't save new node state.")

        # end : clean the namespace
        try:
            del self.training_plan
            del CurrentTPModule
        except Exception:
            logger.debug('Exception raised while deleting training plan instance')

        return self._send_round_reply(success=True,
                                      timing={'rtime_training': rtime_after - rtime_before,
                                              'ptime_training': ptime_after - ptime_before},
                                      extend_with=results)
    else:
        # Only for validation
        return self._send_round_reply(success=True)
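
For illustration, a secure-aggregation round could be launched as sketched below. All ids are placeholders, and the reply keys shown are assumptions based on the results dict built in the source above.

# Hypothetical sketch; all ids are placeholders.
secagg_arguments = {
    'secagg_servkey_id': 'secagg_servkey_1234',  # Servkey context id
    'secagg_biprime_id': 'secagg_biprime_1234',  # Biprime context id
    'secagg_random': 0.95,                       # checked on the researcher side
    'secagg_clipping_range': None,               # fall back to the default clipping range
}
reply = round_.run_model_training(secagg_arguments=secagg_arguments)
# on success the reply extends the results dict built above, e.g. reply['params']
# holds the (encrypted) model parameters and reply['encrypted'] is True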