Strategies

Classes

DefaultStrategy

DefaultStrategy()

Bases: Strategy

Default strategy used to sample (select) nodes and to check whether they have responded.

The strategy is to:

- select all nodes for each round;
- raise an error if a node does not answer;
- raise an error if a node returns an error.
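The three rules above can be sketched as a standalone check. This is a simplified illustration, not Fed-BioMed code: replies are assumed to be plain dictionaries with `node_id` and `success` keys (the fields that `refine` reads), and the hypothetical `check_replies` helper raises a built-in `RuntimeError` instead of `FedbiomedStrategyError`.

```python
from typing import Dict, List


def check_replies(sampled_nodes: List[str], replies: Dict[str, dict]) -> List[str]:
    """Hypothetical helper applying the DefaultStrategy rules to one round.

    Returns the ids of the nodes that replied; raises RuntimeError if a
    sampled node did not reply or if any reply reports a failure.
    """
    # Rule 2: every sampled node must answer.
    missing = [node for node in sampled_nodes if node not in replies]
    if missing:
        raise RuntimeError(f"missing replies from {missing}")

    # Rule 3: every answer must report a successful training.
    failed = [r["node_id"] for r in replies.values() if r["success"] is not True]
    if failed:
        raise RuntimeError(f"unsuccessful replies from {failed}")

    return [r["node_id"] for r in replies.values()]


# Rule 1: all nodes are sampled in every round.
nodes = ["n1", "n2"]
replies = {
    "n1": {"node_id": "n1", "success": True},
    "n2": {"node_id": "n2", "success": True},
}
print(check_replies(nodes, replies))  # ['n1', 'n2']
```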

Source code in fedbiomed/researcher/strategies/strategy.py
def __init__(self):
    """Initialize the node sampling/success histories and the strategy parameters."""
    self._sampling_node_history = {}
    self._success_node_history = {}
    self._parameters = None

Functions

refine
refine(training_replies, round_i)

Completes node selection by checking the training replies and extracting the model parameters and sample sizes from them.

Parameters:

training_replies (Dict[str, TrainReply], required): Dictionary of replies from nodes.

round_i (int, required): Current round of experiment.

Returns:

model_params (Dict[str, Dict[str, Union[Tensor, ndarray]]]): Dictionary mapping each node_id to that node's model parameters, themselves a dictionary of layer names to weights, for example {"n1": {"layer1": m1, "layer2": m2}, "n2": {...}}. Keying by node_id is needed by strategies such as Scaffold: at each round, the local model parameters are linked to a correction that is updated every round, and the correction states at round i depend on the client states and correction states of round i-1. Because training_replies may order the node replies differently from one round to the next, node_id is what links all these parameters together.

weights (Dict[str, float]): Dictionary mapping each node_id to the proportion of samples that node used with respect to the total (sample_size / total_rows). If total_rows is 0, every node gets the same weight, 1 / number of nodes.

total_rows (int): Sum of the number of samples used by all nodes.

encryption_factors (Dict[str, List[int]]): Dictionary mapping each node_id to the encryption factors it returned (None if the reply contains none).

Raises:

FedbiomedStrategyError: If any of the following occurs:
- a node in the sampling list does not reply;
- not all nodes successfully complete training;
- a successful reply does not include a sample_size value.
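The weights returned by `refine` are each node's share of the samples seen in the round, `sample_size / total_rows`, with a uniform fallback of `1 / n` when `total_rows` is 0. A minimal standalone sketch of that arithmetic; the helper name `compute_weights` is hypothetical and not part of Fed-BioMed:

```python
from typing import Dict, Tuple


def compute_weights(sample_sizes: Dict[str, int]) -> Tuple[Dict[str, float], int]:
    """Hypothetical mirror of the weight computation in DefaultStrategy.refine."""
    total_rows = sum(sample_sizes.values())
    weights = {
        # Share of the total; uniform weights if no node reported any sample.
        node_id: size / total_rows if total_rows != 0 else 1 / len(sample_sizes)
        for node_id, size in sample_sizes.items()
    }
    return weights, total_rows


weights, total = compute_weights({"n1": 30, "n2": 70})
print(weights, total)  # {'n1': 0.3, 'n2': 0.7} 100
```

A node with 30 of the 100 samples therefore contributes 30% of the aggregated update.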

Source code in fedbiomed/researcher/strategies/default_strategy.py
def refine(
    self, training_replies: Dict[str, TrainReply], round_i: int
) -> Tuple[
    Dict[str, Dict[str, Union["torch.Tensor", "np.ndarray"]]],
    Dict[str, float],
    int,
    Dict[str, List[int]],
]:
    """
    The method where node selection is completed by extracting parameters and length from the training replies

    Args:
        training_replies: Dictionary of replies from nodes.
        round_i: Current round of experiment.

    Returns:
        model_params: dictionary mapping each node_id to that node's model parameters (a
            dictionary of layer names to weights), e.g. {"n1": {"layer1": m1, "layer2": m2},
            "n2": ...}. Keying by node_id is needed by strategies like Scaffold: at each
            round, local model params are linked to a correction that is updated every
            round, and the correction states at round i depend on the client states and
            correction states of round i-1. Since training_replies can order the node
            replies differently from round to round, node_id is the bridge between all
            these parameters.
        weights: dictionary mapping each node_id to the proportion of samples the node
            used with respect to the total (uniform weights if total_rows is 0)
        total_rows: sum of number of samples used by all nodes
        encryption_factors: encryption factors from the participating nodes, keyed by
            node_id

    Raises:
        FedbiomedStrategyError: If any of the following occur:
            - A node in the sampling list does not reply
            - Not all nodes successfully complete training
            - A successful reply does not include `sample_size` value.
    """
    missing_node_replies = []
    for node in self._sampling_node_history.get(round_i):
        if node not in training_replies:
            missing_node_replies.append(node)
            logger.error(ErrorNumbers.FB409.value + " (node = " + node + ")")
    if missing_node_replies:
        raise FedbiomedStrategyError(
            ErrorNumbers.FB408.value
            + f": {len(missing_node_replies)} missing training replies for round {round_i}"
        )

    # check that all nodes that answer could successfully train
    self._success_node_history[round_i] = []
    all_success = True
    model_params = {}
    sample_sizes = {}
    encryption_factors = {}
    total_rows = 0
    for tr in training_replies.values():
        if tr["success"] is True:
            model_params[tr["node_id"]] = tr["params"]
            encryption_factors[tr["node_id"]] = tr.get("encryption_factor", None)

            if tr.get("sample_size") is None:
                # if a Node `sample_size` is None, we cannot compute the weights: in this case
                # return an error
                raise FedbiomedStrategyError(
                    ErrorNumbers.FB402.value
                    + f" : Node {tr['node_id']} did not return "
                    "any `sample_size` value (number of samples seen during one Round),"
                    " can not compute weights for the aggregation. Aborting"
                )
            sample_sizes[tr["node_id"]] = tr["sample_size"]

            total_rows += tr["sample_size"]
            self._success_node_history[round_i].append(tr["node_id"])
        else:
            all_success = False
            msg = tr.get("msg") or ""
            logger.error(
                "Unsuccessful training reply{} (node = {} )".format(
                    f": {msg}" if msg else "", tr["node_id"]
                )
            )

    if not all_success:
        raise FedbiomedStrategyError(ErrorNumbers.FB402.value)

    weights = {
        node_id: (
            sample_size / total_rows if total_rows != 0 else 1 / len(sample_sizes)
        )
        for node_id, sample_size in sample_sizes.items()
    }

    logger.info(
        f"Nodes that successfully reply in round {round_i} "
        f"{self._success_node_history[round_i]}"
    )

    return model_params, weights, total_rows, encryption_factors
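The `model_params` and `weights` returned by `refine` are both keyed by `node_id`, so a consumer can pair each node's parameters with its weight regardless of reply order. As an illustration only, the sketch below performs a FedAvg-style weighted average; it is not Fed-BioMed's aggregator, and it assumes each layer is a flat list of floats rather than a `torch.Tensor` or `np.ndarray`.

```python
from typing import Dict, List

Params = Dict[str, List[float]]  # layer name -> flat list of weights (simplification)


def weighted_average(model_params: Dict[str, Params], weights: Dict[str, float]) -> Params:
    """FedAvg-style average of per-node parameters, matched by node_id."""
    averaged: Params = {}
    for node_id, params in model_params.items():
        w = weights[node_id]  # lookup by node_id, not by reply position
        for layer, values in params.items():
            acc = averaged.setdefault(layer, [0.0] * len(values))
            for i, v in enumerate(values):
                acc[i] += w * v
    return averaged


model_params = {"n1": {"layer1": [1.0, 2.0]}, "n2": {"layer1": [3.0, 4.0]}}
weights = {"n1": 0.25, "n2": 0.75}
print(weighted_average(model_params, weights))  # {'layer1': [2.5, 3.5]}
```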
sample_nodes
sample_nodes(from_nodes, round_i)

Samples and selects the nodes on which to train the local model. This strategy selects all existing nodes.

Parameters:

from_nodes (List[str], required): The node ids which may be sampled.

round_i (int, required): Current round number.

Returns:

node_ids (List[str]): List of all node ids considered for training during round round_i.

Source code in fedbiomed/researcher/strategies/default_strategy.py
def sample_nodes(self, from_nodes: List[str], round_i: int) -> List[str]:
    """Samples and selects nodes on which to train local model. In this strategy we will consider all existing
    nodes

    Args:
        from_nodes: the node ids which may be sampled
        round_i: number of round.

    Returns:
        node_ids: list of all node ids considered for training during
            this round `round_i`.
    """
    self._sampling_node_history[round_i] = copy.copy(from_nodes)

    return from_nodes
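`sample_nodes` stores a shallow copy of `from_nodes` in `_sampling_node_history`, so later changes to the caller's list do not rewrite the recorded history that `refine` checks replies against. A minimal standalone sketch of that behavior; `AllNodesSampler` is a hypothetical class, not part of Fed-BioMed:

```python
import copy
from typing import Dict, List


class AllNodesSampler:
    """Hypothetical stand-in reproducing DefaultStrategy.sample_nodes."""

    def __init__(self) -> None:
        self._sampling_node_history: Dict[int, List[str]] = {}

    def sample_nodes(self, from_nodes: List[str], round_i: int) -> List[str]:
        # Record a copy so the history is decoupled from the caller's list.
        self._sampling_node_history[round_i] = copy.copy(from_nodes)
        return from_nodes


sampler = AllNodesSampler()
nodes = ["n1", "n2"]
sampler.sample_nodes(nodes, round_i=0)
nodes.append("n3")  # the caller modifies its own list afterwards
print(sampler._sampling_node_history[0])  # ['n1', 'n2']
```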

Strategy

Strategy()

Parent class of all strategies, including DefaultStrategy. Custom strategy classes must inherit from this class.

Inconsistent history

The Strategy class keeps a history of sampled and successful nodes. No attempt is made to keep this history consistent when the _fds member is modified.

Source code in fedbiomed/researcher/strategies/strategy.py
def __init__(self):
    """Initialize the node sampling/success histories and the strategy parameters."""
    self._sampling_node_history = {}
    self._success_node_history = {}
    self._parameters = None

Functions

load_state_breakpoint
load_state_breakpoint(state=None)

Loads the strategy state (its parameters) from a breakpoint state.

Parameters:

state (Dict[str, Any], default None): The state that will be loaded.
Source code in fedbiomed/researcher/strategies/strategy.py
def load_state_breakpoint(self, state: Dict[str, Any] = None):
    """
    Method for loading strategy state from breakpoint state

    Args:
        state: The state that will be loaded
    """
    if state is None:
        # Nothing to restore (the default value): keep the current parameters.
        return
    # fds may be modified and diverge from Experiment
    self._parameters = state["parameters"]
refine
refine(training_replies, round_i)

Abstract method that must be implemented by the child class.

Parameters:

training_replies (Dict[str, TrainReply], required): Dictionary of replies from nodes.

round_i (int, required): Current round of experiment.

Raises:

FedbiomedStrategyError: If the method is not implemented by the child class.

Source code in fedbiomed/researcher/strategies/strategy.py
def refine(
    self, training_replies: Dict[str, TrainReply], round_i: int
) -> Tuple[
    Dict[str, Dict[str, Union["torch.Tensor", "np.ndarray"]]],
    Dict[str, float],
    int,
    Dict[str, List[int]],
]:
    """
    Abstract method that must be implemented by child class

    Args:
        training_replies: Dictionary of replies from nodes
        round_i: Current round of experiment

    Raises:
        FedbiomedStrategyError: If method is not implemented by child class
    """
    msg = (
        ErrorNumbers.FB402.value
        + ": refine method should be overloaded by the provided strategy"
    )
    logger.critical(msg)
    raise FedbiomedStrategyError(msg)
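Because the base `refine` raises when called, a subclass that does not override it only fails at the first aggregation. A minimal sketch of this pattern with hypothetical stand-in classes (`StrategyError`, `BaseStrategy` and `EqualWeightStrategy` are not Fed-BioMed names; the real base class raises `FedbiomedStrategyError`). Unlike `DefaultStrategy`, the illustrative child class skips failed replies and gives every successful node the same weight.

```python
from typing import Dict, Tuple


class StrategyError(Exception):
    """Hypothetical stand-in for FedbiomedStrategyError."""


class BaseStrategy:
    """Hypothetical stand-in for Strategy: refine must be overridden."""

    def refine(self, training_replies: Dict[str, dict], round_i: int):
        raise StrategyError("refine method should be overloaded by the provided strategy")


class EqualWeightStrategy(BaseStrategy):
    """Hypothetical child class giving every successful node the same weight."""

    def refine(
        self, training_replies: Dict[str, dict], round_i: int
    ) -> Tuple[dict, Dict[str, float], int, dict]:
        ok = [r for r in training_replies.values() if r["success"] is True]
        if not ok:
            raise StrategyError(f"no successful reply in round {round_i}")
        model_params = {r["node_id"]: r["params"] for r in ok}
        weights = {r["node_id"]: 1 / len(ok) for r in ok}
        total_rows = sum(r.get("sample_size", 0) for r in ok)
        encryption_factors = {r["node_id"]: r.get("encryption_factor") for r in ok}
        return model_params, weights, total_rows, encryption_factors


replies = {"n1": {"node_id": "n1", "success": True, "params": {}, "sample_size": 10}}
_, w, total, _ = EqualWeightStrategy().refine(replies, round_i=0)
print(w, total)  # {'n1': 1.0} 10
```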
sample_nodes
sample_nodes(from_nodes, round_i)

Abstract method that must be implemented by the child class.

Parameters:

from_nodes (List[str], required): The node ids which may be sampled.

round_i (int, required): Current round of experiment.

Raises:

FedbiomedStrategyError: If the method is not implemented by the child class.
Source code in fedbiomed/researcher/strategies/strategy.py
def sample_nodes(self, from_nodes: List[str], round_i: int):
    """
    Abstract method that must be implemented by child class

    Args:
        from_nodes: the node ids which may be sampled
        round_i: Current round of experiment
    """
    msg = (
        ErrorNumbers.FB402.value
        + ": sample nodes method should be overloaded by the provided strategy"
    )
    logger.critical(msg)
    raise FedbiomedStrategyError(msg)
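A child class can override `sample_nodes` with a different selection rule. The sketch below is hypothetical (not a Fed-BioMed strategy): it draws a random fraction of the nodes each round with `random.Random.sample`, seeded for reproducibility, and records the selection the same way `DefaultStrategy` does so that a `refine` implementation can check for missing replies. The class name and the `fraction`/`seed` parameters are illustrative assumptions.

```python
import random
from typing import Dict, List


class RandomFractionSampler:
    """Hypothetical sampler selecting a random fraction of nodes per round."""

    def __init__(self, fraction: float = 0.5, seed: int = 0) -> None:
        if not 0 < fraction <= 1:
            raise ValueError("fraction must be in (0, 1]")
        self._fraction = fraction
        self._rng = random.Random(seed)
        self._sampling_node_history: Dict[int, List[str]] = {}

    def sample_nodes(self, from_nodes: List[str], round_i: int) -> List[str]:
        # Keep at least one node when any are available, never more than exist.
        k = min(len(from_nodes), max(1, round(self._fraction * len(from_nodes))))
        selected = self._rng.sample(from_nodes, k)
        # Record the selection so missing replies can be detected later.
        self._sampling_node_history[round_i] = list(selected)
        return selected


nodes = ["n1", "n2", "n3", "n4"]
sampler = RandomFractionSampler(fraction=0.5, seed=42)
chosen = sampler.sample_nodes(nodes, round_i=0)
print(len(chosen))  # 2
```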
save_state_breakpoint
save_state_breakpoint()

Returns the strategy state to be saved in a breakpoint.

Returns:

Dict[str, Any]: The state of the strategy (class name, module and parameters).

Source code in fedbiomed/researcher/strategies/strategy.py
def save_state_breakpoint(self) -> Dict[str, Any]:
    """
    Method for saving strategy state for saving breakpoints

    Returns:
        The state of the strategy
    """

    state = {
        "class": type(self).__name__,
        "module": self.__module__,
        "parameters": self._parameters,
    }
    return state
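`save_state_breakpoint` returns a plain dictionary holding the class name, the module and `_parameters`, and `load_state_breakpoint` restores only `parameters` from it. A hypothetical round trip through JSON, assuming `_parameters` holds JSON-serializable values; `ToyStrategy` is a stand-in class, and JSON is an illustrative choice rather than Fed-BioMed's own breakpoint format.

```python
import json
from typing import Any, Dict


class ToyStrategy:
    """Hypothetical stand-in reproducing the Strategy breakpoint methods."""

    def __init__(self) -> None:
        self._parameters = None

    def save_state_breakpoint(self) -> Dict[str, Any]:
        return {
            "class": type(self).__name__,
            "module": self.__module__,
            "parameters": self._parameters,
        }

    def load_state_breakpoint(self, state: Dict[str, Any]) -> None:
        # Only the parameters are restored; class and module are informational.
        self._parameters = state["parameters"]


original = ToyStrategy()
original._parameters = {"fraction": 0.5}
saved = json.dumps(original.save_state_breakpoint())

restored = ToyStrategy()
restored.load_state_breakpoint(json.loads(saved))
print(restored._parameters)  # {'fraction': 0.5}
```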