Classes
DiscardOnTimeout
Bases: _ReplyTimeoutPolicy
Discards request that do not answer in given timeout
Functions
FederatedRequest
FederatedRequest(message, nodes, policy=None)
Dispatches federated requests
This class has been design to be send a request and wait until a response is received
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Union[Message, MessagesByNode] | either a common | required |
nodes | List[NodeAgent] | list of nodes that are sent the message | required |
policy | Optional[List[RequestPolicy]] | list of policies for controlling the handling of the request | None |
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
self,
message: Union[Message, MessagesByNode],
nodes: List[NodeAgent],
policy: Optional[List[RequestPolicy]] = None
):
"""Constructor of the class.
Args:
message: either a common `Message` to send to nodes or a dict of distinct message per node
indexed by the node ID
nodes: list of nodes that are sent the message
policy: list of policies for controlling the handling of the request
"""
self._message = message
self._nodes = nodes
self._requests = []
self._request_id = str(uuid.uuid4())
self._nodes_status = {}
self._pending_replies = threading.Semaphore(value=0)
# Set-up policies
self._policy = PolicyController(policy)
# Set up single requests
if isinstance(self._message, Message):
for node in self._nodes:
self._requests.append(
Request(self._message, node, self._pending_replies, self._request_id)
)
# Different message for each node
elif isinstance(self._message, MessagesByNode):
for node in self._nodes:
if m := self._message.get(node.id):
self._requests.append(
Request(m, node, self._pending_replies, self._request_id)
)
else:
logger.warning(f"Node {node.id} is unknown. Send message to others, ignore this one.")
Attributes
Functions
MessagesByNode
Bases: dict
Type to defined messages by node
PolicyController
PolicyController(policies=None)
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(
self,
policies: Optional[List[RequestPolicy]] = None,
):
policies = policies or []
policies.insert(0, RequestPolicy())
self._policies = policies
Functions
PolicyStatus
Attributes
Request
Request(message, node, sem_pending, request_id=None)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | Message to send to the node | required |
node | NodeAgent | Node agent | required |
request_id | Optional[str] | unique ID of request | None |
sem_pending | Semaphore | semaphore for signaling new pending reply | required |
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
self,
message: Message,
node: NodeAgent,
sem_pending: threading.Semaphore,
request_id: Optional[str] = None,
) -> None:
"""Single request for node
Args:
message: Message to send to the node
node: Node agent
request_id: unique ID of request
sem_pending: semaphore for signaling new pending reply
"""
self._request_id = request_id if request_id else str(uuid.uuid4())
self._node = node
self._message = message
self._sem_pending = sem_pending
self.reply = None
self.error = None
self.status = None
Attributes
Functions
RequestPolicy
RequestPolicy(nodes=None)
Base strategy to collect replies from remote agents
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, nodes: Optional[List[str]] = None):
self.status = None
self._nodes = nodes
self.stop_caused_by = None
Attributes
Functions
RequestStatus
Attributes
Requests
Requests()
Manages communication between researcher and nodes.
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(self):
"""Constructor of the class
"""
self._monitor_message_callback = None
# Creates grpc server and starts it
self._grpc_server = GrpcServer(
host=environ["SERVER_HOST"],
port=environ["SERVER_PORT"],
on_message=self.on_message,
ssl=SSLCredentials(
key=environ['SERVER_SSL_KEY'],
cert=environ['SERVER_SSL_CERT'])
)
self.start_messaging()
Functions
StopOnDisconnect
Bases: _ReplyTimeoutPolicy
Stops collecting results if a node disconnects
Functions
StopOnError
StopOnTimeout
Bases: _ReplyTimeoutPolicy
Stops the request if nodes do not answer in given timeout