Classes
DiscardOnTimeout
DiscardOnTimeout(timeout=5, nodes=None)
Bases: _ReplyTimeoutPolicy
Discards requests that are not answered within the given timeout
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | maximum time for policy | 5 |
nodes | Optional[List[str]] | optional list of nodes to apply the policy. By default applies to all known nodes of request. | None |
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, timeout: int = 5, nodes: Optional[List[str]] = None) -> None:
    """Stores the timeout settings of the policy.

    Args:
        timeout: maximum time for policy
        nodes: optional list of nodes to apply the policy. By default applies
            to all known nodes of request.
    """
    # The parent constructor receives the node filter
    super().__init__(nodes)
    # `_time` starts out unset in this constructor
    self.timeout, self._time = timeout, None
Functions
continue_
continue_(requests)
Discards requests that reach timeout, always continue
Returns:
Type | Description |
---|---|
PolicyStatus | CONTINUE |
Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests: TRequest) -> PolicyStatus:
    """Runs the timeout handling in its non-stopping mode.

    Delegates to `self.apply`, passing `False` as second argument.

    Returns:
        The value returned by `self.apply` (documented as CONTINUE)
    """
    outcome = self.apply(requests, False)
    return outcome
FederatedRequest
FederatedRequest(message, nodes, policy=None)
Dispatches federated requests
This class has been designed to send a request and wait until a response is received
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Union[Message, MessagesByNode] | either a common `Message` to send to nodes or a dict of distinct message per node indexed by the node ID | required |
nodes | List[NodeAgent] | list of nodes that are sent the message | required |
policy | Optional[List[RequestPolicy]] | list of policies for controlling the handling of the request | None |
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    message: Union[Message, MessagesByNode],
    nodes: List[NodeAgent],
    policy: Optional[List[RequestPolicy]] = None
):
    """Prepares one `Request` per target node for a federated request.

    Args:
        message: either a common `Message` to send to nodes or a dict of distinct message per node
            indexed by the node ID
        nodes: list of nodes that are sent the message
        policy: list of policies for controlling the handling of the request
    """
    self._message = message
    self._nodes = nodes
    self._requests = []
    # All single requests share one federated request ID
    self._request_id = REQUEST_PREFIX + str(uuid.uuid4())
    self._nodes_status = {}
    # Semaphore handed to every single request
    self._pending_replies = threading.Semaphore(value=0)

    # Set-up policies
    self._policy = PolicyController(policy)

    if isinstance(self._message, Message):
        # Same message for every node
        self._requests.extend(
            Request(self._message, node, self._pending_replies, self._request_id)
            for node in self._nodes
        )
    elif isinstance(self._message, MessagesByNode):
        # Different message for each node
        for node in self._nodes:
            node_message = self._message.get(node.id)
            if not node_message:
                # No (or an empty) entry for this node: skip it
                logger.warning(f"Node {node.id} is unknown. Send message to others, ignore this one.")
                continue
            self._requests.append(
                Request(node_message, node, self._pending_replies, self._request_id)
            )
Attributes
policy property
policy
Returns policy controller
requests property
requests
Returns requests
Functions
disconnected_requests
disconnected_requests()
Returns the requests to disconnected nodes
Returns:
Type | Description |
---|---|
List[Request] | A list of the requests sent to disconnected nodes |
Source code in fedbiomed/researcher/requests/_requests.py
def disconnected_requests(self) -> List["Request"]:
    """Returns the requests to disconnected nodes

    Returns:
        The `Request` objects of this federated request whose status is
            `RequestStatus.DISCONNECT` (the request objects themselves, not
            the messages they carry)
    """
    return [req for req in self._requests if req.status == RequestStatus.DISCONNECT]
