Manage the training part of the experiment.
Attributes
Typevar_TrainingPlanClass module-attribute
# Type variable constrained to the supported training plan classes (torch-based or scikit-learn-based)
Typevar_TrainingPlanClass = TypeVar(
    'Typevar_TrainingPlanClass',
    Type[TorchTrainingPlan],
    Type[SKLearnTrainingPlan],
)
training_plans_types module-attribute
# Supported training plan base classes, as a tuple so it can be given directly to issubclass()
training_plans_types = (
    TorchTrainingPlan,
    SKLearnTrainingPlan,
)
Classes
Job
Job(reqs=None, training_plan_class=None, training_args=None, model_args=None, data=None, keep_files_dir=None)
Represents the entity that manages the training part at the nodes level.
Starts a message queue, loads the Python model file created by the researcher (through [training_plans][fedbiomed.common.training_plans]) and saves the loaded model in a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reqs | Optional[Requests] | Researcher's requests assigned to nodes. Defaults to None. | None |
training_plan_class | Optional[Typevar_TrainingPlanClass] | Class containing the code of the TrainingPlan. | None |
training_args | TrainingArgs | Contains training parameters; lr, epochs, batch_size. | None |
model_args | dict | Contains output and input feature dimension | None |
data | FederatedDataSet | Federated datasets | None |
keep_files_dir | str | Directory for storing files created by the job that we want to keep beyond the execution of the job. Defaults to None, files are not kept after the end of the job. | None |
Raises:
Type | Description |
---|---|
FedbiomedJobError | bad argument type or value |
FedbiomedJobError | cannot save training plan to file |
Source code in fedbiomed/researcher/job.py
def __init__(self,
             reqs: Optional[Requests] = None,
             training_plan_class: Optional[Typevar_TrainingPlanClass] = None,
             training_args: TrainingArgs = None,
             model_args: dict = None,
             data: FederatedDataSet = None,
             keep_files_dir: str = None):
    """Constructor of the class.

    Validates the training plan class, sets up the job identity and its files directory,
    then saves the training plan source code to a file and re-imports the training plan
    from that file.

    Args:
        reqs: Researcher's requests assigned to nodes. Defaults to None, in which case a new
            `Requests` object is created.
        training_plan_class: Class containing the code of the TrainingPlan. Must be a subclass
            of one of `training_plans_types`.
        training_args: Contains training parameters; lr, epochs, batch_size.
        model_args: Contains output and input feature dimension. `None` is given to the
            training plan as an empty dict.
        data: Federated datasets
        keep_files_dir: Directory for storing files created by the job that we want to keep beyond the execution
            of the job. Defaults to None, files are not kept after the end of the job.

    Raises:
        FedbiomedJobError: bad argument type or value
        FedbiomedJobError: cannot save training plan to file
    """
    # Check arguments
    if not inspect.isclass(training_plan_class):
        msg = f"{ErrorNumbers.FB418.value}: bad type for argument `training_plan_class` {type(training_plan_class)}"
        raise FedbiomedJobError(msg)
    if not issubclass(training_plan_class, training_plans_types):
        msg = f"{ErrorNumbers.FB418.value}: bad type for argument `training_plan_class`. It is not subclass of " + \
              f" supported training plans {training_plans_types}"
        raise FedbiomedJobError(msg)

    # List of node ID of the nodes used in the current round
    # - initially None (no current round yet)
    # - then updated during the round with the list of nodes to be used in the round, then the nodes
    #   that actually replied during the round
    self._nodes: Optional[List[str]] = None
    self._id = JOB_PREFIX + str(uuid.uuid4())  # creating a unique job id
    self._researcher_id = environ['RESEARCHER_ID']
    self._training_args = training_args
    self._model_args = model_args
    self._training_replies = {}  # will contain all node replies for every round
    self._model_file = None  # path to local file containing model code
    self._model_params_file = ""  # path to local file containing current version of aggregated params
    self._training_plan_class = training_plan_class
    self._aggregator_args = None

    if keep_files_dir:
        self._keep_files_dir = keep_files_dir
    else:
        # Create the working directory *inside* TMP_DIR. Passing TMP_DIR as `prefix` (as done
        # previously) creates a sibling directory whose name merely starts with TMP_DIR.
        self._keep_files_dir = tempfile.mkdtemp(dir=environ['TMP_DIR'])
        # Remove the directory when the script ends (replaces
        # `with tempfile.TemporaryDirectory(dir=environ['TMP_DIR']) as self._keep_files_dir:`).
        # The path is bound now, and a directory already removed does not raise at exit.
        atexit.register(shutil.rmtree, self._keep_files_dir, ignore_errors=True)

    if reqs is None:
        self._reqs = Requests()
    else:
        self._reqs = reqs

    self.last_msg = None
    self._data = data
    self._node_state_agent = NodeStateAgent(list(self._data.data().keys())
                                            if self._data and self._data.data() else [])

    # create TrainingPlan instance
    self._training_plan = self._training_plan_class()  # contains TrainingPlan

    # save and load training plan to a file to be sure
    # 1. a file is associated to training plan so we can read its source, etc.
    # 2. all dependencies are applied
    training_plan_module = 'model_' + str(uuid.uuid4())
    self._training_plan_file = os.path.join(self._keep_files_dir, training_plan_module + '.py')
    try:
        self._training_plan.save_code(self._training_plan_file)
    except Exception as e:
        msg = f"{ErrorNumbers.FB418.value}: cannot save training plan to file: {e}"
        logger.critical(msg)
        raise FedbiomedJobError(msg) from e
    del self._training_plan

    # Re-import the training plan from the file that was just written
    _, self._training_plan = utils.import_class_object_from_file(
        self._training_plan_file, self._training_plan_class.__name__)

    self._training_plan.post_init(model_args={} if self._model_args is None else self._model_args,
                                  training_args=self._training_args)
