TrainingPlans

The fedbiomed.common.training_plans module provides the training plan classes used for federated training.

Classes

BaseTrainingPlan

BaseTrainingPlan()

Base class for training plans.

All concrete, framework- and/or model-specific training plans should inherit from this class, and implement:

* the post_init method: to process model and training hyper-parameters
* the training_routine method: to train the model for one round
* the predict method: to compute predictions over a given batch
* (opt.) the testing_step method: to override the evaluation behavior and compute a batch-wise (set of) metric(s)
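
The contract above can be sketched with a stand-in abstract base. This is a minimal illustration that does not import Fed-BioMed: `SketchBasePlan` and `MeanPlan` are hypothetical names, and the method signatures are simplified compared with the real `BaseTrainingPlan`.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class SketchBasePlan(ABC):
    """Hypothetical stand-in for BaseTrainingPlan (not the Fed-BioMed class)."""

    @abstractmethod
    def post_init(self, model_args: Dict[str, Any], training_args: Dict[str, Any]) -> None:
        """Process model and training hyper-parameters."""

    @abstractmethod
    def training_routine(self) -> None:
        """Train the model for one round."""

    @abstractmethod
    def predict(self, batch: List[float]) -> List[float]:
        """Compute predictions over a given batch."""


class MeanPlan(SketchBasePlan):
    """Toy plan whose 'model' is the mean of its training data."""

    def post_init(self, model_args, training_args):
        self.mean = model_args.get("initial_mean", 0.0)
        self.data = training_args["data"]

    def training_routine(self):
        self.mean = sum(self.data) / len(self.data)

    def predict(self, batch):
        # Predict the learned mean for every sample in the batch.
        return [self.mean for _ in batch]


plan = MeanPlan()
plan.post_init({"initial_mean": 0.0}, {"data": [1.0, 2.0, 3.0]})
plan.training_routine()
predictions = plan.predict([10.0, 20.0])
```

Because the three methods are declared abstract in the stand-in, instantiating a subclass that omits one of them raises a `TypeError`.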

Attributes:

Name Type Description
dataset_path Union[str, None]

The path that indicates where the dataset has been stored.

pre_processes Dict[str, PreProcessDict]

Preprocess functions that will be applied to the training data at the beginning of the training routine.

training_data_loader Union[DataLoader, NPDataLoader, None]

Data loader used in the training routine.

testing_data_loader Union[DataLoader, NPDataLoader, None]

Data loader used in the validation routine.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def __init__(self) -> None:
    """Construct the base training plan."""
    self._dependencies: List[str] = []
    self.dataset_path: Union[str, None] = None
    self.pre_processes: Dict[str, PreProcessDict] = OrderedDict()
    self.training_data_loader: Union[DataLoader, NPDataLoader, None] = None
    self.testing_data_loader: Union[DataLoader, NPDataLoader, None] = None

    # Arguments provided by the researcher; they will be populated by post_init
    self._model_args: Dict[str, Any] = None
    self._aggregator_args: Dict[str, Any] = None
    self._optimizer_args: Dict[str, Any] = None
    self._loader_args: Dict[str, Any] = None
    self._training_args: Dict[str, Any] = None

    self._error_msg_import_model: str = f"{ErrorNumbers.FB605.value}: Training Plan's Model is not initialized.\n" +\
                                        "To %s a model, you should do it through `fedbiomed.researcher.federated_workflows.Experiment`'s interface" +\
                                        " and not directly from Training Plan"

Attributes

dataset_path instance-attribute
dataset_path = None
dependencies property
dependencies
pre_processes instance-attribute
pre_processes = OrderedDict()
testing_data_loader instance-attribute
testing_data_loader = None
training_data_loader instance-attribute
training_data_loader = None

Functions

add_preprocess
add_preprocess(method, process_type)

Register a pre-processing method to be executed on training data.

Parameters:

Name Type Description Default
method Callable

Pre-processing method to be run before training.

required
process_type ProcessTypes

Type of pre-processing that will be run. The expected signature of method and the arguments passed to it depend on this parameter.

required
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def add_preprocess(
        self,
        method: Callable,
        process_type: ProcessTypes
    ) -> None:
    """Register a pre-processing method to be executed on training data.

    Args:
        method: Pre-processing method to be run before training.
        process_type: Type of pre-processing that will be run.
            The expected signature of `method` and the arguments
            passed to it depend on this parameter.
    """
    if not callable(method):
        msg = (
            f"{ErrorNumbers.FB605.value}: error while adding "
            "preprocess, `method` should be callable."
        )
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg)
    if not isinstance(process_type, ProcessTypes):
        msg = (
            f"{ErrorNumbers.FB605.value}: error while adding "
            "preprocess, `process_type` should be an instance "
            "of `fedbiomed.common.constants.ProcessTypes`."
        )
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg)
    # NOTE: this may be revised into a list rather than OrderedDict
    self.pre_processes[method.__name__] = {
        'method': method,
        'process_type': process_type
    }
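
The registration logic shown above can be exercised in isolation. The sketch below assumes a stand-in enum, `SketchProcessTypes`, whose members are illustrative and not taken from `fedbiomed.common.constants.ProcessTypes`; it also raises `TypeError` rather than `FedbiomedTrainingPlanError` to stay self-contained.

```python
from collections import OrderedDict
from enum import Enum
from typing import Callable


class SketchProcessTypes(Enum):
    """Hypothetical stand-in for ProcessTypes; member names are illustrative."""
    DATA_LOADER = 1
    PARAMS = 2


pre_processes = OrderedDict()


def add_preprocess(method: Callable, process_type: SketchProcessTypes) -> None:
    """Mirror the validation and registration steps of the method above."""
    if not callable(method):
        raise TypeError("`method` should be callable.")
    if not isinstance(process_type, SketchProcessTypes):
        raise TypeError("`process_type` should be a SketchProcessTypes member.")
    # Entries are keyed by function name, so registering a second function
    # with the same name replaces the earlier entry.
    pre_processes[method.__name__] = {"method": method, "process_type": process_type}


def normalize(loader):
    return loader


add_preprocess(normalize, SketchProcessTypes.DATA_LOADER)
add_preprocess(normalize, SketchProcessTypes.PARAMS)
```

One consequence of keying on `method.__name__` is that every lambda shares the name `<lambda>`, so named functions are the safer choice when several pre-processing steps are registered.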
after_training_params
after_training_params(flatten=False)

Return the wrapped model's parameters for aggregation.

This method returns a dict containing parameters that need to be reported back and aggregated in a federated learning setting.

It may also implement post-processing steps to make these parameters suitable for sharing with the researcher after training - hence its being used over get_model_params at the end of training rounds.

Returns:

Type Description
Union[Dict[str, Any], List[float]]

The trained parameters to aggregate.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def after_training_params(
    self,
    flatten: bool = False,
) -> Union[Dict[str, Any], List[float]]:
    """Return the wrapped model's parameters for aggregation.

    This method returns a dict containing parameters that need to be
    reported back and aggregated in a federated learning setting.

    It may also implement post-processing steps to make these parameters
    suitable for sharing with the researcher after training - hence its
    being used over `get_model_params` at the end of training rounds.

    Returns:
        The trained parameters to aggregate.
    """
    exclude_buffers = not self._training_args['share_persistent_buffers']
    if flatten:
        return self._model.flatten(exclude_buffers=exclude_buffers)
    return self.get_model_params(exclude_buffers=exclude_buffers)
export_model
export_model(filename)

Export the wrapped model to a dump file.

Parameters:

Name Type Description Default
filename str

path to the file where the model will be saved.

required

Raises:

Type Description
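FedbiomedTrainingPlanError

raised if the model has not been initialized through the post_init method. If you need to export the model, you must do it through Experiment's interface.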

Notes

This method is designed to save the model to a local dump file for easy re-use by the same user, possibly outside of Fed-BioMed. It is not designed to produce trustworthy data dumps and is not used to exchange models and their weights as part of the federated learning process.

To save the model parameters for sharing as part of the FL process, use the after_training_params method (or get_model_params one outside of a training context) and export results using Serializer (fedbiomed.common.serializer.Serializer).

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def export_model(self, filename: str) -> None:
    """Export the wrapped model to a dump file.

    Args:
        filename: path to the file where the model will be saved.

    Raises:
        FedbiomedTrainingPlanError: raised if model has not been initialized through the
        `post_init` method. If you need to export the model, you must do it through
        [`Experiment`][`fedbiomed.researcher.federated_workflows.Experiment`]'s interface.

    !!! info "Notes":
        This method is designed to save the model to a local dump
        file for easy re-use by the same user, possibly outside of
        Fed-BioMed. It is not designed to produce trustworthy data
        dumps and is not used to exchange models and their weights
        as part of the federated learning process.

        To save the model parameters for sharing as part of the FL process,
        use the `after_training_params` method (or `get_model_params` one
        outside of a training context) and export results using
        [`Serializer`][fedbiomed.common.serializer.Serializer].
    """
    if self._model is None:
        raise FedbiomedTrainingPlanError(self._error_msg_import_model % "export")
    self._model.export(filename)
get_model_params
get_model_params(only_trainable=False, exclude_buffers=True)

Return a copy of the model's trainable weights.

The type of data structure used to store weights depends on the actual framework of the wrapped model.

Parameters:

Name Type Description Default
only_trainable bool

Whether to ignore non-trainable model parameters from outputs (e.g. frozen neural network layers' parameters), or include all model parameters (the default).

False
exclude_buffers bool

Whether to ignore buffers (the default), or include them.

True

Returns:

Type Description
Dict[str, Any]

Model weights, as a dictionary mapping parameters' names to their value.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def get_model_params(self,
                     only_trainable: bool = False,
                     exclude_buffers: bool = True) -> Dict[str, Any]:
    """Return a copy of the model's trainable weights.

    The type of data structure used to store weights depends on the actual
    framework of the wrapped model.

    Args:
        only_trainable: Whether to ignore non-trainable model parameters
            from outputs (e.g. frozen neural network layers' parameters),
            or include all model parameters (the default).
        exclude_buffers: Whether to ignore buffers (the default), or
            include them.

    Returns:
        Model weights, as a dictionary mapping parameters' names to their value.
    """
    return self._model.get_weights(only_trainable=only_trainable, exclude_buffers=exclude_buffers)
get_model_wrapper_class
get_model_wrapper_class()

Gets training plan's model wrapper class.

Returns:

Type Description
Optional[Model]

The wrapper class for the model, or None if the model is not instantiated.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def get_model_wrapper_class(self) -> Optional[Model]:
    """Gets training plan's model wrapper class.

    Returns:
        the wrapper class for the model, or None
        if model is not instantiated.
    """
    return self._model
import_model
import_model(filename)

Import and replace the wrapped model from a dump file.

Parameters:

Name Type Description Default
filename str

path to the file where the model has been exported.

required

Raises:

Type Description
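FedbiomedTrainingPlanError

raised if the model has not been initialized through the post_init method. If you need to import a model into the Training Plan, you must do it through Experiment's interface.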

Notes

This method is designed to load the model from a local dump file, that might not be in a trustworthy format. It should therefore only be used to re-load data exported locally and not received from someone else, including other FL peers.

To load model parameters shared as part of the FL process, use the Serializer (fedbiomed.common.serializer.Serializer) to read the network-exchanged file, and the set_model_params method to assign the loaded values into the wrapped model.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def import_model(self, filename: str) -> None:
    """Import and replace the wrapped model from a dump file.

    Args:
        filename: path to the file where the model has been exported.

    Raises:
        FedbiomedTrainingPlanError: raised if model has not been initialized through the
        `post_init` method. If you need to import a model into the Training Plan, you
        must do it through [`Experiment`][`fedbiomed.researcher.federated_workflows.Experiment`]'s
        interface.

    !!! info "Notes":
        This method is designed to load the model from a local dump
        file, that might not be in a trustworthy format. It should
        therefore only be used to re-load data exported locally and
        not received from someone else, including other FL peers.

        To load model parameters shared as part of the FL process, use the
        [`Serializer`][fedbiomed.common.serializer.Serializer] to read the
        network-exchanged file, and the `set_model_params` method to assign
        the loaded values into the wrapped model.
    """
    if self._model is None:
        raise FedbiomedTrainingPlanError(self._error_msg_import_model % "import")
    try:
        self._model.reload(filename)
    except FedbiomedModelError as exc:
        msg = (
            f"{ErrorNumbers.FB304.value}: failed to import a model from "
            f"a dump file: {exc}"
        )
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg) from exc
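
Both `export_model` and `import_model` build their "model not initialized" message from the single template stored in `_error_msg_import_model`, filling the `%s` placeholder with the verb. A minimal sketch of that mechanism, where `ERROR_CODE` is a hypothetical placeholder for `ErrorNumbers.FB605.value`:

```python
ERROR_CODE = "FB605"  # hypothetical placeholder, not the real error string

# The f-string part is resolved immediately; "%s" is left for later formatting.
template = (
    f"{ERROR_CODE}: Training Plan's Model is not initialized.\n"
    "To %s a model, you should do it through Experiment's interface"
    " and not directly from Training Plan"
)

export_msg = template % "export"
import_msg = template % "import"
```

Mixing the two formatting styles works here because the f-string substitution happens once, when the template is built, while the `%` substitution is deferred until the error is actually raised.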
init_dependencies
init_dependencies()

Default method where dependencies are returned

Returns:

Type Description
List[str]

Empty list as default

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def init_dependencies(self) -> List[str]:
    """Default method where dependencies are returned

    Returns:
        Empty list as default
    """
    return []
init_optimizer abstractmethod
init_optimizer()

Method for declaring optimizer by default

Returns:

Type Description
Any

Either a framework-specific optimizer (or None) or a Fed-BioMed Optimizer (fedbiomed.common.optimizers.Optimizer).

Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def init_optimizer(self) -> Any:
    """Method for declaring optimizer by default

    Returns:
        either framework specific optimizer (or None) or
        FedBiomed [`Optimizers`][`fedbiomed.common.optimizers.Optimizer`]
    """
loader_args
loader_args()

Retrieve loader arguments

Returns:

Type Description
Dict[str, Any]

Loader arguments

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def loader_args(self) -> Dict[str, Any]:
    """Retrieve loader arguments

    Returns:
        Loader arguments
    """
    return self._loader_args
model abstractmethod
model()

Gets model instance of the training plan

Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def model(self):
    """Gets model instance of the training plan"""
model_args
model_args()

Retrieve model arguments.

Returns:

Type Description
Dict[str, Any]

Model arguments

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def model_args(self) -> Dict[str, Any]:
    """Retrieve model arguments.

    Returns:
        Model arguments
    """
    return self._model_args
optimizer
optimizer()

Get the BaseOptimizer wrapped by this training plan.

Returns:

Type Description
Optional[BaseOptimizer]

BaseOptimizer wrapped by this training plan, or None if it has not been initialized yet.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def optimizer(self) -> Optional[BaseOptimizer]:
    """Get the BaseOptimizer wrapped by this training plan.

    Returns:
        BaseOptimizer wrapped by this training plan, or None if
        it has not been initialized yet.
    """
    # FUTURE: return `self._optimizer.optimizer` instead?
    # Currently, the legacy Scaffold implem. needs the BaseOptimizer,
    # but IMHO it really should remain a private backend component.
    return self._optimizer
optimizer_args
optimizer_args()

Retrieves optimizer arguments

Returns:

Type Description
Dict[str, Any]

Optimizer arguments

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def optimizer_args(self) -> Dict[str, Any]:
    """Retrieves optimizer arguments

    Returns:
        Optimizer arguments
    """
    return self._optimizer_args
post_init abstractmethod
post_init(model_args, training_args, aggregator_args=None, initialize_optimizer=True)

Process model, training and optimizer arguments.

Parameters:

Name Type Description Default
model_args Dict[str, Any]

Arguments defined to instantiate the wrapped model.

required
training_args Dict[str, Any]

Arguments that are used in training routines, such as epoch, dry_run, etc. See TrainingArgs.

required
aggregator_args Optional[Dict[str, Any]]

Arguments managed by and shared with the researcher-side aggregator.

None
initialize_optimizer bool

whether to initialize the optimizer or not. Defaults to True.

True
Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def post_init(
        self,
        model_args: Dict[str, Any],
        training_args: Dict[str, Any],
        aggregator_args: Optional[Dict[str, Any]] = None,
        initialize_optimizer: bool = True
    ) -> None:
    """Process model, training and optimizer arguments.

    Args:
        model_args: Arguments defined to instantiate the wrapped model.
        training_args: Arguments that are used in training routines
            such as epoch, dry_run etc.
            Please see [`TrainingArgs`][fedbiomed.common.training_args.TrainingArgs]
        aggregator_args: Arguments managed by and shared with the
            researcher-side aggregator.
        initialize_optimizer: whether to initialize the optimizer or not. Defaults
            to True.
    """

    # Store various arguments provided by the researcher
    self._model_args = model_args
    self._aggregator_args = aggregator_args or {}
    self._optimizer_args = training_args.optimizer_arguments() or {}
    self._loader_args = training_args.loader_arguments() or {}
    self._training_args = training_args.pure_training_arguments()

    # Set random seed: the seed can be either None or an int provided by the researcher.
    # when it is None, both random.seed and np.random.seed rely on the OS to generate a random seed.
    rseed = training_args['random_seed']
    random.seed(rseed)
    np.random.seed(rseed)
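
The effect of the seeding step can be checked with the standard library alone. The sketch below covers only `random.seed`; the source above seeds NumPy with the same value. The helper name `draw` is illustrative.

```python
import random
from typing import List, Optional


def draw(seed: Optional[int], n: int = 3) -> List[float]:
    """Seed the stdlib generator as post_init does, then draw n values."""
    random.seed(seed)  # with None, Python falls back to an OS-provided seed
    return [random.random() for _ in range(n)]


# The same integer seed reproduces the same sequence of draws.
first = draw(1234)
second = draw(1234)
```

With an integer seed, repeated runs yield identical draws, which is what makes a training round reproducible; with `None`, each call to `draw` starts from a fresh, unpredictable state.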
save_code
save_code(filepath, from_code=None)

Saves the source code of the training plan class created by the user.

Parameters:

Name Type Description Default
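filepath str

path to the destination file

required
from_code Union[str, None]

source code to write instead of the class source returned by source(). If None (the default), source() is used.

None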

Raises:

Type Description
FedbiomedTrainingPlanError

raised when source of the model class cannot be accessed

FedbiomedTrainingPlanError

raised when model file cannot be created/opened/edited

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def save_code(self, filepath: str, from_code: Union[str, None] = None) -> None:
    """Saves the source code of the training plan class created by the user.

    Args:
        filepath: path to the destination file
        from_code: source code to write instead of the class source
            returned by `source()`. If None (the default), `source()` is used.

    Raises:
        FedbiomedTrainingPlanError: raised when source of the model class cannot be accessed
        FedbiomedTrainingPlanError: raised when model file cannot be created/opened/edited
    """
    if from_code is None:
        content = self.source()
    else:
        if not isinstance(from_code, str):
            raise FedbiomedTrainingPlanError(f"{ErrorNumbers.FB605.value}: Expected type str for `from_code`, "
                                             f"got: {type(from_code)}")
        content = from_code

    try:
        # should we write it in binary (for the sake of space optimization)?
        with open(filepath, "w", encoding="utf-8") as file:
            file.write(content)
        logger.debug("Model file has been saved: " + filepath)
    except PermissionError as exc:
        _msg = ErrorNumbers.FB605.value + f" : Unable to read {filepath} due to unsatisfactory privileges" + \
               ", can't write the model content into it"
        logger.critical(_msg)
        raise FedbiomedTrainingPlanError(_msg) from exc
    except MemoryError as exc:
        _msg = ErrorNumbers.FB605.value + f" : Can't write model file on {filepath}: out of memory!"
        logger.critical(_msg)
        raise FedbiomedTrainingPlanError(_msg) from exc
    except OSError as exc:
        _msg = ErrorNumbers.FB605.value + f" : Can't open file {filepath} to write model content"
        logger.critical(_msg)
        raise FedbiomedTrainingPlanError(_msg) from exc
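
The write-and-wrap pattern used by `save_code` can be tried without Fed-BioMed. In this sketch `SketchPlanError` is a hypothetical stand-in for `FedbiomedTrainingPlanError`, and the messages are shortened; the ordering of the `except` clauses follows the source above.

```python
import os
import tempfile


class SketchPlanError(Exception):
    """Hypothetical stand-in for FedbiomedTrainingPlanError."""


def save_code(filepath: str, content: str) -> None:
    """Write content to filepath, wrapping I/O failures in SketchPlanError."""
    try:
        with open(filepath, "w", encoding="utf-8") as file:
            file.write(content)
    except PermissionError as exc:
        # Must come before OSError: PermissionError is a subclass of OSError.
        raise SketchPlanError(f"insufficient privileges for {filepath}") from exc
    except OSError as exc:
        raise SketchPlanError(f"cannot open {filepath} for writing") from exc


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "plan.py")
    save_code(target, "class MyPlan:\n    pass\n")
    with open(target, encoding="utf-8") as file:
        saved = file.read()

    try:
        # The parent directory does not exist, so open() raises FileNotFoundError.
        save_code(os.path.join(tmp, "missing_dir", "plan.py"), "x")
        cause = None
    except SketchPlanError as exc:
        cause = type(exc.__cause__)
```

Using `raise ... from exc` keeps the original exception available as `__cause__`, so the underlying I/O failure is still visible in the traceback.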
set_aggregator_args
set_aggregator_args(aggregator_args)
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_aggregator_args(self, aggregator_args: Dict[str, Any]):
    raise FedbiomedTrainingPlanError("method not implemented and needed")
set_data_loaders
set_data_loaders(train_data_loader, test_data_loader)

Sets data loaders

Parameters:

Name Type Description Default
train_data_loader Union[DataLoader, NPDataLoader, None]

Data loader for training routine/loop

required
test_data_loader Union[DataLoader, NPDataLoader, None]

Data loader for validation routine

required
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_data_loaders(
        self,
        train_data_loader: Union[DataLoader, NPDataLoader, None],
        test_data_loader: Union[DataLoader, NPDataLoader, None]
    ) -> None:
    """Sets data loaders

    Args:
        train_data_loader: Data loader for training routine/loop
        test_data_loader: Data loader for validation routine
    """
    self.training_data_loader = train_data_loader
    self.testing_data_loader = test_data_loader
set_dataset_path
set_dataset_path(dataset_path)

Dataset path setter for TrainingPlan

Parameters:

Name Type Description Default
dataset_path str

The path where data is saved on the node. This method is called by the node that executes the training.

required
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_dataset_path(self, dataset_path: str) -> None:
    """Dataset path setter for TrainingPlan

    Args:
        dataset_path: The path where data is saved on the node.
            This method is called by the node that executes the training.
    """
    self.dataset_path = dataset_path
    logger.debug(f"Dataset path has been set as {self.dataset_path}")
set_model_params
set_model_params(params)

Assign new values to the model's trainable parameters.

The type of data structure used to store weights depends on the actual framework of the wrapped model.

Parameters:

Name Type Description Default
params Dict[str, Any]

model weights, as a dictionary mapping parameters' names to their value.

required
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_model_params(self, params: Dict[str, Any]) -> None:
    """Assign new values to the model's trainable parameters.

    The type of data structure used to store weights depends on the actual
    framework of the wrapped model.

    Args:
        params: model weights, as a dictionary mapping parameters' names
            to their value.
    """
    self._model.set_weights(params)
set_optimizer
set_optimizer(optimizer)
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def set_optimizer(self, optimizer: BaseOptimizer):
    self._optimizer = optimizer
source
source()
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def source(self) -> str:

    try:
        class_source = get_class_source(self.__class__)
    except FedbiomedError as exc:
        msg = f"{ErrorNumbers.FB605.value}: error while getting source of the model class: {exc}"
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg) from exc

    # Preparing content of the module
    content = "\n".join(self._dependencies)
    content += "\n"
    content += class_source

    return content
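
The string assembly performed by `source()` is simple enough to reproduce on plain strings. The sketch below uses a hypothetical helper, `assemble_source`, and illustrative dependency lines and class text; it does not call `get_class_source`.

```python
from typing import List


def assemble_source(dependencies: List[str], class_source: str) -> str:
    """Join the import lines, add a newline, then append the class source."""
    content = "\n".join(dependencies)
    content += "\n"
    content += class_source
    return content


# Illustrative inputs: these strings are only concatenated, never executed.
deps = ["import torch", "from torch import nn"]
cls_src = "class MyPlan(TorchTrainingPlan):\n    pass\n"
module_text = assemble_source(deps, cls_src)
empty_deps_text = assemble_source([], "class MyPlan:\n    pass\n")
```

Note that with no dependencies the result still starts with a newline, since the separator line is appended unconditionally.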
testing_routine
testing_routine(metric, metric_args, history_monitor, before_train)

Evaluation routine, to be called once per round.

Note

If the training plan implements a testing_step method (the signature of which is func(data, target) -> metrics) then it will be used rather than the input metric.
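
The dispatch described in this note can be sketched as follows. `DefaultPlan`, `CustomPlan`, `pick_evaluator` and `accuracy` are hypothetical names, and the fallback here compares predictions with targets directly, whereas the real routine first calls the model's `predict` method.

```python
from typing import Callable, List, Tuple


class DefaultPlan:
    """Hypothetical plan without a custom testing_step."""


class CustomPlan:
    """Hypothetical plan that defines its own batch-wise metric."""

    def testing_step(self, data: List[float], target: List[float]) -> float:
        # Mean absolute error over the batch.
        return sum(abs(d - t) for d, t in zip(data, target)) / len(data)


def accuracy(data: List[int], target: List[int]) -> float:
    """Simplified fallback metric: fraction of matching entries."""
    return sum(d == t for d, t in zip(data, target)) / len(data)


def pick_evaluator(plan: object, fallback: Callable) -> Tuple[Callable, str]:
    """Prefer plan.testing_step when present, else use the fallback metric."""
    if hasattr(plan, "testing_step"):
        return getattr(plan, "testing_step"), "Custom"
    return fallback, "ACCURACY"


custom_fn, custom_name = pick_evaluator(CustomPlan(), accuracy)
default_fn, default_name = pick_evaluator(DefaultPlan(), accuracy)
```

When a `testing_step` is found, the metric is reported under the name "Custom", so the `metric` argument passed to the routine has no effect in that case.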

Parameters:

Name Type Description Default
metric Optional[MetricTypes]

The metric used for validation. If None, use MetricTypes.ACCURACY.

required
metric_args Dict[str, Any]

Dictionary of additional arguments for setting up the metric. It maps argument names to argument values, which are passed to the metric function as keyword arguments.

required
history_monitor Optional[HistoryMonitor]

HistoryMonitor instance, used to record computed metrics and communicate them to the researcher (server).

required
before_train bool

Whether the evaluation is being performed before local training occurs, or afterwards. This is merely reported back through history_monitor.

required
Source code in fedbiomed/common/training_plans/_base_training_plan.py
def testing_routine(
        self,
        metric: Optional[MetricTypes],
        metric_args: Dict[str, Any],
        history_monitor: Optional['HistoryMonitor'],
        before_train: bool,
    ) -> None:
    """Evaluation routine, to be called once per round.

    !!! info "Note"
        If the training plan implements a `testing_step` method
        (the signature of which is func(data, target) -> metrics)
        then it will be used rather than the input metric.

    Args:
        metric: The metric used for validation.
            If None, use MetricTypes.ACCURACY.
        metric_args: dictionary containing additional arguments for setting up metric,
            that maps <argument_name; argument_value> and that will be passed to the
            metric function as keyword arguments.
        history_monitor: HistoryMonitor instance,
            used to record computed metrics and communicate them to
            the researcher (server).
        before_train: Whether the evaluation is being performed
            before local training occurs, or afterwards. This is merely
            reported back through `history_monitor`.
    """
    # TODO: Add preprocess option for testing_data_loader.
    if self.testing_data_loader is None:
        msg = f"{ErrorNumbers.FB605.value}: no validation dataset was set."
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg)

    n_samples = len(self.testing_data_loader.dataset)
    n_batches = max(len(self.testing_data_loader), 1)

    # Set up a batch-wise metrics-computation function.
    # Either use an optionally-implemented custom testing step.
    if hasattr(self, "testing_step"):
        evaluate = getattr(self, "testing_step")
        metric_name = "Custom"
    # Or use the provided `metric` (or its default value).
    else:
        if metric is None:
            metric = MetricTypes.ACCURACY
        metric_controller = Metrics()
        def evaluate(data, target):
            nonlocal metric, metric_args, metric_controller
            output = self._model.predict(data)
            if isinstance(target, torch.Tensor):
                target = target.numpy()

            return metric_controller.evaluate(
                target, output, metric=metric, **metric_args
            )
        metric_name = metric.name
    # Iterate over the validation dataset and run the defined routine.
    num_samples_observed_till_now: int = 0


    for idx, (data, target) in enumerate(self.testing_data_loader, 1):

        num_samples_observed_till_now += self._infer_batch_size(data)
        # Run the evaluation step; catch and raise exceptions.
        try:
            m_value = evaluate(data, target)
        except Exception as exc:
            msg = (
                f"{ErrorNumbers.FB605.value}: An error occurred "
                f"while computing the {metric_name} metric: {exc}"
            )
            logger.critical(msg)
            raise FedbiomedTrainingPlanError(msg) from exc
        # Log the computed value.
        # Reporting

        if idx % self.training_args()['log_interval'] == 0 or idx == 1 or idx == n_batches:
            logger.debug(
                f"Validation: Batch {idx}/{n_batches} "
                f"| Samples {num_samples_observed_till_now}/{n_samples} "
                f"| Metric[{metric_name}]: {m_value}"
            )
            # Further parse, and report it (provided a monitor is set).
            if history_monitor is not None:
                m_dict = self._create_metric_result_dict(m_value, metric_name)
                history_monitor.add_scalar(
                    metric=m_dict,
                    iteration=idx,
                    epoch=None,
                    test=True,
                    test_on_local_updates=(not before_train),
                    test_on_global_updates=before_train,
                    total_samples=n_samples,
                    batch_samples=num_samples_observed_till_now,
                    num_batches=n_batches
                )
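
The reporting condition in the loop above determines which batches are logged and forwarded to the history monitor. The sketch below isolates that condition; `reported_batches` is a hypothetical helper, not part of Fed-BioMed.

```python
from typing import List


def reported_batches(n_batches: int, log_interval: int) -> List[int]:
    """Return the 1-based batch indices for which metrics are reported."""
    return [
        idx
        for idx in range(1, n_batches + 1)
        if idx % log_interval == 0 or idx == 1 or idx == n_batches
    ]


schedule = reported_batches(n_batches=7, log_interval=3)
```

For seven batches and a log interval of 3, the reported batches are 1, 3, 6 and 7: the first and last batches are always included, regardless of the interval.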
training_args
training_args()

Retrieve training arguments

Returns:

Type Description
Dict[str, Any]

Training arguments

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def training_args(self) -> Dict[str, Any]:
    """Retrieve training arguments

    Returns:
        Training arguments
    """
    return self._training_args
training_data
training_data()

All subclasses must provide a training_data routine; the purpose of this base implementation is to detect whether it has been provided.

Raises:

Type Description
FedbiomedTrainingPlanError

if called without having been overridden by a subclass

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def training_data(self):
    """All subclasses must provide a training_data routine; the purpose of this
    base implementation is to detect whether it has been provided.

    Raises:
        FedbiomedTrainingPlanError: if called and not inherited
    """
    msg = f"{ErrorNumbers.FB303.value}: training_data must be implemented"
    logger.critical(msg)
    raise FedbiomedTrainingPlanError(msg)
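
The "raise unless overridden" pattern can be illustrated with stand-in classes. `SketchPlan`, `PlanWithData` and `has_training_data` are hypothetical names, and `NotImplementedError` replaces `FedbiomedTrainingPlanError` to keep the sketch self-contained.

```python
from typing import List, Tuple


class SketchPlan:
    """Hypothetical base whose training_data only signals that it is missing."""

    def training_data(self):
        raise NotImplementedError("training_data must be implemented")


class PlanWithData(SketchPlan):
    """Hypothetical subclass that provides its own training_data."""

    def training_data(self) -> List[Tuple[float, int]]:
        return [(0.0, 1), (1.0, 0)]


def has_training_data(plan: SketchPlan) -> bool:
    """Report whether calling training_data succeeds on this plan."""
    try:
        plan.training_data()
    except NotImplementedError:
        return False
    return True


base_ok = has_training_data(SketchPlan())
subclass_ok = has_training_data(PlanWithData())
```

Unlike an `@abstractmethod`, this approach lets the base class be instantiated and defers the failure to the moment the method is called.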
training_routine abstractmethod
training_routine(history_monitor=None, node_args=None)

Training routine, to be called once per round.

Parameters:

Name Type Description Default
history_monitor Optional[HistoryMonitor]

optional HistoryMonitor instance, recording training metadata.

None
node_args Optional[Dict[str, Any]]

Command line arguments for node. These arguments can specify GPU use; however, this is not supported for scikit-learn models and thus will be ignored.

None
Source code in fedbiomed/common/training_plans/_base_training_plan.py
@abstractmethod
def training_routine(
        self,
        history_monitor: Optional['HistoryMonitor'] = None,
        node_args: Optional[Dict[str, Any]] = None
    ) -> None:
    """Training routine, to be called once per round.

    Args:
        history_monitor: optional HistoryMonitor
            instance, recording training metadata.
        node_args: Command line arguments for node.
            These arguments can specify GPU use; however, this is not
            supported for scikit-learn models and thus will be ignored.
    """
    return None

FedPerceptron

FedPerceptron()

Bases: FedSGDClassifier

Fed-BioMed training plan for scikit-learn Perceptron models.

This class inherits from FedSGDClassifier, and forces the wrapped scikit-learn SGDClassifier model to use a "perceptron" loss, that makes it equivalent to an actual scikit-learn Perceptron model.

Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
    """Class constructor."""
    super().__init__()

Functions

post_init
post_init(model_args, training_args, aggregator_args=None, **kwargs)
Source code in fedbiomed/common/training_plans/_sklearn_models.py
def post_init(
        self,
        model_args: Dict[str, Any],
        training_args: Dict[str, Any],
        aggregator_args: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> None:
    # get default values of Perceptron model (different from SGDClassifier model default values)
    perceptron_default_values = Perceptron().get_params()
    sgd_classifier_default_values = SGDClassifier().get_params()
    # make sure loss used is perceptron loss - can not be changed by user
    model_args["loss"] = "perceptron"
    super().post_init(model_args, training_args)
    self._model.set_params(loss="perceptron")

    # collect default values of Perceptron and set it to the model FedPerceptron
    model_hyperparameters = self._model.get_params()
    for hyperparameter_name, val in perceptron_default_values.items():
        if model_hyperparameters[hyperparameter_name] == sgd_classifier_default_values[hyperparameter_name]:
            # this means default parameter of SGDClassifier has not been changed by user
            self._model.set_params(**{hyperparameter_name: val})
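
The default-substitution loop at the end of `post_init` can be reproduced on plain dictionaries. In this sketch `apply_perceptron_defaults` is a hypothetical helper and the hyper-parameter values are illustrative; they are not read from scikit-learn.

```python
from typing import Any, Dict


def apply_perceptron_defaults(
    current: Dict[str, Any],
    sgd_defaults: Dict[str, Any],
    perceptron_defaults: Dict[str, Any],
) -> Dict[str, Any]:
    """Replace values still at the SGDClassifier default with Perceptron's."""
    result = dict(current)
    for name, value in perceptron_defaults.items():
        if current[name] == sgd_defaults[name]:
            # Unchanged from the SGDClassifier default: use the Perceptron one.
            result[name] = value
    return result


# Illustrative values only.
sgd_defaults = {"penalty": "l2", "eta0": 0.0, "max_iter": 1000}
perceptron_defaults = {"penalty": None, "eta0": 1.0, "max_iter": 1000}
user_model = {"penalty": "l2", "eta0": 0.5, "max_iter": 1000}

merged = apply_perceptron_defaults(user_model, sgd_defaults, perceptron_defaults)
```

Here `eta0` keeps the user's value of 0.5, while `penalty` is replaced because it still equals the SGDClassifier default. A side effect of comparing against defaults is that a value the user explicitly set to the SGDClassifier default cannot be told apart from an untouched one, and is overwritten as well.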

FedSGDClassifier

FedSGDClassifier()

Bases: SKLearnTrainingPlanPartialFit

Fed-BioMed training plan for scikit-learn SGDClassifier models.

Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
    """Initialize the sklearn SGDClassifier training plan."""
    super().__init__()

FedSGDRegressor

FedSGDRegressor()

Bases: SKLearnTrainingPlanPartialFit

Fed-BioMed training plan for scikit-learn SGDRegressor models.

Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
    """Initialize the sklearn SGDRegressor training plan."""
    super().__init__()

SKLearnTrainingPlan

SKLearnTrainingPlan()

Bases: BaseTrainingPlan

Base class for Fed-BioMed wrappers of sklearn classes.

Classes that inherit from this abstract class must:

  • Specify a _model_cls class attribute that defines the type of scikit-learn model being wrapped for training.
  • Implement a set_init_params method that:
      • sets and assigns the model's initial trainable weights attributes.
      • populates the _param_list attribute with names of these attributes.
  • Implement a _training_routine method that performs a training round based on self.training_data_loader (which is a NPDataLoader).

Attributes:

Name Type Description
dataset_path Optional[str]

The path that indicates where dataset has been stored

pre_processes Dict[str, PreProcessDict]

Preprocess functions that will be applied to the training data at the beginning of the training routine.

training_data_loader Union[DataLoader, NPDataLoader, None]

Data loader used in the training routine.

testing_data_loader Union[DataLoader, NPDataLoader, None]

Data loader used in the validation routine.

Notes

The trained model may be exported via the export_model method, resulting in a dump file that may be reloaded using joblib.load outside of Fed-BioMed.

Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def __init__(self) -> None:
    """Initialize the SKLearnTrainingPlan."""
    super().__init__()
    self._model: Union[SkLearnModel, None] = None
    self._training_args = {}  # type: Dict[str, Any]
    self.__type = TrainingPlans.SkLearnTrainingPlan
    self._batch_maxnum = 0
    self.dataset_path: Optional[str] = None
    self._optimizer: Optional[BaseOptimizer] = None
    self._add_dependency([
        "import inspect",
        "import numpy as np",
        "import pandas as pd",
        "from fedbiomed.common.training_plans import SKLearnTrainingPlan",
        "from fedbiomed.common.data import DataManager",
    ])
    self._add_dependency(list(self._model_dep))

    # Add dependencies
    self._configure_dependencies()

Attributes

dataset_path instance-attribute
dataset_path = None

Functions

init_optimizer
init_optimizer()

Creates and configures the optimizer. By default, returns None, meaning that scikit-learn's native SGD-based optimization is used.

If a declearn optimizer is used, this method should be overridden in the training plan and return a Fed-BioMed Optimizer.

Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def init_optimizer(self) -> Optional[FedOptimizer]:
    """Creates and configures optimizer. By default, returns None (meaning native inner scikit
    learn optimization SGD based will be used).

    In the case a Declearn Optimizer is used, this method should be overridden in the Training Plan and return
    a Fedbiomed [`Optimizer`][fedbiomed.common.optimizers.optimizer.Optimizer]"""
    pass
model
model()

Retrieve the wrapped scikit-learn model instance.

Returns:

Type Description
Optional[BaseEstimator]

Scikit-learn model instance

Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def model(self) -> Optional[BaseEstimator]:
    """Retrieve the wrapped scikit-learn model instance.

    Returns:
        Scikit-learn model instance
    """
    if self._model is not None:
        return self._model.model
    else:
        return self._model
post_init
post_init(model_args, training_args, aggregator_args=None, initialize_optimizer=True)

Process model, training and optimizer arguments.

Parameters:

Name Type Description Default
model_args Dict[str, Any]

Arguments defined to instantiate the wrapped model.

required
training_args TrainingArgs

Arguments that are used in training routines such as epoch, dry_run etc. Please see TrainingArgs

required
aggregator_args Optional[Dict[str, Any]]

Arguments managed by and shared with the researcher-side aggregator.

None
initialize_optimizer bool

Unused.

True
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def post_init(
        self,
        model_args: Dict[str, Any],
        training_args: TrainingArgs,
        aggregator_args: Optional[Dict[str, Any]] = None,
        initialize_optimizer: bool = True
) -> None:
    """Process model, training and optimizer arguments.

    Args:
        model_args: Arguments defined to instantiate the wrapped model.
        training_args: Arguments that are used in training routines
            such as epoch, dry_run etc.
            Please see [`TrainingArgs`][fedbiomed.common.training_args.TrainingArgs]
        aggregator_args: Arguments managed by and shared with the
            researcher-side aggregator.
        initialize_optimizer: Unused.
    """
    model_args.setdefault("verbose", 1)
    super().post_init(model_args, training_args, aggregator_args)
    self._model = SkLearnModel(self._model_cls)
    self._batch_maxnum = self._training_args.get('batch_maxnum', self._batch_maxnum)
    self._warn_about_training_args()

    # configure optimizer (if provided in the TrainingPlan)
    self._configure_optimizer()

    # FIXME: should we do that in `_configure_optimizer`
    # from now on, `self._optimizer`` is not None
    # Override default model parameters based on `self._model_args`.
    params = {
        key: model_args.get(key, val)
        for key, val in self._model.get_params().items()
    }
    self._model.set_params(**params)
    # Set up additional parameters (normally created by `self._model.fit`).
    self._model.set_init_params(model_args)
set_data_loaders
set_data_loaders(train_data_loader, test_data_loader)

Sets data loaders

Parameters:

Name Type Description Default
train_data_loader Union[DataLoader, NPDataLoader, None]

Data loader for training routine/loop

required
test_data_loader Union[DataLoader, NPDataLoader, None]

Data loader for validation routine

required
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def set_data_loaders(
        self,
        train_data_loader: Union[DataLoader, NPDataLoader, None],
        test_data_loader: Union[DataLoader, NPDataLoader, None]
) -> None:
    """Sets data loaders

    Args:
        train_data_loader: Data loader for training routine/loop
        test_data_loader: Data loader for validation routine
    """
    args = (train_data_loader, test_data_loader)
    if not all(isinstance(data, NPDataLoader) for data in args):
        msg = (
            f"{ErrorNumbers.FB310.value}: SKLearnTrainingPlan expects "
            "NPDataLoader instances as training and testing data "
            f"loaders, but received {type(train_data_loader)} "
            f"and {type(test_data_loader)} respectively."
        )
        logger.error(msg)
        raise FedbiomedTrainingPlanError(msg)
    self.training_data_loader = train_data_loader
    self.testing_data_loader = test_data_loader
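
The type check in the listing above requires both loaders to be NPDataLoader instances. The sketch below reproduces only that check with hypothetical stand-ins (`NPDataLoaderStub`, `check_loaders`, `is_accepted`), and raises `TypeError` where the real method raises `FedbiomedTrainingPlanError`.

```python
from typing import Any


class NPDataLoaderStub:
    """Hypothetical stand-in for Fed-BioMed's NPDataLoader."""


def check_loaders(train_data_loader: Any, test_data_loader: Any) -> None:
    """Raise unless BOTH loaders are NPDataLoaderStub instances."""
    loaders = (train_data_loader, test_data_loader)
    if not all(isinstance(loader, NPDataLoaderStub) for loader in loaders):
        # The real method raises FedbiomedTrainingPlanError; TypeError is
        # used here only to keep the sketch dependency-free.
        raise TypeError(
            "expected NPDataLoader instances, but received "
            f"{type(train_data_loader).__name__} and "
            f"{type(test_data_loader).__name__}"
        )


def is_accepted(train_data_loader: Any, test_data_loader: Any) -> bool:
    """Report whether the pair of loaders passes the check."""
    try:
        check_loaders(train_data_loader, test_data_loader)
    except TypeError:
        return False
    return True


both_ok = is_accepted(NPDataLoaderStub(), NPDataLoaderStub())
missing_test = is_accepted(NPDataLoaderStub(), None)
print(both_ok, missing_test)
```

Although the type hints allow None, the check as written rejects it: `isinstance(None, NPDataLoader)` is False, so a missing testing loader triggers the error.
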
testing_routine
testing_routine(metric, metric_args, history_monitor, before_train)

Evaluation routine, to be called once per round.

Note

If the training plan implements a testing_step method (the signature of which is func(data, target) -> metrics) then it will be used rather than the input metric.

Parameters:

Name Type Description Default
metric Optional[MetricTypes]

The metric used for validation. If None, defaults to MetricTypes.ACCURACY for classification models and to MetricTypes.MEAN_SQUARE_ERROR otherwise.

required
history_monitor Optional[HistoryMonitor]

HistoryMonitor instance, used to record computed metrics and communicate them to the researcher (server).

required
before_train bool

Whether the evaluation is being performed before local training occurs, or afterwards. This is merely reported back through history_monitor.

required
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def testing_routine(
        self,
        metric: Optional[MetricTypes],
        metric_args: Dict[str, Any],
        history_monitor: Optional['HistoryMonitor'],
        before_train: bool
) -> None:
    """Evaluation routine, to be called once per round.

    !!! info "Note"
        If the training plan implements a `testing_step` method
        (the signature of which is func(data, target) -> metrics)
        then it will be used rather than the input metric.

    Args:
        metric: The metric used for validation.
            If None, use MetricTypes.ACCURACY.
        history_monitor: HistoryMonitor instance,
            used to record computed metrics and communicate them to
            the researcher (server).
        before_train: Whether the evaluation is being performed
            before local training occurs, or afterwards. This is merely
            reported back through `history_monitor`.
    """
    # Check that the testing data loader is of proper type.
    if not isinstance(self.testing_data_loader, NPDataLoader):
        msg = (
            f"{ErrorNumbers.FB310.value}: SKLearnTrainingPlan cannot be "
            "evaluated without a NPDataLoader as `testing_data_loader`."
        )
        logger.error(msg)
        raise FedbiomedTrainingPlanError(msg)
    # If required, make up for the lack of specifications regarding target
    # classification labels.
    if self._model.is_classification and not hasattr(self.model(), 'classes_'):
        classes = self._classes_from_concatenated_train_test()
        setattr(self.model(), 'classes_', classes)
    # If required, select the default metric (accuracy or mse).
    if metric is None:
        if self._model.is_classification:
            metric = MetricTypes.ACCURACY
        else:
            metric = MetricTypes.MEAN_SQUARE_ERROR
    # Delegate the actual evaluation routine to the parent class.
    super().testing_routine(
        metric, metric_args, history_monitor, before_train
    )
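
The default-metric selection in the listing above can be sketched in isolation. This is an illustrative sketch: the `Metric` enum is a hypothetical stand-in for `MetricTypes`, and `select_metric` is not a Fed-BioMed function.

```python
from enum import Enum
from typing import Optional


class Metric(Enum):
    """Hypothetical stand-in for Fed-BioMed's MetricTypes."""
    ACCURACY = "accuracy"
    MEAN_SQUARE_ERROR = "mean_square_error"


def select_metric(metric: Optional[Metric], is_classification: bool) -> Metric:
    """Return the requested metric, or a default that depends on the model kind."""
    if metric is not None:
        return metric
    # Classifiers default to accuracy, other models to mean squared error.
    return Metric.ACCURACY if is_classification else Metric.MEAN_SQUARE_ERROR


classifier_default = select_metric(None, is_classification=True)
regressor_default = select_metric(None, is_classification=False)
explicit = select_metric(Metric.MEAN_SQUARE_ERROR, is_classification=True)
print(classifier_default, regressor_default, explicit)
```
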
training_routine
training_routine(history_monitor=None, node_args=None)

Training routine, to be called once per round.

Parameters:

Name Type Description Default
history_monitor Optional[HistoryMonitor]

optional HistoryMonitor instance, recording training metadata. Defaults to None.

None
node_args Optional[Dict[str, Any]]

command line arguments for node. These arguments can specify GPU use; however, this is not supported for scikit-learn models and thus will be ignored.

None
Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def training_routine(
        self,
        history_monitor: Optional['HistoryMonitor'] = None,
        node_args: Optional[Dict[str, Any]] = None
) -> None:
    """Training routine, to be called once per round.

    Args:
        history_monitor: optional HistoryMonitor
            instance, recording training metadata. Defaults to None.
        node_args: command line arguments for node.
            These arguments can specify GPU use; however, this is not
            supported for scikit-learn models and thus will be ignored.
    """
    if self._optimizer is None:
        raise FedbiomedTrainingPlanError('Optimizer is None, please run `post_init` beforehand')

    # Run preprocesses
    self._preprocess()

    if not isinstance(self.training_data_loader, NPDataLoader):
        msg = (
            f"{ErrorNumbers.FB310.value}: SKLearnTrainingPlan cannot "
            "be trained without a NPDataLoader as `training_data_loader`."
        )
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg)

    # Warn if GPU-use was expected (as it is not supported).
    if node_args is not None and node_args.get('gpu_only', False):

        self._optimizer.send_to_device(False)  # disable GPU, avoid `declearn` triggering warning messages
        logger.warning(
            'Node would like to force GPU usage, but sklearn training '
            'plan does not support it. Training on CPU.'
        )
    # Run the model-specific training routine.
    try:
        return self._training_routine(history_monitor)
    except Exception as exc:
        msg = (
            f"{ErrorNumbers.FB605.value}: error while fitting "
            f"the model: {exc}"
        )
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg)
type
type()

Getter for training plan type

Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def type(self) -> TrainingPlans:
    """Getter for training plan type """
    return self.__type

TorchTrainingPlan

TorchTrainingPlan()

Bases: BaseTrainingPlan

Implements TrainingPlan for torch NN framework

An abstraction over a PyTorch module to run PyTorch models and scripts on the node side. The researcher's model (resp. params) will be:

  1. saved to a '.py' (resp. '.mpk') file,
  2. uploaded to an HTTP server (network layer),
  3. then downloaded from the HTTP server on the node side,
  4. finally, read and executed on the node side.

The researcher must define/override:

  • a training_data() function
  • a training_step() function

The researcher may have to add extra dependencies (Python imports) using the init_dependencies method.

Attributes:

Name Type Description
dataset_path Union[str, None]

The path that indicates where dataset has been stored

pre_processes Dict[str, PreProcessDict]

Preprocess functions that will be applied to the training data at the beginning of the training routine.

training_data_loader Union[DataLoader, NPDataLoader, None]

Data loader used in the training routine.

testing_data_loader Union[DataLoader, NPDataLoader, None]

Data loader used in the validation routine.

correction_state OrderedDict

an OrderedDict of {'parameter name': torch.Tensor} where the keys correspond to the names of the model parameters contained in self._model.named_parameters(), and the values correspond to the correction to be applied to that parameter.

Notes

The trained model may be exported via the export_model method, resulting in a dump file that may be reloaded using torch.load outside of Fed-BioMed.

Source code in fedbiomed/common/training_plans/_torchnn.py
def __init__(self):
    """ Construct training plan """

    super().__init__()

    self.__type = TrainingPlans.TorchTrainingPlan

    # Differential privacy support
    self._dp_controller: Optional[DPController] = None
    self._optimizer: Union[BaseOptimizer, None] = None
    self._model: Union[TorchModel, None] = None

    self._use_gpu: bool = False
    self._share_persistent_buffers = None

    self._batch_maxnum: int = 100
    self._fedprox_mu: Optional[float] = None
    self._log_interval: int = 10
    self._epochs: int = 1
    self._dry_run = False
    self._num_updates: Optional[int] = None

    self.correction_state: OrderedDict = OrderedDict()
    self.aggregator_name: str = None

    # TODO : add random seed init
    # self.random_seed_params = None
    # self.random_seed_shuffling_data = None

    # device to use: cpu/gpu
    # - all operations except training only use cpu
    # - researcher doesn't request to use gpu by default
    self._device_init: str = "cpu"
    self._device = self._device_init

    # list dependencies of the model
    self._add_dependency(["import torch",
                         "import torch.nn as nn",
                         "import torch.nn.functional as F",
                         "from fedbiomed.common.training_plans import TorchTrainingPlan",
                         "from fedbiomed.common.data import DataManager",
                         "from fedbiomed.common.constants import ProcessTypes",
                         "from torch.utils.data import DataLoader",
                         "from torchvision import datasets, transforms"
                         ])

    # Aggregated model parameters
    #self._init_params: List[torch.Tensor] = None

    # Add dependencies
    self._configure_dependencies()

Attributes

aggregator_name instance-attribute
aggregator_name = None
correction_state instance-attribute
correction_state = OrderedDict()

Functions

after_training_params
after_training_params(flatten=False)

Return the wrapped model's parameters for aggregation.

This method returns a dict containing parameters that need to be reported back and aggregated in a federated learning setting.

If the postprocess method exists (i.e. has been defined by end-users) it is called in the context of this method. DP-required adjustments are also set to happen as part of this method.

If the researcher specified share_persistent_buffers: False in the training arguments, then we return only the output of Model.get_weights, which considers only the trainable parameters. Otherwise, the default behaviour is to return the complete state_dict.

Returns:

Type Description
Dict[str, Tensor]

The trained parameters to aggregate.

Source code in fedbiomed/common/training_plans/_torchnn.py
def after_training_params(self, flatten: bool = False) -> Dict[str, torch.Tensor]:
    """Return the wrapped model's parameters for aggregation.

    This method returns a dict containing parameters that need to be
    reported back and aggregated in a federated learning setting.

    If the `postprocess` method exists (i.e. has been defined by end-users)
    it is called in the context of this method. DP-required adjustments are
    also set to happen as part of this method.

    If the researcher specified `share_persistent_buffers: False` in the
    training arguments, then we return only the output of
    [Model.get_weights][fedbiomed.common.models.TorchModel.get_weights],
    which considers only the trainable parameters.
    Otherwise, the default behaviour is to return the complete `state_dict`.

    Returns:
        The trained parameters to aggregate.
    """
    # Either include non-parameter buffers to the outputs or not.
    # Note: this is mostly about sharing statistics from BatchNorm layers.
    params = super().after_training_params()
    # Check whether postprocess method exists, and use it.
    if hasattr(self, 'postprocess'):
        logger.debug("running model.postprocess() method")
        try:
            params = self.postprocess(self._model.model.state_dict())  # Post process
        except Exception as e:
            raise FedbiomedTrainingPlanError(f"{ErrorNumbers.FB605.value}: Error while running post-process "
                                             f"{e}") from e

    # Run (optional) DP controller adjustments as well.
    params = self._dp_controller.after_training(params)
    if flatten:
        params = self._model.flatten(exclude_buffers=not self._share_persistent_buffers)
    return params
init_model abstractmethod
init_model()

Abstract method where model should be defined.

Source code in fedbiomed/common/training_plans/_torchnn.py
@abstractmethod
def init_model(self):
    """Abstract method where model should be defined."""
init_optimizer
init_optimizer()

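Declares the default optimizer. Unless overridden, this creates a torch.optim.Adam optimizer over the model's parameters, configured with the optimizer arguments.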

Source code in fedbiomed/common/training_plans/_torchnn.py
def init_optimizer(self) -> Union[FedOptimizer, torch.optim.Optimizer]:
    """Abstract method for declaring optimizer by default """
    try:
        self._optimizer = torch.optim.Adam(self._model.model.parameters(), **self._optimizer_args)
    except AttributeError as e:
        raise FedbiomedTrainingPlanError(f"{ErrorNumbers.FB605.value}: Invalid argument for default "
                                         f"optimizer Adam. Error: {e}") from e

    return self._optimizer
initial_parameters
initial_parameters()

Returns initial parameters without DP or training applied

Returns:

Type Description
Dict

State dictionary of torch Module

Source code in fedbiomed/common/training_plans/_torchnn.py
def initial_parameters(self) -> Dict:
    """Returns initial parameters without DP or training applied

    Returns:
        State dictionary of torch Module
    """
    return self._model.init_params
model
model()
Source code in fedbiomed/common/training_plans/_torchnn.py
def model(self) -> Optional[torch.nn.Module]:
    if self._model is None:
        return None
    return self._model.model
optimizer_args
optimizer_args()

Retrieves optimizer arguments

Returns:

Type Description
Dict[str, Any]

Optimizer arguments

Source code in fedbiomed/common/training_plans/_torchnn.py
def optimizer_args(self) -> Dict[str, Any]:
    """Retrieves optimizer arguments

    Returns:
        Optimizer arguments
    """
    self.update_optimizer_args()  # update `optimizer_args` (eg after training)
    return super().optimizer_args()
post_init
post_init(model_args, training_args, aggregator_args=None, initialize_optimizer=True)

Process model, training and optimizer arguments.

Parameters:

Name Type Description Default
model_args Dict[str, Any]

Arguments defined to instantiate the wrapped model.

required
training_args TrainingArgs

Arguments that are used in training routines such as epoch, dry_run etc. Please see TrainingArgs

required
aggregator_args Optional[Dict[str, Any]]

Arguments managed by and shared with the researcher-side aggregator.

None
initialize_optimizer bool

If True, configures optimizer. It has to be True for node side configuration to prepare optimizer for the training.

True

Raises: FedbiomedTrainingPlanError: If the provided arguments do not match expectations, or if the optimizer, model and dependencies configuration goes wrong.

Source code in fedbiomed/common/training_plans/_torchnn.py
def post_init(
        self,
        model_args: Dict[str, Any],
        training_args: TrainingArgs,
        aggregator_args: Optional[Dict[str, Any]] = None,
        initialize_optimizer: bool = True
) -> None:
    """Process model, training and optimizer arguments.

    Args:
        model_args: Arguments defined to instantiate the wrapped model.
        training_args: Arguments that are used in training routines
            such as epoch, dry_run etc.
            Please see [`TrainingArgs`][fedbiomed.common.training_args.TrainingArgs]
        aggregator_args: Arguments managed by and shared with the
            researcher-side aggregator.
        initialize_optimizer: If True, configures optimizer. It has to be True for node
            side configuration to prepare optimizer for the training.
    Raises:
        FedbiomedTrainingPlanError: If the provided arguments do not
            match expectations, or if the optimizer, model and dependencies
            configuration goes wrong.
    """
    super().post_init(model_args, training_args, aggregator_args)
    # Assign scalar attributes.
    self._use_gpu = self._training_args.get('use_gpu')
    self._batch_maxnum = self._training_args.get('batch_maxnum')
    self._log_interval = self._training_args.get('log_interval')
    self._epochs = self._training_args.get('epochs')
    self._num_updates = self._training_args.get('num_updates', 1)
    self._dry_run = self._training_args.get('dry_run')
    self._share_persistent_buffers = training_args.get('share_persistent_buffers', True)
    # Set random seed (Pytorch-specific)
    rseed = training_args['random_seed']
    rseed = rseed if rseed is not None else torch.seed()
    torch.manual_seed(rseed)
    # Optionally set up differential privacy.
    self._dp_controller = DPController(training_args.dp_arguments() or None)
    # Configure aggregator-related arguments
    # TODO: put fedprox mu inside strategy_args
    self._fedprox_mu = self._training_args.get('fedprox_mu')
    self.set_aggregator_args(aggregator_args or {})
    # Configure the model and optimizer.

    self._configure_model_and_optimizer(initialize_optimizer)
send_to_device
send_to_device(to_send, device)

Send inputs to correct device for training.

Recursively traverses lists, tuples and dicts until it meets a torch Tensor, then sends the Tensor to the specified device.

Parameters:

Name Type Description Default
to_send Union[Tensor, list, tuple, dict]

the data to be sent to the device.

required
device device

the device to send the data to.

required

Raises:

Type Description
FedbiomedTrainingPlanError

when to_send is not the correct type

Source code in fedbiomed/common/training_plans/_torchnn.py
def send_to_device(self,
                   to_send: Union[torch.Tensor, list, tuple, dict],
                   device: torch.device
                   ):
    """Send inputs to correct device for training.

    Recursively traverses lists, tuples and dicts until it meets a torch Tensor, then sends the Tensor
    to the specified device.

    Args:
        to_send: the data to be sent to the device.
        device: the device to send the data to.

    Raises:
       FedbiomedTrainingPlanError: when to_send is not the correct type
    """
    if isinstance(to_send, torch.Tensor):
        return to_send.to(device)
    elif isinstance(to_send, dict):
        return {key: self.send_to_device(val, device) for key, val in to_send.items()}
    elif isinstance(to_send, tuple):
        return tuple(self.send_to_device(d, device) for d in to_send)
    elif isinstance(to_send, list):
        return [self.send_to_device(d, device) for d in to_send]
    else:
        raise FedbiomedTrainingPlanError(f'{ErrorNumbers.FB310.value} cannot send data to device. '
                                         f'Data must be a torch Tensor or a list, tuple or dict '
                                         f'ultimately containing Tensors.')
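
The recursive traversal described above can be sketched without PyTorch. This is a minimal sketch under stated assumptions: `FakeTensor` is a hypothetical stand-in for `torch.Tensor` that only records its device, devices are plain strings, and `TypeError` replaces `FedbiomedTrainingPlanError`.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class FakeTensor:
    """Hypothetical stand-in for torch.Tensor: only tracks its device."""
    name: str
    device: str = "cpu"

    def to(self, device: str) -> "FakeTensor":
        # Like Tensor.to, return a new object placed on the target device.
        return FakeTensor(self.name, device)


def send_to_device(to_send: Any, device: str) -> Any:
    """Recursively move every FakeTensor found in nested containers."""
    if isinstance(to_send, FakeTensor):
        return to_send.to(device)
    if isinstance(to_send, dict):
        return {key: send_to_device(val, device) for key, val in to_send.items()}
    if isinstance(to_send, tuple):
        return tuple(send_to_device(item, device) for item in to_send)
    if isinstance(to_send, list):
        return [send_to_device(item, device) for item in to_send]
    # Any other leaf type is rejected, as in the listing above.
    raise TypeError(f"cannot send {type(to_send).__name__} to device")


batch = {"image": FakeTensor("x"), "extras": [FakeTensor("a"), (FakeTensor("b"),)]}
moved = send_to_device(batch, "cuda:0")
print(moved)
```

Container types are preserved (a tuple stays a tuple), and a non-tensor leaf such as a plain integer raises instead of being passed through.
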
set_aggregator_args
set_aggregator_args(aggregator_args)

Handles and loads the aggregator arguments received from the researcher.

Parameters:

Name Type Description Default
aggregator_args Dict[str, Any]

dictionary mapping each aggregator argument name to its value (e.g. 'aggregator_correction' with correction states)

required
Source code in fedbiomed/common/training_plans/_torchnn.py
def set_aggregator_args(self, aggregator_args: Dict[str, Any]):
    """Handles and loads the aggregator arguments received from the researcher

    Args:
        aggregator_args (Dict[str, Any]): dictionary mapping aggregator argument
            name with its value (eg 'aggregator_correction' with correction states)
    """

    self.aggregator_name = aggregator_args.get('aggregator_name', self.aggregator_name)
    # FIXME: this is too specific to Scaffold. Should be redesigned, or handled
    # by an aggregator handler that contains all keys for all strategies
    # implemented in fedbiomed
    # here we are loading all args that have been sent from file exchange system
    for arg_name, aggregator_arg in aggregator_args.items():
        if arg_name == 'aggregator_correction':
            if not isinstance(aggregator_arg, dict):
                raise FedbiomedTrainingPlanError(
                    f"{ErrorNumbers.FB309.value}: TorchTrainingPlan received "
                    "invalid 'aggregator_correction' aggregator args."
                )
            self.correction_state = aggregator_arg
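
The handling of aggregator arguments shown above can be sketched with a small stand-in class. This is an illustrative sketch: `AggregatorArgsHolder` is hypothetical, the sample argument values are made up, and `ValueError` replaces `FedbiomedTrainingPlanError`.

```python
from typing import Any, Dict, Optional


class AggregatorArgsHolder:
    """Hypothetical stand-in keeping only the aggregator-related state."""

    def __init__(self) -> None:
        self.aggregator_name: Optional[str] = None
        self.correction_state: Dict[str, Any] = {}

    def set_aggregator_args(self, aggregator_args: Dict[str, Any]) -> None:
        # Keep the previously stored name when the key is absent.
        self.aggregator_name = aggregator_args.get(
            "aggregator_name", self.aggregator_name
        )
        if "aggregator_correction" in aggregator_args:
            correction = aggregator_args["aggregator_correction"]
            # The correction must be a dict keyed by parameter name.
            if not isinstance(correction, dict):
                raise ValueError("invalid 'aggregator_correction' aggregator args")
            self.correction_state = correction


holder = AggregatorArgsHolder()
# Hypothetical values, for illustration only.
holder.set_aggregator_args(
    {"aggregator_name": "scaffold", "aggregator_correction": {"layer.weight": 0.1}}
)
# A later call without these keys leaves the stored state untouched.
holder.set_aggregator_args({})
print(holder.aggregator_name, holder.correction_state)
```
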
testing_routine
testing_routine(metric, metric_args, history_monitor, before_train)

Evaluation routine, to be called once per round.

Note

If the training plan implements a testing_step method (the signature of which is func(data, target) -> metrics) then it will be used rather than the input metric.

Parameters:

Name Type Description Default
metric Optional[MetricTypes]

The metric used for validation. If None, use MetricTypes.ACCURACY.

required
history_monitor Optional[HistoryMonitor]

HistoryMonitor instance, used to record computed metrics and communicate them to the researcher (server).

required
before_train bool

Whether the evaluation is being performed before local training occurs, or afterwards. This is merely reported back through history_monitor.

required
Source code in fedbiomed/common/training_plans/_torchnn.py
def testing_routine(
        self,
        metric: Optional[MetricTypes],
        metric_args: Dict[str, Any],
        history_monitor: Optional['HistoryMonitor'],
        before_train: bool
) -> None:
    """Evaluation routine, to be called once per round.

    !!! info "Note"
        If the training plan implements a `testing_step` method
        (the signature of which is func(data, target) -> metrics)
        then it will be used rather than the input metric.

    Args:
        metric: The metric used for validation.
            If None, use MetricTypes.ACCURACY.
        history_monitor: HistoryMonitor instance,
            used to record computed metrics and communicate them to
            the researcher (server).
        before_train: Whether the evaluation is being performed
            before local training occurs, or afterwards. This is merely
            reported back through `history_monitor`.
    """
    if not isinstance(self.model(), torch.nn.Module):
        msg = (
            f"{ErrorNumbers.FB320.value}: model should be a torch "
            f"nn.Module, but is of type {type(self.model())}"
        )
        logger.critical(msg)
        raise FedbiomedTrainingPlanError(msg)
    try:

        with torch.no_grad():
            super().testing_routine(
                metric, metric_args, history_monitor, before_train
            )
    finally:
        self.model().train()  # restore training behaviors
training_data abstractmethod
training_data()

Abstract method to return training data

Source code in fedbiomed/common/training_plans/_torchnn.py
@abstractmethod
def training_data(self):
    """Abstract method to return training data"""
training_routine
training_routine(history_monitor=None, node_args=None)

Training routine procedure.

The end user should define:

  • a training_data() function defining how data from the node's dataset is sampled and handled. It should return a generator able to output a tuple (batch_idx, (data, targets)) for each batch.
  • a training_step() function defining how the cost is computed. It should output loss values for backpropagation.

Parameters:

Name Type Description Default
history_monitor Any

Monitor handler for real-time feed. Defined by the Node and can't be overwritten

None
node_args Union[dict, None]

Command line arguments for the node. Can include:

  • gpu (bool): propose using a GPU device if any is available. Default False.
  • gpu_num (Union[int, None]): if not None, use the specified GPU device instead of the default GPU device, if this GPU device is available. Default None.
  • gpu_only (bool): force use of a GPU device if any is available, even if the researcher doesn't request a GPU. Default False.

None

Returns: Total number of samples observed during the training.

Source code in fedbiomed/common/training_plans/_torchnn.py
def training_routine(self,
                     history_monitor: Any = None,
                     node_args: Union[dict, None] = None,
                     ) -> int:
    # FIXME: add betas parameters for ADAM solver + momentum for SGD
    # FIXME 2: remove parameters specific for validation specified in the
    # training routine
    """Training routine procedure.

    End-user should define;

    - a `training_data()` function defining how sampling / handling data in node's dataset is done. It should
        return a generator able to output tuple (batch_idx, (data, targets)) that is iterable for each batch.
    - a `training_step()` function defining how cost is computed. It should output loss values for backpropagation.

    Args:
        history_monitor: Monitor handler for real-time feed. Defined by the Node and can't be overwritten
        node_args: command line arguments for node. Can include:
            - `gpu (bool)`: propose use a GPU device if any is available. Default False.
            - `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of default
                GPU device if this GPU device is available. Default None.
            - `gpu_only (bool)`: force use of a GPU device if any available, even if researcher
                doesn't request for using a GPU. Default False.
    Returns:
        Total number of samples observed during the training.
    """

    #self.model().train()  # pytorch switch for training
    self._optimizer.init_training()
    # set correct type for node args
    node_args = {} if not isinstance(node_args, dict) else node_args

    # send all model to device, ensures having all the requested tensors
    self._set_device(self._use_gpu, node_args)
    self._model.send_to_device(self._device)

    # Run preprocess when everything is ready before the training
    self._preprocess()

    # # initial aggregated model parameters
    # self._init_params = deepcopy(list(self.model().parameters()))

    # DP actions
    self._optimizer, self.training_data_loader = \
        self._dp_controller.before_training(optimizer= self._optimizer, loader=self.training_data_loader)

    # set number of training loop iterations
    iterations_accountant = MiniBatchTrainingIterationsAccountant(self)

    # Training loop iterations
    for epoch in iterations_accountant.iterate_epochs():
        training_data_iter: Iterator = iter(self.training_data_loader)

        for batch_idx in iterations_accountant.iterate_batches():
            # retrieve data and target
            data, target = next(training_data_iter)

            # update accounting for number of observed samples
            batch_size = self._infer_batch_size(data)
            iterations_accountant.increment_sample_counters(batch_size)

            # handle training on accelerator devices
            data, target = self.send_to_device(data, self._device), self.send_to_device(target, self._device)

            # train this batch
            corrected_loss, loss = self._train_over_batch(data, target)

            # Reporting
            if iterations_accountant.should_log_this_batch():
                # Retrieve reporting information: semantics differ whether num_updates or epochs were specified
                num_samples, num_samples_max = iterations_accountant.reporting_on_num_samples()
                num_iter, num_iter_max = iterations_accountant.reporting_on_num_iter()
                epoch_to_report = iterations_accountant.reporting_on_epoch()

                logger.debug('Train {}| '
                             'Iteration {}/{} | '
                             'Samples {}/{} ({:.0f}%)\tLoss: {:.6f}'.format(
                                 f'Epoch: {epoch_to_report} ' if epoch_to_report is not None else '',
                                 num_iter,
                                 num_iter_max,
                                 num_samples,
                                 num_samples_max,
                                 100. * num_iter / num_iter_max,
                                 loss.item())
                             )

                # Send scalar values via general/feedback topic
                if history_monitor is not None:
                    # the researcher only sees the average value of samples observed until now
                    history_monitor.add_scalar(metric={'Loss': loss.item()},
                                               iteration=num_iter,
                                               epoch=epoch_to_report,
                                               train=True,
                                               num_samples_trained=num_samples,
                                               num_batches=num_iter_max,
                                               total_samples=num_samples_max,
                                               batch_samples=batch_size)

            # Handle dry run mode
            if self._dry_run:
                self._model.send_to_device(self._device_init)
                torch.cuda.empty_cache()
                return iterations_accountant.num_samples_observed_in_total

    # release gpu usage as much as possible though:
    # - it should be done by deleting the object
    # - and some gpu memory remains used until process (cuda kernel ?) finishes

    self._model.send_to_device(self._device_init)
    torch.cuda.empty_cache()

    # # test (to be removed)
    # assert id(self._optimizer.model.model) == id(self._model.model)

    # assert id(self._optimizer.model) == id(self._model)

    # for (layer, val), (layer2, val2) in zip(self._model.model.state_dict().items(), self._optimizer.model.model.state_dict().items()):
    #     assert layer == layer2
    #     print(val, layer2)
    #     assert torch.isclose(val, val2).all()

    # for attributes, values in self._model.__dict__.items():
    #     print("ATTRIBUTES", values)
    #     assert values == getattr(self._optimizer.model, attributes)

    # for attributes, values in self._model.model.__dict__.items():
    #     print("ATTRIBUTES", values)
    #     assert values == getattr(self._optimizer.model.model, attributes)
    return iterations_accountant.num_samples_observed_in_total
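
The reporting branch of the routine above only relies on the `add_scalar` call of `history_monitor`. For local debugging, a stand-in object with the same keyword arguments may be enough. The sketch below assumes a hypothetical `RecordingMonitor` class, which is not part of Fed-BioMed, and uses made-up values.

```python
from typing import Any, Dict, List, Optional


class RecordingMonitor:
    """Hypothetical stand-in for the node's history monitor (not a Fed-BioMed class)."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []

    def add_scalar(self,
                   metric: Dict[str, float],
                   iteration: int,
                   epoch: Optional[int],
                   train: bool,
                   num_samples_trained: int,
                   num_batches: int,
                   total_samples: int,
                   batch_samples: int) -> None:
        # Store every reported value so that it can be inspected after training.
        self.records.append({
            'metric': metric,
            'iteration': iteration,
            'epoch': epoch,
            'train': train,
            'num_samples_trained': num_samples_trained,
            'num_batches': num_batches,
            'total_samples': total_samples,
            'batch_samples': batch_samples,
        })


monitor = RecordingMonitor()
# Same keyword arguments as the call in `training_routine`; the values are illustrative.
monitor.add_scalar(metric={'Loss': 0.25}, iteration=1, epoch=1, train=True,
                   num_samples_trained=32, num_batches=10,
                   total_samples=320, batch_samples=32)
```

Since `epoch` is `None` when the researcher specified `num_updates`, the stand-in accepts an optional value for it.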
training_step abstractmethod
training_step()

Abstract method, all subclasses must provide a training_step.
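
As a rough illustration of what the abstract declaration implies, the sketch below uses only the standard `abc` module. `PlanSketch` and `ConstantLossPlan` are hypothetical names, and the sketch assumes the training plan relies on Python's usual abstract base class machinery; a real plan would compute a loss from the model output rather than return a constant.

```python
from abc import ABC, abstractmethod


class PlanSketch(ABC):
    """Hypothetical base class mirroring the abstract `training_step` declaration."""

    @abstractmethod
    def training_step(self):
        """Subclasses must return the loss for one batch."""


class ConstantLossPlan(PlanSketch):
    """Hypothetical concrete plan that overrides `training_step`."""

    def training_step(self, data=None, target=None):
        # A real plan would run the forward pass here and return a loss value.
        return 0.0


try:
    PlanSketch()  # abstract classes cannot be instantiated
    instantiated = True
except TypeError:
    instantiated = False

loss = ConstantLossPlan().training_step()
```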

Source code in fedbiomed/common/training_plans/_torchnn.py
@abstractmethod
def training_step(self):
    """Abstract method, all subclasses must provide a training_step."""
type
type()

Gets the training plan type.

Source code in fedbiomed/common/training_plans/_torchnn.py
def type(self) -> TrainingPlans.TorchTrainingPlan:
    """ Gets training plan type"""
    return self.__type
update_optimizer_args
update_optimizer_args()

Updates the _optimizer_args variable. Useful for retrieving optimizer parameters after training a model, since some of them (e.g. the learning rate) may have changed during training.

Updated arguments
  • learning_rate

Returns:

Name Type Description
Dict Dict

updated _optimizer_args

Source code in fedbiomed/common/training_plans/_torchnn.py
def update_optimizer_args(self) -> Dict:
    """
    Updates `_optimizer_args` variable. Can prove useful
    to retrieve optimizer parameters after having trained a
    model, parameters which may have changed during training (eg learning rate).

    Updated arguments:
     - learning_rate

    Returns:
        Dict: updated `_optimizer_args`
    """
    if self._optimizer_args is None:
        self._optimizer_args = {}
    if self.aggregator_name is not None and self.aggregator_name.lower() == "scaffold":
        self._optimizer_args['lr'] = self._optimizer.get_learning_rate()
    return self._optimizer_args
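
The branching above can be tried in isolation. The sketch below rewrites it as a free function; `FakeOptimizer` and `refresh_optimizer_args` are hypothetical names, and returning a plain float from `get_learning_rate` is an assumption, since the real return type is not shown here.

```python
from typing import Any, Dict, Optional


class FakeOptimizer:
    """Hypothetical optimizer wrapper exposing only `get_learning_rate`."""

    def __init__(self, lr: float) -> None:
        self._lr = lr

    def get_learning_rate(self) -> float:
        # Assumption: a single float; the real wrapper may return something else.
        return self._lr


def refresh_optimizer_args(optimizer_args: Optional[Dict[str, Any]],
                           aggregator_name: Optional[str],
                           optimizer: FakeOptimizer) -> Dict[str, Any]:
    """Same branching as `update_optimizer_args`, without the training plan."""
    if optimizer_args is None:
        optimizer_args = {}
    # The learning rate is refreshed only for the Scaffold aggregator.
    if aggregator_name is not None and aggregator_name.lower() == "scaffold":
        optimizer_args['lr'] = optimizer.get_learning_rate()
    return optimizer_args


scaffold_args = refresh_optimizer_args({'lr': 0.1}, 'Scaffold', FakeOptimizer(0.05))
other_args = refresh_optimizer_args({'lr': 0.1}, 'another-aggregator', FakeOptimizer(0.05))
empty_args = refresh_optimizer_args(None, None, FakeOptimizer(0.05))
```

With any aggregator name other than Scaffold, the arguments are returned unchanged.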

MiniBatchTrainingIterationsAccountant

MiniBatchTrainingIterationsAccountant(training_plan)

Accounting class for keeping track of training iterations.

This class has the following responsibilities:

- manage iterators for epochs and batches
- provide up-to-date values for reporting
- handle different semantics in case the researcher asked for num_updates or epochs

We assume that the underlying implementation of the training loop is always expressed in terms of epochs and batches. The primary purpose of this class is therefore to convert a requested number of updates into epochs and batches correctly.

For reporting purposes, when num_updates is specified we treat the training as a single big loop, whereas when epochs is specified we treat it as two nested loops. This changes the meaning of the values returned by the reporting functions (see their docstrings for more details).

Attributes:

Name Type Description
_training_plan

a reference to the training plan executing the training iterations

cur_epoch int

the index of the current epoch during iterations

cur_batch int

the index of the current batch during iterations

epochs int

the total number of epochs to be performed (we always perform one additional, possibly empty, epoch)

num_batches_per_epoch int

the number of iterations per epoch

num_batches_in_last_epoch int

the number of iterations in the last epoch (can be zero)

num_samples_observed_in_epoch int

a counter for the number of samples observed in the current epoch, for reporting

num_samples_observed_in_total int

a counter for the total number of samples observed, for reporting

Parameters:

Name Type Description Default
training_plan TBaseTrainingPlan

a reference to the training plan that is executing the training iterations

required

Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, training_plan: TBaseTrainingPlan):
    """Initialize the class.

    Arguments:
        training_plan: a reference to the training plan that is executing the training iterations
    """
    self._training_plan = training_plan
    self.cur_epoch: int = 0
    self.cur_batch: int = 0
    self.epochs: int = 0
    self.num_batches_per_epoch: int = 0
    self.num_batches_in_last_epoch: int = 0
    self.num_samples_observed_in_epoch: int = 0
    self.num_samples_observed_in_total: int = 0
    self._n_training_iterations()
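
The conversion from `num_updates` to epochs and batches happens in `_n_training_iterations`, whose source is not shown here. The sketch below is one possible reading, chosen to agree with the expression `(epochs - 1) * num_batches_per_epoch + num_batches_in_last_epoch` used by the reporting functions. The function name `split_updates` is hypothetical, and so is the assumption that `epochs` counts the full epochs plus one final, possibly empty, epoch.

```python
from typing import Tuple


def split_updates(num_updates: int, num_batches_per_epoch: int) -> Tuple[int, int]:
    """Return (epochs, num_batches_in_last_epoch) for a requested number of updates.

    Assumption: `epochs` includes one final epoch that may contain zero batches.
    """
    full_epochs, remainder = divmod(num_updates, num_batches_per_epoch)
    return full_epochs + 1, remainder


epochs, last = split_updates(num_updates=25, num_batches_per_epoch=10)
# Recover the total number of batches with the expression from the reporting functions.
total = (epochs - 1) * 10 + last
```

Under this reading, 25 updates with 10 batches per epoch give two full epochs followed by a last epoch of 5 batches, and 20 updates give a last epoch that is empty.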

Attributes

cur_batch instance-attribute

cur_batch = 0

cur_epoch instance-attribute

cur_epoch = 0

epochs instance-attribute

epochs = 0

num_batches_in_last_epoch instance-attribute

num_batches_in_last_epoch = 0

num_batches_per_epoch instance-attribute

num_batches_per_epoch = 0

num_samples_observed_in_epoch instance-attribute

num_samples_observed_in_epoch = 0

num_samples_observed_in_total instance-attribute

num_samples_observed_in_total = 0

Classes

BatchIter

BatchIter(accountant)

Iterator over batches.

Attributes:

Name Type Description
_accountant

an instance of the class that created this iterator

Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, accountant: TTrainingIterationsAccountant):
    self._accountant = accountant

EpochsIter

EpochsIter(accountant)

Iterator over epochs.

Attributes:

Name Type Description
_accountant

an instance of the class that created this iterator

Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, accountant: TTrainingIterationsAccountant):
    self._accountant = accountant
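
Only the constructors of `BatchIter` and `EpochsIter` are listed above. To illustrate how two such iterators can share state with the object that created them, here is a simplified sketch. `AccountantSketch` is hypothetical, it uses a fixed number of batches per epoch, and the 1-based counters are an assumption suggested by the `cur_batch == 1` check in `should_log_this_batch`.

```python
class AccountantSketch:
    """Hypothetical, simplified accountant with a fixed number of batches per epoch."""

    def __init__(self, epochs: int, num_batches_per_epoch: int) -> None:
        self.epochs = epochs
        self.num_batches_per_epoch = num_batches_per_epoch
        self.cur_epoch = 0
        self.cur_batch = 0

    class EpochsIter:
        def __init__(self, accountant: "AccountantSketch") -> None:
            self._accountant = accountant

        def __iter__(self):
            self._accountant.cur_epoch = 0
            return self

        def __next__(self) -> int:
            if self._accountant.cur_epoch >= self._accountant.epochs:
                raise StopIteration
            self._accountant.cur_epoch += 1  # 1-based epoch index
            return self._accountant.cur_epoch

    class BatchIter:
        def __init__(self, accountant: "AccountantSketch") -> None:
            self._accountant = accountant

        def __iter__(self):
            self._accountant.cur_batch = 0  # restart the batch counter for each epoch
            return self

        def __next__(self) -> int:
            if self._accountant.cur_batch >= self._accountant.num_batches_per_epoch:
                raise StopIteration
            self._accountant.cur_batch += 1  # 1-based batch index
            return self._accountant.cur_batch

    def iterate_epochs(self) -> "AccountantSketch.EpochsIter":
        return AccountantSketch.EpochsIter(self)

    def iterate_batches(self) -> "AccountantSketch.BatchIter":
        return AccountantSketch.BatchIter(self)


accountant = AccountantSketch(epochs=2, num_batches_per_epoch=3)
visited = [(epoch, batch)
           for epoch in accountant.iterate_epochs()
           for batch in accountant.iterate_batches()]
```

Because the iterators write to the accountant's counters, the reporting functions can read `cur_epoch` and `cur_batch` at any point inside the loop.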

Functions

increment_sample_counters

increment_sample_counters(n_samples)

Increments the internal counters for the number of observed samples (both the per-epoch counter and the total counter).

Source code in fedbiomed/common/training_plans/_training_iterations.py
def increment_sample_counters(self, n_samples: int):
    """Increments internal counter for numbers of observed samples"""
    self.num_samples_observed_in_epoch += n_samples
    self.num_samples_observed_in_total += n_samples

iterate_batches

iterate_batches()

Returns an instance of a batches iterator.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def iterate_batches(self):
    """Returns an instance of a batches iterator."""
    return MiniBatchTrainingIterationsAccountant.BatchIter(self)

iterate_epochs

iterate_epochs()

Returns an instance of an epochs iterator.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def iterate_epochs(self):
    """Returns an instance of an epochs iterator."""
    return MiniBatchTrainingIterationsAccountant.EpochsIter(self)

num_batches_in_this_epoch

num_batches_in_this_epoch()

Returns the number of iterations to be performed in the current epoch

Source code in fedbiomed/common/training_plans/_training_iterations.py
def num_batches_in_this_epoch(self) -> int:
    """Returns the number of iterations to be performed in the current epoch"""
    if self.cur_epoch == self.epochs:
        return self.num_batches_in_last_epoch
    else:
        return self.num_batches_per_epoch

reporting_on_epoch

reporting_on_epoch()

Returns the index of the current epoch for reporting, or None if the researcher specified num_updates.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_epoch(self) -> Optional[int]:
    """Returns the optional index of the current epoch, for reporting."""
    if self._training_plan.training_args()['num_updates'] is not None:
        return None
    else:
        return self.cur_epoch

reporting_on_num_iter

reporting_on_num_iter()

Outputs useful reporting information about the number of iterations

If the researcher specified num_updates, then the iteration number will be the cumulative total, and the maximum number of iterations will be equal to the requested number of updates. If the researcher specified epochs, then the iteration number will be the batch index in the current epoch, while the maximum number of iterations will be computed specifically for the current epoch.

Returns:

Type Description
int

the iteration number

int

the maximum number of iterations to be reported

Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_num_iter(self) -> Tuple[int, int]:
    """Outputs useful reporting information about the number of iterations

    If the researcher specified num_updates, then the iteration number will be the cumulated total, and
    similarly the maximum number of iterations will be equal to the requested number of updates.
    If the researcher specified epochs, then the iteration number will be the batch index in the current epoch,
    while the maximum number of iterations will be computed specifically for the current epoch.

    Returns:
        the iteration number
        the maximum number of iterations to be reported
    """
    if self._training_plan.training_args()['num_updates'] is not None:
        num_iter = (self.cur_epoch - 1) * self.num_batches_per_epoch + self.cur_batch
        total_batches_to_be_observed = (self.epochs - 1) * self.num_batches_per_epoch + \
            self.num_batches_in_last_epoch
        num_iter_max = total_batches_to_be_observed
    else:
        num_iter = self.cur_batch
        num_iter_max = self.num_batches_per_epoch
    return num_iter, num_iter_max
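
To make the two reporting modes concrete, the sketch below copies the arithmetic above into a standalone function. `report_num_iter` is a hypothetical name, and the accountant attributes are passed in as plain arguments with illustrative values.

```python
from typing import Optional, Tuple


def report_num_iter(cur_epoch: int,
                    cur_batch: int,
                    epochs: int,
                    num_batches_per_epoch: int,
                    num_batches_in_last_epoch: int,
                    num_updates: Optional[int]) -> Tuple[int, int]:
    """Standalone copy of the arithmetic in `reporting_on_num_iter`."""
    if num_updates is not None:
        # Single big loop: cumulative index out of the total number of batches.
        num_iter = (cur_epoch - 1) * num_batches_per_epoch + cur_batch
        num_iter_max = (epochs - 1) * num_batches_per_epoch + num_batches_in_last_epoch
    else:
        # Nested loops: index within the current epoch.
        num_iter = cur_batch
        num_iter_max = num_batches_per_epoch
    return num_iter, num_iter_max


# Third epoch, fourth batch, with 10 batches per epoch and 5 in the last epoch.
by_updates = report_num_iter(cur_epoch=3, cur_batch=4, epochs=3,
                             num_batches_per_epoch=10, num_batches_in_last_epoch=5,
                             num_updates=25)
# With epochs specified, `epochs` and `num_batches_in_last_epoch` are not used.
by_epochs = report_num_iter(cur_epoch=2, cur_batch=4, epochs=2,
                            num_batches_per_epoch=10, num_batches_in_last_epoch=10,
                            num_updates=None)
```

In the first call the report reads as iteration 24 of 25, in the second as iteration 4 of 10.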

reporting_on_num_samples

reporting_on_num_samples()

Outputs useful reporting information about the number of observed samples

If the researcher specified num_updates, then the number of observed samples will be the grand total, and similarly the maximum number of samples will be the grand total over all iterations. If the researcher specified epochs, then both values will be specific to the current epoch.

Returns:

Type Description
int

the number of samples observed until the current iteration

int

the maximum number of samples to be observed

Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_num_samples(self) -> Tuple[int, int]:
    """Outputs useful reporting information about the number of observed samples

    If the researcher specified num_updates, then the number of observed samples will be the grand total, and
    similarly the maximum number of samples will be the grand total over all iterations.
    If the researcher specified epochs, then both values will be specific to the current epoch.

    Returns:
        the number of samples observed until the current iteration
        the maximum number of samples to be observed
    """
    # get batch size
    if 'batch_size' in self._training_plan.loader_args():
        batch_size = self._training_plan.loader_args()['batch_size']
    else:
        raise FedbiomedUserInputError('Missing required key `batch_size` in `loader_args`.')
    # compute number of observed samples
    if self._training_plan.training_args()['num_updates'] is not None:
        num_samples = self.num_samples_observed_in_total
        total_batches_to_be_observed = (self.epochs - 1) * self.num_batches_per_epoch + \
            self.num_batches_in_last_epoch
        total_n_samples_to_be_observed = batch_size * total_batches_to_be_observed
        num_samples_max = total_n_samples_to_be_observed
    else:
        num_samples = self.num_samples_observed_in_epoch
        num_samples_max = batch_size*self.num_batches_in_this_epoch() if \
            self.cur_batch < self.num_batches_in_this_epoch() else num_samples
    return num_samples, num_samples_max
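
The sample counts can be worked through in the same way. The sketch below assumes a hypothetical standalone function `report_num_samples` that receives the accountant state as arguments; the example numbers (100 samples, batch size 32, hence a final batch of 4 samples) are made up.

```python
from typing import Optional, Tuple


def report_num_samples(batch_size: int,
                       cur_batch: int,
                       num_batches_in_this_epoch: int,
                       total_batches: int,
                       observed_in_epoch: int,
                       observed_in_total: int,
                       num_updates: Optional[int]) -> Tuple[int, int]:
    """Standalone copy of the arithmetic in `reporting_on_num_samples`."""
    if num_updates is not None:
        # Grand totals over all iterations.
        return observed_in_total, batch_size * total_batches
    if cur_batch < num_batches_in_this_epoch:
        # Before the last batch, the maximum is an estimate based on full batches.
        return observed_in_epoch, batch_size * num_batches_in_this_epoch
    # On the last batch, the maximum is set to the observed count.
    return observed_in_epoch, observed_in_epoch


mid_epoch = report_num_samples(batch_size=32, cur_batch=2, num_batches_in_this_epoch=4,
                               total_batches=4, observed_in_epoch=64,
                               observed_in_total=64, num_updates=None)
end_epoch = report_num_samples(batch_size=32, cur_batch=4, num_batches_in_this_epoch=4,
                               total_batches=4, observed_in_epoch=100,
                               observed_in_total=100, num_updates=None)
with_updates = report_num_samples(batch_size=32, cur_batch=2, num_batches_in_this_epoch=4,
                                  total_batches=25, observed_in_epoch=64,
                                  observed_in_total=64, num_updates=25)
```

In this example the mid-epoch maximum (128) overestimates the 100 available samples, and the last batch brings the reported maximum back to the observed count.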

should_log_this_batch

should_log_this_batch()

Whether the current batch should be logged or not.

A batch shall be logged if at least one of the following conditions is True:

- the cumulative batch index is a multiple of the logging interval
- the dry_run condition was specified by the researcher
- it is the last batch of the epoch
- it is the first batch of the epoch

Source code in fedbiomed/common/training_plans/_training_iterations.py
def should_log_this_batch(self) -> bool:
    """Whether the current batch should be logged or not.

    A batch shall be logged if at least one of the following conditions is True:

        - the cumulative batch index is a multiple of the logging interval
        - the dry_run condition was specified by the researcher
        - it is the last batch of the epoch
        - it is the first batch of the epoch
    """
    current_iter = (self.cur_epoch - 1) * self.num_batches_per_epoch + self.cur_batch
    return (current_iter % self._training_plan.training_args()['log_interval'] == 0 or
            self._training_plan.training_args()['dry_run'] or
            self.cur_batch >= self.num_batches_in_this_epoch() or  # last batch
            self.cur_batch == 1)  # first batch
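
As a worked example of the four conditions, the sketch below evaluates the same predicate for every batch of an epoch. `should_log` is a hypothetical standalone function, and the values (10 batches per epoch, a logging interval of 4) are illustrative.

```python
def should_log(cur_epoch: int,
               cur_batch: int,
               num_batches_per_epoch: int,
               num_batches_in_this_epoch: int,
               log_interval: int,
               dry_run: bool = False) -> bool:
    """Standalone copy of the predicate in `should_log_this_batch`."""
    current_iter = (cur_epoch - 1) * num_batches_per_epoch + cur_batch
    return bool(current_iter % log_interval == 0
                or dry_run
                or cur_batch >= num_batches_in_this_epoch  # last batch
                or cur_batch == 1)                         # first batch


# Batches (1-based) that would be logged in the first and second epoch.
first_epoch = [b for b in range(1, 11)
               if should_log(1, b, 10, 10, log_interval=4)]
second_epoch = [b for b in range(1, 11)
                if should_log(2, b, 10, 10, log_interval=4)]
```

The logged batches differ between the two epochs because the interval is applied to the cumulative batch index, not to the index within the epoch.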