Server

Attributes

GRPC_SERVER_SETUP_TIMEOUT module-attribute

GRPC_SERVER_SETUP_TIMEOUT = GRPC_CLIENT_CONN_RETRY_TIMEOUT + server_setup_timeout

MAX_GRPC_SERVER_SETUP_TIMEOUT module-attribute

MAX_GRPC_SERVER_SETUP_TIMEOUT = 20 * server_setup_timeout

MAX_SEND_DURATION module-attribute

MAX_SEND_DURATION = 300

server_setup_timeout module-attribute

server_setup_timeout = int(getenv('GRPC_SERVER_SETUP_TIMEOUT', 1))

Classes

GrpcServer

GrpcServer(host, port, on_message, ssl, debug=False)

Bases: _GrpcAsyncServer

Grpc server implementation to be used by threads

This class extends async implementation of gRPC server to be able to call async methods from different thread. Currently, it is used by [fedbiomed.researcher.requests.Requests][Requests] class that is instantiated in the main thread

Attributes:

Name Type Description
_thread Optional[Thread]

background thread of gRPC server

Parameters:

Name Type Description Default
host str

server DNS name or IP address

required
port str

server TCP port

required
on_message Callable

Callback function to execute once a message received from the nodes

required
ssl SSLCredentials

Ssl credentials.

required
debug bool

Activate debug mode for gRPC asyncio

False
Source code in fedbiomed/transport/server.py
def __init__(
        self,
        host: str,
        port: str,
        on_message: Callable,
        ssl: SSLCredentials,
        debug: bool = False,
) -> None:
    """Class constructor

    Args:
        host: server DNS name or IP address
        port: server TCP port
        on_message: Callback function to execute once a message received from the nodes
        ssl: Ssl credentials.
        debug: Activate debug mode for gRPC asyncio
    """

    # inform all threads whether server is started
    self._is_started = threading.Event()
    self._ssl = ssl
    self._host = host
    self._port = port

    self._server = None
    self._debug = debug
    self._on_message = on_message
    self._loop = None
    self._agent_store : Optional[AgentStore] = None

Functions

broadcast
broadcast(message)

Broadcast message to all known and reachable nodes

Parameters:

Name Type Description Default
message Message

Message to broadcast

required

Raises:

Type Description
FedbiomedCommunicationError

bad argument type

FedbiomedCommunicationError

server is not started

Source code in fedbiomed/transport/server.py
def broadcast(self, message: Message) -> None:
    """Broadcast message to all known and reachable nodes

    Args:
        message: Message to broadcast

    Raises:
        FedbiomedCommunicationError: bad argument type
        FedbiomedCommunicationError: server is not started
    """
    if not isinstance(message, Message):
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628}: bad argument type for message, expected `Message`, got `{type(message)}`")

    if not self._is_started.is_set():
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628}: Can not broadcast given message. "
            "Communication client is not initialized.")

    self._run_threadsafe(super().broadcast(message))
get_all_nodes
get_all_nodes()

Returns all known nodes

Returns:

Type Description
List[NodeAgent]

A list of node agents

Raises:

Type Description
FedbiomedCommunicationError

server is not started

Source code in fedbiomed/transport/server.py
def get_all_nodes(self) -> List[NodeAgent]:
    """Returns all known nodes

    Returns:
        A list of node agents

    Raises:
        FedbiomedCommunicationError: server is not started
    """
    if not self._is_started.is_set():
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628}: Error while getting all nodes "
            "connected:  Communication client is not initialized.")

    return self._run_threadsafe(super().get_all_nodes())
get_node
get_node(node_id)

Returns given node

Parameters:

Name Type Description Default
node_id

ID of node to retrieve

required

Returns:

Type Description
Optional[NodeAgent]

A node agent

Raises:

Type Description
FedbiomedCommunicationError

server is not started

Source code in fedbiomed/transport/server.py
def get_node(self, node_id) -> Optional[NodeAgent]:
    """Returns given node

    Args:
        node_id: ID of node to retrieve

    Returns:
        A node agent

    Raises:
        FedbiomedCommunicationError: server is not started
    """
    if not self._is_started.is_set():
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628}: Error while getting node '{node_id}':"
            "Communication client is not initialized.")

    return self._run_threadsafe(super().get_node(node_id))
is_alive
is_alive()

Checks if the thread running gRPC server still alive

Returns:

Type Description
bool

gRPC server running status

Raises:

Type Description
FedbiomedCommunicationError

