Attributes
GRPC_SERVER_SETUP_TIMEOUT module-attribute
GRPC_SERVER_SETUP_TIMEOUT = GRPC_CLIENT_CONN_RETRY_TIMEOUT + server_setup_timeout
MAX_GRPC_SERVER_SETUP_TIMEOUT module-attribute
MAX_GRPC_SERVER_SETUP_TIMEOUT = 20 * server_setup_timeout
MAX_SEND_DURATION module-attribute
MAX_SEND_DURATION = 300
server_setup_timeout module-attribute
server_setup_timeout = int(getenv('GRPC_SERVER_SETUP_TIMEOUT', 1))
Classes
GrpcServer
GrpcServer(host, port, on_message, ssl, debug=False)
Bases: _GrpcAsyncServer
Grpc server implementation to be used by threads
This class extends the async implementation of the gRPC server so that its async methods can be called from a different thread. Currently, it is used by the [`Requests`][fedbiomed.researcher.requests.Requests] class, which is instantiated in the main thread.
Attributes:
Name | Type | Description |
---|---|---|
_thread | Optional[Thread] | background thread of gRPC server |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host | str | server DNS name or IP address | required |
port | str | server TCP port | required |
on_message | Callable | Callback function to execute once a message received from the nodes | required |
ssl | SSLCredentials | Ssl credentials. | required |
debug | bool | Activate debug mode for gRPC asyncio | False |
Source code in fedbiomed/transport/server.py
def __init__(
    self,
    host: str,
    port: str,
    on_message: Callable,
    ssl: SSLCredentials,
    debug: bool = False,
) -> None:
    """Record the server configuration and reset its runtime state.

    Only attributes are assigned here; no thread, loop or server object
    is created by the constructor.

    Args:
        host: server DNS name or IP address
        port: server TCP port
        on_message: callback function to execute once a message is
            received from the nodes
        ssl: SSL credentials
        debug: activate debug mode for gRPC asyncio
    """
    # Configuration handed in by the caller
    self._host = host
    self._port = port
    self._ssl = ssl
    self._debug = debug
    self._on_message = on_message

    # Runtime objects, all unset at construction time
    self._server = None
    self._loop = None
    self._agent_store: Optional[AgentStore] = None

    # Event used to inform all threads whether the server is started
    self._is_started = threading.Event()
Functions
broadcast
broadcast(message)
Broadcast message to all known and reachable nodes
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | Message to broadcast | required |
Raises:
Type | Description |
---|---|
FedbiomedCommunicationError | bad argument type |
FedbiomedCommunicationError | server is not started |
Source code in fedbiomed/transport/server.py
def broadcast(self, message: Message) -> None:
    """Broadcast a message to all known and reachable nodes.

    The argument type is validated first, then the started flag; only then
    is the parent class `broadcast` handed to `_run_threadsafe`.

    Args:
        message: Message to broadcast

    Raises:
        FedbiomedCommunicationError: bad argument type
        FedbiomedCommunicationError: server is not started
    """
    type_ok = isinstance(message, Message)
    if not type_ok:
        type_error = (
            f"{ErrorNumbers.FB628}: bad argument type for message, expected `Message`, got `{type(message)}`"
        )
        raise FedbiomedCommunicationError(type_error)

    server_started = self._is_started.is_set()
    if not server_started:
        state_error = (
            f"{ErrorNumbers.FB628}: Can not broadcast given message. "
            "Communication client is not initialized."
        )
        raise FedbiomedCommunicationError(state_error)

    parent_call = super().broadcast(message)
    self._run_threadsafe(parent_call)
get_all_nodes
get_all_nodes()
Returns all known nodes
Returns:
Type | Description |
---|---|
List[NodeAgent] | A list of node agents |
Raises:
Type | Description |
---|---|
FedbiomedCommunicationError | server is not started |
Source code in fedbiomed/transport/server.py
def get_all_nodes(self) -> List[NodeAgent]:
    """Return every node currently known to the server.

    The parent class `get_all_nodes` is executed through `_run_threadsafe`
    and its result is handed back to the caller.

    Returns:
        A list of node agents

    Raises:
        FedbiomedCommunicationError: server is not started
    """
    server_started = self._is_started.is_set()
    if not server_started:
        state_error = (
            f"{ErrorNumbers.FB628}: Error while getting all nodes "
            "connected: Communication client is not initialized."
        )
        raise FedbiomedCommunicationError(state_error)

    parent_call = super().get_all_nodes()
    return self._run_threadsafe(parent_call)
