Attributes
GRPC_CLIENT_CONN_RETRY_TIMEOUT module-attribute
GRPC_CLIENT_CONN_RETRY_TIMEOUT = 2
GRPC_CLIENT_TASK_REQUEST_TIMEOUT module-attribute
GRPC_CLIENT_TASK_REQUEST_TIMEOUT = 3600
Classes
Channels
Channels(researcher)
Keeps gRPC server channels
Parameters:
Name | Type | Description | Default |
---|---|---|---|
researcher | ResearcherCredentials | An instance of ResearcherCredentials | required |
Source code in fedbiomed/transport/client.py
def __init__(self, researcher: ResearcherCredentials):
"""Create channels and stubs
Args:
researcher: An instance of ResearcherCredentials
"""
self._researcher = researcher
self._channels = {}
self._stubs = {}
self._stub_types = [
_StubType.LISTENER_TASK_STUB,
_StubType.SENDER_TASK_STUB,
_StubType.SENDER_FEEDBACK_STUB
]
for st in self._stub_types:
self._channels[st]: grpc.aio.Channel = None
self._stubs[st]: ResearcherServiceStub = None
# lock for accessing channels and stubs
self._channels_stubs_lock = asyncio.Lock()
Functions
connect async
connect(stub_type=_StubType.ANY_STUB)
Connects gRPC server and instatiates stubs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stub_type | _StubType | only (re)connect for matching stub type(s) | ANY_STUB |
Source code in fedbiomed/transport/client.py
async def connect(self, stub_type: _StubType = _StubType.ANY_STUB):
"""Connects gRPC server and instatiates stubs.
Args:
stub_type: only (re)connect for matching stub type(s)
"""
async with self._channels_stubs_lock:
# Closes if channels are open
for st, channel in self._channels.items():
if channel and (stub_type == _StubType.ANY_STUB or stub_type == st):
await channel.close()
# Creates channels
for st in self._channels.keys():
if stub_type == _StubType.ANY_STUB or stub_type == st:
self._channels[st] = self._create()
self._stubs[st] = ResearcherServiceStub(channel=self._channels[st])
stub async
stub(stub_type)
Get stub for a given stub type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stub_type | _StubType | the stub type to get | required |
Returns:
Type | Description |
---|---|
ResearcherServiceStub | the stub if it exists or None |
Source code in fedbiomed/transport/client.py
async def stub(self, stub_type: _StubType) -> ResearcherServiceStub:
"""Get stub for a given stub type.
Args:
stub_type: the stub type to get
Returns:
the stub if it exists or None
"""
if stub_type in self._stub_types:
async with self._channels_stubs_lock:
return self._stubs[stub_type]
else:
return None
ClientStatus
Bases: Enum
Attributes
CONNECTED class-attribute
instance-attribute
CONNECTED = 1
DISCONNECTED class-attribute
instance-attribute
DISCONNECTED = 0
FAILED class-attribute
instance-attribute
FAILED = 2
GrpcClient
GrpcClient(node_id, researcher, update_id_map)
An agent of remote researcher gRPC server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_id | str | unique ID of this node (connection client) | required |
researcher | ResearcherCredentials | the researcher to which the node connects (connection server) | required |
update_id_map | Awaitable | awaitable to call when updating the researcher ID, needs proper prototype | required |
Source code in fedbiomed/transport/client.py
def __init__(
self,
node_id: str,
researcher: ResearcherCredentials,
update_id_map: Awaitable
) -> None:
"""Class constructor
Args:
node_id: unique ID of this node (connection client)
researcher: the researcher to which the node connects (connection server)
update_id_map: awaitable to call when updating the researcher ID, needs proper prototype
"""
self._id = None
self._researcher = researcher
self._channels = Channels(researcher)
self._task_listener = TaskListener(
channels=self._channels,
node_id=node_id,
on_status_change = self._on_status_change,
update_id=self._update_id)
self._sender = Sender(
channels=self._channels,
on_status_change = self._on_status_change)
# TODO: use `self._status` for finer gRPC agent handling.
# Currently, the (tentative) status is maintained but not used
self._status = ClientStatus.DISCONNECTED
# lock for accessing self._status
self._status_lock = asyncio.Lock()
self._update_id_map = update_id_map
self._tasks = []
Functions
send async
send(message)
Sends messages from node to researcher server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | message to send from node to server | required |
Source code in fedbiomed/transport/client.py
async def send(self, message: Message) -> None:
"""Sends messages from node to researcher server.
Args:
message: message to send from node to server
"""
await self._sender.send(message)
start
start(on_task)
Start researcher gRPC agent.
Starts long-lived tasks, one waiting for server requests, one waiting on the async queue for the replies from the node that are going to be sent back to researcher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
on_task | Callback function to execute once a payload received from researcher. | required |
Returns:
Type | Description |
---|---|
List[Awaitable[Optional[Callable]]] | A list of task objects of the agent |
Source code in fedbiomed/transport/client.py
def start(self, on_task) -> List[Awaitable[Optional[Callable]]]:
"""Start researcher gRPC agent.
Starts long-lived tasks, one waiting for server requests, one waiting on the async queue
for the replies from the node that are going to be sent back to researcher.
Args:
on_task: Callback function to execute once a payload received from researcher.
Returns:
A list of task objects of the agent
"""
async def run():
"""Connects and dispatches the tasks"""
# First connects to channel
await self._connect()
# Launch listeners
await asyncio.gather(
self._task_listener.listen(on_task),
self._sender.listen()
)
# Returns client task
return asyncio.create_task(run())
Listener
Listener(channels)
Abstract generic listener method for a node's communications.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channels | Channels | Keeps channels and stubs. | required |
Source code in fedbiomed/transport/client.py
def __init__(self, channels: Channels) -> None:
"""Constructs task listener channels
Args:
channels: Keeps channels and stubs.
"""
self._channels = channels
self._retry_on_error = False
Functions
listen
listen(callback=None)
Listens for tasks from given channels
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Optional[Callable] | Callback function to execute once a task is processed | None |
Returns:
Type | Description |
---|---|
Awaitable[Optional[Callable]] | Asyncio task to run task listener |
Source code in fedbiomed/transport/client.py
def listen(self, callback: Optional[Callable] = None) -> Awaitable[Optional[Callable]]:
"""Listens for tasks from given channels
Args:
callback: Callback function to execute once a task is processed
Returns:
Asyncio task to run task listener
"""
return asyncio.create_task(self._listen(callback))
ResearcherCredentials dataclass
ResearcherCredentials(port, host, certificate=None)
Attributes
certificate class-attribute
instance-attribute
certificate = None
host instance-attribute
host
port instance-attribute
port
Sender
Sender(channels, on_status_change)
Bases: Listener
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channels | Channels | RPC channels and stubs to be used for polling tasks from researcher | required |
on_status_change | Awaitable | Callback awaitable to run for changing node agent status | required |
Source code in fedbiomed/transport/client.py
def __init__(
self,
channels: Channels,
on_status_change: Awaitable,
) -> None:
"""Class constructor.
Args:
channels: RPC channels and stubs to be used for polling tasks from researcher
on_status_change: Callback awaitable to run for changing node agent status
"""
super().__init__(channels)
self._queue = asyncio.Queue()
self._on_status_change = on_status_change
self._retry_count = 0
self._retry_item = None
self._stub_type = _StubType.NO_STUB
self._retry_on_error = True
Functions
send async
send(message)
Send a message to peer researcher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | Message | Message to send | required |
Source code in fedbiomed/transport/client.py
async def send(self, message: Message) -> None:
"""Send a message to peer researcher.
Args:
message: Message to send
"""
# Switch-case for message type and gRPC calls
match message.__class__.__name__:
case FeedbackMessage.__name__:
# Note: FeedbackMessage is designed as proto serializable message.
await self._queue.put({"stub": _StubType.SENDER_FEEDBACK_STUB,
"message": message})
case _:
await self._queue.put({"stub": _StubType.SENDER_TASK_STUB,
"message": message})
TaskListener
TaskListener(channels, node_id, on_status_change, update_id)
Bases: Listener
Listener for the task assigned by the researcher component
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channels | Channels | RPC channels and stubs to be used for polling tasks from researcher | required |
node_id | str | unique ID for this node | required |
on_status_change | Awaitable | Callback awaitable to run for changing node agent status | required |
update_id | Awaitable | Callback function to run updating peer researcher ID | required |
Source code in fedbiomed/transport/client.py
def __init__(
self,
channels: Channels,
node_id: str,
on_status_change: Awaitable,
update_id: Awaitable
) -> None:
"""Class constructor.
Args:
channels: RPC channels and stubs to be used for polling tasks from researcher
node_id: unique ID for this node
on_status_change: Callback awaitable to run for changing node agent status
update_id: Callback function to run updating peer researcher ID
"""
super().__init__(channels)
self._node_id = node_id
self._on_status_change = on_status_change
self._update_id = update_id
self._retry_count = 0
Functions
is_server_alive
is_server_alive(host, port)
Checks if the server is alive
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host | str | The host/ip of researcher/server component | required |
port | str | Port number of researcher/server component | required |
Source code in fedbiomed/transport/client.py
def is_server_alive(host: str, port: str):
"""Checks if the server is alive
Args:
host: The host/ip of researcher/server component
port: Port number of researcher/server component
"""
port = int(port)
address_info = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
for family, socktype, protocol, _ , address in address_info:
s = socket.socket(family, socktype, protocol)
# Need this timeout for the case where the server does not answer
# If not present, socket timeout increases and this function takes more
# than GRPC_CLIENT_CONN_RETRY_TIMEOUT to execute
s.settimeout(GRPC_CLIENT_CONN_RETRY_TIMEOUT)
try:
s.connect(address)
except socket.error:
return False
else:
s.close()
return True