Classes
Job
Job(*, nodes, keep_files_dir)
Bases: ABC
Job represents a task to be executed on the node.
This is a base class that provides the basic functionality necessary to establish communication with the remote nodes. Actual tasks should inherit from Job
to implement their own domain logic.
Functional life-cycle
Jobs must follow a "functional" life-cycle, meaning that they should be created just before the execution of the task, and destroyed shortly after. Jobs should not persist outside the scope of the function that requested the execution of the task.
Attributes:
Name | Type | Description |
---|---|---|
requests | List[RequestPolicy] \| None | read-only |
nodes | List[str] | node IDs participating in the task |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | List[str] \| None | A list of node IDs participating in the task | required |
keep_files_dir | str | Directory for storing files created by the job that we want to keep beyond the execution of the job. | required |
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
def __init__(
self,
*,
nodes: List[str] | None,
keep_files_dir: str
):
"""Constructor of the class
Args:
nodes: A list of node IDs participating in the task
keep_files_dir: Directory for storing files created by the job that we want to keep beyond the execution
of the job.
"""
self._researcher_id = environ['RESEARCHER_ID']
self._reqs = Requests()
self._nodes: List[str] = nodes or [] # List of node ids participating in this task
self._keep_files_dir = keep_files_dir
self._policies: List[RequestPolicy] | None = None
Attributes
nodes property
nodes
requests property
requests
Classes
RequestTimer
RequestTimer(nodes)
Context manager that computes the processing time elapsed for the request and the reply
Usage:
nodes = ['node_1', 'node_2']
with Job.RequestTimer(nodes) as timer:
    ...  # send some request and wait for the replies
timer
# {'node_1': 2.22, 'node_2': 2.21}  # request time for each node, in seconds
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | List[str] | existing nodes that will be requested for the Job | required |
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
def __init__(self, nodes: List[str]):
"""
Constructor of RequestTimer
Args:
nodes: existing nodes that will be requested for the Job
"""
self._timer = {node_id: 0. for node_id in nodes}
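The timing logic can be approximated with a simplified stand-in (an illustrative assumption, not the fedbiomed implementation):

```python
# Simplified stand-in for RequestTimer (illustrative assumption): a context
# manager that accumulates elapsed seconds for each node over the body of
# the `with` block.
import time
from typing import Dict, List


class ToyRequestTimer:
    def __init__(self, nodes: List[str]):
        self._timer: Dict[str, float] = {node_id: 0. for node_id in nodes}

    def __enter__(self) -> Dict[str, float]:
        self._start = time.perf_counter()
        return self._timer

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        elapsed = time.perf_counter() - self._start
        for node_id in self._timer:
            self._timer[node_id] += elapsed


with ToyRequestTimer(["node_1", "node_2"]) as timer:
    time.sleep(0.01)  # stands in for sending a request and awaiting replies
# timer now maps each node ID to the elapsed seconds
```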
Functions
execute abstractmethod
execute()
Payload of the job.
Completes a request to the job's nodes and collects replies.
Returns:
Type | Description |
---|---|
Any | values specific to the type of job |
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
@abstractmethod
def execute(self) -> Any:
"""Payload of the job.
Completes a request to the job's nodes and collects replies.
Returns:
values specific to the type of job
"""
TrainingJob
TrainingJob(experiment_id, round_, training_plan, training_args, model_args, data, nodes_state_ids, aggregator_args, secagg_arguments=None, do_training=True, optim_aux_var=None, **kwargs)
Bases: Job
TrainingJob is a task for training an ML model on the nodes by executing a TrainingPlan.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
experiment_id | str | unique ID of this experiment | required |
round_ | int | current number of round the algorithm is performing (a round is considered to be all the training steps of a federated model between 2 aggregations). | required |
training_plan | BaseTrainingPlan | TrainingPlan with properly initialized model and optimizer | required |
training_args | Union[dict, TrainingArgs] | arguments for training | required |
model_args | Optional[dict] | arguments for the model | required |
data | FederatedDataSet | metadata of the federated data set | required |
nodes_state_ids | Dict[str, str] | unique IDs of the node states saved remotely | required |
aggregator_args | Dict[str, Dict[str, Any]] | aggregator arguments required for remote execution | required |
secagg_arguments | Union[Dict, None] | Secure aggregation arguments, some depending on scheme used | None |
do_training | bool | if False, skip training in this round (do only validation). Defaults to True. | True |
optim_aux_var | Optional[Dict[str, Dict[str, Any]]] | Auxiliary variables of the researcher-side Optimizer, if any. Note that such variables may only be used if both the Experiment and node-side training plan hold a declearn-based Optimizer, and their plug-ins are coherent with each other as to expected information exchange. | None |
*args | | Positional arguments of parent class [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job] | required |
**kwargs | | Named arguments of parent class. Please see [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job] | {} |
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_job.py
def __init__(
self,
experiment_id: str,
round_: int,
training_plan: BaseTrainingPlan,
training_args: Union[dict, TrainingArgs],
model_args: Optional[dict],
data: FederatedDataSet,
nodes_state_ids: Dict[str, str],
aggregator_args: Dict[str, Dict[str, Any]],
secagg_arguments: Union[Dict, None] = None,
do_training: bool = True,
optim_aux_var: Optional[Dict[str, Dict[str, Any]]] = None,
**kwargs
):
""" Constructor of the class
Args:
experiment_id: unique ID of this experiment
round_: current number of round the algorithm is performing (a round is considered to be all the
training steps of a federated model between 2 aggregations).
training_plan: TrainingPlan with properly initialized model and optimizer
training_args: arguments for training
model_args: arguments for the model
data: metadata of the federated data set
nodes_state_ids: unique IDs of the node states saved remotely
aggregator_args: aggregator arguments required for remote execution
secagg_arguments: Secure aggregation arguments, some depending on scheme used
do_training: if False, skip training in this round (do only validation). Defaults to True.
optim_aux_var: Auxiliary variables of the researcher-side Optimizer, if any.
Note that such variables may only be used if both the Experiment and node-side training plan
hold a declearn-based [Optimizer][fedbiomed.common.optimizers.Optimizer], and their plug-ins
are coherent with each other as to expected information exchange.
*args: Positional argument of parent class
[`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
**kwargs: Named arguments of parent class. Please see
[`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
"""
super().__init__(**kwargs)
# to be used for `execute()`
self._experiment_id = experiment_id
self._round_ = round_
self._training_plan = training_plan
self._training_args = training_args
self._model_args = model_args
self._data = data
self._nodes_state_ids = nodes_state_ids
self._aggregator_args = aggregator_args
self._secagg_arguments = secagg_arguments or {} # Assign empty dict to secagg arguments if it is None
self._do_training = do_training
self._optim_aux_var = optim_aux_var
Functions
execute
execute()
Sends training request to nodes and waits for the responses
Returns:
Type | Description |
---|---|
Tuple[Dict, Optional[Dict]] | A tuple of: the training replies for this round, and a dict of auxiliary variables collating node-wise information, with format `{mod_name: {node_id: node_dict}}` |
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_job.py
def execute(self) -> Tuple[Dict, Optional[Dict]]:
""" Sends training request to nodes and waits for the responses
Returns:
A tuple of
* training replies for this round
* Dict of auxiliary variables, collating node-wise information, with
format `{mod_name: {node_id: node_dict}}`.
"""
# Populate request message
msg = {
'researcher_id': self._researcher_id,
'experiment_id': self._experiment_id,
'training_args': self._training_args.dict(),
'training': self._do_training,
'model_args': self._model_args if self._model_args is not None else {},
'round': self._round_,
'training_plan': self._training_plan.source(),
'training_plan_class': self._training_plan.__class__.__name__,
'params': self._training_plan.get_model_params(
exclude_buffers=not self._training_args.dict()['share_persistent_buffers']),
'secagg_arguments': self._secagg_arguments,
'aggregator_args': {},
}
# Prepare optimizer auxiliary variables, if any.
if self._do_training and self._optim_aux_var:
aux_shared, aux_bynode = (
self._prepare_agg_optimizer_aux_var(self._optim_aux_var, nodes=list(self._nodes))
)
else:
aux_shared = {}
aux_bynode = {}
# Loop over nodes, add node specific data and send train request
messages = MessagesByNode()
for node in self._nodes:
msg['dataset_id'] = self._data.data()[node]['dataset_id']
msg['aux_vars'] = [aux_shared, aux_bynode.get(node, None)]
msg['state_id'] = self._nodes_state_ids.get(node)
# add aggregator parameters to message header
msg['aggregator_args'] = self._aggregator_args.get(node, {}) if self._aggregator_args else {}
self._log_round_info(node=node, training=self._do_training)
messages.update({node: TrainRequest(**msg)}) # send request to node
with self.RequestTimer(self._nodes) as timer: # compute request time
# Send training request
with self._reqs.send(messages, self._nodes, self._policies) as federated_req:
errors = federated_req.errors()
replies = federated_req.replies()
training_replies = self._get_training_results(replies=replies,
errors=errors)
timing_results = self._get_timing_results(replies, timer)
# `training_replies` can be empty if there were no replies
for node_id in replies:
if training_replies.get(node_id):
training_replies[node_id].update({'timing': timing_results[node_id]})
# Extract aux variables from training replies
aux_vars = None
if self._do_training:
aux_vars = self._extract_received_optimizer_aux_var_from_round(training_replies)
return training_replies, aux_vars
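The `{mod_name: {node_id: node_dict}}` format of the returned auxiliary variables can be illustrated with a small sketch (the module name and payloads below are made-up examples, and this is not the library's own collation code):

```python
# Sketch of collating per-node optimizer auxiliary variables into the
# documented `{mod_name: {node_id: node_dict}}` shape.
from typing import Any, Dict

# Per-node reply payloads, each carrying `{mod_name: node_dict}` aux variables.
replies_aux = {
    "node_1": {"scaffold": {"state": 1}},
    "node_2": {"scaffold": {"state": 2}},
}

aux_vars: Dict[str, Dict[str, Any]] = {}
for node_id, node_aux in replies_aux.items():
    for mod_name, node_dict in node_aux.items():
        aux_vars.setdefault(mod_name, {})[node_id] = node_dict
# aux_vars == {"scaffold": {"node_1": {"state": 1}, "node_2": {"state": 2}}}
```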
TrainingPlanApproveJob
TrainingPlanApproveJob(training_plan, description, **kwargs)
Bases: Job
Task for requesting nodes approval for running a given TrainingPlan on these nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan | BaseTrainingPlan | an instance of a TrainingPlan object | required |
description | str | human-readable description of the TrainingPlan for the reviewer on the node | required |
*args | | Positional arguments of parent class [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job] | required |
**kwargs | | Named arguments of parent class. Please see [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job] | {} |
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def __init__(self,
training_plan: BaseTrainingPlan,
description: str,
**kwargs
):
"""Constructor of the class.
Args:
training_plan: an instance of a TrainingPlan object
description: human-readable description of the TrainingPlan for the reviewer on the node
*args: Positional argument of parent class
[`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
**kwargs: Named arguments of parent class. Please see
[`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
"""
super().__init__(**kwargs)
self._policies = [DiscardOnTimeout(5)] # specific policy for TrainingApproval
self._training_plan = training_plan
self._description = description
Functions
execute
execute()
Requests the approval of the provided TrainingPlan.
Returns:
Name | Type | Description |
---|---|---|
Dict | a dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side. |
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def execute(self) -> Dict:
"""Requests the approval of the provided TrainingPlan.
Returns:
a dictionary of pairs (node_id: status), where status indicates to the researcher
that the training plan has been correctly downloaded on the node side.
Warning: status does not mean that the training plan is approved, only that it has been added
to the "approval queue" on the node side.
"""
return self._reqs.training_plan_approve(self._training_plan,
self._description,
self._nodes,
self._policies)
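Interpreting the returned dict might look like the following sketch (the boolean status shape shown here is an assumption for illustration):

```python
# Hypothetical sketch: triaging the (node_id: status) dict returned by
# TrainingPlanApproveJob.execute(). A truthy status only means the plan
# reached the node's approval queue, not that it was approved.
statuses = {"node_1": True, "node_2": False}  # assumed example return value

queued_for_review = [n for n, ok in statuses.items() if ok]
failed_to_queue = [n for n, ok in statuses.items() if not ok]
```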
TrainingPlanCheckJob
TrainingPlanCheckJob(experiment_id, training_plan, **kwargs)
Bases: Job
Task for checking if nodes accept running a given TrainingPlan.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
experiment_id | str | unique ID of this experiment | required |
training_plan | BaseTrainingPlan | an instance of a TrainingPlan object | required |
**kwargs | Named arguments of parent class. Please see | {} |
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def __init__(
self,
experiment_id: str,
training_plan: BaseTrainingPlan,
**kwargs
):
"""Constructor of the class.
Args:
experiment_id: unique ID of this experiment
training_plan: an instance of a TrainingPlan object
**kwargs: Named arguments of parent class. Please see
[`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
"""
super().__init__(**kwargs)
self._policies = [DiscardOnTimeout(5)] # specific policy for TrainingApproval
self._experiment_id = experiment_id
self._training_plan = training_plan
Functions
execute
execute()
Checks whether the model is approved or not.
This method sends a `training-plan-status` request to the nodes. It should be run before running an experiment, so researchers can find out whether their model has been approved.
Returns:
Type | Description |
---|---|
Dict | A dict of `Message` objects indexed by node ID, one for each of the job's nodes |
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def execute(self) -> Dict:
"""Checks whether model is approved or not.
This method sends a `training-plan-status` request to the nodes. It should be run before running an experiment,
so researchers can find out whether their model has been approved.
Returns:
A dict of `Message` objects indexed by node ID, one for each of the job's nodes
"""
message = TrainingPlanStatusRequest(**{
'researcher_id': self._researcher_id,
'experiment_id': self._experiment_id,
'training_plan': self._training_plan.source(),
})
# Send message to each node that has been found after dataset search request
# TODO: add timer to compute request time
with self._reqs.send(message, self._nodes, policies=self._policies) as federated_req:
replies = federated_req.replies()
for node_id, reply in replies.items():
if reply.success is True:
if reply.approval_obligation is True:
if reply.status == TrainingPlanApprovalStatus.APPROVED.value:
logger.info(f'Training plan has been approved by the node: {node_id}')
else:
logger.warning(f'Training plan has NOT been approved by the node: {node_id}. ' +
f'Training plan status: {reply.status}')
else:
logger.info(f'Training plan approval is not required by the node: {node_id}')
else:
logger.warning(f"Node : {node_id} : {reply.msg}")
# Get the nodes that haven't replied training-plan-status request
non_replied_nodes = list(set(self._nodes) - set(replies.keys()))
if non_replied_nodes:
logger.warning(f"Request for checking training plan status hasn't been replied \
by the nodes: {non_replied_nodes}. You might get error \
while running your experiment. ")
return replies
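The non-replied-node check performed at the end of execute() boils down to a set difference, as in this minimal sketch:

```python
# Minimal sketch of the non-replied-node check at the end of execute().
nodes = ["node_1", "node_2", "node_3"]
replies = {"node_1": "reply", "node_3": "reply"}  # stand-in reply objects

non_replied_nodes = list(set(nodes) - set(replies.keys()))
# only "node_2" did not answer the training-plan-status request
```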