NodeAgent

Attributes

GRPC_SERVER_TASK_WAIT_TIMEOUT module-attribute

GRPC_SERVER_TASK_WAIT_TIMEOUT = 10

Classes

AgentStore

AgentStore(loop, on_forward)

Stores node agents

Parameters:

Name Type Description Default
loop AbstractEventLoop

asyncio event loop that the researcher server runs. The agent store should use the same event loop for async operations.

required
on_forward Optional[Awaitable[None]]

Coroutine for handling overlay messages to forward unchanged to a node

required
Source code in fedbiomed/transport/node_agent.py
def __init__(
    self, loop: asyncio.AbstractEventLoop, on_forward: Optional[Awaitable[None]]
) -> None:
    """Constructor of the agent store

    Args:
        loop: asyncio event loop that the researcher server runs. The agent store
            should use the same event loop for async operations
        on_forward: Coroutine for handling overlay messages to forward unchanged
            to a node; handed as-is to every NodeAgent created by the store
    """
    # Node agents indexed by node ID
    self._node_agents: Dict[str, NodeAgent] = {}

    self._loop = loop
    self._on_forward = on_forward

    # protect read/write operations on self._node_agents
    self._store_lock = asyncio.Lock()

Functions

get async
get(node_id)

Gets node agent by given node id

Parameters:

Name Type Description Default
node_id str

ID of the node. The method returns the node agent, or None if no agent exists for this node ID.

required
Source code in fedbiomed/transport/node_agent.py
async def get(self, node_id: str) -> Optional[NodeAgent]:
    """Look up the node agent registered for a node.

    Args:
        node_id: ID of the node to look up

    Returns:
        The node agent for `node_id`, or None if no agent exists for this node ID
    """
    async with self._store_lock:
        agent = self._node_agents.get(node_id)
    return agent
get_all async
get_all()

Returns all node agents regardless of their status (ACTIVE, DISCONNECTED, ...).

Returns:

Type Description
Dict[str, NodeAgent]

Dictionary of node agent objects, by node ID

Source code in fedbiomed/transport/node_agent.py
async def get_all(self) -> Dict[str, NodeAgent]:
    """Return every known node agent, whatever its status (ACTIVE, DISCONNECTED, ...).

    Returns:
        Dictionary of node agent objects, by node ID. It is a new shallow copy
        taken under the store lock, so callers get a stable mapping to iterate,
        while its values are the very same NodeAgent objects held by the store.
    """
    async with self._store_lock:
        snapshot = self._node_agents.copy()
    return snapshot
retrieve async
retrieve(node_id)

Retrieves a node agent for a given node ID.

Depending on whether this node is already known to the store, this method either gets the existing agent or registers a new agent.

Parameters:

Name Type Description Default
node_id str

ID of receiving node

required
Return

The node agent to manage tasks that are assigned to it.

Source code in fedbiomed/transport/node_agent.py
async def retrieve(self, node_id: str) -> NodeAgent:
    """Retrieves a node agent for a given node ID.

    Depending on whether this node is already known to the store, this method
    either gets the existing agent or registers a new agent.

    Args:
        node_id: ID of receiving node

    Returns:
        The node agent to manage tasks that are assigned to it.
    """
    # Lock during the whole lookup-or-create sequence so that two concurrent
    # calls for the same node cannot each create and register an agent.
    async with self._store_lock:
        node = self._node_agents.get(node_id)
        if node is None:
            node = NodeAgent(
                id=node_id, loop=self._loop, on_forward=self._on_forward
            )
            self._node_agents[node_id] = node

    return node

NodeActiveStatus

Bases: Enum

Node active status types

Attributes:

Name Type Description
WAITING

Status where the researcher server waits for another GetTask request after the previous one has completed.

ACTIVE

Listening for the task with open RPC call

DISCONNECTED

No GetTask RPC call running from the node

Attributes

ACTIVE class-attribute instance-attribute
ACTIVE = 2
DISCONNECTED class-attribute instance-attribute
DISCONNECTED = 3
WAITING class-attribute instance-attribute
WAITING = 1

NodeAgent

NodeAgent(id, loop, on_forward)

Bases: NodeAgentAsync

Parameters:

Name Type Description Default
id str

node unique ID

