Core code of the node component.
Attributes
Classes
Node
Node(config, node_args=None)
Core code of the node component.
Defines the behaviour of the node: it communicates with the researcher through the messaging layer, parses messages from the researcher and either handles them immediately or queues them, and executes the queued tasks requested by the researcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `NodeConfig` | Node configuration. | required |
| `node_args` | `Union[dict, None]` | Command line arguments for node. | `None` |
Source code in fedbiomed/node/node.py
def __init__(
    self,
    config: NodeConfig,
    node_args: Union[dict, None] = None,
):
    """Create the node and every component it drives.

    Args:
        config: Node configuration. It supplies the node identity
            (`default` section: `id`, `name`, `db`), the researcher
            connection (`researcher` section: `port`, `ip`, optional
            `certificate`) and the security options (`security` section).
        node_args: Command line arguments for node, stored unchanged in
            `self.node_args`.
    """
    self._config = config
    node_id = config.get("default", "id")
    node_name = config.get("default", "name")
    self._node_id = node_id
    self._node_name = node_name

    # Persistent task queue lives under `<root>/var`.
    var_dir = os.path.join(config.root, "var")
    self._tasks_queue = TasksQueue(
        os.path.join(var_dir, f"queue_{node_id}"),
        str(os.path.join(var_dir, "tmp")),
    )

    # Single researcher, described by the `researcher` config section.
    researcher = ResearcherCredentials(
        port=config.get("researcher", "port"),
        host=config.get("researcher", "ip"),
        certificate=config.get("researcher", "certificate", fallback=None),
    )
    self._grpc_client = GrpcController(
        node_id=node_id,
        researchers=[researcher],
        on_message=self.on_message,
    )

    self._db_path = os.path.abspath(
        os.path.join(config.root, CONFIG_FOLDER_NAME, config.get("default", "db"))
    )

    # Exchanges handed to the node-to-node router below.
    self._pending_requests = EventWaitExchange(remove_delivered=True)
    self._controller_data = EventWaitExchange(remove_delivered=False)
    self._n2n_router = NodeToNodeRouter(
        node_id,
        self._db_path,
        self._grpc_client,
        self._pending_requests,
        self._controller_data,
    )

    self.dataset_manager = DatasetManager(path=self._db_path)
    self.tp_security_manager = TrainingPlanSecurityManager(
        db=self._db_path,
        node_id=node_id,
        node_name=node_name,
        hashing=config.get("security", "hashing_algorithm"),
        tp_approval=config.getbool("security", "training_plan_approval"),
    )
    self.node_args = node_args
Attributes
config property
config
Return node config
dataset_manager instance-attribute
dataset_manager = DatasetManager(path=(_db_path))
node_args instance-attribute
node_args = node_args
node_id property
node_id
Returns id of the node
node_name property
node_name
Returns name of the node
tp_security_manager instance-attribute
tp_security_manager = TrainingPlanSecurityManager(db=(_db_path), node_id=(_node_id), node_name=(_node_name), hashing=(get('security', 'hashing_algorithm')), tp_approval=(getbool('security', 'training_plan_approval')))
Functions
add_task
add_task(task)
Adds a task to the pending tasks queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
task | dict | A `Message` object describing a training task | required |
Source code in fedbiomed/node/node.py
def add_task(self, task: "Message"):
    """Adds a task to the pending tasks queue.

    The queued task is later consumed by `task_manager`, which dispatches on
    its message type.

    Args:
        task: A `Message` object describing a task to run (e.g. a training
            request).
    """
    self._tasks_queue.add(task)
is_connected
is_connected()
Checks if node is ready for communication with researcher
Returns:
| Type | Description |
|---|---|
bool | True if node is ready, False if node is not ready |
Source code in fedbiomed/node/node.py
def is_connected(self) -> bool:
    """Tell whether the node is ready to communicate with the researcher.

    Returns:
        Whatever the gRPC controller's `is_connected()` reports: True if the
        node is ready, False otherwise.
    """
    grpc_client = self._grpc_client
    return grpc_client.is_connected()
on_message
on_message(msg)
Handler to be used with `Messaging` class (i.e. the messager).
Called when a message arrives through the `Messaging`. It reads and triggers instructions received by the node from the researcher, mainly:

