Requests

Classes

DiscardOnTimeout

DiscardOnTimeout(timeout=5, nodes=None)

Bases: _ReplyTimeoutPolicy

Discards requests that do not answer within the given timeout

Parameters:

Name Type Description Default
timeout int

maximum time for policy

5
nodes Optional[List[str]]

optional list of nodes to apply the policy. By default applies to all known nodes of request.

None
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, timeout: int = 5, nodes: Optional[List[str]] = None) -> None:
    """Initializes the timeout state of the policy.

    Args:
        timeout: maximum time for policy
        nodes: optional list of nodes to apply the policy. By default applies to all known nodes of request.
    """
    super().__init__(nodes)
    # `_time` starts unset: no reference time is recorded at construction
    self.timeout, self._time = timeout, None

Functions

continue_
continue_(requests)

Discards requests that reach timeout, always continue

Returns:

Type Description
PolicyStatus

CONTINUE

Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests: TRequest) -> PolicyStatus:
    """Discards the requests that reached the timeout without stopping the federated request.

    Args:
        requests: requests handed over to `apply`

    Returns:
        The status returned by `apply`; documented as always CONTINUE
    """
    # NOTE(review): the second positional argument of `apply` is presumably the
    # "stop on timeout" switch (False here, hence discard only) - confirm against `apply`.
    outcome = self.apply(requests, False)
    return outcome

FederatedRequest

FederatedRequest(message, nodes, policy=None)

Dispatches federated requests

This class has been designed to send a request and wait until a response is received

Parameters:

Name Type Description Default
message Union[Message, MessagesByNode]

either a common Message to send to nodes or a dict of distinct message per node indexed by the node ID

required
nodes List[NodeAgent]

list of nodes that are sent the message

required
policy Optional[List[RequestPolicy]]

list of policies for controlling the handling of the request

None
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    message: Union[Message, MessagesByNode],
    nodes: List[NodeAgent],
    policy: Optional[List[RequestPolicy]] = None
):
    """Builds the single requests of a federated request, one per targeted node.

    Args:
        message: either a common `Message` to send to nodes or a dict of distinct message per node
            indexed by the node ID
        nodes: list of nodes that are sent the message
        policy: list of policies for controlling the handling of the request
    """

    self._message = message
    self._nodes = nodes
    self._nodes_status = {}
    self._requests = []

    # Every single request shares the same prefixed identifier
    self._request_id = REQUEST_PREFIX + str(uuid.uuid4())

    # Semaphore handed to every single request; each of them releases it when a reply arrives
    self._pending_replies = threading.Semaphore(value=0)

    # Policies are wrapped in a controller
    self._policy = PolicyController(policy)

    if isinstance(message, Message):
        # Same message for every node
        self._requests.extend(
            Request(message, node, self._pending_replies, self._request_id)
            for node in nodes
        )

    elif isinstance(message, MessagesByNode):
        # One distinct message per node, looked up by node ID
        for node in nodes:
            node_message = message.get(node.id)
            if not node_message:
                logger.warning(f"Node {node.id} is unknown. Send message to others, ignore this one.")
                continue

            self._requests.append(
                Request(node_message, node, self._pending_replies, self._request_id)
            )

Attributes

policy property
policy

Returns policy controller

requests property
requests

Returns requests

Functions

disconnected_requests
disconnected_requests()

Returns the requests to disconnected nodes

Returns:

Type Description
List[Message]

A list of request Message sent to disconnected nodes

Source code in fedbiomed/researcher/requests/_requests.py
def disconnected_requests(self) -> List[Message]:
    """Collects the requests whose status is `RequestStatus.DISCONNECT`.

    Returns:
        The entries of the internal request list that were sent to disconnected nodes
    """
    disconnected = []
    for request in self._requests:
        if request.status == RequestStatus.DISCONNECT:
            disconnected.append(request)

    return disconnected
errors
errors()

Returns errors of each request

Returns:

Type Description
Dict[str, ErrorMessage]

A dict of error Message received for this request, indexed by node ID