errors
errors()
Returns errors of each request
Returns:
Type | Description |
---|---|
Dict[str, ErrorMessage] | A dict of error `Message` received for this request, indexed by node ID |
Source code in fedbiomed/researcher/requests/_requests.py
def errors(self) -> Dict[str, ErrorMessage]:
    """Collects the error of every request that has one.

    Returns:
        A dict of error `Message` received for this request, indexed by node ID
    """
    collected = {}
    for request in self._requests:
        if request.error:
            node_id = request.node.id
            collected[node_id] = request.error
    return collected
replies
replies()
Returns replies of each request
Returns:
Type | Description |
---|---|
Dict[str, Message] | A dict of replies `Message` received for this request, indexed by node ID |
Source code in fedbiomed/researcher/requests/_requests.py
def replies(self) -> Dict[str, Message]:
    """Collects the reply of every request that has one.

    Returns:
        A dict of replies `Message` received for this request, indexed by node ID
    """
    collected = {}
    for request in self._requests:
        if request.reply:
            node_id = request.node.id
            collected[node_id] = request.reply
    return collected
send
send()
Sends federated request
Source code in fedbiomed/researcher/requests/_requests.py
def send(self) -> None:
    """Sends the federated request by sending each single request in turn."""
    for single_request in iter(self._requests):
        single_request.send()
wait
wait()
Waits for the replies of the messages that are sent
Source code in fedbiomed/researcher/requests/_requests.py
def wait(self) -> None:
    """Blocks until the policy controller no longer reports CONTINUE.

    Between two policy checks, waits on the pending-replies semaphore for at
    most `REQUEST_STATUS_CHECK_TIMEOUT`.
    """
    while True:
        status = self._policy.continue_all(self._requests)
        if status == PolicyStatus.CONTINUE:
            self._pending_replies.acquire(timeout=REQUEST_STATUS_CHECK_TIMEOUT)
        else:
            return
MessagesByNode
Bases: dict
Type to define messages by node
PolicyController
PolicyController(policies=None)
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(
    self,
    policies: Optional[List[RequestPolicy]] = None,
):
    """Sets up the list of policies applied to a federated request.

    A default `RequestPolicy` is always placed first, followed by the
    policies given by the caller (if any).

    Args:
        policies: optional list of additional policies
    """
    # Build a new list rather than inserting into the caller's one, so that
    # a list reused for several requests does not grow on every call.
    self._policies = [RequestPolicy(), *(policies or [])]
Functions
continue_all
continue_all(requests)
Checks if all policies indicate to continue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
requests | List[TRequest] | List of Request objects to check against policies | required |
Returns:
Type | Description |
---|---|
PolicyStatus | CONTINUE if all policies indicates to continue |
Source code in fedbiomed/researcher/requests/_policies.py
def continue_all(self, requests: List[TRequest]) -> PolicyStatus:
    """Checks if all policies indicate to continue.

    Args:
        requests: List of [Request][fedbiomed.researcher.requests.Request] objects to
            check against policies

    Returns:
        CONTINUE if all policies indicates to continue, COMPLETED otherwise
            (including when there is no request to check)
    """
    if not requests:
        # Return a `PolicyStatus` as declared instead of a bare `False`;
        # comparisons against `PolicyStatus.CONTINUE` behave as before.
        return PolicyStatus.COMPLETED

    # A full list is built first so that `continue_` is called on every
    # policy, as before, even once one of them no longer says CONTINUE.
    statuses = [policy.continue_(requests=requests) for policy in self._policies]
    keep_going = all(status == PolicyStatus.CONTINUE for status in statuses)

    return PolicyStatus.CONTINUE if keep_going else PolicyStatus.COMPLETED
has_stopped_any
has_stopped_any()
Checks if any of the policies indicates to stop
Returns:
Type | Description |
---|---|
bool | True if request has stopped due to given strategy |
Source code in fedbiomed/researcher/requests/_policies.py
def has_stopped_any(self) -> bool:
    """Tells whether at least one policy is in STOPPED status.

    Returns:
        True if request has stopped due to given strategy
    """
    for policy in self._policies:
        if policy.status == PolicyStatus.STOPPED:
            return True
    return False
