Round

Implementation of the Round class of the node component.

Attributes

Classes

Round

Round(model_kwargs=None, training_kwargs=None, training=True, dataset=None, training_plan_url=None, training_plan_class=None, params_url=None, job_id=None, researcher_id=None, history_monitor=None, aggregator_args=None, node_args=None, round_number=0, dlp_and_loading_block_metadata=None, aux_var_urls=None)

This class represents the training part executed by a node in a given round

Parameters:

Name Type Description Default
model_kwargs Optional[dict]

contains model args. Defaults to None.

None
training_kwargs Optional[dict]

contains training arguments. Defaults to None.

None
training bool

whether to perform model training or only a validation check (model inference)

True
dataset Optional[dict]

dataset details to use in this round. It contains the dataset name, dataset id, data path, shape, description, etc. Defaults to None.

None
training_plan_url Optional[str]

url from which to download training plan file

None
training_plan_class Optional[str]

name of the training plan (e.g. 'MyTrainingPlan')

None
params_url Optional[str]

URL used to download and upload model params

None
job_id Optional[str]

job id

None
researcher_id Optional[str]

researcher id

None
history_monitor Optional[HistoryMonitor]

Sends real-time feedback to the end-user during training

None
aggregator_args Optional[Dict[str, Any]]

Arguments managed by and shared with the researcher-side aggregator.

None
node_args Union[dict, None]

command line arguments for the node. Can include:
- gpu (bool): propose using a GPU device if any is available.
- gpu_num (Union[int, None]): if not None, use the specified GPU device instead of the default one, provided it is available.
- gpu_only (bool): force the use of a GPU device if any is available, even if the researcher did not request one.

None
aux_var_urls Optional[List[str]]

Optional list of URLs of files containing Optimizer auxiliary variables that are to be processed by the training plan's optimizer.

None
Source code in fedbiomed/node/round.py
def __init__(
    self,
    model_kwargs: Optional[dict] = None,
    training_kwargs: Optional[dict] = None,
    training: bool = True,
    dataset: Optional[dict] = None,
    training_plan_url: Optional[str] = None,
    training_plan_class: Optional[str] = None,
    params_url: Optional[str] = None,
    job_id: Optional[str] = None,
    researcher_id: Optional[str] = None,
    history_monitor: Optional[HistoryMonitor] = None,
    aggregator_args: Optional[Dict[str, Any]] = None,
    node_args: Union[dict, None] = None,
    round_number: int = 0,
    dlp_and_loading_block_metadata: Optional[Tuple[dict, List[dict]]] = None,
    aux_var_urls: Optional[List[str]] = None,
) -> None:
    """Constructor of the class

    Args:
        model_kwargs: contains model args. Defaults to None.
        training_kwargs: contains training arguments. Defaults to None.
        training: whether to perform model training or only a validation check (model inference)
        dataset: dataset details to use in this round. It contains the dataset name, dataset id,
            data path, shape, description, etc. Defaults to None.
        training_plan_url: url from which to download training plan file
        training_plan_class: name of the training plan (e.g. 'MyTrainingPlan')
        params_url: URL used to download and upload model params
        job_id: job id
        researcher_id: researcher id
        history_monitor: Sends real-time feedback to the end-user during training
        aggregator_args: Arguments managed by and shared with the
            researcher-side aggregator.
        node_args: command line arguments for the node. Can include:
            - `gpu (bool)`: propose using a GPU device if any is available.
            - `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of the
                default one, provided it is available.
            - `gpu_only (bool)`: force the use of a GPU device if any is available, even if the
                researcher did not request one.
        aux_var_urls: Optional list of URLs of files containing Optimizer auxiliary variables
            that are to be processed by the training plan's optimizer.
    """

    self._use_secagg: bool = False
    self.dataset = dataset
    self.training_plan_url = training_plan_url
    self.training_plan_class = training_plan_class
    self.params_url = params_url
    self.job_id = job_id
    self.researcher_id = researcher_id
    self.history_monitor = history_monitor
    self.aggregator_args = aggregator_args
    self.aux_var_urls = aux_var_urls or []

    self.tp_security_manager = TrainingPlanSecurityManager()
    self.node_args = node_args
    self.repository = Repository(environ['UPLOADS_URL'], environ['TMP_DIR'], environ['CACHE_DIR'])
    self.training_plan = None
    self.training = training
    self._dlp_and_loading_block_metadata = dlp_and_loading_block_metadata

    self.training_kwargs = training_kwargs
    self.model_arguments = model_kwargs
    self.testing_arguments = None
    self.loader_arguments = None
    self.training_arguments = None
    self._secagg_crypter = SecaggCrypter()
    self._secagg_clipping_range = None
    self._round = round_number
    self._biprime = None
    self._servkey = None
    self._optim_aux_var = {}  # type: Dict[str, Dict[str, Any]]
    self._node_state_manager = NodeStateManager(environ['DB_PATH'])
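
The Round object is normally created by the node itself while handling a training request, but for orientation here is a minimal construction sketch; every URL, ID, and keyword value below is a hypothetical placeholder, not a value taken from the Fed-BioMed codebase.

# A minimal sketch, assuming hypothetical placeholder URLs and IDs.
# In a deployed node, Round is instantiated internally from the training request.
from fedbiomed.node.round import Round

round_ = Round(
    model_kwargs={"in_features": 10},                          # hypothetical model args
    training_kwargs={"epochs": 1},                             # hypothetical training args
    training=True,                                             # train, rather than only validate
    dataset={"dataset_id": "dataset_1234", "path": "/data"},   # hypothetical dataset details
    training_plan_url="http://uploads.example/plan.py",        # hypothetical URL
    training_plan_class="MyTrainingPlan",
    params_url="http://uploads.example/params.mpk",            # hypothetical URL
    job_id="job_1234",
    researcher_id="researcher_1234",
    round_number=0,
)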

Attributes

aggregator_args instance-attribute
aggregator_args = aggregator_args
aux_var_urls instance-attribute
aux_var_urls = aux_var_urls or []
dataset instance-attribute
dataset = dataset
history_monitor instance-attribute
history_monitor = history_monitor
job_id instance-attribute
job_id = job_id
loader_arguments instance-attribute
loader_arguments = None
model_arguments instance-attribute
model_arguments = model_kwargs
node_args instance-attribute
node_args = node_args
params_url instance-attribute
params_url = params_url
repository instance-attribute
repository = Repository(environ['UPLOADS_URL'], environ['TMP_DIR'], environ['CACHE_DIR'])
researcher_id instance-attribute
researcher_id = researcher_id
testing_arguments instance-attribute
testing_arguments = None
tp_security_manager instance-attribute
tp_security_manager = TrainingPlanSecurityManager()
training instance-attribute
training = training
training_arguments instance-attribute
training_arguments = None
training_kwargs instance-attribute
training_kwargs = training_kwargs
training_plan instance-attribute
training_plan = None
training_plan_class instance-attribute
training_plan_class = training_plan_class
training_plan_url instance-attribute
training_plan_url = training_plan_url

Functions

collect_optim_aux_var
collect_optim_aux_var()

Collect auxiliary variables from the wrapped Optimizer, if any.

If the TrainingPlan does not use a Fed-BioMed Optimizer, return an empty dict. If, however, it does not hold any BaseOptimizer, raise a FedbiomedRoundError.

Source code in fedbiomed/node/round.py
def collect_optim_aux_var(self) -> Dict[str, Any]:
    """Collect auxiliary variables from the wrapped Optimizer, if any.

    If the TrainingPlan does not use a Fed-BioMed Optimizer, return an
    empty dict. If, however, it does not hold any BaseOptimizer, raise a
    FedbiomedRoundError.
    """
    optimizer = self._get_base_optimizer()
    if isinstance(optimizer.optimizer, Optimizer):
        aux_var = optimizer.optimizer.get_aux()

        if aux_var and self._use_secagg:
            # TODO: remove the following warning when secagg compatibility has been fixed
            # if secagg is used, raise a warning that encryption is not working with auxiliary variable
            logger.warning(f'Node {environ["NODE_ID"]} optimizer is sending auxiliary variables to the Researcher, '
                           'but those are not encrypted with SecAgg. '
                           'Auxiliary variables may contain sensitive information about the Nodes. '
                           'This issue will be fixed in a future version of Fed-BioMed.')
        return aux_var
    return {}
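
A short usage sketch, assuming `round_` is a Round instance whose training plan has already been instantiated and trained (see the constructor sketch above):

# Sketch: collect auxiliary variables after training; an empty dict means the
# training plan does not wrap a Fed-BioMed Optimizer.
aux_var = round_.collect_optim_aux_var()
if aux_var:
    print(f"Collected aux var for modules: {list(aux_var)}")
else:
    print("No Fed-BioMed Optimizer in use; nothing to share.")
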
download_aggregator_args
download_aggregator_args()

Retrieves aggregator arguments that are sent through the file exchange service

Returns:

Type Description
Tuple[bool, str]

Tuple[bool, str]: a tuple containing: a bool that indicates the success of the operation, and a string containing the error message (if any).

Source code in fedbiomed/node/round.py
def download_aggregator_args(self) -> Tuple[bool, str]:
    """Retrieves aggregator arguments, that are sent through file exchange service

    Returns:
        Tuple[bool, str]: a tuple containing:
            a bool that indicates the success of the operation
            a string containing the error message
    """
    # download heavy aggregator args (if any)

    if self.aggregator_args is not None:

        for arg_name, aggregator_arg in self.aggregator_args.items():
            if isinstance(aggregator_arg, dict):
                url = aggregator_arg.get('url')

                if url:
                    # a `url` field means that the parameters should be retrieved
                    # through the file exchange system
                    success, param_path, error_msg = self.download_file(url, f"{arg_name}_{uuid.uuid4()}.mpk")
                    if not success:
                        return success, error_msg
                    # FIXME: should we load parameters here or in the training plan
                    self.aggregator_args[arg_name] = Serializer.load(param_path)
        return True, ''
    else:
        return True, "no file downloads required for aggregator args"
download_file
download_file(url, file_path)

Downloads a file from the file exchange system

Parameters:

Name Type Description Default
url str

url used to download file

required
file_path str

file path used to store the downloaded content

required

Returns:

Type Description
Tuple[bool, str, str]

Tuple[bool, str, str]: a tuple that contains: a bool that indicates the success of the download, a string with the complete file path, and a string containing the error message (if any; empty if the operation was successful).

Source code in fedbiomed/node/round.py
def download_file(self, url: str, file_path: str) -> Tuple[bool, str, str]:
    """Downloads file from file exchange system

    Args:
        url (str): url used to download file
        file_path (str): file path used to store the downloaded content

    Returns:
        Tuple[bool, str, str]: a tuple that contains:
            a bool that indicates the success of the download
            a str with the complete file path
            a str containing the error message (if any). Returns an empty
            string if the operation was successful.
    """
    status, params_path = self.repository.download_file(url, file_path)
    if (status != 200) or (params_path is None):
        error_message = f"Cannot download param file: {url}"
        return False, '', error_message
    return True, params_path, ''
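
A usage sketch, assuming `round_` is a Round instance; the URL is a placeholder:

import uuid

# Sketch (hypothetical URL): download a file and unpack the (success, path, error) triple.
success, params_path, error_msg = round_.download_file(
    "http://uploads.example/params.mpk", f"my_model_{uuid.uuid4()}.mpk"
)
if success:
    print(f"File stored at: {params_path}")
else:
    print(f"Download failed: {error_msg}")
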
download_optimizer_aux_var
download_optimizer_aux_var()

Download Optimizer auxiliary variables, if any.

When this operation is successful, it updates a private attribute of this instance storing the obtained auxiliary variables (if any).

Returns:

Name Type Description
success bool

Whether the operation was successful.

error_message str

Error message in case the operation failed.

Source code in fedbiomed/node/round.py
def download_optimizer_aux_var(self) -> Tuple[bool, str]:
    """Download Optimizer auxiliary variables, if any.

    When this operation is successful, it updates a private attribute
    of this instance storing the obtained auxiliary variables (if any).

    Returns:
        success: Whether the operation was successful.
        error_message: Error message in case the operation failed.
    """
    aux_var = {}  # type: Dict[str, Dict[str, Any]]
    for url in self.aux_var_urls:
        # Download the file.
        path = f"aux_var_{uuid.uuid4()}.mpk"
        success, path, err_msg = self.download_file(url, path)
        if not success:
            error_message = (
                f"Failed to download Optimizer aux var file: {err_msg}"
            )
            return False, error_message
        # Deserialize the file and add its contents to `aux_var`.
        try:
            aux_new = Serializer.load(path)
        except Exception as exc:
            error_message = (
                f"Failed to de-serialize Optimizer aux var file {path}"
                f": {repr(exc)}"
            )
            return False, error_message
        aux_var.update(aux_new)
    # If everything went fine, update private attribute and return flags.
    self._optim_aux_var = aux_var
    return True, ""
initialize_arguments
initialize_arguments(previous_state_id=None)

Initializes the training and testing arguments, as well as the NodeStateManager, which handles Node state loading and saving.

Parameters:

Name Type Description Default
previous_state_id Optional[str]

previous Node state id. Defaults to None (which is the state_id default value for the first Round).

None

Returns:

Type Description
Optional[Dict[str, Any]]

A dictionary containing the error message if an error is triggered while parsing training and testing arguments, None otherwise.

Source code in fedbiomed/node/round.py
def initialize_arguments(self,
                         previous_state_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Initializes arguments for training and testing and the NodeStateManager, the latter handling
    Node state loading and saving.

    Args:
        previous_state_id: previous Node state id. Defaults to None (which is the state_id default value for the first Round).

    Returns:
        A dictionary containing the error message if an error is triggered while parsing training and testing
        arguments, None otherwise.
    """
    # initialize Node State Manager
    self._node_state_manager.initialize(previous_state_id=previous_state_id,
                                        testing=not self.training)
    return self._initialize_validate_training_arguments()
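
A minimal sketch of the call, assuming `round_` is a Round instance; a None return value means the training/testing arguments parsed without error.

# Sketch: None on success, an error-message dictionary otherwise. On the first
# round there is no previous state, so previous_state_id stays None.
error = round_.initialize_arguments(previous_state_id=None)
if error is not None:
    print(f"Argument parsing failed: {error}")
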
process_optim_aux_var
process_optim_aux_var()

Process researcher-emitted Optimizer auxiliary variables, if any.

Returns:

Type Description
str

Error message, empty if the operation was successful.

Source code in fedbiomed/node/round.py
def process_optim_aux_var(self) -> str:
    """Process researcher-emitted Optimizer auxiliary variables, if any.

    Returns:
        Error message, empty if the operation was successful.
    """
    # Early-exit if there are no auxiliary variables to process.
    if not self._optim_aux_var:
        return ""
    # Fetch the training plan's BaseOptimizer.
    try:
        optimizer = self._get_base_optimizer()
    except FedbiomedRoundError as exc:
        return str(exc)
    # Verify that the BaseOptimizer wraps an Optimizer.
    if not isinstance(optimizer.optimizer, Optimizer):
        return (
            "Received Optimizer auxiliary variables, but the "
            "TrainingPlan does not manage a compatible Optimizer."
        )
    # Pass auxiliary variables to the Optimizer.
    try:
        optimizer.optimizer.set_aux(self._optim_aux_var)
    except FedbiomedOptimizerError as exc:
        return (
            "TrainingPlan Optimizer failed to ingest the provided "
            f"auxiliary variables: {repr(exc)}"
        )
    return ""
run_model_training
run_model_training(secagg_arguments=None)

This method downloads the training plan file, runs the model training, and finally uploads the model parameters to the file repository

Parameters:

Name Type Description Default
secagg_arguments Union[Dict, None]
  • secagg_servkey_id: Secure aggregation Servkey context id. None means that the parameters are not going to be encrypted
  • secagg_biprime_id: Secure aggregation Biprime context ID.
  • secagg_random: Float value to validate secure aggregation on the researcher side
None

Returns:

Type Description
Dict[str, Any]

Returns the corresponding node message, a training reply instance

Source code in fedbiomed/node/round.py
def run_model_training(
        self,
        # state_id: Optional[str] = None,
        secagg_arguments: Union[Dict, None] = None,
) -> Dict[str, Any]:
    """This method downloads training plan file; then runs the training of a model
    and finally uploads model params to the file repository

    Args:
        secagg_arguments:
            - secagg_servkey_id: Secure aggregation Servkey context id. None means that the parameters
                are not going to be encrypted
            - secagg_biprime_id: Secure aggregation Biprime context ID.
            - secagg_random: Float value to validate secure aggregation on the researcher side

    Returns:
        Returns the corresponding node message, a training reply instance
    """
    # Validate secagg status. Raises error if the training request is not compatible with
    # secure aggregation settings
    try:
        secagg_arguments = {} if secagg_arguments is None else secagg_arguments
        self._use_secagg = self._configure_secagg(
            secagg_servkey_id=secagg_arguments.get('secagg_servkey_id'),
            secagg_biprime_id=secagg_arguments.get('secagg_biprime_id'),
            secagg_random=secagg_arguments.get('secagg_random')
        )
    except FedbiomedRoundError as e:
        return self._send_round_reply(success=False, message=str(e))

    # Download and validate the training plan.
    # Download the model weights and any auxiliary information.
    try:
        # module name cannot contain dashes
        import_module = 'training_plan_' + str(uuid.uuid4().hex)
        status, _ = self.repository.download_file(self.training_plan_url,
                                                  import_module + '.py')

        if status != 200:
            error_message = "Cannot download training plan file: " + self.training_plan_url
            return self._send_round_reply(success=False, message=error_message)
        else:
            if environ["TRAINING_PLAN_APPROVAL"]:
                approved, training_plan_ = self.tp_security_manager.check_training_plan_status(
                    os.path.join(environ["TMP_DIR"], import_module + '.py'),
                    TrainingPlanApprovalStatus.APPROVED)

                if not approved:
                    error_message = f'Requested training plan is not approved by the node: {environ["NODE_ID"]}'
                    return self._send_round_reply(success=False, message=error_message)
                else:
                    logger.info(f'Training plan {training_plan_["name"]} has been approved by the node')

        # Retrieve model parameters and other auxiliary information.
        success, params_path, error_msg = self.download_file(
            self.params_url, f"my_model_{uuid.uuid4()}.mpk"
        )
        if success:
            success, error_msg = self.download_aggregator_args()
        if success:
            success, error_msg = self.download_optimizer_aux_var()
        if not success:
            return self._send_round_reply(success=False, message=error_msg)

    except Exception as e:
        # FIXME: this will trigger if model is not approved by node
        error_message = f"Cannot download training plan files: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)

    # import module, declare the training plan, load parameters
    try:
        sys.path.insert(0, environ['TMP_DIR'])
        module = importlib.import_module(import_module)
        train_class = getattr(module, self.training_plan_class)
        self.training_plan = train_class()
        sys.path.pop(0)
    except Exception as e:
        error_message = f"Cannot instantiate training plan object: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)

    try:
        self.training_plan.post_init(model_args=self.model_arguments,
                                     training_args=self.training_arguments,
                                     aggregator_args=self.aggregator_args)
    except Exception as e:
        error_message = f"Can't initialize training plan with the arguments: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)

    # load node state
    previous_state_id = self._node_state_manager.previous_state_id
    if previous_state_id is not None:
        try:
            self._load_round_state(previous_state_id)
        except Exception:
            # don't send error details
            return self._send_round_reply(success=False, message="Can't read previous node state.")

    # import model params into the training plan instance
    try:
        params = Serializer.load(params_path)["model_weights"]
        self.training_plan.set_model_params(params)
    except Exception as e:
        error_message = f"Cannot initialize model parameters: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)

    # Process Optimizer auxiliary variables, if any.
    error_message = self.process_optim_aux_var()
    if error_message:
        return self._send_round_reply(success=False, message=error_message)

    # Split training and validation data
    try:

        self._set_training_testing_data_loaders()
    except FedbiomedError as fe:
        error_message = f"Cannot create validation/train data: {repr(fe)}"
        return self._send_round_reply(success=False, message=error_message)
    except Exception as e:
        error_message = f"Undetermined error while creating data for training/validation. Cannot create " \
                        f"validation/train data: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)

    # Validation Before Training
    if self.testing_arguments.get('test_on_global_updates', False) is not False:

        # Last control to make sure validation data loader is set.
        if self.training_plan.testing_data_loader is not None:
            try:
                self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                   metric_args=self.testing_arguments.get('test_metric_args', {}),
                                                   history_monitor=self.history_monitor,
                                                   before_train=True)
            except FedbiomedError as e:
                logger.error(f"{ErrorNumbers.FB314}: During the validation phase on global parameter updates; "
                             f"{repr(e)}")
            except Exception as e:
                logger.error(f"Undetermined error during the testing phase on global parameter updates: "
                             f"{repr(e)}")
        else:
            logger.error(f"{ErrorNumbers.FB314}: Can not execute validation routine due to missing testing dataset"
                         f"Please make sure that `test_ratio` has been set correctly")

    # If training is activated.
    if self.training:
        results = {}

        # Perform the training round.
        if self.training_plan.training_data_loader is not None:
            try:
                rtime_before = time.perf_counter()
                ptime_before = time.process_time()
                self.training_plan.training_routine(history_monitor=self.history_monitor,
                                                    node_args=self.node_args)
                rtime_after = time.perf_counter()
                ptime_after = time.process_time()
            except Exception as exc:
                error_message = f"Cannot train model in round: {repr(exc)}"
                return self._send_round_reply(success=False, message=error_message)

        # Collect Optimizer auxiliary variables, if any.
        try:
            results['optim_aux_var'] = self.collect_optim_aux_var()
        except (FedbiomedOptimizerError, FedbiomedRoundError) as exc:
            error_message = f"Cannot collect Optimizer auxiliary variables: {repr(exc)}"
            return self._send_round_reply(success=False, message=error_message)

        # Validation after training
        if self.testing_arguments.get('test_on_local_updates', False) is not False:

            if self.training_plan.testing_data_loader is not None:
                try:
                    self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                       metric_args=self.testing_arguments.get('test_metric_args',
                                                                                              {}),
                                                       history_monitor=self.history_monitor,
                                                       before_train=False)
                except FedbiomedError as e:
                    logger.error(
                        f"{ErrorNumbers.FB314.value}: During the validation phase on local parameter updates; "
                        f"{repr(e)}")
                except Exception as e:
                    logger.error(f"Undetermined error during the validation phase on local parameter updates"
                                 f"{repr(e)}")
            else:
                logger.error(
                    f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing "
                    f"dataset please make sure that test_ratio has been set correctly")

        # FIXME: this will fail if `self.training_plan.training_data_loader = None` (see issue )
        sample_size = len(self.training_plan.training_data_loader.dataset)

        results["encrypted"] = False
        model_weights = self.training_plan.after_training_params(flatten=self._use_secagg)
        if self._use_secagg:
            logger.info("Encrypting model parameters. This process can take some time depending on model size.")

            encrypt = functools.partial(
                self._secagg_crypter.encrypt,
                num_nodes=len(self._servkey["parties"]) - 1,  # -1: don't count researcher
                current_round=self._round,
                key=self._servkey["context"]["server_key"],
                biprime=self._biprime["context"]["biprime"],
                weight=sample_size,
                clipping_range=secagg_arguments.get('secagg_clipping_range')
            )
            model_weights = encrypt(params=model_weights)
            results["encrypted"] = True
            results["encryption_factor"] = encrypt(params=[secagg_arguments["secagg_random"]])
            logger.info("Encryption is completed!")

        try:
            # Note: for simplicity's sake, we save the Node state even if test_ratio = 1
            self._save_round_state()
        except Exception:
            # don't send details to researcher
            return self._send_round_reply(success=False, message="Can't save new node state.")

        results['researcher_id'] = self.researcher_id
        results['job_id'] = self.job_id
        results['state_id'] = self._node_state_manager.state_id
        results['model_weights'] = model_weights
        results['node_id'] = environ['NODE_ID']
        results['optimizer_args'] = self.training_plan.optimizer_args()

        try:
            # TODO: add validation status to these results?
            # Dump the results to a msgpack file.
            filename = os.path.join(environ["TMP_DIR"], f"node_params_{uuid.uuid4()}.mpk")
            Serializer.dump(results, filename)
            # Upload that file to the remote repository.
            res = self.repository.upload_file(filename)
            logger.info("results uploaded successfully ")

        except Exception as exc:
            return self._send_round_reply(success=False, message=f"Cannot upload results: {exc}")


        # end : clean the namespace
        try:
            del self.training_plan
            del import_module
        except Exception as e:
            logger.debug(f'Exception raised while deleting training plan instance: {repr(e)}')

        # save collected statistics
        return self._send_round_reply(success=True,
                                      timing={'rtime_training': rtime_after - rtime_before,
                                              'ptime_training': ptime_after - ptime_before},
                                      params_url=res['file'],
                                      sample_size=sample_size)
    else:
        # Only for validation
        return self._send_round_reply(success=True)
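
Putting the pieces together, an end-to-end sketch of the per-round flow on the node side; the reply is the node message built by _send_round_reply, and the 'success' key read below is an assumption about its layout, not a documented contract.

# Sketch: initialize arguments, then run the round. Passing secagg_arguments=None
# disables secure aggregation; a dict with secagg_* keys would enable it.
error = round_.initialize_arguments(previous_state_id=None)
if error is None:
    reply = round_.run_model_training(secagg_arguments=None)
    print(reply.get("success"))  # assumed key; see _send_round_reply
else:
    print(f"Could not start round: {error}")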