required
loop AbstractEventLoop

event loop

required
on_forward Optional[Awaitable[None]]

Coroutine for handling overlay messages to forward unchanged to a node

required
Source code in fedbiomed/transport/node_agent.py
def __init__(
    self,
    id: str,
    loop: asyncio.AbstractEventLoop,
    on_forward: Optional[Awaitable[None]],
) -> None:
    """Represent the client (node) that connects to the gRPC server.

    Args:
        id: node unique ID
        loop: event loop
        on_forward: Coroutine for handling overlay messages to forward unchanged to a node
    """
    # Identity and collaborators
    self._id: str = id
    self._loop = loop
    self._on_forward = on_forward

    # Request / reply bookkeeping
    self._last_request: Optional[datetime] = None
    self._replies = Replies()
    self._stopped_request_ids = []

    # A freshly created agent is ACTIVE, with no pending status-change task
    self._status: NodeActiveStatus = NodeActiveStatus.ACTIVE
    self._status_task: Optional[asyncio.Task] = None

    # Outgoing messages, consumed through get_task
    self._queue = asyncio.Queue()

    # _status_lock guards self._status and self._status_task;
    # the other two guard the replies and the stopped request IDs
    self._status_lock = asyncio.Lock()
    self._replies_lock = asyncio.Lock()
    self._stopped_request_ids_lock = asyncio.Lock()

Attributes

status property
status

Getter for node status.

Returns:

Type Description
NodeActiveStatus

node status

Functions

flush
flush(request_id, stopped=False)

Flush processed replies

Parameters:

Name Type Description Default
request_id str

request ID for which the replies should be flushed

required
stopped bool

the request was stopped during processing

False
Source code in fedbiomed/transport/node_agent.py
def flush(self, request_id: str, stopped: bool = False) -> None:
    """Flush processed replies (thread-safe, non-blocking wrapper).

    Builds the parent class's asynchronous `flush` coroutine and schedules it
    on the agent's event loop with `asyncio.run_coroutine_threadsafe`. It
    returns immediately, without waiting for the flush to complete.

    Args:
        request_id: request ID for which the replies should be flushed
        stopped: the request was stopped during processing
    """
    pending_flush = super().flush(request_id, stopped)
    asyncio.run_coroutine_threadsafe(pending_flush, self._loop)
send
send(message, on_reply=None)

Send message to the node.

Parameters:

Name Type Description Default
message Message

Message to send to the node

required
Source code in fedbiomed/transport/node_agent.py
def send(self, message: Message, on_reply: Optional[Callable] = None) -> None:
    """Send message to the node (thread-safe, non-blocking).

    Schedules `send_async` on the agent's event loop with
    `asyncio.run_coroutine_threadsafe` and returns without waiting for it.

    Args:
        message: Message to send to the node
        on_reply: optional callback forwarded to `send_async`, to be called with
            the reply message once it is received
    """
    coroutine = self.send_async(message=message, on_reply=on_reply)
    asyncio.run_coroutine_threadsafe(coroutine, self._loop)

NodeAgentAsync

NodeAgentAsync(id, loop, on_forward)

Parameters:

Name Type Description Default
id str

node unique ID

required
loop AbstractEventLoop

event loop

required
on_forward Optional[Awaitable[None]]

Coroutine for handling overlay messages to forward unchanged to a node

required
Source code in fedbiomed/transport/node_agent.py
def __init__(
    self,
    id: str,
    loop: asyncio.AbstractEventLoop,
    on_forward: Optional[Awaitable[None]],
) -> None:
    """Represent the client (node) that connects to the gRPC server.

    Args:
        id: node unique ID
        loop: event loop
        on_forward: Coroutine for handling overlay messages to forward unchanged to a node
    """
    # Identity and collaborators
    self._id: str = id
    self._loop = loop
    self._on_forward = on_forward

    # Request / reply bookkeeping
    self._last_request: Optional[datetime] = None
    self._replies = Replies()
    self._stopped_request_ids = []

    # A freshly created agent is ACTIVE, with no pending status-change task
    self._status: NodeActiveStatus = NodeActiveStatus.ACTIVE
    self._status_task: Optional[asyncio.Task] = None

    # Outgoing messages, consumed through get_task
    self._queue = asyncio.Queue()

    # _status_lock guards self._status and self._status_task;
    # the other two guard the replies and the stopped request IDs
    self._status_lock = asyncio.Lock()
    self._replies_lock = asyncio.Lock()
    self._stopped_request_ids_lock = asyncio.Lock()

