TasksQueue

Queue module that contains task queue class that is a wrapper to the persistqueue python library.

Attributes

Classes

TasksQueue

TasksQueue(messages_queue_dir, tmp_dir)

A disk-persistent Queue object, ensuring queue will remain on disk even if program crashes.

Relies on persistqueue package.

Parameters:

Name Type Description Default
messages_queue_dir str

directory where enqueued data should be persisted.

required
tmp_dir str

indicates where temporary files should be stored.

required
Source code in fedbiomed/common/tasks_queue.py
def __init__(self, messages_queue_dir: str, tmp_dir: str):
    """Construct disk-persistent Queue.

    Args:
        messages_queue_dir: directory where enqueued data should be persisted.
        tmp_dir: indicates where temporary files should be stored.
    """
    try:
        # small chunksize to limit un-needed use of disk space
        self.queue = persistqueue.Queue(messages_queue_dir, tempdir=tmp_dir, chunksize=1)
    except ValueError as e:
        msg = ErrorNumbers.FB603.value + ": cannot create queue (" + str(e) + ")"
        logger.critical(msg)
        raise FedbiomedTaskQueueError(msg)

Attributes

queue instance-attribute
queue = Queue(messages_queue_dir, tempdir=tmp_dir, chunksize=1)

Functions

add
add(task)

Adds a task to the queue

Parameters:

Name Type Description Default
task dict

a dict describing the task to be added

required
Source code in fedbiomed/common/tasks_queue.py
def add(self, task: dict):
    """Adds a task to the queue

    Args:
        task: a dict describing the task to be added
    """
    try:
        self.queue.put(task)
    except persistqueue.exceptions.Full:
        msg = ErrorNumbers.FB603.value + ": queue is full"
        logger.critical(msg)
        raise FedbiomedTaskQueueError(msg)
get
get(block=True)

Get the current task in the queue

Parameters:

Name Type Description Default
block Optional[bool]

if True, block if necessary until an item is available. Defaults to True.

True

Returns:

Type Description
dict

Dictionary object stored in queue

Raises:

Type Description
FedbiomedTaskQueueError

If queue is empty

Source code in fedbiomed/common/tasks_queue.py
def get(self, block: Optional[bool] = True) -> dict:
    """Get the current task in the queue

    Args:
        block: if True, block if necessary until an item is available. Defaults to True.

    Returns:
        Dictionary object stored in queue

    Raises:
        FedbiomedTaskQueueError: If queue is empty
    """
    try:
        return self.queue.get(block)
    except persistqueue.exceptions.Empty:
        msg = ErrorNumbers.FB603.value + ": queue is empty"
        logger.debug(msg)
        raise FedbiomedTaskQueueError(msg)
qsize
qsize()

Retrieve the size of the queue

Returns:

Type Description
int

size of the queue

Source code in fedbiomed/common/tasks_queue.py
def qsize(self) -> int:
    """Retrieve the size of the queue

    Returns:
        size of the queue
    """
    return self.queue.qsize()
task_done
task_done()

Indicate whether a formerly enqueued task is complete

Returns:

Type Description
Any

True if task is complete

Source code in fedbiomed/common/tasks_queue.py
def task_done(self) -> Any:
    """Indicate whether a formerly enqueued task is complete

    Returns:
        True if task is complete
    """
    try:
        return self.queue.task_done()
    except ValueError:
        # persistqueue raises it if task_done called too many times we can ignore it
        return