server is not started

Source code in fedbiomed/transport/server.py
def is_alive(self) -> bool:
    """Checks if the thread running gRPC server still alive

    Returns:
        gRPC server running status

    Raises:
        FedbiomedCommunicationError: server is not started
    """
    if not self._is_started.is_set():
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628}: Can not check if thread is alive."
            "Communication client is not initialized.")

    # TODO: more tests about gRPC server and task status ?
    return False if not isinstance(self._thread, threading.Thread) else self._thread.is_alive()
send
send(message, node_id)

Send message to a specific node.

Parameters:

Name Type Description Default
message Message

Message to send

required
node_id str

Destination node unique ID

required

Raises:

Type Description
FedbiomedCommunicationError

bad argument type

FedbiomedCommunicationError

server is not started

Source code in fedbiomed/transport/server.py
def send(self, message: Message, node_id: str) -> None:
    """Send message to a specific node.

    Args:
        message: Message to send
        node_id: Destination node unique ID

    Raises:
        FedbiomedCommunicationError: bad argument type
        FedbiomedCommunicationError: server is not started
    """
    if not isinstance(message, Message):
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628}: bad argument type for message, expected `Message`, got `{type(message)}`")

    if not self._is_started.is_set():
        raise FedbiomedCommunicationError(
            f"{ErrorNumbers.FB628.value}: Can not send message. "
            "Communication client is not initialized.")

    self._run_threadsafe(super().send(message, node_id))
start
start()

Starts async GrpcServer

Source code in fedbiomed/transport/server.py
def start(self) -> None:
    """Starts async GrpcServer """

    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()

    # FIXME: This implementation assumes that nodes will be able connect and server complete setup with this delay
    logger.info("Starting researcher service...")


    logger.info(f'Waiting {GRPC_SERVER_SETUP_TIMEOUT}s for nodes to connect...')
    time.sleep(GRPC_SERVER_SETUP_TIMEOUT)

    sleep_ = 0
    while len(self.get_all_nodes()) == 0:

        if sleep_ == 0:
            logger.info(f"No nodes found, server will wait "
                        f"{MAX_GRPC_SERVER_SETUP_TIMEOUT - GRPC_SERVER_SETUP_TIMEOUT} "
                        "more seconds until a node creates connection.")

        if sleep_ > MAX_GRPC_SERVER_SETUP_TIMEOUT - GRPC_SERVER_SETUP_TIMEOUT:
            if len(self.get_all_nodes()) == 0:
                logger.warning("Server has not received connection from any remote nodes in "
                               f"MAX_GRPC_SERVER_SETUP_TIMEOUT: {MAX_GRPC_SERVER_SETUP_TIMEOUT} "
                               "This may effect the request created right after the server initialization. "
                               "However, server will keep running in the background so you can retry the "
                               "operations for sending requests to remote nodes until one receives.")
            break

        time.sleep(1)
        sleep_ += 1

ResearcherServicer

ResearcherServicer(agent_store, on_message)

Bases: ResearcherServiceServicer

RPC Servicer

Parameters:

Name Type Description Default
agent_store AgentStore

The class that stores node agents

required
on_message Callable

Callback function to execute once a message received from the nodes

required
Source code in fedbiomed/transport/server.py
def __init__(
        self,
        agent_store: AgentStore,
        on_message: Callable
) -> None:
    """Constructor of gRPC researcher servicer

    Args:
        agent_store: The class that stores node agents
        on_message: Callback function to execute once a message received from the nodes
    """
    super().__init__()
    self._agent_store = agent_store
    self._on_message = on_message

Functions

Feedback async
Feedback(request, unused_context)

Executed for Feedback request received from the nodes

Parameters:

Name Type Description Default
request Message

Feedback message

required
unused_context ServicerContext

Request service context

required
Source code in fedbiomed/transport/server.py
async def Feedback(
        self,
        request: ProtoBufMessage,
        unused_context: grpc.aio.ServicerContext
) -> None:
    """Executed for Feedback request received from the nodes

    Args:
        request: Feedback message
        unused_context: Request service context
    """

    # Get the type of Feedback | log or scalar
    one_of = request.WhichOneof("feedback_type")
    feedback = FeedbackMessage.from_proto(request)

    # Execute on_message assigned by the researcher.requests modules
    self._on_message(feedback.get_param(one_of), MessageType.convert(one_of))

    return Empty()
GetTaskUnary async
GetTaskUnary(request, context)

Gets unary RPC request and return stream of response