Source code in fedbiomed/researcher/requests/_requests.py
def errors(self) -> Dict[str, ErrorMessage]:
    """Gathers the error received for each single request, if any.

    Returns:
        A dict of error `Message` received for this request, indexed by node ID.
            Requests without an error are left out.
    """
    collected = {}
    for request in self._requests:
        if request.error:
            node_id = request.node.id
            collected[node_id] = request.error

    return collected
replies
replies()

Returns replies of each request

Returns:

Type Description
Dict[str, Message]

A dict of replies Message received for this request, indexed by node ID

Source code in fedbiomed/researcher/requests/_requests.py
def replies(self) -> Dict[str, Message]:
    """Gathers the reply received for each single request, if any.

    Returns:
        A dict of replies `Message` received for this request, indexed by node ID.
            Requests without a reply are left out.
    """
    collected = {}
    for request in self._requests:
        if request.reply:
            node_id = request.node.id
            collected[node_id] = request.reply

    return collected
send
send()

Sends federated request

Source code in fedbiomed/researcher/requests/_requests.py
def send(self) -> None:
    """Sends every single request of this federated request, in list order."""
    for single_request in self._requests:
        single_request.send()
wait
wait()

Waits for the replies of the messages that are sent

Source code in fedbiomed/researcher/requests/_requests.py
def wait(self) -> None:
    """Blocks until the policy controller no longer answers CONTINUE."""

    while True:
        if self._policy.continue_all(self._requests) == PolicyStatus.CONTINUE:
            # Wake up when a reply releases the semaphore, or after the check
            # timeout, then evaluate the policies again
            self._pending_replies.acquire(timeout=REQUEST_STATUS_CHECK_TIMEOUT)
        else:
            return

MessagesByNode

Bases: dict

Type to define messages by node

PolicyController

PolicyController(policies=None)
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(
    self,
    policies: Optional[List[RequestPolicy]] = None,
):
    """Stores the policies to apply, always preceded by a default `RequestPolicy`.

    Args:
        policies: optional list of policies; a default `RequestPolicy` instance is
            placed in front of them. The given list is left untouched.
    """
    # Build a new list rather than inserting into `policies`: inserting would
    # modify the caller's list, which then grows each time it is reused.
    self._policies = [RequestPolicy(), *(policies or [])]

Functions

continue_all
continue_all(requests)

Checks if all policies indicate to continue.

Parameters:

Name Type Description Default
requests List[TRequest]

List of Request objects to check against policies

required

Returns:

Type Description
PolicyStatus

CONTINUE if all policies indicate to continue, COMPLETED otherwise

Source code in fedbiomed/researcher/requests/_policies.py
def continue_all(self, requests: List[TRequest]) -> PolicyStatus:
    """Checks if all policies indicate to continue.

    Args:
        requests: List of [Request][fedbiomed.researcher.requests.Request] objects to
            check against policies

    Returns:
        CONTINUE if all policies indicate to continue, COMPLETED otherwise
            (including when there is no request to check)
    """

    if not requests:
        # Nothing to wait for. Return a `PolicyStatus` as declared instead of a bare
        # bool; it still compares unequal to CONTINUE.
        return PolicyStatus.COMPLETED

    # A list (not a generator) is built on purpose: `continue_` is called on every
    # policy even after one of them has already answered something else than CONTINUE.
    status = all(
        [policy.continue_(requests=requests) == PolicyStatus.CONTINUE
            for policy in self._policies]
    )

    return PolicyStatus.CONTINUE if status else PolicyStatus.COMPLETED
has_stopped_any
has_stopped_any()

Checks if any of the policies indicates to stop

Returns:

Type Description
bool

True if request has stopped due to given strategy

Source code in fedbiomed/researcher/requests/_policies.py
def has_stopped_any(self) -> bool:
    """Tells whether at least one policy has the STOPPED status.

    Returns:
        True if request has stopped due to given strategy
    """

    stopped_flags = [
        policy.status == PolicyStatus.STOPPED for policy in self._policies
    ]
    return any(stopped_flags)
report
report()

Reports strategy stop status

Returns:

Type Description
Dict[str, str]

Dict of policies stopped, indexed by the node ID that caused the stop

