Manage the training part of the experiment.
Attributes
Classes
Job
Job(reqs=None, training_plan_class=None, training_plan_path=None, training_args=None, model_args=None, data=None, keep_files_dir=None)
Represents the entity that manages the training part at the nodes' level.
Starts a message queue, loads the python model file created by the researcher (through [training_plans][fedbiomed.common.training_plans]) and saves the loaded model in a temporary file (under a filename of the form 'my_model_<uuid>.py' in the job's working directory).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reqs | Requests | Researcher's requests assigned to nodes. Defaults to None. | None |
training_plan_class | Union[Type[Callable], str] | Instance or class of the training plan. | None |
training_plan_path | str | Path to the file containing the model class code | None |
training_args | TrainingArgs | Contains training parameters, e.g. lr, epochs, batch_size. | None |
model_args | dict | Contains model arguments, such as input and output feature dimensions | None |
data | FederatedDataSet | Federated datasets | None |
keep_files_dir | str | Directory for storing files created by the job that we want to keep beyond the execution of the job. Defaults to None, files are not kept after the end of the job. | None |
Raises:
Type | Description |
---|---|
NameError | If the model is not defined or if the class cannot be inspected |
Source code in fedbiomed/researcher/job.py
def __init__(self,
reqs: Requests = None,
training_plan_class: Union[Type[Callable], str] = None,
training_plan_path: str = None,
training_args: TrainingArgs = None,
model_args: dict = None,
data: FederatedDataSet = None,
keep_files_dir: str = None):
""" Constructor of the class
Args:
reqs: Researcher's requests assigned to nodes. Defaults to None.
training_plan_class: instance or class of the TrainingPlan.
training_plan_path: Path to file containing model class code
training_args: Contains training parameters; lr, epochs, batch_size.
model_args: Contains output and input feature dimension
data: Federated datasets
keep_files_dir: Directory for storing files created by the job that we want to keep beyond the execution
of the job. Defaults to None, files are not kept after the end of the job.
Raises:
NameError: If the model is not defined or if the class cannot be inspected
"""
# List of node ID of the nodes used in the current round
# - initially None (no current round yet)
# - then updated during the round with the list of nodes to be used in the round, then the nodes
# that actually replied during the round
self._nodes : Optional[List[str]] = None
self._id = JOB_PREFIX + str(uuid.uuid4()) # creating a unique job id
self._researcher_id = environ['RESEARCHER_ID']
self._repository_args = {}
self._training_args = training_args
self._model_args = model_args
self._training_replies = {} # will contain all node replies for every round (type: Dict[Responses]])
self._model_file = None # path to local file containing model code
self._model_params_file = "" # path to local file containing current version of aggregated params
self._training_plan_class = training_plan_class
# self._training_plan = None # declared below, as a TrainingPlan instance
self._aggregator_args = None
if keep_files_dir:
self._keep_files_dir = keep_files_dir
else:
self._keep_files_dir = tempfile.mkdtemp(prefix=environ['TMP_DIR'])
atexit.register(lambda: shutil.rmtree(self._keep_files_dir)) # remove directory
# when script ends running (replace
# `with tempfile.TemporaryDirectory(dir=environ['TMP_DIR']) as self._keep_files_dir: `)
if reqs is None:
self._reqs = Requests()
else:
self._reqs = reqs
self.last_msg = None
self._data = data
self._node_state_agent = NodeStateAgent(list(self._data.data().keys())
if self._data and self._data.data() else [])
# Model is mandatory
if self._training_plan_class is None:
mess = "Missing training plan class name or instance in Job arguments"
logger.critical(mess)
raise NameError(mess)
# handle case when model is in a file
if training_plan_path is not None:
try:
# import model from python file
model_module = os.path.basename(training_plan_path)
model_module = re.search(r"(.*)\.py$", model_module).group(1)
sys.path.insert(0, os.path.dirname(training_plan_path))
module = importlib.import_module(model_module)
tr_class = getattr(module, self._training_plan_class)
self._training_plan_class = tr_class
sys.path.pop(0)
except Exception as e:
e = sys.exc_info()
logger.critical(f"Cannot import class {self._training_plan_class} from "
f"path {training_plan_path} - Error: {str(e)}")
sys.exit(-1)
# check class is defined
try:
_ = inspect.isclass(self._training_plan_class)
except NameError:
mess = f"Cannot find training plan for Job, training plan class {self._training_plan_class} is not defined"
logger.critical(mess)
raise NameError(mess)
# create/save TrainingPlan instance
if inspect.isclass(self._training_plan_class):
self._training_plan = self._training_plan_class() # contains TrainingPlan
else:
self._training_plan = self._training_plan_class
self._training_plan.configure_dependencies()
# find the name of the class in any case
# (it is `model` only in the case where `model` is not an instance)
self._training_plan_name = self._training_plan.__class__.__name__
self.repo = Repository(environ['UPLOADS_URL'], self._keep_files_dir, environ['CACHE_DIR'])
training_plan_module = 'my_model_' + str(uuid.uuid4())
self._training_plan_file = os.path.join(self._keep_files_dir, training_plan_module + '.py')
try:
self._training_plan.save_code(self._training_plan_file)
except Exception as e:
logger.error("Cannot save the training plan to a local tmp dir : " + str(e))
return
# upload my_model_xxx.py on repository server (contains model definition)
repo_response = self.repo.upload_file(self._training_plan_file)
self._repository_args['training_plan_url'] = repo_response['file']
self._training_plan = self._load_training_plan_from_file(training_plan_module)
self._training_plan.post_init(model_args={} if self._model_args is None else self._model_args,
training_args=self._training_args)
# Save model parameters to a local file and upload it to the remote repository.
# The filename and remote url are assigned to attributes through this call.
try:
self.update_parameters()
except SystemExit:
return
# (below) regex: matches a character not present among "^", "\", "."
# characters at the end of string.
self._repository_args['training_plan_class'] = self._training_plan_name
# Validate fields in each argument
self.validate_minimal_arguments(self._repository_args,
['training_plan_url', 'training_plan_class', 'params_url'])
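For orientation, here is a minimal construction sketch. It is only an illustration: a Job is normally created by the Experiment class, the import paths for `TrainingArgs` and the training plan are assumptions (only `fedbiomed.researcher.job` is documented on this page), and all values are placeholders.

```python
# Hypothetical sketch: constructing a Job directly, assuming a configured researcher environment.
from fedbiomed.researcher.job import Job
from fedbiomed.common.training_args import TrainingArgs   # assumed module path

from my_training_plans import MyTrainingPlan              # hypothetical user-defined training plan

training_args = TrainingArgs({'epochs': 1, 'batch_size': 32, 'lr': 1e-3})
federated_data = ...  # a FederatedDataSet, e.g. built from a dataset search reply

job = Job(
    training_plan_class=MyTrainingPlan,         # class or instance of the training plan
    training_args=training_args,
    model_args={'in_features': 20, 'out_features': 2},
    data=federated_data,
    keep_files_dir='/tmp/my_experiment_files',  # optional: keep job files after the run
)
```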
Attributes
aggregator_args property
aggregator_args
id property
id
last_msg instance-attribute
last_msg = None
nodes property
writable
nodes
repo instance-attribute
repo = Repository(environ['UPLOADS_URL'], self._keep_files_dir, environ['CACHE_DIR'])
requests property
requests
training_args property
writable
training_args
training_plan property
training_plan
training_plan_file property
training_plan_file
training_plan_name property
training_plan_name
training_replies property
training_replies
Functions
check_training_plan_is_approved_by_nodes
check_training_plan_is_approved_by_nodes()
Checks whether the model is approved or not.
This method sends a training-plan-status request to the nodes. It should be run before running the experiment, so that researchers can find out whether their model has been approved.
Source code in fedbiomed/researcher/job.py
def check_training_plan_is_approved_by_nodes(self) -> List:
""" Checks whether model is approved or not.
This method sends a `training-plan-status` request to the nodes. It should be run before running the experiment,
so that researchers can find out whether their model has been approved
"""
message = {
'researcher_id': self._researcher_id,
'job_id': self._id,
'training_plan_url': self._repository_args['training_plan_url'],
'command': 'training-plan-status'
}
responses = Responses([])
replied_nodes = []
node_ids = self._data.node_ids()
# Send message to each node that has been found after dataset search request
for cli in node_ids:
logger.info('Sending request to node ' +
str(cli) + " to check model is approved or not")
self._reqs.send_message(
message,
cli)
# Wait for responses
for resp in self._reqs.get_responses(look_for_commands=['training-plan-status'], only_successful=False):
responses.append(resp)
replied_nodes.append(resp.get('node_id'))
if resp.get('success') is True:
if resp.get('approval_obligation') is True:
if resp.get('status') == TrainingPlanApprovalStatus.APPROVED.value:
logger.info(f'Training plan has been approved by the node: {resp.get("node_id")}')
else:
logger.warning(f'Training plan has NOT been approved by the node: {resp.get("node_id")}.' +
f'Training plan status : {resp.get("status")}')
else:
logger.info(f'Training plan approval is not required by the node: {resp.get("node_id")}')
else:
logger.warning(f"Node : {resp.get('node_id')} : {resp.get('msg')}")
# Get the nodes that haven't replied training-plan-status request
non_replied_nodes = list(set(node_ids) - set(replied_nodes))
if non_replied_nodes:
logger.warning(f"Request for checking training plan status hasn't been replied \
by the nodes: {non_replied_nodes}. You might get error \
while running your experiment. ")
return responses
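A hedged usage sketch of the approval check (the reply fields follow the source above; treating each element of the returned `Responses` as a dict-like mapping is an assumption):

```python
# Sketch: checking training plan approval before running an experiment.
# Assumes `job` is a fully constructed Job instance (see the constructor above).
responses = job.check_training_plan_is_approved_by_nodes()

for reply in responses:  # each reply is read as a dict-like mapping (assumption)
    node = reply.get('node_id')
    if not reply.get('success'):
        print(f"Node {node} could not check the training plan: {reply.get('msg')}")
    elif not reply.get('approval_obligation'):
        print(f"Node {node} does not require training plan approval")
    else:
        print(f"Node {node} reports training plan status: {reply.get('status')}")
```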
extract_received_optimizer_aux_var_from_round
extract_received_optimizer_aux_var_from_round(round_id)
Restructure the received auxiliary variables (if any) from a round.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_id | int | Index of the round, replies from which to parse through. | required |
Returns:
Type | Description |
---|---|
Dict[str, Dict[str, Dict[str, Any]]] | Dict of auxiliary variables, collating node-wise information, with format `{mod_name: {node_id: node_dict}}`. |
Source code in fedbiomed/researcher/job.py
def extract_received_optimizer_aux_var_from_round(
self,
round_id: int,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Restructure the received auxiliary variables (if any) from a round.
Args:
round_id: Index of the round, replies from which to parse through.
Returns:
Dict of auxiliary variables, collating node-wise information, with
format `{mod_name: {node_id: node_dict}}`.
"""
aux_var = {} # type: Dict[str, Dict[str, Dict[str, Any]]]
for reply in self._training_replies[round_id]:
node_id = reply["node_id"]
node_av = reply.get("optim_aux_var", {})
for module, params in node_av.items():
aux_var.setdefault(module, {})[node_id] = params
return aux_var
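The following self-contained snippet illustrates the reshaping this method performs, using mock replies; the field names match the source above and the "scaffold" module name is only a placeholder:

```python
# Plain-Python illustration of the {mod_name: {node_id: node_dict}} restructuring.
mock_replies = [
    {"node_id": "node-1", "optim_aux_var": {"scaffold": {"state": 0.1}}},
    {"node_id": "node-2", "optim_aux_var": {"scaffold": {"state": 0.2}}},
    {"node_id": "node-3"},  # node that sent no auxiliary variables
]

aux_var = {}
for reply in mock_replies:
    for module, params in reply.get("optim_aux_var", {}).items():
        aux_var.setdefault(module, {})[reply["node_id"]] = params

assert aux_var == {"scaffold": {"node-1": {"state": 0.1}, "node-2": {"state": 0.2}}}
```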
load_state_breakpoint
load_state_breakpoint(saved_state)
Load breakpoints state for a Job from a saved state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
saved_state | Dict[str, Any] | breakpoint content | required |
Source code in fedbiomed/researcher/job.py
def load_state_breakpoint(self, saved_state: Dict[str, Any]) -> None:
"""Load breakpoints state for a Job from a saved state
Args:
saved_state: breakpoint content
"""
# Reload the job and researched ids.
self._id = saved_state.get('job_id')
self._researcher_id = saved_state.get('researcher_id')
self._node_state_agent.load_state_breakpoint(saved_state.get('node_state'))
# Upload the latest model parameters. This records the filename and url.
self.update_parameters(filename=saved_state.get("model_params_path"))
# Reload the latest training replies.
self._training_replies = self._load_training_replies(
saved_state.get('training_replies', [])
)
save_state_breakpoint
save_state_breakpoint(breakpoint_path)
Creates current state of the job to be included in a breakpoint.
Includes creating links to files included in the job state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
breakpoint_path | str | path to the existing breakpoint directory | required |
Returns:
Type | Description |
---|---|
dict | Job's current state for breakpoint |
Source code in fedbiomed/researcher/job.py
def save_state_breakpoint(self, breakpoint_path: str) -> dict:
"""Creates current state of the job to be included in a breakpoint.
Includes creating links to files included in the job state.
Args:
breakpoint_path: path to the existing breakpoint directory
Returns:
Job's current state for breakpoint
"""
# Note: some state is passed to __init__() thus is not managed
# as job state but as experiment state in current version
state = {
'researcher_id': self._researcher_id,
'job_id': self._id,
'model_params_path': self._model_params_file,
'training_replies': self._save_training_replies(self._training_replies),
'node_state': self._node_state_agent.save_state_breakpoint()
}
state['model_params_path'] = create_unique_link(
breakpoint_path, 'aggregated_params_current', '.mpk',
os.path.join('..', os.path.basename(state["model_params_path"]))
)
for round_replies in state['training_replies']:
for response in round_replies:
node_params_path = create_unique_file_link(
breakpoint_path, response['params_path']
)
response['params_path'] = node_params_path
return state
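For reference, the returned breakpoint state has the shape sketched below; all values are illustrative placeholders (the actual paths are links created inside the breakpoint directory):

```python
# Illustrative shape of a Job breakpoint state (placeholder values).
state = {
    'researcher_id': 'researcher-id-placeholder',
    'job_id': 'job-id-placeholder',
    # unique link created in the breakpoint directory by create_unique_link:
    'model_params_path': 'aggregated_params_current.mpk',
    # one list per round; each reply keeps a per-node 'params_path' file link:
    'training_replies': [
        [{'node_id': 'node-1', 'params_path': 'node_params_placeholder.mpk'}],
    ],
    'node_state': {},  # NodeStateAgent breakpoint content
}
```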
start_nodes_training_round
start_nodes_training_round(round_, aggregator_args_thr_msg, aggregator_args_thr_files, secagg_arguments=None, do_training=True, optim_aux_var=None)
Sends training request to nodes and waits for the responses
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_ | int | current number of round the algorithm is performing (a round is considered to be all the training steps of a federated model between 2 aggregations). | required |
aggregator_args_thr_msg | Dict[str, Dict[str, Any]] | dictionary containing some metadata about the aggregation strategy, useful to transfer some data when it's required by an aggregator. First key should be the node_id, and the sub-dictionary should be parameters to be sent through the MQTT messaging system | required |
aggregator_args_thr_files | Dict[str, Dict[str, Any]] | dictionary containing metadata about the aggregation strategy, to be transferred via the Repository's HTTP API, as opposed to the MQTT system. Format is the same as `aggregator_args_thr_msg`. | required |
secagg_arguments | Union[Dict, None] | Secure aggregation ServerKey context id | None |
do_training | bool | if False, skip training in this round (do only validation). Defaults to True. | True |
optim_aux_var | Optional[Dict[str, Dict[str, Any]]] | Auxiliary variables of the researcher-side Optimizer, if any. Note that such variables may only be used if both the Experiment and node-side training plan hold a declearn-based Optimizer, and their plug-ins are coherent with each other as to expected information exchange. | None |
Source code in fedbiomed/researcher/job.py
def start_nodes_training_round(
self,
round_: int,
aggregator_args_thr_msg: Dict[str, Dict[str, Any]],
aggregator_args_thr_files: Dict[str, Dict[str, Any]],
secagg_arguments: Union[Dict, None] = None,
do_training: bool = True,
optim_aux_var: Optional[Dict[str, Dict[str, Any]]] = None,
) -> None:
""" Sends training request to nodes and waits for the responses
Args:
round_: current number of round the algorithm is performing (a round is considered to be all the
training steps of a federated model between 2 aggregations).
aggregator_args_thr_msg: dictionary containing some metadata about the aggregation
strategy, useful to transfer some data when it's required by an aggregator. First key should be the
node_id, and the sub-dictionary should be parameters to be sent through the MQTT messaging system
aggregator_args_thr_files: dictionary containing metadata about aggregation strategy, to be transferred
via the Repository's HTTP API, as opposed to the mqtt system. Format is the same as
aggregator_args_thr_msg .
secagg_arguments: Secure aggregation ServerKey context id
do_training: if False, skip training in this round (do only validation). Defaults to True.
optim_aux_var: Auxiliary variables of the researcher-side Optimizer, if any.
Note that such variables may only be used if both the Experiment and node-side training plan
hold a declearn-based [Optimizer][fedbiomed.common.optimizers.Optimizer], and their plug-ins
are coherent with each other as to expected information exchange.
"""
# Assign empty dict to secagg arguments if it is None
if secagg_arguments is None:
secagg_arguments = {}
headers = {
'researcher_id': self._researcher_id,
'job_id': self._id,
'training_args': self._training_args.dict(),
'training': do_training,
'model_args': self._model_args,
'round': round_,
'secagg_servkey_id': secagg_arguments.get('secagg_servkey_id'),
'secagg_biprime_id': secagg_arguments.get('secagg_biprime_id'),
'secagg_random': secagg_arguments.get('secagg_random'),
'secagg_clipping_range': secagg_arguments.get('secagg_clipping_range'),
'command': 'train',
'aggregator_args': {},
'aux_var_urls': None,
}
msg = {**headers, **self._repository_args}
time_start = {}
if do_training:
# update node states when used node list has changed from one round to another
self._update_nodes_states_agent()
# pass heavy aggregator params through file exchange system
self.upload_aggregator_args(aggregator_args_thr_msg, aggregator_args_thr_files)
# Upload optimizer auxiliary variables, when there are.
if do_training and optim_aux_var:
aux_url_shared, aux_url_bynode = (
self.upload_agg_optimizer_aux_var(optim_aux_var)
)
else:
aux_url_shared = None
aux_url_bynode = {}
# FIXME: should be part of a method called from Experiment
# (behaviour can be defined by user / changed by strategy)
nodes_state_ids = self._node_state_agent.get_last_node_states()
for cli in self._nodes:
msg['dataset_id'] = self._data.data()[cli]['dataset_id']
cli_aux_urls = (aux_url_shared, aux_url_bynode.get(cli, None))
msg['aux_var_urls'] = [url for url in cli_aux_urls if url] or None
msg['state_id'] = nodes_state_ids.get(cli)
if aggregator_args_thr_msg:
# add aggregator parameters to message header
msg['aggregator_args'] = aggregator_args_thr_msg[cli]
if not do_training:
logger.info(f'\033[1mSending request\033[0m \n'
f'\t\t\t\t\t\033[1m To\033[0m: {str(cli)} \n'
f'\t\t\t\t\t\033[1m Request: \033[0m:Perform final validation on '
f'aggregated parameters \n {5 * "-------------"}')
else:
msg_print = {key: value for key, value in msg.items()
if key != 'aggregator_args' and logger.level != "DEBUG" }
logger.info(f'\033[1mSending request\033[0m \n'
f'\t\t\t\t\t\033[1m To\033[0m: {str(cli)} \n'
f'\t\t\t\t\t\033[1m Request: \033[0m: Perform training with the arguments: '
f'{str(msg_print)} '
f'\n {5 * "-------------"}')
time_start[cli] = time.perf_counter()
self._reqs.send_message(msg, cli) # send request to node
# Recollect models trained
self._training_replies[round_] = Responses([])
while self.waiting_for_nodes(self._training_replies[round_]):
# collect nodes responses from researcher request 'train'
# (wait for all nodes with a ` while true` loop)
# models_done = self._reqs.get_responses(look_for_commands=['train'])
models_done = self._reqs.get_responses(look_for_commands=['train', 'error'], only_successful=False)
for m in models_done.data(): # retrieve all models
# (there should be as many models done as nodes)
# manage error messages during training
if m['command'] == 'error':
if m['extra_msg']:
logger.info(f"Error message received during training: {str(m['errnum'].value)} "
f"- {str(m['extra_msg'])}")
else:
logger.info(f"Error message received during training: {str(m['errnum'].value)}")
faulty_node = m['node_id'] # remove the faulty node from the list
if faulty_node not in list(self._nodes):
logger.warning(f"Error message from {faulty_node} ignored, since this node is not part of "
f"the training anymore")
continue
self._nodes.remove(faulty_node)
continue
# only consider replies for our request
if m['researcher_id'] != environ['RESEARCHER_ID'] or \
m['job_id'] != self._id or m['node_id'] not in list(self._nodes):
continue
# manage training failure for this job
if not m['success']:
logger.error(f"Training failed for node {m['node_id']}: {m['msg']}")
self._nodes.remove(m['node_id']) # remove the faulty node from the list
continue
rtime_total = time.perf_counter() - time_start[m['node_id']]
if do_training:
logger.info(f"Downloading model params after training on {m['node_id']} - from {m['params_url']}")
try:
_, params_path = self.repo.download_file(m["params_url"], f"node_params_{uuid.uuid4()}.mpk")
except FedbiomedRepositoryError as err:
logger.error(f"Cannot download model parameter from node {m['node_id']}, probably because Node"
f" stops working (details: {err})")
return
results = Serializer.load(params_path)
params = results["model_weights"]
optimizer_args = results.get("optimizer_args")
optim_aux_var = results.get("optim_aux_var", {})
encryption_factor = results.get('encryption_factor', None)
else:
params_path = None
params = None
optimizer_args = None
encryption_factor = None
# TODO: could choose completely different name/structure for
timing = m['timing']
timing['rtime_total'] = rtime_total
response = Responses({
'success': m['success'],
'msg': m['msg'],
'dataset_id': m['dataset_id'],
'node_id': m['node_id'],
'state_id': m['state_id'],
'params_path': params_path,
'params': params,
'optimizer_args': optimizer_args,
'optim_aux_var': optim_aux_var,
'sample_size': m["sample_size"],
'encryption_factor': encryption_factor,
'timing': timing,
})
self._training_replies[round_].append(response)
if do_training:
# update node states with node answers + when used node list has changed during the round
self._update_nodes_states_agent(before_training=False)
# return the list of nodes which answered because nodes in error have been removed
return self._nodes
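A per-round usage sketch (normally orchestrated by the Experiment class); `job` and `node_ids` are assumed to exist, and the empty aggregator argument dicts correspond to an aggregator with no extra metadata:

```python
# Sketch: driving a single training round through the Job.
round_ = 0
job.nodes = node_ids  # 'nodes' is a writable property (see Attributes above)

replied_nodes = job.start_nodes_training_round(
    round_=round_,
    aggregator_args_thr_msg={},    # no aggregator metadata sent via MQTT
    aggregator_args_thr_files={},  # no aggregator metadata sent via the Repository
    do_training=True,
)

# Replies for the round are then available per node:
for reply in job.training_replies[round_]:
    print(reply['node_id'], reply['timing'])
```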
update_parameters
update_parameters(params=None, filename=None)
Save and upload global model parameters, optionally after updating them.
This method is designed to save and upload the parameters of the wrapped training plan instance. It may also be used to update these parameters prior to their upload, whether based on provided in-memory values or on a pre-exported dump file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params | Optional[Dict[str, Any]] | Optional dict storing new aggregated parameters that are to be assigned to this job's training plan's model. If None, export and upload the current model parameters. | None |
filename | Optional[str] | Optional path to a pre-existing file containing the aggregated parameters to load and upload. If `params` is not None, `filename` has to be None. | None |
Returns:
Name | Type | Description |
---|---|---|
filename | str | path to the local parameters file |
url | str | url at which the file was uploaded |
Raises:
Type | Description |
---|---|
ValueError | If both `params` and `filename` are provided: these parameters are mutually exclusive. |
!!! info "Notes"
    * The path to the created and/or uploaded file is stored under the `_model_params_file` attribute, which is updated by this method.
    * The url of the uploaded file is stored under the `_repository_args["params_url"]` attribute, which is also updated by this method.
!!! warning "Warning"
    * The `params` and `filename` parameters are mutually exclusive.
Source code in fedbiomed/researcher/job.py
def update_parameters(
self,
params: Optional[Dict[str, Any]] = None,
filename: Optional[str] = None,
) -> Tuple[str, str]:
"""Save and upload global model parameters, optionally after updating them.
This method is designed to save and upload the parameters of the wrapped
training plan instance. It may also be used to update these parameters
prior to their upload, whether based on provided in-memory values or on
a pre-exported dump file.
Args:
params: Optional dict storing new aggregated parameters that are to
be assigned to this job's training plan's model.
If None, export and upload the current model parameters.
filename: Optional path to a pre-existing file containing the
aggregated parameters to load an upload.
If `params` is not None, `filename` has to be None.
Returns:
filename: path to the local parameters file
url: url at which the file was uploaded
Raises:
ValueError: if both `params` and `filename` are provided: these parameters are mutually exclusive.
!!! info "Notes":
* The path to the created and/or uploaded file is stored under the `_model_params_file` attribute,
that is updated by this method.
* The url of the uploaded file is stored under the `_repository_args["params_url"]` attribute,
that is also updated by this method.
!!! warning "Warning":
* The `params` and `filename` parameters are mutually-exclusive.
"""
try:
if params and filename:
raise ValueError("'update_parameters' received both filename and params: only one may be used.")
# Case when uploading a pre-existing file: load the parameters.
if filename:
params = Serializer.load(filename)["model_weights"]
self._training_plan.set_model_params(params)
# Case when exporting current parameters: create a local dump file.
else:
# Case when uploading the current parameters: gather them.
if params is None:
params = self._training_plan.get_model_params()
# Case when uploading a new set of parameters: assign them.
else:
self._training_plan.set_model_params(params)
# At any rate, create a local dump file.
filename = os.path.join(self._keep_files_dir, f"aggregated_params_{uuid.uuid4()}.mpk")
params_dump = {
"researcher_id": self._researcher_id,
"model_weights": params,
}
Serializer.dump(params_dump, filename)
# Upload the file and record its local and remote locations.
self._model_params_file = filename
repo_response = self.repo.upload_file(filename)
self._repository_args["params_url"] = url = repo_response["file"]
# Return the local path and remote url to the file.
return filename, url
# Log exceptions and trigger a system exit if one is raised.
except Exception:
exc = sys.exc_info()
logger.error("'Job.update_parameters' failed with error: %s", exc)
sys.exit(-1)
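A sketch of the two mutually exclusive call patterns; `aggregated_params` and the file path are placeholders:

```python
# 1. Assign in-memory aggregated parameters, then dump and upload them:
filename, url = job.update_parameters(params=aggregated_params)

# 2. Re-upload parameters from a pre-exported dump file (e.g. when loading a breakpoint):
filename, url = job.update_parameters(filename='/path/to/aggregated_params_xxx.mpk')

# Passing both `params` and `filename` raises ValueError; passing neither exports and
# uploads the training plan's current model parameters.
```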
upload_agg_optimizer_aux_var
upload_agg_optimizer_aux_var(aux_var)
Upload auxiliary variables emitted by a researcher-side Optimizer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aux_var | Dict[str, Dict[str, Any]] | Dict of auxiliary variables emitted by an Optimizer held by the researcher, that are to be uploaded after having been structured into multiple files, to avoid information leakage as well as content redundancy. | required |
Returns:
Name | Type | Description |
---|---|---|
url_shared | Optional[str] | url of a file containing auxiliary variables shared across all nodes, or None (in the absence of such information). |
url_bynode | Dict[UUID, str] | dict mapping node ids to the urls of files containing node-specific auxiliary variables (a node id missing from the dict indicates that this node has no such information to receive). |
!!! info "Note"
    The use of both a shared URL and node-specific ones is merely a way to reduce communication costs by uploading only once the information that is to be downloaded by each and every node.
Source code in fedbiomed/researcher/job.py
def upload_agg_optimizer_aux_var(
self,
aux_var: Dict[str, Dict[str, Any]],
) -> Tuple[Optional[str], Dict[uuid.UUID, str]]:
"""Upload auxiliary variables emitted by a researcher-side Optimizer.
Args:
aux_var: Dict of auxiliary variables emitted by an Optimizer held
by the researcher, that are to be uploaded after having been
structured into multiple files, to avoid information leakage
as well as content redundancy.
Returns:
url_shared: url of a file containing auxiliary variables shared
across all nodes, or None (in the absence of such information).
url_bynode: dict mapping node ids to the urls of files containing node-specific
auxiliary variables (a node id missing from the dict indicates that this
node has no such information to receive).
!!!info "Note":
The use of both a shared URL and node-specific one is merely a
way to reduce communication costs by uploading only once the
information that is to be downloaded by each and every node.
"""
# Split the information between shared and node-wise dictionaries.
aux_shared, aux_bynode = self._prepare_agg_optimizer_aux_var(
aux_var=aux_var, nodes=list(self._nodes)
)
# Upload the shared information that all nodes will download.
if aux_shared:
path = os.path.join(
self._keep_files_dir, f"aux_var_shared_{uuid.uuid4()}.mpk"
)
Serializer.dump(aux_shared, path)
url_shared = self.repo.upload_file(path)["file"]
else:
url_shared = None
# Upload the node-specific information, with node-specific urls.
url_bynode = {} # type: Dict[uuid.UUID, str]
for node_id, node_aux in aux_bynode.items():
if not node_aux:
continue
path = os.path.join(
self._keep_files_dir,
f"aux_var_node_{node_id}_{uuid.uuid4()}.mpk"
)
Serializer.dump(node_aux, path)
url_bynode[node_id] = self.repo.upload_file(path)["file"]
# Return the urls of the uploaded files.
return url_shared, url_bynode
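A hedged call sketch; the auxiliary variable content is a placeholder, since the exact structure depends on the researcher-side Optimizer plug-ins in use:

```python
# Sketch: uploading researcher-side optimizer auxiliary variables before a round.
optim_aux_var = {"scaffold": {"delta": 0.0}}  # placeholder module name and content

url_shared, url_bynode = job.upload_agg_optimizer_aux_var(optim_aux_var)

# url_shared: url of the file that every node downloads, or None if nothing is shared
# url_bynode: {node_id: url} for node-specific files; a node absent from this dict has
#             no node-specific information to receive
```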
upload_aggregator_args
upload_aggregator_args(args_thr_msg, args_thr_files)
Uploads aggregator metadata to the Repository and updates the mqtt message accordingly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args_thr_msg | Union[Dict[str, Dict[str, Any]], dict] | dictionary containing metadata about the aggregation strategy, useful to transfer some data when it's required by an aggregator. First key should be the node_id, and the sub-dictionary should be parameters to be sent through the MQTT messaging system. This dictionary may be modified by this function with references to the metadata transferred via the Repository. | required |
args_thr_files | Union[Dict[str, Dict[str, Any]], dict] | dictionary containing metadata about the aggregation strategy, to be transferred via the Repository's HTTP API, as opposed to the MQTT system. Format is the same as `args_thr_msg`. | required |
Returns:
Type | Description |
---|---|
Dict[str, Dict[str, Any]] | The updated dictionary with metadata to be introduced in the mqtt message. |
Source code in fedbiomed/researcher/job.py
def upload_aggregator_args(self,
args_thr_msg: Union[Dict[str, Dict[str, Any]], dict],
args_thr_files: Union[Dict[str, Dict[str, Any]], dict]) -> Dict[str, Dict[str, Any]]:
"""Uploads aggregator metadata to the Repository and updates the mqtt message accordingly.
Args:
args_thr_msg: dictionary containing metadata about the aggregation
strategy, useful to transfer some data when it's required by an aggregator. First key should be the
node_id, and sub-dictionary should be parameters to be sent through MQTT messaging system. This
dictionary may be modified by this function with additional metadata about other metadata
transferred via the Repository.
args_thr_files: dictionary containing metadata about aggregation strategy, to be transferred
via the Repository's HTTP API, as opposed to the mqtt system. Format is the same as
aggregator_args_thr_msg .
Returns:
The updated dictionary with metadata to be introduced in the mqtt message.
"""
for node_id, aggr_params in args_thr_files.items():
for arg_name, aggr_param in aggr_params.items():
if arg_name == 'aggregator_name':
continue
args_thr_msg[node_id][arg_name] = {}
args_thr_msg[node_id][arg_name]['arg_name'] = arg_name # name of the argument to look at
try:
filename = os.path.join(self._keep_files_dir, f"{arg_name}_{uuid.uuid4()}.mpk")
Serializer.dump(aggr_param, filename)
url = self.repo.upload_file(filename)["file"]
except Exception as exc:
logger.critical("Failed to export %s to local file and upload it: %s", arg_name, exc)
sys.exit(-1)
args_thr_msg[node_id][arg_name]['filename'] = filename # path to the file with the parameters
args_thr_msg[node_id][arg_name]['url'] = url
return args_thr_msg
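An illustration of the in-place enrichment performed on `args_thr_msg`, with placeholder argument names and values:

```python
# Before the call: one heavy aggregator argument is to be transferred through files.
args_thr_msg = {"node-1": {"aggregator_name": "scaffold"}}
args_thr_files = {"node-1": {"aggregator_correction": {"layer.weight": [0.0, 0.0]}}}

args_thr_msg = job.upload_aggregator_args(args_thr_msg, args_thr_files)

# After the call, the heavy argument is dumped to a file, uploaded, and referenced in the
# MQTT message instead of being inlined, roughly as:
# {"node-1": {"aggregator_name": "scaffold",
#             "aggregator_correction": {"arg_name": "aggregator_correction",
#                                       "filename": "<keep_files_dir>/aggregator_correction_<uuid>.mpk",
#                                       "url": "<uploads_url>/..."}}}
```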
validate_minimal_arguments staticmethod
validate_minimal_arguments(obj, fields)
Validates a given dictionary by given mandatory fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | dict | Object to be validated | required |
fields | Union[tuple, list] | List of fields that should be present on the obj | required |
Source code in fedbiomed/researcher/job.py
@staticmethod
def validate_minimal_arguments(obj: dict, fields: Union[tuple, list]):
""" Validates a given dictionary by given mandatory fields.
Args:
obj: Object to be validated
fields: List of fields that should be present on the obj
"""
for f in fields:
assert f in obj.keys(), f'Field {f} is required in object {obj}. Was not found.'
if 'url' in f:
assert validators.url(obj[f]), f'Url not valid: {f}'
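A small usage example of the validation helper (it asserts that each field is present and that any field whose name contains 'url' holds a valid url):

```python
from fedbiomed.researcher.job import Job

Job.validate_minimal_arguments(
    {'training_plan_url': 'http://example.com/media/my_model.py',
     'training_plan_class': 'MyTrainingPlan',
     'params_url': 'http://example.com/media/aggregated_params.mpk'},
    ['training_plan_url', 'training_plan_class', 'params_url'],
)
# Raises AssertionError if a field is missing or a *_url field does not hold a valid url.
```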
waiting_for_nodes
waiting_for_nodes(responses)
Verifies whether all nodes involved in the job are present and responding.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
responses | Responses | contains message answers | required |
Returns:
Type | Description |
---|---|
bool | False if all nodes are present in the Responses object. True if waiting for at least one node. |
Source code in fedbiomed/researcher/job.py
def waiting_for_nodes(self, responses: Responses) -> bool:
""" Verifies if all nodes involved in the job are present and Responding
Args:
responses: contains message answers
Returns:
False if all nodes are present in the Responses object. True if waiting for at least one node.
"""
try:
nodes_done = set(responses.dataframe()['node_id'])
except KeyError:
nodes_done = set()
return not nodes_done == set(self._nodes)
localJob
localJob(dataset_path=None, training_plan_class='MyTrainingPlan', training_plan_path=None, training_args=None, model_args=None)
Represents the entity that manages the training part. LocalJob is the version of Job applied locally on a local dataset (thus not involving any network). It is only used to compare results against a federated approach that uses the network.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_path | str | The path where data is stored on local disk. | None |
training_plan_class | str | Name of the model class to use for training or model class. | 'MyTrainingPlan' |
training_plan_path | str | path to file containing model code. Defaults to None. | None |
training_args | TrainingArgs | contains training parameters: lr, epochs, batch_size... | None |
model_args | dict | contains output and input feature dimension. | None |
Source code in fedbiomed/researcher/job.py
def __init__(self, dataset_path: str = None,
training_plan_class: str = 'MyTrainingPlan',
training_plan_path: str = None,
training_args: TrainingArgs = None,
model_args: dict = None):
"""
Constructor of the class
Args:
dataset_path : The path where data is stored on local disk.
training_plan_class: Name of the model class to use for training or model class.
training_plan_path: path to file containing model code. Defaults to None.
training_args: contains training parameters: lr, epochs, batch_size...
model_args: contains output and input feature dimension.
"""
self._id = str(uuid.uuid4())
self._repository_args = {}
self._localjob_training_args = training_args
self._model_args = model_args
self._training_args = TrainingArgs(training_args, only_required=False)
self.dataset_path = dataset_path
if training_args is not None:
if training_args.get('test_on_local_updates', False) \
or training_args.get('test_on_global_updates', False):
# if user wants to perform validation, display this message
logger.warning("Cannot perform validation, not supported for LocalJob")
# handle case when model is in a file
if training_plan_path is not None:
try:
model_module = os.path.basename(training_plan_path)
model_module = re.search(r"(.*)\.py$", model_module).group(1)
sys.path.insert(0, os.path.dirname(training_plan_path))
module = importlib.import_module(model_module)
tr_class = getattr(module, training_plan_class)
self._training_plan = tr_class()
sys.path.pop(0)
except Exception as e:
e = sys.exc_info()
logger.critical("Cannot import class " + training_plan_class + " from path " +
training_plan_path + " - Error: " + str(e))
sys.exit(-1)
else:
# create/save model instance
if inspect.isclass(training_plan_class):
self._training_plan = training_plan_class()
else:
self._training_plan = training_plan_class
self._training_plan.post_init(model_args=self._model_args,
training_args=self._training_args)
Attributes
dataset_path instance-attribute
dataset_path = dataset_path
model property
model
training_args property
writable
training_args
training_plan property
training_plan
Functions
start_training
start_training()
Runs the training routine locally on the job's dataset and saves the resulting model parameters.
Source code in fedbiomed/researcher/job.py
def start_training(self):
"""Runs the training routine locally on the job's dataset and saves the resulting model parameters."""
# Run import statements (very unsafely).
for i in self._training_plan._dependencies:
exec(i, globals())
# Run the training routine.
try:
self._training_plan.set_dataset_path(self.dataset_path)
data_manager = self._training_plan.training_data()
tp_type = self._training_plan.type()
data_manager.load(tp_type=tp_type)
train_loader, test_loader = data_manager.split(test_ratio=0)
self._training_plan.training_data_loader = train_loader
self._training_plan.testing_data_loader = test_loader
self._training_plan.training_routine()
except Exception as exc:
logger.error("Cannot train model in job: %s", repr(exc))
# Save the current parameters.
else:
try:
# TODO: should test status code but not yet returned by upload_file
path = os.path.join(
environ["TMP_DIR"], f"local_params_{uuid.uuid4()}.mpk"
)
Serializer.dump(self._training_plan.get_model_params(), path)
except Exception as exc:
logger.error("Cannot write results: %s", repr(exc))
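A hedged usage sketch of localJob for a local baseline run; the training plan module, dataset path, and argument values are placeholders:

```python
from fedbiomed.researcher.job import localJob

from my_training_plans import MyTrainingPlan  # hypothetical user-defined training plan

local_job = localJob(
    dataset_path='/path/to/local/dataset.csv',
    training_plan_class=MyTrainingPlan,
    training_args={'epochs': 1, 'batch_size': 32, 'lr': 1e-3},
    model_args={'in_features': 20, 'out_features': 2},
)
local_job.start_training()

# The locally trained model can then be compared against the federated one:
local_model = local_job.model
```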