Fed-BioMed to train a federated SGD regressor model¶
Data¶
This tutorial shows how to use Fed-BioMed to solve a federated regression problem with scikit-learn.
In this tutorial we use Fed-BioMed's wrapper for the scikit-learn SGDRegressor. The goal of the notebook is to train a model on a realistic dataset of (synthetic) medical information mimicking the ADNI dataset.
Creating nodes¶
To proceed with the tutorial, we create 3 clients with corresponding dataframes of clinical information in .csv format. Each client has 300 data points composed of several features corresponding to clinical and medical imaging information. The data is entirely synthetic and randomly sampled to mimic the variability of the real ADNI dataset. The training partitions are available at the following link:
https://drive.google.com/file/d/1R39Ir60oQi8ZnmHoPz5CoGCrVIglcO9l/view?usp=sharing
The federated task we aim to solve is to predict a clinical variable (the mini-mental state examination, MMSE) from a combination of demographic and imaging features. The regressor variables are the following features:
['SEX', 'AGE', 'PTEDUCAT', 'WholeBrain.bl', 'Ventricles.bl', 'Hippocampus.bl', 'MidTemp.bl', 'Entorhinal.bl']
and the target variable is:
['MMSE.bl']
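Before adding the data to the nodes, it can be useful to sanity-check one of the partitions locally. The sketch below assumes the partition of client 1 was extracted to a file named adni_client1.csv; the actual file name in the downloaded archive may differ.
import pandas as pd
# hypothetical path: adjust to where you extracted the client 1 partition
df = pd.read_csv('adni_client1.csv', delimiter=',')
print(df.shape)              # expected: 300 rows per client
print(df.columns.tolist())   # should include the regressors and 'MMSE.bl'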
To create the federated dataset, we follow the standard procedure for node creation/population of Fed-BioMed.
We create a first node by using the commands
$ source ./scripts/fedbiomed_environment node
$ ./scripts/fedbiomed_run node --config n1.ini start
We then populate the node with the data of the first client:
$ ./scripts/fedbiomed_run node --config n1.ini dataset add
We select option 1 (csv) and pick the .csv partition of client 1. We use adni as the tag to save the selected dataset. We can further check that the data has been added by executing:
$ ./scripts/fedbiomed_run node --config n1.ini dataset list
Following the same procedure, we create the other two nodes with the datasets of client 2 and client 3 respectively.
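For instance, for the second node the commands would be as follows (the config file name n2.ini is an assumption mirroring n1.ini above):
$ ./scripts/fedbiomed_run node --config n2.ini start
$ ./scripts/fedbiomed_run node --config n2.ini dataset add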
Fed-BioMed Researcher¶
We are now ready to start the researcher environment with the following command. This command will activate the researcher environment and start a Jupyter Notebook.
$ ./scripts/fedbiomed_run researcher start
We can first query the network for the adni dataset. In this case, the nodes share their respective partitions under the same tag adni:
from fedbiomed.researcher.requests import Requests
req = Requests()
req.list(verbose=True)
Create an experiment to train a model on the data found¶
The class FedSGDRegressor constitutes the Fed-BioMed wrapper for executing federated learning with the scikit-learn SGDRegressor model, based on mini-batch Stochastic Gradient Descent (SGD). As we did with the PyTorch model in the previous chapter, we create a new training plan class SGDRegressorTrainingPlan that inherits from it. For a refresher on how Training Plans work in Fed-BioMed, please refer to our Training Plan user guide.
In scikit-learn Training Plans, you typically need to define only the training_data function, and optionally an init_dependencies function if your code requires additional module imports. The training_data function defines how datasets should be loaded on the nodes to make them ready for training, and returns a DataManager instance; the batch size is provided separately, through the loader_args entry of the training arguments. For scikit-learn, the DataManager must be instantiated with a dataset and a target argument, both np.ndarrays of the same length.
Note that this training plan performs a common standardization across the federated datasets, centering and scaling every node's data with the same precomputed parameters.
import numpy as np
import pandas as pd

from fedbiomed.common.training_plans import FedSGDRegressor
from fedbiomed.common.data import DataManager


class SGDRegressorTrainingPlan(FedSGDRegressor):
    def training_data(self):
        # load the node's local .csv partition
        dataset = pd.read_csv(self.dataset_path, delimiter=',')
        regressors_col = ['AGE', 'WholeBrain.bl', 'Ventricles.bl',
                          'Hippocampus.bl', 'MidTemp.bl', 'Entorhinal.bl']
        target_col = ['MMSE.bl']
        # mean and standard deviation for standardizing the dataset;
        # they have been computed over the whole dataset
        scaling_mean = np.array([72.3, 0.7, 0.0, 0.0, 0.0, 0.0])
        scaling_sd = np.array([7.3e+00, 5.0e-02, 1.1e-02, 1.0e-03, 2.0e-03, 1.0e-03])
        X = (dataset[regressors_col].values - scaling_mean) / scaling_sd
        y = dataset[target_col]
        return DataManager(dataset=X, target=y.values.ravel())
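The plan above relies on numpy and pandas, which are typically available in the standard Fed-BioMed node environment. If further imports were needed on the node side, they could be declared via the optional init_dependencies mentioned earlier. A minimal sketch, assuming Fed-BioMed's convention of returning the import statements as a list of strings (the class name below is purely illustrative):
class SGDRegressorTrainingPlanWithDeps(FedSGDRegressor):
    # hypothetical variant declaring extra node-side imports
    def init_dependencies(self):
        # each entry is an import statement executed on the node side
        return ["import numpy as np",
                "import pandas as pd"]

    # training_data(self) would be defined as above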
We now provide the dynamic arguments for the model and the training routine. These may potentially be changed at every round.
Model arguments¶
model_args is a dictionary with the arguments related to the model, which will be passed to the SGDRegressor constructor. In this case, these include n_features, random_state and eta0.
IMPORTANT For regression tasks, you are required to specify the following field:
n_features: the number of features in each input sample (in our case, the number of regressors, i.e. 6)
Training arguments¶
training_args is a dictionary containing the arguments for the training routine (e.g. batch size, learning rate, epochs, etc.). This will be passed to the training routine on the node side.
from fedbiomed.common.metrics import MetricTypes

RANDOM_SEED = 1234

model_args = {
    'eta0': 0.05,
    'n_features': 6,
    'random_state': RANDOM_SEED
}

training_args = {
    'epochs': 1,
    'test_ratio': .2,
    'test_metric': MetricTypes.MEAN_SQUARE_ERROR,
    'test_on_local_updates': True,
    'test_on_global_updates': True,
    'loader_args': {'batch_size': 30},
    # 'batch_maxnum': 2,  # can be used for debugging, to limit the number of batches per epoch
    # 'log_interval': 1,  # output a logging message every log_interval batches
}
The experiment can now be defined, by providing the adni tag, and running the local training on the nodes with the training plan defined in training_plan_class, the standard aggregator FedAverage (implementing federated averaging), and node_selection_strategy (None, meaning that all available nodes are used). Federated learning is going to be performed through 10 optimization rounds.
from fedbiomed.researcher.federated_workflows import Experiment
from fedbiomed.researcher.aggregators.fedavg import FedAverage

tags = ['adni']

# add more rounds for better accuracy, e.g. rounds = 40
rounds = 10

# select nodes participating in this experiment
exp = Experiment(tags=tags,
                 model_args=model_args,
                 training_plan_class=SGDRegressorTrainingPlan,
                 training_args=training_args,
                 round_limit=rounds,
                 aggregator=FedAverage(),
                 node_selection_strategy=None)

# start federated training
exp.run(increase=True)
Save the trained model to a file:
exp.training_plan().export_model('./trained_model')
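The exported file can later be reloaded outside of Fed-BioMed. A minimal sketch, assuming the export is a joblib dump, the usual serialization for scikit-learn models (check the documentation of your Fed-BioMed version if this differs):
import joblib

# assumption: export_model wrote a joblib-serialized scikit-learn estimator
restored_model = joblib.load('./trained_model')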
Testing¶
Once the federated model is obtained, it is possible to test it locally on an independent testing partition. The test dataset is available at this link:
https://drive.google.com/uc?id=19kxuI146WA2fhcOU2_AvF8dy-ppJkzW7
!pip install matplotlib
!pip install gdown
Download the testing dataset to a local temporary folder.
import os
import gdown
import tempfile
import zipfile
import pandas as pd
import numpy as np
from fedbiomed.researcher.environ import environ
resource = "https://drive.google.com/uc?id=19kxuI146WA2fhcOU2_AvF8dy-ppJkzW7"
tmpdir = tempfile.TemporaryDirectory(dir=environ['TMP_DIR'])
base_dir = tmpdir.name
test_file = os.path.join(base_dir, "test_data.zip")
gdown.download(resource, test_file, quiet=False)
zf = zipfile.ZipFile(test_file)
for file in zf.infolist():
    zf.extract(file, base_dir)
# loading testing dataset
test_data = pd.read_csv(os.path.join(base_dir,'adni_validation.csv'))
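Optionally, we can quickly check that the test partition loaded as expected:
# optional sanity check of the loaded test partition
print(test_data.shape)
print(test_data.columns.tolist())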
from sklearn.linear_model import SGDRegressor
import matplotlib.pyplot as plt
%matplotlib inline
Here we extract the relevant regressors and target from the testing data:
regressors_col = ['AGE', 'WholeBrain.bl', 'Ventricles.bl', 'Hippocampus.bl', 'MidTemp.bl', 'Entorhinal.bl']
target_col = ['MMSE.bl']
X_test = test_data[regressors_col].values
y_test = test_data[target_col].values
To inspect the model evolution across FL rounds, we use exp.aggregated_params(), which contains the model parameters collected at the end of each round. The MSE should decrease at each round as the federated parameters improve.
scaling_mean = np.array([72.3, 0.7, 0.0, 0.0, 0.0, 0.0])
scaling_sd = np.array([7.3e+00, 5.0e-02, 1.1e-02, 1.0e-03, 2.0e-03, 1.0e-03])
testing_error = []

# retrieve the model from the training plan and evaluate it with the
# parameters aggregated at the end of each round
fed_model = exp.training_plan().model()
# sklearn-compatible subset of model_args (e.g. eta0, random_state)
regressor_args = {key: model_args[key] for key in model_args.keys()
                  if key in fed_model.get_params().keys()}

for i in range(rounds):
    fed_model.coef_ = exp.aggregated_params()[i]['params']['coef_'].copy()
    fed_model.intercept_ = exp.aggregated_params()[i]['params']['intercept_'].copy()
    # ravel y_test to avoid broadcasting a (N, 1) array against a (N,) array
    mse = np.mean((fed_model.predict((X_test - scaling_mean) / scaling_sd) - y_test.ravel())**2)
    testing_error.append(mse)
plt.plot(testing_error)
plt.title('FL testing loss')
plt.xlabel('FL round')
plt.ylabel('testing loss (MSE)')
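To complement the plot with a numeric summary, we can print the last-round error from the testing_error list computed above:
print(f"Test MSE after {rounds} rounds: {testing_error[-1]:.3f}")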
We finally inspect the predictions of the final federated model on the testing data.
y_predicted = fed_model.predict((X_test - scaling_mean) / scaling_sd)
plt.scatter(y_predicted, y_test, label='model prediction')
plt.xlabel('predicted')
plt.ylabel('target')
plt.title('Federated model testing prediction')

# identity line: perfect predictions would fall on this diagonal
first_diag = np.arange(np.min(y_test.flatten()),
                       np.max(y_test.flatten()) + 1)
plt.scatter(first_diag, first_diag, label='correct target')
plt.legend()
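As a complementary check, standard scikit-learn metrics can be computed on the final model's predictions, e.g. with sklearn.metrics:
from sklearn.metrics import mean_squared_error, r2_score

# compare the final federated model's predictions against the test targets
print('MSE:', mean_squared_error(y_test.ravel(), y_predicted))
print('R2: ', r2_score(y_test.ravel(), y_predicted))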