report
report()
Reports strategy stop status
Returns:
Type | Description |
---|---|
Dict[str, str] | Dict of policies stopped, indexed by the node ID that caused the stop |
Source code in fedbiomed/researcher/requests/_policies.py
def report(self) -> Dict[str, str]:
    """Summarises which policies are in STOPPED status.

    Returns:
        Dict of policies stopped (class names), indexed by the node ID that caused the stop
    """
    return {
        policy.stop_caused_by.node.id: policy.__class__.__name__
        for policy in self._policies
        if policy.status == PolicyStatus.STOPPED
    }
PolicyStatus
Attributes
COMPLETED class-attribute
instance-attribute
COMPLETED = 'COMPLETED'
CONTINUE class-attribute
instance-attribute
CONTINUE = 'CONTINUE'
STOPPED class-attribute
instance-attribute
STOPPED = 'STOPPED'
Request
Request(message, node, sem_pending, request_id=None)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | Message to send to the node | required |
node | NodeAgent | Node agent | required |
request_id | Optional[str] | unique ID of request | None |
sem_pending | Semaphore | semaphore for signaling new pending reply | required |
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(
    self,
    message: Message,
    node: NodeAgent,
    sem_pending: threading.Semaphore,
    request_id: Optional[str] = None,
) -> None:
    """Single request for node

    Args:
        message: Message to send to the node
        node: Node agent
        sem_pending: semaphore for signaling new pending reply
        request_id: unique ID of request
    """
    # A missing (or empty) ID is replaced by a freshly generated UUID string
    self._request_id = request_id or str(uuid.uuid4())
    self._node, self._message = node, message
    self._sem_pending = sem_pending

    # Nothing received and nothing sent yet
    self.reply = self.error = self.status = None
Attributes
error instance-attribute
error = None
node property
node
Returns node agent
reply instance-attribute
reply = None
status instance-attribute
status = None
Functions
flush
flush(stopped)
Flushes the reply that has been processed
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stopped | bool | True if the request was stopped before completion | required |
Source code in fedbiomed/researcher/requests/_requests.py
def flush(self, stopped: bool) -> None:
    """Asks the node agent to flush this request.

    Args:
        stopped: True if the request was stopped before completion
    """
    node, request_id = self._node, self._request_id
    node.flush(request_id, stopped)
has_finished
has_finished()
Queries if the request has finished.
Also tracks if node has disconnected.
Source code in fedbiomed/researcher/requests/_requests.py
def has_finished(self) -> bool:
    """Queries if the request has finished.

    Also tracks if node has disconnected: in that case the request status is
    set to `RequestStatus.DISCONNECT`.

    Returns:
        True if the node is disconnected, or if a reply or an error was
            received from the node
    """
    node_is_gone = self._node.status == NodeActiveStatus.DISCONNECTED
    if node_is_gone:
        self.status = RequestStatus.DISCONNECT
        return True  # Don't expect any reply

    return bool(self.reply or self.error)
on_reply
on_reply(reply)
Callback for node agent to execute once it replies.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reply | Message | reply message received from node | required |
Source code in fedbiomed/researcher/requests/_requests.py
def on_reply(self, reply: Message) -> None:
    """Callback for node agent to execute once it replies.

    Stores the message as error or as reply depending on its type, updates
    the request status and releases the pending-reply semaphore.

    Args:
        reply: reply message received from node
    """
    received_error = isinstance(reply, ErrorMessage)
    if not received_error:
        self.reply = reply
        self.status = RequestStatus.SUCCESS
    else:
        self.error = reply
        self.status = RequestStatus.ERROR

    # Released in both cases
    self._sem_pending.release()
send
send()
Sends the request
Source code in fedbiomed/researcher/requests/_requests.py
def send(self) -> None:
    """Stamps the message with the request ID and hands it to the node agent."""
    outgoing = self._message
    outgoing.request_id = self._request_id
    # `on_reply` is passed along with the message
    self._node.send(outgoing, self.on_reply)
    self.status = RequestStatus.NO_REPLY_YET
