Client

Attributes

GRPC_CLIENT_CONN_RETRY_TIMEOUT module-attribute

GRPC_CLIENT_CONN_RETRY_TIMEOUT = 2

GRPC_CLIENT_TASK_REQUEST_TIMEOUT module-attribute

GRPC_CLIENT_TASK_REQUEST_TIMEOUT = 3600

Classes

Channels

Channels(researcher)

Keeps gRPC server channels

Parameters:

Name Type Description Default
researcher ResearcherCredentials

An instance of ResearcherCredentials

required
Source code in fedbiomed/transport/client.py
def __init__(self, researcher: ResearcherCredentials):
    """Create channels and stubs

    Args:
        researcher: An instance of ResearcherCredentials
    """
    self._researcher = researcher

    self._channels = {}
    self._stubs = {}
    self._stub_types = [
        _StubType.LISTENER_TASK_STUB,
        _StubType.SENDER_TASK_STUB,
        _StubType.SENDER_FEEDBACK_STUB,
    ]
    for st in self._stub_types:
        self._channels[st]: grpc.aio.Channel = None
        self._stubs[st]: ResearcherServiceStub = None

    # lock for accessing channels and stubs
    self._channels_stubs_lock = asyncio.Lock()

Functions

connect async
connect(stub_type=_StubType.ANY_STUB)

Connects gRPC server and instatiates stubs.

Parameters:

Name Type Description Default
stub_type _StubType

only (re)connect for matching stub type(s)

ANY_STUB
Source code in fedbiomed/transport/client.py
async def connect(self, stub_type: _StubType = _StubType.ANY_STUB):
    """Connects gRPC server and instatiates stubs.

    Args:
        stub_type: only (re)connect for matching stub type(s)
    """

    async with self._channels_stubs_lock:
        # Closes if channels are open
        for st, channel in self._channels.items():
            if channel and (stub_type == _StubType.ANY_STUB or stub_type == st):
                await channel.close()

        # Creates channels
        for st in self._channels.keys():
            if stub_type == _StubType.ANY_STUB or stub_type == st:
                self._channels[st] = self._create()
                self._stubs[st] = ResearcherServiceStub(channel=self._channels[st])
stub async
stub(stub_type)

Get stub for a given stub type.

Parameters:

Name Type Description Default
stub_type _StubType

the stub type to get

required

Returns:

Type Description
ResearcherServiceStub

the stub if it exists or None

Source code in fedbiomed/transport/client.py
async def stub(self, stub_type: _StubType) -> ResearcherServiceStub:
    """Get stub for a given stub type.

    Args:
        stub_type: the stub type to get

    Returns:
        the stub if it exists or None
    """
    if stub_type in self._stub_types:
        async with self._channels_stubs_lock:
            return self._stubs[stub_type]
    else:
        return None

ClientStatus

Bases: Enum

Attributes

CONNECTED class-attribute instance-attribute
CONNECTED = 1
DISCONNECTED class-attribute instance-attribute
DISCONNECTED = 0
FAILED class-attribute instance-attribute
FAILED = 2

GrpcClient

GrpcClient(node_id, researcher, update_id_map)

An agent of remote researcher gRPC server.

Parameters:

Name Type Description Default
node_id str

unique ID of this node (connection client)

required
researcher ResearcherCredentials

the researcher to which the node connects (connection server)

required
update_id_map Awaitable

awaitable to call when updating the researcher ID, needs proper prototype

required
Source code in fedbiomed/transport/client.py
def __init__(
    self, node_id: str, researcher: ResearcherCredentials, update_id_map: Awaitable
) -> None:
    """Class constructor

    Args:
        node_id: unique ID of this node (connection client)
        researcher: the researcher to which the node connects (connection server)
        update_id_map: awaitable to call when updating the researcher ID, needs proper prototype
    """
    self._id = None
    self._researcher = researcher
    self._channels = Channels(researcher)

    self._task_listener = TaskListener(
        channels=self._channels,
        node_id=node_id,
        on_status_change=self._on_status_change,
        update_id=self._update_id,
    )

    self._sender = Sender(
        channels=self._channels, on_status_change=self._on_status_change
    )

    # TODO: use `self._status` for finer gRPC agent handling.
    # Currently, the (tentative) status is maintained but not used
    self._status = ClientStatus.DISCONNECTED
    # lock for accessing self._status
    self._status_lock = asyncio.Lock()

    self._update_id_map = update_id_map
    self._tasks = []

Functions

send async
send(message)

Sends messages from node to researcher server.

Parameters:

Name Type Description Default
message Message

message to send from node to server

required
Source code in fedbiomed/transport/client.py
async def send(self, message: Message) -> None:
    """Sends messages from node to researcher server.

    Args:
        message: message to send from node to server
    """

    await self._sender.send(message)
start
start(on_task)

Start researcher gRPC agent.

Starts long-lived tasks, one waiting for server requests, one waiting on the async queue for the replies from the node that are going to be sent back to researcher.

Parameters:

Name Type Description Default
on_task

Callback function to execute once a payload received from researcher.

required

Returns:

Type Description
List[Awaitable[Optional[Callable]]]

A list of task objects of the agent

