Job

Manage the training part of the experiment.

Attributes

Typevar_TrainingPlanClass module-attribute

Typevar_TrainingPlanClass = TypeVar('Typevar_TrainingPlanClass', Type[TorchTrainingPlan], Type[SKLearnTrainingPlan])

training_plans_types module-attribute

training_plans_types = (TorchTrainingPlan, SKLearnTrainingPlan)

Classes

Job

Job(reqs=None, training_plan_class=None, training_args=None, model_args=None, data=None, keep_files_dir=None)

Represents the entity that manages the training part at the node level.

Starts a message queue, loads the Python model file created by the researcher (through [training_plans][fedbiomed.common.training_plans]), and saves the loaded model to a file.

Parameters:

- `reqs` (Optional[Requests], default: None): Researcher's requests assigned to nodes.
- `training_plan_class` (Optional[Typevar_TrainingPlanClass], default: None): Class containing the code of the TrainingPlan.
- `training_args` (TrainingArgs, default: None): Contains training parameters: lr, epochs, batch_size.
- `model_args` (dict, default: None): Contains output and input feature dimensions.
- `data` (FederatedDataSet, default: None): Federated datasets.
- `keep_files_dir` (str, default: None): Directory for storing files created by the job that should be kept beyond the execution of the job. Defaults to None, in which case files are not kept after the end of the job.

Raises:

- `FedbiomedJobError`: bad argument type or value
- `FedbiomedJobError`: cannot save training plan to file
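
For example, passing something that is not a class raises the first error (a hedged illustration, assuming the usual imports):

```python
from fedbiomed.common.exceptions import FedbiomedJobError

try:
    Job(training_plan_class="not-a-class")  # bad type: a str instead of a class
except FedbiomedJobError as err:
    print(err)
```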

Source code in fedbiomed/researcher/job.py
def __init__(self,
             reqs: Optional[Requests] = None,
             training_plan_class: Optional[Typevar_TrainingPlanClass] = None,
             training_args: TrainingArgs = None,
             model_args: dict = None,
             data: FederatedDataSet = None,
             keep_files_dir: str = None):

    """ Constructor of the class

    Args:
        reqs: Researcher's requests assigned to nodes. Defaults to None.
        training_plan_class: Class containing the code of the TrainingPlan.
        training_args: Contains training parameters: lr, epochs, batch_size.
        model_args: Contains output and input feature dimensions.
        data: Federated datasets
        keep_files_dir: Directory for storing files created by the job that we want to keep beyond the execution
            of the job. Defaults to None, files are not kept after the end of the job.

    Raises:
        FedbiomedJobError: bad argument type or value
        FedbiomedJobError: cannot save training plan to file
    """
    # Check arguments
    if not inspect.isclass(training_plan_class):
        msg = f"{ErrorNumbers.FB418.value}: bad type for argument `training_plan_class` {type(training_plan_class)}"
        raise FedbiomedJobError(msg)

    if not issubclass(training_plan_class, training_plans_types):
        msg = f"{ErrorNumbers.FB418.value}: bad type for argument `training_plan_class`. It is not subclass of " + \
              f" supported training plans {training_plans_types}"
        raise FedbiomedJobError(msg)

    # List of node ID of the nodes used in the current round
    # - initially None (no current round yet)
    # - then updated during the round with the list of nodes to be used in the round, then the nodes
    #   that actually replied during the round
    self._nodes: Optional[List[str]] = None

    self._id = JOB_PREFIX + str(uuid.uuid4())  # creating a unique job id
    self._researcher_id = environ['RESEARCHER_ID']
    self._training_args = training_args
    self._model_args = model_args
    self._training_replies = {}  # will contain all node replies for every round
    self._model_file = None  # path to local file containing model code
    self._model_params_file = ""  # path to local file containing current version of aggregated params
    self._training_plan_class = training_plan_class
    self._aggregator_args = None

    if keep_files_dir:
        self._keep_files_dir = keep_files_dir
    else:
        self._keep_files_dir = tempfile.mkdtemp(prefix=environ['TMP_DIR'])
        atexit.register(lambda: shutil.rmtree(self._keep_files_dir))  # remove directory
        # when script ends running (replace
        # `with tempfile.TemporaryDirectory(dir=environ['TMP_DIR']) as self._keep_files_dir: `)

    if reqs is None:
        self._reqs = Requests()
    else:
        self._reqs = reqs

    self.last_msg = None
    self._data = data
    self._node_state_agent = NodeStateAgent(list(self._data.data().keys())
                                            if self._data and self._data.data() else [])

    # create TrainingPlan instance
    self._training_plan = self._training_plan_class()  # contains TrainingPlan

    # save and load training plan to a file to be sure
    # 1. a file is associated to training plan so we can read its source, etc.
    # 2. all dependencies are applied
    training_plan_module = 'model_' + str(uuid.uuid4())
    self._training_plan_file = os.path.join(self._keep_files_dir, training_plan_module + '.py')
    try:
        self._training_plan.save_code(self._training_plan_file)
    except Exception as e:
        msg = f"{ErrorNumbers.FB418}: cannot save training plan to file: {e}"
        logger.critical(msg)
        raise FedbiomedJobError(msg)
    del self._training_plan

    _, self._training_plan = utils.import_class_object_from_file(
        self._training_plan_file, self._training_plan_class.__name__)

    self._training_plan.post_init(model_args={} if self._model_args is None else self._model_args,
                                  training_args=self._training_args)
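
A minimal construction sketch (hedged: `MyTrainingPlan` and `search_reply` are hypothetical placeholders for a user-defined TorchTrainingPlan subclass and the reply of a dataset search):

```python
from fedbiomed.common.training_args import TrainingArgs
from fedbiomed.researcher.datasets import FederatedDataSet
from fedbiomed.researcher.requests import Requests

job = Job(
    reqs=Requests(),
    training_plan_class=MyTrainingPlan,   # hypothetical TorchTrainingPlan subclass
    training_args=TrainingArgs({'epochs': 1}, only_required=False),
    model_args={'in_features': 15, 'out_features': 1},  # illustrative dimensions
    data=FederatedDataSet(search_reply),  # hypothetical dataset-search reply
)
```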

Attributes

- `aggregator_args` (property)
- `id` (property)
- `last_msg` (instance attribute, default: None)
- `nodes` (property, writable)
- `requests` (property)
- `training_args` (property, writable)
- `training_plan` (property)
- `training_plan_file` (property)
- `training_replies` (property)

Functions

check_training_plan_is_approved_by_nodes
check_training_plan_is_approved_by_nodes()

Checks whether the model is approved or not.

This method sends a training-plan-status request to the nodes. It should be run before running an experiment, so that researchers can find out whether their model has been approved.

Returns:

- `Dict`: A dict of `Message` objects indexed by node ID, one for each of the job's nodes.

Source code in fedbiomed/researcher/job.py
def check_training_plan_is_approved_by_nodes(self) -> Dict:
    """ Checks whether model is approved or not.

    This method sends `training-plan-status` request to the nodes. It should be run before running experiment.
    So, researchers can find out if their model has been approved

    Returns:
        A dict of `Message` objects indexed by node ID, one for each of the job's nodes
    """

    message = TrainingPlanStatusRequest(**{
        'researcher_id': self._researcher_id,
        'job_id': self._id,
        'training_plan': self._training_plan.source(),
        'command': 'training-plan-status'
    })

    node_ids = self._data.node_ids()

    # Send message to each node that has been found after dataset search request
    with self._reqs.send(message, node_ids, policies=[DiscardOnTimeout(5)]) as federated_req:
        replies = federated_req.replies()

        for node_id, reply in replies.items():
            if reply.success is True:
                if reply.approval_obligation is True:
                    if reply.status == TrainingPlanApprovalStatus.APPROVED.value:
                        logger.info(f'Training plan has been approved by the node: {node_id}')
                    else:
                        logger.warning(f'Training plan has NOT been approved by the node: {node_id}. ' +
                                       f'Training plan status: {reply.status}')
                else:
                    logger.info(f'Training plan approval is not required by the node: {node_id}')
            else:
                logger.warning(f"Node : {node_id} : {reply.msg}")

    # Get the nodes that haven't replied training-plan-status request
    non_replied_nodes = list(set(node_ids) - set(replies.keys()))
    if non_replied_nodes:
        logger.warning(f"Request for checking training plan status hasn't been replied \
                         by the nodes: {non_replied_nodes}. You might get error \
                             while running your experiment. ")

    return replies
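
A hedged usage sketch (assumes `job` has been constructed as above and its nodes are reachable):

```python
replies = job.check_training_plan_is_approved_by_nodes()

# One reply per answering node, indexed by node ID
for node_id, reply in replies.items():
    print(node_id, "success:", reply.success, "status:", getattr(reply, "status", None))
```
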
extract_received_optimizer_aux_var_from_round
extract_received_optimizer_aux_var_from_round(round_id)

Restructures the received auxiliary variables (if any) from a round and saves them to a file (for the given round_id). Modifies in place the training_replies "optim_aux_var" entries, replacing them with the path of the saved file.

Parameters:

- `round_id` (int, required): Index of the round whose replies are parsed.

Returns:

- `Dict[str, Dict[str, Dict[str, Any]]]`: Dict of auxiliary variables, collating node-wise information, with format `{mod_name: {node_id: node_dict}}`.

Source code in fedbiomed/researcher/job.py
def extract_received_optimizer_aux_var_from_round(
    self,
    round_id: int,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Restructures the received auxiliary variables (if any) from a round, and
    saved it in a file (for the given `round_id`). Modifies in-place the `training_replies`
    "optim_aux_var" entries by the path of the file saved.

    Args:
        round_id: Index of the round, replies from which to parse through.

    Returns:
        Dict of auxiliary variables, collating node-wise information, with
        format `{mod_name: {node_id: node_dict}}`.
    """
    aux_var = {}  # type: Dict[str, Dict[str, Dict[str, Any]]]
    nodes_optim_aux_vars = {}  # keep here all the `optim_aux_var` parameters
    aux_vars_path: Optional[str] = None  # path to the file where optim_aux_var will be saved (if any)

    for reply in self.training_replies[round_id].values():
        node_id = reply["node_id"]
        node_av = reply.get("optim_aux_var", {})
        for module, params in node_av.items():
            aux_var.setdefault(module, {})[node_id] = params
        # save optimizer auxiliary variables in a file
        # FIXME: should we keep them for advanced optimizer/strategies?
        if node_av:
            nodes_optim_aux_vars.update({node_id: node_av})
            if aux_vars_path is None:
                aux_vars_path = os.path.join(
                    self._keep_files_dir, f"auxiliary_var_replies_{round_id}_{uuid.uuid4()}.mpk")

            reply["optim_aux_var"] = aux_vars_path
    if nodes_optim_aux_vars:
        Serializer.dump(nodes_optim_aux_vars, aux_vars_path)
    return aux_var
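
To make the collation format concrete, here is a standalone sketch (made-up module and node names, independent of any Fed-BioMed objects):

```python
# Received per node: {node_id: {mod_name: node_dict}}
received = {
    'node-1': {'scaffold': {'state': 1.0}},
    'node-2': {'scaffold': {'state': 2.0}},
}

# Returned collation: {mod_name: {node_id: node_dict}}
aux_var = {}
for node_id, node_av in received.items():
    for module, params in node_av.items():
        aux_var.setdefault(module, {})[node_id] = params

assert aux_var == {'scaffold': {'node-1': {'state': 1.0}, 'node-2': {'state': 2.0}}}
```
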
load_state_breakpoint
load_state_breakpoint(saved_state)

Load the breakpoint state for a Job from a saved state

Parameters:

- `saved_state` (Dict[str, Any], required): breakpoint content

Source code in fedbiomed/researcher/job.py
def load_state_breakpoint(self, saved_state: Dict[str, Any]) -> None:
    """Load breakpoints state for a Job from a saved state

    Args:
        saved_state: breakpoint content
    """
    # Reload the job and researcher ids.
    self._id = saved_state.get('job_id')
    self._researcher_id = saved_state.get('researcher_id')
    self._node_state_agent.load_state_breakpoint(saved_state.get('node_state'))
    # Reload the latest model parameters. This records the filename.
    params = Serializer.load(saved_state.get("model_params_path"))
    self.update_parameters(params)

    self._load_and_set_model_params_from_file(saved_state.get("model_params_path"))
    # Reload the latest training replies.
    self._training_replies = self._load_training_replies(
        saved_state.get('training_replies', {})
    )
save_state_breakpoint
save_state_breakpoint(breakpoint_path)

Creates current state of the job to be included in a breakpoint.

Includes creating links to files included in the job state.

Parameters:

- `breakpoint_path` (str, required): path to the existing breakpoint directory

Returns:

- `dict`: Job's current state for breakpoint.

Source code in fedbiomed/researcher/job.py
def save_state_breakpoint(self, breakpoint_path: str) -> dict:
    """Creates current state of the job to be included in a breakpoint.

    Includes creating links to files included in the job state.

    Args:
        breakpoint_path: path to the existing breakpoint directory

    Returns:
        Job's current state for breakpoint
    """

    # Note: some state is passed to __init__() thus is not managed
    # as job state but as experiment state in current version
    state = {
        'researcher_id': self._researcher_id,
        'job_id': self._id,
        'model_params_path': self._model_params_file,
        'training_replies': self._save_training_replies(self._training_replies),
        'node_state': self._node_state_agent.save_state_breakpoint()
    }

    state['model_params_path'] = create_unique_link(
        breakpoint_path, 'aggregated_params_current', '.mpk',
        os.path.join('..', os.path.basename(state["model_params_path"]))
    )

    for round_replies in state['training_replies']:
        for response in round_replies.values():
            node_params_path = create_unique_file_link(
                breakpoint_path, response['params_path']
            )
            response['params_path'] = node_params_path

    return state
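
A hedged save/restore round trip (assumes `job` is an existing Job and `bkpt_dir` an existing breakpoint directory; `MyTrainingPlan` and `fds` are the hypothetical placeholders used above):

```python
# Persist the job state into the breakpoint directory
state = job.save_state_breakpoint(bkpt_dir)

# Later: rebuild a Job and restore the saved state
restored = Job(training_plan_class=MyTrainingPlan, training_args=job.training_args, data=fds)
restored.load_state_breakpoint(state)
```
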
start_nodes_training_round
start_nodes_training_round(round_, aggregator_args, secagg_arguments=None, do_training=True, optim_aux_var=None)

Sends a training request to the nodes and waits for the replies

Parameters:

- `round_` (int, required): Current round number the algorithm is performing (a round is considered to be all the training steps of a federated model between two aggregations).
- `aggregator_args` (Dict[str, Dict[str, Any]], required): Dictionary containing metadata about the aggregation strategy, useful for transferring data when it is required by an aggregator.
- `secagg_arguments` (Optional[Dict], default: None): Secure aggregation ServerKey context id.
- `do_training` (bool, default: True): If False, skip training in this round (do only validation).
- `optim_aux_var` (Optional[Dict[str, Dict[str, Any]]], default: None): Auxiliary variables of the researcher-side Optimizer, if any. Note that such variables may only be used if both the Experiment and the node-side training plan hold a declearn-based Optimizer, and their plug-ins are coherent with each other as to the expected information exchange.
Source code in fedbiomed/researcher/job.py
def start_nodes_training_round(
    self,
    round_: int,
    aggregator_args: Dict[str, Dict[str, Any]],
    secagg_arguments: Optional[Dict] = None,
    do_training: bool = True,
    optim_aux_var: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Optional[List[str]]:
    """ Sends training request to nodes and waits for the replies

    Args:
        round_: current number of round the algorithm is performing (a round is considered to be all the
            training steps of a federated model between 2 aggregations).
        aggregator_args: dictionary containing some metadata about the aggregation
            strategy, useful to transfer some data when it's required by an aggregator.
        secagg_arguments: Secure aggregation ServerKey context id
        do_training: if False, skip training in this round (do only validation). Defaults to True.
        optim_aux_var: Auxiliary variables of the researcher-side Optimizer, if any.
            Note that such variables may only be used if both the Experiment and node-side training plan
            hold a declearn-based [Optimizer][fedbiomed.common.optimizers.Optimizer], and their plug-ins
            are coherent with each other as to expected information exchange.
    """

    # Assign empty dict to secagg arguments if it is None
    secagg_arguments = {} if secagg_arguments is None else secagg_arguments

    msg = {
        'researcher_id': self._researcher_id,
        'job_id': self._id,
        'training_args': self._training_args.dict(),
        'training': do_training,
        'model_args': self._model_args,
        'round': round_,
        'training_plan': self._training_plan.source(),
        'training_plan_class': self._training_plan_class.__name__,
        'params': self._get_model_params(),
        'secagg_servkey_id': secagg_arguments.get('secagg_servkey_id'),
        'secagg_biprime_id': secagg_arguments.get('secagg_biprime_id'),
        'secagg_random': secagg_arguments.get('secagg_random'),
        'secagg_clipping_range': secagg_arguments.get('secagg_clipping_range'),
        'command': 'train',
        'aggregator_args': {},
        'aux_vars': [],
    }

    timer = {}

    if do_training:
        # update node states when used node list has changed from one round to another
        self._update_nodes_states_agent()

    # FIXME: should be part of a method called from Experiment
    # (behaviour can be defined by user / changed by strategy)
    nodes_state_ids = self._node_state_agent.get_last_node_states()

    # Upload optimizer auxiliary variables, when there are some.
    if do_training and optim_aux_var:
        aux_shared, aux_bynode = (
            self._prepare_agg_optimizer_aux_var(optim_aux_var, nodes=list(self._nodes))
        )

    else:
        aux_shared = {}
        aux_bynode = {}

    # Loop over nodes, add node specific data and send train request
    messages = MessagesByNode()

    for node in self._nodes:
        msg['dataset_id'] = self._data.data()[node]['dataset_id']
        msg['aux_vars'] = [aux_shared, aux_bynode.get(node, None)]

        msg['state_id'] = nodes_state_ids.get(node)

        # FIXME: There might be another node join recently
        msg['aggregator_args'] = aggregator_args.get(node, {}) if aggregator_args else {}
        self._log_round_info(node=node, training=do_training)

        timer[node] = time.perf_counter()

        messages.update({node: TrainRequest(**msg)})

        # Sends training request

    with self._reqs.send(messages, self._nodes) as federated_req:
        errors = federated_req.errors()
        replies = federated_req.replies()
        self._get_training_testing_results(replies=replies, errors=errors, round_=round_, timer=timer)

    if do_training:
        # update node states with node answers + when used node list has changed during the round
        self._update_nodes_states_agent(before_training=False)

    # return the list of nodes which answered because nodes in error have been removed
    return self._nodes
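
A hedged sketch of launching one round (assumes the writable `nodes` property has been set, which in practice the Experiment's strategy does):

```python
job.nodes = ['node-1', 'node-2']  # illustrative node IDs

answered = job.start_nodes_training_round(
    round_=0,
    aggregator_args={},     # no aggregator metadata in this sketch
    secagg_arguments=None,  # secure aggregation disabled
    do_training=True,
)
print("nodes that replied:", answered)
```
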
update_parameters
update_parameters(params)

Update model parameters

Parameters:

- `params` (Optional[Dict[str, Any]], required): Aggregated model parameters.

Returns:

- `str`: Path of the file where the parameters are saved.

Source code in fedbiomed/researcher/job.py
def update_parameters(
    self,
    params: Optional[Dict[str, Any]]
) -> str:
    """Update model parameters

    Args:
        params: Aggregated model parameters

    Returns:
        Path of the file where the parameters are saved
    """
    self._update_model_params(params)
    filename = os.path.join(self._keep_files_dir, f"aggregated_params_{uuid.uuid4()}.mpk")
    Serializer.dump(params, filename)
    self._model_params_file = filename

    return filename
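
For instance, after aggregating the node updates (a sketch; `aggregated` stands for any dict of model parameters):

```python
aggregated = {'weight': [[0.1, 0.2]], 'bias': [0.0]}  # illustrative parameters
path = job.update_parameters(aggregated)
print("aggregated parameters serialized to:", path)
```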

localJob

localJob(dataset_path=None, training_plan_class=None, training_args=None, model_args=None)

Represents the entity that manages the training part. localJob is the version of Job applied locally on a local dataset (thus not involving any network). It is only used to compare results with a federated approach, which does use the network.

Parameters:

- `dataset_path` (default: None): The path where data is stored on the local disk.
- `training_plan_class` (Optional[Typevar_TrainingPlanClass], default: None): Class containing the code of the TrainingPlan.
- `training_args` (Optional[TrainingArgs], default: None): Contains training parameters: lr, epochs, batch_size...
- `model_args` (Optional[dict], default: None): Contains output and input feature dimensions.

Raises:

- `FedbiomedJobError`: bad argument type or value

Source code in fedbiomed/researcher/job.py
def __init__(self,
             dataset_path: Optional[str] = None,
             training_plan_class: Optional[Typevar_TrainingPlanClass] = None,
             training_args: Optional[TrainingArgs] = None,
             model_args: Optional[dict] = None):

    """
    Constructor of the class

    Args:
        dataset_path: The path where data is stored on the local disk.
        training_plan_class: Class containing the code of the TrainingPlan.
        training_args: Contains training parameters: lr, epochs, batch_size...
        model_args: Contains output and input feature dimensions.

    Raises:
        FedbiomedJobError: bad argument type or value
    """
    # Check arguments
    if not inspect.isclass(training_plan_class):
        raise FedbiomedJobError(
            f"{ErrorNumbers.FB418}: bad type for argument `training_plan_class` {type(training_plan_class)}"
        )
    if not issubclass(training_plan_class, training_plans_types):
        raise FedbiomedJobError(
            f"{ErrorNumbers.FB418}: bad type for argument "
            f"`training_plan_class`: {training_plan_class} is not a subclass of the supported training plans")

    # Initialize values
    self._training_args = training_args
    self._model_args = model_args
    self.dataset_path = dataset_path

    if training_args is not None:
        if training_args.get('test_on_local_updates', False) \
                or training_args.get('test_on_global_updates', False):
            # if user wants to perform validation, display this message
            logger.warning("Cannot perform validation, not supported for LocalJob")

    if not isinstance(training_args, TrainingArgs):
        self._training_args = TrainingArgs(training_args, only_required=False)
    else:
        self._training_args = training_args

    # create/save model instance
    self._training_plan = training_plan_class()

    self._training_plan.post_init(model_args=self._model_args,
                                  training_args=self._training_args)

Attributes

- `dataset_path` (instance attribute)
- `training_args` (property, writable)
- `training_plan` (property)

Functions

start_training
start_training()

Run the local training

Source code in fedbiomed/researcher/job.py
def start_training(self):
    """Run the local training"""
    # Run import statements (very unsafely).
    for i in self._training_plan.dependencies:
        exec(i, globals())

    # Run the training routine.
    try:
        self._training_plan.set_dataset_path(self.dataset_path)
        data_manager = self._training_plan.training_data()
        tp_type = self._training_plan.type()
        data_manager.load(tp_type=tp_type)
        train_loader, test_loader = data_manager.split(test_ratio=0)
        self._training_plan.training_data_loader = train_loader
        self._training_plan.testing_data_loader = test_loader
        self._training_plan.training_routine()
    except Exception as exc:
        logger.error("Cannot train model in job: %s", repr(exc))
    # Save the current parameters.
    else:
        try:
            # TODO: should test status code but not yet returned by upload_file
            path = os.path.join(
                environ["TMP_DIR"], f"local_params_{uuid.uuid4()}.mpk"
            )
            Serializer.dump(self._training_plan.get_model_params(), path)
        except Exception as exc:
            logger.error("Cannot write results: %s", repr(exc))