RequestPolicy
RequestPolicy(nodes=None)
Base strategy to collect replies from remote agents
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, nodes: Optional[List[str]] = None):
    """Initialises the policy with no status and no stop cause.

    Args:
        nodes: optional list of node IDs, stored as `_nodes`
    """
    self.status, self._nodes, self.stop_caused_by = None, nodes, None
Attributes
status instance-attribute
status = None
stop_caused_by instance-attribute
stop_caused_by = None
Functions
completed
completed()
Updates status of strategy as completed without any issue
Source code in fedbiomed/researcher/requests/_policies.py
def completed(self) -> PolicyStatus:
    """Marks the strategy as COMPLETED and returns that status"""
    final_status = PolicyStatus.COMPLETED
    self.status = final_status
    return final_status
continue_
continue_(requests)
Default strategy stops collecting result once all nodes has answered
Returns:
Type | Description |
---|---|
PolicyStatus | CONTINUE while some request has not finished, COMPLETED once all requests have finished |
Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests) -> PolicyStatus:
    """Default strategy: keeps collecting until every request has finished.

    Returns:
        The result of `self.completed()` once all requests have finished,
            the result of `self.keep()` otherwise
    """
    # Every request is polled (no short-circuit)
    finished_flags = [request.has_finished() for request in requests]
    if all(finished_flags):
        return self.completed()
    return self.keep()
keep
keep()
Keeps continue collecting replies from nodes
Source code in fedbiomed/researcher/requests/_policies.py
def keep(self) -> PolicyStatus:
    """Marks the strategy as CONTINUE and returns that status"""
    ongoing_status = PolicyStatus.CONTINUE
    self.status = ongoing_status
    return ongoing_status
stop
stop(req)
Stop sign for strategy
Source code in fedbiomed/researcher/requests/_policies.py
def stop(self, req) -> PolicyStatus:
    """Marks the strategy as STOPPED and records the request that caused it"""
    stopped_status = PolicyStatus.STOPPED
    self.status = stopped_status
    self.stop_caused_by = req
    return stopped_status
RequestStatus
Attributes
DISCONNECT class-attribute
instance-attribute
DISCONNECT = 'DISCONNECT'
ERROR class-attribute
instance-attribute
ERROR = 'ERROR'
NO_REPLY_YET class-attribute
instance-attribute
NO_REPLY_YET = 'NO_REPLY_YET'
SUCCESS class-attribute
instance-attribute
SUCCESS = 'SUCCESS'
TIMEOUT class-attribute
instance-attribute
TIMEOUT = 'TIMEOUT'
Requests
Requests()
Manages communication between researcher and nodes.
Source code in fedbiomed/researcher/requests/_requests.py
def __init__(self):
    """Creates the gRPC server from the environment settings and starts messaging."""
    self._monitor_message_callback = None

    host = environ["SERVER_HOST"]
    port = environ["SERVER_PORT"]
    credentials = SSLCredentials(
        key=environ['FBM_CERTIFICATE_KEY'],
        cert=environ['FBM_CERTIFICATE_PEM'],
    )

    # Creates grpc server and starts it
    self._grpc_server = GrpcServer(
        host=host,
        port=port,
        on_message=self.on_message,
        ssl=credentials,
    )
    self.start_messaging()
Functions
add_monitor_callback
add_monitor_callback(callback)
Adds callback function for monitor messages
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable[[Dict], None] | Callback function for handling monitor messages that come due 'general/monitoring' channel | required |
Source code in fedbiomed/researcher/requests/_requests.py
def add_monitor_callback(self, callback: Callable[[Dict], None]):
    """Registers the callback used for monitor messages.

    Args:
        callback: function stored as handler for monitor messages; it
            replaces any callback registered before
    """
    handler = callback
    self._monitor_message_callback = handler
