Requests

to simplify imports from fedbiomed.node.requests

Classes

NodeToNodeController

NodeToNodeController(grpc_controller, pending_requests, controller_data)

Defines the controller for protocol messages processed by the node to node router

Parameters:

Name Type Description Default
grpc_controller GrpcController

object managing the communication with other components

required
pending_requests EventWaitExchange

object for receiving overlay node to node messages

required
controller_data EventWaitExchange

object for sharing data

required
Source code in fedbiomed/node/requests/_n2n_controller.py
def __init__(
        self,
        grpc_controller: GrpcController,
        pending_requests: EventWaitExchange,
        controller_data: EventWaitExchange,
) -> None:
    """Constructor of the class.

    Args:
        grpc_controller: object managing the communication with other components
        pending_requests: object for receiving overlay node to node messages
        controller_data: object for sharing data
    """
    self._grpc_controller = grpc_controller
    self._pending_requests = pending_requests
    self._controller_data = controller_data

    self._command2method = {
        'key-request': self._HandlerKeyRequest,
        'key-reply': self._HandlerKeyReply,
        # 'dummy-inner': self._HandlerDummyInner,
    }

    self._command2final = {
        'key-request': self._FinalKeyRequest,
        'key-reply': self._FinalKeyReply,
        # 'dummy-inner': self._FinalDummyInner,
    }

Functions

final async
final(command, **kwargs)

Calls the final processing for a received message protocol.

This handler is optional, it may not be declared for a message.

Should be called only if the handler completed without being interrupted. Cannot be interrupted, thus should not launch treatment that may hang.

Parameters:

Name Type Description Default
kwargs

Specific arguments for this message final handler

{}
Source code in fedbiomed/node/requests/_n2n_controller.py
async def final(self, command, **kwargs) -> None:
    """Calls the final processing for a received message protocol.

    This handler is optional, it may not be declared for a message.

    Should be called only if the handler completed without being interrupted.
    Cannot be interrupted, thus should not launch treatment that may hang.

    Args:
        kwargs: Specific arguments for this message final handler
    """
    if command in self._command2final:
        # Useful ? Allow omitting some arguments, automatically add them with None value
        expected_args = dict(inspect.signature(self._command2final[command]).parameters).keys()
        kwargs.update({arg: None for arg in expected_args if arg not in kwargs})

        await self._command2final[command](**kwargs)
handle async
handle(overlay_msg, inner_msg)

Calls the handler for processing a received message protocol.

If it does not exist, call the default handler to trigger an error.

Main part of the processing which can be interrupted if the processing takes too long.

Parameters:

Name Type Description Default
overlay_msg dict

Outer message for node to node communication

required
inner_msg InnerMessage

Unpacked inner message from the outer message

required

Returns:

Type Description
Optional[dict]

A dict of the kwargs expected by the corresponding final() handler for this message. Empty dict or None if no kwargs expected of no final handler

Source code in fedbiomed/node/requests/_n2n_controller.py
async def handle(self, overlay_msg: dict, inner_msg: InnerMessage) -> Optional[dict]:
    """Calls the handler for processing a received message protocol.

    If it does not exist, call the default handler to trigger an error.

    Main part of the processing which can be interrupted if the processing takes too long.

    Args:
        overlay_msg: Outer message for node to node communication
        inner_msg: Unpacked inner message from the outer message

    Returns:
        A dict of the `kwargs` expected by the corresponding `final()` handler for this message.
            Empty dict or `None` if no `kwargs` expected of no final handler
    """

    if inner_msg.get_param('command') in self._command2method:
        return await self._command2method[inner_msg.get_param('command')](overlay_msg, inner_msg)
    else:
        return await self._HandlerDefault(overlay_msg, inner_msg)

NodeToNodeRouter

NodeToNodeRouter(grpc_controller, pending_requests, controller_data)

Bases: _NodeToNodeAsyncRouter

Handles node to node messages received by a node.

Parameters:

Name Type Description Default
grpc_controller GrpcController