Source code in fedbiomed/researcher/requests/_policies.py
def report(self) -> Dict[str, str]:
    """Summarizes which policies are in STOPPED status.

    Returns:
        Dict of policy class names, indexed by the ID of the node whose request
            caused the stop. If several stopped policies point to the same node,
            the last one in the policy list is kept.
    """
    stopped = {}
    for policy in self._policies:
        if policy.status == PolicyStatus.STOPPED:
            node_id = policy.stop_caused_by.node.id
            stopped[node_id] = policy.__class__.__name__

    return stopped

PolicyStatus

Attributes

COMPLETED class-attribute instance-attribute
COMPLETED = 'COMPLETED'
CONTINUE class-attribute instance-attribute
CONTINUE = 'CONTINUE'
STOPPED class-attribute instance-attribute
STOPPED = 'STOPPED'

Request

Request(message, node, sem_pending, request_id=None)

Parameters:

Name Type Description Default
message Message

Message to send to the node

required
node NodeAgent

Node agent

required
request_id Optional[str]

unique ID of request

None
sem_pending Semaphore

semaphore for signaling new pending reply

required
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    message: Message,
    node: NodeAgent,
    sem_pending: threading.Semaphore,
    request_id: Optional[str] = None,
) -> None:
    """Single request for node

    Args:
        message: Message to send to the node
        node: Node agent
        sem_pending: semaphore for signaling new pending reply
        request_id: unique ID of request. A random UUID string is generated when
            it is missing or empty.
    """
    self._message = message
    self._node = node
    self._sem_pending = sem_pending

    # Fall back to a fresh identifier when none (or an empty one) is given
    self._request_id = request_id or str(uuid.uuid4())

    # Nothing received yet
    self.reply = None
    self.error = None
    self.status = None

Attributes

error instance-attribute
error = None
node property
node

Returns node agent

reply instance-attribute
reply = None
status instance-attribute
status = None

Functions

flush
flush(stopped)

Flushes the reply that has been processed

Parameters:

Name Type Description Default
stopped bool

True if the request was stopped before completion

required
Source code in fedbiomed/researcher/requests/_requests.py
def flush(self, stopped: bool) -> None:
    """Asks the node agent to flush the reply of this request.

    Args:
        stopped: True if the request was stopped before completion
    """
    node_agent = self._node
    node_agent.flush(self._request_id, stopped)
has_finished
has_finished()

Queries if the request has finished.

Also tracks if node has disconnected.

Source code in fedbiomed/researcher/requests/_requests.py
def has_finished(self) -> bool:
    """Queries if the request has finished.

    Also tracks if node has disconnected: in that case the request status is
    set to `RequestStatus.DISCONNECT`.

    Returns:
        True if the node is disconnected, or if a reply or an error was received
            from the node. False otherwise.
    """

    node_disconnected = self._node.status == NodeActiveStatus.DISCONNECTED
    if node_disconnected:
        # Don't expect any reply from a disconnected node
        self.status = RequestStatus.DISCONNECT
        return True

    return bool(self.reply or self.error)
on_reply
on_reply(reply)

Callback for node agent to execute once it replies.

Parameters:

Name Type Description Default
reply Message

reply message received from node

required
Source code in fedbiomed/researcher/requests/_requests.py
def on_reply(self, reply: Message) -> None:
    """Callback for node agent to execute once it replies.

    Stores the message either as error or as reply, updates the request status
    accordingly, then releases the pending-reply semaphore.

    Args:
        reply: reply message received from node
    """

    if not isinstance(reply, ErrorMessage):
        self.reply = reply
        self.status = RequestStatus.SUCCESS
    else:
        self.error = reply
        self.status = RequestStatus.ERROR

    # Signal that something new has been received
    self._sem_pending.release()
send
send()

Sends the request

Source code in fedbiomed/researcher/requests/_requests.py
def send(self) -> None:
    """Stamps the message with the request ID and hands it to the node agent."""
    outgoing = self._message
    outgoing.request_id = self._request_id

    # `on_reply` is registered as the callback for the node's answer
    self._node.send(outgoing, self.on_reply)

    # NOTE(review): status is set after the send call; if the node agent can invoke
    # `on_reply` before this line runs, the status set there would be overwritten - confirm.
    self.status = RequestStatus.NO_REPLY_YET

