to simplify imports from fedbiomed.node.requests
Classes
NodeToNodeController
NodeToNodeController(grpc_controller, pending_requests, controller_data)
Defines the controller for protocol messages processed by the node to node router
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grpc_controller | GrpcController | object managing the communication with other components | required |
pending_requests | EventWaitExchange | object for receiving overlay node to node messages | required |
controller_data | EventWaitExchange | object for sharing data | required |
Source code in fedbiomed/node/requests/_n2n_controller.py
def __init__(
    self,
    grpc_controller: GrpcController,
    pending_requests: EventWaitExchange,
    controller_data: EventWaitExchange,
) -> None:
    """Store the collaborators and build the per-command dispatch tables.

    Args:
        grpc_controller: object managing the communication with other components
        pending_requests: object for receiving overlay node to node messages
        controller_data: object for sharing data
    """
    self._grpc_controller = grpc_controller
    self._pending_requests = pending_requests
    self._controller_data = controller_data

    # Main handler for each supported inner message command
    # (a 'dummy-inner' entry exists in the code base but is currently disabled).
    main_handlers = {
        'key-request': self._HandlerKeyRequest,
        'key-reply': self._HandlerKeyReply,
    }
    # Optional final handler for each command, same keys as above
    # (the matching 'dummy-inner' entry is disabled as well).
    final_handlers = {
        'key-request': self._FinalKeyRequest,
        'key-reply': self._FinalKeyReply,
    }

    self._command2method = main_handlers
    self._command2final = final_handlers
Functions
final async
final(command, **kwargs)
Calls the final processing for a received message protocol.
This handler is optional, it may not be declared for a message.
Should be called only if the handler completed without being interrupted. Cannot be interrupted, thus should not launch treatment that may hang.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs | | Specific arguments for this message final handler | {} |
Source code in fedbiomed/node/requests/_n2n_controller.py
async def final(self, command, **kwargs) -> None:
    """Run the final processing step registered for a received protocol message.

    A final handler is optional: when none is registered for `command`, nothing is done.
    Should be called only if the handler completed without being interrupted.
    Cannot be interrupted, thus should not launch treatment that may hang.

    Args:
        command: command name of the message, used to look up its final handler
        kwargs: Specific arguments for this message final handler
    """
    if command not in self._command2final:
        return

    finalizer = self._command2final[command]
    # Useful ? Allow omitting some arguments: every parameter declared by the
    # final handler but not supplied by the caller is passed with value None.
    for param_name in inspect.signature(finalizer).parameters:
        kwargs.setdefault(param_name, None)

    await finalizer(**kwargs)
handle async
handle(overlay_msg, inner_msg)
Calls the handler for processing a received message protocol.
If it does not exist, call the default handler to trigger an error.
Main part of the processing which can be interrupted if the processing takes too long.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
overlay_msg | dict | Outer message for node to node communication | required |
inner_msg | InnerMessage | Unpacked inner message from the outer message | required |
Returns:
Type | Description |
---|---|
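Optional[dict] | A dict of the `kwargs` expected by the corresponding `final()` handler for this message. Empty dict or `None` if no `kwargs` expected or no final handler |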
Source code in fedbiomed/node/requests/_n2n_controller.py
async def handle(self, overlay_msg: dict, inner_msg: InnerMessage) -> Optional[dict]:
    """Dispatch a received protocol message to the handler registered for its command.

    When no handler is registered for the command, the default handler is called
    instead to trigger an error.
    Main part of the processing which can be interrupted if the processing takes too long.

    Args:
        overlay_msg: Outer message for node to node communication
        inner_msg: Unpacked inner message from the outer message

    Returns:
        A dict of the `kwargs` expected by the corresponding `final()` handler for this message.
            Empty dict or `None` if no `kwargs` expected or no final handler
    """
    command = inner_msg.get_param('command')
    if command in self._command2method:
        handler = self._command2method[command]
    else:
        handler = self._HandlerDefault
    return await handler(overlay_msg, inner_msg)
NodeToNodeRouter
NodeToNodeRouter(grpc_controller, pending_requests, controller_data)
Bases: _NodeToNodeAsyncRouter
Handles node to node messages received by a node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grpc_controller | GrpcController | object managing the communication with other components | required |
pending_requests | EventWaitExchange | object for receiving overlay node to node messages | required |
controller_data | EventWaitExchange | object for sharing data with the controller | required |
Source code in fedbiomed/node/requests/_n2n_router.py
def __init__(
    self,
    grpc_controller: GrpcController,
    pending_requests: EventWaitExchange,
    controller_data: EventWaitExchange
) -> None:
    """Initialize the parent router and prepare its (not yet started) thread.

    Args:
        grpc_controller: object managing the communication with other components
        pending_requests: object for receiving overlay node to node messages
        controller_data: object for sharing data with the controller
    """
    super().__init__(grpc_controller, pending_requests, controller_data)

    # The thread is only created here; `start()` is what launches it.
    # It is a daemon thread targeting `self._run`, called without arguments.
    self._thread = Thread(target=self._run, daemon=True)
Functions
start
start()
Starts the node to node router.
Source code in fedbiomed/node/requests/_n2n_router.py
def start(self) -> None:
    """Starts the node to node router by launching its thread (`self._thread`)."""
    router_thread = self._thread
    router_thread.start()
submit
submit(msg)
Submits a received message to the node to node router for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | dict | received message | required |
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | bad message type or value. |
Source code in fedbiomed/node/requests/_n2n_router.py
def submit(self, msg: dict) -> None:
    """Hand a received message over to the node to node router for processing.

    The message is scheduled as a `self._submit(msg)` coroutine on the router's
    event loop (`self._loop`). The future returned by the scheduling call is not
    kept, so this method does not wait for the processing to complete.

    Args:
        msg: received message

    Raises:
        FedbiomedNodeToNodeError: bad message type or value.
    """
    # node to node router currently handles only node to node messages
    # Conceived to be later extended for other messages processing, during node redesign
    is_overlay = msg['command'] == 'overlay'
    if not is_overlay:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: node to node router needs a node to node message')

    try:
        pending = self._submit(msg)
        asyncio.run_coroutine_threadsafe(pending, self._loop)
    except Exception as exc:
        # Any scheduling failure is logged as critical, then propagated unchanged.
        logger.critical(
            f"Failed submitting message to node to node router. Exception: {type(exc).__name__}. Error message: {exc}")
        raise exc
Functions
format_incoming_overlay
format_incoming_overlay(payload)
Retrieves inner message from overlay message payload.
Decrypt, deserialize, then verify the signature of the inner message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload | List[bytes] | Payload of overlay message. | required |
Returns:
Type | Description |
---|---|
InnerMessage | Inner message retrieved from overlay payload |
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | key is too short |
FedbiomedNodeToNodeError | cannot decrypt payload |
FedbiomedNodeToNodeError | bad payload format |
FedbiomedNodeToNodeError | cannot verify payload integrity |
Source code in fedbiomed/node/requests/_overlay.py
def format_incoming_overlay(payload: List[bytes]) -> InnerMessage:
    """Retrieves inner message from overlay message payload.

    Decrypts every chunk of the payload, deserializes the reassembled cleartext,
    verifies the signature of the inner message, then builds the inner message object.

    Args:
        payload: Payload of overlay message.

    Returns:
        Inner message retrieved from overlay payload

    Raises:
        FedbiomedNodeToNodeError: bad type for payload
        FedbiomedNodeToNodeError: key is too short
        FedbiomedNodeToNodeError: cannot decrypt payload
        FedbiomedNodeToNodeError: bad payload format
        FedbiomedNodeToNodeError: cannot verify payload integrity
    """
    # check payload types (not yet done by message type checks, only checks it's a list)
    if not all(isinstance(p, bytes) for p in payload):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: bad type for node to node payload')

    # Both keys currently come from the same default key pair.
    local_node_private_key = _default_n2n_key
    distant_node_public_key = _default_n2n_key.public_key()

    # `_CHUNK_SIZE` is a number of bytes while `key_size` is a number of bits:
    # convert to bits for the comparison AND for the reported minimum key length.
    if _CHUNK_SIZE * 8 > min(local_node_private_key.key_size, distant_node_public_key.key_size):
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot use key shorter than {_CHUNK_SIZE * 8} bits')

    # decrypt outer payload
    # caveat: decryption can be long for long messages (~10s for 1MB cleartext message)
    try:
        decrypted_chunks = [
            local_node_private_key.decrypt(
                chunk,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )
            for chunk in payload
        ]
    except ValueError as e:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot decrypt payload: {e}') from e

    # NOTE(review): exceptions raised by `Serializer.loads` on malformed cleartext are
    # not converted to `FedbiomedNodeToNodeError` here - confirm this is intended.
    decrypted = Serializer.loads(b''.join(decrypted_chunks))

    # the cleartext must be a dict carrying at least the message and its signature
    if not isinstance(decrypted, dict) or not {'message', 'signature'} <= set(decrypted):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: bad inner payload format '
                                       f"in received message")

    # verify inner payload: the signature is checked against the re-serialized message
    try:
        distant_node_public_key.verify(
            decrypted['signature'],
            Serializer.dumps(decrypted['message']),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    except InvalidSignature as e:
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot verify payload integrity: {e}') from e

    # decode and ensure only node2node (inner) messages are received
    return NodeToNodeMessages.format_incoming_message(decrypted['message'])
format_outgoing_overlay
format_outgoing_overlay(message)
Creates an overlay message payload from an inner message.
Sign, serialize, then encrypt the inner message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | Inner message to send as overlay payload | required |
Returns:
Type | Description |
---|---|
List[bytes] | Payload for overlay message |
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | key is too short |
FedbiomedNodeToNodeError | bad message type |
Source code in fedbiomed/node/requests/_overlay.py
def format_outgoing_overlay(message: Message) -> List[bytes]:
    """Creates an overlay message payload from an inner message.

    Signs the serialized inner message, serializes message and signature together,
    then encrypts the result in chunks of `_CHUNK_SIZE` bytes.

    Args:
        message: Inner message to send as overlay payload

    Returns:
        Payload for overlay message, one encrypted item per cleartext chunk

    Raises:
        FedbiomedNodeToNodeError: key is too short
        FedbiomedNodeToNodeError: bad message type
    """
    # robustify from developer error (try to encapsulate a bad message type)
    if not isinstance(message, InnerMessage):
        raise FedbiomedNodeToNodeError(f'{ErrorNumbers.FB324.value}: not an inner message')

    # consider encrypt-sign([message,node_id]) or other see
    # https://theworld.com/~dtd/sign_encrypt/sign_encrypt7.html
    # Both keys currently come from the same default key pair.
    local_node_private_key = _default_n2n_key
    distant_node_public_key = _default_n2n_key.public_key()

    # `_CHUNK_SIZE` is a number of bytes while `key_size` is a number of bits:
    # convert to bits for the comparison AND for the reported minimum key length.
    # NOTE(review): RSA-OAEP with SHA-256 can only encrypt (key size in bytes - 66) bytes
    # per operation, so this check is necessary but not sufficient - confirm that
    # `_CHUNK_SIZE` leaves room for the OAEP overhead.
    if _CHUNK_SIZE * 8 > min(local_node_private_key.key_size, distant_node_public_key.key_size):
        raise FedbiomedNodeToNodeError(
            f'{ErrorNumbers.FB324.value}: cannot use key shorter than {_CHUNK_SIZE * 8} bits')

    # sign inner payload
    # The dict is read once so that the signed content is exactly the content sent.
    message_dict = message.get_dict()
    signature = local_node_private_key.sign(
        Serializer.dumps(message_dict),
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    signed = Serializer.dumps({
        'message': message_dict,
        'signature': signature,
    })

    # split to chunks and encrypt
    return [
        distant_node_public_key.encrypt(
            signed[i:i + _CHUNK_SIZE],
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
        for i in range(0, len(signed), _CHUNK_SIZE)
    ]
send_nodes
send_nodes(grpc_client, pending_requests, researcher_id, nodes, messages)
Send message to some other nodes using overlay communications.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grpc_client | GrpcController | object managing the communication with other components | required |
pending_requests | EventWaitExchange | object for receiving overlay node to node reply messages | required |
researcher_id | str | unique ID of researcher connecting the nodes | required |
nodes | List[str] | list of node IDs of the destination nodes | required |
messages | List[InnerMessage] | list of the inner messages for the destination nodes | required |
Returns: a tuple `(status, replies)` — `status`: True if all messages are received; `replies`: list of replies from each node.
Source code in fedbiomed/node/requests/_overlay.py
def send_nodes(
        grpc_client: GrpcController,
        pending_requests: EventWaitExchange,
        researcher_id: str,
        nodes: List[str],
        messages: List[InnerMessage],
) -> Tuple[bool, List[Any]]:
    """Send inner messages to other nodes through overlay messages, then wait for replies.

    `nodes` and `messages` are paired by position: the i-th message is wrapped in an
    overlay message and sent to the i-th node. Pairing stops at the shorter of the
    two lists. Only messages that are `InnerRequestReply` instances contribute a
    request ID to wait on.

    Args:
        grpc_client: object managing the communication with other components
        pending_requests: object for receiving overlay node to node reply messages
        researcher_id: unique ID of researcher connecting the nodes
        nodes: list of node IDs of the destination nodes
        messages: list of the inner messages for the destination nodes

    Returns:
        status: True if all messages are received
        replies: List of replies from each node.
    """
    awaited_request_ids = []

    for dest_node_id, inner_message in zip(nodes, messages):
        overlay_fields = {
            'researcher_id': researcher_id,
            'node_id': environ['NODE_ID'],
            'dest_node_id': dest_node_id,
            'overlay': format_outgoing_overlay(inner_message),
            'command': 'overlay'
        }
        grpc_client.send(NodeMessages.format_outgoing_message(overlay_fields))

        # a reply is expected only for request/reply style inner messages
        if isinstance(inner_message, InnerRequestReply):
            awaited_request_ids.append(inner_message.get_param('request_id'))

    return pending_requests.wait(awaited_request_ids, TIMEOUT_NODE_TO_NODE_REQUEST)