list
list(nodes=None, verbose=False)
Lists available data in each node
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes | Optional[list] | optionally filter nodes with this list. Default is None, no filtering, consider all nodes | None |
verbose | bool | If it is true it prints datasets in readable format | False |
Returns:
Type | Description |
---|---|
dict | A dict with node_id as keys, and list of dicts describing available data as values |
Source code in fedbiomed/researcher/requests/_requests.py
def list(self, nodes: Optional[list] = None, verbose: bool = False) -> dict:
    """Lists available data in each node

    Args:
        nodes: optionally filter nodes with this list. Default is None, no filtering, consider all nodes
        verbose: If it is true it prints datasets in readable format

    Returns:
        A dict with node_id as keys, and the `databases` of each node reply as values
    """
    request = ListRequest(researcher_id=environ['RESEARCHER_ID'])

    data_found = {}
    with self.send(request, nodes, policies=[DiscardOnTimeout(5)]) as federated_req:
        data_found.update(
            (node_id, reply.databases)
            for node_id, reply in federated_req.replies().items()
        )

    if not verbose:
        return data_found

    for node, datasets in data_found.items():
        count = len(datasets)
        if count > 0:
            # One table row per dataset, columns taken from the first dataset
            rows = [row.values() for row in datasets]
            headers = datasets[0].keys()
            info = '\n Node: {} | Number of Datasets: {} \n'.format(node, count)
            logger.info(info + tabulate.tabulate(rows, headers, tablefmt="grid") + '\n')
        else:
            logger.info('\n Node: {} | Number of Datasets: {}'.format(node, count) +
                        " No data has been set up for this node.")

    return data_found
on_message
on_message(msg, type_)
Handles arbitrary messages received from the remote agents
This callback is only used for feedback messages from nodes (logs, experiment monitor), not for node replies to requests.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | Union[Dict[str, Any], Message] | de-serialized msg | required |
type_ | MessageType | Reply type one of reply, log, scalar | required |
Source code in fedbiomed/researcher/requests/_requests.py
def on_message(self, msg: Union[Dict[str, Any], Message], type_: MessageType) -> None:
    """Handles arbitrary messages received from the remote agents

    This callback is only used for feedback messages from nodes (logs, experiment
    monitor), not for node replies to requests.

    Args:
        msg: de-serialized msg
        type_: Reply type one of reply, log, scalar
    """
    if type_ == MessageType.LOG:
        # Node logs are printed right away (same thread)
        self.print_node_log_message(msg.get_dict())
        return

    if type_ == MessageType.SCALAR:
        monitor_callback = self._monitor_message_callback
        if monitor_callback is not None:
            # Pass message to Monitor's on message handler
            monitor_callback(msg.get_dict())
        return

    logger.error(f"Undefined message type received {type_} - IGNORING")
ping_nodes
ping_nodes()
Pings online nodes
Returns:
Type | Description |
---|---|
list | List of ID of up and running nodes |
Source code in fedbiomed/researcher/requests/_requests.py
def ping_nodes(self) -> list:
    """Pings online nodes

    Returns:
        List of ID of the nodes that replied to the ping request
    """
    ping = PingRequest(researcher_id=environ["ID"])
    with self.send(ping, policies=[DiscardOnTimeout(5)]) as federated_req:
        # Only the node IDs (keys of the replies) are needed
        nodes_online = [*federated_req.replies()]
    return nodes_online
print_node_log_message staticmethod
print_node_log_message(log)
Prints logger messages coming from the node
It is run on the communication process and must be as quick as possible: - all logs (coming from the nodes) are forwarded to the researcher logger (immediate display on console/file/whatever)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log | Dict[str, Any] | log message and its metadata | required |
Source code in fedbiomed/researcher/requests/_requests.py
@staticmethod
def print_node_log_message(log: Dict[str, Any]) -> None:
    """Prints logger messages coming from the node

    It is run on the communication process and must be as quick as possible:
    - all logs (coming from the nodes) are forwarded to the researcher logger
    (immediate display on console/file/whatever)

    Args:
        log: log message and its metadata; read keys are "msg" (a JSON string
            holding a "message" entry), "level" and "node_id"
    """
    # log contains the original message sent by the node
    # FIXME: we should use `fedbiomed.common.json.deserialize` method
    # instead of the json method when extracting json message
    original_msg = json.loads(log["msg"])

    separator = 5 * "-------------"
    template = ("\033[1m{}\033[0m\n"
                "\t\t\t\t\t\033[1m NODE\033[0m {}\n"
                "\t\t\t\t\t\033[1m MESSAGE:\033[0m {}\033[0m\n"
                "{}")

    # Logging fancy feedback for training
    logger.info(template.format(log["level"],
                                log["node_id"],
                                original_msg["message"],
                                separator))
