PyTorch aggregation methods in Fed-BioMed

Difficulty level: advanced

Introduction

This tutorial shows how to deal with heterogeneous datasets by changing the Experiment's Aggregator. Fed-BioMed provides several aggregation methods, and selecting an appropriate one can be critical when confronted with unbalanced or heterogeneous datasets.

Aggregators provide a way to merge the local models sent by Nodes into a global, more generalized model. Please note that designing Node sampling Strategies can also help when working with heterogeneous datasets.

For more information about Aggregator objects in Fed-BioMed, and on how to create your own Aggregator, please see Aggregators in the User Guide.

Before you start

For this tutorial we will be using the heterogeneous Fed-IXI dataset provided by FLamby. FLamby comes with a few medical datasets that have heterogeneous data properties. Please have a look at the FLamby tutorials in Fed-BioMed before starting: you will need to set up FLamby before running this tutorial.

1. Defining an Experiment using FedAverage Aggregator

First, let's re-use the TorchTrainingPlan defined in the FLamby tutorials. FedAveraging was introduced by McMahan et al. as the first aggregation method in the federated learning literature. It computes a weighted average of all Nodes' local model parameters to obtain the global model:
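$$
w_{t+1} \;=\; \sum_{k=1}^{K} \frac{n_k}{n}\, w_{t+1}^{(k)}, \qquad n = \sum_{k=1}^{K} n_k
$$

where $w_{t+1}^{(k)}$ are the parameters returned by Node $k$ after local training at round $t$, and $n_k$ is the number of samples held by Node $k$. This is the standard FedAvg update from McMahan et al.: Nodes holding more data contribute proportionally more to the global model.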

In this tutorial we will keep the same TrainingPlan (and thus the same model) for all the Experiments; only the Aggregator will change.

In [ ]:
from fedbiomed.common.training_plans import TorchTrainingPlan
from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer
from fedbiomed.common.data import DataManager
from fedbiomed.common.data.flamby_dataset import FlambyDataset
       

class MyTrainingPlan(TorchTrainingPlan):
    def init_model(self, model_args):
        # FLamby's Baseline model for the Fed-IXI task
        return Baseline()

    def init_optimizer(self, optimizer_args):
        # FLamby's reference optimizer, configured with the researcher-provided learning rate
        return Optimizer(self.model().parameters(), lr=optimizer_args["lr"])

    def init_dependencies(self):
        # imports the Nodes need in order to rebuild this training plan
        return ["from flamby.datasets.fed_ixi import Baseline, BaselineLoss, Optimizer",
                "from fedbiomed.common.data import DataManager",
                "from fedbiomed.common.data.flamby_dataset import FlambyDataset"]

    def training_step(self, data, target):
        # one forward pass; returns the loss to be backpropagated
        output = self.model().forward(data)
        return BaselineLoss().forward(output, target)

    def training_data(self):
        # wraps the node-local FLamby dataset in a Fed-BioMed DataManager
        dataset = FlambyDataset()
        loader_arguments = {'shuffle': True}
        return DataManager(dataset, **loader_arguments)

We define hereafter the Experiment parameters to be used with vanilla FedAverage.

In [ ]:
model_args = {}

training_args = {
    'random_seed': 1234,
    'loader_args': { 'batch_size': 8, },
    'optimizer_args': {
        "lr" : 1e-3
    },
    'dry_run': False,
    'num_updates': 50
}

Activate Tensorboard

In [ ]:
%load_ext tensorboard
In [ ]:
from fedbiomed.researcher.config import config
import os


fedavg_tensorboard_dir = os.path.join(config.root, 'fedavg_runs')
os.makedirs(fedavg_tensorboard_dir, exist_ok=True)
config.vars['TENSORBOARD_RESULTS_DIR'] = fedavg_tensorboard_dir
In [ ]:
%tensorboard --logdir "$fedavg_tensorboard_dir"

We then import the FedAverage Aggregator from Fed-BioMed's aggregators module.

In [ ]:
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

tags =  ['flixi']

rounds = 3


exp_fed_avg = Experiment()
exp_fed_avg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fed_avg.set_model_args(model_args=model_args)
exp_fed_avg.set_training_args(training_args=training_args)
exp_fed_avg.set_tags(tags = tags)
exp_fed_avg.set_training_data(training_data=None, from_tags=True)
exp_fed_avg.set_aggregator(aggregator=FedAverage())
exp_fed_avg.set_strategy(node_selection_strategy=DefaultStrategy())
exp_fed_avg.set_round_limit(rounds)
exp_fed_avg.set_tensorboard(True)
In [ ]:
exp_fed_avg.run(increase=True)

Save trained model to file

In [ ]:
exp_fed_avg.training_plan().export_model('./trained_model')

2. Defining an Experiment using FedProx Aggregator

In order to improve our results, we can switch the Aggregator from FedAverage to FedProx. Since FedProx is FedAverage with an added regularization term on the Node side, we keep re-using the FedAverage Aggregator and simply add fedprox_mu, the regularization parameter, to the training_args.
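For reference, with FedProx each Node $k$ locally minimizes its usual loss $F_k$ plus a proximal term that penalizes divergence from the current global model $w_t$ (the standard FedProx objective of Li et al.):

$$
\min_{w} \; h_k(w; w_t) \;=\; F_k(w) + \frac{\mu}{2}\,\lVert w - w_t \rVert^2
$$

where $\mu$ is the value passed as fedprox_mu; the aggregation itself remains plain FedAverage.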

In [ ]:
# let's create a new folder for storing tensorboard results for the FedProx aggregator
import os
from fedbiomed.researcher.config import config


fedprox_tensorboard_dir = os.path.join(config.root, 'fedprox_runs')
os.makedirs(fedprox_tensorboard_dir, exist_ok=True)

config.vars['TENSORBOARD_RESULTS_DIR'] = fedprox_tensorboard_dir
In [ ]:
%reload_ext tensorboard
In [ ]:
%tensorboard --logdir "$fedprox_tensorboard_dir"
In [ ]:
model_args = {}

training_args_fedprox = {
    'random_seed': 1234,
    'loader_args': { 'batch_size': 8, },
    'optimizer_args': {
        "lr" : 1e-3
    },
    'dry_run': False,
    'num_updates': 50, 
    'fedprox_mu': .1  # This parameter indicates that we are going to use FedProx
}
In [ ]:
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators import FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

tags =  ['flixi']
rounds = 3

exp_fedprox = Experiment()


exp_fedprox.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_fedprox.set_model_args(model_args=model_args)
exp_fedprox.set_training_args(training_args=training_args_fedprox)
exp_fedprox.set_tags(tags = tags)
exp_fedprox.set_training_data(training_data=None, from_tags=True)
exp_fedprox.set_aggregator(aggregator=FedAverage())
exp_fedprox.set_strategy(node_selection_strategy=DefaultStrategy())
exp_fedprox.set_round_limit(rounds)
exp_fedprox.set_tensorboard(True)
In [ ]:
exp_fedprox.run(increase=True)

Save trained model to file

In [ ]:
exp_fedprox.training_plan().export_model('./trained_model')

3. Defining an Experiment using SCAFFOLD Aggregator

Scaffold's purpose is to limit the so-called client drift that may occur when dealing with heterogeneous datasets across Nodes.

In order to use Scaffold, we have to import another Aggregator from the fedbiomed.researcher.aggregators module, as you can see below.

Scaffold takes server_lr and fds as arguments:

  • server_lr is the server learning rate (in Scaffold, used to perform a gradient descent step on the global model's updates)
  • fds is the Federated Dataset containing information about the Nodes connected to the network after issuing a TrainRequest

Please note that it is possible to use Scaffold with a regularization parameter, as suggested in FedProx. For that, you just have to specify fedprox_mu in the training_args dictionary, as shown in the FedProx example.
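For reference, the core idea of Scaffold (Karimireddy et al.) is to correct each local gradient step with control variates, which are the correction terms mentioned in the warning below:

$$
y_k \;\leftarrow\; y_k - \eta_l\,\big(g_k(y_k) - c_k + c\big)
$$

where $y_k$ is Node $k$'s local model, $g_k(y_k)$ its local gradient, $c_k$ and $c$ are the local and global control variates, and $\eta_l$ is the local learning rate. The server then applies its own update to the global model using the server learning rate, server_lr.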

Attention: this version of Scaffold exchanges correction terms that are not protected, even when using Secure Aggregation. Please do not use this version of Scaffold under heavy security constraints.

In [ ]:
# let's create a new folder for storing tensorboard results for the SCAFFOLD aggregator
scaffold_tensorboard_dir = os.path.join(config.root, 'scaffold_runs')
os.makedirs(scaffold_tensorboard_dir, exist_ok=True)

config.vars['TENSORBOARD_RESULTS_DIR'] = scaffold_tensorboard_dir
In [ ]:
%reload_ext tensorboard
In [ ]:
%tensorboard --logdir "$scaffold_tensorboard_dir"
In [ ]:
from fedbiomed.researcher.aggregators import Scaffold
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

server_lr = .8
exp_scaffold = Experiment()

exp_scaffold.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_scaffold.set_model_args(model_args=model_args)
exp_scaffold.set_training_args(training_args=training_args)
exp_scaffold.set_tags(tags = tags)
exp_scaffold.set_training_data(training_data=None, from_tags=True)
exp_scaffold.set_aggregator(Scaffold(server_lr=server_lr))
exp_scaffold.set_strategy(node_selection_strategy=DefaultStrategy())
exp_scaffold.set_round_limit(rounds)
exp_scaffold.set_tensorboard(True)
In [ ]:
exp_scaffold.run(increase=True)

Save trained model to file

In [ ]:
exp_scaffold.training_plan().export_model('./trained_model')

4. Going further

In this tutorial we presented three important Aggregators found in the federated learning literature. If you want to create your own custom Aggregator, please check our Aggregation User Guide.
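To give a flavor of what the guide covers, here is a minimal, illustrative sketch of a custom Aggregator. The aggregate() signature below is an assumption modelled on FedAverage and may differ between Fed-BioMed versions, so treat it as a sketch rather than the definitive API:

# Illustrative sketch only: the aggregate() signature is an assumption
# based on FedAverage; check the Aggregation User Guide for the real API.
from fedbiomed.researcher.aggregators import Aggregator

class UnweightedAverage(Aggregator):
    """Toy Aggregator: uniform average of Node updates, ignoring dataset sizes."""

    def aggregate(self, model_params, weights, *args, **kwargs):
        # model_params is assumed to map each node_id to a dict of parameter tensors;
        # we ignore `weights` and average every parameter uniformly across Nodes.
        params_list = list(model_params.values())
        n_nodes = len(params_list)
        return {
            name: sum(node_params[name] for node_params in params_list) / n_nodes
            for name in params_list[0]
        }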

You may have noticed that, thanks to Fed-BioMed's modular structure, it is possible to switch from one aggregator to another while conducting an Experiment. For instance, you may start with the Scaffold Aggregator for the first 3 rounds and then switch to the FedAverage Aggregator for the remaining rounds, as shown in the example below:

In [ ]:
from fedbiomed.researcher.aggregators import Scaffold, FedAverage
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

server_lr = .8
exp_multi_agg = Experiment()

# selecting how many rounds of each aggregator we will perform
rounds_scaffold = 3
rounds_fedavg = 1


exp_multi_agg.set_training_plan_class(training_plan_class=MyTrainingPlan)
exp_multi_agg.set_model_args(model_args=model_args)
exp_multi_agg.set_training_args(training_args=training_args)
exp_multi_agg.set_tags(tags = tags)
exp_multi_agg.set_training_data(training_data=None, from_tags=True)
exp_multi_agg.set_aggregator(Scaffold(server_lr=server_lr))
exp_multi_agg.set_strategy(node_selection_strategy=DefaultStrategy())
exp_multi_agg.set_round_limit(rounds_scaffold + rounds_fedavg)

# run the first rounds with the Scaffold aggregator
exp_multi_agg.run(rounds=rounds_scaffold)
In [ ]:
exp_multi_agg.set_aggregator(FedAverage())
exp_multi_agg.run(rounds=rounds_fedavg)

Save trained model to file

In [ ]:
exp_multi_agg.training_plan().export_model('./trained_model')

For more advanced aggregation and regularization schemes, such as FedOpt, you may be interested in the DecLearn optimizers, which are compatible with Fed-BioMed and provide more options for aggregation and optimization.
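As a hedged pointer only, a researcher-side FedOpt-style setup might look like the sketch below. The import paths and the set_agg_optimizer call are assumptions based on the Advanced optimizers tutorial; refer to that tutorial for the exact, up-to-date API:

# Hedged sketch: module paths and the setter name are assumptions;
# see the "Advanced optimizers in Fed-BioMed" tutorial for the real API.
from fedbiomed.common.optimizers import Optimizer            # assumed path
from fedbiomed.common.optimizers.declearn import AdamModule  # assumed path

# FedOpt idea: Nodes send their updates, the Researcher applies Adam to them.
agg_optimizer = Optimizer(lr=0.01, modules=[AdamModule()])
exp_fed_avg.set_agg_optimizer(agg_optimizer)                 # assumed setter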