object managing the communication with other components

required
pending_requests EventWaitExchange

object for receiving overlay node to node messages

required
controller_data EventWaitExchange

object for sharing data with the controller

required
Source code in fedbiomed/node/requests/_n2n_router.py
def __init__(
        self,
        grpc_controller: GrpcController,
        pending_requests: EventWaitExchange,
        controller_data: EventWaitExchange
) -> None:
    """Class constructor.

    Args:
        grpc_controller: object managing the communication with other components
        pending_requests: object for receiving overlay node to node messages
        controller_data: object for sharing data with the controller
    """
    super().__init__(grpc_controller, pending_requests, controller_data)

    self._thread = Thread(target=self._run, args=(), daemon=True)

Functions

start
start()

Starts the node to node router.

Source code in fedbiomed/node/requests/_n2n_router.py
def start(self) -> None:
    """Starts the node to node router."""
    self._thread.start()
submit
submit(msg)

Submits a received message to the node to node router for processing.

Parameters:

Name Type Description Default
msg dict

received message

required

Raises:

Type Description
FedbiomedNodeToNodeError

bad message type or value.

Source code in fedbiomed/node/requests/_n2n_router.py
def submit(self, msg: dict) -> None:
    """Submits a received message to the node to node router for processing.

    Args:
        msg: received message

    Raises:
        FedbiomedNodeToNodeError: bad message type or value.
    """
    # node to node router currently handles only node to node messages
    # Conceived to be later extended for other messages processing, during node redesign
    if msg['command'] != 'overlay':
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: node to node router needs a node to node message')

    try:
        asyncio.run_coroutine_threadsafe(self._submit(msg), self._loop)
    except Exception as e:
        logger.critical(
            f"Failed submitting message to node to node router. Exception: {type(e).__name__}. Error message: {e}")
        raise e

Functions

format_incoming_overlay

format_incoming_overlay(payload)

Retrieves inner message from overlay message payload.

Check signature, decrypt, deserialize the inner message

Parameters:

Name Type Description Default
payload List[bytes]

Payload of overlay message.

required

Returns:

Type Description
InnerMessage

Inner message retrieved from overlay payload

Raises:

Type Description
FedbiomedNodeToNodeError

key is too short

FedbiomedNodeToNodeError

cannot decrypt payload

FedbiomedNodeToNodeError

bad payload format

FedbiomedNodeToNodeError

cannot verify payload integrity