get_node
get_node(node_id)
Returns given node
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_id | - | ID of node to retrieve | required |
Returns:
Type | Description |
---|---|
Optional[NodeAgent] | A node agent |
Raises:
Type | Description |
---|---|
FedbiomedCommunicationError | server is not started |
Source code in fedbiomed/transport/server.py
def get_node(self, node_id) -> Optional[NodeAgent]:
    """Return the agent of a given node.

    The parent class `get_node` is executed through `_run_threadsafe` and
    its result is handed back to the caller.

    Args:
        node_id: ID of node to retrieve

    Returns:
        A node agent

    Raises:
        FedbiomedCommunicationError: server is not started
    """
    server_started = self._is_started.is_set()
    if not server_started:
        state_error = (
            f"{ErrorNumbers.FB628}: Error while getting node '{node_id}':"
            "Communication client is not initialized."
        )
        raise FedbiomedCommunicationError(state_error)

    parent_call = super().get_node(node_id)
    return self._run_threadsafe(parent_call)
is_alive
is_alive()
Checks if the thread running the gRPC server is still alive
Returns:
Type | Description |
---|---|
bool | gRPC server running status |
Raises:
Type | Description |
---|---|
FedbiomedCommunicationError | server is not started |
Source code in fedbiomed/transport/server.py
def is_alive(self) -> bool:
    """Report whether the thread running the gRPC server is still alive.

    Returns:
        gRPC server running status: `False` when `_thread` is not a
        `threading.Thread`, otherwise the thread's own liveness.

    Raises:
        FedbiomedCommunicationError: server is not started
    """
    server_started = self._is_started.is_set()
    if not server_started:
        state_error = (
            f"{ErrorNumbers.FB628}: Can not check if thread is alive."
            "Communication client is not initialized."
        )
        raise FedbiomedCommunicationError(state_error)

    # TODO: more tests about gRPC server and task status ?
    if not isinstance(self._thread, threading.Thread):
        return False
    return self._thread.is_alive()
send
send(message, node_id)
Send message to a specific node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | Message to send | required |
node_id | str | Destination node unique ID | required |
Raises:
Type | Description |
---|---|
FedbiomedCommunicationError | bad argument type |
FedbiomedCommunicationError | server is not started |
Source code in fedbiomed/transport/server.py
def send(self, message: Message, node_id: str) -> None:
    """Send a message to one specific node.

    The argument type is validated first, then the started flag; only then
    is the parent class `send` handed to `_run_threadsafe`.

    Args:
        message: Message to send
        node_id: Destination node unique ID

    Raises:
        FedbiomedCommunicationError: bad argument type
        FedbiomedCommunicationError: server is not started
    """
    type_ok = isinstance(message, Message)
    if not type_ok:
        type_error = (
            f"{ErrorNumbers.FB628}: bad argument type for message, expected `Message`, got `{type(message)}`"
        )
        raise FedbiomedCommunicationError(type_error)

    server_started = self._is_started.is_set()
    if not server_started:
        state_error = (
            f"{ErrorNumbers.FB628.value}: Can not send message. "
            "Communication client is not initialized."
        )
        raise FedbiomedCommunicationError(state_error)

    parent_call = super().send(message, node_id)
    self._run_threadsafe(parent_call)
start
start()
Starts async GrpcServer
Source code in fedbiomed/transport/server.py
def start(self) -> None:
    """Start the async GrpcServer in a daemon thread and wait for nodes.

    After launching the thread this call sleeps for
    `GRPC_SERVER_SETUP_TIMEOUT` seconds, then polls `get_all_nodes` once
    per second. Polling stops when a node is known or when the extra wait
    exceeds `MAX_GRPC_SERVER_SETUP_TIMEOUT - GRPC_SERVER_SETUP_TIMEOUT`;
    in the latter case a warning is logged and the call returns.
    """
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()

    # FIXME: This implementation assumes that nodes will be able connect and
    # server complete setup with this delay
    logger.info("Starting researcher service...")
    logger.info(f'Waiting {GRPC_SERVER_SETUP_TIMEOUT}s for nodes to connect...')
    time.sleep(GRPC_SERVER_SETUP_TIMEOUT)

    # Additional seconds the loop below may spend before giving up
    extra_wait = MAX_GRPC_SERVER_SETUP_TIMEOUT - GRPC_SERVER_SETUP_TIMEOUT
    waited = 0
    while len(self.get_all_nodes()) == 0:
        first_pass = waited == 0
        if first_pass:
            logger.info(f"No nodes found, server will wait "
                        f"{MAX_GRPC_SERVER_SETUP_TIMEOUT - GRPC_SERVER_SETUP_TIMEOUT} "
                        "more seconds until a node creates connection.")

        if waited > extra_wait:
            # Re-check before warning, as the original code does
            if len(self.get_all_nodes()) == 0:
                logger.warning("Server has not received connection from any remote nodes in "
                               f"MAX_GRPC_SERVER_SETUP_TIMEOUT: {MAX_GRPC_SERVER_SETUP_TIMEOUT} "
                               "This may effect the request created right after the server initialization. "
                               "However, server will keep running in the background so you can retry the "
                               "operations for sending requests to remote nodes until one receives.")
            break

        time.sleep(1)
        waited += 1