Parameters:

Name Type Description Default
request Message

RPC request

required
context ServicerContext

RPC peer context

required
Source code in fedbiomed/transport/server.py
async def GetTaskUnary(
        self,
        request: ProtoBufMessage,
        context: grpc.aio.ServicerContext
) -> None:
    """Gets unary RPC request and return stream of response

    Args:
        request: RPC request
        context: RPC peer context
    """
    task_request = TaskRequest.from_proto(request).get_dict()
    logger.debug(f"Node: {task_request.get('node')} polling for the tasks")

    node_agent = await self._agent_store.retrieve(node_id=task_request["node"])

    # Update node active status as active
    await node_agent.set_active()

    task = None
    try:
        while True:
            task, retry_count, first_send_time = await node_agent.get_task()

            # Choice: mark task as de-queued as soon only if really sent
            node_agent.task_done()

            # discard if message too old
            if first_send_time + MAX_SEND_DURATION > time.time():
                break
            else:
                task = None
                logger.warning(f"Message to send is older than {MAX_SEND_DURATION} seconds. Discard message.")

        task_bytes = Serializer.dumps(task.get_dict())

        chunk_range = range(0, len(task_bytes), MAX_MESSAGE_BYTES_LENGTH)
        for start, iter_ in zip(chunk_range, range(1, len(chunk_range) + 1)):
            stop = start + MAX_MESSAGE_BYTES_LENGTH

            try:
                yield TaskResponse(
                    size=len(chunk_range),
                    iteration=iter_,
                    bytes_=task_bytes[start:stop]
                ).to_proto()
            except GeneratorExit:
                # schedule resend if task sending could not be completed
                # => retry send as long as (1) send not successful
                # (2) max retries not reached
                # => else discard message
                #
                # Note: if node is disconnected then back online, message is retried after reconnection.
                # This is not fully coherent with upper layers (Requests) that may trigger an application
                # level failure in the while, but it is mitigated by the MAX_SEND_DURATION
                if retry_count < MAX_SEND_RETRIES:
                    await node_agent.send_async(
                        message=task, on_reply=None, retry_count=retry_count + 1, first_send_time=first_send_time
                    )
                else:
                    logger.warning(f"Message cannot be sent after {MAX_SEND_RETRIES} retries. Discard message.")
                await node_agent.change_node_status_after_task()
                # need return here to avoid RuntimeError
                return

    except asyncio.CancelledError:
        if task is not None and retry_count is not None and first_send_time is not None:
            # schedule resend if task was pulled from queue
            if retry_count < MAX_SEND_RETRIES:
                await node_agent.send_async(
                    message=task, on_reply=None, retry_count=retry_count + 1, first_send_time=first_send_time
                )
            else:
                logger.warning(f"Message cannot be sent after {MAX_SEND_RETRIES} retries. Discard message.")
    finally:
        await node_agent.change_node_status_after_task()
ReplyTask async
ReplyTask(request_iterator, unused_context)

Gets stream replies from the nodes

Parameters:

Name Type Description Default
request_iterator Iterable[Message]

Iterator for streaming

required
unused_context ServicerContext

Request service context

required
Source code in fedbiomed/transport/server.py
async def ReplyTask(
        self,
        request_iterator: Iterable[ProtoBufMessage],
        unused_context: grpc.aio.ServicerContext
) -> None:
    """Gets stream replies from the nodes

    Args:
        request_iterator: Iterator for streaming
        unused_context: Request service context
    """

    reply = bytes()
    async for answer in request_iterator:
        reply += answer.bytes_
        if answer.size != answer.iteration:
            continue
        else:
            # Deserialize message
            message = Serializer.loads(reply)

            # Replies are handled by node agent callbacks
            node = await self._agent_store.get(message["node_id"])
            await node.on_reply(message)

            reply = bytes()

    return Empty()

SSLCredentials

SSLCredentials(key, cert)

Contains credentials for SSL certifcate of the gRPC server

Parameters:

Name Type Description Default
key str

path to private key

required
cert str

path to certificate

required
Source code in fedbiomed/transport/server.py
def __init__(self, key: str, cert: str):
    """Reads private key and cert file

    Args:
        key: path to private key
        cert: path to certificate
    """
    with open(key, 'rb') as f:
        self.private_key = f.read()
    with open(cert, 'rb') as f:
        self.certificate = f.read()

Attributes

certificate instance-attribute
certificate = read()
private_key instance-attribute
private_key = read()