Core code of the node component.
Attributes
Classes
Node
Node(dataset_manager, tp_security_manager, node_args=None)
Core code of the node component.
Defines the behaviour of the node while communicating with the researcher through the `Messaging` class: parsing messages from the researcher, either treating them instantly or queuing them, and executing the tasks requested by the researcher that are stored in the queue.
Attributes:
Name | Type | Description |
---|---|---|
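dataset_manager | DatasetManager | `DatasetManager` object for managing the node's datasets. |
tp_security_manager | TrainingPlanSecurityManager | `TrainingPlanSecurityManager` object managing the node's training plans. |
node_args | Union[dict, None] | Command line arguments for node. |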
Source code in fedbiomed/node/node.py
def __init__(
    self,
    dataset_manager: DatasetManager,
    tp_security_manager: TrainingPlanSecurityManager,
    node_args: Union[dict, None] = None,
):
    """Build the node: task queue, gRPC controller and node-to-node router.

    Args:
        dataset_manager: `DatasetManager` object for managing the node's datasets.
        tp_security_manager: `TrainingPlanSecurityManager` object managing the node's
            training plans.
        node_args: Command line arguments for node.
    """
    # Queue of pending tasks, configured from the node environment
    queue_dir, tmp_dir = environ["MESSAGES_QUEUE_DIR"], environ["TMP_DIR"]
    self._tasks_queue = TasksQueue(queue_dir, tmp_dir)

    # Only the first researcher declared in the environment is used
    first_researcher = environ["RESEARCHERS"][0]
    node_id = environ["ID"]
    credentials = ResearcherCredentials(
        port=first_researcher["port"],
        host=first_researcher["ip"],
        certificate=first_researcher["certificate"],
    )
    self._grpc_client = GrpcController(
        node_id=node_id,
        researchers=[credentials],
        on_message=self.on_message,
    )

    # Exchanges handed over to the node-to-node router
    self._pending_requests = EventWaitExchange(remove_delivered=True)
    self._controller_data = EventWaitExchange(remove_delivered=False)
    self._n2n_router = NodeToNodeRouter(
        self._grpc_client,
        self._pending_requests,
        self._controller_data,
    )

    # Public attributes
    self.dataset_manager = dataset_manager
    self.tp_security_manager = tp_security_manager
    self.node_args = node_args
Attributes
dataset_manager instance-attribute
dataset_manager = dataset_manager
node_args instance-attribute
node_args = node_args
tp_security_manager instance-attribute
tp_security_manager = tp_security_manager
Functions
add_task
add_task(task)
Adds a task to the pending tasks queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | A `Message` object describing a training task | required |
Source code in fedbiomed/node/node.py
def add_task(self, task: dict):
    """Append a task to the queue of pending tasks.

    Args:
        task: A `Message` object describing a training task
    """
    pending_tasks = self._tasks_queue
    pending_tasks.add(task)
is_connected
is_connected()
Checks if node is ready for communication with researcher
Returns:
Type | Description |
---|---|
bool | True if node is ready, False if node is not ready |
Source code in fedbiomed/node/node.py
def is_connected(self) -> bool:
    """Report whether the node is ready to communicate with the researcher.

    The answer is delegated to the gRPC controller.

    Returns:
        True if node is ready, False if node is not ready
    """
    ready = self._grpc_client.is_connected()
    return ready
on_message
on_message(msg)
Handler to be used with the `Messaging` class (i.e. the messenger).
Called when a message arrives through the `Messaging`. It reads and triggers instructions received by node from Researcher, mainly:
- ping requests,
- train requests (then a new task will be added on node's task queue),
- search requests (for searching data in node's database).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | dict | Incoming message from Researcher. | required |
Source code in fedbiomed/node/node.py
def on_message(self, msg: dict):
"""Handler called when a message arrives from the researcher.
The raw dictionary is deserialized into a `Message`; if that fails, an
`FB301` error is sent back and nothing else happens. Otherwise the
message is dispatched on its `__name__`:
- train, secagg and additive secret sharing setup requests are added
to the node's task queue,
- secagg delete requests are passed to `_task_secagg_delete`,
- overlay messages are submitted to the node-to-node router,
- search and list requests query the dataset manager and send a reply,
- ping requests are answered with a `PingReply`,
- approval and training plan status requests are answered with the
reply built by the training plan security manager,
- any other message triggers an `FB301` error.
Args:
msg: Incoming message from Researcher.
"""
message: Message
# Deserialize the raw dictionary into a typed `Message`
try:
message = Message.from_dict(msg)
except FedbiomedError as e:
logger.error(e) # Message was not properly formatted
# The message could not be parsed, so read the researcher id from the raw dict
resid = msg.get("researcher_id", "unknown_researcher_id")
self.send_error(
ErrorNumbers.FB301,
extra_msg="Message was not properly formatted",
researcher_id=resid,
)
else:
# Fields left out of the debug log line below
no_print = [
"aggregator_args",
"aux_vars",
"params",
"training_plan",
"overlay",
]
msg_print = {
key: value
for key, value in message.get_dict().items()
if key not in no_print
}
logger.debug("Message received: " + str(msg_print))
match message.__name__:
# Requests that are queued and handled later by the task manager
case (
TrainRequest.__name__
| SecaggRequest.__name__
| AdditiveSSSetupRequest.__name__
):
self.add_task(message)
case SecaggDeleteRequest.__name__:
self._task_secagg_delete(message)
# Node-to-node traffic is handed to the router
case OverlayMessage.__name__:
self._n2n_router.submit(message)
case SearchRequest.__name__:
# Look for datasets matching the requested tags
databases = self.dataset_manager.search_by_tags(message.tags)
# NOTE(review): nesting is not recoverable from this flattened listing;
# confirm whether the reply below is sent only when datasets were found
# or unconditionally.
if len(databases) != 0:
databases = self.dataset_manager.obfuscate_private_information(
databases
)
self._grpc_client.send(
SearchReply(
request_id=message.request_id,
node_id=environ["NODE_ID"],
researcher_id=message.researcher_id,
databases=databases,
count=len(databases),
)
)
case ListRequest.__name__:
# Get list of all datasets
databases = self.dataset_manager.list_my_data(verbose=False)
databases = self.dataset_manager.obfuscate_private_information(databases)
self._grpc_client.send(
ListReply(
success=True,
request_id=message.request_id,
node_id=environ['NODE_ID'],
researcher_id=message.researcher_id,
databases=databases,
count=len(databases),
)
)
case PingRequest.__name__:
# NOTE(review): this reply reads environ["ID"] while the search and list
# replies read environ["NODE_ID"]; confirm both keys are meant to be used.
self._grpc_client.send(
PingReply(
request_id=message.request_id,
researcher_id=message.researcher_id,
node_id=environ["ID"],
)
)
case ApprovalRequest.__name__:
# The security manager builds the reply; it is only forwarded here
reply = (
self.tp_security_manager.reply_training_plan_approval_request(
message
)
)
self._grpc_client.send(reply)
case TrainingPlanStatusRequest.__name__:
reply = self.tp_security_manager.reply_training_plan_status_request(
message
)
self._grpc_client.send(reply)
# Any other message type has no handler: report it to the researcher
case _:
resid = msg.get("researcher_id", "unknown_researcher_id")
self.send_error(
ErrorNumbers.FB301,
extra_msg="This request handler is not implemented "
f"{message.__class__.__name__} is not implemented",
researcher_id=resid,
)
parser_task_train
parser_task_train(msg)
Parses a given training task message to create a round instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | TrainRequest | `TrainRequest` message object to parse | required |
Returns:
Type | Description |
---|---|
Union[Round, None] | a `Round` object for the training to perform, or None if no training |
Source code in fedbiomed/node/node.py
def parser_task_train(self, msg: TrainRequest) -> Union[Round, None]:
    """Parses a given training task message to create a round instance

    An error message is sent to the researcher, and `None` is returned, when
    the requested dataset is not found on this node or when the round fails
    to initialize its arguments.

    Args:
        msg: `TrainRequest` message object to parse

    Returns:
        a `Round` object for the training to perform, or None if no training
    """
    hist_monitor = HistoryMonitor(
        experiment_id=msg.experiment_id,
        researcher_id=msg.researcher_id,
        send=self._grpc_client.send,
    )

    dataset_id = msg.get_param("dataset_id")
    data = self.dataset_manager.get_by_id(dataset_id)
    if data is None:
        self.send_error(
            extra_msg="Did not found proper data in local datasets "
            f'on node={environ["NODE_ID"]}',
            request_id=msg.request_id,
            researcher_id=msg.researcher_id,
            errnum=ErrorNumbers.FB313,
        )
        return None

    # The data loading plan is optional: only fetched when the dataset has one
    dlp_and_loading_block_metadata = None
    if "dlp_id" in data:
        dlp_and_loading_block_metadata = self.dataset_manager.get_dlp_by_id(
            data["dlp_id"]
        )

    round_ = Round(
        training_plan=msg.get_param("training_plan"),
        training_plan_class=msg.get_param("training_plan_class"),
        model_kwargs=msg.get_param("model_args") or {},
        training_kwargs=msg.get_param("training_args") or {},
        training=msg.get_param("training") or False,
        dataset=data,
        params=msg.get_param("params"),
        experiment_id=msg.get_param("experiment_id"),
        researcher_id=msg.get_param("researcher_id"),
        history_monitor=hist_monitor,
        aggregator_args=msg.get_param("aggregator_args") or None,
        node_args=self.node_args,
        round_number=msg.get_param("round"),
        dlp_and_loading_block_metadata=dlp_and_loading_block_metadata,
        aux_vars=msg.get_param("aux_vars"),
    )

    # A non-None value from the round means its arguments could not be initialized
    err_msg = round_.initialize_arguments(msg.get_param("state_id"))
    if err_msg is not None:
        self._grpc_client.send(
            ErrorMessage(
                # lets the researcher match the error with its request
                request_id=msg.request_id,
                node_id=environ["ID"],
                # the error code name, as `send_error` does
                errnum=ErrorNumbers.FB300.name,
                researcher_id=msg.researcher_id,
                extra_msg="Could not initialize arguments",
            )
        )
        # Do not hand back a round that failed to initialize: no training
        return None

    return round_
send_error
send_error(errnum=ErrorNumbers.FB300, extra_msg='', researcher_id='<unknown>', broadcast=False, request_id=None)
Sends an error message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
errnum | ErrorNumbers | Code of the error. | FB300 |
extra_msg | str | Additional human readable error message. | '' |
researcher_id | str | Destination researcher. | '<unknown>' |
broadcast | bool | Broadcast the message to all available researchers regardless of specific researcher. | False |
request_id | str | Optional request id to reply as error to a request. | None |
Source code in fedbiomed/node/node.py
def send_error(
    self,
    errnum: ErrorNumbers = ErrorNumbers.FB300,
    extra_msg: str = "",
    researcher_id: str = "<unknown>",
    broadcast: bool = False,
    request_id: str = None,
):
    """Log an error and report it to the researcher side.

    Any exception raised while logging, building or sending the message is
    caught and logged, so this method does not propagate it.

    Args:
        errnum: Code of the error.
        extra_msg: Additional human readable error message.
        researcher_id: Destination researcher.
        broadcast: Broadcast the message to all available researchers
            regardless of specific researcher.
        request_id: Optional request id to reply as error to a request.
    """
    try:
        logger.error(extra_msg)
        error_reply = ErrorMessage(
            request_id=request_id,
            errnum=errnum.name,
            node_id=environ["NODE_ID"],
            extra_msg=extra_msg,
            researcher_id=researcher_id,
        )
        self._grpc_client.send(error_reply, broadcast=broadcast)
    except Exception as exc:
        logger.error(f"{ErrorNumbers.FB601.value}: Cannot send error message: {exc}")
start_messaging
start_messaging(on_finish=None)
Calls the start method of messaging class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_finish | Optional[Callable] | Called when the tasks for handling all known researchers have finished. Callable has no argument. | None |
Source code in fedbiomed/node/node.py
def start_messaging(self, on_finish: Optional[Callable] = None):
    """Start the messaging layer by calling `start` on the gRPC controller.

    Args:
        on_finish: Called when the tasks for handling all known researchers have finished.
            Callable has no argument.
    """
    controller = self._grpc_client
    controller.start(on_finish)
start_protocol
start_protocol()
Start the node to node router thread, for handling node to node message
Source code in fedbiomed/node/node.py
def start_protocol(self) -> None:
    """Start the node to node router, which handles node to node messages."""
    router = self._n2n_router
    router.start()
task_manager
task_manager()
Manages training tasks in the queue.
Source code in fedbiomed/node/node.py
def task_manager(self):
"""Manages training tasks in the queue."""
while True:
item: Message = self._tasks_queue.get()
# don't want to treat again in case of failure
self._tasks_queue.task_done()
logger.info(
f"[TASKS QUEUE] Task received by task manager: "
f"Researcher: {item.researcher_id} "
f"Experiment: {item.experiment_id}"
)
try:
match item.__name__:
case TrainRequest.__name__:
round_ = self.parser_task_train(item)
# once task is out of queue, initiate training rounds
if round_ is not None:
msg = round_.run_model_training(
secagg_arguments=item.get_param("secagg_arguments"),
)
msg.request_id = item.request_id
self._grpc_client.send(msg)
del round_
case SecaggRequest.__name__ | AdditiveSSSetupRequest.__name__:
self._task_secagg(item)
case _:
errmess = f"{ErrorNumbers.FB319.value}: Undefined request message"
self.send_error(errnum=ErrorNumbers.FB319, extra_msg=errmess)
# TODO: Test exception
except Exception as e:
self.send_error(
request_id=item.request_id,
researcher_id=item.researcher_id,
errnum=ErrorNumbers.FB300,
extra_msg="Round error: " + str(e),
)