RequestPolicy

RequestPolicy(nodes=None)

Base strategy to collect replies from remote agents

Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, nodes: Optional[List[str]] = None):
    """Initializes the policy state.

    Args:
        nodes: optional list of node IDs stored for this policy
    """
    self._nodes = nodes
    # No status and no stop cause at construction
    self.status = None
    self.stop_caused_by = None

Attributes

status instance-attribute
status = None
stop_caused_by instance-attribute
stop_caused_by = None

Functions

completed
completed()

Updates status of strategy as completed without any issue

Source code in fedbiomed/researcher/requests/_policies.py
def completed(self) -> PolicyStatus:
    """Marks the strategy as completed without any issue.

    Returns:
        `PolicyStatus.COMPLETED`
    """
    self.status = outcome = PolicyStatus.COMPLETED
    return outcome
continue_
continue_(requests)

Default strategy stops collecting results once all nodes have answered

Returns:

Type Description
PolicyStatus

CONTINUE while some request has not finished yet, COMPLETED once all requests have finished

Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests) -> PolicyStatus:
    """Default strategy: keep collecting until every request has finished.

    Args:
        requests: requests to check

    Returns:
        The result of `keep()` while some request has not finished yet,
            the result of `completed()` once all of them have finished
    """

    # Query every request (no short-circuit) before deciding
    finished_flags = [req.has_finished() for req in requests]
    if all(finished_flags):
        return self.completed()

    return self.keep()
keep
keep()

Keeps collecting replies from nodes

Source code in fedbiomed/researcher/requests/_policies.py
def keep(self) -> PolicyStatus:
    """Marks the strategy as still collecting replies from nodes.

    Returns:
        `PolicyStatus.CONTINUE`
    """
    self.status = outcome = PolicyStatus.CONTINUE
    return outcome
stop
stop(req)

Stop sign for strategy

Source code in fedbiomed/researcher/requests/_policies.py
def stop(self, req) -> PolicyStatus:
    """Marks the strategy as stopped and records the request that caused it.

    Args:
        req: request that caused the stop

    Returns:
        `PolicyStatus.STOPPED`
    """
    self.status = outcome = PolicyStatus.STOPPED
    self.stop_caused_by = req
    return outcome

RequestStatus

Attributes

DISCONNECT class-attribute instance-attribute
DISCONNECT = 'DISCONNECT'
ERROR class-attribute instance-attribute
ERROR = 'ERROR'
NO_REPLY_YET class-attribute instance-attribute
NO_REPLY_YET = 'NO_REPLY_YET'
SUCCESS class-attribute instance-attribute
SUCCESS = 'SUCCESS'
TIMEOUT class-attribute instance-attribute
TIMEOUT = 'TIMEOUT'

Requests

Requests(config)

Manages communication between researcher and nodes.

Parameters:

Name Type Description Default
config ResearcherConfig

Object for handling the component configuration

required
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    config: ResearcherConfig,
):
    """Creates the gRPC server from the component configuration and starts it.

    Args:
        config: Object for handling the component configuration
    """
    self._monitor_message_callback = None

    host = config.get('server', 'host')
    port = config.get('server', 'port')

    # Certificate files are located in the configuration folder under the component root
    cert_paths = {
        entry: os.path.join(config.root, CONFIG_FOLDER_NAME, config.get('certificate', entry))
        for entry in ('private_key', 'public_key')
    }

    self._researcher_id = config.get("default", "id")

    # Creates grpc server and starts it
    self._grpc_server = GrpcServer(
        host=host,
        port=port,
        on_message=self.on_message,
        ssl=SSLCredentials(key=cert_paths['private_key'], cert=cert_paths['public_key']),
    )
    self.start_messaging()

Functions

add_monitor_callback
add_monitor_callback(callback)

Adds callback function for monitor messages

Parameters:

Name Type Description Default
callback Callable[[Dict], None]

Callback function for handling monitor messages that come through the 'general/monitoring' channel

