Jobs

Classes

Job

Job(*, nodes, keep_files_dir)

Bases: ABC

Job represents a task to be executed on the nodes.

This is a base class that provides the basic functionality necessary to establish communication with the remote nodes. Actual tasks should inherit from Job to implement their own domain logic.

Functional life-cycle

Jobs must follow a "functional" life-cycle, meaning that they should be created just before the execution of the task, and destroyed shortly after. Jobs should not persist outside the scope of the function that requested the execution of the task.
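The life-cycle described above can be illustrated with a minimal, self-contained sketch. `SketchJob`, `PingJob` and `ping_nodes` are hypothetical stand-ins invented for this example. They do not talk to any node and only mirror the keyword-only constructor shown on this page.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class SketchJob(ABC):
    """Hypothetical stand-in for Job; it performs no network communication."""

    def __init__(self, *, nodes: Optional[List[str]], keep_files_dir: str):
        self._nodes: List[str] = nodes or []
        self._keep_files_dir = keep_files_dir

    @abstractmethod
    def execute(self) -> Any:
        """Payload of the job."""


class PingJob(SketchJob):
    """Hypothetical concrete job that pretends every node answered."""

    def execute(self) -> Dict[str, str]:
        return {node_id: "ok" for node_id in self._nodes}


def ping_nodes(nodes: List[str], keep_files_dir: str) -> Dict[str, str]:
    # The job is created just before the task runs ...
    job = PingJob(nodes=nodes, keep_files_dir=keep_files_dir)
    replies = job.execute()
    # ... and goes out of scope when the function returns; only the result escapes.
    return replies


result = ping_nodes(["node_1", "node_2"], "/tmp/keep")
print(result)
```

Only the replies leave `ping_nodes`; the job object itself is never stored on a longer-lived object.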

Attributes:

Name Type Description
requests List[RequestPolicy] | None

read-only Requests object handling communication with remote nodes

nodes List[str]

node IDs participating in the task

Parameters:

Name Type Description Default
nodes List[str] | None

A list of node IDs identifying the nodes used for training

required
keep_files_dir str

Directory for storing files created by the job that we want to keep beyond the execution of the job.

required
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
def __init__(
    self,
    *,
    nodes: List[str] | None,
    keep_files_dir: str
):
    """Constructor of the class

    Args:
        nodes: A list of node IDs identifying the nodes used for training
        keep_files_dir: Directory for storing files created by the job that we want to keep beyond the execution
            of the job.

    """

    self._researcher_id = environ['RESEARCHER_ID']
    self._reqs = Requests()
    self._nodes: List[str] = nodes or []  # List of node ids participating in this task
    self._keep_files_dir = keep_files_dir
    self._policies: List[RequestPolicy] | None = None

Attributes

nodes property
nodes
requests property
requests

Classes

RequestTimer
RequestTimer(nodes)

Context manager that computes the processing time elapsed for the request and the reply

Usage:

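nodes = ['node_1', 'node_2']
# MyJob and keep_files_dir are placeholders: use any concrete Job subclass and its arguments
job = MyJob(nodes=nodes, keep_files_dir=keep_files_dir)
with job.RequestTimer(nodes) as my_timer:
    # ... send some request
    pass

my_timer
# {'node_1': 2.22, 'node_2': 2.21}  # request time for each node, in seconds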

Parameters:

Name Type Description Default
nodes List[str]

existing nodes that will be requested for the Job

required
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
def __init__(self, nodes: List[str]):
    """
    Constructor of RequestTimer

    Args:
        nodes: existing nodes that will be requested for the Job
    """
    self._timer = {node_id: 0.  for node_id in nodes}
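Only the constructor of RequestTimer is reproduced here. As a rough illustration of the context-manager pattern it describes, the sketch below records one elapsed time per node. `SketchRequestTimer` is a hypothetical re-creation, not the library's implementation, and it assumes for simplicity that every node is assigned the total duration of the `with` block.

```python
import time
from typing import Dict, List


class SketchRequestTimer:
    """Hypothetical per-node timer; not the library's code."""

    def __init__(self, nodes: List[str]):
        self._timer: Dict[str, float] = {node_id: 0.0 for node_id in nodes}
        self._start = 0.0

    def __enter__(self) -> Dict[str, float]:
        # Start the clock and hand the per-node dict to the caller.
        self._start = time.perf_counter()
        return self._timer

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Assumption: every node gets the duration of the whole block.
        elapsed = time.perf_counter() - self._start
        for node_id in self._timer:
            self._timer[node_id] = elapsed
        return False  # do not swallow exceptions raised inside the block


with SketchRequestTimer(["node_1", "node_2"]) as my_timer:
    time.sleep(0.01)  # stands in for sending a request and awaiting replies

print(my_timer)
```

The dict bound by `as my_timer` is filled in when the block exits, so it should be read after the `with` statement, as in the usage above.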

Functions

execute abstractmethod
execute()

Payload of the job.

Completes a request to the job's nodes and collects replies.

Returns:

Type Description
Any

values specific to the type of job
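Because `execute` is abstract, the base class cannot be instantiated directly and each subclass has to provide its own payload. The sketch below shows this with Python's `abc` module. `BaseTask` and `EchoTask` are hypothetical names used only for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseTask(ABC):
    """Hypothetical stand-in for Job."""

    @abstractmethod
    def execute(self) -> Any:
        """Payload of the task."""


class EchoTask(BaseTask):
    """Hypothetical subclass whose payload returns a stored value."""

    def __init__(self, value: str):
        self._value = value

    def execute(self) -> str:
        return self._value


# Instantiating the abstract base raises TypeError.
try:
    BaseTask()
    base_instantiated = True
except TypeError:
    base_instantiated = False

echo_result = EchoTask("hello").execute()
print(base_instantiated, echo_result)
```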

Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
@abstractmethod
def execute(self) -> Any:
    """Payload of the job.

    Completes a request to the job's nodes and collects replies.

    Returns:
        values specific to the type of job
    """

TrainingJob

TrainingJob(experiment_id, round_, training_plan, training_args, model_args, data, nodes_state_ids, aggregator_args, secagg_arguments=None, do_training=True, optim_aux_var=None, **kwargs)

Bases: Job

TrainingJob is a task for training an ML model on the nodes by executing a TrainingPlan.

Parameters:

Name Type Description Default
experiment_id str

unique ID of this experiment

required
round_ int

current round number of the algorithm (a round comprises all the training steps of a federated model between 2 aggregations).

required
training_plan BaseTrainingPlan

TrainingPlan with properly initialized model and optimizer

required
training_args Union[dict, TrainingArgs]

arguments for training

required
model_args Optional[dict]

arguments for the model

required
data FederatedDataSet

metadata of the federated data set

required
nodes_state_ids Dict[str, str]

unique IDs of the node states saved remotely

required
aggregator_args Dict[str, Dict[str, Any]]

aggregator arguments required for remote execution

required
secagg_arguments Union[Dict, None]

Secure aggregation arguments, some depending on scheme used

None
do_training bool

if False, skip training in this round (do only validation). Defaults to True.

True
optim_aux_var Optional[Dict[str, Dict[str, Any]]]

Auxiliary variables of the researcher-side Optimizer, if any. Note that such variables may only be used if both the Experiment and node-side training plan hold a declearn-based Optimizer, and their plug-ins are coherent with each other as to expected information exchange.

None
*args

Positional argument of parent class Job

required
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_job.py
def __init__(
    self,
    experiment_id: str,
    round_: int,
    training_plan: BaseTrainingPlan,
    training_args: Union[dict, TrainingArgs],
    model_args: Optional[dict],
    data: FederatedDataSet,
    nodes_state_ids: Dict[str, str],
    aggregator_args: Dict[str, Dict[str, Any]],
    secagg_arguments: Union[Dict, None] = None,
    do_training: bool = True,
    optim_aux_var: Optional[Dict[str, Dict[str, Any]]] = None,
    **kwargs
):

    """ Constructor of the class

    Args:
        experiment_id: unique ID of this experiment
        round_: current round number of the algorithm (a round comprises all the
            training steps of a federated model between 2 aggregations).
        training_plan: TrainingPlan with properly initialized model and optimizer
        training_args: arguments for training
        model_args: arguments for the model
        data: metadata of the federated data set
        nodes_state_ids: unique IDs of the node states saved remotely
        aggregator_args: aggregator arguments required for remote execution
        secagg_arguments: Secure aggregation arguments, some depending on scheme used
        do_training: if False, skip training in this round (do only validation). Defaults to True.
        optim_aux_var: Auxiliary variables of the researcher-side Optimizer, if any.
            Note that such variables may only be used if both the Experiment and node-side training plan
            hold a declearn-based [Optimizer][fedbiomed.common.optimizers.Optimizer], and their plug-ins
            are coherent with each other as to expected information exchange.
        *args: Positional argument of parent class
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
    """
    super().__init__(**kwargs)
    # to be used for `execute()`
    self._experiment_id = experiment_id
    self._round_ = round_
    self._training_plan = training_plan
    self._training_args = training_args
    self._model_args = model_args
    self._data = data
    self._nodes_state_ids = nodes_state_ids
    self._aggregator_args = aggregator_args
    self._secagg_arguments = secagg_arguments or {}  # Assign empty dict to secagg arguments if it is None
    self._do_training = do_training
    self._optim_aux_var = optim_aux_var

Functions

execute
execute()

Sends training request to nodes and waits for the responses

Returns:

Type Description
Tuple[Dict, Optional[Dict]]

A tuple of:
* training replies for this round
* Dict of auxiliary variables, collating node-wise information, with format {mod_name: {node_id: node_dict}}.
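To make the `{mod_name: {node_id: node_dict}}` layout concrete, here is a small sketch that regroups per-node values by module name. The helper `collate_aux_vars`, the input layout `{node_id: {mod_name: node_dict}}` and the module name `"scaffold"` are assumptions chosen for illustration. The library's own extraction helper is not shown on this page.

```python
from typing import Any, Dict

NodeDict = Dict[str, Any]


def collate_aux_vars(
    per_node: Dict[str, Dict[str, NodeDict]]
) -> Dict[str, Dict[str, NodeDict]]:
    """Turn {node_id: {mod_name: node_dict}} into {mod_name: {node_id: node_dict}}."""
    collated: Dict[str, Dict[str, NodeDict]] = {}
    for node_id, modules in per_node.items():
        for mod_name, node_dict in modules.items():
            collated.setdefault(mod_name, {})[node_id] = node_dict
    return collated


# Illustrative values only
received = {
    "node_1": {"scaffold": {"delta": 0.1}},
    "node_2": {"scaffold": {"delta": 0.3}},
}
aux_vars = collate_aux_vars(received)
print(aux_vars)
```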

Source code in fedbiomed/researcher/federated_workflows/jobs/_training_job.py
def execute(self) -> Tuple[Dict, Optional[Dict]]:
    """ Sends training request to nodes and waits for the responses

    Returns:
        A tuple of
          * training replies for this round
          * Dict of auxiliary variables, collating node-wise information, with
            format `{mod_name: {node_id: node_dict}}`.
    """

    # Populate request message
    msg = {
        'researcher_id': self._researcher_id,
        'experiment_id': self._experiment_id,
        'training_args': self._training_args.dict(),
        'training': self._do_training,
        'model_args': self._model_args if self._model_args is not None else {},
        'round': self._round_,
        'training_plan': self._training_plan.source(),
        'training_plan_class': self._training_plan.__class__.__name__,
        'params': self._training_plan.get_model_params(
            exclude_buffers=not self._training_args.dict()['share_persistent_buffers']),
        'secagg_arguments': self._secagg_arguments,
        'command': 'train',
        'aggregator_args': {},
    }

    # Prepare optimizer auxiliary variables, if any.
    if self._do_training and self._optim_aux_var:
        aux_shared, aux_bynode = (
            self._prepare_agg_optimizer_aux_var(self._optim_aux_var, nodes=list(self._nodes))
        )
    else:
        aux_shared = {}
        aux_bynode = {}

    # Loop over nodes, add node specific data and send train request
    messages = MessagesByNode()

    for node in self._nodes:
        msg['dataset_id'] = self._data.data()[node]['dataset_id']
        msg['aux_vars'] = [aux_shared, aux_bynode.get(node, None)]
        msg['state_id'] = self._nodes_state_ids.get(node)

        # add aggregator parameters to message header
        msg['aggregator_args'] = self._aggregator_args.get(node, {}) if self._aggregator_args else {}

        self._log_round_info(node=node, training=self._do_training)

        messages.update({node: TrainRequest(**msg)})  # send request to node

    with self.RequestTimer(self._nodes) as timer:  # compute request time
        # Send training request
        with self._reqs.send(messages, self._nodes, self._policies) as federated_req:

            errors = federated_req.errors()
            replies = federated_req.replies()

    training_replies = self._get_training_results(replies=replies,
                                                  errors=errors)

    timing_results = self._get_timing_results(replies, timer)
    # `training_replies` can be empty if there were no replies
    for node_id in replies:
        if training_replies.get(node_id):
            training_replies[node_id].update({'timing': timing_results[node_id]})

    # Extract aux variables from training replies
    aux_vars = None
    if self._do_training:
        aux_vars = self._extract_received_optimizer_aux_var_from_round(training_replies)

    return training_replies, aux_vars
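The per-node loop in `execute` starts from one shared message and fills in node-specific fields. A simplified sketch of that pattern, using plain dicts instead of `TrainRequest` objects, might look as follows. `build_node_messages` and all sample values are hypothetical, and the sketch copies the base message for each node rather than mutating a single dict.

```python
from typing import Any, Dict, List


def build_node_messages(
    base_msg: Dict[str, Any],
    nodes: List[str],
    dataset_ids: Dict[str, str],
    state_ids: Dict[str, str],
    aggregator_args: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Build one message per node from a shared base message."""
    messages: Dict[str, Dict[str, Any]] = {}
    for node in nodes:
        msg = dict(base_msg)  # shallow copy so nodes do not share one dict
        msg["dataset_id"] = dataset_ids[node]
        msg["state_id"] = state_ids.get(node)  # None when no state was saved
        # Fall back to an empty dict when the node has no aggregator arguments
        msg["aggregator_args"] = aggregator_args.get(node, {}) if aggregator_args else {}
        messages[node] = msg
    return messages


messages = build_node_messages(
    {"command": "train", "round": 1},
    ["node_1", "node_2"],
    {"node_1": "dataset_a", "node_2": "dataset_b"},
    {"node_1": "state_1"},
    {},
)
print(messages["node_2"])
```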

TrainingPlanApproveJob

TrainingPlanApproveJob(training_plan, description, **kwargs)

Bases: Job

Task for requesting the nodes' approval to run a given TrainingPlan on these nodes.

Parameters:

Name Type Description Default
training_plan BaseTrainingPlan

an instance of a TrainingPlan object

required
description str

human-readable description of the TrainingPlan for the reviewer on the node

required
*args

Positional argument of parent class Job

required
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def __init__(self,
             training_plan: BaseTrainingPlan,
             description: str,
             **kwargs
             ):
    """Constructor of the class.

    Args:
        training_plan: an instance of a TrainingPlan object
        description: human-readable description of the TrainingPlan for the reviewer on the node
        *args: Positional argument of parent class
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
    """
    super().__init__(**kwargs)
    self._policies = [DiscardOnTimeout(5)]  # specific policy for TrainingApproval
    self._training_plan = training_plan
    self._description = description

Functions

execute
execute()

Requests the approval of the provided TrainingPlan.

Returns:

Type Description
Dict

A dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side.

Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def execute(self) -> Dict:
    """Requests the approval of the provided TrainingPlan.

    Returns:
        a dictionary of pairs (node_id: status), where status indicates to the researcher
        that the training plan has been correctly downloaded on the node side.
        Warning: status does not mean that the training plan is approved, only that it has been added
        to the "approval queue" on the node side.
    """
    return self._reqs.training_plan_approve(self._training_plan,
                                            self._description,
                                            self._nodes,
                                            self._policies)

TrainingPlanCheckJob

TrainingPlanCheckJob(experiment_id, training_plan, **kwargs)

Bases: Job

Task for checking if nodes accept running a given TrainingPlan.

Parameters:

Name Type Description Default
experiment_id str

unique ID of this experiment

required
training_plan BaseTrainingPlan

an instance of a TrainingPlan object

required
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def __init__(
    self,
    experiment_id: str,
    training_plan: BaseTrainingPlan,
    **kwargs
):
    """Constructor of the class.

    Args:
        experiment_id: unique ID of this experiment
        training_plan: an instance of a TrainingPlan object
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]

    """
    super().__init__(**kwargs)
    self._policies = [DiscardOnTimeout(5)]  # specific policy for TrainingApproval
    self._experiment_id = experiment_id
    self._training_plan = training_plan

Functions

execute
execute()

Checks whether the training plan is approved or not.

This method sends a training-plan-status request to the nodes. It should be run before running an experiment, so that researchers can find out whether their training plan has been approved.

Returns:

Type Description
Dict

A dict of Message objects indexed by node ID, one for each of the job's nodes that replied

Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def execute(self) -> Dict:
    """Checks whether model is approved or not.

    This method sends a `training-plan-status` request to the nodes. It should be run before running an
    experiment, so that researchers can find out whether their training plan has been approved.

    Returns:
        A dict of `Message` objects indexed by node ID, one for each job's nodes
    """

    message = TrainingPlanStatusRequest(**{
        'researcher_id': self._researcher_id,
        'experiment_id': self._experiment_id,
        'training_plan': self._training_plan.source(),
        'command': 'training-plan-status'
    })

    # Send message to each node that has been found after dataset search request
    # TODO: add timer to compute request time
    with self._reqs.send(message, self._nodes, policies=self._policies) as federated_req:
        replies = federated_req.replies()

        for node_id, reply in replies.items():
            if reply.success is True:
                if reply.approval_obligation is True:
                    if reply.status == TrainingPlanApprovalStatus.APPROVED.value:
                        logger.info(f'Training plan has been approved by the node: {node_id}')
                    else:
                        logger.warning(f'Training plan has NOT been approved by the node: {node_id}. ' +
                                       f'Training plan status : {reply.status}')
                else:
                    logger.info(f'Training plan approval is not required by the node: {node_id}')
            else:
                logger.warning(f"Node : {node_id} : {reply.msg}")

    # Get the nodes that haven't replied training-plan-status request
    non_replied_nodes = list(set(self._nodes) - set(replies.keys()))
    if non_replied_nodes:
        logger.warning(f"Request for checking training plan status hasn't been answered "
                       f"by the nodes: {non_replied_nodes}. You might get an error "
                       f"while running your experiment.")

    return replies
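The branching above sorts each reply into one of four outcomes and then finds silent nodes with a set difference. The sketch below reproduces that logic on plain data so that it can be run without any node. `SketchReply` and `summarize` are hypothetical, and the string `"Approved"` is an assumed stand-in for `TrainingPlanApprovalStatus.APPROVED.value`, whose actual value is not shown on this page.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SketchReply:
    """Hypothetical stand-in for a node reply; fields mirror those read above."""
    success: bool
    approval_obligation: bool
    status: str
    msg: str = ""


def summarize(nodes: List[str], replies: Dict[str, SketchReply]) -> Dict[str, List[str]]:
    """Group node IDs by the outcome of the status check."""
    summary: Dict[str, List[str]] = {
        "approved": [], "not_approved": [], "not_required": [], "failed": []
    }
    for node_id, reply in replies.items():
        if not reply.success:
            summary["failed"].append(node_id)
        elif not reply.approval_obligation:
            summary["not_required"].append(node_id)
        elif reply.status == "Approved":  # assumed value, see lead-in
            summary["approved"].append(node_id)
        else:
            summary["not_approved"].append(node_id)
    # Nodes that were asked but never answered
    summary["no_reply"] = sorted(set(nodes) - set(replies))
    return summary


replies = {
    "node_1": SketchReply(True, True, "Approved"),
    "node_2": SketchReply(True, True, "Pending"),
    "node_3": SketchReply(True, False, ""),
    "node_4": SketchReply(False, False, "", msg="error"),
}
summary = summarize(["node_1", "node_2", "node_3", "node_4", "node_5"], replies)
print(summary)
```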