Attributes
aggregator_args property
aggregator_args
id property
id
last_msg instance-attribute
last_msg = None
nodes property
writable
nodes
requests property
requests
training_args property
writable
training_args
training_plan property
training_plan
training_plan_file property
training_plan_file
training_replies property
training_replies
Functions
check_training_plan_is_approved_by_nodes
check_training_plan_is_approved_by_nodes()
Checks whether the model is approved or not.
This method sends a `training-plan-status` request to the nodes. It should be run before running the experiment, so that researchers can find out whether their model has been approved.
Returns:
Type | Description |
---|---|
Dict | A dict of `Message` objects indexed by node ID, one for each job's nodes |
Source code in fedbiomed/researcher/job.py
def check_training_plan_is_approved_by_nodes(self) -> Dict:
    """Checks whether the model is approved or not.

    This method sends a `training-plan-status` request to the nodes. It should be run before
    running the experiment, so that researchers can find out whether their model has been approved.
    Approval results are logged node by node. Nodes that did not reply within the timeout
    policy are reported with a warning.

    Returns:
        A dict of `Message` objects indexed by node ID, one for each job's nodes
    """
    message = TrainingPlanStatusRequest(**{
        'researcher_id': self._researcher_id,
        'job_id': self._id,
        'training_plan': self._training_plan.source(),
        'command': 'training-plan-status'
    })

    node_ids = self._data.node_ids()

    # Send message to each node that has been found after dataset search request
    with self._reqs.send(message, node_ids, policies=[DiscardOnTimeout(5)]) as federated_req:
        replies = federated_req.replies()

        for node_id, reply in replies.items():
            if reply.success is True:
                if reply.approval_obligation is True:
                    if reply.status == TrainingPlanApprovalStatus.APPROVED.value:
                        logger.info(f'Training plan has been approved by the node: {node_id}')
                    else:
                        # report the actual status returned by the node (previously the node id
                        # was printed twice instead)
                        logger.warning(f'Training plan has NOT been approved by the node: {node_id}. '
                                       f'Training plan status : {reply.status}')
                else:
                    logger.info(f'Training plan approval is not required by the node: {node_id}')
            else:
                logger.warning(f"Node : {node_id} : {reply.msg}")

    # Get the nodes that haven't replied training-plan-status request
    non_replied_nodes = list(set(node_ids) - set(replies.keys()))
    if non_replied_nodes:
        # implicit concatenation: a backslash continuation would embed the source indentation
        # inside the logged message
        logger.warning(f"Request for checking training plan status hasn't been replied "
                       f"by the nodes: {non_replied_nodes}. You might get error "
                       f"while running your experiment. ")

    return replies
