Attributes
GRACE_TRIGGERED_EVENT module-attribute
GRACE_TRIGGERED_EVENT = 10
MAX_TRIGGERED_EVENT_TIMEOUT module-attribute
MAX_TRIGGERED_EVENT_TIMEOUT = 60
Classes
EventWaitExchange
EventWaitExchange(remove_delivered)
Provides thread safe synchronized data exchange object.
Object users can wait on one or more events registered with an event_id
that must be chosen to ensure it is unique in the object.
Object users can trigger events using an event_id
that may already be waited (or not). Arbitrary data can be associated to the event and will be transmitted to the receiving waiter(s).
One or more waiter can wait on the same event. If the event is removed when delivered, to a waiter then only one waiter receives it. If the event is not removed when delivered, then multiple waiters can receive it until it is cleaned on timeout.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
remove_delivered | if True, remove from an event from the triggered event when they are delivered | required |
Source code in fedbiomed/common/synchro.py
def __init__(self, remove_delivered):
"""Constructor of the class.
Args:
remove_delivered: if True, remove from an event from the triggered event when they are delivered
"""
self._remove_delivered = remove_delivered
# table for storing the triggered event received, still waiting to be consumed
self._triggered_events = {}
# lock for accessing self._triggered_events
self._triggered_events_lock = threading.Lock()
# table for tracking pending requests of the listeners waiting for events
# Key is an internal unique `id_counter`, value is a list of `event_id`
self._pending_listeners = {}
# lock for accessing self._pending_listeners
self._pending_listeners_lock = threading.Lock()
self._id_counter = 0
Functions
event
event(event_id, event_data)
Add an entry to the table of triggered event
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_id | str | unique ID of the event | required |
event_data | Any | arbitrary data to transmit to the event receiver | required |
Source code in fedbiomed/common/synchro.py
def event(self, event_id: str, event_data: Any) -> None:
"""Add an entry to the table of triggered event
Args:
event_id: unique ID of the event
event_data: arbitrary data to transmit to the event receiver
"""
with self._triggered_events_lock:
# Remove obsolete triggered events. We could additionally call at other times.
self._clean_triggered_events()
# In case a event already exists for this ID, overwrite it with the newer one
self._triggered_events[event_id] = {
"start_time": time.time(),
"data": event_data,
}
# check if added event completes some listeners
completed_listeners = self._all_events()
with self._pending_listeners_lock:
for completed_listener in completed_listeners:
# check: listener may have been removed since tested `_all_events`
# as we didn't keep the lock
if completed_listener in self._pending_listeners:
# wake up waiting listener
self._pending_listeners[completed_listener]["event"].set()
wait
wait(event_ids, timeout)
Wait for a registered listener to complete.
Blocks until all events for this listener are triggered or until timeout is reached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_ids | list[str] | list of the unique IDs for all events to wait | required |
timeout | float | maximum time to wait for replies in seconds | required |
Returns:
Type | Description |
---|---|
Tuple[bool, List[Any]] | A tuple consisting of - a bool set to |
Raises:
Type | Description |
---|---|
FedbiomedNodeToNodeError | timeout has incorrect type or value |
Source code in fedbiomed/common/synchro.py
def wait(self, event_ids: list[str], timeout: float) -> Tuple[bool, List[Any]]:
"""Wait for a registered listener to complete.
Blocks until all events for this listener are triggered or until timeout is reached.
Args:
event_ids: list of the unique IDs for all events to wait
timeout: maximum time to wait for replies in seconds
Returns:
A tuple consisting of
- a bool set to `True` if all events for this listener were delivered to this
request, `False` if not
- a list of the arbitrary data associated with each delivered event
Raises:
FedbiomedNodeToNodeError: timeout has incorrect type or value
"""
# Check value for timeout, as bad value may cause hard to detect problems
if (
not isinstance(timeout, (float, int))
or timeout < 0
or timeout > MAX_TRIGGERED_EVENT_TIMEOUT
):
raise FedbiomedSynchroError(
f"{ErrorNumbers.FB324}: Cannot wait {timeout} seconds. "
f"Should be int or float between 0 and {MAX_TRIGGERED_EVENT_TIMEOUT}"
)
time_initial = time.time()
with self._pending_listeners_lock:
self._id_counter += 1
listener_id = self._id_counter
event = threading.Event()
self._pending_listeners[listener_id] = {
"start_time": time_initial,
"event": event,
"event_ids": event_ids,
}
# wait until all events are triggered or timeout is reached
while not self._all_events(listener_id) and (
time.time() < time_initial + timeout
):
# be sure not to hold any lock when waiting !
event.wait(time_initial + timeout - time.time())
event.clear()
with self._pending_listeners_lock:
with self._triggered_events_lock:
# check if all events were received (and are still available for delivery)
all_received = set(
self._pending_listeners[listener_id]["event_ids"]
).issubset(set(self._triggered_events.keys()))
# create list of delivered data
events_data = [
self._triggered_events[reqid]["data"]
for reqid in self._pending_listeners[listener_id]["event_ids"]
if reqid in self._triggered_events
]
if self._remove_delivered:
# remove all events for this request from the triggered events available
# for delivery
for reqid in self._pending_listeners[listener_id]["event_ids"]:
if reqid in self._triggered_events:
self._triggered_events.pop(reqid)
# remove expired listener
del self._pending_listeners[listener_id]
return all_received, events_data