Jobs

Classes that simplify imports from fedbiomed.node.jobs package

Classes

FAJob

FAJob(root_dir, dataset_manager, node_id, node_name, request, allow_fa)

Bases: _BaseJob

Represents the analytics execution performed by a node in a Federated Analytics job.

Parameters:

Name Type Description Default
root_dir str

Root fedbiomed directory where node instance files will be stored.

required
dataset_manager DatasetManager

DatasetManager instance to retrieve datasets

required
node_id str

Node id

required
node_name str

Node name (Hospital name)

required
request FARequest

FARequest message object containing all information about the FA task

required
allow_fa bool

True if federated analytics is allowed on this node, False otherwise

required
Source code in fedbiomed/node/jobs/_fa_job.py
def __init__(
    self,
    root_dir: str,
    dataset_manager: DatasetManager,
    node_id: str,
    node_name: str,
    request: FARequest,
    allow_fa: bool,
) -> None:
    """Initialize the FA job from an incoming request.

    Args:
        root_dir: Root fedbiomed directory where node instance files will be stored.
        dataset_manager: DatasetManager instance to retrieve datasets
        node_id: Node id
        node_name: Node name (Hospital name)
        request: FARequest message object containing all information about the FA task
        allow_fa: True if federated analytics is allowed on this node, False otherwise
    """
    super().__init__(root_dir, dataset_manager, node_id, node_name, request)

    # Mirror the FA-specific request fields onto the job as private attributes.
    for field in (
        "stats",
        "dataset_id",
        "experiment_id",
        "fa_id",
        "stats_args",
        "dataset_schema",
    ):
        setattr(self, f"_{field}", getattr(request, field))
    self._allow_fa = allow_fa

Functions

run
run()

Run FA job and return FAReply message or ErrorMessage in case of failure.

Source code in fedbiomed/node/jobs/_fa_job.py
def run(self) -> FAReply | ErrorMessage:
    """Run FA job and return FAReply message or ErrorMessage in case of failure."""

    # Node configuration may forbid federated analytics entirely.
    if not self._allow_fa:
        return self._build_error_msg(
            "Federated Analytics are not allowed on this node by node configuration.",
            errnum=ErrorNumbers.FB325.value,
        )

    # Reject any requested statistic that is not a known Stats enum value.
    if self._stats is not None:
        supported = {member.value for member in Stats}
        unknown = [name for name in self._stats if name not in supported]
        if unknown:
            return self._build_error_msg(
                msg=f"'stats' contains unsupported values: {unknown}",
                errnum=ErrorNumbers.FB325.value,
            )

    try:
        dataset = self._build_dataset(DataReturnFormat.SKLEARN)
    except _InternalJobError as e:
        return self._build_error_msg(msg=repr(e), errnum=ErrorNumbers.FB325.value)

    # All statistics are delegated to the dataset's compute_stats method.
    if not hasattr(dataset, "compute_stats"):
        return self._build_error_msg(
            msg="Dataset does not support analytics method 'compute_stats'.",
            errnum=ErrorNumbers.FB325.value,
        )

    logger.debug(
        f"FA Request received and database built, executing analytics: compute_stats (requested: {self._stats})"
    )

    try:
        output: Dict = dataset.compute_stats(
            stats=self._stats,
            stats_args=self._stats_args,
            dataset_schema=self._dataset_schema,
        )
        logger.debug(f"Analytics executed, output: {output}")
    except Exception as e:
        return self._build_error_msg(
            msg=(
                f"Error during execution of analytics '{self._stats}' "
                f"on node='{self._node_id}': {repr(e)}"
            ),
            errnum=ErrorNumbers.FB325.value,
        )

    return FAReply(
        request_id=self._request_id,
        researcher_id=self._researcher_id,
        experiment_id=self._experiment_id,
        fa_id=self._fa_id,
        stats=self._stats,
        node_id=self._node_id,
        node_name=self._node_name,
        output=output,
    )

PreprocJob

PreprocJob(root_dir, dataset_manager, node_id, node_name, request, allow_preproc)

Bases: _BaseJob

This class represents the preprocessing part executed by a node in a given preprocessing job.

Parameters:

Name Type Description Default
root_dir str

Root fedbiomed directory where node instance files will be stored.

required
dataset_manager DatasetManager

DatasetManager instance to retrieve datasets

required
node_id str

Node id

required
node_name str

Node name (Hospital name)

required
request PreprocRequest

PreprocRequest message object containing all information about the preprocessing task

required
allow_preproc bool

True if preprocessing is allowed on this node, False otherwise

