Classes that simplify imports from the fedbiomed.node.jobs package.
Classes
FAJob
FAJob(root_dir, dataset_manager, node_id, node_name, request, allow_fa)
Bases: _BaseJob
Represents the analytics execution performed by a node in a Federated Analytics job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
root_dir | str | Root fedbiomed directory where node instance files will be stored. | required |
dataset_manager | DatasetManager | DatasetManager instance to retrieve datasets | required |
node_id | str | Node id | required |
node_name | str | Node name (Hospital name) | required |
request | FARequest | FARequest message object containing all information about the FA task | required |
allow_fa | bool | True if federated analytics is allowed on this node, False otherwise | required |
Source code in fedbiomed/node/jobs/_fa_job.py
def __init__(
    self,
    root_dir: str,
    dataset_manager: DatasetManager,
    node_id: str,
    node_name: str,
    request: FARequest,
    allow_fa: bool,
) -> None:
    """Build an FA job from an incoming analytics request.

    Args:
        root_dir: Root fedbiomed directory where node instance files will be stored.
        dataset_manager: DatasetManager instance to retrieve datasets
        node_id: Node id
        node_name: Node name (Hospital name)
        request: FARequest message object containing all information about the FA task
        allow_fa: True if federated analytics is allowed on this node, False otherwise
    """
    super().__init__(root_dir, dataset_manager, node_id, node_name, request)
    # Node-level permission flag, checked first thing in run().
    self._allow_fa = allow_fa
    # Identifiers tying this job to a dataset / experiment / FA round.
    self._dataset_id = request.dataset_id
    self._experiment_id = request.experiment_id
    self._fa_id = request.fa_id
    # Requested statistics and the parameters needed to compute them.
    self._stats = request.stats
    self._stats_args = request.stats_args
    self._dataset_schema = request.dataset_schema
Functions
run
run()
Run FA job and return FAReply message or ErrorMessage in case of failure.
Source code in fedbiomed/node/jobs/_fa_job.py
def run(self) -> FAReply | ErrorMessage:
    """Run FA job and return FAReply message or ErrorMessage in case of failure."""
    # Guard 1: node configuration must explicitly allow federated analytics.
    if not self._allow_fa:
        return self._build_error_msg(
            "Federated Analytics are not allowed on this node by node configuration.",
            errnum=ErrorNumbers.FB325.value,
        )

    # Guard 2: every requested statistic must be a known Stats enum value.
    supported = {member.value for member in Stats}
    if self._stats is not None:
        unsupported = [name for name in self._stats if name not in supported]
        if unsupported:
            return self._build_error_msg(
                msg=f"'stats' contains unsupported values: {unsupported}",
                errnum=ErrorNumbers.FB325.value,
            )

    # Materialize the dataset in the sklearn-compatible return format.
    try:
        dataset = self._build_dataset(DataReturnFormat.SKLEARN)
    except _InternalJobError as e:
        return self._build_error_msg(msg=repr(e), errnum=ErrorNumbers.FB325.value)

    # Guard 3: the dataset implementation must expose the analytics entry point.
    if not hasattr(dataset, "compute_stats"):
        return self._build_error_msg(
            msg="Dataset does not support analytics method 'compute_stats'.",
            errnum=ErrorNumbers.FB325.value,
        )

    logger.debug(
        f"FA Request received and database built, executing analytics: compute_stats (requested: {self._stats})"
    )

    # Delegate the actual computation to the dataset object; any failure is
    # reported back to the researcher as an ErrorMessage rather than raised.
    try:
        output: Dict = dataset.compute_stats(
            stats=self._stats,
            stats_args=self._stats_args,
            dataset_schema=self._dataset_schema,
        )
        logger.debug(f"Analytics executed, output: {output}")
    except Exception as e:
        return self._build_error_msg(
            msg=(
                f"Error during execution of analytics '{self._stats}' "
                f"on node='{self._node_id}': {repr(e)}"
            ),
            errnum=ErrorNumbers.FB325.value,
        )

    # Success: wrap the computed statistics in a reply addressed to the researcher.
    return FAReply(
        request_id=self._request_id,
        researcher_id=self._researcher_id,
        experiment_id=self._experiment_id,
        fa_id=self._fa_id,
        stats=self._stats,
        node_id=self._node_id,
        node_name=self._node_name,
        output=output,
    )
PreprocJob
PreprocJob(root_dir, dataset_manager, node_id, node_name, request, allow_preproc)
Bases: _BaseJob
This class represents the preprocessing part executed by a node in a given preprocessing job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
root_dir | str | Root fedbiomed directory where node instance files will be stored. | required |
dataset_manager | DatasetManager | DatasetManager instance to retrieve datasets | required |
node_id | str | Node id | required |
node_name | str | Node name (Hospital name) | required |
request | PreprocRequest | PreprocRequest message object containing all information about the preprocessing task | required |
allow_preproc | bool | True if preprocessing is allowed on this node, False otherwise | required |
Source code in fedbiomed/node/jobs/_preproc_job.py
def __init__(
    self,
    root_dir: str,
    dataset_manager: DatasetManager,
    node_id: str,
    node_name: str,
    request: PreprocRequest,
    allow_preproc: bool,
) -> None:
    """Constructor of the class

    Args:
        root_dir: Root fedbiomed directory where node instance files will be stored.
        dataset_manager: DatasetManager instance to retrieve datasets
        node_id: Node id
        node_name: Node name (Hospital name)
        request: PreprocRequest message object containing all information about the
            preprocessing task
        allow_preproc: True if preprocessing is allowed on this node, False otherwise
    """
    super().__init__(root_dir, dataset_manager, node_id, node_name, request)
    self._experiment_id = request.experiment_id
    # *_raw fields hold unvalidated request values; run() converts them to
    # their enum counterparts (PreprocType / HarmonizationStep) and reports a
    # clean ErrorMessage on invalid input instead of raising here.
    self._preproc_type_raw = request.preproc_type
    self._preproc_step_raw = request.preproc_step
    self._preproc_id = request.preproc_id
    self._preproc_args_raw = request.preproc_args
    self._state_id = request.state_id
    self._allow_preproc = allow_preproc
    # NOTE(review): run() reads self._dataset_id, which is not assigned here
    # (FAJob.__init__ assigns it explicitly from its request). Presumably
    # _BaseJob.__init__ sets it — confirm, otherwise run() would raise
    # AttributeError before reaching the dataset lookup.
