Requests

Classes

DiscardOnTimeout

Bases: _ReplyTimeoutPolicy

Discards requests that do not answer within the given timeout

Functions

FederatedRequest

FederatedRequest(message, nodes, policy=None)

Dispatches federated requests

This class is designed to send a request and wait until a response is received

Parameters:

Name | Type | Description | Default
message | Union[Message, MessagesByNode] | either a common Message to send to nodes or a dict of distinct messages per node, indexed by the node ID | required
nodes | List[NodeAgent] | list of nodes that are sent the message | required
policy | Optional[List[RequestPolicy]] | list of policies for controlling the handling of the request | None

Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    message: Union[Message, MessagesByNode],
    nodes: List[NodeAgent],
    policy: Optional[List[RequestPolicy]] = None
):
    """Constructor of the class.

    Args:
        message: either a common `Message` to send to nodes or a dict of distinct message per node
            indexed by the node ID
        nodes: list of nodes that are sent the message
        policy: list of policies for controlling the handling of the request
    """

    self._message = message
    self._nodes = nodes
    self._requests = []
    self._request_id = str(uuid.uuid4())
    self._nodes_status = {}

    self._pending_replies = threading.Semaphore(value=0)

    # Set-up policies
    self._policy = PolicyController(policy)

    # Set up single requests
    if isinstance(self._message, Message):
        for node in self._nodes:
            self._requests.append(
                Request(self._message, node, self._pending_replies, self._request_id)
            )

    # Different message for each node
    elif isinstance(self._message, MessagesByNode):
        for node in self._nodes:
            if m := self._message.get(node.id):
                self._requests.append(
                    Request(m, node, self._pending_replies, self._request_id)
                )
            else:
                logger.warning(f"Node {node.id} is unknown. Send message to others, ignore this one.")
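
The branching in the constructor above can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: `FakeNode` and `build_requests` are hypothetical stand-ins, a plain dict plays the role of `Message`, and each request is reduced to a `(node_id, message, request_id)` tuple. Because the stand-in `Message` is a dict, the sketch tests for `MessagesByNode` first.

```python
import uuid
from dataclasses import dataclass


@dataclass
class FakeNode:
    """Hypothetical stand-in for NodeAgent; only the `id` attribute is used."""
    id: str


class MessagesByNode(dict):
    """Per-node messages indexed by node ID."""


def build_requests(message, nodes):
    """Return one (node_id, message, request_id) tuple per dispatched request."""
    request_id = str(uuid.uuid4())  # shared by all per-node requests
    requests = []
    if isinstance(message, MessagesByNode):
        # Distinct message per node: nodes without an entry are skipped
        # (the constructor shown above logs a warning in that case).
        for node in nodes:
            if m := message.get(node.id):
                requests.append((node.id, m, request_id))
    else:
        # Common message: every node receives the same payload.
        for node in nodes:
            requests.append((node.id, message, request_id))
    return requests


nodes = [FakeNode("node-1"), FakeNode("node-2")]
common = build_requests({"command": "ping"}, nodes)
per_node = build_requests(MessagesByNode({"node-1": {"command": "train"}}), nodes)

print([node_id for node_id, _, _ in common])    # ['node-1', 'node-2']
print([node_id for node_id, _, _ in per_node])  # ['node-1']
```

Note that the walrus test `if m := ...` treats any falsy value as missing, so in this sketch an empty per-node message would be skipped just like an absent one.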

Attributes

Functions

MessagesByNode

Bases: dict

Type to define messages by node

PolicyController

PolicyController(policies=None)
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(
    self,
    policies: Optional[List[RequestPolicy]] = None,
):

    policies = policies or []
    policies.insert(0, RequestPolicy())
    self._policies = policies
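
One consequence of the constructor body above is worth seeing in isolation: a default `RequestPolicy` is always placed first, and a non-empty list passed by the caller is modified in place. The sketch below reproduces only that body; the `RequestPolicy` and `StopOnError` classes are simplified stand-ins, not the library's classes.

```python
from typing import List, Optional


class RequestPolicy:
    """Simplified stand-in for the documented base policy."""

    def __init__(self, nodes: Optional[List[str]] = None):
        self.status = None
        self._nodes = nodes
        self.stop_caused_by = None


class StopOnError(RequestPolicy):
    """Empty stand-in subclass, used only to have a second policy type."""


class PolicyController:
    """Stand-in with the same constructor body as the listing above."""

    def __init__(self, policies: Optional[List[RequestPolicy]] = None):
        policies = policies or []
        policies.insert(0, RequestPolicy())
        self._policies = policies


mine = [StopOnError()]
controller = PolicyController(mine)

# The base policy always comes first.
print([type(p).__name__ for p in controller._policies])  # ['RequestPolicy', 'StopOnError']

# The caller's list now holds the extra policy as well.
print(len(mine))  # 2
```

If the caller's list must stay unchanged, passing a copy such as `PolicyController(list(mine))` avoids the side effect in this sketch.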

Functions

PolicyStatus

Attributes

Request

Request(message, node, sem_pending, request_id=None)

Parameters:

Name | Type | Description | Default
message | Message | Message to send to the node | required
node | NodeAgent | Node agent | required
request_id | Optional[str] | unique ID of request | None
sem_pending | Semaphore | semaphore for signaling new pending reply | required

Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    message: Message,
    node: NodeAgent,
    sem_pending: threading.Semaphore,
    request_id: Optional[str] = None,
) -> None:
    """Single request for node

    Args:
        message: Message to send to the node
        node: Node agent
        request_id: unique ID of request
        sem_pending: semaphore for signaling new pending reply
    """
    self._request_id = request_id if request_id else str(uuid.uuid4())
    self._node = node
    self._message = message

    self._sem_pending = sem_pending

    self.reply = None
    self.error = None
    self.status = None
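
The `sem_pending` argument is a `threading.Semaphore` shared between the single requests and their owner, created with `value=0` in `FederatedRequest`. The following sketch shows the general signaling pattern such a semaphore supports, under the assumption that each reply releases it once; `fake_node_reply` and the surrounding loop are hypothetical and do not reproduce the library's code.

```python
import threading
import time

sem_pending = threading.Semaphore(value=0)  # starts with no pending reply
replies = {}
lock = threading.Lock()


def fake_node_reply(node_id: str, delay: float) -> None:
    """Hypothetical node: stores a reply, then signals that one is pending."""
    time.sleep(delay)
    with lock:
        replies[node_id] = f"reply from {node_id}"
    sem_pending.release()


workers = [
    threading.Thread(target=fake_node_reply, args=(f"node-{i}", 0.01 * i))
    for i in range(3)
]
for worker in workers:
    worker.start()

# The waiting side blocks until a reply is signaled, once per expected reply.
received = 0
while received < len(workers):
    if not sem_pending.acquire(timeout=2.0):
        break  # nothing arrived in time; a policy could decide what to do next
    received += 1

for worker in workers:
    worker.join()

print(received)  # 3
```

Starting the semaphore at zero means the waiting side never proceeds before at least one reply has been recorded.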

Attributes

Functions

RequestPolicy

RequestPolicy(nodes=None)

Base strategy to collect replies from remote agents

Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, nodes: Optional[List[str]] = None):
    self.status = None
    self._nodes = nodes
    self.stop_caused_by = None

Attributes

Functions

RequestStatus

Attributes

Requests

Requests()

Manages communication between researcher and nodes.

Source code in fedbiomed/researcher/requests/_requests.py
def __init__(self):
    """Constructor of the class
    """
    self._monitor_message_callback = None

    # Creates grpc server and starts it
    self._grpc_server = GrpcServer(
        host=environ["SERVER_HOST"],
        port=environ["SERVER_PORT"],
        on_message=self.on_message,
        ssl=SSLCredentials(
            key=environ['SERVER_SSL_KEY'],
            cert=environ['SERVER_SSL_CERT'])

    )
    self.start_messaging()

Functions

StopOnDisconnect

Bases: _ReplyTimeoutPolicy

Stops collecting results if a node disconnects

Functions

StopOnError

Bases: RequestPolicy

Stops collecting results if a node returns an error

Functions

StopOnTimeout

Bases: _ReplyTimeoutPolicy

Stops the request if nodes do not answer within the given timeout
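
The difference between discarding late requests and stopping the whole request can be sketched as follows. This is an illustration of the two behaviours as described on this page, not the library's policy API: `Outcome`, `check_timeout`, and all of their parameters are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```python
from enum import Enum


class Outcome(Enum):
    CONTINUE = "continue"    # still waiting for some nodes
    COMPLETED = "completed"  # nothing left to wait for
    STOPPED = "stopped"      # whole request aborted


def check_timeout(sent_at, replied, now, timeout, stop_on_timeout):
    """Return (outcome, discarded_node_ids) for one policy check.

    sent_at: dict mapping node ID to the time its request was sent
    replied: set of node IDs that have already answered
    """
    late = {
        node_id
        for node_id, t in sent_at.items()
        if node_id not in replied and now - t > timeout
    }
    if late and stop_on_timeout:
        # Stop-style behaviour: one late node ends the whole request.
        return Outcome.STOPPED, set()
    # Discard-style behaviour: drop late nodes, keep waiting for the rest.
    waiting = set(sent_at) - replied - late
    outcome = Outcome.CONTINUE if waiting else Outcome.COMPLETED
    return outcome, late


sent_at = {"node-1": 0.0, "node-2": 0.0}
replied = {"node-1"}

early = check_timeout(sent_at, replied, now=3.0, timeout=5.0, stop_on_timeout=False)
discard = check_timeout(sent_at, replied, now=6.0, timeout=5.0, stop_on_timeout=False)
stop = check_timeout(sent_at, replied, now=6.0, timeout=5.0, stop_on_timeout=True)

print(early)    # (<Outcome.CONTINUE: 'continue'>, set())
print(discard)  # (<Outcome.COMPLETED: 'completed'>, {'node-2'})
print(stop)     # (<Outcome.STOPPED: 'stopped'>, set())
```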

Functions