required
Source code in fedbiomed/researcher/requests/_requests.py
def add_monitor_callback(self, callback: Callable[[Dict], None]):
    """Registers the function called for monitor messages.

    Any previously registered callback is replaced.

    Args:
        callback: Callback function for handling monitor messages that come through
            the 'general/monitoring' channel
    """
    self._monitor_message_callback = callback
list
list(nodes=None, verbose=False)

Lists available data in each node

Parameters:

Name Type Description Default
nodes Optional[list]

optionally filter nodes with this list. Default is None, no filtering, consider all nodes

None
verbose bool

If it is true it prints datasets in readable format

False

Returns:

Type Description
dict

A dict with node_id as keys, and list of dicts describing available data as values

Source code in fedbiomed/researcher/requests/_requests.py
def list(self, nodes: Optional[list] = None, verbose: bool = False) -> dict:
    """Lists available data in each node

    Sends a `ListRequest` and collects the `databases` of every reply.

    Args:
        nodes: optionally filter nodes with this list. Default is None, no filtering, consider all nodes
        verbose: If it is true it logs the datasets of each node in readable format

    Returns:
        A dict with node_id as keys, and list of dicts describing available data as values
    """

    message = ListRequest(researcher_id=self._researcher_id)

    data_found = {}
    with self.send(message, nodes, policies=[DiscardOnTimeout(5)]) as federated_req:
        data_found.update(
            (node_id, reply.databases)
            for node_id, reply in federated_req.replies().items()
        )

    if not verbose:
        return data_found

    for node, datasets in data_found.items():
        count = len(datasets)
        if count > 0:
            # One table row per dataset; column names come from the first dataset
            rows = [row.values() for row in datasets]
            headers = datasets[0].keys()
            info = '\n Node: {} | Number of Datasets: {} \n'.format(node, count)
            logger.info(info + tabulate.tabulate(rows, headers, tablefmt="grid") + '\n')
        else:
            logger.info('\n Node: {} | Number of Datasets: {}'.format(node, count) +
                        " No data has been set up for this node.")

    return data_found
on_message
on_message(msg, type_)

Handles arbitrary messages received from the remote agents

This callback is only used for feedback messages from nodes (logs, experiment monitor), not for node replies to requests.

Parameters:

Name Type Description Default
msg Union[Dict[str, Any], Message]

de-serialized msg

required
type_ MessageType

Reply type one of reply, log, scalar

required
Source code in fedbiomed/researcher/requests/_requests.py
def on_message(self, msg: Union[Dict[str, Any], Message], type_: MessageType) -> None:
    """Dispatches feedback messages received from the remote agents

    This callback is only used for feedback messages from nodes (logs, experiment
    monitor), not for node replies to requests.

    Args:
        msg: de-serialized msg
        type_: Reply type one of reply, log, scalar
    """

    if type_ == MessageType.LOG:
        # Log messages are printed right away (same thread)
        self.print_node_log_message(msg.get_dict())
        return

    if type_ == MessageType.SCALAR:
        # Scalars go to the monitor callback, when one is registered
        if self._monitor_message_callback is not None:
            self._monitor_message_callback(msg.get_dict())
        return

    logger.error(f"Undefined message type received  {type_} - IGNORING")
ping_nodes
ping_nodes()

Pings online nodes

Returns:

Type Description
list

List of ID of up and running nodes

Source code in fedbiomed/researcher/requests/_requests.py
def ping_nodes(self) -> list:
    """Pings online nodes

    Sends a `PingRequest` and keeps the IDs of the nodes that replied.

    Returns:
        List of ID of up and running nodes
    """
    ping = PingRequest(researcher_id=self._researcher_id)
    with self.send(ping, policies=[DiscardOnTimeout(5)]) as federated_req:
        # Replies are indexed by node ID: the keys are the answering nodes
        nodes_online = [*federated_req.replies()]

    return nodes_online
print_node_log_message staticmethod
print_node_log_message(log)

Prints logger messages coming from the node

It is run on the communication process and must be as quick as possible: - all logs (coming from the nodes) are forwarded to the researcher logger (immediate display on console/file/whatever)

Parameters:

Name Type Description Default
log Dict[str, Any]

log message and its metadata

