Queue module containing the task queue class, a wrapper around the persistqueue Python library.
Attributes
Classes
TasksQueue
TasksQueue(messages_queue_dir, tmp_dir)
A disk-persistent queue object: enqueued data remains on disk even if the program crashes.
Relies on the persistqueue package.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages_queue_dir | str | directory where enqueued data should be persisted. | required |
tmp_dir | str | indicates where temporary files should be stored. | required |
Source code in fedbiomed/common/tasks_queue.py
def __init__(self, messages_queue_dir: str, tmp_dir: str):
    """Construct disk-persistent Queue.

    Args:
        messages_queue_dir: directory where enqueued data should be persisted.
        tmp_dir: indicates where temporary files should be stored.
    """
    try:
        # small chunksize to limit un-needed use of disk space
        self.queue = persistqueue.Queue(messages_queue_dir, tempdir=tmp_dir, chunksize=1)
    except ValueError as e:
        msg = ErrorNumbers.FB603.value + ": cannot create queue (" + str(e) + ")"
        logger.critical(msg)
        raise FedbiomedTaskQueueError(msg)
Attributes
queue instance-attribute
queue = Queue(messages_queue_dir, tempdir=tmp_dir, chunksize=1)
Functions
add
add(task)
Adds a task to the queue
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | dict | a dict describing the task to be added | required |
Source code in fedbiomed/common/tasks_queue.py
def add(self, task: dict):
    """Adds a task to the queue

    Args:
        task: a dict describing the task to be added
    """
    try:
        self.queue.put(task)
    except persistqueue.exceptions.Full:
        msg = ErrorNumbers.FB603.value + ": queue is full"
        logger.critical(msg)
        raise FedbiomedTaskQueueError(msg)
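The error-translation pattern used by `add` can be sketched with the standard library alone. This is a stand-in under stated assumptions, not Fed-BioMed code: it swaps `persistqueue.Queue` for the in-memory `queue.Queue`, uses a non-blocking `put` so that a full queue raises immediately, and the names `TaskQueueError` and `BoundedTasks` are hypothetical.

```python
import queue


class TaskQueueError(Exception):
    """Hypothetical stand-in for FedbiomedTaskQueueError."""


class BoundedTasks:
    """Hypothetical wrapper; uses the in-memory queue.Queue, not persistqueue."""

    def __init__(self, maxsize: int = 0):
        self.queue = queue.Queue(maxsize=maxsize)

    def add(self, task: dict) -> None:
        try:
            # Assumption for this sketch: non-blocking put, so a full
            # queue raises queue.Full instead of waiting for free space.
            self.queue.put(task, block=False)
        except queue.Full as e:
            # Translate the library exception into a single domain error.
            raise TaskQueueError("queue is full") from e


tasks = BoundedTasks(maxsize=1)
tasks.add({"command": "train"})

try:
    tasks.add({"command": "search"})  # second item exceeds maxsize=1
    overflowed = False
except TaskQueueError:
    overflowed = True
```

Callers then only need to handle one exception type, whichever queue library sits underneath.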
get
get(block=True)
Get the current task in the queue
Parameters:
Name | Type | Description | Default |
---|---|---|---|
block | Optional[bool] | if True, block if necessary until an item is available. Defaults to True. | True |
Returns:
Type | Description |
---|---|
dict | Dictionary object stored in queue |
Raises:
Type | Description |
---|---|
FedbiomedTaskQueueError | If queue is empty |
Source code in fedbiomed/common/tasks_queue.py
def get(self, block: Optional[bool] = True) -> dict:
    """Get the current task in the queue

    Args:
        block: if True, block if necessary until an item is available. Defaults to True.

    Returns:
        Dictionary object stored in queue

    Raises:
        FedbiomedTaskQueueError: If queue is empty
    """
    try:
        return self.queue.get(block)
    except persistqueue.exceptions.Empty:
        msg = ErrorNumbers.FB603.value + ": queue is empty"
        logger.debug(msg)
        raise FedbiomedTaskQueueError(msg)
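Because `get` raises when a non-blocking call finds the queue empty, a caller can drain all pending tasks by looping until the error appears. The sketch below illustrates that idea with a stand-in built on the standard-library `queue.Queue` rather than persistqueue; `TaskQueueError`, `InMemoryTasks`, and `drain` are hypothetical names, not part of Fed-BioMed.

```python
import queue
from typing import List


class TaskQueueError(Exception):
    """Hypothetical stand-in for FedbiomedTaskQueueError."""


class InMemoryTasks:
    """Hypothetical wrapper mirroring the add/get interface described above."""

    def __init__(self):
        self.queue = queue.Queue()  # in-memory stand-in, not disk-persistent

    def add(self, task: dict) -> None:
        self.queue.put(task)

    def get(self, block: bool = True) -> dict:
        try:
            return self.queue.get(block)
        except queue.Empty as e:
            raise TaskQueueError("queue is empty") from e


def drain(tasks: InMemoryTasks) -> List[dict]:
    """Collect every pending task without blocking."""
    collected = []
    while True:
        try:
            collected.append(tasks.get(block=False))
        except TaskQueueError:
            # An empty queue ends the loop; it is not treated as a failure here.
            return collected


tasks = InMemoryTasks()
tasks.add({"id": 1})
tasks.add({"id": 2})
drained = drain(tasks)
```

This matches the source's choice to log the empty-queue case at debug level: an empty queue is an expected condition for a polling caller.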
qsize
qsize()
Retrieve the size of the queue
Returns:
Type | Description |
---|---|
int | size of the queue |
Source code in fedbiomed/common/tasks_queue.py
def qsize(self) -> int:
    """Retrieve the size of the queue

    Returns:
        size of the queue
    """
    return self.queue.qsize()
task_done
task_done()
Indicate whether a formerly enqueued task is complete
Returns:
Type | Description |
---|---|
Any | True if task is complete |
Source code in fedbiomed/common/tasks_queue.py
def task_done(self) -> Any:
    """Indicate whether a formerly enqueued task is complete

    Returns:
        True if task is complete
    """
    try:
        return self.queue.task_done()
    except ValueError:
        # persistqueue raises ValueError if task_done is called too many times; we can ignore it
        return
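The "called too many times" case can be reproduced with the standard-library `queue.Queue`, which raises `ValueError` when `task_done()` is called more often than `get()`. The sketch below uses that class as a stand-in for persistqueue. The helper `safe_task_done` is hypothetical, and unlike the method above it returns a boolean purely to make the outcome visible.

```python
import queue

work = queue.Queue()
work.put({"id": 1})

item = work.get()
work.task_done()  # matches the single get() above


def safe_task_done(q: queue.Queue) -> bool:
    """Hypothetical helper: tolerate surplus task_done() calls."""
    try:
        q.task_done()
        return True
    except ValueError:
        # No unfinished task is left to mark as done; ignore the surplus call.
        return False


extra_call_accepted = safe_task_done(work)
```

Swallowing the exception makes repeated acknowledgements harmless, at the cost of hiding a mismatch between `get` and `task_done` calls.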