extract_received_optimizer_aux_var_from_round
extract_received_optimizer_aux_var_from_round(round_id)
Restructures the received auxiliary variables (if any) from a round, and saves them in a file (for the given `round_id`). Modifies in-place the `training_replies` "optim_aux_var" entries, replacing them with the path of the saved file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_id | int | Index of the round, replies from which to parse through. | required |
Returns:
Type | Description |
---|---|
Dict[str, Dict[str, Dict[str, Any]]] | Dict of auxiliary variables, collating node-wise information, with format `{mod_name: {node_id: node_dict}}`. |
Source code in fedbiomed/researcher/job.py
def extract_received_optimizer_aux_var_from_round(
    self,
    round_id: int,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Collate the optimizer auxiliary variables received during a round.

    The node replies of round `round_id` are scanned. Their "optim_aux_var" contents are
    regrouped by module name and returned. When at least one node sent non-empty auxiliary
    variables, all of them are dumped to a single `.mpk` file in the job's files directory.
    The "optim_aux_var" entry of each such reply is replaced in-place by that file's path.

    Args:
        round_id: Index of the round, replies from which to parse through.

    Returns:
        Dict of auxiliary variables, collating node-wise information, with
        format `{mod_name: {node_id: node_dict}}`.
    """
    collated: Dict[str, Dict[str, Dict[str, Any]]] = {}
    to_save: Dict[str, Dict[str, Any]] = {}  # node_id -> raw `optim_aux_var` content
    dump_path: Optional[str] = None  # created lazily, shared by all replies of the round

    for node_reply in self.training_replies[round_id].values():
        sender = node_reply["node_id"]
        received = node_reply.get("optim_aux_var", {})

        for mod_name, mod_params in received.items():
            if mod_name not in collated:
                collated[mod_name] = {}
            collated[mod_name][sender] = mod_params

        # replies without auxiliary variables are left untouched
        if not received:
            continue

        # FIXME: should we keep them for advanced optimizer/strategies?
        to_save[sender] = received
        if dump_path is None:
            dump_path = os.path.join(
                self._keep_files_dir, f"auxiliary_var_replies_{round_id}_{uuid.uuid4()}.mpk")
        node_reply["optim_aux_var"] = dump_path

    if to_save:
        Serializer.dump(to_save, dump_path)
    return collated
load_state_breakpoint
load_state_breakpoint(saved_state)
Load breakpoints state for a Job from a saved state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
saved_state | Dict[str, Any] | breakpoint content | required |
Source code in fedbiomed/researcher/job.py
def load_state_breakpoint(self, saved_state: Dict[str, Any]) -> None:
    """Restore the Job from breakpoint content.

    Reads the keys 'job_id', 'researcher_id', 'node_state', 'model_params_path' and
    'training_replies' from `saved_state`.

    Args:
        saved_state: breakpoint content
    """
    # Job and researcher identities are taken back from the breakpoint.
    self._id = saved_state.get('job_id')
    self._researcher_id = saved_state.get('researcher_id')
    self._node_state_agent.load_state_breakpoint(saved_state.get('node_state'))

    # Latest aggregated parameters: deserialize them and register them on the job
    # (`update_parameters` also writes them to a new file in the job's files directory).
    params_path = saved_state.get("model_params_path")
    self.update_parameters(Serializer.load(params_path))
    # NOTE(review): the same params file is loaded a second time here; check whether this call
    # is still needed after `update_parameters`.
    self._load_and_set_model_params_from_file(params_path)

    # Latest training replies.
    replies_state = saved_state.get('training_replies', {})
    self._training_replies = self._load_training_replies(replies_state)
save_state_breakpoint
save_state_breakpoint(breakpoint_path)
Creates current state of the job to be included in a breakpoint.
Includes creating links to files included in the job state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
breakpoint_path | str | path to the existing breakpoint directory | required |
Returns:
Type | Description |
---|---|
dict | Job's current state for breakpoint |
Source code in fedbiomed/researcher/job.py
def save_state_breakpoint(self, breakpoint_path: str) -> dict:
    """Build the Job part of a breakpoint.

    The aggregated params file and every node params file referenced by the training
    replies are linked into `breakpoint_path`. The returned state points to those links.

    Args:
        breakpoint_path: path to the existing breakpoint directory

    Returns:
        Job's current state for breakpoint
    """
    # Note: some state is passed to __init__() thus is not managed
    # as job state but as experiment state in current version
    params_file = self._model_params_file
    saved_replies = self._save_training_replies(self._training_replies)
    node_state = self._node_state_agent.save_state_breakpoint()

    # The link target is relative to the breakpoint directory (one level up).
    relative_params = os.path.join('..', os.path.basename(params_file))
    params_link = create_unique_link(
        breakpoint_path, 'aggregated_params_current', '.mpk', relative_params
    )

    # Point every node reply to a link of its params file inside the breakpoint.
    for replies_of_round in saved_replies:
        for node_reply in replies_of_round.values():
            node_reply['params_path'] = create_unique_file_link(
                breakpoint_path, node_reply['params_path']
            )

    return {
        'researcher_id': self._researcher_id,
        'job_id': self._id,
        'model_params_path': params_link,
        'training_replies': saved_replies,
        'node_state': node_state,
    }
start_nodes_training_round
start_nodes_training_round(round_, aggregator_args, secagg_arguments=None, do_training=True, optim_aux_var=None)
Sends training request to nodes and waits for the replies
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_ | int | current round number of the algorithm (a round is considered to be all the training steps of a federated model between 2 aggregations). | required |
aggregator_args | Dict[str, Dict[str, Any]] | dictionary containing some metadata about the aggregation strategy, useful to transfer some data when it's required by an aggregator. | required |
secagg_arguments | Optional[Dict] | Secure aggregation ServerKey context id | None |
do_training | bool | if False, skip training in this round (do only validation). Defaults to True. | True |
optim_aux_var | Optional[Dict[str, Dict[str, Any]]] | Auxiliary variables of the researcher-side Optimizer, if any. Note that such variables may only be used if both the Experiment and node-side training plan hold a declearn-based Optimizer, and their plug-ins are coherent with each other as to expected information exchange. | None |
Source code in fedbiomed/researcher/job.py
def start_nodes_training_round(
    self,
    round_: int,
    aggregator_args: Dict[str, Dict[str, Any]],
    secagg_arguments: Optional[Dict] = None,
    do_training: bool = True,
    optim_aux_var: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Optional[List[str]]:
    """Sends training request to nodes and waits for the replies.

    Args:
        round_: current round number of the algorithm (a round is considered to be all the
            training steps of a federated model between 2 aggregations).
        aggregator_args: dictionary containing some metadata about the aggregation
            strategy, useful to transfer some data when it's required by an aggregator.
        secagg_arguments: Secure aggregation ServerKey context id
        do_training: if False, skip training in this round (do only validation). Defaults to True.
        optim_aux_var: Auxiliary variables of the researcher-side Optimizer, if any.
            Note that such variables may only be used if both the Experiment and node-side training plan
            hold a declearn-based [Optimizer][fedbiomed.common.optimizers.Optimizer], and their plug-ins
            are coherent with each other as to expected information exchange.

    Returns:
        The list of nodes which answered (nodes in error have been removed).
    """
    # Assign empty dict to secagg arguments if it is None
    secagg_arguments = {} if secagg_arguments is None else secagg_arguments

    # Message content common to all nodes; node-specific entries are filled in the loop below
    msg = {
        'researcher_id': self._researcher_id,
        'job_id': self._id,
        'training_args': self._training_args.dict(),
        'training': do_training,
        'model_args': self._model_args,
        'round': round_,
        'training_plan': self._training_plan.source(),
        'training_plan_class': self._training_plan_class.__name__,
        'params': self._get_model_params(),
        'secagg_servkey_id': secagg_arguments.get('secagg_servkey_id'),
        'secagg_biprime_id': secagg_arguments.get('secagg_biprime_id'),
        'secagg_random': secagg_arguments.get('secagg_random'),
        'secagg_clipping_range': secagg_arguments.get('secagg_clipping_range'),
        'command': 'train',
        'aggregator_args': {},
        'aux_vars': [],
    }

    timer = {}

    if do_training:
        # update node states when used node list has changed from one round to another
        self._update_nodes_states_agent()

    # Always bind the node state ids: the loop below reads them for every node, including
    # validation-only rounds (`do_training` False), which previously could hit a NameError.
    # FIXME: should be part of a method called from Experiment
    # (behaviour can be defined by user / changed by strategy)
    nodes_state_ids = self._node_state_agent.get_last_node_states()

    # Upload optimizer auxiliary variables, when there are some.
    if do_training and optim_aux_var:
        aux_shared, aux_bynode = (
            self._prepare_agg_optimizer_aux_var(optim_aux_var, nodes=list(self._nodes))
        )
    else:
        aux_shared = {}
        aux_bynode = {}

    # Loop over nodes, add node specific data and send train request
    messages = MessagesByNode()
    for node in self._nodes:
        msg['dataset_id'] = self._data.data()[node]['dataset_id']
        msg['aux_vars'] = [aux_shared, aux_bynode.get(node, None)]
        msg['state_id'] = nodes_state_ids.get(node)

        # FIXME: There might be another node join recently
        msg['aggregator_args'] = aggregator_args.get(node, {}) if aggregator_args else {}
        self._log_round_info(node=node, training=do_training)

        timer[node] = time.perf_counter()

        messages.update({node: TrainRequest(**msg)})

    # Sends training request
    with self._reqs.send(messages, self._nodes) as federated_req:
        errors = federated_req.errors()
        replies = federated_req.replies()
        self._get_training_testing_results(replies=replies, errors=errors, round_=round_, timer=timer)

    if do_training:
        # update node states with node answers + when used node list has changed during the round
        self._update_nodes_states_agent(before_training=False)

    # return the list of nodes which answered because nodes in error have been removed
    return self._nodes
update_parameters
update_parameters(params)
Update model parameters
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params | Optional[Dict[str, Any]] | Aggregated model parameters | required |
Returns:
Type | Description |
---|---|
str | Path of the file where parameters are saved |
Source code in fedbiomed/researcher/job.py
def update_parameters(
    self,
    params: Optional[Dict[str, Any]]
) -> str:
    """Update model parameters and save them to a new file.

    The parameters are passed to `_update_model_params`, then serialized to a uniquely
    named `.mpk` file in the job's files directory. That file becomes the job's current
    aggregated-parameters file.

    Args:
        params: Aggregated model parameters

    Returns:
        Path of the file where parameters are saved (a single path, not a tuple).
    """
    self._update_model_params(params)
    filename = os.path.join(self._keep_files_dir, f"aggregated_params_{uuid.uuid4()}.mpk")
    Serializer.dump(params, filename)
    self._model_params_file = filename
    return filename
localJob
localJob(dataset_path=None, training_plan_class=None, training_args=None, model_args=None)
Represents the entity that manages the training part. LocalJob is the version of Job applied locally on a local dataset (thus not involving any network). It is only used to compare results to a federated approach, using networks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_path | Optional[str] | The path where data is stored on local disk. | None |
training_plan_class | Optional[Typevar_TrainingPlanClass] | Class containing the code of the TrainingPlan. | None |
training_args | Optional[TrainingArgs] | Contains training parameters: lr, epochs, batch_size... | None |
model_args | Optional[dict] | Contains output and input feature dimension. | None |
Raises:
Type | Description |
---|---|
FedbiomedJobError | bad argument type or value |
Source code in fedbiomed/researcher/job.py
def __init__(self,
             dataset_path: Optional[str] = None,
             training_plan_class: Optional[Typevar_TrainingPlanClass] = None,
             training_args: Optional[TrainingArgs] = None,
             model_args: Optional[dict] = None):
    """Constructor of the class.

    Args:
        dataset_path : The path where data is stored on local disk.
        training_plan_class: Class containing the code of the TrainingPlan. Must be a subclass
            of one of `training_plans_types`.
        training_args: Contains training parameters: lr, epochs, batch_size... Any value that is
            not already a `TrainingArgs` (including None) is converted with
            `TrainingArgs(training_args, only_required=False)`.
        model_args: Contains output and input feature dimension. `None` is given to the
            training plan as an empty dict (same as `Job`).

    Raises:
        FedbiomedJobError: bad argument type or value
    """
    # Check arguments
    if not inspect.isclass(training_plan_class):
        raise FedbiomedJobError(
            f"{ErrorNumbers.FB418.value}: bad type for argument `training_plan_class` {type(training_plan_class)}"
        )
    if not issubclass(training_plan_class, training_plans_types):
        # both parts are f-strings so that the offending class is actually shown
        raise FedbiomedJobError(
            f"{ErrorNumbers.FB418.value}: bad type for argument "
            f"`training_plan_class` {training_plan_class} is not subclass of training plans")

    # Initialize values
    self._training_args = training_args
    self._model_args = model_args
    self.dataset_path = dataset_path

    if training_args is not None:
        if training_args.get('test_on_local_updates', False) \
                or training_args.get('test_on_global_updates', False):
            # validation is not supported by LocalJob: only warn the user
            logger.warning("Cannot perform validation, not supported for LocalJob")

    if not isinstance(training_args, TrainingArgs):
        self._training_args = TrainingArgs(training_args, only_required=False)
    else:
        self._training_args = training_args

    # create/save model instance
    self._training_plan = training_plan_class()
    self._training_plan.post_init(model_args={} if self._model_args is None else self._model_args,
                                  training_args=self._training_args)
Attributes
dataset_path instance-attribute
dataset_path = dataset_path
training_args property
writable
training_args
training_plan property
training_plan
Functions
start_training
start_training()
Run the local training
Source code in fedbiomed/researcher/job.py
def start_training(self):
    """Run the local training.

    Executes the training plan's dependency statements, loads the data found at
    `self.dataset_path` (no test split), runs the training routine, then serializes the
    resulting model parameters to a new `.mpk` file under TMP_DIR. Failures while training or
    writing are logged, not raised.
    """
    # NOTE(review): dependency statements are executed with `exec` into this module's
    # globals, without any sandboxing.
    for dependency in self._training_plan.dependencies:
        exec(dependency, globals())

    try:
        self._training_plan.set_dataset_path(self.dataset_path)
        manager = self._training_plan.training_data()
        manager.load(tp_type=self._training_plan.type())
        loader_train, loader_test = manager.split(test_ratio=0)
        self._training_plan.training_data_loader = loader_train
        self._training_plan.testing_data_loader = loader_test
        self._training_plan.training_routine()
    except Exception as err:
        logger.error("Cannot train model in job: %s", repr(err))
        return

    # Training succeeded: save the current parameters.
    # NOTE(review): the output path is neither returned nor logged, so callers cannot locate it.
    try:
        out_path = os.path.join(environ["TMP_DIR"], f"local_params_{uuid.uuid4()}.mpk")
        Serializer.dump(self._training_plan.get_model_params(), out_path)
    except Exception as err:
        logger.error("Cannot write results: %s", repr(err))