required
Source code in fedbiomed/node/jobs/_preproc_job.py
def __init__(
    self,
    root_dir: str,
    dataset_manager: DatasetManager,
    node_id: str,
    node_name: str,
    request: PreprocRequest,
    allow_preproc: bool,
) -> None:
    """Constructor of the class

    Args:
        root_dir: Root fedbiomed directory where node instance files will be stored.
        dataset_manager: DatasetManager instance to retrieve datasets
        node_id: Node id
        node_name: Node name (Hospital name)
        request: PreprocRequest message object containing all information about the preprocessing task
        allow_preproc: True if preprocessing is allowed on this node, False otherwise
    """
    super().__init__(root_dir, dataset_manager, node_id, node_name, request)

    # Preprocessing-specific request fields. The *_raw values are kept as
    # received here; run() converts them to their enum counterparts and
    # validates them before use.
    self._experiment_id = request.experiment_id
    self._preproc_type_raw = request.preproc_type
    self._preproc_step_raw = request.preproc_step
    self._preproc_id = request.preproc_id
    self._preproc_args_raw = request.preproc_args
    self._state_id = request.state_id
    self._allow_preproc = allow_preproc

Functions

run
run()

Execute preprocessing job.

Returns:

Type Description
PreprocReply | ErrorMessage

PreprocReply message if successful, ErrorMessage otherwise.

Source code in fedbiomed/node/jobs/_preproc_job.py
def run(self) -> PreprocReply | ErrorMessage:
    """Execute the preprocessing job.

    Returns:
        PreprocReply message if successful, ErrorMessage otherwise.
    """
    # Node configuration may forbid preprocessing entirely.
    if not self._allow_preproc:
        return self._build_error_msg(
            "Preprocessing is not allowed on this node by node configuration.",
            errnum=ErrorNumbers.FB326.value,
        )

    # Convert the raw request fields into their enum counterparts,
    # rejecting anything that is not a known value.
    try:
        self._preproc_type = PreprocType(self._preproc_type_raw)
    except ValueError:
        return self._build_error_msg(
            f"Received invalid preproc_type: {self._preproc_type_raw}",
            errnum=ErrorNumbers.FB326.value,
        )
    try:
        self._preproc_step = HarmonizationStep(self._preproc_step_raw)
    except ValueError:
        return self._build_error_msg(
            f"Received invalid preproc_step: {self._preproc_step_raw}",
            errnum=ErrorNumbers.FB326.value,
        )
    # Here we can check content of some preproc_args
    # Only checks common to all preproc types and steps can be implemented here,
    # otherwise the check should be done in the specific preproc implementation
    self._preproc_args = self._preproc_args_raw

    # Look up the job class associated with this preprocessing type.
    try:
        job_factory = _preproc_type_to_jobs[self._preproc_type]
    except KeyError:
        return self._build_error_msg(
            f"Unsupported preprocessing type: {self._preproc_type.name}",
            errnum=ErrorNumbers.FB326.value,
        )

    # The dataset must be registered locally; its type drives the preproc job.
    entry = self._dataset_manager.dataset_table.get_by_id(self._dataset_id)
    if not isinstance(entry, dict):
        return self._build_error_msg(
            f"Dataset with id {self._dataset_id} not found in local database.",
            errnum=ErrorNumbers.FB326.value,
        )
    dataset_type = entry.get("data_type")

    # Instantiate the job and run it on (step, dataset type, args).
    try:
        preproc_output = job_factory()(
            self._preproc_step, dataset_type, self._preproc_args
        )
    except Exception as e:
        return self._build_error_msg(
            f"Preprocessing job failed for {self._preproc_type.name} "
            f"step {self._preproc_step_raw} / {self._preproc_step.name}: {str(e)}",
            errnum=ErrorNumbers.FB326.value,
        )

    msg = (
        f"Node {self._node_name} ({self._node_id}): "
        f"Preprocessing step {self._preproc_step_raw} / {self._preproc_step.name} "
        f"of type {self._preproc_type.name} for experiment {self._experiment_id}"
    )

    # Keep large model objects out of the log line; summarize the output
    # instead of dumping it wholesale.
    exclude_args = ["biological_model", "global_bias_model"]
    loggable_args = {
        key: value
        for key, value in self._preproc_args.items()
        if key not in exclude_args
    }
    output_summary = {
        "type": type(preproc_output).__name__,
        "keys": list(preproc_output.keys()),
    }

    logger.info(
        f"Preprocessing executed successfully for {self._preproc_type.name} "
        f"step {self._preproc_step_raw} / {self._preproc_step.name} "
        f"with request id {self._request_id} preproc_id {self._preproc_id} dataset_id {self._dataset_id} "
        f"preproc_args {loggable_args} except {exclude_args} and output_summary {output_summary}"
    )
    try:
        return PreprocReply(
            request_id=self._request_id,
            researcher_id=self._researcher_id,
            experiment_id=self._experiment_id,
            node_id=self._node_id,
            node_name=self._node_name,
            msg=msg,
            preproc_output=preproc_output,
            state_id=self._state_id,
        )
    except Exception as e:
        return self._build_error_msg(
            f"Preprocessing job cannot reply: {str(e)}",
            errnum=ErrorNumbers.FB326.value,
        )