Classes
DefaultStrategy
DefaultStrategy()
Bases: Strategy
Default strategy to be used for sampling/selecting nodes and for checking whether nodes have responded.
The strategy:
- selects all nodes for each round
- raises an error if one node does not answer
- raises an error if one node returns an error
Source code in fedbiomed/researcher/strategies/strategy.py
def __init__(self):
"""
Args:
data: Object that includes all active nodes and the meta-data of the dataset that is going to be
used for federated training.
"""
self._sampling_node_history = {}
self._success_node_history = {}
self._parameters = None
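A minimal usage sketch of the two `DefaultStrategy` hooks across a single round. The replies below are plain dictionaries standing in for `TrainReply` messages, with keys mirroring the fields that `refine` reads (`success`, `node_id`, `params`, `sample_size`); this illustrates the call order only, not the Experiment's actual wiring.

```python
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

strategy = DefaultStrategy()

# 1) Node selection: DefaultStrategy keeps every candidate node for the round.
selected = strategy.sample_nodes(from_nodes=["node-1", "node-2"], round_i=0)

# 2) Mock replies keyed by node id, mimicking the fields refine() accesses.
training_replies = {
    "node-1": {"success": True, "node_id": "node-1",
               "params": {"layer1": [0.1, 0.2]}, "sample_size": 60},
    "node-2": {"success": True, "node_id": "node-2",
               "params": {"layer1": [0.3, 0.4]}, "sample_size": 40},
}

model_params, weights, total_rows, enc_factors = strategy.refine(training_replies, round_i=0)
# weights == {'node-1': 0.6, 'node-2': 0.4}, total_rows == 100
```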
Functions
refine
refine(training_replies, round_i)
Completes node selection by extracting model parameters and sample counts from the training replies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| training_replies | Dict[str, TrainReply] | Dictionary of replies from nodes. | required |
| round_i | int | Current round of experiment. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| model_params | Dict[str, Dict[str, Union[Tensor, ndarray]]] | Dictionary mapping each node_id to that node's model parameters, themselves a dictionary of weight matrices: {"n1": {"layer1": m1, "layer2": m2, "layer3": m3}, "n2": ...}. Keeping the node_id is needed for the proper functioning of some strategies such as Scaffold: at each round, local model params are linked to a correction term that is updated every round, and the correction states at round i depend on the client states and correction states of round i-1. Since training_replies may order the node replies differently from round to round, the node_id is the bridge between all these parameters. |
| weights | Dict[str, float] | Dictionary mapping each node_id to the proportion of samples that node holds with respect to the total. |
| total_rows | int | Sum of the number of samples used by all nodes. |
| encryption_factors | Dict[str, List[int]] | Encryption factors from the participating nodes. |
Raises:
| Type | Description |
|---|---|
| FedbiomedStrategyError | If any of the following occur: - A node in the sampling list does not reply - Not all nodes successfully complete training - A successful reply does not include a `sample_size` value |
Source code in fedbiomed/researcher/strategies/default_strategy.py
def refine(
self, training_replies: Dict[str, TrainReply], round_i: int
) -> Tuple[
Dict[str, Dict[str, Union["torch.Tensor", "np.ndarray"]]],
Dict[str, float],
int,
Dict[str, List[int]],
]:
"""
The method where node selection is completed by extracting parameters and length from the training replies
Args:
training_replies: Dictionary of replies from nodes.
round_i: Current round of experiment.
Returns:
model_params: list with each element representing a dictionary. Its only key represents
the node_id and the corresponding value is a dictionary containing list of weight
matrices of every node : [{"n1":{"layer1":m1,"layer2":m2},{"layer3":"m3"}},{"n2":
...}] Including the node_id is useful for the proper functioning of some strategies
like Scaffold: At each round, local model params are linked to a certain correction.
The correction is updated every round. The computation of correction states at round
i is dependent to client states and correction states of round i-1. Since
training_replies can potentially order the node replies differently from round to
round, the bridge between all these parameters is represented by the node_id
weights: Proportions list, each element of this list represents a dictionary with
its only key as the node_id and its value the proportion of lines the node has
with respect to the whole
total_rows: sum of number of samples used by all nodes
encryption_factors: encryption factors from the participating nodes
Raises:
FedbiomedStrategyError: If any of the following occur:
- A node in the sampling list does not reply
- Not all nodes successfully complete training
- A successful reply does not include `sample_size` value.
"""
missing_node_replies = []
for node in self._sampling_node_history.get(round_i):
if node not in training_replies:
missing_node_replies.append(node)
logger.error(ErrorNumbers.FB409.value + " (node = " + node + ")")
if missing_node_replies:
raise FedbiomedStrategyError(
ErrorNumbers.FB408.value
+ f": {len(missing_node_replies)} missing training replies for round {round_i}"
)
# check that all nodes that answer could successfully train
self._success_node_history[round_i] = []
all_success = True
model_params = {}
sample_sizes = {}
encryption_factors = {}
total_rows = 0
for tr in training_replies.values():
if tr["success"] is True:
model_params[tr["node_id"]] = tr["params"]
encryption_factors[tr["node_id"]] = tr.get("encryption_factor", None)
if tr.get("sample_size") is None:
# if a Node `sample_size` is None, we cannot compute the weights: in this case
# return an error
raise FedbiomedStrategyError(
ErrorNumbers.FB402.value
+ f" : Node {tr['node_id']} did not return "
"any `sample_size` value (number of samples seen during one Round),"
" can not compute weights for the aggregation. Aborting"
)
sample_sizes[tr["node_id"]] = tr["sample_size"]
total_rows += tr["sample_size"]
self._success_node_history[round_i].append(tr["node_id"])
else:
all_success = False
msg = tr.get("msg") or ""
logger.error(
"Unsuccessful training reply{} (node = {} )".format(
f": {msg}" if msg else "", tr["node_id"]
)
)
if not all_success:
raise FedbiomedStrategyError(ErrorNumbers.FB402.value)
weights = {
node_id: (
sample_size / total_rows if total_rows != 0 else 1 / len(sample_sizes)
)
for node_id, sample_size in sample_sizes.items()
}
logger.info(
f"Nodes that successfully reply in round {round_i} "
f"{self._success_node_history[round_i]}"
)
return model_params, weights, total_rows, encryption_factors
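The aggregation weights returned by `refine` are each node's share of the total sample count, with a uniform fallback when `total_rows` is zero. A standalone sketch of that arithmetic, using made-up sample sizes:

```python
# Hypothetical per-node sample counts taken from the training replies.
sample_sizes = {"node-1": 60, "node-2": 40}
total_rows = sum(sample_sizes.values())  # 100

weights = {
    node_id: (size / total_rows if total_rows != 0 else 1 / len(sample_sizes))
    for node_id, size in sample_sizes.items()
}
# weights == {'node-1': 0.6, 'node-2': 0.4}
```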
sample_nodes
sample_nodes(from_nodes, round_i)
Samples and selects nodes on which to train the local model. In this strategy, all existing nodes are considered.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_nodes | List[str] | The node ids which may be sampled. | required |
| round_i | int | Number of the round. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| node_ids | List[str] | List of all node ids considered for training during this round. |
Source code in fedbiomed/researcher/strategies/default_strategy.py
def sample_nodes(self, from_nodes: List[str], round_i: int) -> List[str]:
"""Samples and selects nodes on which to train local model. In this strategy we will consider all existing
nodes
Args:
from_nodes: the node ids which may be sampled
round_i: number of round.
Returns:
node_ids: list of all node ids considered for training during
this round `round_i`.
"""
self._sampling_node_history[round_i] = copy.copy(from_nodes)
return from_nodes
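Two behaviours of `sample_nodes` worth noting, sketched below with hypothetical node ids: the full candidate list is returned unchanged every round, and a `copy.copy` of it is stored per round, so mutating the caller's list afterwards does not rewrite the history that `refine` later checks against.

```python
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

strategy = DefaultStrategy()
nodes = ["node-1", "node-2"]

for round_i in range(3):
    # Every round, all candidate nodes are selected.
    assert strategy.sample_nodes(nodes, round_i) == ["node-1", "node-2"]

nodes.append("node-3")  # later mutation does not affect the per-round history
```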
Strategy
Strategy()
Base Strategy (parent) class. Custom strategy classes must inherit from this parent class.
Inconsistent history
The Strategy class keeps a history of sampled and successful nodes. No attempt is made to keep this history consistent when the _fds member is modified.
Source code in fedbiomed/researcher/strategies/strategy.py
def __init__(self):
"""
Args:
data: Object that includes all active nodes and the meta-data of the dataset that is going to be
used for federated training.
"""
self._sampling_node_history = {}
self._success_node_history = {}
self._parameters = None
Functions
load_state_breakpoint
load_state_breakpoint(state=None)
Method for loading the strategy state from a breakpoint state
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | Dict[str, Any] | The state that will be loaded. | None |
Source code in fedbiomed/researcher/strategies/strategy.py
def load_state_breakpoint(self, state: Dict[str, Any] = None):
"""
Method for loading strategy state from breakpoint state
Args:
state: The state that will be loaded
"""
# fds may be modified and diverge from Experiment
self._parameters = state["parameters"]
refine
refine(training_replies, round_i)
Abstract method that must be implemented by the child class
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| training_replies | Dict[str, TrainReply] | Dictionary of replies from nodes | required |
| round_i | int | Current round of experiment | required |
Raises:
| Type | Description |
|---|---|
| FedbiomedStrategyError | If the method is not implemented by the child class. |
Source code in fedbiomed/researcher/strategies/strategy.py
def refine(
self, training_replies: Dict[str, TrainReply], round_i: int
) -> Tuple[
Dict[str, Dict[str, Union["torch.Tensor", "np.ndarray"]]],
Dict[str, float],
int,
Dict[str, List[int]],
]:
"""
Abstract method that must be implemented by child class
Args:
training_replies: Dictionary of replies from nodes
round_i: Current round of experiment
Raises:
FedbiomedStrategyError: If method is not implemented by child class
"""
msg = (
ErrorNumbers.FB402.value
+ ": refine method should be overloaded by the provided strategy"
)
logger.critical(msg)
raise FedbiomedStrategyError(msg)
sample_nodes
sample_nodes(from_nodes, round_i)
Abstract method that must be implemented by the child class
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_nodes | List[str] | The node ids which may be sampled. | required |
| round_i | int | Current round of experiment. | required |
Source code in fedbiomed/researcher/strategies/strategy.py
def sample_nodes(self, from_nodes: List[str], round_i: int):
"""
Abstract method that must be implemented by child class
Args:
from_nodes: the node ids which may be sampled
round_i: Current round of experiment
"""
msg = (
ErrorNumbers.FB402.value
+ ": sample nodes method should be overloaded by the provided strategy"
)
logger.critical(msg)
raise FedbiomedStrategyError(msg)
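Because `refine` and `sample_nodes` only raise `FedbiomedStrategyError` on the base class, a custom strategy subclasses `Strategy` and overrides both. Below is a hedged sketch of such a subclass that samples a random fraction of nodes each round and weights successful replies by sample count; the class name `RandomSubsetStrategy`, the `fraction` parameter, and the skip-on-failure behaviour are illustrative choices, not part of Fed-BioMed.

```python
import copy
import random
from typing import Dict, List, Tuple

from fedbiomed.researcher.strategies.strategy import Strategy


class RandomSubsetStrategy(Strategy):
    """Hypothetical strategy: sample a random fraction of nodes each round."""

    def __init__(self, fraction: float = 0.5):
        super().__init__()
        self._fraction = fraction

    def sample_nodes(self, from_nodes: List[str], round_i: int) -> List[str]:
        # Pick at least one node, at most the requested fraction of them.
        k = max(1, int(len(from_nodes) * self._fraction))
        sampled = random.sample(from_nodes, k)
        self._sampling_node_history[round_i] = copy.copy(sampled)
        return sampled

    def refine(self, training_replies, round_i: int) -> Tuple[Dict, Dict, int, Dict]:
        # Weight each successful reply by its share of the total sample count,
        # mirroring DefaultStrategy but skipping failed nodes instead of raising.
        model_params, sample_sizes, encryption_factors = {}, {}, {}
        self._success_node_history[round_i] = []
        for tr in training_replies.values():
            if tr["success"]:
                model_params[tr["node_id"]] = tr["params"]
                sample_sizes[tr["node_id"]] = tr["sample_size"]
                encryption_factors[tr["node_id"]] = tr.get("encryption_factor", None)
                self._success_node_history[round_i].append(tr["node_id"])
        total_rows = sum(sample_sizes.values())
        weights = {
            node_id: (size / total_rows if total_rows != 0 else 1 / len(sample_sizes))
            for node_id, size in sample_sizes.items()
        }
        return model_params, weights, total_rows, encryption_factors
```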
save_state_breakpoint
save_state_breakpoint()
Method for saving the strategy state when saving breakpoints
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | The state of the strategy. |
Source code in fedbiomed/researcher/strategies/strategy.py
def save_state_breakpoint(self) -> Dict[str, Any]:
"""
Method for saving strategy state for saving breakpoints
Returns:
The state of the strategy
"""
state = {
"class": type(self).__name__,
"module": self.__module__,
"parameters": self._parameters,
}
return state
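To close the section, a small sketch of the breakpoint round-trip: `save_state_breakpoint` captures the class name, module, and `_parameters`, while `load_state_breakpoint` restores only the `parameters` entry on a fresh instance. `DefaultStrategy` is used here purely for illustration; the commented values assume `_parameters` was never set.

```python
from fedbiomed.researcher.strategies.default_strategy import DefaultStrategy

strategy = DefaultStrategy()
state = strategy.save_state_breakpoint()
# state == {'class': 'DefaultStrategy',
#           'module': 'fedbiomed.researcher.strategies.default_strategy',
#           'parameters': None}

restored = DefaultStrategy()
restored.load_state_breakpoint(state)  # sets restored._parameters from state['parameters']
```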