Attributes

id property
id

Getter for node id.

Returns:

Type Description
str

node id

Functions

change_node_status_after_task async
change_node_status_after_task()

Coroutine to execute each time an RPC call is completed

Source code in fedbiomed/transport/node_agent.py
async def change_node_status_after_task(self) -> None:
    """Coroutine to execute each time an RPC call is completed.

    Marks the node as WAITING and, unless a status task is already pending,
    starts a background task running `_change_node_status_disconnected`.
    """
    async with self._status_lock:
        self._status = NodeActiveStatus.WAITING

        # Keep at most one pending status-change task
        if self._status_task is not None:
            return

        self._status_task = asyncio.create_task(
            self._change_node_status_disconnected()
        )
flush async
flush(request_id, stopped=False)

Flushes processed reply

Parameters:

Name Type Description Default
request_id str

request ID for which the replies should be flushed

required
stopped bool

the request was stopped during processing

False
Source code in fedbiomed/transport/node_agent.py
async def flush(self, request_id: str, stopped: bool = False) -> None:
    """Flushes processed reply

    Removes the pending-reply entry registered for `request_id` (if any). When the
    request was stopped before any reply arrived, its ID is remembered so that a
    late reply can be recognised (and logged) as belonging to a stopped request.

    Args:
        request_id: request ID for which the replies should be flushed
        stopped: the request was stopped during processing
    """
    async with self._replies_lock:
        # The request may be unknown or already flushed: guard the lookup so a
        # stopped flush does not raise KeyError (pop below already tolerates it)
        if (
            stopped
            and request_id in self._replies
            and self._replies[request_id]["reply"] is None
        ):
            async with self._stopped_request_ids_lock:
                self._stopped_request_ids.append(request_id)

        self._replies.pop(request_id, None)
get_task
get_task()

Get tasks assigned by the main thread

Returns coroutine

This function returns an asyncio coroutine. Please use await while calling.

Returns:

Type Description
Awaitable[Message]

A coroutine to await for retrieving a list of: a task ; a number of send retries already done ; the time of first sending attempt in seconds since epoch

Source code in fedbiomed/transport/node_agent.py
def get_task(self) -> Awaitable[list]:
    """Get tasks assigned by the main thread

    !!! note "Returns coroutine"
        This function returns an asyncio coroutine. Please use `await` while calling.

    Returns:
        A coroutine to await for retrieving a list of: a task ; a number of send retries already done ;
            the time of first sending attempt in seconds since epoch
    """
    # Queue items are put by send_async as [message, retry_count, first_send_time],
    # hence the list (not a bare Message) in the return annotation.
    return self._queue.get()
on_reply async
on_reply(message)

Callback to execute each time a new reply is received from the node

Source code in fedbiomed/transport/node_agent.py
async def on_reply(self, message: Dict) -> None:
    """Callback to execute each time a new reply is received from the node.

    Overlay messages are handed unchanged to the `on_forward` coroutine. Other
    messages are matched, by `request_id`, against the pending requests that
    `send_async` registered: the first reply is stored and passed to the
    registered callback (if one was given); later replies are ignored.

    Args:
        message: reply received from the node, as a dict deserialized with
            `Message.from_dict`
    """

    message = Message.from_dict(message)

    # Handle overlay messages to relay to a node
    if isinstance(message, OverlayMessage):
        await self._on_forward(message)
        return

    # Handle RequestReply messages for the researcher
    if not message.request_id:
        logger.error(
            f"Server received a reply from the client {self._id} that does "
            "not contains request id."
        )
        # Without a request id the reply cannot be matched to any request;
        # stop here instead of also logging it as an "unexpected request".
        return

    async with self._replies_lock:
        if message.request_id in self._replies:
            pending = self._replies[message.request_id]
            if pending["reply"] is None:
                pending["reply"] = message
                # send_async registers on_reply=None by default: only call a
                # callback that was actually provided
                callback = pending["callback"]
                if callback is not None:
                    callback(message)
            else:
                # Handle case of multiple replies
                # Avoid conflict with consumption of reply.
                logger.warning(
                    f"Received multiple replies for request {message.request_id}. "
                    "Keep first reply, ignore subsequent replies"
                )
        else:
            async with self._stopped_request_ids_lock:
                if message.request_id in self._stopped_request_ids:
                    logger.warning(
                        "Received a reply from a federated request that has been "
                        f"stopped: {message.request_id}."
                    )
                    self._stopped_request_ids.remove(message.request_id)
                else:
                    logger.warning(
                        f"Received a reply from an unexpected request: {message.request_id}"
                    )
