Requests

Package initializer that re-exports the public classes and functions to simplify imports from fedbiomed.node.requests.

Classes

NodeToNodeController

NodeToNodeController(grpc_controller, overlay_channel, pending_requests, controller_data)

Defines the controller for protocol messages processed by the node to node router.

Each message type must have a handler. A handler receives overlay_msg and inner_msg and returns a dict that is passed as **kwargs to the corresponding final(), so the keys and value types of the dict must match the parameters of that final(). A handler may be interrupted by an asyncio.CancelledError.

Each message type optionally has a final. It executes only if the handler() completed without being cancelled, and it is not interrupted by an asyncio.CancelledError. If no final() exists, no action is taken after the handler() is cancelled or completes.

async def _HandlerExample(self, overlay_msg: dict, inner_msg: InnerMessage) -> Any:
    logger.debug("Normal handler code that can be cancelled")
    return {'value': 3}

async def _FinalExample(self, value: int) -> None:
    logger.debug(f"Final code that cannot be cancelled. Received {value}")
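The handler/final contract described above can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not the router's actual mechanism: `ExampleController`, `process` and `run_demo` are hypothetical names, and `asyncio.wait_for` is used here only as a convenient source of cancellation. The final step runs outside the timeout, so the timeout cannot interrupt it.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class ExampleController:
    """Hypothetical controller with a single handler/final pair."""

    def __init__(self) -> None:
        self.finalized: List[int] = []

    async def handler_example(
        self, overlay_msg: dict, inner_msg: Dict[str, Any]
    ) -> Optional[dict]:
        # Cancellable part: a CancelledError can be raised at this await.
        await asyncio.sleep(inner_msg["delay"])
        # Keys must match the parameter names of final_example().
        return {"value": 3}

    async def final_example(self, value: int) -> None:
        # Short follow-up, reached only when the handler completed.
        self.finalized.append(value)


async def process(
    controller: ExampleController, overlay_msg: dict, inner_msg: dict, timeout: float
) -> bool:
    try:
        kwargs = await asyncio.wait_for(
            controller.handler_example(overlay_msg, inner_msg), timeout
        )
    except asyncio.TimeoutError:
        # Handler was cancelled by the timeout: the final step is skipped.
        return False
    # Called outside wait_for, so the timeout above cannot cancel it.
    await controller.final_example(**(kwargs or {}))
    return True


def run_demo() -> Tuple[bool, bool, List[int]]:
    controller = ExampleController()

    async def main() -> Tuple[bool, bool]:
        first = await process(controller, {}, {"delay": 0.0}, timeout=1.0)
        second = await process(controller, {}, {"delay": 1.0}, timeout=0.01)
        return first, second

    first, second = asyncio.run(main())
    return first, second, controller.finalized
```

In this sketch only the first message reaches its final step, because the second handler is cancelled before it returns.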

Parameters:

Name Type Description Default
grpc_controller GrpcController

object managing the communication with other components

required
overlay_channel OverlayChannel

layer for managing overlay message send and receive

required
pending_requests EventWaitExchange

object for receiving overlay node to node messages

required
controller_data EventWaitExchange

object for sharing data

required
Source code in fedbiomed/node/requests/_n2n_controller.py
def __init__(
        self,
        grpc_controller: GrpcController,
        overlay_channel: OverlayChannel,
        pending_requests: EventWaitExchange,
        controller_data: EventWaitExchange,
) -> None:
    """Constructor of the class.

    Args:
        grpc_controller: object managing the communication with other components
        overlay_channel: layer for managing overlay message send and receive
        pending_requests: object for receiving overlay node to node messages
        controller_data: object for sharing data
    """
    self._grpc_controller = grpc_controller
    self._overlay_channel = overlay_channel
    self._pending_requests = pending_requests
    self._controller_data = controller_data

    self._command2method = {
        KeyRequest.__name__: self._HandlerKeyRequest,
        KeyReply.__name__: self._HandlerKeyReply,
        AdditiveSSharingRequest.__name__: self._AdditiveSSharingRequest,
        AdditiveSSharingReply.__name__: self._HandlerAdditiveSSharingReply,
        ChannelSetupRequest.__name__: self._HandlerChannelRequest,
        ChannelSetupReply.__name__: self._HandlerKeyReply,
    }

    self._command2final = {
        KeyRequest.__name__: self._FinalKeyRequest,
        KeyReply.__name__: self._FinalKeyReply,
        AdditiveSSharingRequest.__name__: self._FinalAdditiveSSharingRequest,
        AdditiveSSharingReply.__name__: self._FinalAdditiveSSharingReply,
        ChannelSetupRequest.__name__: self._FinalKeyRequest,
        ChannelSetupReply.__name__: self._FinalChannelReply,
    }

Functions

final async
final(message, **kwargs)

Calls the final processing for a received message protocol.

This handler is optional: it may not be declared for a given message.

It should be called only if the handler completed without being interrupted. It cannot be interrupted, so it should not launch processing that may hang.

Parameters:

Name Type Description Default
message

Name of the message type, used to look up the final handler

required
kwargs

Specific arguments for this message final handler

{}
Source code in fedbiomed/node/requests/_n2n_controller.py
async def final(self, message, **kwargs) -> None:
    """Calls the final processing for a received message protocol.

    This handler is optional, it may not be declared for a message.

    Should be called only if the handler completed without being interrupted.
    Cannot be interrupted, thus should not launch treatment that may hang.

    Args:
        kwargs: Specific arguments for this message final handler
    """
    if message in self._command2final:
        # Useful ? Allow omitting some arguments, automatically add them with None value
        expected_args = dict(
            inspect.signature(self._command2final[message]).parameters
        ).keys()
        kwargs.update({arg: None for arg in expected_args if arg not in kwargs})

        await self._command2final[message](**kwargs)
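The argument-filling step in `final()` can be tried in isolation. The sketch below assumes a hypothetical `final_example` coroutine and a hypothetical helper `fill_missing_kwargs`; unlike the source above, it copies the dict instead of updating it in place.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, Optional


async def final_example(value: Optional[int], note: Optional[str]) -> str:
    # Hypothetical final handler with two parameters.
    return f"value={value} note={note}"


def fill_missing_kwargs(func: Callable[..., Any], kwargs: Dict[str, Any]) -> Dict[str, Any]:
    # Parameter names declared by the callable (for a bound method, `self` is excluded).
    expected_args = inspect.signature(func).parameters.keys()
    filled = dict(kwargs)
    # Any parameter the caller omitted is passed as None.
    filled.update({arg: None for arg in expected_args if arg not in filled})
    return filled


filled = fill_missing_kwargs(final_example, {"value": 3})
result = asyncio.run(final_example(**filled))
```

Note that this only adds missing arguments. A key that the final handler does not declare would still raise a `TypeError` when the handler is called.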
handle async
handle(overlay_msg, inner_msg)

Calls the handler for processing a received message protocol.

If no handler exists for the message type, the default handler is called to trigger an error.

This is the main part of the processing, which can be interrupted if it takes too long.

Parameters:

Name Type Description Default
overlay_msg OverlayMessage

Outer message for node to node communication

required
inner_msg InnerMessage

Unpacked inner message from the outer message

required

Returns:

Type Description
Optional[dict]

A dict of the kwargs expected by the corresponding final() handler for this message. Empty dict or None if no kwargs are expected or there is no final handler

Source code in fedbiomed/node/requests/_n2n_controller.py
async def handle(
    self, overlay_msg: OverlayMessage, inner_msg: InnerMessage
) -> Optional[dict]:
    """Calls the handler for processing a received message protocol.

    If it does not exist, call the default handler to trigger an error.

    Main part of the processing which can be interrupted if the processing takes too long.

    Args:
        overlay_msg: Outer message for node to node communication
        inner_msg: Unpacked inner message from the outer message

    Returns:
        A dict of the `kwargs` expected by the corresponding `final()` handler for this message.
            Empty dict or `None` if no `kwargs` expected or no final handler
    """

    if inner_msg.__name__ in self._command2method:
        return await self._command2method[inner_msg.__name__](
            overlay_msg, inner_msg
        )

    return await self._HandlerDefault(overlay_msg, inner_msg)
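The dispatch pattern used by `handle()` is a table lookup with a fallback. A minimal sketch follows, with hypothetical message classes and handler names. As an assumption, it looks up the handler with `type(inner_msg).__name__` and lets the default handler record the error rather than raise it.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PingRequest:
    # Hypothetical message type that has a registered handler.
    request_id: str


@dataclass
class UnknownMessage:
    # Hypothetical message type without a registered handler.
    request_id: str


class ExampleDispatcher:
    def __init__(self) -> None:
        self.errors: List[str] = []
        # Maps a message class name to the coroutine that processes it.
        self._command2method = {PingRequest.__name__: self._handler_ping}

    async def _handler_ping(self, overlay_msg: dict, inner_msg: PingRequest) -> dict:
        return {"request_id": inner_msg.request_id}

    async def _handler_default(self, overlay_msg: dict, inner_msg: object) -> None:
        # Fallback for message types that have no entry in the table.
        self.errors.append(f"no handler for {type(inner_msg).__name__}")
        return None

    async def handle(self, overlay_msg: dict, inner_msg: object) -> Optional[dict]:
        handler = self._command2method.get(
            type(inner_msg).__name__, self._handler_default
        )
        return await handler(overlay_msg, inner_msg)


dispatcher = ExampleDispatcher()
known = asyncio.run(dispatcher.handle({}, PingRequest("r1")))
unknown = asyncio.run(dispatcher.handle({}, UnknownMessage("r2")))
```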

NodeToNodeRouter

NodeToNodeRouter(grpc_controller, pending_requests, controller_data)

Bases: _NodeToNodeAsyncRouter

Handles node to node messages received by a node.

Parameters:

Name Type Description Default
grpc_controller GrpcController

object managing the communication with other components

required
pending_requests EventWaitExchange

object for receiving overlay node to node messages

required
controller_data EventWaitExchange

object for sharing data with the controller

required
Source code in fedbiomed/node/requests/_n2n_router.py
def __init__(
    self,
    grpc_controller: GrpcController,
    pending_requests: EventWaitExchange,
    controller_data: EventWaitExchange,
) -> None:
    """Class constructor.

    Args:
        grpc_controller: object managing the communication with other components
        pending_requests: object for receiving overlay node to node messages
        controller_data: object for sharing data with the controller
    """
    super().__init__(grpc_controller, pending_requests, controller_data)

    self._thread = Thread(target=self._run, args=(), daemon=True)

Functions

format_outgoing_overlay
format_outgoing_overlay(message, researcher_id)

Creates an overlay message payload from an inner message.

Serializes, signs, and encrypts the inner message.

Parameters:

Name Type Description Default
message InnerMessage

Inner message to send as overlay payload

required
researcher_id str

unique ID of researcher connecting the nodes

required
setup

Listed in the docstring, but not accepted by this method: the call is forwarded to OverlayChannel.format_outgoing_overlay, which then uses its default setup=False

not accepted

Returns:

Type Description
Tuple[bytes, bytes, bytes]

A tuple consisting of: payload for overlay message, salt for inner message encryption key, nonce for the inner message encryption

Source code in fedbiomed/node/requests/_n2n_router.py
def format_outgoing_overlay(self, message: InnerMessage, researcher_id: str) -> \
        Tuple[bytes, bytes, bytes]:
    """Creates an overlay message payload from an inner message.

    Serialize, crypt, sign the inner message

    Args:
        message: Inner message to send as overlay payload
        researcher_id: unique ID of researcher connecting the nodes
        setup: False for sending a message over the channel, True for a message
            setting up the channel

    Returns:
        A tuple consisting of: payload for overlay message, salt for inner message
            encryption key, nonce for the inner message encryption
    """
    future = asyncio.run_coroutine_threadsafe(
        self._overlay_channel.format_outgoing_overlay(message, researcher_id),
        self._loop
    )
    return future.result()
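The method above is a synchronous wrapper around a coroutine that runs on an event loop owned by another thread. The sketch below shows that bridging pattern on its own. `ExampleRouter`, `_format` and `format_sync` are hypothetical names, and the coroutine simply encodes a string instead of building an overlay payload.

```python
import asyncio
import threading


class ExampleRouter:
    """Hypothetical router that owns an event loop running in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Ask the loop to stop from outside its thread, then clean up.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    async def _format(self, message: str) -> bytes:
        # Stand-in for the asynchronous formatting work.
        await asyncio.sleep(0)
        return message.encode()

    def format_sync(self, message: str, timeout: float = 5.0) -> bytes:
        # Schedule the coroutine on the loop thread and block until it finishes.
        future = asyncio.run_coroutine_threadsafe(self._format(message), self._loop)
        return future.result(timeout)
```

A caller in any other thread can then use `router.start()`, `router.format_sync("hi")` and `router.stop()`. Calling `format_sync` from the loop's own thread would block that loop, so the wrapper is meant for external threads only.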
start
start()

Starts the node to node router.

Source code in fedbiomed/node/requests/_n2n_router.py
def start(self) -> None:
    """Starts the node to node router."""
    self._thread.start()
submit
submit(msg)

Submits a received message to the node to node router for processing.

Parameters:

Name Type Description Default
msg OverlayMessage

received message

required
Source code in fedbiomed/node/requests/_n2n_router.py
def submit(self, msg: OverlayMessage) -> None:
    """Submits a received message to the node to node router for processing.

    Args:
        msg: received message
    """

    try:
        asyncio.run_coroutine_threadsafe(self._submit(msg), self._loop)
    except Exception as e:
        logger.critical(
            "Failed submitting message to node to node router. "
            f"Exception: {type(e).__name__}. Error message: {e}"
        )
        raise e
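One consequence of scheduling with `asyncio.run_coroutine_threadsafe` is that an exception raised inside the coroutine is stored in the returned future and does not reach a `try`/`except` placed around the scheduling call. The sketch below illustrates this with a hypothetical failing coroutine and a done-callback that records the error. It is an illustration of the asyncio behavior, not a description of what `_submit` does.

```python
import asyncio
import threading
from typing import List, Tuple


def schedule_and_collect() -> Tuple[bool, List[str]]:
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    errors: List[str] = []
    done = threading.Event()

    async def failing() -> None:
        # Hypothetical coroutine that fails while processing a message.
        raise RuntimeError("boom")

    def on_done(fut) -> None:
        # The exception is only visible through the future.
        exc = fut.exception()
        if exc is not None:
            errors.append(type(exc).__name__)
        done.set()

    raised_at_submit = False
    try:
        future = asyncio.run_coroutine_threadsafe(failing(), loop)
        future.add_done_callback(on_done)
    except Exception:
        raised_at_submit = True

    done.wait(5)
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return raised_at_submit, errors


raised_at_submit, errors = schedule_and_collect()
```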

OverlayChannel

OverlayChannel(grpc_client)

Provides asyncio safe layer for sending and receiving overlay messages.

This class is not thread safe: all calls except the constructor must be made from the same thread.

Parameters:

Name Type Description Default
grpc_client GrpcController

object managing the communication with other components

required
Source code in fedbiomed/node/requests/_overlay.py
def __init__(self, grpc_client: GrpcController):
    """Class constructor

    Args:
        grpc_client: object managing the communication with other components
    """
    self._grpc_client = grpc_client

    self._channel_keys = _ChannelKeys()

    # Issue #1142 in "Crypto material management" will optionally replace current default key published with the
    # library and used for each node for setup by a keypair generated securely for each node.
    # Caveat: though encrypted, current implementation does not ensure a secure overlay node2node channel ...

    # Default keys
    default_private_key, default_public_key = self._load_default_n2n_key()
    self._default_n2n_key = _N2nKeysEntry(
        local_key=default_private_key,
        ready_event=asyncio.Event(),
        distant_key=default_public_key
    )

Functions

format_incoming_overlay async
format_incoming_overlay(overlay_msg)

Retrieves inner message from overlay message payload.

Decrypts the payload, deserializes it, verifies the signature, and checks that the node IDs of the inner and overlay messages match.

Parameters:

Name Type Description Default
overlay_msg OverlayMessage

Overlay message.

required

Returns:

Type Description
InnerMessage

Inner message retrieved from overlay payload

Raises:

Type Description
FedbiomedNodeToNodeError

bad message type

FedbiomedNodeToNodeError

cannot decrypt payload

FedbiomedNodeToNodeError

bad inner payload format

FedbiomedNodeToNodeError

cannot verify payload integrity

FedbiomedNodeToNodeError

sender/dest node ID don't match in overlay and inner message

Source code in fedbiomed/node/requests/_overlay.py
async def format_incoming_overlay(self, overlay_msg: OverlayMessage) -> InnerMessage:
    """Retrieves inner message from overlay message payload.

    Check signature, decrypt, deserialize the inner message

    Args:
        overlay_msg: Overlay message.

    Returns:
        Inner message retrieved from overlay payload

    Raises:
        FedbiomedNodeToNodeError: bad message type
        FedbiomedNodeToNodeError: cannot decrypt payload
        FedbiomedNodeToNodeError: bad inner payload format
        FedbiomedNodeToNodeError: cannot verify payload integrity
        FedbiomedNodeToNodeError: sender/dest node ID don't match in overlay and inner message
    """
    # robustify from developer error (try to encapsulate a bad message type)
    if not isinstance(overlay_msg, OverlayMessage):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: not an overlay message')


    _, distant_node_public_key, derived_key = await self._setup_use_channel_keys(
        overlay_msg.node_id,
        overlay_msg.researcher_id,
        overlay_msg.setup,
        overlay_msg.salt,
    )

    # decrypt outer payload
    try:
        decryptor = Cipher(
            algorithms.ChaCha20(derived_key, overlay_msg.nonce),
            mode=None,
            backend=default_backend()
        ).decryptor()
        decrypted_serial = decryptor.update(overlay_msg.overlay) + decryptor.finalize()
    except ValueError as e:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot decrypt payload: {e}') from e

    decrypted = Serializer.loads(decrypted_serial)
    if not isinstance(decrypted, dict) or not set(('message', 'signature')) <= set(decrypted):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: bad inner payload format '
                                       f"in received message")

    # verify inner payload
    try:
        distant_node_public_key.public_key.verify(
            decrypted['signature'],
            Serializer.dumps(decrypted['message']),
            ec.ECDSA(hashes.SHA256()),
        )
    except InvalidSignature as e:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot verify payload integrity: {e}') from e

    inner_msg = Message.from_dict(decrypted['message'])

    # Node ID mismatch reveals either (1) malicious peer forging message (2) application internal error
    if inner_msg.node_id != overlay_msg.node_id:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: Source node ID mismatch for overlay message '
            f'inner_node_id={inner_msg.node_id} overlay_node_id={overlay_msg.node_id}')
    if inner_msg.dest_node_id != overlay_msg.dest_node_id:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: Destination node ID mismatch for overlay message '
            f'inner_node_id={inner_msg.dest_node_id} overlay_node_id={overlay_msg.dest_node_id}')

    return inner_msg
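The two structural checks in the function above (payload format and node ID consistency) do not depend on the cryptography and can be sketched with the standard library only. All names below (`ExampleOverlayError`, `Header`, `check_payload_format`, `check_node_ids`, `is_rejected`) are hypothetical stand-ins for the real message and error classes.

```python
from dataclasses import dataclass
from typing import Any, Callable


class ExampleOverlayError(Exception):
    """Hypothetical stand-in for FedbiomedNodeToNodeError."""


@dataclass
class Header:
    # Only the fields needed for the consistency check.
    node_id: str
    dest_node_id: str


def check_payload_format(decrypted: Any) -> None:
    # The decrypted payload must be a dict carrying both required keys.
    if not isinstance(decrypted, dict) or not {"message", "signature"} <= set(decrypted):
        raise ExampleOverlayError("bad inner payload format")


def check_node_ids(inner: Header, overlay: Header) -> None:
    # A mismatch indicates a forged message or an internal error.
    if inner.node_id != overlay.node_id:
        raise ExampleOverlayError("source node ID mismatch")
    if inner.dest_node_id != overlay.dest_node_id:
        raise ExampleOverlayError("destination node ID mismatch")


def is_rejected(check: Callable[..., None], *args: Any) -> bool:
    try:
        check(*args)
    except ExampleOverlayError:
        return True
    return False
```

For example, `is_rejected(check_payload_format, {"message": {}})` is true because the `signature` key is missing, while a payload holding both keys is accepted.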
format_outgoing_overlay async
format_outgoing_overlay(message, researcher_id, setup=False)

Creates an overlay message payload from an inner message.

Serializes, signs, and encrypts the inner message.

Parameters:

Name Type Description Default
message InnerMessage

Inner message to send as overlay payload

required
researcher_id str

unique ID of researcher connecting the nodes

required
setup bool

False for sending a message over the channel, True for a message setting up the channel

False

Returns:

Type Description
Tuple[bytes, bytes, bytes]

A tuple consisting of: payload for overlay message, salt for inner message encryption key, nonce for the inner message encryption

Raises:

Type Description
FedbiomedNodeToNodeError

bad message type

Source code in fedbiomed/node/requests/_overlay.py
async def format_outgoing_overlay(self, message: InnerMessage, researcher_id: str, setup: bool = False) -> \
        Tuple[bytes, bytes, bytes]:
    """Creates an overlay message payload from an inner message.

    Serialize, crypt, sign the inner message

    Args:
        message: Inner message to send as overlay payload
        researcher_id: unique ID of researcher connecting the nodes
        setup: False for sending a message over the channel, True for a message
            setting up the channel

    Returns:
        A tuple consisting of: payload for overlay message, salt for inner message
            encryption key, nonce for the inner message encryption

    Raises:
        FedbiomedNodeToNodeError: bad message type
    """
    # robustify from developer error (try to encapsulate a bad message type)
    if not isinstance(message, InnerMessage):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: not an inner message')

    # Value for salting the symmetric encryption key generation for this message
    # Adjust length of `salt` depending on algorithm
    salt = secrets.token_bytes(32)

    # Value for noncing the symmetric encryption for this message
    # This is normally not needed as we generate different key for each message due to `salt`
    # but provides another layer of security
    # Adjust the length of `nonce` depending on algorithm
    nonce = secrets.token_bytes(16)

    local_node_private_key, _, derived_key = await self._setup_use_channel_keys(
        message.get_param('dest_node_id'),
        researcher_id,
        setup,
        salt
    )

    # consider encrypt-sign([message,node_id]) or other see
    # https://theworld.com/~dtd/sign_encrypt/sign_encrypt7.html

    # sign inner payload
    signed = Serializer.dumps({
        'message': message.to_dict(),
        'signature': local_node_private_key.private_key.sign(
            Serializer.dumps(message.to_dict()),
            ec.ECDSA(hashes.SHA256()),
        )
    })

    encryptor = Cipher(
        algorithms.ChaCha20(derived_key, nonce),
        mode=None,
        backend=default_backend()
    ).encryptor()
    return encryptor.update(signed) + encryptor.finalize(), salt, nonce
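The comments in the source state that a fresh `salt` yields a different symmetric key for each message. The sketch below illustrates that idea with HKDF-SHA256 (RFC 5869) written with the standard library. This is an assumption made for illustration: the derivation actually performed by `_setup_use_channel_keys` is not shown on this page, and `shared_secret` is a hypothetical stand-in for whatever secret the two nodes share. The 32-byte key and 16-byte nonce match the sizes expected by the `ChaCha20` algorithm of the `cryptography` library.

```python
import hashlib
import hmac
import secrets


def hkdf_sha256(secret: bytes, salt: bytes, info: bytes = b"", length: int = 32) -> bytes:
    """HKDF (RFC 5869) with SHA-256, using only the standard library."""
    # Extract step: condense the secret into a pseudorandom key.
    prk = hmac.new(salt, secret, hashlib.sha256).digest()
    # Expand step: chain HMAC blocks until enough output is produced.
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


shared_secret = secrets.token_bytes(32)  # hypothetical secret shared by both nodes
salt_a = secrets.token_bytes(32)         # per-message salt, as in the source
salt_b = secrets.token_bytes(32)
nonce = secrets.token_bytes(16)          # per-message nonce, as in the source

key_a = hkdf_sha256(shared_secret, salt_a)
key_a_again = hkdf_sha256(shared_secret, salt_a)
key_b = hkdf_sha256(shared_secret, salt_b)
```

The receiver can rebuild `key_a` because the salt travels with the overlay message, while a message sent with `salt_b` is encrypted under an unrelated key.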
get_local_public_key async
get_local_public_key(distant_node_id)

Gets local public key for peering with a given distant peer node

Parameters:

Name Type Description Default
distant_node_id str

unique ID of the peer node

required

Returns:

Type Description
bytes

Local node's public key ID for this peer node

Source code in fedbiomed/node/requests/_overlay.py
async def get_local_public_key(self, distant_node_id: str) -> bytes:
    """Gets local public key for peering with a given distant peer node

    Args:
        distant_node_id: unique ID of the peer node

    Returns:
        Local node's public key ID for this peer node
    """
    return await self._channel_keys.get_local_public_key(distant_node_id)
send_node_setup async
send_node_setup(researcher_id, node, message)

Sends a channel setup message to another node using overlay communications and waits for its reply.

Parameters:

Name Type Description Default
researcher_id str

unique ID of researcher connecting the nodes

required
node str

unique node ID of the destination node

required
message InnerMessage

inner message for the destination node

required

Returns:

Type Description
bool

True if channel is ready, False if channel not ready after timeout

Source code in fedbiomed/node/requests/_overlay.py
async def send_node_setup(
        self,
        researcher_id: str,
        node: str,
        message: InnerMessage,
) -> bool:
    """Send a channel setup message to another node using overlay communications and wait for its reply.

        Args:
            researcher_id: unique ID of researcher connecting the nodes
            node: unique node ID of the destination node
            message: inner message for the destination node

        Returns:
            True if channel is ready, False if channel not ready after timeout
    """
    overlay, salt, nonce = await self.format_outgoing_overlay(message, researcher_id, True)
    message_overlay = OverlayMessage(
        researcher_id=researcher_id,
        node_id=environ['NODE_ID'],
        dest_node_id=node,
        overlay=overlay,
        setup=True,
        salt=salt,
        nonce=nonce,
    )

    self._grpc_client.send(message_overlay)

    return await self._channel_keys.wait_ready_channel(node)
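The boolean result ("ready" versus "not ready after timeout") can be obtained by waiting on an `asyncio.Event` with a timeout. The sketch below shows one way to do so. `wait_ready` and `demo` are hypothetical names, and whether `wait_ready_channel` is implemented this way is an assumption.

```python
import asyncio
from typing import Tuple


async def wait_ready(ready_event: asyncio.Event, timeout: float) -> bool:
    # True if the event is set before the timeout, False otherwise.
    try:
        await asyncio.wait_for(ready_event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> Tuple[bool, bool]:
    # First channel: the peer's reply arrives shortly after the request.
    ready = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, ready.set)
    became_ready = await wait_ready(ready, timeout=1.0)

    # Second channel: no reply ever arrives.
    never_ready = asyncio.Event()
    ready_after_timeout = await wait_ready(never_ready, timeout=0.01)
    return became_ready, ready_after_timeout


became_ready, ready_after_timeout = asyncio.run(demo())
```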
set_distant_key async
set_distant_key(distant_node_id, public_key_pem, request_id)

Sets distant (public) key of a channel for peering with a given distant peer node.

Distant key is not set if no channel exists for that distant_node_id or if the request_id does not match a pending request

Parameters:

Name Type Description Default
distant_node_id str

unique ID of the peer node

required
public_key_pem bytes

public key in PEM format

required
request_id str

unique ID of the request

required

Returns:

Type Description
bool

True if the distant key was set, False if it was not set

Source code in fedbiomed/node/requests/_overlay.py
async def set_distant_key(
        self,
        distant_node_id: str,
        public_key_pem: bytes,
        request_id: str,
) -> bool:
    """Sets distant (public) key of a channel for peering with a given
        distant peer node.

    Distant key is not set if no channel exists for that `distant_node_id`
        or if the `request_id` does not match a pending request

    Args:
        distant_node_id: unique ID of the peer node
        public_key_pem: public key in PEM format
        request_id: unique ID of the request

    Returns:
        True if the distant key was set, False if it was not set
    """
    return await self._channel_keys.set_distant_key(
        distant_node_id,
        public_key_pem,
        request_id
    )
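The acceptance rule stated above (a key is stored only when a channel exists for the peer and the `request_id` matches a pending request) can be sketched with a small in-memory registry. `ChannelEntry`, `ExampleChannelKeys` and `open_channel` are hypothetical, the sketch is synchronous for brevity, and clearing the pending request after a successful set is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ChannelEntry:
    # ID of the setup request still waiting for the peer's key, if any.
    pending_request_id: Optional[str]
    distant_key: Optional[bytes] = None


class ExampleChannelKeys:
    """Hypothetical registry of channels, indexed by distant node ID."""

    def __init__(self) -> None:
        self._channels: Dict[str, ChannelEntry] = {}

    def open_channel(self, distant_node_id: str, request_id: str) -> None:
        self._channels[distant_node_id] = ChannelEntry(pending_request_id=request_id)

    def set_distant_key(
        self, distant_node_id: str, public_key_pem: bytes, request_id: str
    ) -> bool:
        entry = self._channels.get(distant_node_id)
        # Reject unknown peers and replies that match no pending request.
        if entry is None or entry.pending_request_id != request_id:
            return False
        entry.distant_key = public_key_pem
        entry.pending_request_id = None
        return True


keys = ExampleChannelKeys()
keys.open_channel("node_b", "req-1")
unknown_peer = keys.set_distant_key("node_c", b"pem", "req-1")
wrong_request = keys.set_distant_key("node_b", b"pem", "req-2")
accepted = keys.set_distant_key("node_b", b"pem", "req-1")
```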

Functions

send_nodes

send_nodes(n2n_router, grpc_client, pending_requests, researcher_id, nodes, messages, raise_if_not_all_received=False)

Sends messages to other nodes using overlay communications and waits for their replies.

Parameters:

Name Type Description Default
n2n_router NodeToNodeRouter

object managing node to node messages

required
grpc_client GrpcController

object managing the communication with other components

required
pending_requests EventWaitExchange

object for receiving overlay node to node reply message

required
researcher_id str

unique ID of researcher connecting the nodes

required
nodes List[str]

list of node IDs of the destination nodes

required
messages List[InnerMessage]

list of the inner messages for the destination nodes

required
raise_if_not_all_received bool

if True, raise an exception when not all answers from nodes were received. Defaults to False, in which case the function returns with status set to False when some answers are missing.

False

Returns:

Type Description
Tuple[bool, List[Any]]

A tuple consisting of: status, True if all messages are received; replies, list of replies from each node

Raises:

Type Description
FedbiomedNodeToNodeError

not all answers received and raise_if_not_all_received is True

Source code in fedbiomed/node/requests/_send_nodes.py
def send_nodes(
        n2n_router: NodeToNodeRouter,
        grpc_client: GrpcController,
        pending_requests: EventWaitExchange,
        researcher_id: str,
        nodes: List[str],
        messages: List[InnerMessage],
        raise_if_not_all_received: bool = False,
) -> Tuple[bool, List[Any]]:
    """Send message to some other nodes using overlay communications and wait for their replies.

        Args:
            n2n_router: object managing node to node messages
            grpc_client: object managing the communication with other components
            pending_requests: object for receiving overlay node to node reply message
            researcher_id: unique ID of researcher connecting the nodes
            nodes: list of node IDs of the destination nodes
            messages: list of the inner messages for the destination nodes
            raise_if_not_all_received: if True, raise exception if not all answers from nodes were received.
                Default to False, return with `status` to False when not all answers from nodes were received.
        Returns:
            status: True if all messages are received
            replies: List of replies from each node.

        Raises:
            FedbiomedNodeToNodeError: not all answers received and raise_if_not_all_received is True
    """
    request_ids = []

    for node, message in zip(nodes, messages):
        overlay, salt, nonce = n2n_router.format_outgoing_overlay(message, researcher_id)
        message_overlay = OverlayMessage(
            researcher_id=researcher_id,
            node_id=environ['NODE_ID'],
            dest_node_id=node,
            overlay=overlay,
            setup=False,
            salt=salt,
            nonce=nonce,
        )

        grpc_client.send(message_overlay)

        if isinstance(message, InnerRequestReply):
            request_ids += [message.get_param('request_id')]

    all_received, replies = pending_requests.wait(request_ids, TIMEOUT_NODE_TO_NODE_REQUEST)
    if not all_received and raise_if_not_all_received:
        nodes_no_answer = set(nodes) - set(m.node_id for m in replies)
        raise FedbiomedNodeToNodeError(
            f"{ErrorNumbers.FB318.value}: Some nodes did not answer request "
            f"{nodes_no_answer}"
        )

    return all_received, replies
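The final step of `send_nodes` (deciding whether to raise or to return a partial result) can be isolated from the networking. The sketch below reproduces that decision with hypothetical stand-ins: `Reply`, `ExampleNodeToNodeError` and `check_replies` are not part of the library.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Reply:
    # Only the field used to identify which node answered.
    node_id: str


class ExampleNodeToNodeError(Exception):
    """Hypothetical stand-in for FedbiomedNodeToNodeError."""


def check_replies(
    nodes: List[str],
    replies: List[Reply],
    all_received: bool,
    raise_if_not_all_received: bool = False,
) -> Tuple[bool, List[Reply]]:
    if not all_received and raise_if_not_all_received:
        # Nodes that were contacted but did not send a reply.
        nodes_no_answer = set(nodes) - set(r.node_id for r in replies)
        raise ExampleNodeToNodeError(
            f"Some nodes did not answer request {sorted(nodes_no_answer)}"
        )
    return all_received, replies


nodes = ["node_a", "node_b", "node_c"]
partial = [Reply("node_a"), Reply("node_c")]

# Default behavior: the caller gets status False and the partial replies.
status, received = check_replies(nodes, partial, all_received=False)

# Strict behavior: the missing node is reported through an exception.
try:
    check_replies(nodes, partial, all_received=False, raise_if_not_all_received=True)
    error_text = ""
except ExampleNodeToNodeError as exc:
    error_text = str(exc)
```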