implementation of Round class of the node component
Attributes
Classes
Round
Round(training_plan, training_plan_class, model_kwargs, training_kwargs, training, dataset, params, experiment_id, researcher_id, history_monitor, aggregator_args, node_args, round_number=0, dlp_and_loading_block_metadata=None, aux_vars=None)
This class represents the training part executed by a node in a given round
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan | str | code of the training plan for this round | required |
training_plan_class | str | class name of the training plan | required |
model_kwargs | dict | contains model args. Defaults to None. | required |
training_kwargs | dict | contains training arguments. Defaults to None. | required |
training | bool | whether to perform a model training or just to perform a validation check (model inference) | required |
dataset | dict | dataset details to use in this round. It contains the dataset name, dataset's id, data path, its shape, its description... . Defaults to None. | required |
params | str | parameters of the model | required |
experiment_id | str | experiment id | required |
researcher_id | str | researcher id | required |
history_monitor | HistoryMonitor | Sends real-time feed-back to end-user during training | required |
aggregator_args | Dict[str, Any] | Arguments managed by and shared with the researcher-side aggregator. | required |
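node_args | Dict | command line arguments for node. Can include: `gpu (bool)` (propose use of a GPU device if any is available), `gpu_num (Union[int, None])` (if not None, use the specified GPU device instead of the default GPU device, if this GPU device is available), `gpu_only (bool)` (force use of a GPU device if any is available, even if the researcher doesn't request for using a GPU). | required |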
dlp_and_loading_block_metadata | Optional[Tuple[dict, List[dict]]] | Data loading plan to apply, or None if no DLP for this round. | None |
round_number | int | number of the iteration for this experiment | 0 |
aux_vars | Optional[List[str]] | auxiliary variables of the model. | None |
Source code in fedbiomed/node/round.py
def __init__(
    self,
    training_plan: str,
    training_plan_class: str,
    model_kwargs: dict,
    training_kwargs: dict,
    training: bool,
    dataset: dict,
    params: str,
    experiment_id: str,
    researcher_id: str,
    history_monitor: HistoryMonitor,
    aggregator_args: Dict[str, Any],
    node_args: Dict,
    round_number: int = 0,
    dlp_and_loading_block_metadata: Optional[Tuple[dict, List[dict]]] = None,
    aux_vars: Optional[List[str]] = None,
) -> None:
    """Store the inputs of one round and create the helpers it relies on.

    Args:
        training_plan: code of the training plan for this round
        training_plan_class: class name of the training plan
        model_kwargs: contains model args
        training_kwargs: contains training arguments
        training: whether to perform a model training or just to perform a
            validation check (model inference)
        dataset: dataset details to use in this round. It contains the dataset
            name, dataset's id, data path, its shape, its description...
        params: parameters of the model
        experiment_id: experiment id
        researcher_id: researcher id
        history_monitor: sends real-time feed-back to end-user during training
        aggregator_args: arguments managed by and shared with the
            researcher-side aggregator
        node_args: command line arguments for node. Can include:
            - `gpu (bool)`: propose use a GPU device if any is available.
            - `gpu_num (Union[int, None])`: if not None, use the specified GPU
              device instead of default GPU device if this GPU device is
              available.
            - `gpu_only (bool)`: force use of a GPU device if any available,
              even if researcher doesn't request for using a GPU.
        round_number: number of the iteration for this experiment
        dlp_and_loading_block_metadata: data loading plan to apply, or None if
            no DLP for this round
        aux_vars: auxiliary variables of the model; any falsy value is stored
            as an empty list
    """
    # --- who asked for this round, and which iteration it is -------------
    self.experiment_id = experiment_id
    self.researcher_id = researcher_id
    self._round = round_number

    # --- training plan source and the data it is applied to --------------
    self.training_plan_source = training_plan
    self.training_plan_class = training_plan_class
    self.dataset = dataset
    self._dlp_and_loading_block_metadata = dlp_and_loading_block_metadata

    # --- model state and arguments received with the request -------------
    self.params = params
    self.model_arguments = model_kwargs
    self.training_kwargs = training_kwargs
    self.aggregator_args = aggregator_args
    self.aux_vars = aux_vars or []

    # --- node-side execution settings ------------------------------------
    self.training = training
    self.node_args = node_args
    self.history_monitor = history_monitor

    # --- placeholders; nothing in this constructor fills them ------------
    self.training_plan = None
    self.testing_arguments = None
    self.loader_arguments = None
    self.training_arguments = None
    self._secure_aggregation = None

    # --- helper objects (created in the same order as before) ------------
    self.tp_security_manager = TrainingPlanSecurityManager()
    self._node_state_manager: NodeStateManager = NodeStateManager(environ['DB_PATH'])
    # NOTE(review): TMP_DIR is handed to mkdtemp as `prefix`, not `dir`, so the
    # created directory name starts with that string -- confirm this is intended.
    self._keep_files_dir = tempfile.mkdtemp(prefix=environ['TMP_DIR'])
Attributes
aggregator_args instance-attribute
aggregator_args = aggregator_args
aux_vars instance-attribute
aux_vars = aux_vars or []
dataset instance-attribute
dataset = dataset
experiment_id instance-attribute
experiment_id = experiment_id
history_monitor instance-attribute
history_monitor = history_monitor
loader_arguments instance-attribute
loader_arguments = None
model_arguments instance-attribute
model_arguments = model_kwargs
node_args instance-attribute
node_args = node_args
params instance-attribute
params = params
researcher_id instance-attribute
researcher_id = researcher_id
testing_arguments instance-attribute
testing_arguments = None
tp_security_manager instance-attribute
tp_security_manager = TrainingPlanSecurityManager()
training instance-attribute
training = training
training_arguments instance-attribute
training_arguments = None
training_kwargs instance-attribute
training_kwargs = training_kwargs
training_plan instance-attribute
training_plan = None
training_plan_class instance-attribute
training_plan_class = training_plan_class
training_plan_source instance-attribute
training_plan_source = training_plan
Functions
collect_optim_aux_var
collect_optim_aux_var()
Collect auxiliary variables from the wrapped Optimizer, if any.
If the TrainingPlan does not use a Fed-BioMed Optimizer, return an empty dict. If it does not hold any BaseOptimizer however, raise a FedbiomedRoundError.
Returns:
Type | Description |
---|---|
Dict[str, Any] | Auxiliary variables |
Source code in fedbiomed/node/round.py
def collect_optim_aux_var(self) -> Dict[str, Any]:
    """Collect auxiliary variables from the wrapped Optimizer, if any.

    The training plan's BaseOptimizer is fetched through
    `_get_base_optimizer()`. If the object it wraps is not a Fed-BioMed
    `Optimizer`, an empty dict is returned.

    When non-empty auxiliary variables are collected and secure aggregation
    is either in use or not configured yet (`_secure_aggregation is None`),
    a warning is logged because these variables are not encrypted.

    Returns:
        Auxiliary variables, as returned by the Optimizer's `get_aux()`;
        an empty dict if the wrapped optimizer is not an `Optimizer`.

    Raises:
        FedbiomedRoundError: presumably raised by `_get_base_optimizer()` when
            the training plan holds no BaseOptimizer (not visible here).
    """
    optimizer = self._get_base_optimizer()

    # Nothing to collect unless the BaseOptimizer wraps a Fed-BioMed Optimizer.
    if not isinstance(optimizer.optimizer, Optimizer):
        return {}

    aux_var = optimizer.optimizer.get_aux()
    if aux_var and (self._secure_aggregation is None or self._secure_aggregation.use_secagg):
        # TODO: remove the following warning when secagg compatibility has been fixed
        # Auxiliary variables are not covered by SecAgg encryption: warn about it.
        # Each literal ends with a space so the sentences do not run together.
        logger.warning(f'Node {environ["NODE_ID"]} optimizer is sending auxiliary variables to the '
                       'Researcher, but those are not encrypted with SecAgg. '
                       'Auxiliary Variables may contain sensitive information about the Nodes. '
                       'This issue will be fixed in a future version of Fed-BioMed',
                       researcher_id=self.researcher_id)
    return aux_var
initialize_arguments
initialize_arguments(previous_state_id=None)
Initializes arguments for training and testing and the NodeStateManager, the latter handling Node state loading and saving.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
previous_state_id | Optional[str] | previous Node state id. Defaults to None (which is the state_id default value for the first Round). | None |
Returns:
Type | Description |
---|---|
Optional[Dict[str, Any]] | A dictionary containing the error message if an error is triggered while parsing training and testing |
Optional[Dict[str, Any]] | arguments, None otherwise. |
Source code in fedbiomed/node/round.py
def initialize_arguments(self,
                         previous_state_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Prepare the NodeStateManager, then validate training/testing arguments.

    Args:
        previous_state_id: previous Node state id. Defaults to None (which is
            the state_id default value for the first Round).

    Returns:
        Whatever `_initialize_validate_training_arguments()` returns: a
        dictionary containing the error message if an error is triggered
        while parsing training and testing arguments, None otherwise.
    """
    # The state manager runs in testing mode whenever this round does not train.
    testing_only = not self.training
    self._node_state_manager.initialize(previous_state_id=previous_state_id,
                                        testing=testing_only)

    validation_outcome = self._initialize_validate_training_arguments()
    return validation_outcome
process_optim_aux_var
process_optim_aux_var()
Process researcher-emitted Optimizer auxiliary variables, if any.
Returns:
Type | Description |
---|---|
Optional[str] | Error message, empty if the operation was successful. |
Source code in fedbiomed/node/round.py
def process_optim_aux_var(self) -> Optional[str]:
    """Process researcher-emitted Optimizer auxiliary variables, if any.

    Every non-empty entry of `self.aux_vars` is merged, in order, into a
    single dict (later entries override earlier ones on key collisions),
    which is then passed to the training plan Optimizer's `set_aux()`.

    Returns:
        An error message if the auxiliary variables could not be applied,
        None if there was nothing to process or the operation succeeded.
    """
    # Early-exit if there are no auxiliary variables to process.
    if not any(self.aux_vars):
        return None

    # Merge all non-empty entries rather than indexing positions 0 and 1:
    # this tolerates a list that is shorter or longer than two elements,
    # as well as empty / None placeholders next to populated entries.
    aux_vars = {}
    for entry in self.aux_vars:
        if entry:
            aux_vars.update(entry)

    # Fetch the training plan's BaseOptimizer.
    try:
        optimizer = self._get_base_optimizer()
    except FedbiomedRoundError as exc:
        return str(exc)

    # Verify that the BaseOptimizer wraps an Optimizer.
    if not isinstance(optimizer.optimizer, Optimizer):
        return (
            "Received Optimizer auxiliary variables, but the "
            "TrainingPlan does not manage a compatible Optimizer."
        )

    # Pass auxiliary variables to the Optimizer.
    try:
        optimizer.optimizer.set_aux(aux_vars)
    except FedbiomedOptimizerError as exc:
        return (
            "TrainingPlan Optimizer failed to ingest the provided "
            f"auxiliary variables: {repr(exc)}"
        )
    return None
run_model_training
run_model_training(secagg_arguments=None)
Runs one round of model training
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secagg_arguments | Union[Dict, None] | arguments for secure aggregation, some are specific to the scheme | None |
Returns:
Type | Description |
---|---|
Optional[Dict[str, Any]] | Returns the corresponding node message, training reply instance |
Source code in fedbiomed/node/round.py
def run_model_training(
        self,
        secagg_arguments: Union[Dict, None] = None,
) -> Optional[Dict[str, Any]]:
    """Runs one round of model training

    Steps, in order: configure secure aggregation, check the training plan
    approval (when `TRAINING_PLAN_APPROVAL` is enabled), instantiate the
    training plan and reload it from a file, restore the previous node
    state if any, load the model parameters, process Optimizer auxiliary
    variables, build the data loaders, optionally validate before training,
    train, optionally validate after training, optionally encrypt the
    resulting parameters, save the node state and send the reply.

    Any failure in a mandatory step is reported through a reply with
    `success=False`; failures of the validation routines are only logged.

    Args:
        secagg_arguments: arguments for secure aggregation, some are specific to the scheme

    Returns:
        The value returned by `_send_round_reply`, i.e. the node message
        replying to the training request.
    """
    # Validate secagg status. Raises error if the training request is not compatible with
    # secure aggregation settings
    try:
        self._secure_aggregation = SecaggRound(secagg_arguments, self.experiment_id)
    except FedbiomedSecureAggregationError as e:
        logger.error(str(e))
        return self._send_round_reply(
            success=False,
            message='Could not configure secure aggregation on node')

    # Validate and load training plan
    if environ["TRAINING_PLAN_APPROVAL"]:
        approved, training_plan_ = self.tp_security_manager.\
            check_training_plan_status(
                self.training_plan_source,
                TrainingPlanApprovalStatus.APPROVED)

        if not approved:
            return self._send_round_reply(
                False,
                f'Requested training plan is not approved by the node: {environ["NODE_ID"]}')

        logger.info(f'Training plan has been approved by the node {training_plan_["name"]}',
                    researcher_id=self.researcher_id)

    # Import training plan, save to file, reload, instantiate a training plan
    try:
        CurrentTPModule, CurrentTrainingPlan = utils.import_class_from_spec(
            code=self.training_plan_source, class_name=self.training_plan_class)
        self.training_plan = CurrentTrainingPlan()
    except Exception:
        error_message = "Cannot instantiate training plan object."
        return self._send_round_reply(success=False, message=error_message)

    # save and load training plan to a file to be sure
    # 1. a file is associated to training plan so we can read its source, etc.
    # 2. all dependencies are applied
    training_plan_module = 'model_' + str(uuid.uuid4())
    training_plan_file = os.path.join(self._keep_files_dir, training_plan_module + '.py')
    try:
        self.training_plan.save_code(training_plan_file, from_code=self.training_plan_source)
    except Exception as e:
        error_message = "Cannot save the training plan to a local tmp dir"
        logger.error(f"Cannot save the training plan to a local tmp dir : {e}")
        return self._send_round_reply(success=False, message=error_message)

    # Drop the in-memory definitions before re-importing from the saved file.
    del CurrentTrainingPlan
    del CurrentTPModule

    try:
        CurrentTPModule, self.training_plan = utils.import_class_object_from_file(
            training_plan_file, self.training_plan_class)
    except Exception:
        error_message = "Cannot load training plan object from file."
        return self._send_round_reply(success=False, message=error_message)

    try:
        self.training_plan.post_init(model_args=self.model_arguments,
                                     training_args=self.training_arguments,
                                     aggregator_args=self.aggregator_args)
    except Exception:
        error_message = "Can't initialize training plan with the arguments."
        return self._send_round_reply(success=False, message=error_message)

    # load node state
    previous_state_id = self._node_state_manager.previous_state_id
    if previous_state_id is not None:
        try:
            self._load_round_state(previous_state_id)
        except Exception:
            # don't send error details
            return self._send_round_reply(success=False, message="Can't read previous node state.")

    # Load model parameters received from researcher
    try:
        self.training_plan.set_model_params(self.params)
    except Exception:
        error_message = "Cannot initialize model parameters."
        return self._send_round_reply(success=False, message=error_message)

    # ---------------------------------------------------------------------
    # Process Optimizer auxiliary variables, if any.
    error_message = self.process_optim_aux_var()
    if error_message:
        return self._send_round_reply(success=False, message=error_message)

    # Split training and validation data -------------------------------------
    try:
        self._set_training_testing_data_loaders()
    except FedbiomedError as fe:
        error_message = f"Can not create validation/train data: {repr(fe)}"
        return self._send_round_reply(success=False, message=error_message)
    except Exception as e:
        error_message = f"Undetermined error while creating data for training/validation. Can not create " \
                        f"validation/train data: {repr(e)}"
        return self._send_round_reply(success=False, message=error_message)
    # ------------------------------------------------------------------------

    # Validation Before Training
    if self.testing_arguments.get('test_on_global_updates', False) is not False:

        # Last control to make sure validation data loader is set.
        if self.training_plan.testing_data_loader is not None:
            try:
                self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                   metric_args=self.testing_arguments.get('test_metric_args', {}),
                                                   history_monitor=self.history_monitor,
                                                   before_train=True)
            except FedbiomedError as e:
                # Validation failures are logged only; the round carries on.
                logger.error(f"{ErrorNumbers.FB314.value}: During the validation phase on global parameter "
                             f"updates; {repr(e)}", researcher_id=self.researcher_id)
            except Exception as e:
                logger.error(f"Undetermined error during the testing phase on global parameter updates: "
                             f"{repr(e)}", researcher_id=self.researcher_id)
        else:
            logger.error(f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing "
                         f"testing dataset. Please make sure that `test_ratio` has been set correctly",
                         researcher_id=self.researcher_id)

    if not self.training:
        # Only for validation
        return self._send_round_reply(success=True)

    # Training is activated from here on.
    # Without a training data loader nothing can be trained, and neither the
    # sample size nor the timing reported below can be computed: reply with an
    # explicit failure instead of crashing further down.
    if self.training_plan.training_data_loader is None:
        return self._send_round_reply(
            success=False,
            message="Cannot train model in round: training data loader is not set.")

    results = {}

    # Perform the training round.
    try:
        rtime_before = time.perf_counter()
        ptime_before = time.process_time()
        self.training_plan.training_routine(history_monitor=self.history_monitor,
                                            node_args=self.node_args)
        rtime_after = time.perf_counter()
        ptime_after = time.process_time()
    except Exception as exc:
        error_message = f"Cannot train model in round: {repr(exc)}"
        return self._send_round_reply(success=False, message=error_message)

    # Collect Optimizer auxiliary variables, if any.
    try:
        results['optim_aux_var'] = self.collect_optim_aux_var()
    except (FedbiomedOptimizerError, FedbiomedRoundError) as exc:
        error_message = f"Cannot collect Optimizer auxiliary variables: {repr(exc)}"
        return self._send_round_reply(success=False, message=error_message)

    # Validation after training
    if self.testing_arguments.get('test_on_local_updates', False) is not False:

        if self.training_plan.testing_data_loader is not None:
            try:
                self.training_plan.testing_routine(metric=self.testing_arguments.get('test_metric', None),
                                                   metric_args=self.testing_arguments.get('test_metric_args',
                                                                                          {}),
                                                   history_monitor=self.history_monitor,
                                                   before_train=False)
            except FedbiomedError as e:
                logger.error(
                    f"{ErrorNumbers.FB314.value}: During the validation phase on local parameter updates; "
                    f"{repr(e)}", researcher_id=self.researcher_id)
            except Exception as e:
                logger.error(f"Undetermined error during the validation phase on local parameter updates: "
                             f"{repr(e)}", researcher_id=self.researcher_id)
        else:
            logger.error(
                f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing "
                f"dataset please make sure that test_ratio has been set correctly",
                researcher_id=self.researcher_id)

    # The training data loader is guaranteed to be set at this point.
    results["sample_size"] = len(self.training_plan.training_data_loader.dataset)

    results["encrypted"] = False
    # Parameters are requested flattened when they are going to be encrypted.
    model_weights = self.training_plan.after_training_params(flatten=self._secure_aggregation.use_secagg)

    if self._secure_aggregation.use_secagg:
        logger.info("Encrypting model parameters. This process can take some time depending on model size.",
                    researcher_id=self.researcher_id)

        model_weights = self._secure_aggregation.scheme.encrypt(
            params=model_weights,
            current_round=self._round,
            weight=results['sample_size'],
        )

        results["encrypted"] = True
        results["encryption_factor"] = None
        # The random value is encrypted as well only when it is available and
        # the SECAGG_INSECURE_VALIDATION setting is enabled.
        if self._secure_aggregation.scheme.secagg_random is not None and environ['SECAGG_INSECURE_VALIDATION']:
            results["encryption_factor"] = self._secure_aggregation.scheme.encrypt(
                params=[self._secure_aggregation.scheme.secagg_random],
                current_round=self._round,
                weight=results['sample_size'])
        logger.info("Encryption is completed!",
                    researcher_id=self.researcher_id)

    results['params'] = model_weights
    results['optimizer_args'] = self.training_plan.optimizer_args()
    results['state_id'] = self._node_state_manager.state_id

    try:
        self._save_round_state()
    except Exception:
        # don't send details to researcher
        return self._send_round_reply(success=False, message="Can't save new node state.")

    # end : clean the namespace
    try:
        del self.training_plan
        del CurrentTPModule
    except Exception:
        logger.debug('Exception raised while deleting training plan instance')

    return self._send_round_reply(success=True,
                                  timing={'rtime_training': rtime_after - rtime_before,
                                          'ptime_training': ptime_after - ptime_before},
                                  extend_with=results)