Source code in fedbiomed/node/requests/_overlay.py
def format_incoming_overlay(payload: List[bytes]) -> InnerMessage:
    """Retrieves inner message from overlay message payload.

    Check signature, decrypt, deserialize the inner message

    Args:
        payload: Payload of overlay message.

    Returns:
        Inner message retrieved from overlay payload

    Raises:
        FedbiomedNodeToNodeError: key is too short
        FedbiomedNodeToNodeError: cannot decrypt payload
        FedbiomedNodeToNodeError: bad payload format
        FedbiomedNodeToNodeError: cannot verify payload integrity
    """
    # check payload types (not yet done by message type checks, only checks it's a list)
    if not all(isinstance(p, bytes) for p in payload):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: bad type for node to node payload')

    # decode and ensure only node2node (inner) messages are received

    local_node_private_key = _default_n2n_key
    distant_node_public_key = _default_n2n_key.public_key()

    if _CHUNK_SIZE * 8 > min(local_node_private_key.key_size, distant_node_public_key.key_size):
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot use key shorter than {_CHUNK_SIZE} bits')

    # decrypt outer payload
    # caveat: decryption can be long for long messages (~10s for 1MB cleartext message)
    try:
        decrypted_chunks = [
            local_node_private_key.decrypt(
                chunk,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )
            for chunk in payload
        ]
    except ValueError as e:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot decrypt payload: {e}') from e

    decrypted = Serializer.loads(bytes().join(decrypted_chunks))

    if not isinstance(decrypted, dict) or not set(('message', 'signature')) <= set(decrypted):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: bad inner payload format '
                                       f"in received message")

    # verify inner payload
    try:
        distant_node_public_key.verify(
            decrypted['signature'],
            Serializer.dumps(decrypted['message']),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    except InvalidSignature as e:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot verify payload integrity: {e}') from e

    return NodeToNodeMessages.format_incoming_message(decrypted['message'])

format_outgoing_overlay

format_outgoing_overlay(message)

Creates an overlay message payload from an inner message.

Serialize, crypt, sign the inner message

Parameters:

Name Type Description Default
message Message

Inner message to send as overlay payload

required

Returns:

Type Description
List[bytes]

Payload for overlay message

Raises:

Type Description
FedbiomedNodeToNodeError

key is too short

FedbiomedNodeToNodeError

bad message type

Source code in fedbiomed/node/requests/_overlay.py
def format_outgoing_overlay(message: Message) -> List[bytes]:
    """Creates an overlay message payload from an inner message.

    Serialize, crypt, sign the inner message

    Args:
        message: Inner message to send as overlay payload

    Returns:
        Payload for overlay message

    Raises:
        FedbiomedNodeToNodeError: key is too short
        FedbiomedNodeToNodeError: bad message type
    """
    # robustify from developer error (try to encapsulate a bad message type)
    if not isinstance(message, InnerMessage):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: not an inner message')

    # consider encrypt-sign([message,node_id]) or other see
    # https://theworld.com/~dtd/sign_encrypt/sign_encrypt7.html

    local_node_private_key = _default_n2n_key
    distant_node_public_key = _default_n2n_key.public_key()

    if _CHUNK_SIZE * 8 > min(local_node_private_key.key_size, distant_node_public_key.key_size):
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot use key shorter than {_CHUNK_SIZE} bits')

    # sign inner payload
    signed = Serializer.dumps({
        'message': message.get_dict(),
        'signature': local_node_private_key.sign(
            Serializer.dumps(message.get_dict()),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()

        )
    })

    # split to chunks and encrypt
    return [
        distant_node_public_key.encrypt(
            signed[i:i + _CHUNK_SIZE],
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        for i in range(0, len(signed), _CHUNK_SIZE)
    ]

send_nodes

send_nodes(grpc_client, pending_requests, researcher_id, nodes, messages)

Send message to some other nodes using overlay communications.

Parameters:

Name Type Description Default
grpc_client GrpcController

object managing the communication with other components

required
pending_requests EventWaitExchange

object for receiving overlay node to node reply messages

required
researcher_id str

unique ID of researcher connecting the nodes

required
nodes List[str]

list of node IDs of the destination nodes

required
messages List[InnerMessage]

list of the inner messages for the destination nodes

required

Returns: status: True if all messages are received replies: List of replies from each node.

Source code in fedbiomed/node/requests/_overlay.py
def send_nodes(
        grpc_client: GrpcController,
        pending_requests: EventWaitExchange,
        researcher_id: str,
        nodes: List[str],
        messages: List[InnerMessage],
) -> Tuple[bool, List[Any]]:
    """Send message to some other nodes using overlay communications.

        Args:
            grpc_client: object managing the communication with other components
            pending_requests: object for receiving overlay node to node reply messages
            researcher_id: unique ID of researcher connecting the nodes
            nodes: list of node IDs of the destination nodes
            messages: list of the inner messages for the destination nodes
        Returns:
            status: True if all messages are received
            replies: List of replies from each node.
    """
    request_ids = []

    for node, message in zip(nodes, messages):
        message_overlay = NodeMessages.format_outgoing_message(
            {
                'researcher_id': researcher_id,
                'node_id': environ['NODE_ID'],
                'dest_node_id': node,
                'overlay': format_outgoing_overlay(message),
                'command': 'overlay'
            })

        grpc_client.send(message_overlay)

        if isinstance(message, InnerRequestReply):
            request_ids += [message.get_param('request_id')]

    return pending_requests.wait(request_ids, TIMEOUT_NODE_TO_NODE_REQUEST)