Node

Core code of the node component.

Attributes

Classes

Node

Node(dataset_manager, tp_security_manager, node_args=None)

Core code of the node component.

Defines the behaviour of the node: it communicates with the researcher through the Messaging, parses messages from the researcher (either handling them immediately or queuing them), and executes the tasks requested by the researcher that are stored in the queue.

Attributes:

Name Type Description
dataset_manager

DatasetManager object for managing the node's datasets.

tp_security_manager

TrainingPlanSecurityManager object managing the node's training plans.

node_args

Command line arguments for node.

Source code in fedbiomed/node/node.py
def __init__(self,
             dataset_manager: DatasetManager,
             tp_security_manager: TrainingPlanSecurityManager,
             node_args: Union[dict, None] = None):
    """Build the node: its task queue, its messaging channel and its managers.

    Args:
        dataset_manager: `DatasetManager` object for managing the node's datasets.
        tp_security_manager: `TrainingPlanSecurityManager` object managing the node's training plans.
        node_args: Command line arguments for node.
    """

    # Queue of pending tasks, built from the message-queue and temporary
    # directories configured in `environ`.
    queue_dir = environ['MESSAGES_QUEUE_DIR']
    tmp_dir = environ['TMP_DIR']
    self.tasks_queue = TasksQueue(queue_dir, tmp_dir)

    # Messaging channel; every incoming message is dispatched to `self.on_message`.
    node_id = environ['NODE_ID']
    broker = environ['MQTT_BROKER']
    broker_port = environ['MQTT_BROKER_PORT']
    self.messaging = Messaging(self.on_message, ComponentType.NODE,
                               node_id, broker, broker_port)

    self.dataset_manager = dataset_manager
    self.tp_security_manager = tp_security_manager
    self.node_args = node_args

Attributes

dataset_manager instance-attribute
dataset_manager = dataset_manager
messaging instance-attribute
messaging = Messaging(self.on_message, ComponentType.NODE, environ['NODE_ID'], environ['MQTT_BROKER'], environ['MQTT_BROKER_PORT'])
node_args instance-attribute
node_args = node_args
tasks_queue instance-attribute
tasks_queue = TasksQueue(environ['MESSAGES_QUEUE_DIR'], environ['TMP_DIR'])
tp_security_manager instance-attribute
tp_security_manager = tp_security_manager

Functions

add_task
add_task(task)

Adds a task to the pending tasks queue.

Parameters:

Name Type Description Default
task dict

A Message object describing a training task

required
Source code in fedbiomed/node/node.py
def add_task(self, task: dict):
    """Push `task` onto the queue of pending tasks.

    Args:
        task: A dict-formatted message describing the task to run
            (e.g. a `train` or `secagg` request).
    """
    pending = self.tasks_queue
    pending.add(task)
on_message
on_message(msg, topic=None)

Handler to be used with Messaging class (ie the messager).

Called when a message arrives through the Messaging. It reads and triggers the instructions the node receives from the Researcher, mainly:

- ping requests,
- train requests (a new task is then added to the node's task queue),
- search requests (for searching data in the node's database).

Parameters:

Name Type Description Default
msg dict

Incoming message from Researcher. Must contain key named command, describing the nature of the command (ping requests, train requests, or search requests). Should be formatted as a Message.

required
topic str

Messaging topic name; decisions (especially on the researcher side) may depend on the topic. Currently unused.

None
Source code in fedbiomed/node/node.py
def on_message(self, msg: dict, topic: str = None):
    """Handler to be used with `Messaging` class (ie the messager).

    Called when a message arrives through the `Messaging`.
    It reads and triggers instructions received by node from Researcher:

    - ping requests (answered right away with a `pong` message),
    - train and secagg requests (a new task is added on node's task queue),
    - secagg-delete requests,
    - search and list requests (for searching/listing data in node's database),
    - training plan approval and training plan status requests.

    The following errors are caught and reported back to the researcher as
    `ErrorNumbers.FB301` error messages: JSON decoding errors, unknown commands
    (`NotImplementedError`), missing keys (`KeyError`), badly formatted
    messages (`FedbiomedMessageError`) and non serializable messages
    (`TypeError`). Any other exception propagates to the caller.

    Args:
        msg: Incoming message from Researcher.
            Must contain key named `command`, describing the nature
            of the command (ping requests, train requests,
            or search requests).
            Should be formatted as a `Message`.
        topic: Messaging topic name; decisions (especially on researcher side)
            may depend on the topic. Currently unused.
    """
    # `aggregator_args` is left out of the debug log.
    msg_print = {key: value for key, value in msg.items() if key != 'aggregator_args'}
    logger.debug('Message received: ' + str(msg_print))
    try:
        # get the request from the received message (from researcher)
        command = msg['command']
        incoming = NodeMessages.format_incoming_message(msg)
        request = incoming.get_dict()
        if command in ['train', 'secagg']:
            # add task to queue; it is executed later from the tasks queue
            self.add_task(request)
        elif command == 'secagg-delete':
            # reuse the already formatted message instead of formatting it twice
            self._task_secagg_delete(incoming)
        elif command == 'ping':
            self.messaging.send_message(
                NodeMessages.format_outgoing_message(
                    {
                        'researcher_id': msg['researcher_id'],
                        'node_id': environ['NODE_ID'],
                        'success': True,
                        'sequence': msg['sequence'],
                        'command': 'pong'
                    }).get_dict())
        elif command == 'search':
            # Look for databases matching the tags.
            # NOTE: when no dataset matches, no reply is sent at all.
            databases = self.dataset_manager.search_by_tags(msg['tags'])
            if len(databases) != 0:
                databases = self.dataset_manager.obfuscate_private_information(databases)
                self.messaging.send_message(NodeMessages.format_outgoing_message(
                    {'success': True,
                     'command': 'search',
                     'node_id': environ['NODE_ID'],
                     'researcher_id': msg['researcher_id'],
                     'databases': databases,
                     'count': len(databases)}).get_dict())
        elif command == 'list':
            # Get list of all datasets
            databases = self.dataset_manager.list_my_data(verbose=False)
            databases = self.dataset_manager.obfuscate_private_information(databases)
            self.messaging.send_message(NodeMessages.format_outgoing_message(
                {'success': True,
                 'command': 'list',
                 'node_id': environ['NODE_ID'],
                 'researcher_id': msg['researcher_id'],
                 'databases': databases,
                 'count': len(databases),
                 }).get_dict())
        elif command == 'approval':
            # Ask for training plan approval
            self.tp_security_manager.reply_training_plan_approval_request(request, self.messaging)
        elif command == 'training-plan-status':
            # Check is training plan approved
            self.tp_security_manager.reply_training_plan_status_request(request, self.messaging)

        else:
            raise NotImplementedError('Command not found')
    except decoder.JSONDecodeError:
        resid = msg.get('researcher_id', 'unknown_researcher_id')
        self.send_error(ErrorNumbers.FB301,
                        extra_msg="Not able to deserialize the message",
                        researcher_id=resid)
    except NotImplementedError:
        resid = msg.get('researcher_id', 'unknown_researcher_id')
        self.send_error(ErrorNumbers.FB301,
                        extra_msg=f"Command `{command}` is not implemented",
                        researcher_id=resid)
    except KeyError as e:
        resid = msg.get('researcher_id', 'unknown_researcher_id')
        if 'command' not in msg:
            extra_msg = "'command' property was not found"
        else:
            # `command` is present, so another key is missing (e.g. `researcher_id`,
            # `tags`, ...): report that key instead of blaming `command`.
            extra_msg = f"Key {e} was not found while handling command `{msg['command']}`"
        self.send_error(ErrorNumbers.FB301,
                        extra_msg=extra_msg,
                        researcher_id=resid)
    except FedbiomedMessageError:  # Message was not properly formatted
        resid = msg.get('researcher_id', 'unknown_researcher_id')
        self.send_error(ErrorNumbers.FB301,
                        extra_msg='Message was not properly formatted',
                        researcher_id=resid)
    except TypeError:  # Message was not serializable
        resid = msg.get('researcher_id', 'unknown_researcher_id')
        self.send_error(ErrorNumbers.FB301,
                        extra_msg='Message was not serializable',
                        researcher_id=resid)
parser_task_train
parser_task_train(msg)

Parses a given training task message to create a round instance

Parameters:

Name Type Description Default
msg TrainRequest

TrainRequest message object to parse

required

Returns:

Type Description
Union[Round, None]

a Round object for the training to perform, or None if no training

Source code in fedbiomed/node/node.py
def parser_task_train(self, msg: TrainRequest) -> Union[Round, None]:
    """Parses a given training task message to create a round instance

    Reads the training parameters from `msg`, checks the training plan URL and
    class name, looks up the requested dataset and builds a `Round` for it.

    If the dataset is not found (or has no `path` entry), an `ErrorNumbers.FB313`
    error message is sent to the researcher and None is returned. If the round's
    arguments cannot be initialized, an `ErrorNumbers.FB300` error message is
    sent and None is returned, so that no training is run with arguments that
    failed initialization.

    Args:
        msg: `TrainRequest` message object to parse

    Returns:
        a `Round` object for the training to perform, or None if no training

    Raises:
        AssertionError: if the training plan URL is missing or invalid, or if the
            training plan class name is missing or not a string.
    """
    round_ = None
    # msg becomes a TrainRequest object
    hist_monitor = HistoryMonitor(job_id=msg.get_param('job_id'),
                                  researcher_id=msg.get_param('researcher_id'),
                                  client=self.messaging)
    # Get arguments for the model and training
    # NOTE: `get_param` has no real use save for monkey patching in unit tests
    # (in real life, if the parameter is missing, an exception will be raised)
    model_kwargs = msg.get_param('model_args') or {}
    training_kwargs = msg.get_param('training_args') or {}
    training_status = msg.get_param('training') or False
    training_plan_url = msg.get_param('training_plan_url')
    training_plan_class = msg.get_param('training_plan_class')
    params_url = msg.get_param('params_url')
    job_id = msg.get_param('job_id')
    state_id = msg.get_param('state_id')
    researcher_id = msg.get_param('researcher_id')
    aggregator_args = msg.get_param('aggregator_args') or None
    round_number = msg.get_param('round') or 0
    aux_var_urls = msg.get_param('aux_var_urls') or None

    # NOTE: these checks use `assert` and are therefore skipped when Python runs
    # with `-O`; kept as-is so callers keep receiving `AssertionError`.
    assert training_plan_url is not None, 'URL for training plan on repository not found.'
    assert validators.url(
        training_plan_url), 'URL for training plan on repository is not valid.'
    assert training_plan_class is not None, 'classname for the training plan and training routine ' \
                                            'was not found in message.'

    assert isinstance(
        training_plan_class,
        str), '`training_plan_class` must be a string corresponding to the classname for the training plan ' \
              'and training routine in the repository'

    dataset_id = msg.get_param('dataset_id')
    data = self.dataset_manager.get_by_id(dataset_id)
    if data is None or 'path' not in data:
        # FIXME: 'the condition above depends on database model
        # if database model changes (ie `path` field removed/
        # modified);
        # condition above is likely to be false
        logger.error('Did not found proper data in local datasets ' +
                     f'on node={environ["NODE_ID"]}')
        self.messaging.send_message(NodeMessages.format_outgoing_message(
            {'command': "error",
             'node_id': environ['NODE_ID'],
             'researcher_id': researcher_id,
             'errnum': ErrorNumbers.FB313,
             'extra_msg': "Did not found proper data in local datasets"}
        ).get_dict())
        return None

    # Optional data loading plan attached to the dataset.
    dlp_and_loading_block_metadata = None
    if 'dlp_id' in data:
        dlp_and_loading_block_metadata = self.dataset_manager.get_dlp_by_id(data['dlp_id'])
    round_ = Round(
        model_kwargs,
        training_kwargs,
        training_status,
        data,
        training_plan_url,
        training_plan_class,
        params_url,
        job_id,
        researcher_id,
        hist_monitor,
        aggregator_args,
        self.node_args,
        round_number=round_number,
        dlp_and_loading_block_metadata=dlp_and_loading_block_metadata,
        aux_var_urls=aux_var_urls,
    )

    # `initialize_arguments` returns an error description (or None on success).
    err_msg = round_.initialize_arguments(state_id)
    if err_msg is not None:
        self.messaging.send_message(
            NodeMessages.format_outgoing_message(
                {'command': 'error',
                 'node_id': environ['NODE_ID'],
                 'errnum': ErrorNumbers.FB300,
                 'researcher_id': researcher_id,
                 'extra_msg': err_msg.get('msg')}
            ).get_dict())
        # Do not hand back a round whose arguments failed initialization:
        # the error was already reported, no training must be run.
        return None
    return round_
reply
reply(msg)

Send reply to researcher

Parameters:

Name Type Description Default
msg dict

Content of the reply; the node's `node_id` is added to it before it is sent.

required
Source code in fedbiomed/node/node.py
def reply(self, msg: dict):
    """Send reply to researcher

    Adds this node's `node_id` to `msg`, formats it as an outgoing node message
    and sends it through `self.messaging`. If the reply cannot be formatted,
    nothing is replied: an `ErrorNumbers.FB601` error is logged and sent instead,
    attributed to the researcher named in `msg` when there is one.

    Args:
        msg: Content of the reply. A `node_id` key already present in `msg`
            takes precedence over the one added here.
    """
    # Researcher to attribute a failed reply to, when `msg` names one.
    researcher_id = msg.get('researcher_id') if isinstance(msg, dict) else None
    if not isinstance(researcher_id, str):
        researcher_id = '<unknown>'

    try:
        reply_msg = NodeMessages.format_outgoing_message(
            {'node_id': environ['NODE_ID'],
             **msg}
        ).get_dict()
    except FedbiomedMessageError as e:
        logger.error(f"{ErrorNumbers.FB601.value}: {e}")
        self.send_error(errnum=ErrorNumbers.FB601,
                        extra_msg=f"{ErrorNumbers.FB601.value}: Can not reply "
                                  f"due to incorrect message type {e}.",
                        researcher_id=researcher_id)
    except Exception as e:
        logger.error(f"{ErrorNumbers.FB601.value} Unexpected error while creating node reply message {e}")
        self.send_error(errnum=ErrorNumbers.FB601,
                        extra_msg=f"{ErrorNumbers.FB601.value}: "
                                  f"Unexpected error occurred",
                        researcher_id=researcher_id)

    else:
        self.messaging.send_message(reply_msg)
send_error
send_error(errnum, extra_msg='', researcher_id='<unknown>')

Sends an error message.

It is a wrapper of Messaging.send_error().

Parameters:

Name Type Description Default
errnum ErrorNumbers

Code of the error.

required
extra_msg str

Additional human readable error message.

''
researcher_id str

Destination researcher.

'<unknown>'
Source code in fedbiomed/node/node.py
def send_error(
        self,
        errnum: ErrorNumbers,
        extra_msg: str = "",
        researcher_id: str = "<unknown>"
):
    """Report an error to a researcher.

    Thin wrapper delegating to `Messaging.send_error()` on `self.messaging`.

    Args:
        errnum: Code of the error.
        extra_msg: Additional human readable error message.
        researcher_id: Destination researcher.
    """
    messenger = self.messaging
    messenger.send_error(
        errnum=errnum,
        researcher_id=researcher_id,
        extra_msg=extra_msg,
    )
start_messaging
start_messaging(block=False)

Calls the start method of messaging class.

Parameters:

Name Type Description Default
block Optional[bool]

Whether messager is blocking (or not). Defaults to False.

False
Source code in fedbiomed/node/node.py
def start_messaging(self, block: Optional[bool] = False):
    """Start the node's messaging channel.

    Delegates to the `start` method of `self.messaging`.

    Args:
        block: Whether messager is blocking (or not). Defaults to False.
    """
    messenger = self.messaging
    messenger.start(block)
task_manager
task_manager()

Manages training tasks in the queue.

Source code in fedbiomed/node/node.py
def task_manager(self):
    """Manages training tasks in the queue.

    Loops forever. Each iteration blocks on `self.tasks_queue.get()`, parses
    the item as an incoming node message and dispatches it on its `command`:

    - `train`: a `Round` is built with `parser_task_train` and, if one is
      returned, its training is run and the resulting message is sent;
    - `secagg`: delegated to `self._task_secagg`;
    - anything else: an `ErrorNumbers.FB319` error is sent.

    Errors raised while parsing an item, or while building/running a training
    round, are sent as `ErrorNumbers.FB300` error messages instead of stopping
    the loop. They are attributed to the researcher named in the queued item,
    or to `'NOT_SET'` when the item names none.
    `self.tasks_queue.task_done()` is called after every item.
    """

    def _send_task_error(error: Exception, researcher_id: str) -> None:
        # Send an FB300 error message describing `error` for the current task.
        self.messaging.send_message(
            NodeMessages.format_outgoing_message(
                {
                    'command': 'error',
                    'extra_msg': str(error),
                    'node_id': environ['NODE_ID'],
                    'researcher_id': researcher_id,
                    'errnum': ErrorNumbers.FB300
                }
            ).get_dict()
        )

    while True:
        raw_item = self.tasks_queue.get()
        # `aggregator_args` is left out of the debug log.
        item_print = {key: value for key, value in raw_item.items() if key != 'aggregator_args'}
        logger.debug('[TASKS QUEUE] Item:' + str(item_print))

        # Attribute errors to the requesting researcher when the item names one.
        researcher_id = raw_item.get('researcher_id')
        if not isinstance(researcher_id, str) or not researcher_id:
            researcher_id = 'NOT_SET'

        try:
            item = NodeMessages.format_incoming_message(raw_item)
            command = item.get_param('command')
        except Exception as e:
            # send an error message back to network if something wrong occurred
            _send_task_error(e, researcher_id)
        else:
            if command == 'train':
                try:
                    round_ = self.parser_task_train(item)
                    # once task is out of queue, run the training round
                    # (a round is performed on a single dataset); None means
                    # no training must be done (error already reported)
                    if round_ is not None:
                        msg = round_.run_model_training(
                            secagg_arguments={
                                'secagg_servkey_id': item.get_param('secagg_servkey_id'),
                                'secagg_biprime_id': item.get_param('secagg_biprime_id'),
                                'secagg_random': item.get_param('secagg_random'),
                                'secagg_clipping_range': item.get_param('secagg_clipping_range')
                            }
                        )
                        self.messaging.send_message(msg)
                except Exception as e:
                    # send an error message back to network if something
                    # wrong occurred
                    _send_task_error(e, researcher_id)
                    logger.debug(f"{ErrorNumbers.FB300.value}: {e}")
            elif command == 'secagg':
                self._task_secagg(item)
            else:
                errmess = f'{ErrorNumbers.FB319.value}: "{command}"'
                logger.error(errmess)
                self.send_error(errnum=ErrorNumbers.FB319, extra_msg=errmess,
                                researcher_id=researcher_id)

        self.tasks_queue.task_done()