remove_monitor_callback
remove_monitor_callback()
Removes callback function for Monitor class.
Source code in fedbiomed/researcher/requests/_requests.py
def remove_monitor_callback(self):
    """Unregisters the monitor message callback by resetting it to None."""
    no_callback = None
    self._monitor_message_callback = no_callback
search
search(tags, nodes=None)
Searches available data by tags
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tags | List[str] | List containing tags associated to the data researcher is looking for. | required |
nodes | Optional[list] | optionally filter nodes with this list. Default is None, no filtering, consider all nodes | None |
Returns:
Type | Description |
---|---|
dict | A dict with node_id as keys, and list of dicts describing available data as values |
Source code in fedbiomed/researcher/requests/_requests.py
def search(self, tags: List[str], nodes: Optional[list] = None) -> dict:
    """Searches available data by tags

    Args:
        tags: List containing tags associated to the data researcher is looking for.
        nodes: optionally filter nodes with this list. Default is None, no filtering, consider all nodes

    Returns:
        A dict with node_id as keys, and the non-empty `databases` of each node reply as values
    """
    message = SearchRequest(researcher_id=environ['RESEARCHER_ID'], tags=tags)

    data_found = {}
    with self.send(message, nodes, policies=[DiscardOnTimeout(5)]) as federated_req:

        for node_id, reply in federated_req.replies().items():
            if not reply.databases:
                # Node answered but has nothing matching the tags
                continue
            data_found[node_id] = reply.databases
            logger.info('Node selected for training -> {}'.format(reply.node_id))

        for node_id, error in federated_req.errors().items():
            logger.warning(f"Node {node_id} has returned error from search request {error.extra_msg}")

        if not data_found:
            logger.info("No available dataset has found in nodes with tags: {}".format(tags))

    return data_found
send
send(message, nodes=None, policies=None)
Sends federated request to given nodes with given message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Union[Message, MessagesByNode] | either a common `Message` to send to nodes or a dict of distinct message per node indexed by the node ID | required |
nodes | Optional[List[str]] | list of nodes that are sent the message. If None, send the message to all known active nodes. | None |
policies | Optional[List[RequestPolicy]] | list of policies for controlling the handling of the request, or None | None |
Returns:
Type | Description |
---|---|
FederatedRequest | The object for handling the communications for this request |
Source code in fedbiomed/researcher/requests/_requests.py
def send(
    self,
    message: Union[Message, MessagesByNode],
    nodes: Optional[List[str]] = None,
    policies: Optional[List[RequestPolicy]] = None
) -> FederatedRequest:
    """Sends federated request to given nodes with given message

    Args:
        message: either a common `Message` to send to nodes or a dict of distinct message per node
            indexed by the node ID
        nodes: list of nodes that are sent the message. If None, send the message to all known active nodes.
        policies: list of policies for controlling the handling of the request, or None

    Returns:
        The object for handling the communications for this request
    """
    if nodes is None:
        # No filter given: take every node returned by the gRPC server
        targets = self._grpc_server.get_all_nodes()
    else:
        # Resolve each given node ID through the gRPC server
        targets = [self._grpc_server.get_node(node) for node in nodes]

    return FederatedRequest(message, targets, policies)
start_messaging
start_messaging()
Start communications endpoint
Source code in fedbiomed/researcher/requests/_requests.py
def start_messaging(self) -> None:
    """Starts the gRPC server held by this object."""
    server = self._grpc_server
    server.start()
