TrainingPlans

The fedbiomed.common.training_plans module includes the training plan classes used for federated training.

Classes

BaseTrainingPlan

BaseTrainingPlan()

Base class for training plans.

All concrete, framework- and/or model-specific training plans should inherit from this class and implement:

- the post_init method, to process model and training hyper-parameters
- the training_routine method, to train the model for one round
- the predict method, to compute predictions over a given batch
- (optionally) the testing_step method, to override the evaluation behavior and compute a batch-wise (set of) metric(s)
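
As an illustration, a concrete subclass could take the following shape. This is a minimal sketch under the requirements listed above; the class name, method signatures and bodies are hypothetical placeholders, not actual Fed-BioMed code.

from fedbiomed.common.training_plans import BaseTrainingPlan

class MyTrainingPlan(BaseTrainingPlan):  # hypothetical subclass
    def post_init(self, model_args, training_args):
        # Process model and training hyper-parameters (simplified signature).
        ...

    def training_routine(self):
        # Train the model for one round, iterating over
        # self.training_data_loader.
        ...

    def predict(self, data):
        # Compute predictions over a given batch.
        ...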

Attributes:

- dataset_path (Union[str, None]): The path that indicates where the dataset has been stored.
- pre_processes (Dict[str, PreProcessDict]): Preprocess functions that will be applied to the training data at the beginning of the training routine.
- training_data_loader (Union[DataLoader, NPDataLoader, None]): Data loader used in the training routine.
- testing_data_loader (Union[DataLoader, NPDataLoader, None]): Data loader used in the validation routine.

Source code in fedbiomed/common/training_plans/_base_training_plan.py
def __init__(self) -> None:
    """Construct the base training plan."""
    self._dependencies: List[str] = []
    self.dataset_path: Union[str, None] = None
    self.pre_processes: Dict[str, PreProcessDict] = OrderedDict()
    self.training_data_loader: Union[DataLoader, NPDataLoader, None] = None
    self.testing_data_loader: Union[DataLoader, NPDataLoader, None] = None

    # Arguments provided by the researcher; they will be populated by post_init
    self._model_args: Dict[str, Any] = None
    self._aggregator_args: Dict[str, Any] = None
    self._optimizer_args: Dict[str, Any] = None
    self._loader_args: Dict[str, Any] = None
    self._training_args: Dict[str, Any] = None

    self._error_msg_import_model: str = f"{ErrorNumbers.FB605.value}: Training Plan's Model is not initialized.\n" +\
                                        "To %s a model, you should do it through `fedbiomed.researcher.experiment.Experiment`'s interface" +\
                                        " and not directly from Training Plan"

FedPerceptron

FedPerceptron()

Bases: FedSGDClassifier

Fed-BioMed training plan for scikit-learn Perceptron models.

This class inherits from FedSGDClassifier and forces the wrapped scikit-learn SGDClassifier model to use a "perceptron" loss, which makes it equivalent to an actual scikit-learn Perceptron model.
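
For reference, this equivalence can be reproduced in plain scikit-learn, independently of Fed-BioMed:

from sklearn.linear_model import Perceptron, SGDClassifier

# Per the scikit-learn documentation, Perceptron() shares its underlying
# implementation with SGDClassifier configured as follows:
sgd = SGDClassifier(loss="perceptron", learning_rate="constant",
                    eta0=1.0, penalty=None)
perceptron = Perceptron()  # equivalent model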

Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
    """Class constructor."""
    super().__init__()

FedSGDClassifier

FedSGDClassifier()

Bases: SKLearnTrainingPlanPartialFit

Fed-BioMed training plan for scikit-learn SGDClassifier models.

Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
    """Initialize the sklearn SGDClassifier training plan."""
    super().__init__()

FedSGDRegressor

FedSGDRegressor()

Bases: SKLearnTrainingPlanPartialFit

Fed-BioMed training plan for scikit-learn SGDRegressor models.

Source code in fedbiomed/common/training_plans/_sklearn_models.py
def __init__(self) -> None:
    """Initialize the sklearn SGDRegressor training plan."""
    super().__init__()

SKLearnTrainingPlan

SKLearnTrainingPlan()

Bases: BaseTrainingPlan

Base class for Fed-BioMed wrappers of sklearn classes.

Classes that inherit from this abstract class must:

- specify a _model_cls class attribute that defines the type of scikit-learn model being wrapped for training
- implement a set_init_params method that:
    - sets and assigns the model's initial trainable weights attributes
    - populates the _param_list attribute with the names of these attributes
- implement a _training_routine method that performs a training round based on self.train_data_loader (which is an NPDataLoader)
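
A hedged skeleton of such a subclass, with illustrative names, simplified signatures and placeholder bodies (not actual Fed-BioMed code):

from sklearn.linear_model import SGDRegressor
from fedbiomed.common.training_plans import SKLearnTrainingPlan

class MySkLearnPlan(SKLearnTrainingPlan):  # hypothetical subclass
    _model_cls = SGDRegressor  # type of scikit-learn model being wrapped

    def set_init_params(self) -> None:
        # Assign the model's initial trainable weights and record
        # their attribute names in _param_list.
        self._param_list = ["intercept_", "coef_"]
        ...

    def _training_routine(self) -> None:
        # Perform one training round based on self.train_data_loader
        # (an NPDataLoader), e.g. through the model's partial_fit.
        ...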

Attributes:

- dataset_path (Optional[str]): The path that indicates where the dataset has been stored.
- pre_processes (Dict[str, PreProcessDict]): Preprocess functions that will be applied to the training data at the beginning of the training routine.
- training_data_loader (Union[DataLoader, NPDataLoader, None]): Data loader used in the training routine.
- testing_data_loader (Union[DataLoader, NPDataLoader, None]): Data loader used in the validation routine.

Notes

The trained model may be exported via the export_model method, resulting in a dump file that may be reloaded using joblib.load outside of Fed-BioMed.
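
For instance, outside of Fed-BioMed (the file name below is a placeholder for the path passed to export_model):

import joblib

model = joblib.load("model.dump")  # "model.dump" is a hypothetical path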

Source code in fedbiomed/common/training_plans/_sklearn_training_plan.py
def __init__(self) -> None:
    """Initialize the SKLearnTrainingPlan."""
    super().__init__()
    self._model: Union[SkLearnModel, None] = None
    self._training_args = {}  # type: Dict[str, Any]
    self.__type = TrainingPlans.SkLearnTrainingPlan
    self._batch_maxnum = 0
    self.dataset_path: Optional[str] = None
    self._optimizer: Optional[BaseOptimizer] = None
    self._add_dependency([
        "import inspect",
        "import numpy as np",
        "import pandas as pd",
        "from fedbiomed.common.training_plans import SKLearnTrainingPlan",
        "from fedbiomed.common.data import DataManager",
    ])
    self._add_dependency(list(self._model_dep))

    # Add dependencies
    self._configure_dependencies()

TorchTrainingPlan

TorchTrainingPlan()

Bases: BaseTrainingPlan

Implements a TrainingPlan for the PyTorch NN framework.

An abstraction over a PyTorch module, used to run PyTorch models and scripts on the node side. The researcher's model (resp. parameters) will be:

  1. saved to a '.py' (resp. '.mpk') file,
  2. uploaded to an HTTP server (network layer),
  3. downloaded from the HTTP server on the node side,
  4. finally, read and executed on the node side.

The researcher must define/override:

- a training_data() function
- a training_step() function

The researcher may also need to add extra dependencies/Python imports, using the init_dependencies method.
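
A minimal sketch of such a researcher-defined plan, under the assumptions above (the class name, the loss used and the exact signatures are illustrative; refer to the Fed-BioMed tutorials for the precise interface):

import torch.nn.functional as F
from fedbiomed.common.training_plans import TorchTrainingPlan

class MyTorchPlan(TorchTrainingPlan):  # hypothetical subclass
    def init_dependencies(self):
        # Extra imports needed on the node side (optional).
        return ["from torchvision import transforms"]

    def training_data(self):
        # Build and return the data manager wrapping the node's dataset,
        # e.g. loaded from self.dataset_path.
        ...

    def training_step(self, data, target):
        # Compute and return the loss for one batch; the signature is an
        # assumption based on common PyTorch conventions.
        output = self.model().forward(data)
        return F.nll_loss(output, target)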

Attributes:

- dataset_path: The path that indicates where the dataset has been stored.
- pre_processes: Preprocess functions that will be applied to the training data at the beginning of the training routine.
- training_data_loader: Data loader used in the training routine.
- testing_data_loader: Data loader used in the validation routine.
- correction_state (OrderedDict): An OrderedDict of {'parameter name': torch.Tensor}, where the keys correspond to the names of the model parameters contained in self._model.named_parameters(), and the values correspond to the correction to be applied to that parameter.

Notes

The trained model may be exported via the export_model method, resulting in a dump file that may be reloaded using torch.load outside of Fed-BioMed.
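
For instance, outside of Fed-BioMed (the file name below is a placeholder for the path passed to export_model):

import torch

model = torch.load("model.pt")  # "model.pt" is a hypothetical path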

Source code in fedbiomed/common/training_plans/_torchnn.py
def __init__(self):
    """ Construct training plan """

    super().__init__()

    self.__type = TrainingPlans.TorchTrainingPlan

    # Differential privacy support
    self._dp_controller: Optional[DPController] = None
    self._optimizer: Union[BaseOptimizer, None] = None
    self._model: Union[TorchModel, None] = None

    self._use_gpu: bool = False
    self._share_persistent_buffers = None

    self._batch_maxnum: int = 100
    self._fedprox_mu: Optional[float] = None
    self._log_interval: int = 10
    self._epochs: int = 1
    self._dry_run = False
    self._num_updates: Optional[int] = None

    self.correction_state: OrderedDict = OrderedDict()
    self.aggregator_name: str = None

    # TODO : add random seed init
    # self.random_seed_params = None
    # self.random_seed_shuffling_data = None

    # device to use: cpu/gpu
    # - all operations except training only use cpu
    # - researcher doesn't request to use gpu by default
    self._device_init: str = "cpu"
    self._device = self._device_init

    # list dependencies of the model
    self._add_dependency(["import torch",
                         "import torch.nn as nn",
                         "import torch.nn.functional as F",
                         "from fedbiomed.common.training_plans import TorchTrainingPlan",
                         "from fedbiomed.common.data import DataManager",
                         "from fedbiomed.common.constants import ProcessTypes",
                         "from torch.utils.data import DataLoader",
                         "from torchvision import datasets, transforms"
                         ])

    # Aggregated model parameters
    #self._init_params: List[torch.Tensor] = None

    # Add dependencies
    self._configure_dependencies()

MiniBatchTrainingIterationsAccountant

MiniBatchTrainingIterationsAccountant(training_plan)

Accounting class for keeping track of training iterations.

This class has the following responsibilities:

- manage iterators for epochs and batches
- provide up-to-date values for reporting
- handle the different semantics depending on whether the researcher asked for num_updates or epochs

We assume that the underlying implementation of the training loop is always expressed in terms of epochs and batches, so the primary purpose of this class is to correctly convert a requested number of updates into epochs and batches.

For reporting purposes, in the num_updates case we think of the training as a single big loop, while in the epochs-and-batches case we think of it as two nested loops. This changes the meaning of the values output by the reporting functions (see their docstrings for more details).
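
As an illustration of this conversion (a stand-alone sketch of the semantics described above, including the extra possibly-empty epoch; not the actual Fed-BioMed implementation):

# Requesting num_updates=250 on a dataset of 100 batches per epoch:
num_updates = 250
num_batches_per_epoch = 100

full_epochs, remainder = divmod(num_updates, num_batches_per_epoch)
epochs = full_epochs + 1               # one additional, possibly empty, epoch
num_batches_in_last_epoch = remainder  # 50 here; 0 if num_updates is a multiple

# The totals add up to the requested number of updates:
assert (epochs - 1) * num_batches_per_epoch + num_batches_in_last_epoch == num_updates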

Attributes:

- _training_plan: A reference to the training plan executing the training iterations.
- cur_epoch (int): The index of the current epoch during iterations.
- cur_batch (int): The index of the current batch during iterations.
- epochs (int): The total number of epochs to be performed (we always perform one additional, possibly empty, epoch).
- num_batches_per_epoch (int): The number of iterations per epoch.
- num_batches_in_last_epoch (int): The number of iterations in the last epoch (can be zero).
- num_samples_observed_in_epoch (int): A counter for the number of samples observed in the current epoch, for reporting.
- num_samples_observed_in_total (int): A counter for the total number of samples observed, for reporting.

Parameters:

- training_plan (TBaseTrainingPlan, required): A reference to the training plan that is executing the training iterations.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, training_plan: TBaseTrainingPlan):
    """Initialize the class.

    Arguments:
        training_plan: a reference to the training plan that is executing the training iterations
    """
    self._training_plan = training_plan
    self.cur_epoch: int = 0
    self.cur_batch: int = 0
    self.epochs: int = 0
    self.num_batches_per_epoch: int = 0
    self.num_batches_in_last_epoch: int = 0
    self.num_samples_observed_in_epoch: int = 0
    self.num_samples_observed_in_total: int = 0
    self._n_training_iterations()

Attributes

cur_batch instance-attribute

cur_batch: int = 0

cur_epoch instance-attribute

cur_epoch: int = 0

epochs instance-attribute

epochs: int = 0

num_batches_in_last_epoch instance-attribute

num_batches_in_last_epoch: int = 0

num_batches_per_epoch instance-attribute

num_batches_per_epoch: int = 0

num_samples_observed_in_epoch instance-attribute

num_samples_observed_in_epoch: int = 0

num_samples_observed_in_total instance-attribute

num_samples_observed_in_total: int = 0

Classes

BatchIter

BatchIter(accountant)

Iterator over batches.

Attributes:

Name Type Description
_accountant

an instance of the class that created this iterator

Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, accountant: TTrainingIterationsAccountant):
    self._accountant = accountant

EpochsIter

EpochsIter(accountant)

Iterator over epochs.

Attributes:

Name Type Description
_accountant

an instance of the class that created this iterator

Source code in fedbiomed/common/training_plans/_training_iterations.py
def __init__(self, accountant: TTrainingIterationsAccountant):
    self._accountant = accountant

Functions

increment_sample_counters

increment_sample_counters(n_samples)

Increments the internal counters for the number of observed samples.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def increment_sample_counters(self, n_samples: int):
    """Increments internal counter for numbers of observed samples"""
    self.num_samples_observed_in_epoch += n_samples
    self.num_samples_observed_in_total += n_samples

iterate_batches

iterate_batches()

Returns an instance of a batches iterator.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def iterate_batches(self):
    """Returns an instance of a batches iterator."""
    return MiniBatchTrainingIterationsAccountant.BatchIter(self)

iterate_epochs

iterate_epochs()

Returns an instance of an epochs iterator.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def iterate_epochs(self):
    """Returns an instance of an epochs iterator."""
    return MiniBatchTrainingIterationsAccountant.EpochsIter(self)
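
Taken together, the iterators and counters above are presumably meant to be combined along these lines (a sketch, not actual Fed-BioMed code; the batch-fetching and update steps are elided):

accountant = MiniBatchTrainingIterationsAccountant(training_plan)
for epoch in accountant.iterate_epochs():
    for batch_idx in accountant.iterate_batches():
        # Fetch the next (inputs, targets) batch from the data loader and
        # perform one optimizer update on it (both elided in this sketch).
        accountant.increment_sample_counters(len(inputs))
        if accountant.should_log_this_batch():
            num_iter, num_iter_max = accountant.reporting_on_num_iter()
            num_samples, num_samples_max = accountant.reporting_on_num_samples()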

num_batches_in_this_epoch

num_batches_in_this_epoch()

Returns the number of iterations to be performed in the current epoch

Source code in fedbiomed/common/training_plans/_training_iterations.py
def num_batches_in_this_epoch(self) -> int:
    """Returns the number of iterations to be performed in the current epoch"""
    if self.cur_epoch == self.epochs:
        return self.num_batches_in_last_epoch
    else:
        return self.num_batches_per_epoch

reporting_on_epoch

reporting_on_epoch()

Returns the optional index of the current epoch, for reporting.

Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_epoch(self) -> Optional[int]:
    """Returns the optional index of the current epoch, for reporting."""
    if self._training_plan.training_args()['num_updates'] is not None:
        return None
    else:
        return self.cur_epoch

reporting_on_num_iter

reporting_on_num_iter()

Outputs useful reporting information about the number of iterations

If the researcher specified num_updates, then the iteration number will be the cumulative total, and similarly the maximum number of iterations will be equal to the requested number of updates. If the researcher specified epochs, then the iteration number will be the batch index in the current epoch, while the maximum number of iterations will be computed specifically for the current epoch.

Returns:

Type Description
int

the iteration number

int

the maximum number of iterations to be reported

Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_num_iter(self) -> Tuple[int, int]:
    """Outputs useful reporting information about the number of iterations

    If the researcher specified num_updates, then the iteration number will be the cumulated total, and
    similarly the maximum number of iterations will be equal to the requested number of updates.
    If the researcher specified epochs, then the iteration number will be the batch index in the current epoch,
    while the maximum number of iterations will be computed specifically for the current epoch.

    Returns:
        the iteration number
        the maximum number of iterations to be reported
    """
    if self._training_plan.training_args()['num_updates'] is not None:
        num_iter = (self.cur_epoch - 1) * self.num_batches_per_epoch + self.cur_batch
        total_batches_to_be_observed = (self.epochs - 1) * self.num_batches_per_epoch + \
            self.num_batches_in_last_epoch
        num_iter_max = total_batches_to_be_observed
    else:
        num_iter = self.cur_batch
        num_iter_max = self.num_batches_per_epoch
    return num_iter, num_iter_max
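
For example, in the num_updates case, with epochs=3, num_batches_per_epoch=100 and num_batches_in_last_epoch=50 (i.e. 250 requested updates), being at cur_epoch=2 and cur_batch=30 yields num_iter = (2 - 1) * 100 + 30 = 130 and num_iter_max = 250.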

reporting_on_num_samples

reporting_on_num_samples()

Outputs useful reporting information about the number of observed samples

If the researcher specified num_updates, then the number of observed samples will be the grand total, and similarly the maximum number of samples will be the grand total over all iterations. If the researcher specified epochs, then both values will be specific to the current epoch.

Returns:

Type Description
int

the number of samples observed until the current iteration

int

the maximum number of samples to be observed

Source code in fedbiomed/common/training_plans/_training_iterations.py
def reporting_on_num_samples(self) -> Tuple[int, int]:
    """Outputs useful reporting information about the number of observed samples

    If the researcher specified num_updates, then the number of observed samples will be the grand total, and
    similarly the maximum number of samples will be the grand total over all iterations.
    If the researcher specified epochs, then both values will be specific to the current epoch.

    Returns:
        the number of samples observed until the current iteration
        the maximum number of samples to be observed
    """
    # get batch size
    if 'batch_size' in self._training_plan.loader_args():
        batch_size = self._training_plan.loader_args()['batch_size']
    else:
        raise FedbiomedUserInputError('Missing required key `batch_size` in `loader_args`.')
    # compute number of observed samples
    if self._training_plan.training_args()['num_updates'] is not None:
        num_samples = self.num_samples_observed_in_total
        total_batches_to_be_observed = (self.epochs - 1) * self.num_batches_per_epoch + \
            self.num_batches_in_last_epoch
        total_n_samples_to_be_observed = batch_size * total_batches_to_be_observed
        num_samples_max = total_n_samples_to_be_observed
    else:
        num_samples = self.num_samples_observed_in_epoch
        num_samples_max = batch_size*self.num_batches_in_this_epoch() if \
            self.cur_batch < self.num_batches_in_this_epoch() else num_samples
    return num_samples, num_samples_max

should_log_this_batch

should_log_this_batch()

Whether the current batch should be logged or not.

A batch shall be logged if at least one of the following conditions is True:

- the cumulative batch index is a multiple of the logging interval
- the dry_run condition was specified by the researcher
- it is the last batch of the epoch
- it is the first batch of the epoch
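
For example, with log_interval=10 and a single epoch of 45 batches, batches 1, 10, 20, 30, 40 and 45 would be logged.
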
Source code in fedbiomed/common/training_plans/_training_iterations.py
def should_log_this_batch(self) -> bool:
    """Whether the current batch should be logged or not.

    A batch shall be logged if at least one of the following conditions is True:

        - the cumulative batch index is a multiple of the logging interval
        - the dry_run condition was specified by the researcher
        - it is the last batch of the epoch
        - it is the first batch of the epoch
    """
    current_iter = (self.cur_epoch - 1) * self.num_batches_per_epoch + self.cur_batch
    return (current_iter % self._training_plan.training_args()['log_interval'] == 0 or
            self._training_plan.training_args()['dry_run'] or
            self.cur_batch >= self.num_batches_in_this_epoch() or  # last batch
            self.cur_batch == 1)  # first batch