Core code of the node component.
Attributes
Classes
Node
Node(dataset_manager, tp_security_manager, node_args=None)
Core code of the node component.
Defines the behaviour of the node while communicating with the researcher through the `Messaging` class: parsing messages from the researcher, either treating them instantly or queuing them, and executing the tasks requested by the researcher that are stored in the queue.
Attributes:
Name | Type | Description |
---|---|---|
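dataset_manager | DatasetManager | `DatasetManager` object for managing the node's datasets. |
tp_security_manager | TrainingPlanSecurityManager | `TrainingPlanSecurityManager` object managing the node's training plans. |
node_args | Union[dict, None] | Command line arguments for node. |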
Source code in fedbiomed/node/node.py
def __init__(self,
             dataset_manager: DatasetManager,
             tp_security_manager: TrainingPlanSecurityManager,
             node_args: Union[dict, None] = None):
    """Builds the node and the components it relies on.

    A tasks queue and a `Messaging` instance (using `on_message` as its
    message handler) are created from the values found in `environ`.

    Args:
        dataset_manager: `DatasetManager` object for managing the node's datasets.
        tp_security_manager: `TrainingPlanSecurityManager` object managing the node's training plans.
        node_args: Command line arguments for node.
    """
    # Queue holding the tasks waiting to be executed by the node.
    queue_dir = environ['MESSAGES_QUEUE_DIR']
    tmp_dir = environ['TMP_DIR']
    self.tasks_queue = TasksQueue(queue_dir, tmp_dir)

    # Messaging component; incoming messages are routed to `on_message`.
    self.messaging = Messaging(
        self.on_message,
        ComponentType.NODE,
        environ['NODE_ID'],
        environ['MQTT_BROKER'],
        environ['MQTT_BROKER_PORT'],
    )

    # Collaborators and options handed in by the caller.
    self.dataset_manager = dataset_manager
    self.tp_security_manager = tp_security_manager
    self.node_args = node_args
Attributes
dataset_manager instance-attribute
dataset_manager = dataset_manager
messaging instance-attribute
messaging = Messaging(self.on_message, ComponentType.NODE, environ['NODE_ID'], environ['MQTT_BROKER'], environ['MQTT_BROKER_PORT'])
node_args instance-attribute
node_args = node_args
tasks_queue instance-attribute
tasks_queue = TasksQueue(environ['MESSAGES_QUEUE_DIR'], environ['TMP_DIR'])
tp_security_manager instance-attribute
tp_security_manager = tp_security_manager
Functions
add_task
add_task(task)
Adds a task to the pending tasks queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | A `Message` object describing a training task | required |
Source code in fedbiomed/node/node.py
def add_task(self, task: dict):
    """Appends a task to the queue of pending tasks.

    Args:
        task: A `Message` object describing a training task
    """
    pending_tasks = self.tasks_queue
    pending_tasks.add(task)
on_message
on_message(msg, topic=None)
Handler to be used with the `Messaging` class (i.e. the messager).
Called when a message arrives through the `Messaging`. It reads and triggers instructions received by the node from the Researcher, mainly:
- ping requests,
- train requests (then a new task will be added on node's task queue),
- search requests (for searching data in node's database).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
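msg | dict | Incoming message from Researcher. Must contain key named `command`, describing the nature of the command (ping requests, train requests, or search requests). Should be formatted as a `Message`. | required |
topic | str | Messaging topic name; decisions (especially on the researcher side) may depend on the topic. Currently unused. | None |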
Source code in fedbiomed/node/node.py
def on_message(self, msg: dict, topic: str = None):
    """Handler to be used with `Messaging` class (ie the messager).

    Called when a message arrives through the `Messaging`.
    It reads and triggers instructions received by node from Researcher:

    - `train` and `secagg` requests are added to the node's task queue,
    - `secagg-delete` requests are passed to `_task_secagg_delete`,
    - `ping` requests are answered with a `pong` message,
    - `search` requests are answered with the datasets matching the tags,
    - `list` requests are answered with all the datasets of the node,
    - `approval` and `training-plan-status` requests are delegated to the
      training plan security manager.

    `JSONDecodeError`, `NotImplementedError`, `KeyError`,
    `FedbiomedMessageError` and `TypeError` raised while handling the
    message are not propagated: they are reported through `send_error`
    with error number `FB301`.

    Args:
        msg: Incoming message from Researcher.
            Must contain key named `command`, describing the nature
            of the command (ping requests, train requests,
            or search requests).
            Should be formatted as a `Message`.
        topic: Messaging topic name, decision (specially on researcher) may
            be done regarding of the topic. Currently unused.
    """
    # TODO: describe all exceptions defined in this method
    # `aggregator_args` is left out of the debug output
    msg_print = {key: value for key, value in msg.items() if key != 'aggregator_args'}
    logger.debug('Message received: ' + str(msg_print))

    def report_failure(extra_msg: str) -> None:
        # All handled failures are reported the same way, addressed to the
        # researcher named in the message when that field is present.
        resid = msg.get('researcher_id', 'unknown_researcher_id')
        self.send_error(ErrorNumbers.FB301,
                        extra_msg=extra_msg,
                        researcher_id=resid)

    try:
        # get the request from the received message (from researcher)
        command = msg['command']
        # parse once; the parsed message is reused by the branches below
        incoming = NodeMessages.format_incoming_message(msg)
        request = incoming.get_dict()

        if command in ['train', 'secagg']:
            # add training task to queue
            self.add_task(request)
        elif command == 'secagg-delete':
            self._task_secagg_delete(incoming)
        elif command == 'ping':
            self.messaging.send_message(
                NodeMessages.format_outgoing_message(
                    {
                        'researcher_id': msg['researcher_id'],
                        'node_id': environ['NODE_ID'],
                        'success': True,
                        'sequence': msg['sequence'],
                        'command': 'pong'
                    }).get_dict())
        elif command == 'search':
            # Look for databases matching the tags
            databases = self.dataset_manager.search_by_tags(msg['tags'])
            if len(databases) != 0:
                databases = self.dataset_manager.obfuscate_private_information(databases)
                # FIXME: what happens if len(database) == 0
                self.messaging.send_message(NodeMessages.format_outgoing_message(
                    {'success': True,
                     'command': 'search',
                     'node_id': environ['NODE_ID'],
                     'researcher_id': msg['researcher_id'],
                     'databases': databases,
                     'count': len(databases)}).get_dict())
        elif command == 'list':
            # Get list of all datasets
            databases = self.dataset_manager.list_my_data(verbose=False)
            databases = self.dataset_manager.obfuscate_private_information(databases)
            self.messaging.send_message(NodeMessages.format_outgoing_message(
                {'success': True,
                 'command': 'list',
                 'node_id': environ['NODE_ID'],
                 'researcher_id': msg['researcher_id'],
                 'databases': databases,
                 'count': len(databases),
                 }).get_dict())
        elif command == 'approval':
            # Ask for training plan approval
            self.tp_security_manager.reply_training_plan_approval_request(request, self.messaging)
        elif command == 'training-plan-status':
            # Check is training plan approved
            self.tp_security_manager.reply_training_plan_status_request(request, self.messaging)
        else:
            raise NotImplementedError('Command not found')
    except decoder.JSONDecodeError:
        report_failure("Not able to deserialize the message")
    except NotImplementedError:
        report_failure(f"Command `{command}` is not implemented")
    except KeyError as exc:
        # The missing key is not necessarily `command` (it can be
        # `researcher_id`, `sequence`, `tags`, ...): name the key that
        # actually triggered the error.
        missing_key = exc.args[0] if exc.args else 'command'
        report_failure(f"'{missing_key}' property was not found")
    except FedbiomedMessageError:  # Message was not properly formatted
        report_failure('Message was not properly formatted')
    except TypeError:  # Message was not serializable
        report_failure('Message was not serializable')
parser_task_train
parser_task_train(msg)
Parses a given training task message to create a round instance
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | TrainRequest | `TrainRequest` message object to parse | required |
Returns:
Type | Description |
---|---|
Union[Round, None] | a `Round` object for the training to perform, or None if no training |
Source code in fedbiomed/node/node.py
def parser_task_train(self, msg: TrainRequest) -> Union[Round, None]:
    """Parses a given training task message to create a round instance.

    Args:
        msg: `TrainRequest` message object to parse

    Returns:
        a `Round` object for the training to perform, or None if no training
        can be performed: either the requested dataset was not found on
        this node, or the round reported an error while initializing its
        arguments. In both cases an error message is sent through the
        messaging component before returning.

    Raises:
        AssertionError: if the training plan URL or the training plan class
            name carried by the message is missing or invalid.
    """
    # msg becomes a TrainRequest object
    hist_monitor = HistoryMonitor(job_id=msg.get_param('job_id'),
                                  researcher_id=msg.get_param('researcher_id'),
                                  client=self.messaging)

    # Get arguments for the model and training
    # NOTE: `get_param` has no real use save for monkey patching in unit tests
    # (in real life, if the parameter is missing, an exception will be raised)
    model_kwargs = msg.get_param('model_args') or {}
    training_kwargs = msg.get_param('training_args') or {}
    training_status = msg.get_param('training') or False
    training_plan_url = msg.get_param('training_plan_url')
    training_plan_class = msg.get_param('training_plan_class')
    params_url = msg.get_param('params_url')
    job_id = msg.get_param('job_id')
    state_id = msg.get_param('state_id')
    researcher_id = msg.get_param('researcher_id')
    aggregator_args = msg.get_param('aggregator_args') or None
    round_number = msg.get_param('round') or 0
    aux_var_urls = msg.get_param('aux_var_urls') or None

    # NOTE: kept as assertions so that the exception type seen by callers
    # does not change.
    assert training_plan_url is not None, 'URL for training plan on repository not found.'
    assert validators.url(
        training_plan_url), 'URL for training plan on repository is not valid.'
    assert training_plan_class is not None, 'classname for the training plan and training routine ' \
                                            'was not found in message.'
    assert isinstance(
        training_plan_class,
        str), '`training_plan_class` must be a string corresponding to the classname for the training plan ' \
              'and training routine in the repository'

    dataset_id = msg.get_param('dataset_id')
    data = self.dataset_manager.get_by_id(dataset_id)

    if data is None or 'path' not in data.keys():
        # FIXME: 'the condition above depends on database model
        # if database model changes (ie `path` field removed/
        # modified);
        # condition above is likely to be false
        logger.error('Did not found proper data in local datasets ' +
                     f'on node={environ["NODE_ID"]}')
        self.messaging.send_message(NodeMessages.format_outgoing_message(
            {'command': "error",
             'node_id': environ['NODE_ID'],
             'researcher_id': researcher_id,
             'errnum': ErrorNumbers.FB313,
             'extra_msg': "Did not found proper data in local datasets"}
        ).get_dict())
        return None

    dlp_and_loading_block_metadata = None
    if 'dlp_id' in data:
        dlp_and_loading_block_metadata = self.dataset_manager.get_dlp_by_id(data['dlp_id'])

    # `round_` rather than `round`, to avoid shadowing the builtin
    round_ = Round(
        model_kwargs,
        training_kwargs,
        training_status,
        data,
        training_plan_url,
        training_plan_class,
        params_url,
        job_id,
        researcher_id,
        hist_monitor,
        aggregator_args,
        self.node_args,
        round_number=round_number,
        dlp_and_loading_block_metadata=dlp_and_loading_block_metadata,
        aux_var_urls=aux_var_urls,
    )

    # the round returns an error message if it cannot initialize
    err_msg = round_.initialize_arguments(state_id)
    if err_msg is not None:
        self.messaging.send_message(
            NodeMessages.format_outgoing_message(
                {'command': 'error',
                 # same environment key as the other messages of this class
                 'node_id': environ['NODE_ID'],
                 'errnum': ErrorNumbers.FB300,
                 'researcher_id': researcher_id,
                 'extra_msg': err_msg.get('msg')}
            ).get_dict())
        # A round that failed to initialize must not be handed back for
        # training: report "no training" as documented.
        return None

    return round_
reply
reply(msg)
Send reply to researcher
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | dict | Content of the reply message; the node id is added before the message is formatted and sent. | required |
Source code in fedbiomed/node/node.py
def reply(self, msg: dict):
    """Send reply to researcher.

    The node id is added to the content of `msg` (a `node_id` entry already
    present in `msg` takes precedence), the result is formatted as an
    outgoing node message and sent through the messaging component.

    If the message cannot be formatted, nothing is sent as a reply: the
    failure is logged and reported through `send_error` with error number
    `FB601`.

    Args:
        msg: Content of the reply message, as a dictionary of message fields.
    """
    try:
        reply = NodeMessages.format_outgoing_message(
            # same environment key as the other messages of this class
            {'node_id': environ['NODE_ID'],
             **msg}
        ).get_dict()
    except FedbiomedMessageError as e:
        logger.error(f"{ErrorNumbers.FB601.value}: {e}")
        self.send_error(errnum=ErrorNumbers.FB601, extra_msg=f"{ErrorNumbers.FB601.value}: Can not reply "
                                                             f"due to incorrect message type {e}.")
    except Exception as e:
        # Boundary handler: any other failure is logged and reported rather
        # than propagated to the caller.
        logger.error(f"{ErrorNumbers.FB601.value} Unexpected error while creating node reply message {e}")
        self.send_error(errnum=ErrorNumbers.FB601, extra_msg=f"{ErrorNumbers.FB601.value}: "
                                                             f"Unexpected error occurred")
    else:
        self.messaging.send_message(reply)
send_error
send_error(errnum, extra_msg='', researcher_id='<unknown>')
Sends an error message.
It is a wrapper of `Messaging.send_error()`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
errnum | ErrorNumbers | Code of the error. | required |
extra_msg | str | Additional human readable error message. | '' |
researcher_id | str | Destination researcher. | '<unknown>' |
Source code in fedbiomed/node/node.py
def send_error(
        self,
        errnum: ErrorNumbers,
        extra_msg: str = "",
        researcher_id: str = "<unknown>"
):
    """Forwards an error to the messaging component.

    Thin wrapper around `Messaging.send_error()`: the three arguments are
    passed through unchanged, as keyword arguments.

    Args:
        errnum: Code of the error.
        extra_msg: Additional human readable error message.
        researcher_id: Destination researcher.
    """
    error_fields = {
        'errnum': errnum,
        'extra_msg': extra_msg,
        'researcher_id': researcher_id,
    }
    self.messaging.send_error(**error_fields)
start_messaging
start_messaging(block=False)
Calls the start method of messaging class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
block | Optional[bool] | Whether messager is blocking (or not). Defaults to False. | False |
Source code in fedbiomed/node/node.py
def start_messaging(self, block: Optional[bool] = False):
    """Starts the messaging component of the node.

    Args:
        block: Whether messager is blocking (or not). Defaults to False.
    """
    messager = self.messaging
    messager.start(block)
task_manager
task_manager()
Manages training tasks in the queue.
Source code in fedbiomed/node/node.py
def task_manager(self):
    """Processes the tasks of the queue, one after the other, forever.

    Each item taken from the queue is parsed as an incoming node message
    and handled according to its `command`:

    - `train`: a round is built with `parser_task_train`; when a round is
      returned, training is run and its result is sent back,
    - `secagg`: the item is passed to `_task_secagg`,
    - anything else is logged and reported through `send_error` (FB319).

    Exceptions raised while parsing the item or while handling a `train`
    command are reported as an error message (FB300) instead of being
    propagated.
    """
    def notify_failure(error):
        # send an error message back to network if something wrong occured
        self.messaging.send_message(
            NodeMessages.format_outgoing_message(
                {
                    'command': 'error',
                    'extra_msg': str(error),
                    'node_id': environ['NODE_ID'],
                    'researcher_id': 'NOT_SET',
                    'errnum': ErrorNumbers.FB300
                }
            ).get_dict()
        )

    # names of the secure aggregation parameters forwarded to the training
    secagg_keys = (
        'secagg_servkey_id',
        'secagg_biprime_id',
        'secagg_random',
        'secagg_clipping_range',
    )

    while True:
        item = self.tasks_queue.get()
        # `aggregator_args` is left out of the debug output
        printable = {name: content for name, content in item.items() if name != 'aggregator_args'}
        logger.debug('[TASKS QUEUE] Item:' + str(printable))

        try:
            item = NodeMessages.format_incoming_message(item)
            command = item.get_param('command')
        except Exception as parse_error:
            notify_failure(parse_error)
        else:
            if command == 'train':
                try:
                    training_round = self.parser_task_train(item)
                    # once task is out of queue, initiate training rounds
                    if training_round is not None:
                        # here a round refers to a round to be done on a
                        # specific dataset
                        secagg_arguments = {key: item.get_param(key) for key in secagg_keys}
                        training_reply = training_round.run_model_training(
                            secagg_arguments=secagg_arguments
                        )
                        self.messaging.send_message(training_reply)
                except Exception as train_error:
                    notify_failure(train_error)
                    logger.debug(f"{ErrorNumbers.FB300.value}: {train_error}")
            elif command == 'secagg':
                self._task_secagg(item)
            else:
                errmess = f'{ErrorNumbers.FB319.value}: "{command}"'
                logger.error(errmess)
                self.send_error(errnum=ErrorNumbers.FB319, extra_msg=errmess)

        # the item is marked as processed whatever the outcome
        self.tasks_queue.task_done()