required
Source code in fedbiomed/researcher/requests/_requests.py
@staticmethod
def print_node_log_message(log: Dict[str, Any]) -> None:
    """Forwards a log message coming from a node to the researcher logger

    It is run on the communication process and must be as quick as possible:
    - all logs (coming from the nodes) are forwarded to the researcher logger
    (immediate display on console/file/whatever)

    Args:
        log: log message and its metadata. The entries "msg" (JSON string holding
            a "message" entry), "level" and "node_id" are read.
    """

    # log contains the original message sent by the node
    # FIXME: we should use `fedbiomed.common.json.deserialize` method
    # instead of the json method when extracting json message
    original_msg = json.loads(log["msg"])

    # Fancy layout for training feedback: level, node, message, then a separator line
    template = ("\033[1m{}\033[0m\n"
                "\t\t\t\t\t\033[1m NODE\033[0m {}\n"
                "\t\t\t\t\t\033[1m MESSAGE:\033[0m {}\033[0m\n"
                "{}")
    level = log["level"]
    node_id = log["node_id"]
    text = original_msg["message"]
    separator = 5 * "-------------"

    logger.info(template.format(level, node_id, text, separator))
remove_monitor_callback
remove_monitor_callback()

Removes callback function for Monitor class.

Source code in fedbiomed/researcher/requests/_requests.py
def remove_monitor_callback(self):
    """Unregisters the callback used for monitor messages, if any."""
    self._monitor_message_callback = None
search
search(tags, nodes=None)

Searches available data by tags

Parameters:

Name Type Description Default
tags List[str]

List containing tags associated to the data researcher is looking for.

required
nodes Optional[list]

optionally filter nodes with this list. Default is None, no filtering, consider all nodes

None

Returns:

Type Description
dict

A dict with node_id as keys, and list of dicts describing available data as values

Source code in fedbiomed/researcher/requests/_requests.py
def search(self, tags: List[str], nodes: Optional[list] = None) -> dict:
    """Searches available data by tags

    Sends a `SearchRequest`, keeps the non-empty `databases` of the replies and
    logs a warning for every node that returned an error.

    Args:
        tags: List containing tags associated to the data researcher is looking for.
        nodes: optionally filter nodes with this list. Default is None, no filtering, consider all nodes

    Returns:
        A dict with node_id as keys, and list of dicts describing available data as values
    """
    message = SearchRequest(researcher_id=self._researcher_id, tags=tags)

    data_found = {}
    with self.send(message, nodes, policies=[DiscardOnTimeout(5)]) as federated_req:

        for node_id, reply in federated_req.replies().items():
            # Nodes answering without any matching dataset are skipped
            if not reply.databases:
                continue
            data_found[node_id] = reply.databases
            logger.info('Node selected for training -> {}'.format(reply.node_id))

        for node_id, error in federated_req.errors().items():
            logger.warning(f"Node {node_id} has returned error from search request {error.extra_msg}")

        if not data_found:
            logger.info("No available dataset has found in nodes with tags: {}".format(tags))

    return data_found
send
send(message, nodes=None, policies=None)

Sends federated request to given nodes with given message

Parameters:

Name Type Description Default
message Union[Message, MessagesByNode]

either a common Message to send to nodes or a dict of distinct message per node indexed by the node ID

required
nodes Optional[List[str]]

list of nodes that are sent the message. If None, send the message to all known active nodes.

None
policies List[RequestPolicy]

list of policies for controlling the handling of the request, or None

None

Returns:

Type Description
FederatedRequest

The object for handling the communications for this request

Source code in fedbiomed/researcher/requests/_requests.py
def send(
        self,
        message: Union[Message, MessagesByNode],
        nodes: Optional[List[str]] = None,
        policies: Optional[List[RequestPolicy]] = None
) -> FederatedRequest:
    """Sends federated request to given nodes with given message

    Args:
        message: either a common `Message` to send to nodes or a dict of distinct message per node
            indexed by the node ID
        nodes: list of IDs of the nodes that are sent the message. If None, the nodes
            returned by the gRPC server's `get_all_nodes()` are used.
        policies: list of policies for controlling the handling of the request, or None

    Returns:
        The object for handling the communications for this request
    """

    if nodes is not None:
        # Resolve each node ID to the object returned by the gRPC server for it
        nodes = [self._grpc_server.get_node(node) for node in nodes]
    else:
        nodes = self._grpc_server.get_all_nodes()

    return FederatedRequest(message, nodes, policies)
