Classes
NodeStateAgent
NodeStateAgent(node_ids)
Manages Node States collection, gathered from Nodes
replies.
Initializes state ID of each node provided in node_ids
to None (will maintain state, no previous node state yet)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_ids | List[str] | list of node IDs of the nodes for which to maintain state ID | required |
Source code in fedbiomed/researcher/node_state_agent.py
def __init__(self, node_ids: List[str]) -> None:
"""Constructor for NodeStateAgent.
Initializes state ID of each node provided in `node_ids` to None (will maintain state,
no previous node state yet)
Args:
node_ids: list of node IDs of the nodes for which to maintain state ID
"""
self._collection_state_ids: Dict[str, str] = { # Mapping <node_id, state_id>
node_id: None for node_id in node_ids
}
Functions
get_last_node_states
get_last_node_states()
Returns a dictionary mapping Nodes
replies.
If used before the end of the first Round, each state_id is set to None
Returns:
Type | Description |
---|---|
Dict[str, str] | Mapping of |
Source code in fedbiomed/researcher/node_state_agent.py
def get_last_node_states(self) -> Dict[str, str]:
"""Returns a dictionary mapping <node_id, state_id> from latest `Nodes` replies.
If used before the end of the first Round, each state_id is set to None
Returns:
Mapping of `<node_id, state_id>`
"""
return self._collection_state_ids
load_state_breakpoint
load_state_breakpoint(node_state)
Loads NodeStateAgent's state from saved state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_state | Dict | state of | required |
Source code in fedbiomed/researcher/node_state_agent.py
def load_state_breakpoint(self, node_state: Dict):
"""Loads NodeStateAgent's state from saved state.
Args:
node_state: state of `NodeStateAgent` to be loaded.
"""
self._collection_state_ids = node_state.get('collection_state_ids')
save_state_breakpoint
save_state_breakpoint()
NodeStateAgent's state, to be saved in a breakpoint.
Returns:
Type | Description |
---|---|
Dict | Node state for breakpoint |
Source code in fedbiomed/researcher/node_state_agent.py
def save_state_breakpoint(self) -> Dict:
"""NodeStateAgent's state, to be saved in a breakpoint.
Returns:
Node state for breakpoint
"""
return {
'collection_state_ids': self._collection_state_ids
}
update_node_states
update_node_states(all_node_ids, resp=None)
Updates the state_id collection with respect to current nodes and latest Nodes replies.
Adds node IDs contained in node_ids argument that was not part of the previous Round, and discards node_ids that do not belong to the current Round anymore.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
all_node_ids | List[str] | all possible nodes that can participate to the training. | required |
resp | optional | latest Nodes replies. Defaults to None. | None |
Raises:
Type | Description |
---|---|
FedbiomedNodeStateAgentError | raised if Nodes replies have a missing entry that needs to be collected. |
Source code in fedbiomed/researcher/node_state_agent.py
def update_node_states(self, all_node_ids: List[str], resp: Optional[Dict] = None):
"""Updates the state_id collection with respect to current nodes and latest Nodes replies.
Adds node IDs contained in node_ids argument that was not part of the previous Round, and discards node_ids that
do not belong to the current Round anymore.
Args:
all_node_ids: all possible nodes that can participate to the training.
resp (optional): latest Nodes replies. Defaults to None.
Raises:
FedbiomedNodeStateAgentError: raised if Nodes replies have a missing entry that needs to be collected.
"""
# first, we update _collection_state_id wrt new FederatedDataset (if it has been modified)
self._update_collection_state_ids(all_node_ids)
if resp is not None:
for node_reply in resp.values():
# adds Node replies
try:
node_id, state_id = node_reply['node_id'], node_reply['state_id']
except KeyError as ke:
raise FedbiomedNodeStateAgentError(f"{ErrorNumbers.FB419.value}: Missing entry in Response") from ke
if node_id in self._collection_state_ids:
self._collection_state_ids[node_id] = state_id