PyTorch aggregation methods in Fed-BioMed
Difficulty level: advanced
Introduction
This tutorial shows how to deal with heterogeneous datasets by changing the experiment's `Aggregator`. Fed-BioMed provides several aggregation methods, and selecting an appropriate one can be critical when confronted with unbalanced or heterogeneous datasets. `Aggregators` provide a way to merge the local models sent by `Nodes` into a global, more generalized model. Please note that designing `Node` sampling `Strategies` can also help when working on heterogeneous datasets.
For more information about `Aggregator` objects in Fed-BioMed, and on how to create your own `Aggregator`, please see the Aggregators section of the User Guide.
Before you start
For this tutorial, we will be using the heterogeneous Fed-IXI dataset provided by FLamby. FLamby comes with several medical datasets that have heterogeneous data properties. Please have a look at the Fed-BioMed tutorials on how to use FLamby before starting: you will need to set up FLamby before running this tutorial.
1. Defining an `Experiment` using the `FedAverage` aggregator
First, let's re-use the `TorchTrainingPlan` defined in the FLamby tutorials. Federated Averaging (FedAvg) was introduced by McMahan et al. as the first aggregation method in the Federated Learning literature. It computes a weighted sum of all `Nodes`' local model parameters in order to obtain a global model:
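In the usual notation (taken from McMahan et al., not from this tutorial): with $K$ nodes, $n_k$ samples on node $k$, $n = \sum_{k=1}^{K} n_k$, and $w_{t+1}^{k}$ the local model parameters of node $k$ after round $t+1$, the FedAvg update is

$$ w_{t+1} = \sum_{k=1}^{K} \frac{n_k}{n}\, w_{t+1}^{k} $$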
In this tutorial, we will keep the same `TrainingPlan` (and thus the same model) for all the experiments; we will only be changing the `Aggregator`.
from fedbiomed.common.training_plans import TorchTrainingPlan
from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer
from fedbiomed.common.data import FlambyDataset, DataManager

class MyTrainingPlan(TorchTrainingPlan):
    def init_model(self, model_args):
        return Baseline()

    def init_optimizer(self, optimizer_args):
        return Optimizer(self.model().parameters(), lr=optimizer_args["lr"])

    def init_dependencies(self):
        return ["from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer",
                "from fedbiomed.common.data import FlambyDataset, DataManager"]

    def training_step(self, data, target):
        output = self.model().forward(data)
        return BaselineLoss().forward(output, target)

    def training_data(self):
        dataset = FlambyDataset()
        loader_arguments = {'shuffle': True}
        return DataManager(dataset, **loader_arguments)
We define hereafter the `Experiment` parameters to be used with vanilla `FedAverage`:
model_args = {}

training_args = {
    'random_seed': 1234,
    'loader_args': { 'batch_size': 8, },
    'optimizer_args': {
        "lr" : 1e-3
    },
    'dry_run': False,
    'num_updates': 50
}
Activate Tensorboard
%load_ext tensorboard

from fedbiomed.researcher.environ import environ
import os

# create a dedicated folder for storing Tensorboard results of the FedAverage run
fedavg_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'fedavg_runs')
os.makedirs(fedavg_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = fedavg_tensorboard_dir

%tensorboard --logdir "$fedavg_tensorboard_dir"
We then import the `FedAverage` aggregator from Fed-BioMed's `aggregators` module and define the `Experiment`:
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

tags = ['flixi']
rounds = 3

exp_fed_avg = Experiment()
exp_fed_avg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fed_avg.set_model_args(model_args=model_args)
exp_fed_avg.set_training_args(training_args=training_args)
exp_fed_avg.set_tags(tags=tags)
exp_fed_avg.set_training_data(training_data=None, from_tags=True)
exp_fed_avg.set_aggregator(aggregator=FedAverage())
exp_fed_avg.set_strategy(node_selection_strategy=DefaultStrategy())
exp_fed_avg.set_round_limit(rounds)
exp_fed_avg.set_tensorboard(True)

exp_fed_avg.run(increase=True)
Save trained model to file
exp_fed_avg.training_plan().export_model('./trained_model')
2. Defining an `Experiment` using the `FedProx` aggregator
In order to improve our results, we can change the `Aggregator`, replacing `FedAverage` with `FedProx`. Since `FedProx` is essentially `FedAverage` with an additional regularization term applied during local training, we re-use the `FedAverage` `Aggregator`, but add to the `training_args` the `fedprox_mu` entry, which is the regularization parameter.
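Concretely, in standard FedProx notation (not defined in this tutorial: $F_k$ is the local loss of node $k$, $w_t$ the current global model, and $\mu$ corresponds to `fedprox_mu`), each node minimizes a proximal local objective instead of the plain loss:

$$ \min_{w} \; F_k(w) + \frac{\mu}{2}\,\lVert w - w_t \rVert^2 $$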
# let's create a new folder for storing tensorboard results for the FedProx aggregator
import os
from fedbiomed.researcher.environ import environ

fedprox_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'fedprox_runs')
os.makedirs(fedprox_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = fedprox_tensorboard_dir

%reload_ext tensorboard

%tensorboard --logdir "$fedprox_tensorboard_dir"
model_args = {}

training_args_fedprox = {
    'random_seed': 1234,
    'loader_args': { 'batch_size': 8, },
    'optimizer_args': {
        "lr" : 1e-3
    },
    'dry_run': False,
    'num_updates': 50,
    'fedprox_mu': .1  # this parameter indicates that we are going to use FedProx
}
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

tags = ['flixi']
rounds = 3

exp_fedprox = Experiment()
exp_fedprox.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fedprox.set_model_args(model_args=model_args)
exp_fedprox.set_training_args(training_args=training_args_fedprox)
exp_fedprox.set_tags(tags=tags)
exp_fedprox.set_training_data(training_data=None, from_tags=True)
exp_fedprox.set_aggregator(aggregator=FedAverage())
exp_fedprox.set_strategy(node_selection_strategy=DefaultStrategy())
exp_fedprox.set_round_limit(rounds)
exp_fedprox.set_tensorboard(True)

exp_fedprox.run(increase=True)
Save trained model to file
exp_fedprox.training_plan().export_model('./trained_model')
3. Defining an `Experiment` using the `Scaffold` aggregator
`Scaffold`'s purpose is to limit the so-called client drift that may happen when dealing with heterogeneous datasets across `Nodes`.
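For reference, a sketch in the notation of Karimireddy et al. (not defined in this tutorial): each local gradient step $g_i(y_i)$ on node $i$ is corrected with control variates, $c_i$ (node-side) and $c$ (server-side), steering local updates back towards the global direction:

$$ y_i \leftarrow y_i - \eta_l \left( g_i(y_i) - c_i + c \right) $$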
In order to use `Scaffold`, we have to import another `Aggregator` from the `fedbiomed.researcher.aggregators` module, as you can see below.
`Scaffold` takes `server_lr` and `fds` as arguments:

- `server_lr` is the server learning rate (in `Scaffold`, used to perform a gradient descent on the global model's updates);
- `fds` is the `FederatedDataSet` containing information about the `Nodes` connected to the network after issuing a `TrainRequest`.
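As a quick sketch (an assumption based on the description above, not the only way to proceed): `fds` can be left unset, in which case the `Experiment` supplies the federated dataset itself, as in the experiment below where only `server_lr` is passed.

from fedbiomed.researcher.aggregators import Scaffold

# sketch: instantiate Scaffold with a server learning rate of 0.8;
# fds is left unset and will be provided through the Experiment
aggregator = Scaffold(server_lr=.8)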
Please note that it is possible to use `Scaffold` with a regularization parameter, as suggested in FedProx. For that, you just have to add `fedprox_mu` to the `training_args` dictionary, as shown in the FedProx example and in the sketch below.
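For instance, a minimal sketch re-using the `training_args` defined earlier (the dictionary merge is just one convenient way to add the key):

# sketch: Scaffold combined with FedProx-style regularization
training_args_scaffold_prox = {**training_args, 'fedprox_mu': .1}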
Attention: this version of `Scaffold` exchanges correction terms that are not protected, even when using Secure Aggregation. Please do not use this version of `Scaffold` under heavy security constraints.
# let's create a new folder for storing tensorboard results for the Scaffold aggregator
scaffold_tensorboard_dir = os.path.join(environ['ROOT_DIR'], 'scaffold_runs')
os.makedirs(scaffold_tensorboard_dir, exist_ok=True)
environ['TENSORBOARD_RESULTS_DIR'] = scaffold_tensorboard_dir

%reload_ext tensorboard

%tensorboard --logdir "$scaffold_tensorboard_dir"
from fedbiomed.researcher.aggregators import Scaffold
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

server_lr = .8

exp_scaffold = Experiment()
exp_scaffold.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_scaffold.set_model_args(model_args=model_args)
exp_scaffold.set_training_args(training_args=training_args)
exp_scaffold.set_tags(tags=tags)
exp_scaffold.set_training_data(training_data=None, from_tags=True)
exp_scaffold.set_aggregator(Scaffold(server_lr=server_lr))
exp_scaffold.set_strategy(node_selection_strategy=DefaultStrategy())
exp_scaffold.set_round_limit(rounds)
exp_scaffold.set_tensorboard(True)

exp_scaffold.run(increase=True)
Save trained model to file
exp_scaffold.training_plan().export_model('./trained_model')
4. Going further
In this tutorial, we presented three important `Aggregators` from the Federated Learning literature. If you want to create your own custom `Aggregator`, please check our Aggregation User Guide.
You may have noticed that, thanks to Fed-BioMed's modular structure, it is possible to switch from one aggregator to another while conducting an `Experiment`. For instance, you may start with the `Scaffold` `Aggregator` for the first 3 rounds, and then switch to the `FedAverage` `Aggregator` for the remaining rounds, as shown in the example below:
from fedbiomed.researcher.aggregators import Scaffold, FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

server_lr = .8
exp_multi_agg = Experiment()

# selecting how many rounds of each aggregator we will perform
rounds_scaffold = 3
rounds_fedavg = 1

exp_multi_agg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_multi_agg.set_model_args(model_args=model_args)
exp_multi_agg.set_training_args(training_args=training_args)
exp_multi_agg.set_tags(tags=tags)
exp_multi_agg.set_training_data(training_data=None, from_tags=True)
exp_multi_agg.set_aggregator(Scaffold(server_lr=server_lr))
exp_multi_agg.set_strategy(node_selection_strategy=DefaultStrategy())
exp_multi_agg.set_round_limit(rounds_scaffold + rounds_fedavg)

# first rounds with the Scaffold aggregator ...
exp_multi_agg.run(rounds=rounds_scaffold)

# ... then switch to FedAverage for the remaining round(s)
exp_multi_agg.set_aggregator(FedAverage())
exp_multi_agg.run(rounds=rounds_fedavg)
Save trained model to file
exp_multi_agg.training_plan().export_model('./trained_model')
For more advanced aggregators and regularizers, like FedOpt, you may be interested in the DecLearn optimizers, which are compatible with Fed-BioMed and provide more options for aggregation and optimization.