send_async async
send_async(message, on_reply=None, retry_count=0, first_send_time=None)

Asynchronously sends a message to the node.

Parameters:

Name Type Description Default
message Message

Message to send to the node

required
on_reply Optional[Callable]

optional callback to execute when receiving message reply

None
retry_count int

number of retries already done for this message

0
first_send_time Optional[float]

time of first send attempt for this message

None
Source code in fedbiomed/transport/node_agent.py
async def send_async(
    self,
    message: Message,
    on_reply: Optional[Callable] = None,
    retry_count: int = 0,
    first_send_time: Optional[float] = None,
) -> None:
    """Asynchronously send a message to the node.

    The message is discarded if the node is DISCONNECTED. Otherwise, if it
    carries a non-empty `request_id` that is not yet pending, a reply slot
    with `on_reply` as callback is registered. The message is then queued for
    `get_task` as `[message, retry_count, first_send_time]`.

    Args:
        message: Message to send to the node
        on_reply: optional callback to execute when receiving message reply
        retry_count: number of retries already done for this message
        first_send_time: time of first send attempt for this message (seconds
            since epoch); defaults to now
    """

    async with self._status_lock:
        current_status = self._status

        if current_status == NodeActiveStatus.DISCONNECTED:
            logger.info(f"Node {self._id} is disconnected. Discard message.")
            return

        if current_status == NodeActiveStatus.WAITING:
            logger.info(
                f"Node {self._id} is in WAITING status. Server is "
                "waiting for receiving a request from "
                "this node to convert it as ACTIVE. Node will be updated "
                "as DISCONNECTED soon if no request received."
            )

    # Forwarded (overlay) messages have no `request_id` field, so checking for the
    # attribute is enough to tell request-response messages apart.
    async with self._replies_lock:
        request_id = getattr(message, "request_id", None)
        # register a reply slot only for a request-response message
        # that is not already pending
        if request_id and request_id not in self._replies:
            self._replies.update(
                {request_id: {"callback": on_reply, "reply": None}}
            )

    send_time = time.time() if first_send_time is None else first_send_time
    await self._queue.put([message, retry_count, send_time])
set_active async
set_active()

Updates node status as active

Source code in fedbiomed/transport/node_agent.py
async def set_active(self) -> None:
    """Mark the node as ACTIVE.

    Logs when a DISCONNECTED node comes back, then cancels and clears any
    pending status-change task.
    """

    async with self._status_lock:
        was_disconnected = self._status == NodeActiveStatus.DISCONNECTED
        if was_disconnected:
            # Inform user that node is online again
            logger.info(f"Node {self._id} is back online!")

        self._status = NodeActiveStatus.ACTIVE

        # Detach any running status task, then cancel it
        pending_task, self._status_task = self._status_task, None
        if pending_task:
            pending_task.cancel()
status_async async
status_async()

Getter for node status.

Returns:

Type Description
NodeActiveStatus

node status

Source code in fedbiomed/transport/node_agent.py
async def status_async(self) -> NodeActiveStatus:
    """Read the node status under the status lock.

    Returns:
        node status
    """
    async with self._status_lock:
        current = self._status
    # No copy needed: the status is a simple value
    return current
task_done
task_done()

Acknowledge completion of a de-queued task

Source code in fedbiomed/transport/node_agent.py
def task_done(self) -> None:
    """Acknowledge completion of a de-queued task.

    Thin wrapper over `asyncio.Queue.task_done` on the agent's outgoing queue.
    """
    outgoing = self._queue
    outgoing.task_done()

Replies

Bases: dict