Source code in fedbiomed/transport/client.py
def start(self, on_task) -> List[Awaitable[Optional[Callable]]]:
    """Start researcher gRPC agent.

    Starts long-lived tasks, one waiting for server requests, one waiting on the async queue
    for the replies from the node that are going to be sent back to researcher.

    Args:
        on_task: Callback function to execute once a payload received from researcher.

    Returns:
        A list of task objects of the agent
    """

    async def run():
        """Connects and dispatches the tasks"""

        # First connects to channel
        await self._connect()

        # Launch listeners
        await asyncio.gather(
            self._task_listener.listen(on_task), self._sender.listen()
        )

    # Returns client task
    return asyncio.create_task(run())

Listener

Listener(channels)

Abstract generic listener method for a node's communications.

Parameters:

Name Type Description Default
channels Channels

Keeps channels and stubs.

required
Source code in fedbiomed/transport/client.py
def __init__(self, channels: Channels) -> None:
    """Constructs task listener channels

    Args:
        channels: Keeps channels and stubs.
    """
    self._channels = channels
    self._retry_on_error = False

Functions

listen
listen(callback=None)

Listens for tasks from given channels

Parameters:

Name Type Description Default
callback Optional[Callable]

Callback function to execute once a task is processed

None

Returns:

Type Description
Awaitable[Optional[Callable]]

Asyncio task to run task listener

Source code in fedbiomed/transport/client.py
def listen(
    self, callback: Optional[Callable] = None
) -> Awaitable[Optional[Callable]]:
    """Listens for tasks from given channels

    Args:
        callback: Callback function to execute once a task is processed

    Returns:
        Asyncio task to run task listener
    """
    return asyncio.create_task(self._listen(callback))

ResearcherCredentials dataclass

ResearcherCredentials(port, host, certificate=None)

Attributes

certificate class-attribute instance-attribute
certificate = None
host instance-attribute
host
port instance-attribute
port

Sender

Sender(channels, on_status_change)

Bases: Listener

Parameters:

Name Type Description Default
channels Channels

RPC channels and stubs to be used for polling tasks from researcher

required
on_status_change Awaitable

Callback awaitable to run for changing node agent status

required
Source code in fedbiomed/transport/client.py
def __init__(
    self,
    channels: Channels,
    on_status_change: Awaitable,
) -> None:
    """Class constructor.

    Args:
        channels: RPC channels and stubs to be used for polling tasks from researcher
        on_status_change: Callback awaitable to run for changing node agent status
    """
    super().__init__(channels)

    self._queue = asyncio.Queue()
    self._on_status_change = on_status_change
    self._retry_count = 0
    self._retry_item = None
    self._stub_type = _StubType.NO_STUB
    self._retry_on_error = True

Functions

send async
send(message)

Send a message to peer researcher.

Parameters:

Name Type Description Default
message Message

Message to send

required
Source code in fedbiomed/transport/client.py
async def send(self, message: Message) -> None:
    """Send a message to peer researcher.

    Args:
        message: Message to send
    """
    # Switch-case for message type and gRPC calls
    match message.__class__.__name__:
        case FeedbackMessage.__name__:
            # Note: FeedbackMessage is designed as proto serializable message.
            await self._queue.put(
                {"stub": _StubType.SENDER_FEEDBACK_STUB, "message": message}
            )

        case _:
            await self._queue.put(
                {"stub": _StubType.SENDER_TASK_STUB, "message": message}
            )

TaskListener

TaskListener(channels, node_id, on_status_change, update_id)

Bases: Listener

Listener for the task assigned by the researcher component

Parameters:

Name Type Description Default
channels Channels

RPC channels and stubs to be used for polling tasks from researcher

required
node_id str

unique ID for this node

required
on_status_change Awaitable

Callback awaitable to run for changing node agent status

required
update_id Awaitable

Callback function to run updating peer researcher ID

required
Source code in fedbiomed/transport/client.py
def __init__(
    self,
    channels: Channels,
    node_id: str,
    on_status_change: Awaitable,
    update_id: Awaitable,
) -> None:
    """Class constructor.

    Args:
        channels: RPC channels and stubs to be used for polling tasks from researcher
        node_id: unique ID for this node
        on_status_change: Callback awaitable to run for changing node agent status
        update_id: Callback function to run updating peer researcher ID
    """
    super().__init__(channels)

    self._node_id = node_id
    self._on_status_change = on_status_change
    self._update_id = update_id
    self._retry_count = 0

Functions

is_server_alive

is_server_alive(host, port)

Checks if the server is alive

Parameters:

Name Type Description Default
host str

The host/ip of researcher/server component

required
port str

Port number of researcher/server component

required
Source code in fedbiomed/transport/client.py
def is_server_alive(host: str, port: str):
    """Checks if the server is alive

    Args:
        host: The host/ip of researcher/server component
        port: Port number of researcher/server component
    """

    port = int(port)
    address_info = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    for family, socktype, protocol, _, address in address_info:
        s = socket.socket(family, socktype, protocol)
        # Need this timeout for the case where the server does not answer
        # If not present, socket timeout increases and this function takes more
        # than GRPC_CLIENT_CONN_RETRY_TIMEOUT to execute
        s.settimeout(GRPC_CLIENT_CONN_RETRY_TIMEOUT)
        try:
            s.connect(address)
        except socket.error:
            return False
        else:
            s.close()
            return True