ResearcherServicer
ResearcherServicer(agent_store, on_message)
Bases: ResearcherServiceServicer
RPC Servicer
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_store | AgentStore | The class that stores node agents | required |
on_message | Callable | Callback function to execute once a message received from the nodes | required |
Source code in fedbiomed/transport/server.py
def __init__(
    self,
    agent_store: AgentStore,
    on_message: Callable
) -> None:
    """Build the gRPC researcher servicer.

    Args:
        agent_store: The class that stores node agents
        on_message: Callback function to execute once a message is
            received from the nodes
    """
    super().__init__()
    # Keep references to the collaborators used by the RPC handlers
    self._agent_store, self._on_message = agent_store, on_message
Functions
Feedback async
Feedback(request, unused_context)
Executed for Feedback request received from the nodes
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | Message | Feedback message | required |
unused_context | ServicerContext | Request service context | required |
Source code in fedbiomed/transport/server.py
async def Feedback(
    self,
    request: ProtoBufMessage,
    unused_context: grpc.aio.ServicerContext
) -> None:
    """Handle a Feedback request received from the nodes.

    The populated `feedback_type` field of the request is extracted and
    forwarded to the `on_message` callback together with its message type.

    Args:
        request: Feedback message
        unused_context: Request service context

    Returns:
        An `Empty` instance (the `-> None` annotation does not reflect this).
    """
    # Name of the populated "feedback_type" field | log or scalar
    feedback_kind = request.WhichOneof("feedback_type")
    payload = FeedbackMessage.from_proto(request).get_param(feedback_kind)

    # Execute on_message assigned by the researcher.requests modules
    self._on_message(payload, MessageType.convert(feedback_kind))

    return Empty()
