to simplify imports from fedbiomed.node.requests
Classes
NodeToNodeController
NodeToNodeController(grpc_controller, overlay_channel, pending_requests, controller_data)
Defines the controller for protocol messages processed by the node to node router
Each message type must have a handler. A handler receives overlay_msg and inner_msg, and returns a dict which is passed as **kwargs to the final() (types must match!). The handler may receive an asyncio.CancelledError.
Each message type optionally has a final. It executes only if the handler() completed without being cancelled. It won't be interrupted by an asyncio.CancelledError. If no final() exists, no action is taken after cancelling or completing the handler().
async def _HandlerExample(self, overlay_msg: dict, inner_msg: InnerMessage) -> Any:
    logger.debug("Normal handler code that can be cancelled")
    return {'value': 3}

async def _FinalExample(self, value: int) -> None:
    logger.debug(f"Final code that cannot be cancelled. Received {value}")
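The handler/final contract described above can be sketched with plain asyncio. This is a minimal illustration, not the router's actual code: `run_with_final`, the timeout values, and the use of `asyncio.wait_for` to cancel the handler are all assumptions made for the example.

```python
import asyncio
from typing import Awaitable, Callable, Optional

calls: list = []  # records which steps ran, for illustration only


async def handler_example(delay: float) -> dict:
    # Cancellable part: may be interrupted while awaiting
    await asyncio.sleep(delay)
    calls.append("handler")
    return {"value": 3}


async def final_example(value: int) -> None:
    # Runs only when the handler completed; receives the handler's dict as kwargs
    calls.append(f"final:{value}")


async def run_with_final(
    handler: Callable[[], Awaitable[Optional[dict]]],
    final: Optional[Callable[..., Awaitable[None]]],
    timeout: float,
) -> bool:
    """Hypothetical helper: run the handler under a timeout, then the final if it completed."""
    try:
        kwargs = await asyncio.wait_for(handler(), timeout)
    except asyncio.TimeoutError:
        # The handler was cancelled: the final step is skipped
        return False
    if final is not None:
        await final(**(kwargs or {}))
    return True


async def demo() -> tuple:
    completed = await run_with_final(lambda: handler_example(0.0), final_example, 1.0)
    timed_out = await run_with_final(lambda: handler_example(1.0), final_example, 0.01)
    return completed, timed_out


results = asyncio.run(demo())
```

The sketch only shows that the final step is skipped when the handler is cancelled; it does not reproduce the guarantee that the final step itself cannot be interrupted.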
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grpc_controller | GrpcController | object managing the communication with other components | required |
overlay_channel | OverlayChannel | layer for managing overlay message send and receive | required |
pending_requests | EventWaitExchange | object for receiving overlay node to node messages | required |
controller_data | EventWaitExchange | object for sharing data | required |
Source code in fedbiomed/node/requests/_n2n_controller.py
def __init__(
self,
grpc_controller: GrpcController,
overlay_channel: OverlayChannel,
pending_requests: EventWaitExchange,
controller_data: EventWaitExchange,
) -> None:
"""Constructor of the class.
Args:
grpc_controller: object managing the communication with other components
overlay_channel: layer for managing overlay message send and receive
pending_requests: object for receiving overlay node to node messages
controller_data: object for sharing data
"""
self._grpc_controller = grpc_controller
self._overlay_channel = overlay_channel
self._pending_requests = pending_requests
self._controller_data = controller_data
self._command2method = {
KeyRequest.__name__: self._HandlerKeyRequest,
KeyReply.__name__: self._HandlerKeyReply,
AdditiveSSharingRequest.__name__: self._AdditiveSSharingRequest,
AdditiveSSharingReply.__name__: self._HandlerAdditiveSSharingReply,
ChannelSetupRequest.__name__: self._HandlerChannelRequest,
ChannelSetupReply.__name__: self._HandlerKeyReply,
}
self._command2final = {
KeyRequest.__name__: self._FinalKeyRequest,
KeyReply.__name__: self._FinalKeyReply,
AdditiveSSharingRequest.__name__: self._FinalAdditiveSSharingRequest,
AdditiveSSharingReply.__name__: self._FinalAdditiveSSharingReply,
ChannelSetupRequest.__name__: self._FinalKeyRequest,
ChannelSetupReply.__name__: self._FinalChannelReply,
}
Functions
final async
final(message, **kwargs)
Calls the final processing for a received message protocol.
This handler is optional, it may not be declared for a message.
Should be called only if the handler completed without being interrupted. Cannot be interrupted, thus should not launch treatment that may hang.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs | | Specific arguments for this message final handler | {} |
Source code in fedbiomed/node/requests/_n2n_controller.py
async def final(self, message, **kwargs) -> None:
"""Calls the final processing for a received message protocol.
This handler is optional, it may not be declared for a message.
Should be called only if the handler completed without being interrupted.
Cannot be interrupted, thus should not launch treatment that may hang.
Args:
kwargs: Specific arguments for this message final handler
"""
if message in self._command2final:
# Useful ? Allow omitting some arguments, automatically add them with None value
expected_args = dict(
inspect.signature(self._command2final[message]).parameters
).keys()
kwargs.update({arg: None for arg in expected_args if arg not in kwargs})
await self._command2final[message](**kwargs)
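The argument-filling step in `final()` can be tried in isolation. The following is a minimal sketch: `fill_missing_kwargs` and `final_example` are hypothetical names, and unlike the listing above the helper returns a copy rather than updating `kwargs` in place.

```python
import inspect
from typing import Any, Callable, Dict


def fill_missing_kwargs(func: Callable[..., Any], kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of kwargs where every parameter of func that is absent is set to None."""
    expected = inspect.signature(func).parameters.keys()
    completed = dict(kwargs)
    completed.update({name: None for name in expected if name not in completed})
    return completed


def final_example(value, request_id):
    # Hypothetical final handler with two parameters
    return value, request_id


filled = fill_missing_kwargs(final_example, {"value": 3})
result = final_example(**filled)
```

For a bound method, `inspect.signature` omits `self`, so only the remaining parameters are filled.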
handle async
handle(overlay_msg, inner_msg)
Calls the handler for processing a received message protocol.
If it does not exist, call the default handler to trigger an error.
Main part of the processing which can be interrupted if the processing takes too long.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
overlay_msg | OverlayMessage | Outer message for node to node communication | required |
inner_msg | InnerMessage | Unpacked inner message from the outer message | required |
Returns:
Type | Description |
---|---|
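Optional[dict] | A dict of the kwargs expected by the corresponding final() handler for this message. Empty dict or None if no kwargs are expected or there is no final handler |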
Source code in fedbiomed/node/requests/_n2n_controller.py
async def handle(
self, overlay_msg: OverlayMessage, inner_msg: InnerMessage
) -> Optional[dict]:
"""Calls the handler for processing a received message protocol.
If it does not exist, call the default handler to trigger an error.
Main part of the processing which can be interrupted if the processing takes too long.
Args:
overlay_msg: Outer message for node to node communication
inner_msg: Unpacked inner message from the outer message
Returns:
A dict of the `kwargs` expected by the corresponding `final()` handler for this message.
Empty dict or `None` if no `kwargs` expected or no final handler
"""
if inner_msg.__name__ in self._command2method:
return await self._command2method[inner_msg.__name__](
overlay_msg, inner_msg
)
return await self._HandlerDefault(overlay_msg, inner_msg)
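The dispatch pattern used by `handle()` (look up a handler by message class name, fall back to a default that signals an error) can be sketched as follows. All class and method names here are hypothetical stand-ins, and the sketch keys on `type(msg).__name__`, which is an assumption about how the name is obtained.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional


@dataclass
class KeyRequest:  # stand-in for a real inner message class
    request_id: str


@dataclass
class UnknownMessage:  # a message type with no registered handler
    request_id: str


class ControllerSketch:
    def __init__(self) -> None:
        # Map message class name -> coroutine method handling that message
        self._command2method: Dict[str, Callable[[object], Awaitable[Optional[dict]]]] = {
            KeyRequest.__name__: self._handle_key_request,
        }

    async def _handle_key_request(self, msg) -> Optional[dict]:
        return {"request_id": msg.request_id}

    async def _handle_default(self, msg) -> Optional[dict]:
        # Default handler: reports the unsupported message type
        raise ValueError(f"no handler for {type(msg).__name__}")

    async def handle(self, msg) -> Optional[dict]:
        handler = self._command2method.get(type(msg).__name__, self._handle_default)
        return await handler(msg)


controller = ControllerSketch()
known = asyncio.run(controller.handle(KeyRequest("r1")))
try:
    asyncio.run(controller.handle(UnknownMessage("r2")))
    unknown_error = None
except ValueError as exc:
    unknown_error = str(exc)
```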
NodeToNodeRouter
NodeToNodeRouter(grpc_controller, pending_requests, controller_data)
Bases: _NodeToNodeAsyncRouter
Handles node to node messages received by a node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grpc_controller | GrpcController | object managing the communication with other components | required |
pending_requests | EventWaitExchange | object for receiving overlay node to node messages | required |
controller_data | EventWaitExchange | object for sharing data with the controller | required |
Source code in fedbiomed/node/requests/_n2n_router.py
def __init__(
self,
grpc_controller: GrpcController,
pending_requests: EventWaitExchange,
controller_data: EventWaitExchange,
) -> None:
"""Class constructor.
Args:
grpc_controller: object managing the communication with other components
pending_requests: object for receiving overlay node to node messages
controller_data: object for sharing data with the controller
"""
super().__init__(grpc_controller, pending_requests, controller_data)
self._thread = Thread(target=self._run, args=(), daemon=True)
Functions
format_outgoing_overlay
format_outgoing_overlay(message, researcher_id)
Creates an overlay message payload from an inner message.
Serializes, signs, and encrypts the inner message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | InnerMessage | Inner message to send as overlay payload | required |
researcher_id | str | unique ID of researcher connecting the nodes | required |
setup | | False for sending a message over the channel, True for a message setting up the channel | required |
Returns:
Type | Description |
---|---|
Tuple[bytes, bytes, bytes] | A tuple consisting of: payload for overlay message, salt for inner message encryption key, nonce for the inner message encryption |
Source code in fedbiomed/node/requests/_n2n_router.py
def format_outgoing_overlay(self, message: InnerMessage, researcher_id: str) -> \
Tuple[bytes, bytes, bytes]:
"""Creates an overlay message payload from an inner message.
Serialize, crypt, sign the inner message
Args:
message: Inner message to send as overlay payload
researcher_id: unique ID of researcher connecting the nodes
setup: False for sending a message over the channel, True for a message
setting up the channel
Returns:
A tuple consisting of: payload for overlay message, salt for inner message
encryption key, nonce for the inner message encryption
"""
future = asyncio.run_coroutine_threadsafe(
self._overlay_channel.format_outgoing_overlay(message, researcher_id),
self._loop
)
return future.result()
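The synchronous wrapper above relies on an event loop running in the router's thread. The sketch below shows that bridging pattern under stated assumptions: `RouterSketch`, its `_format` coroutine, and the 5-second result timeout are hypothetical and only illustrate how `asyncio.run_coroutine_threadsafe` hands work to a loop owned by another thread.

```python
import asyncio
from threading import Thread


class RouterSketch:
    """Hypothetical stand-in: owns an event loop that runs in a background thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Ask the loop to stop from outside its thread, then wait for the thread
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    async def _format(self, message: str) -> bytes:
        # Placeholder for the coroutine that would build the overlay payload
        await asyncio.sleep(0)
        return message.encode()

    def format_sync(self, message: str) -> bytes:
        # Schedule the coroutine on the router's loop and block until it finishes
        future = asyncio.run_coroutine_threadsafe(self._format(message), self._loop)
        return future.result(timeout=5)


router = RouterSketch()
router.start()
payload = router.format_sync("hello")
router.stop()
```

Calling `future.result()` from the loop's own thread would deadlock, which is why this style of wrapper is meant for callers in other threads.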
start
start()
Starts the node to node router.
Source code in fedbiomed/node/requests/_n2n_router.py
def start(self) -> None:
"""Starts the node to node router."""
self._thread.start()
submit
submit(msg)
Submits a received message to the node to node router for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | OverlayMessage | received message | required |
Source code in fedbiomed/node/requests/_n2n_router.py
def submit(self, msg: OverlayMessage) -> None:
"""Submits a received message to the node to node router for processing.
Args:
msg: received message
"""
try:
asyncio.run_coroutine_threadsafe(self._submit(msg), self._loop)
except Exception as e:
logger.critical(
"Failed submitting message to node to node router. "
f"Exception: {type(e).__name__}. Error message: {e}"
)
raise e
OverlayChannel
OverlayChannel(grpc_client)
Provides asyncio safe layer for sending and receiving overlay messages.
This class is not thread safe, all calls must be done within the same thread (except constructor).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grpc_client | GrpcController | object managing the communication with other components | required |
Source code in fedbiomed/node/requests/_overlay.py
def __init__(self, grpc_client: GrpcController):
"""Class constructor
Args:
grpc_client: object managing the communication with other components
"""
self._grpc_client = grpc_client
self._channel_keys = _ChannelKeys()
# Issue #1142 in "Crypto material management" will optionally replace current default key published with the
# library and used for each node for setup by a keypair generated securely for each node.
# Caveat: though encrypted, current implementation does not ensure a secure overlay node2node channel ...
# Default keys
default_private_key, default_public_key = self._load_default_n2n_key()
self._default_n2n_key = _N2nKeysEntry(
local_key=default_private_key,
ready_event=asyncio.Event(),
distant_key=default_public_key
)
Functions
format_incoming_overlay async
format_incoming_overlay(overlay_msg)
Retrieves inner message from overlay message payload.
Decrypts and deserializes the payload, then verifies the signature of the inner message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
overlay_msg | OverlayMessage | Overlay message. | required |
Returns:
Type | Description |
---|---|
InnerMessage | Inner message retrieved from overlay payload |
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | bad message type |
FedbiomedNodeToNodeError | cannot decrypt payload |
FedbiomedNodeToNodeError | bad inner payload format |
FedbiomedNodeToNodeError | cannot verify payload integrity |
FedbiomedNodeToNodeError | sender/dest node ID don't match in overlay and inner message |
Source code in fedbiomed/node/requests/_overlay.py
async def format_incoming_overlay(self, overlay_msg: OverlayMessage) -> InnerMessage:
"""Retrieves inner message from overlay message payload.
Check signature, decrypt, deserialize the inner message
Args:
overlay_msg: Overlay message.
Returns:
Inner message retrieved from overlay payload
Raises:
FedbiomedNodeToNodeError: bad message type
FedbiomedNodeToNodeError: cannot decrypt payload
FedbiomedNodeToNodeError: bad inner payload format
FedbiomedNodeToNodeError: cannot verify payload integrity
FedbiomedNodeToNodeError: sender/dest node ID don't match in overlay and inner message
"""
# robustify from developer error (try to encapsulate a bad message type)
if not isinstance(overlay_msg, OverlayMessage):
raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: not an overlay message')
_, distant_node_public_key, derived_key = await self._setup_use_channel_keys(
overlay_msg.node_id,
overlay_msg.researcher_id,
overlay_msg.setup,
overlay_msg.salt,
)
# decrypt outer payload
try:
decryptor = Cipher(
algorithms.ChaCha20(derived_key, overlay_msg.nonce),
mode=None,
backend=default_backend()
).decryptor()
decrypted_serial = decryptor.update(overlay_msg.overlay) + decryptor.finalize()
except ValueError as e:
raise FedbiomedNodeToNodeError(
f'{ErrorNumbers.FB324.value}: cannot decrypt payload: {e}') from e
decrypted = Serializer.loads(decrypted_serial)
if not isinstance(decrypted, dict) or not set(('message', 'signature')) <= set(decrypted):
raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: bad inner payload format '
f"in received message")
# verify inner payload
try:
distant_node_public_key.public_key.verify(
decrypted['signature'],
Serializer.dumps(decrypted['message']),
ec.ECDSA(hashes.SHA256()),
)
except InvalidSignature as e:
raise FedbiomedNodeToNodeError(
f'{ErrorNumbers.FB324.value}: cannot verify payload integrity: {e}') from e
inner_msg = Message.from_dict(decrypted['message'])
# Node ID mismatch reveals either (1) malicious peer forging message (2) application internal error
if inner_msg.node_id != overlay_msg.node_id:
raise FedbiomedNodeToNodeError(
f'{ErrorNumbers.FB324.value}: Source node ID mismatch for overlay message '
f'inner_node_id={inner_msg.node_id} overlay_node_id={overlay_msg.node_id}')
if inner_msg.dest_node_id != overlay_msg.dest_node_id:
raise FedbiomedNodeToNodeError(
f'{ErrorNumbers.FB324.value}: Destination node ID mismatch for overlay message '
f'inner_node_id={inner_msg.dest_node_id} overlay_node_id={overlay_msg.dest_node_id}')
return inner_msg
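The structural checks performed after decryption (payload shape, then source and destination node ID consistency) can be sketched without any cryptography. The names `NodeToNodeErrorSketch`, `Envelope`, and `check_inner_payload` are hypothetical, and the inner message is represented as a plain dict for simplicity.

```python
from dataclasses import dataclass
from typing import Any


class NodeToNodeErrorSketch(Exception):
    """Hypothetical stand-in for FedbiomedNodeToNodeError."""


@dataclass
class Envelope:  # stand-in for the routing fields of an overlay message
    node_id: str
    dest_node_id: str


def check_inner_payload(decrypted: Any, overlay: Envelope) -> dict:
    # The decrypted payload must be a dict holding both the message and its signature
    if not isinstance(decrypted, dict) or not {"message", "signature"} <= set(decrypted):
        raise NodeToNodeErrorSketch("bad inner payload format")
    inner = decrypted["message"]
    # (signature verification would take place here)
    # IDs inside the signed message must match the IDs on the outer envelope
    if inner["node_id"] != overlay.node_id:
        raise NodeToNodeErrorSketch("source node ID mismatch")
    if inner["dest_node_id"] != overlay.dest_node_id:
        raise NodeToNodeErrorSketch("destination node ID mismatch")
    return inner


def rejection_reason(payload: Any, overlay: Envelope) -> str:
    try:
        check_inner_payload(payload, overlay)
    except NodeToNodeErrorSketch as exc:
        return str(exc)
    return "accepted"


overlay = Envelope(node_id="node_a", dest_node_id="node_b")
good = {"message": {"node_id": "node_a", "dest_node_id": "node_b"}, "signature": b"sig"}
forged = {"message": {"node_id": "node_x", "dest_node_id": "node_b"}, "signature": b"sig"}
accepted = check_inner_payload(good, overlay)
```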
format_outgoing_overlay async
format_outgoing_overlay(message, researcher_id, setup=False)
Creates an overlay message payload from an inner message.
Serializes, signs, and encrypts the inner message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | InnerMessage | Inner message to send as overlay payload | required |
researcher_id | str | unique ID of researcher connecting the nodes | required |
setup | bool | False for sending a message over the channel, True for a message setting up the channel | False |
Returns:
Type | Description |
---|---|
Tuple[bytes, bytes, bytes] | A tuple consisting of: payload for overlay message, salt for inner message encryption key, nonce for the inner message encryption |
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | bad message type |
Source code in fedbiomed/node/requests/_overlay.py
async def format_outgoing_overlay(self, message: InnerMessage, researcher_id: str, setup: bool = False) -> \
Tuple[bytes, bytes, bytes]:
"""Creates an overlay message payload from an inner message.
Serialize, crypt, sign the inner message
Args:
message: Inner message to send as overlay payload
researcher_id: unique ID of researcher connecting the nodes
setup: False for sending a message over the channel, True for a message
setting up the channel
Returns:
A tuple consisting of: payload for overlay message, salt for inner message
encryption key, nonce for the inner message encryption
Raises:
FedbiomedNodeToNodeError: bad message type
"""
# robustify from developer error (try to encapsulate a bad message type)
if not isinstance(message, InnerMessage):
raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: not an inner message')
# Value for salting the symmetric encryption key generation for this message
# Adjust length of `salt` depending on algorithm
salt = secrets.token_bytes(32)
# Value for noncing the symmetric encryption for this message
# This is normally not needed as we generate different key for each message due to `salt`
# but provides another layer of security
# Adjust the length of `nonce` depending on algorithm
nonce = secrets.token_bytes(16)
local_node_private_key, _, derived_key = await self._setup_use_channel_keys(
message.get_param('dest_node_id'),
researcher_id,
setup,
salt
)
# consider encrypt-sign([message,node_id]) or other see
# https://theworld.com/~dtd/sign_encrypt/sign_encrypt7.html
# sign inner payload
signed = Serializer.dumps({
'message': message.to_dict(),
'signature': local_node_private_key.private_key.sign(
Serializer.dumps(message.to_dict()),
ec.ECDSA(hashes.SHA256()),
)
})
encryptor = Cipher(
algorithms.ChaCha20(derived_key, nonce),
mode=None,
backend=default_backend()
).encryptor()
return encryptor.update(signed) + encryptor.finalize(), salt, nonce
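The sign-then-encrypt flow above, together with its inverse on the receiving side, can be exercised end to end with the `cryptography` package. This is a sketch under several assumptions: a random 32-byte value stands in for the key derived by `_setup_use_channel_keys`, JSON with a hex-encoded signature replaces the library's `Serializer`, and the SECP256R1 curve is chosen arbitrarily for the example.

```python
import json
import secrets

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms

# Assumption: a fresh random key stands in for the key derived from channel keys and salt
derived_key = secrets.token_bytes(32)  # ChaCha20 takes a 32-byte key
nonce = secrets.token_bytes(16)        # and a 16-byte nonce in this library

private_key = ec.generate_private_key(ec.SECP256R1())
message = {"node_id": "node_a", "dest_node_id": "node_b", "request_id": "r1"}

# Sender: sign the serialized message, then encrypt message and signature together
serialized = json.dumps(message, sort_keys=True).encode()
signature = private_key.sign(serialized, ec.ECDSA(hashes.SHA256()))
signed = json.dumps({"message": message, "signature": signature.hex()}).encode()

encryptor = Cipher(algorithms.ChaCha20(derived_key, nonce), mode=None).encryptor()
payload = encryptor.update(signed) + encryptor.finalize()

# Receiver: decrypt, then verify the signature over the re-serialized message
decryptor = Cipher(algorithms.ChaCha20(derived_key, nonce), mode=None).decryptor()
decrypted = json.loads(decryptor.update(payload) + decryptor.finalize())
private_key.public_key().verify(
    bytes.fromhex(decrypted["signature"]),
    json.dumps(decrypted["message"], sort_keys=True).encode(),
    ec.ECDSA(hashes.SHA256()),
)  # raises InvalidSignature if the message or signature was altered
recovered = decrypted["message"]
```

ChaCha20 on its own provides confidentiality only; integrity in this flow comes from the signature check after decryption.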
get_local_public_key async
get_local_public_key(distant_node_id)
Gets local public key for peering with a given distant peer node
Parameters:
Name | Type | Description | Default |
---|---|---|---|
distant_node_id | str | unique ID of the peer node | required |
Returns:
Type | Description |
---|---|
bytes | Local node's public key ID for this peer node |
Source code in fedbiomed/node/requests/_overlay.py
async def get_local_public_key(self, distant_node_id: str) -> bytes:
"""Gets local public key for peering with a given distant peer node
Args:
distant_node_id: unique ID of the peer node
Returns:
Local node's public key ID for this peer node
"""
return await self._channel_keys.get_local_public_key(distant_node_id)
send_node_setup async
send_node_setup(researcher_id, node, message)
Send a channel setup message to another node using overlay communications and wait for its reply.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
researcher_id | str | unique ID of researcher connecting the nodes | required |
node | str | unique node ID of the destination node | required |
message | InnerMessage | inner message for the destination node | required |
Returns:
Type | Description |
---|---|
bool | True if channel is ready, False if channel not ready after timeout |
Source code in fedbiomed/node/requests/_overlay.py
async def send_node_setup(
self,
researcher_id: str,
node: str,
message: InnerMessage,
) -> bool:
"""Send a channel setup message to another node using overlay communications and wait for its reply.
Args:
researcher_id: unique ID of researcher connecting the nodes
node: unique node ID of the destination node
message: inner message for the destination node
Returns:
True if channel is ready, False if channel not ready after timeout
"""
overlay, salt, nonce = await self.format_outgoing_overlay(message, researcher_id, True)
message_overlay = OverlayMessage(
researcher_id=researcher_id,
node_id=environ['NODE_ID'],
dest_node_id=node,
overlay=overlay,
setup=True,
salt=salt,
nonce=nonce,
)
self._grpc_client.send(message_overlay)
return await self._channel_keys.wait_ready_channel(node)
set_distant_key async
set_distant_key(distant_node_id, public_key_pem, request_id)
Sets distant (public) key of a channel for peering with a given distant peer node.
Distant key is not set if no channel exists for that distant_node_id or if the request_id does not match a pending request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
distant_node_id | str | unique ID of the peer node | required |
public_key_pem | bytes | public key in PEM format | required |
request_id | str | unique ID of the request | required |
Returns:
Type | Description |
---|---|
bool | True if the distant key was set, False if it was not set |
Source code in fedbiomed/node/requests/_overlay.py
async def set_distant_key(
self,
distant_node_id: str,
public_key_pem: bytes,
request_id: str,
) -> bool:
"""Sets distant (public) key of a channel for peering with a given
distant peer node.
Distant key is not set if no channel exists for that `distant_node_id`
or if the `request_id` does not match a pending request
Args:
distant_node_id: unique ID of the peer node
public_key_pem: public key in PEM format
request_id: unique ID of the request
Returns:
True if the distant key was set, False if it was not set
"""
return await self._channel_keys.set_distant_key(
distant_node_id,
public_key_pem,
request_id
)
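The behavior documented for `set_distant_key` and for waiting on channel readiness can be sketched with an `asyncio.Event` per peer. This is not the internal `_ChannelKeys` implementation: `ChannelKeysSketch`, `ChannelEntry`, `open_channel`, and the explicit `timeout` argument are hypothetical names introduced for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ChannelEntry:
    request_id: str                      # ID of the pending key request
    distant_key: Optional[bytes] = None  # peer public key, unknown until the reply arrives
    ready_event: asyncio.Event = field(default_factory=asyncio.Event)


class ChannelKeysSketch:
    def __init__(self) -> None:
        self._channels: Dict[str, ChannelEntry] = {}

    def open_channel(self, distant_node_id: str, request_id: str) -> None:
        self._channels[distant_node_id] = ChannelEntry(request_id)

    async def set_distant_key(self, distant_node_id: str, key: bytes, request_id: str) -> bool:
        entry = self._channels.get(distant_node_id)
        # Refuse the key if there is no channel or the request ID is not the pending one
        if entry is None or entry.request_id != request_id:
            return False
        entry.distant_key = key
        entry.ready_event.set()
        return True

    async def wait_ready_channel(self, distant_node_id: str, timeout: float) -> bool:
        entry = self._channels.get(distant_node_id)
        if entry is None:
            return False
        try:
            await asyncio.wait_for(entry.ready_event.wait(), timeout)
        except asyncio.TimeoutError:
            return False  # channel not ready after timeout
        return True


async def demo() -> tuple:
    keys = ChannelKeysSketch()
    keys.open_channel("node_b", "req-1")
    wrong = await keys.set_distant_key("node_b", b"pem", "req-2")
    not_ready = await keys.wait_ready_channel("node_b", 0.01)
    right = await keys.set_distant_key("node_b", b"pem", "req-1")
    ready = await keys.wait_ready_channel("node_b", 0.01)
    return wrong, not_ready, right, ready


outcome = asyncio.run(demo())
```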
Functions
send_nodes
send_nodes(n2n_router, grpc_client, pending_requests, researcher_id, nodes, messages, raise_if_not_all_received=False)
Send message to some other nodes using overlay communications and wait for their replies.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n2n_router | NodeToNodeRouter | object managing node to node messages | required |
grpc_client | GrpcController | object managing the communication with other components | required |
pending_requests | EventWaitExchange | object for receiving overlay node to node reply message | required |
researcher_id | str | unique ID of researcher connecting the nodes | required |
nodes | List[str] | list of node IDs of the destination nodes | required |
messages | List[InnerMessage] | list of the inner messages for the destination nodes | required |
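raise_if_not_all_received | bool | if True, raise exception if not all answers from nodes were received. Default to False, return with status set to False when not all answers from nodes were received. | False |
Returns:
Type | Description |
---|---|
Tuple[bool, List[Any]] | status: True if all replies were received; replies: list of replies from each node |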
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | not all answers received and raise_if_not_all_received is True |
Source code in fedbiomed/node/requests/_send_nodes.py
def send_nodes(
n2n_router: NodeToNodeRouter,
grpc_client: GrpcController,
pending_requests: EventWaitExchange,
researcher_id: str,
nodes: List[str],
messages: List[InnerMessage],
raise_if_not_all_received: bool = False,
) -> Tuple[bool, List[Any]]:
"""Send message to some other nodes using overlay communications and wait for their replies.
Args:
n2n_router: object managing node to node messages
grpc_client: object managing the communication with other components
pending_requests: object for receiving overlay node to node reply message
researcher_id: unique ID of researcher connecting the nodes
nodes: list of node IDs of the destination nodes
messages: list of the inner messages for the destination nodes
raise_if_not_all_received: if True, raise exception if not all answers from nodes were received.
Default to False, return with `status` to False when not all answers from nodes were received.
Returns:
status: True if all messages are received
replies: List of replies from each node.
Raises:
FedbiomedNodeToNodeError: not all answers received and raise_if_not_all_received is True
"""
request_ids = []
for node, message in zip(nodes, messages):
overlay, salt, nonce = n2n_router.format_outgoing_overlay(message, researcher_id)
message_overlay = OverlayMessage(
researcher_id=researcher_id,
node_id=environ['NODE_ID'],
dest_node_id=node,
overlay=overlay,
setup=False,
salt=salt,
nonce=nonce,
)
grpc_client.send(message_overlay)
if isinstance(message, InnerRequestReply):
request_ids += [message.get_param('request_id')]
all_received, replies = pending_requests.wait(request_ids, TIMEOUT_NODE_TO_NODE_REQUEST)
if not all_received and raise_if_not_all_received:
nodes_no_answer = set(nodes) - set(m.node_id for m in replies)
raise FedbiomedNodeToNodeError(
f"{ErrorNumbers.FB318.value}: Some nodes did not answer request "
f"{nodes_no_answer}"
)
return all_received, replies
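The final step of `send_nodes` (working out which nodes did not answer and optionally raising) can be isolated as below. `Reply` and `check_replies` are hypothetical names, and `RuntimeError` stands in for `FedbiomedNodeToNodeError`.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple


@dataclass
class Reply:  # stand-in for a reply message carrying the sender's node ID
    node_id: str


def check_replies(
    nodes: List[str],
    replies: List[Reply],
    all_received: bool,
    raise_if_not_all_received: bool = False,
) -> Tuple[bool, Set[str]]:
    # Nodes that were contacted but are absent from the replies
    missing = set(nodes) - {reply.node_id for reply in replies}
    if not all_received and raise_if_not_all_received:
        raise RuntimeError(f"Some nodes did not answer request {sorted(missing)}")
    return all_received, missing


nodes = ["n1", "n2", "n3"]
replies = [Reply("n1"), Reply("n3")]
status, missing = check_replies(nodes, replies, all_received=False)

try:
    check_replies(nodes, replies, all_received=False, raise_if_not_all_received=True)
    raised = False
except RuntimeError:
    raised = True
```

With the default `raise_if_not_all_received=False`, callers are expected to inspect the returned status themselves before using the replies.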