Monitor

Monitor class to trap information sent during training and send it to Tensorboard

Attributes

Classes

MetricStore

Bases: dict

Storage facility for training loss and testing metric values, used to display them on Tensorboard. It inherits from dict and provides methods that simplify querying and saving metric values.

Storage architecture:

{<node>:
    {<for_>:
        {<metric_name>:
            {<round_>: { <iterations/values>: List[float] }
            }
        }
    }
}
Where:

- node: node id
- for_: either testing_global_updates, testing_local_updates, or training
- metric_name: the metric's name; Custom or Custom_xxx if a testing_step has been defined in the TrainingPlan (custom metric)
- round_: round number
- iterations: indexes of the stored iterations
- values: metric values
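
For illustration, a hypothetical snapshot of a MetricStore holding three training iterations of the first round received from a single node could look as follows (the node id, metric name and values are made up; validation entries are filled in the same way under testing_global_updates and testing_local_updates):

# Hypothetical MetricStore content, for illustration only
{
    'node-1234': {                                   # made-up node id
        'training': {
            'Custom': {                              # metric name reported by the node
                1: {                                 # round number
                    'iterations': [1, 2, 3],
                    'values': [0.91, 0.74, 0.62]     # e.g. training loss per iteration
                }
            }
        }
    }
}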

Functions

add_iteration
add_iteration(node, train, test_on_global_updates, round_, metric, iter_)

Adds an iteration to the MetricStore based on node, training/validation status, round and metric.

Parameters:

node (str): The node id that the metric value was received from. Required.
train (bool): Training status; if True, the metric value is for training, otherwise for validation. Required.
test_on_global_updates (bool): If True, the metric value is for validation on global updates; otherwise, for validation on local updates. Required.
round_ (int): The round at which the metric value was received. Required.
metric (dict): Dictionary that contains metric names and their values, e.g. {'<metric-name>': <value>}. Required.
iter_ (int): Iteration number for validation/training. Required.

Returns:

List of cumulative iterations, one for each metric/validation result.

Source code in fedbiomed/researcher/monitor.py
def add_iteration(self,
                  node: str,
                  train: bool,
                  test_on_global_updates: bool,
                  round_: int,
                  metric: dict,
                  iter_: int) -> list[int]:
    """
    Adds an iteration to the MetricStore based on node, training/validation status, round and metric.

    Args:
        node: The node id that the metric value was received from
        train: Training status; if True, the metric value is for training, otherwise for validation
        test_on_global_updates: If True, the metric value is for validation on global updates; otherwise,
            for validation on local updates
        round_: The round at which the metric value was received
        metric: Dictionary that contains metric names and their values e.g {'<metric-name>': <value>}
        iter_: Iteration number for validation/training.

    Returns:
         List of cumulative iterations, one for each metric/validation result
    """

    if node not in self:
        self._register_node(node=node)

    cum_iter = []
    for metric_name, metric_value in metric.items():

        for_ = 'training' if train is True else 'testing_global_updates' \
            if test_on_global_updates is True else 'testing_local_updates'

        if metric_name not in self[node][for_]:
            self._register_metric(node=node, for_=for_, metric_name=metric_name)

        # FIXME: for now, if validation is done on global updates (before model local update)
        # last testing metric value computed on global updates at last round is overwritten
        # by the first one computed at first round
        if round_ in self[node][for_][metric_name]:

            # Each duplication means a new epoch for training, and it is not expected for
            # validation part. Especially for `testing_on_global_updates`. If there is a duplication
            # last value should overwrite
            duplicate = self._iter_duplication_status(round_=self[node][for_][metric_name][round_],
                                                      next_iter=iter_)
            if duplicate and test_on_global_updates:
                self._add_new_iteration(node, for_, metric_name, round_, iter_, metric_value, True)
            else:
                self._add_new_iteration(node, for_, metric_name, round_, iter_, metric_value)
        else:
            self._add_new_iteration(node, for_, metric_name, round_, iter_, metric_value, True)

        cum_iter.append(self._cumulative_iteration(self[node][for_][metric_name]))
    return cum_iter
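
As a usage illustration (not taken from the library), the sketch below stores a single hypothetical validation result received from one node and reads back the cumulative iteration count; the node id, metric name and value are made up.

from fedbiomed.researcher.monitor import MetricStore

store = MetricStore()

# Hypothetical validation-on-global-updates result received from one node
cum_iters = store.add_iteration(
    node='node-1234',              # made-up node id
    train=False,                   # not a training metric ...
    test_on_global_updates=True,   # ... but validation on global updates
    round_=1,
    metric={'ACCURACY': 0.82},     # made-up metric name and value
    iter_=1,
)

print(cum_iters)  # one cumulative iteration count per metric, e.g. [1]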

Monitor

Monitor()

Monitors nodes' scalar feedback during training

Source code in fedbiomed/researcher/monitor.py
def __init__(self):
    """Constructor of the class """

    self._log_dir = environ['TENSORBOARD_RESULTS_DIR']
    self._round = 1
    self._metric_store = MetricStore()
    self._event_writers = {}
    self._round_state = 0
    self._tensorboard = False

    if os.listdir(self._log_dir):
        logger.info('Removing tensorboard logs from previous experiment')
        # Clear logs' directory from the files from other experiments.
        self._remove_logs()

Functions

close_writer
close_writer()

Closes SummaryWriter for each node

Source code in fedbiomed/researcher/monitor.py
def close_writer(self):
    """ Closes `SummaryWriter` for each node """
    # Close each open SummaryWriter
    for node in self._event_writers:
        self._event_writers[node].close()
on_message_handler
on_message_handler(msg)

Handler for messages received through the general/monitoring channel. This method is used as a callback function in the Requests class.

Parameters:

msg (Dict[str, Any]): Incoming message from a node. Must contain a key named command, describing the nature of the command (currently the only command is add_scalar). Required.
Source code in fedbiomed/researcher/monitor.py
def on_message_handler(self, msg: Dict[str, Any]):
    """ Handler for messages received through general/monitoring channel. This method is used as callback function
    in Requests class

    Args:
        msg: incoming message from Node. Must contain key named `command`, describing the nature
            of the command (currently the command is only add_scalar).
    """

    # For now monitor can only handle add_scalar messages

    # Save iteration value
    cumulative_iter, *_ = self._metric_store.add_iteration(
        node=msg['node_id'],
        train=msg['train'],
        test_on_global_updates=msg['test_on_global_updates'],
        metric=msg['metric'],
        round_=self._round,
        iter_=msg['iteration'])

    # Log metric result
    self._log_metric_result(message=msg, cum_iter=cumulative_iter)
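
For illustration, a hypothetical add_scalar message carrying the keys visible in the source above could be dispatched as follows (assuming a Monitor instance named monitor; the node id and values are made up, and real messages may carry additional fields used for console logging).

# Hypothetical monitoring message; keys match those read by add_iteration above
msg = {
    'command': 'add_scalar',           # the only command currently handled
    'node_id': 'node-1234',            # made-up node id
    'train': True,                     # training metric, not validation
    'test_on_global_updates': False,
    'metric': {'Custom': 0.54},        # made-up metric name and value
    'iteration': 3,
}

monitor.on_message_handler(msg)        # stores the value and logs it; writes to tensorboard if enabled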
set_round
set_round(round_)

Sets the round number for which metric results will be received.

By default, at the beginning the round is equal to 1, which stands for the first round. This method should be called by the experiment run_once after each round completes, with the round set to the current round + 1. This informs the monitor of the round in which the metric values are being received.

Parameters:

round_ (int): The round under which received metric values will be saved. Required.

Returns:

The round number that has been set.
Source code in fedbiomed/researcher/monitor.py
def set_round(self, round_: int) -> int:
    """ Setts round number that metric results will be received for.

    By default, at the beginning round is equal to 1 which stands for the first round. T
    his method should be called by experiment `run_once` after each round completed, and round should be set
    to current round + 1. This will inform monitor about the current round where the metric values are getting
    received.

    Args:
        round_ : The round that metric value will be saved at they are received
    """
    self._round = round_

    return self._round
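
A minimal sketch of the convention described above, assuming a Monitor instance named monitor driven by an experiment loop:

current_round = 1                                        # the monitor starts at round 1 by default
# ... round 1 runs; incoming metric values are stored under round 1 ...
current_round = monitor.set_round(current_round + 1)     # metrics received from now on are stored under round 2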
set_tensorboard
set_tensorboard(tensorboard)

Sets the tensorboard flag, which determines whether scalar values are written into tensorboard log files.

Parameters:

tensorboard (bool): If True, data contained in AddScalarReply messages will be passed to tensorboard; if False, data will only be logged on the console. Required.
Source code in fedbiomed/researcher/monitor.py
def set_tensorboard(self, tensorboard: bool):
    """ Sets tensorboard flag, which is used to decide the behavior of the writing scalar values into
     tensorboard log files.

    Args:
        tensorboard: if True, data contained in AddScalarReply message will be passed to tensorboard
                     if False, fata will only be logged on the console
    """
    if isinstance(tensorboard, bool):
        self._tensorboard = tensorboard
    else:
        logger.error("tensorboard should be a boolean")
        self._tensorboard = False
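
A short sketch of the flag's behavior, following the validation shown above and assuming a Monitor instance named monitor:

monitor.set_tensorboard(True)      # scalars from AddScalarReply messages are written to tensorboard log files
monitor.set_tensorboard(False)     # scalars are only logged on the console
monitor.set_tensorboard("yes")     # not a bool: an error is logged and the flag falls back to False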