PyTorch aggregation methods in Fed-BioMed
Difficulty level: advanced
Introduction
This tutorial focuses on how to deal with heterogeneous datasets by changing the Experiment's Aggregator. Fed-BioMed provides several methods for aggregation, and selecting an appropriate one can be critical when confronted with unbalanced or heterogeneous datasets.
Aggregators provide a way to merge the local models sent by Nodes into a global, more generalized model. Please note that designing Node sampling Strategies can also help when working on heterogeneous datasets.
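To make this concrete, below is a minimal standalone sketch of such a merge: a sample-count-weighted average of local model parameters, written in plain PyTorch for illustration only (this is not Fed-BioMed's internal Aggregator API).

import torch

# Weighted average of local models' state_dicts, as an Aggregator would
# compute server-side. `weights` are the Nodes' relative sample shares.
def weighted_average(local_states, weights):
    return {
        name: sum(w * state[name] for w, state in zip(weights, local_states))
        for name in local_states[0]
    }

# Example: two Nodes holding 75% and 25% of the training samples.
node1 = {"w": torch.tensor([1.0, 2.0])}
node2 = {"w": torch.tensor([3.0, 4.0])}
print(weighted_average([node1, node2], weights=[0.75, 0.25]))
# {'w': tensor([1.5000, 2.5000])}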
For more information about Aggregator objects in Fed-BioMed, and on how to create your own Aggregator, please see Aggregators in the User Guide.
Before you start
For this tutorial, we will be using the heterogeneous Fed-IXI dataset provided by FLamby. FLamby comes with a few medical datasets that have heterogeneous data properties. Please have a look at the notebooks on how to use FLamby in Fed-BioMed tutorials before starting: you will need to set up FLamby before running this tutorial.
1. Defining an Experiment using FedAverage Aggregator
First, let's re-use the TorchTrainingPlan defined in the FLamby tutorials. FedAveraging was introduced by McMahan et al. as the first aggregation method in the Federated Learning literature. It computes a weighted average of all Nodes' local model parameters in order to obtain the global model:
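$$ w_{t+1} = \sum_{k=1}^{K} \frac{n_k}{n} \, w_{t+1}^{k} $$

where $w_{t+1}^{k}$ are the local model parameters sent by Node $k$ after round $t+1$, $n_k$ is the number of samples held by Node $k$, and $n = \sum_{k} n_k$ is the total number of samples.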
In this tutorial, we will keep the same TrainingPlan (and thus the same model) for all the Experiments; only the Aggregator will change.
from fedbiomed.common.training_plans import TorchTrainingPlan
from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer
from fedbiomed.common.data import DataManager
from fedbiomed.common.data.flamby_dataset import FlambyDataset

class MyTrainingPlan(TorchTrainingPlan):
    def init_model(self, model_args):
        return Baseline()

    def init_optimizer(self, optimizer_args):
        return Optimizer(self.model().parameters(), lr=optimizer_args["lr"])

    def init_dependencies(self):
        return ["from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer",
                "from fedbiomed.common.data import DataManager",
                "from fedbiomed.common.data.flamby_dataset import FlambyDataset"]

    def training_step(self, data, target):
        output = self.model().forward(data)
        return BaselineLoss().forward(output, target)

    def training_data(self):
        dataset = FlambyDataset()
        loader_arguments = {'shuffle': True}
        return DataManager(dataset, **loader_arguments)
We define hereafter the parameters of the Experiment to be used with vanilla FedAverage:
model_args = {}
training_args = {
'random_seed': 1234,
'loader_args': { 'batch_size': 8, },
'optimizer_args': {
"lr" : 1e-3
},
'dry_run': False,
'num_updates': 50
}
Activate TensorBoard
%load_ext tensorboard
from fedbiomed.researcher.config import config
import os
fedavg_tensorboard_dir = os.path.join(config.root, 'fedavg_runs')
os.makedirs(fedavg_tensorboard_dir, exist_ok=True)
config.vars['TENSORBOARD_RESULTS_DIR'] = fedavg_tensorboard_dir
%tensorboard --logdir "$fedavg_tensorboard_dir"
We then import the FedAverage Aggregator from Fed-BioMed's aggregators module:
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
tags = ['flixi']
rounds = 3
exp_fed_avg = Experiment()
exp_fed_avg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fed_avg.set_model_args(model_args=model_args)
exp_fed_avg.set_training_args(training_args=training_args)
exp_fed_avg.set_tags(tags=tags)
exp_fed_avg.set_training_data(training_data=None, from_tags=True)
exp_fed_avg.set_aggregator(aggregator=FedAverage())
exp_fed_avg.set_strategy(node_selection_strategy=DefaultStrategy())
exp_fed_avg.set_round_limit(rounds)
exp_fed_avg.set_tensorboard(True)
exp_fed_avg.run(increase=True)
Save trained model to file
exp_fed_avg.training_plan().export_model('./trained_model')
2. Defining an Experiment using FedProx Aggregator
In order to improve our results, we can switch the Aggregator from FedAverage to FedProx. Since FedProx performs the same aggregation as FedAverage but adds a proximal (regularization) term to each Node's local loss, we re-use the FedAverage Aggregator and simply add fedprox_mu, the regularization parameter, to the training_args.
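For reference, FedProx (Li et al., 2020) has each Node $k$ minimize its local loss $F_k$ augmented with a proximal term that keeps the local model $w$ close to the current global model $w^t$:

$$ \min_{w} \; h_k(w) = F_k(w) + \frac{\mu}{2} \, \lVert w - w^{t} \rVert^{2} $$

where $\mu$ corresponds to the fedprox_mu parameter set below.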
# let's create a new folder for storing tensorboard results for the FedProx aggregator
import os
from fedbiomed.researcher.config import config
fedprox_tensorboard_dir = os.path.join(config.root, 'fedprox_runs')
os.makedirs(fedprox_tensorboard_dir, exist_ok=True)
config.vars['TENSORBOARD_RESULTS_DIR'] = fedprox_tensorboard_dir
%reload_ext tensorboard
%tensorboard --logdir "$fedprox_tensorboard_dir"
model_args = {}
training_args_fedprox = {
'random_seed': 1234,
'loader_args': { 'batch_size': 8, },
'optimizer_args': {
"lr" : 1e-3
},
'dry_run': False,
'num_updates': 50,
'fedprox_mu': 0.1  # this regularization parameter enables FedProx
}
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
tags = ['flixi']
rounds = 3
exp_fedprox = Experiment()
exp_fedprox.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fedprox.set_model_args(model_args=model_args)
exp_fedprox.set_training_args(training_args=training_args_fedprox)
exp_fedprox.set_tags(tags=tags)
exp_fedprox.set_training_data(training_data=None, from_tags=True)
exp_fedprox.set_aggregator(aggregator=FedAverage())
exp_fedprox.set_strategy(node_selection_strategy=DefaultStrategy())
exp_fedprox.set_round_limit(rounds)
exp_fedprox.set_tensorboard(True)
exp_fedprox.run(increase=True)
Save trained model to file
exp_fedprox.training_plan().export_model('./trained_model')
3. Defining an Experiment using SCAFFOLD Aggregator
The purpose of Scaffold is to limit the so-called client drift that may occur when dealing with heterogeneous datasets across Nodes.
In order to use Scaffold, we have to import another Aggregator from the fedbiomed.researcher.aggregators module, as shown below.
Scaffold takes server_lr and fds as arguments:

- server_lr is the server learning rate (in Scaffold, the server performs a gradient descent step on the global model's updates);
- fds is the FederatedDataset containing information about the Nodes connected to the network after issuing a TrainRequest.
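For reference, Scaffold (Karimireddy et al., 2020) maintains a server control variate $c$ and one control variate $c_k$ per Node; each local gradient step is corrected by their difference, and the server applies its own learning rate $\eta_g$ (the server_lr argument) when updating the global model $x$:

$$ y \leftarrow y - \eta_l \left( g_k(y) - c_k + c \right), \qquad x \leftarrow x + \eta_g \, \frac{1}{\lvert S \rvert} \sum_{k \in S} \left( y_k - x \right) $$

where $y$ is Node $k$'s local model, $g_k$ its local gradient, $\eta_l$ the local learning rate, and $S$ the set of Nodes sampled in the round.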
Please note that it is possible to use Scaffold with a regularization term as suggested in FedProx: simply specify fedprox_mu in the training_args dictionary, as shown in the FedProx example.
Attention: this version of Scaffold exchanges correction terms that are not protected, even when using Secure Aggregation. Please do not use this version of Scaffold under heavy security constraints.
# let's create a new folder for storing tensorboard results for the SCAFFOLD aggregator
scaffold_tensorboard_dir = os.path.join(config.root, 'scaffold_runs')
os.makedirs(scaffold_tensorboard_dir, exist_ok=True)
config.vars['TENSORBOARD_RESULTS_DIR'] = scaffold_tensorboard_dir
%reload_ext tensorboard
%tensorboard --logdir "$scaffold_tensorboard_dir"
from fedbiomed.researcher.aggregators import Scaffold
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
server_lr = 0.8
exp_scaffold = Experiment()
exp_scaffold.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_scaffold.set_model_args(model_args=model_args)
exp_scaffold.set_training_args(training_args=training_args)
exp_scaffold.set_tags(tags=tags)
exp_scaffold.set_training_data(training_data=None, from_tags=True)
exp_scaffold.set_aggregator(Scaffold(server_lr=server_lr))
exp_scaffold.set_strategy(node_selection_strategy=DefaultStrategy())
exp_scaffold.set_round_limit(rounds)
exp_scaffold.set_tensorboard(True)
exp_scaffold.run(increase=True)
Save trained model to file
exp_scaffold.training_plan().export_model('./trained_model')
4. Going further
In this tutorial we presented three important Aggregators found in the Federated Learning literature. If you want to create your own custom Aggregator, please check our Aggregation User Guide.
You may have noticed that, thanks to Fed-BioMed's modular structure, it is possible to switch from one Aggregator to another while conducting an Experiment. For instance, you may start with the Scaffold Aggregator for the first three rounds, then switch to the FedAverage Aggregator for the remaining rounds, as shown in the example below:
from fedbiomed.researcher.aggregators import Scaffold, FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy
server_lr = 0.8
exp_multi_agg = Experiment()
# selecting how many rounds of each aggregator we will perform
rounds_scaffold = 3
rounds_fedavg = 1
exp_multi_agg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_multi_agg.set_model_args(model_args=model_args)
exp_multi_agg.set_training_args(training_args=training_args)
exp_multi_agg.set_tags(tags=tags)
exp_multi_agg.set_training_data(training_data=None, from_tags=True)
exp_multi_agg.set_aggregator(Scaffold(server_lr=server_lr))
exp_multi_agg.set_strategy(node_selection_strategy=DefaultStrategy())
exp_multi_agg.set_round_limit(rounds_scaffold + rounds_fedavg)
# run the first rounds with Scaffold, then switch to FedAverage
exp_multi_agg.run(rounds=rounds_scaffold)
exp_multi_agg.set_aggregator(FedAverage())
exp_multi_agg.run(rounds=rounds_fedavg)
Save trained model to file
exp_multi_agg.training_plan().export_model('./trained_model')
For more advanced Aggregators and Regularizers, such as FedOpt, you may be interested in the DecLearn optimizers, which are compatible with Fed-BioMed and provide more options for aggregation and optimization.
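As a pointer, here is a hypothetical sketch of what plugging a declearn-backed optimizer into a training plan could look like. The import paths and the Optimizer signature below are assumptions inspired by Fed-BioMed's optimizer documentation, not verified API; check the documentation of your Fed-BioMed version before using them.

# Hypothetical sketch: import paths and Optimizer signature are assumptions,
# check your Fed-BioMed version's optimizer documentation.
from fedbiomed.common.optimizers import Optimizer
from fedbiomed.common.optimizers.declearn import AdamModule

class MyDecLearnTrainingPlan(MyTrainingPlan):
    def init_optimizer(self, optimizer_args):
        # Node-side Adam driven by declearn modules instead of torch.optim
        return Optimizer(lr=optimizer_args["lr"], modules=[AdamModule()])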