Monitor class to trap information sent during training and send it to TensorBoard
Attributes
Classes
MetricStore
Bases: dict
Storage facility for training loss and testing metric values, kept so that they can be displayed on TensorBoard. It inherits from dict and provides methods that simplify querying and saving metric values.
Storage architecture:
{<node>:
    {<for_>:
        {<metric_name>:
            {<round_>: { <iterations/values>: List[float] }
            }
        }
    }
}
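As a rough illustration of this layout, the snippet below builds a plain nested dictionary with the same shape. The node id, metric name, and numbers are made up, and the inner key names `iterations` and `values` are assumed from the legend; the snippet does not use MetricStore itself.

```python
# Hypothetical data mirroring the documented MetricStore layout.
# 'node_1', 'Loss' and all numbers are illustrative, not real results.
store = {
    "node_1": {                      # <node>
        "training": {                # <for_>
            "Loss": {                # <metric_name>
                1: {                 # <round_>
                    "iterations": [1, 2, 3],
                    "values": [0.9, 0.7, 0.6],
                },
            },
        },
    },
}

# Query the training loss recorded for round 1 on node_1.
round_1 = store["node_1"]["training"]["Loss"][1]
latest_value = round_1["values"][-1]
```

Reading a value therefore takes four lookups: node, category, metric name, then round.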
- node: node id
- for_: either testing_global_updates, testing_local_updates, or training
- metric_name: metric's name. Custom or Custom_xxx if testing_step has been defined in TrainingPlan (custom metric)
- round_: round number
- iterations: index of iterations stored
- values: metric value
Functions
add_iteration
add_iteration(node, train, test_on_global_updates, round_, metric, iter_)
Method adding iteration to MetricStore based on node, training/validation, round and metric.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node | str | ID of the node that the metric value was received from | required |
train | bool | Training status. If True, the metric value is for training; otherwise it is for validation | required |
test_on_global_updates | bool | If True, the metric value is for validation on global updates; otherwise, for validation on local updates | required |
round_ | int | The round in which the metric value was received | required |
metric | dict | Dictionary that contains metric names and their values, e.g. {'<metric-name>': <value>} | required |
iter_ | int | Iteration number for validation/training | required |
Returns: list of cumulative iterations, one per metric/validation result
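The way the two boolean flags map onto the `for_` key can be sketched as a small standalone function. `select_category` is a hypothetical name used only for illustration; it is not part of MetricStore, and it tests truthiness where the original source compares with `is True`.

```python
def select_category(train: bool, test_on_global_updates: bool) -> str:
    """Hypothetical helper: return the `for_` key for a metric value."""
    if train:
        # Training takes precedence; the second flag is ignored.
        return "training"
    if test_on_global_updates:
        return "testing_global_updates"
    return "testing_local_updates"
```

Note that `test_on_global_updates` only matters when `train` is False.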
Source code in fedbiomed/researcher/monitor.py
def add_iteration(self,
                  node: str,
                  train: bool,
                  test_on_global_updates: bool,
                  round_: int,
                  metric: dict,
                  iter_: int) -> list[int]:
    """
    Method adding iteration to MetricStore based on node, training/validation, round and metric.
    Args:
        node: The node id that metric value received from
        train: Training status, If true metric value is for training, Otherwise for validation
        test_on_global_updates: If True metric value is for validation on global updates. Otherwise,
            for validation on local updates
        round_: The round that metric value has received at
        metric: Dictionary that contains metric names and their values e.g {'<metric-name>':<value>}
        iter_: Iteration number for validation/training.
    Returns:
        List of cumulative iteration for each metric/validation result
    """
    if node not in self:
        self._register_node(node=node)
    cum_iter = []
    for metric_name, metric_value in metric.items():
        for_ = 'training' if train is True else 'testing_global_updates' \
            if test_on_global_updates is True else 'testing_local_updates'
        if metric_name not in self[node][for_]:
            self._register_metric(node=node, for_=for_, metric_name=metric_name)
        # FIXME: for now, if validation is done on global updates (before model local update)
        # last testing metric value computed on global updates at last round is overwritten
        # by the first one computed at first round
        if round_ in self[node][for_][metric_name]:
            # Each duplication means a new epoch for training, and it is not expected for
            # validation part. Especially for `testing_on_global_updates`. If there is a duplication
            # last value should overwrite
            duplicate = self._iter_duplication_status(round_=self[node][for_][metric_name][round_],
                                                      next_iter=iter_)
            if duplicate and test_on_global_updates:
                self._add_new_iteration(node, for_, metric_name, round_, iter_, metric_value, True)
            else:
                self._add_new_iteration(node, for_, metric_name, round_, iter_, metric_value)
        else:
            self._add_new_iteration(node, for_, metric_name, round_, iter_, metric_value, True)
        cum_iter.append(self._cumulative_iteration(self[node][for_][metric_name]))
    return cum_iter
Monitor
Monitor()
Monitors scalar feedback from nodes during training
Source code in fedbiomed/researcher/monitor.py
def __init__(self):
    """Constructor of the class """
    self._log_dir = environ['TENSORBOARD_RESULTS_DIR']
    self._round = 1
    self._metric_store = MetricStore()
    self._event_writers = {}
    self._round_state = 0
    self._tensorboard = False
    if os.listdir(self._log_dir):
        logger.info('Removing tensorboard logs from previous experiment')
        # Clear logs' directory from the files from other experiments.
        self._remove_logs()
Functions
close_writer
close_writer()
Closes SummaryWriter for each node
Source code in fedbiomed/researcher/monitor.py
def close_writer(self):
    """ Closes `SummaryWriter` for each node """
    # Close each open SummaryWriter
    for node in self._event_writers:
        self._event_writers[node].close()
on_message_handler
on_message_handler(msg)
Handler for messages received through the general/monitoring channel. This method is used as a callback function in the Requests class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg | Dict[str, Any] | Incoming message from Node. Must contain a key named command, describing the nature of the command (currently the only command is add_scalar) | required |
Source code in fedbiomed/researcher/monitor.py
def on_message_handler(self, msg: Dict[str, Any]):
    """ Handler for messages received through general/monitoring channel. This method is used as callback function
    in Requests class
    Args:
        msg: incoming message from Node. Must contain key named `command`, describing the nature
            of the command (currently the command is only add_scalar).
    """
    # For now monitor can only handle add_scalar messages
    # Save iteration value
    cumulative_iter, *_ = self._metric_store.add_iteration(
        node=msg['node_id'],
        train=msg['train'],
        test_on_global_updates=msg['test_on_global_updates'],
        metric=msg['metric'],
        round_=self._round,
        iter_=msg['iteration'])
    # Log metric result
    self._log_metric_result(message=msg, cum_iter=cumulative_iter)
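For orientation, the handler above reads five keys from the incoming message before storing the value. The sketch below shows a message with that shape and a small check for missing keys. `REQUIRED_KEYS`, `missing_keys`, and all example values are hypothetical, and `_log_metric_result` may read further keys that are not shown on this page.

```python
from typing import Any, Dict, List

# Keys that on_message_handler passes on to MetricStore.add_iteration.
REQUIRED_KEYS = ("node_id", "train", "test_on_global_updates", "metric", "iteration")


def missing_keys(msg: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the required keys that are absent from msg."""
    return [key for key in REQUIRED_KEYS if key not in msg]


# Illustrative message; the values are made up.
example_msg = {
    "command": "add_scalar",
    "node_id": "node_1",
    "train": True,
    "test_on_global_updates": False,
    "metric": {"Loss": 0.42},
    "iteration": 5,
}
```

A check of this kind could run before the handler is called, so that a malformed message is reported instead of raising a KeyError.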
set_round
set_round(round_)
Sets the round number that metric results will be received for.
By default, the round is initially equal to 1, which stands for the first round. This method should be called by the experiment's run_once after each round is completed, with the round set to the current round + 1. This informs the monitor of the round for which metric values are currently being received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
round_ | int | The round under which metric values will be saved as they are received | required |
Source code in fedbiomed/researcher/monitor.py
def set_round(self, round_: int) -> int:
    """ Sets round number that metric results will be received for.
    By default, at the beginning round is equal to 1 which stands for the first round.
    This method should be called by experiment `run_once` after each round completed, and round should be set
    to current round + 1. This will inform monitor about the current round where the metric values are getting
    received.
    Args:
        round_ : The round under which metric values will be saved as they are received
    """
    self._round = round_
    return self._round
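The calling pattern described for set_round can be sketched with a stand-in object. `RoundTracker` is hypothetical and copies only the round bookkeeping of Monitor; the loop stands in for repeated calls to the experiment's `run_once`, whose real implementation is not shown on this page.

```python
class RoundTracker:
    """Hypothetical stand-in that mimics only Monitor's round bookkeeping."""

    def __init__(self) -> None:
        self._round = 1  # same initial value as Monitor

    def set_round(self, round_: int) -> int:
        self._round = round_
        return self._round


tracker = RoundTracker()
seen_rounds = []
for current_round in range(1, 4):
    # Metrics received during this round would be stored under tracker._round.
    seen_rounds.append(tracker._round)
    # ... one training round would run here ...
    tracker.set_round(current_round + 1)
```

After three rounds the tracker points at round 4, ready for the next batch of metric values.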
set_tensorboard
set_tensorboard(tensorboard)
Sets the tensorboard flag, which determines whether scalar values are written to tensorboard log files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tensorboard | bool | If True, data contained in the AddScalarReply message is passed to tensorboard; if False, data is only logged on the console | required |
Source code in fedbiomed/researcher/monitor.py
def set_tensorboard(self, tensorboard: bool):
    """ Sets tensorboard flag, which is used to decide the behavior of the writing scalar values into
    tensorboard log files.
    Args:
        tensorboard: if True, data contained in AddScalarReply message will be passed to tensorboard
            if False, data will only be logged on the console
    """
    if isinstance(tensorboard, bool):
        self._tensorboard = tensorboard
    else:
        logger.error("tensorboard should be a boolean")
        self._tensorboard = False