start_messaging
start_messaging()

Start communications endpoint

Source code in fedbiomed/researcher/requests/_requests.py
def start_messaging(self) -> None:
    """Starts the communications endpoint by starting the gRPC server."""
    grpc_server = self._grpc_server
    grpc_server.start()
training_plan_approve
training_plan_approve(training_plan, description='no description provided', nodes=None, policies=None)

Send a training plan and a ApprovalRequest message to node(s).

If a list of node id(s) is provided, the message will be individually sent to all nodes of the list. If the node id(s) list is None (default), the message is broadcast to all nodes.

Parameters:

Name Type Description Default
training_plan BaseTrainingPlan

the training plan class to send to the nodes for approval.

required
description str

Description of training plan approval request

'no description provided'
nodes Optional[List[str]]

list of nodes (specified by their UUID)

None

Returns:

Type Description
dict

a dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side.

Source code in fedbiomed/researcher/requests/_requests.py
def training_plan_approve(
        self,
        training_plan: BaseTrainingPlan,
        description: str = "no description provided",
        nodes: Optional[List[str]] = None,
        policies: Optional[List] = None
) -> dict:
    """Send a training plan and a ApprovalRequest message to node(s).

    If a list of node id(s) is provided, the message will be individually sent
    to all nodes of the list.
    If the node id(s) list is None (default), the message is broadcast to all nodes.

    Args:
        training_plan: the training plan class to send to the nodes for approval.
        description: Description of training plan approval request
        nodes: list of nodes (specified by their UUID)
        policies: list of policies for controlling the handling of the request, or None

    Returns:
        a dictionary indexed by node ID whose values are the replies received from the
        nodes, as returned by their `get_dict()`. An empty dictionary is returned when
        the training plan cannot be saved, re-imported or parsed.
        Warning: a reply does not mean that the training plan is approved, only that it has been added
        to the "approval queue" on the node side.
    """

    training_plan_module = 'model_' + str(uuid.uuid4())
    with tempfile.TemporaryDirectory() as tmp_dir:
        training_plan_file = os.path.join(tmp_dir, training_plan_module + '.py')
        try:
            training_plan.save_code(training_plan_file)
        except Exception as e:
            logger.error(f"Cannot save the training plan to a local tmp dir : {e}")
            return {}

        # Re-import the saved code to get the source that is sent to the nodes
        try:
            _, training_plan_instance = import_class_object_from_file(
                training_plan_file, training_plan.__class__.__name__)
            tp_source = training_plan_instance.source()
        except Exception as e:
            logger.error(f"Cannot instantiate the training plan: {e}")
            return {}

    # `minify` is only used as a check that the source can be processed:
    # its output is discarded and the original source is sent.
    try:
        minify(tp_source,
               remove_annotations=False,
               combine_imports=False,
               remove_pass=False,
               hoist_literals=False,
               remove_object_base=True,
               rename_locals=False)
    except Exception as e:
        # minify does not provide any specific exception
        logger.error(f"This file is not a python file ({e})")
        return {}

    # Debug output goes through the logger instead of a bare print to stdout
    logger.debug(f"Training plan source sent for approval:\n{tp_source}")

    # send message to node(s)
    message = ApprovalRequest(
        researcher_id=self._researcher_id,
        description=str(description),
        training_plan=tp_source)

    with self.send(message, nodes, policies=policies) as federated_req:
        errors = federated_req.errors()
        replies = federated_req.replies()

        # TODO: Loop over errors and replies
        for node_id, error in errors.items():
            logger.info(f"Node ({node_id}) has returned error {error.errnum}, {error.extra_msg}")

    return {node_id: rep.get_dict() for node_id, rep in replies.items()}

StopOnDisconnect

StopOnDisconnect(timeout=5, nodes=None)