training_plan_approve
training_plan_approve(training_plan, description='no description provided', nodes=None, policies=None)
Send a training plan and a ApprovalRequest message to node(s).
If a list of node id(s) is provided, the message will be individually sent to all nodes of the list. If the node id(s) list is None (default), the message is broadcast to all nodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
training_plan | BaseTrainingPlan | the training plan class to send to the nodes for approval. | required |
description | str | Description of training plan approval request | 'no description provided' |
nodes | Optional[List[str]] | list of nodes (specified by their UUID) | None |
Returns:
Type | Description |
---|---|
dict | a dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side. |
Source code in fedbiomed/researcher/requests/_requests.py
def training_plan_approve(
        self,
        training_plan: BaseTrainingPlan,
        description: str = "no description provided",
        nodes: Optional[List[str]] = None,
        policies: Optional[List] = None
) -> dict:
    """Send a training plan and a ApprovalRequest message to node(s).

    If a list of node id(s) is provided, the message will be individually sent
    to all nodes of the list.
    If the node id(s) list is None (default), the message is broadcast to all nodes.

    Args:
        training_plan: the training plan class to send to the nodes for approval.
        description: Description of training plan approval request
        nodes: list of nodes (specified by their UUID)
        policies: optional list of policies, passed through to `send` for the approval request

    Returns:
        a dictionary of pairs (node_id: status), where status indicates to the researcher
        that the training plan has been correctly downloaded on the node side.
        Warning: status does not mean that the training plan is approved, only that it has been added
        to the "approval queue" on the node side.
        An empty dict is returned when the training plan cannot be saved,
        re-imported or parsed locally.
    """
    training_plan_instance = training_plan
    training_plan_module = 'model_' + str(uuid.uuid4())

    # The plan is written to a temporary file and loaded back from it to get its source
    with tempfile.TemporaryDirectory(dir=environ['TMP_DIR']) as tmp_dir:
        training_plan_file = os.path.join(tmp_dir, training_plan_module + '.py')
        try:
            training_plan_instance.save_code(training_plan_file)
        except Exception as e:
            logger.error(f"Cannot save the training plan to a local tmp dir : {e}")
            return {}

        try:
            _, training_plan_instance = import_class_object_from_file(
                training_plan_file, training_plan.__class__.__name__)
            tp_source = training_plan_instance.source()
        except Exception as e:
            logger.error(f"Cannot instantiate the training plan: {e}")
            return {}

    # The minified output is discarded: the call only checks that the source can be processed
    try:
        minify(tp_source,
               remove_annotations=False,
               combine_imports=False,
               remove_pass=False,
               hoist_literals=False,
               remove_object_base=True,
               rename_locals=False)
    except Exception as e:
        # minify does not provide any specific exception
        logger.error(f"This file is not a python file ({e})")
        return {}

    # NOTE(review): prints the whole training plan source to stdout; kept
    # as-is to preserve behaviour — consider routing it through `logger`.
    print(tp_source)

    # send message to node(s)
    message = ApprovalRequest(
        researcher_id=environ['RESEARCHER_ID'],
        description=str(description),
        training_plan=tp_source)

    with self.send(message, nodes, policies=policies) as federated_req:
        errors = federated_req.errors()
        replies = federated_req.replies()

        # TODO: Loop over errors and replies
        for node_id, error in errors.items():
            logger.info(f"Node ({node_id}) has returned error {error.errnum}, {error.extra_msg}")

        return {node_id: reply.get_dict() for node_id, reply in replies.items()}