Functions
run
run()
Execute preprocessing job.
Returns:
| Type | Description |
|---|---|
PreprocReply | ErrorMessage | PreprocReply message if successful, ErrorMessage otherwise. |
Source code in fedbiomed/node/jobs/_preproc_job.py
def run(self) -> PreprocReply | ErrorMessage:
    """Execute preprocessing job.

    Validates the request (permission flag, preproc type/step enums, dataset
    existence), dispatches to the preprocessing implementation registered for
    the requested type, and wraps the result in a PreprocReply. Every failure
    path returns an ErrorMessage built with errnum FB326 instead of raising.

    Returns:
        PreprocReply message if successful, ErrorMessage otherwise.
    """
    # Node configuration gate: refuse before doing any work.
    if not self._allow_preproc:
        return self._build_error_msg(
            "Preprocessing is not allowed on this node by node configuration.",
            errnum=ErrorNumbers.FB326.value,
        )
    # Further check message parameters if needed.
    # Convert the raw request strings into their enums; an invalid value is a
    # researcher-side error, reported back rather than raised.
    try:
        self._preproc_type = PreprocType(self._preproc_type_raw)
    except ValueError:
        return self._build_error_msg(
            f"Received invalid preproc_type: {self._preproc_type_raw}",
            errnum=ErrorNumbers.FB326.value,
        )
    try:
        self._preproc_step = HarmonizationStep(self._preproc_step_raw)
    except ValueError:
        return self._build_error_msg(
            f"Received invalid preproc_step: {self._preproc_step_raw}",
            errnum=ErrorNumbers.FB326.value,
        )
    # Here we can check content of some preproc_args.
    # Only checks common to all preproc types and steps can be implemented here,
    # otherwise the check should be done in the specific preproc implementation.
    self._preproc_args = self._preproc_args_raw
    # Look up the job factory registered for this preproc type. The KeyError
    # branch covers enum members that exist but have no implementation mapped.
    try:
        preproc_type_jobs = _preproc_type_to_jobs[self._preproc_type]
    except KeyError:
        return self._build_error_msg(
            f"Unsupported preprocessing type: {self._preproc_type.name}",
            errnum=ErrorNumbers.FB326.value,
        )
    # Check that dataset exists in local dataset registry and get its type.
    # NOTE(review): self._dataset_id is not assigned in this class's __init__;
    # presumably set by _BaseJob.__init__ — confirm.
    dataset_entry = self._dataset_manager.dataset_table.get_by_id(self._dataset_id)
    if not isinstance(dataset_entry, dict):
        return self._build_error_msg(
            f"Dataset with id {self._dataset_id} not found in local database.",
            errnum=ErrorNumbers.FB326.value,
        )
    dataset_type = dataset_entry.get("data_type")
    # Instantiate the type-specific job, then call it with the step, dataset
    # type and arguments. Any exception from user/preproc code is caught and
    # translated into an ErrorMessage.
    try:
        preproc_job_class = preproc_type_jobs()
        preproc_output = preproc_job_class(
            self._preproc_step, dataset_type, self._preproc_args
        )
    except Exception as e:
        return self._build_error_msg(
            f"Preprocessing job failed for {self._preproc_type.name} "
            f"step {self._preproc_step_raw} / {self._preproc_step.name}: {str(e)}",
            errnum=ErrorNumbers.FB326.value,
        )
    # Human-readable summary included in the reply message.
    msg = (
        f"Node {self._node_name} ({self._node_id}): "
        f"Preprocessing step {self._preproc_step_raw} / {self._preproc_step.name} "
        f"of type {self._preproc_type.name} for experiment {self._experiment_id}"
    )
    # Keep bulky model payloads out of the log line; log only the remaining args.
    exclude_args = ["biological_model", "global_bias_model"]
    filtered_args = {
        k: v for k, v in self._preproc_args.items() if k not in exclude_args
    }
    # Log a compact summary (type + keys) instead of the full output payload.
    preproc_output_summary = {
        "type": type(preproc_output).__name__,
        "keys": list(preproc_output.keys()),
    }
    logger.info(
        f"Preprocessing executed successfully for {self._preproc_type.name} "
        f"step {self._preproc_step_raw} / {self._preproc_step.name} "
        f"with request id {self._request_id} preproc_id {self._preproc_id} dataset_id {self._dataset_id} "
        f"preproc_args {filtered_args} except {exclude_args} and output_summary {preproc_output_summary}"
    )
    # Reply construction can fail (e.g. non-serializable preproc_output), so
    # it is also guarded and reported as an ErrorMessage.
    try:
        return PreprocReply(
            request_id=self._request_id,
            researcher_id=self._researcher_id,
            experiment_id=self._experiment_id,
            node_id=self._node_id,
            node_name=self._node_name,
            msg=msg,
            preproc_output=preproc_output,
            state_id=self._state_id,
        )
    except Exception as e:
        return self._build_error_msg(
            f"Preprocessing job cannot reply: {str(e)}",
            errnum=ErrorNumbers.FB326.value,
        )