GetTaskUnary async
GetTaskUnary(request, context)
Gets a unary RPC request and returns a stream of responses
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | Message | RPC request | required |
context | ServicerContext | RPC peer context | required |
Source code in fedbiomed/transport/server.py
# NOTE(review): indentation is lost in this rendering of the source; the
# nesting described in the comments below is inferred from the control-flow
# keywords and should be confirmed against fedbiomed/transport/server.py.
# NOTE(review): the body contains `yield`, so this is an async generator even
# though the return annotation says `None`.
async def GetTaskUnary(
self,
request: ProtoBufMessage,
context: grpc.aio.ServicerContext
) -> None:
"""Gets a unary RPC request and yields the next task as a stream of chunks
Waits for a task queued for the polling node, discards tasks older than
`MAX_SEND_DURATION` seconds, serializes the retained task and yields it in
pieces of at most `MAX_MESSAGE_BYTES_LENGTH` bytes. If sending is interrupted,
the task is re-queued while `retry_count < MAX_SEND_RETRIES`.
Args:
request: RPC request
context: RPC peer context
"""
# Decode the request; its "node" entry identifies the polling node
task_request = TaskRequest.from_proto(request).get_dict()
logger.debug(f"Node: {task_request.get('node')} polling for the tasks")
# Fetch the agent object associated with this node id
node_agent = await self._agent_store.retrieve(node_id=task_request["node"])
# Update node active status as active
await node_agent.set_active()
# `task` stays None until a task is pulled, so that the CancelledError
# handler below can tell whether there is anything to resend
task = None
try:
# Pull queued entries until one is recent enough to be sent
while True:
task, retry_count, first_send_time = await node_agent.get_task()
# Choice: mark task as de-queued as soon only if really sent
node_agent.task_done()
# discard if message too old
if first_send_time + MAX_SEND_DURATION > time.time():
break
else:
# Reset so a later cancellation does not resend the discarded task
task = None
logger.warning(f"Message to send is older than {MAX_SEND_DURATION} seconds. Discard message.")
task_bytes = Serializer.dumps(task.to_dict())
# Start offsets of each chunk; `iter_` is the 1-based chunk number and
# `size` carries the total number of chunks
chunk_range = range(0, len(task_bytes), MAX_MESSAGE_BYTES_LENGTH)
for start, iter_ in zip(chunk_range, range(1, len(chunk_range) + 1)):
stop = start + MAX_MESSAGE_BYTES_LENGTH
try:
yield TaskResponse(
size=len(chunk_range),
iteration=iter_,
bytes_=task_bytes[start:stop]
).to_proto()
except GeneratorExit:
# GeneratorExit is raised at the `yield` when the generator is closed
# before all chunks have been consumed
# schedule resend if task sending could not be completed
# => retry send as long as (1) send not successful
# (2) max retries not reached
# => else discard message
#
# Note: if node is disconnected then back online, message is retried after reconnection.
# This is not fully coherent with upper layers (Requests) that may trigger an application
# level failure in the while, but it is mitigated by the MAX_SEND_DURATION
if retry_count < MAX_SEND_RETRIES:
await node_agent.send_async(
message=task, on_reply=None, retry_count=retry_count + 1, first_send_time=first_send_time
)
else:
logger.warning(f"Message cannot be sent after {MAX_SEND_RETRIES} retries. Discard message.")
# NOTE(review): the `finally` clause below awaits this same method again
# when the `return` executes; confirm the double call is intended
await node_agent.change_node_status_after_task()
# need return here to avoid RuntimeError
return
except asyncio.CancelledError:
# The `task is not None` test short-circuits first, so `retry_count` and
# `first_send_time` are only read once a task has been pulled
if task is not None and retry_count is not None and first_send_time is not None:
# schedule resend if task was pulled from queue
if retry_count < MAX_SEND_RETRIES:
await node_agent.send_async(
message=task, on_reply=None, retry_count=retry_count + 1, first_send_time=first_send_time
)
else:
logger.warning(f"Message cannot be sent after {MAX_SEND_RETRIES} retries. Discard message.")
finally:
# Runs on every exit path of the `try` statement
await node_agent.change_node_status_after_task()
ReplyTask async
ReplyTask(request_iterator, unused_context)
Gets stream replies from the nodes
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request_iterator | Iterable[Message] | Iterator for streaming | required |
unused_context | ServicerContext | Request service context | required |
Source code in fedbiomed/transport/server.py
async def ReplyTask(
    self,
    request_iterator: Iterable[ProtoBufMessage],
    unused_context: grpc.aio.ServicerContext
) -> None:
    """Gets stream replies from the nodes

    Chunks are accumulated until one arrives whose `size` equals its
    `iteration`; the accumulated bytes are then deserialized and passed to
    the `on_reply` handler of the node agent named by the message's
    `node_id` entry. Accumulation restarts for the next message.

    Args:
        request_iterator: Iterator for streaming. It is consumed with
            `async for`, so it must support asynchronous iteration.
        unused_context: Request service context

    Returns:
        An `Empty` instance once the request stream is exhausted (the
        `-> None` annotation does not reflect this).
    """
    # Collect chunks in a list and join once per message: repeated
    # `bytes +=` copies the whole buffer again for every chunk received.
    chunks = []
    async for answer in request_iterator:
        chunks.append(answer.bytes_)

        # More chunks of the current message are still to come
        if answer.size != answer.iteration:
            continue

        # Deserialize message
        message = Serializer.loads(b"".join(chunks))

        # Replies are handled by node agent callbacks
        node = await self._agent_store.get(message["node_id"])
        await node.on_reply(message)

        # Start a fresh buffer for the next message on the same stream
        chunks = []

    return Empty()
SSLCredentials
SSLCredentials(key, cert)
Contains credentials for the SSL certificate of the gRPC server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | str | path to private key | required |
cert | str | path to certificate | required |
Source code in fedbiomed/transport/server.py
def __init__(self, key: str, cert: str):
    """Load the private key and the certificate from disk.

    Both files are opened in binary mode and their raw content is stored
    on the instance as `private_key` and `certificate`.

    Args:
        key: path to private key
        cert: path to certificate
    """
    with open(key, 'rb') as key_file:
        key_bytes = key_file.read()
    self.private_key = key_bytes

    with open(cert, 'rb') as cert_file:
        cert_bytes = cert_file.read()
    self.certificate = cert_bytes
Attributes
certificate instance-attribute
certificate = read()
private_key instance-attribute
private_key = read()