StopOnDisconnect
StopOnDisconnect(timeout=5, nodes=None)
Bases: _ReplyTimeoutPolicy
Stops collecting results if a node disconnects
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | maximum time for policy | 5 |
nodes | Optional[List[str]] | optional list of nodes to apply the policy. By default applies to all known nodes of request. | None |
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, timeout: int = 5, nodes: Optional[List[str]] = None) -> None:
    """Stores the timeout settings of the policy.

    Args:
        timeout: maximum time for policy
        nodes: optional list of nodes to apply the policy. By default applies
            to all known nodes of request.
    """
    # The parent constructor receives the node filter
    super().__init__(nodes)
    # `_time` starts out unset in this constructor
    self.timeout, self._time = timeout, None
Functions
continue_
continue_(requests)
Continues federated request if nodes are not disconnect
Returns:
Type | Description |
---|---|
PolicyStatus | CONTINUE if no node disconnect found, STOPPED if some node disconnect found and timeout is reached |
Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests: TRequest) -> PolicyStatus:
    """Continues federated request unless a node is disconnected and timeout is reached

    When the policy was given a list of nodes, only the requests of these
    nodes are inspected.

    Returns:
        CONTINUE if no node disconnect found, STOPPED if some node disconnect found
            and timeout is reached
    """
    inspected = requests
    if self._nodes:
        inspected = [request for request in requests if request.node.id in self._nodes]

    for request in inspected:
        # `is_timeout` is queried for every inspected request, before the
        # status check; its implementation is not visible here, so the call
        # order is kept unchanged.
        timed_out = self.is_timeout()
        if request.status == RequestStatus.DISCONNECT and timed_out:
            return self.stop(request)

    return PolicyStatus.CONTINUE
StopOnError
StopOnError(nodes=None)
Bases: RequestPolicy
Stops collecting results if a node returns an error
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, nodes: Optional[List[str]] = None):
    """Initialises the policy with no status and no stop cause.

    Args:
        nodes: optional list of node IDs, stored as `_nodes`
    """
    self.status, self._nodes, self.stop_caused_by = None, nodes, None
Functions
continue_
continue_(requests)
Continues federated request if nodes does not return error
Returns:
Type | Description |
---|---|
PolicyStatus | CONTINUE if no error found, STOPPED if some error found |
Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests: TRequest) -> PolicyStatus:
    """Continues federated request as long as no inspected request has an error

    When the policy was given a list of nodes, only the requests of these
    nodes are inspected.

    Returns:
        CONTINUE if no error found, STOPPED if some error found
    """
    if self._nodes:
        inspected = [request for request in requests if request.node.id in self._nodes]
    else:
        inspected = requests

    # First request carrying an error, if any
    failed = next((request for request in inspected if request.error), None)
    if failed is not None:
        return self.stop(failed)

    return PolicyStatus.CONTINUE
StopOnTimeout
StopOnTimeout(timeout=5, nodes=None)
Bases: _ReplyTimeoutPolicy
Stops the request if nodes do not answer in given timeout
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | maximum time for policy | 5 |
nodes | Optional[List[str]] | optional list of nodes to apply the policy. By default applies to all known nodes of request. | None |
Source code in fedbiomed/researcher/requests/_policies.py
def __init__(self, timeout: int = 5, nodes: Optional[List[str]] = None) -> None:
    """Stores the timeout settings of the policy.

    Args:
        timeout: maximum time for policy
        nodes: optional list of nodes to apply the policy. By default applies
            to all known nodes of request.
    """
    # The parent constructor receives the node filter
    super().__init__(nodes)
    # `_time` starts out unset in this constructor
    self.timeout, self._time = timeout, None
Functions
continue_
continue_(requests)
Continues federated request if nodes dont reach timeout
Returns:
Type | Description |
---|---|
PolicyStatus | CONTINUE if no node reached timeout, STOPPED if some node reached timeout and timeout is reached |
Source code in fedbiomed/researcher/requests/_policies.py
def continue_(self, requests) -> PolicyStatus:
    """Runs the timeout handling in its stopping mode.

    Delegates to `self.apply`, passing `True` as second argument.

    Returns:
        The value returned by `self.apply` (documented as CONTINUE if no node
            reached timeout, STOPPED if some node reached timeout)
    """
    outcome = self.apply(requests, True)
    return outcome