Bases: _ReplyTimeoutPolicy

Stops collecting results if a node disconnects

Parameters:

Name Type Description Default
timeout int

maximum time for policy

5
nodes Optional[List[str]]

optional list of nodes to apply the policy. By default applies to all known nodes of request.

None
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, timeout: int = 5, nodes: Optional[List[str]] = None) -> None:
    """Stores the timeout attributes of the policy.

    Args:
        timeout: maximum time for policy
        nodes: optional list of nodes to apply the policy. By default applies to all known nodes of request.
    """
    super().__init__(nodes)
    # The reference time `_time` is left unset at construction
    self.timeout, self._time = timeout, None

Functions

continue_
continue_(requests)

Continues federated request if nodes are not disconnected

Returns:

Type Description
PolicyStatus

CONTINUE if no node disconnect found, STOPPED if some node disconnect found and timeout is reached

Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests: TRequest) -> PolicyStatus:
    """Continues federated request unless a node is disconnected and the timeout is reached

    Args:
        requests: requests to check; restricted to the nodes of the policy when
            a node list was given

    Returns:
        CONTINUE if no node disconnect found, STOPPED if some node disconnect found
            and timeout is reached
    """

    selected = requests
    if self._nodes:
        selected = [req for req in requests if req.node.id in self._nodes]

    for req in selected:
        # The timeout is queried once per examined request, before looking at its status
        timed_out = self.is_timeout()
        disconnected = req.status == RequestStatus.DISCONNECT
        if disconnected and timed_out:
            return self.stop(req)

    return PolicyStatus.CONTINUE

StopOnError

StopOnError(nodes=None)

Bases: RequestPolicy

Stops collecting results if a node returns an error

Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, nodes: Optional[List[str]] = None):
    """Sets the initial state of the policy.

    Args:
        nodes: optional list of node IDs stored for this policy
    """
    self._nodes = nodes
    # Neither a status nor a stop cause is known yet
    self.status = None
    self.stop_caused_by = None

Functions

continue_
continue_(requests)

Continues federated request if nodes do not return an error

Returns:

Type Description
PolicyStatus

CONTINUE if no error found, STOPPED if some error found

Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests: TRequest) -> PolicyStatus:
    """Continues federated request as long as no node has returned an error

    Args:
        requests: requests to check; restricted to the nodes of the policy when
            a node list was given

    Returns:
        CONTINUE if no error found, STOPPED if some error found
    """

    candidates = requests
    if self._nodes:
        candidates = [req for req in requests if req.node.id in self._nodes]

    # The first request holding an error is the one reported as the stop cause
    failed = next((req for req in candidates if req.error), None)
    if failed is not None:
        return self.stop(failed)

    return PolicyStatus.CONTINUE

StopOnTimeout

StopOnTimeout(timeout=5, nodes=None)

Bases: _ReplyTimeoutPolicy

Stops the request if nodes do not answer in given timeout

Parameters:

Name Type Description Default
timeout int

maximum time for policy

5
nodes Optional[List[str]]

optional list of nodes to apply the policy. By default applies to all known nodes of request.

None
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, timeout: int = 5, nodes: Optional[List[str]] = None) -> None:
    """Records the timeout settings of the policy.

    Args:
        timeout: maximum time for policy
        nodes: optional list of nodes to apply the policy. By default applies to all known nodes of request.
    """
    super().__init__(nodes)
    # `_time` holds no reference time until it is set elsewhere
    self.timeout, self._time = timeout, None

Functions

continue_
continue_(requests)

Continues federated request if nodes don't reach the timeout

Returns:

Type Description
PolicyStatus

CONTINUE if no node reached timeout, STOPPED if some node reached timeout and timeout is reached

Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests) -> PolicyStatus:
    """Continues federated request if nodes don't reach the timeout

    Args:
        requests: requests handed over to `apply`

    Returns:
        The status returned by `apply`; documented as CONTINUE if no node reached
            timeout, STOPPED otherwise
    """
    # NOTE(review): the second positional argument of `apply` is presumably the
    # "stop on timeout" switch (True here) - confirm against `apply`.
    outcome = self.apply(requests, True)
    return outcome