- ping requests,
- train requests (a new task is then added to the node's task queue),
- search requests (for searching data in the node's database).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
msg | dict | Incoming message from Researcher. | required |
Source code in fedbiomed/node/node.py
def on_message(self, msg: dict):
"""Handler to be used with `Messaging` class (ie the messager).
Called when a message arrives through the `Messaging`.
It reads and triggers instructions received by node from Researcher,
mainly:
- ping requests,
- train requests (then a new task will be added on node's task queue),
- search requests (for searching data in node's database).
Args:
msg: Incoming message from Researcher.
"""
message: Message
try:
message = Message.from_dict(msg)
except FedbiomedError as e:
logger.error(e) # Message was not properly formatted
resid = msg.get("researcher_id", "unknown_researcher_id")
self.send_error(
ErrorNumbers.FB301,
extra_msg="Message was not properly formatted",
researcher_id=resid,
)
else:
no_print = [
"aggregator_args",
"optim_aux_var",
"params",
"training_plan",
"overlay",
]
msg_print = {
key: value
for key, value in message.get_dict().items()
if key not in no_print
}
logger.debug("Message received: " + str(msg_print))
match message.__name__:
case (
TrainRequest.__name__
| SecaggRequest.__name__
| AdditiveSSSetupRequest.__name__
):
self.add_task(message)
case SecaggDeleteRequest.__name__:
self._task_secagg_delete(message)
case OverlayMessage.__name__:
self._n2n_router.submit(message)
case SearchRequest.__name__:
databases = self.dataset_manager.dataset_table.search_by_tags(
message.tags
)
if len(databases) != 0:
databases = self.dataset_manager.obfuscate_private_information(
databases
)
self._grpc_client.send(
SearchReply(
request_id=message.request_id,
node_id=self._node_id,
node_name=self._node_name,
researcher_id=message.researcher_id,
databases=databases,
count=len(databases),
)
)
case ListRequest.__name__:
# Get list of all datasets
databases = self.dataset_manager.list_my_datasets(verbose=False)
databases = self.dataset_manager.obfuscate_private_information(
databases
)
self._grpc_client.send(
ListReply(
success=True,
request_id=message.request_id,
node_id=self._node_id,
node_name=self._node_name,
researcher_id=message.researcher_id,
databases=databases,
count=len(databases),
)
)
case PingRequest.__name__:
self._grpc_client.send(
PingReply(
request_id=message.request_id,
researcher_id=message.researcher_id,
node_id=self._node_id,
node_name=self._node_name,
)
)
case ApprovalRequest.__name__:
reply = (
self.tp_security_manager.reply_training_plan_approval_request(
message
)
)
self._grpc_client.send(reply)
case TrainingPlanStatusRequest.__name__:
reply = self.tp_security_manager.reply_training_plan_status_request(
message
)
self._grpc_client.send(reply)
case _:
resid = msg.get("researcher_id", "unknown_researcher_id")
self.send_error(
ErrorNumbers.FB301,
extra_msg="This request handler is not implemented "
f"{message.__class__.__name__} is not implemented",
researcher_id=resid,
)
parser_task_train
parser_task_train(msg)
Parses a given training task message to create a round instance
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
msg | TrainRequest | `TrainRequest` message object to parse | required |
Returns:
| Type | Description |
|---|---|
Union[Round, None] | a `Round` object for the training to perform, or None if no training |
Source code in fedbiomed/node/node.py
def parser_task_train(self, msg: TrainRequest) -> Union[Round, None]:
    """Parses a given training task message to create a round instance.

    Looks up the dataset referenced by the request's `dataset_id` (and its
    data loading plan when the dataset entry has a `dlp_id`), builds a
    `Round` from the request parameters, then initializes the round
    arguments.

    Args:
        msg: `TrainRequest` message object to parse

    Returns:
        a `Round` object for the training to perform, or None if no training
        must be run: either the dataset is not found on this node, or the
        round arguments could not be initialized. In both cases an error
        message has already been sent to the requesting researcher.
    """
    hist_monitor = HistoryMonitor(
        node_id=self._node_id,
        node_name=self._node_name,
        experiment_id=msg.experiment_id,
        researcher_id=msg.researcher_id,
        send=self._grpc_client.send,
    )

    dataset_id = msg.get_param("dataset_id")
    data = self.dataset_manager.dataset_table.get_by_id(dataset_id)
    if data is None:
        # `send_error` returns None, which also signals "no training".
        return self.send_error(
            extra_msg="Did not found proper data in local datasets "
            f"on node={self._node_id}",
            request_id=msg.request_id,
            researcher_id=msg.researcher_id,
            errnum=ErrorNumbers.FB313,
        )

    dlp_and_loading_block_metadata = None
    if "dlp_id" in data:
        dlp_and_loading_block_metadata = self.dataset_manager.get_dlp_by_id(
            data["dlp_id"]
        )

    round_ = Round(
        root_dir=self._config.root,
        db=self._db_path,
        node_id=self._node_id,
        node_name=self._node_name,
        training_plan=msg.get_param("training_plan"),
        training_plan_class=msg.get_param("training_plan_class"),
        model_kwargs=msg.get_param("model_args") or {},
        training_kwargs=msg.get_param("training_args") or {},
        training=msg.get_param("training") or False,
        dataset=data,
        params=msg.get_param("params"),
        experiment_id=msg.get_param("experiment_id"),
        researcher_id=msg.get_param("researcher_id"),
        history_monitor=hist_monitor,
        aggregator_args=msg.get_param("aggregator_args") or None,
        node_args=self.node_args,
        tp_security_manager=self.tp_security_manager,
        round_number=msg.get_param("round"),
        dlp_and_loading_block_metadata=dlp_and_loading_block_metadata,
        aux_vars=msg.get_param("optim_aux_var"),
    )

    # `initialize_arguments` reports failure through a non-None return value.
    err_msg = round_.initialize_arguments(msg.get_param("state_id"))
    if err_msg is not None:
        # Go through `send_error`, like the missing-dataset branch above, so
        # the ErrorMessage carries the errnum name, node name and request id.
        self.send_error(
            extra_msg="Could not initialize arguments",
            request_id=msg.request_id,
            researcher_id=msg.researcher_id,
            errnum=ErrorNumbers.FB300,
        )
        # A round whose arguments failed to initialize must not be trained.
        return None

    return round_
send_error
send_error(errnum=ErrorNumbers.FB300, extra_msg='', researcher_id='<unknown>', broadcast=False, request_id=None)
Sends an error message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
errnum | ErrorNumbers | Code of the error. | FB300 |
extra_msg | str | Additional human readable error message. | '' |
researcher_id | str | Destination researcher. | '<unknown>' |
broadcast | bool | Broadcast the message to all available researchers regardless of the specific researcher. | False |
request_id | str | Optional ID of the request this error replies to. | None |
Source code in fedbiomed/node/node.py
def send_error(
    self,
    errnum: ErrorNumbers = ErrorNumbers.FB300,
    extra_msg: str = "",
    researcher_id: str = "<unknown>",
    broadcast: bool = False,
    request_id: str = None,
):
    """Log an error and report it to the researcher(s) as an `ErrorMessage`.

    This is best effort: any exception raised while logging, building or
    sending the message is caught and logged with code FB601 instead of
    propagating to the caller.

    Args:
        errnum: Code of the error; the enum member's name is what is sent.
        extra_msg: Additional human readable error message (also logged).
        researcher_id: Destination researcher.
        broadcast: Broadcast the message to all available researchers
            regardless of the specific researcher.
        request_id: Optional ID of the request this error replies to.
    """
    try:
        logger.error(extra_msg)
        error_message = ErrorMessage(
            request_id=request_id,
            errnum=errnum.name,
            node_id=self._node_id,
            node_name=self._node_name,
            extra_msg=extra_msg,
            researcher_id=researcher_id,
        )
        self._grpc_client.send(error_message, broadcast=broadcast)
    except Exception as exc:
        logger.error(f"{ErrorNumbers.FB601.value}: Cannot send error message: {exc}")
start_messaging
start_messaging(on_finish=None)
Calls the start method of messaging class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
on_finish | Optional[Callable] | Called when the tasks for handling all known researchers have finished. Callable has no argument. | None |
Source code in fedbiomed/node/node.py
def start_messaging(self, on_finish: Optional[Callable] = None):
    """Start the messaging layer by starting the gRPC controller.

    Args:
        on_finish: Called when the tasks for handling all known researchers
            have finished. Takes no argument.
    """
    controller = self._grpc_client
    controller.start(on_finish)
start_protocol
start_protocol()
Starts the node-to-node router thread, which handles node-to-node messages.
Source code in fedbiomed/node/node.py
def start_protocol(self) -> None:
    """Launch the node-to-node router thread that handles node-to-node messages."""
    router = self._n2n_router
    router.start()
task_manager
task_manager()
Manages training tasks in the queue.
Source code in fedbiomed/node/node.py
def task_manager(self):
"""Manages training tasks in the queue."""
while True:
item: TrainRequest = self._tasks_queue.get()
# don't want to treat again in case of failure
self._tasks_queue.task_done()
logger.info(
f"[TASKS QUEUE] Task received by task manager: "
f"Researcher: {item.researcher_id} "
f"Experiment: {item.experiment_id}"
)
try:
match item.__name__:
case TrainRequest.__name__:
round_ = self.parser_task_train(item)
# once task is out of queue, initiate training rounds
if round_ is not None:
msg = round_.run_model_training(
tp_approval=self._config.getbool(
"security", "training_plan_approval"
),
secagg_insecure_validation=self._config.getbool(
"security", "secagg_insecure_validation"
),
secagg_active=self._config.getbool(
"security", "secure_aggregation"
),
force_secagg=self._config.getbool(
"security", "force_secure_aggregation"
),
secagg_arguments=item.get_param("secagg_arguments"),
)
msg.request_id = item.request_id
self._grpc_client.send(msg)
del round_
case SecaggRequest.__name__ | AdditiveSSSetupRequest.__name__:
self._task_secagg(item)
case _:
errmess = (
f"{ErrorNumbers.FB319.value}: Undefined request message"
)
self.send_error(errnum=ErrorNumbers.FB319, extra_msg=errmess)
# TODO: Test exception
except Exception as e:
self.send_error(
request_id=item.request_id,
researcher_id=item.researcher_id,
errnum=ErrorNumbers.FB300,
extra_msg="Round error: " + str(e),
)