Federated Analytics

This page describes the architecture of the Federated Analytics (FA) feature, the responsibility of each component, and which files to touch when making changes.


How It Works

Researcher                         Node
──────────                         ────
FederatedAnalytics.fetch_stats()
  └─ FARequestJob.execute()  ──── FARequest ──►  node.py
                                                   └─ FAJob.run()
                                                        ├─ dataset.compute_stats()
                                                        │    └─ AnalyticsOrchestrator
                                                        │         └─ Accumulators
                                                        └─ FAReply 
  └─ FAResult (aggregate)  ◄────────────────────────────────┘

  1. The researcher calls a stat method (e.g. exp.analytics.mean()).
  2. FederatedAnalytics checks its cache; missing stats are sent as an FARequest to each node via FARequestJob.
  3. On the node, FAJob validates the request and calls dataset.compute_stats().
  4. AnalyticsOrchestrator reads analytics_schema() from the dataset, builds an accumulator tree, iterates samples, and returns partial statistics.
  5. The node sends back an FAReply; the researcher side merges node replies into an FAResult and aggregates globally via AGGREGATORS_MAP.
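
Step 4 can be pictured as a single pass over the samples that keeps only running totals. The sketch below is an assumption about how such a pass might look for one numeric column, using Welford's algorithm; RunningMoments and compute_partial_stats are hypothetical names and are not taken from the Fed-BioMed code base.

```python
from dataclasses import dataclass


@dataclass
class RunningMoments:
    """Hypothetical single-column accumulator (not the Fed-BioMed class)."""

    count: int = 0
    mean: float = 0.0
    m2: float = 0.0  # running sum of squared deviations from the mean

    def update(self, value: float) -> None:
        # Welford's update: numerically stable, one pass over the samples.
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (value - self.mean)

    def partial_stats(self) -> dict:
        # Sample variance (n - 1 denominator); undefined below two samples.
        variance = self.m2 / (self.count - 1) if self.count > 1 else float("nan")
        return {"count": self.count, "mean": self.mean, "variance": variance}


def compute_partial_stats(samples) -> dict:
    """Iterate the samples once and return the node's partial statistics."""
    acc = RunningMoments()
    for value in samples:
        acc.update(value)
    return acc.partial_stats()


node_partial = compute_partial_stats([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
```

Only the three numbers in node_partial would need to leave the node in this picture; the raw samples stay local.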

Statistics, API and Aggregation

Available Statistics

Stats (in fedbiomed/common/constants.py) is the single source of truth for stat names. Every node accumulator, registry entry, and aggregator function must reference one of these values. Adding a stat starts here.

Current implementation status

Only tabular (ROW) data is supported; image datasets are not yet covered by FA. The enabled statistics are count, mean, and variance. histogram is defined and partially implemented, but its implementation is incomplete and still under validation.

| Enum | String value | Required stats_args key | Status |
| --- | --- | --- | --- |
| Stats.COUNT | "count" | (none) | Enabled (tabular only) |
| Stats.MEAN | "mean" | (none) | Enabled; auto-requests count as a dependency |
| Stats.VARIANCE | "variance" | (none) | Enabled; auto-requests mean and count |
| Stats.HISTOGRAM | "histogram" | bin_edges | Under validation; implementation incomplete |
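
For orientation, an enum with these members and string values could look like the sketch below. The member names and values come from the table; the str mixin and the parse_stat helper are assumptions made for illustration, not a copy of fedbiomed/common/constants.py.

```python
from enum import Enum


class Stats(str, Enum):
    """Illustrative stand-in for the enum in fedbiomed/common/constants.py."""

    COUNT = "count"
    MEAN = "mean"
    VARIANCE = "variance"
    HISTOGRAM = "histogram"


def parse_stat(name: str) -> Stats:
    """Hypothetical helper: map a string value to its enum member."""
    try:
        # Looking an enum up by value raises ValueError for unknown names.
        return Stats(name)
    except ValueError:
        valid = ", ".join(s.value for s in Stats)
        raise ValueError(f"Unknown stat {name!r}; expected one of: {valid}") from None
```

Routing every stat name through a single lookup like this is one way to keep accumulators, registry entries, and aggregators tied to the same set of values.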

Researcher-only derived stats

std and sum are computable on the researcher side from mean/variance/count via FAResult.global_stats() — they are never sent from nodes.
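
The arithmetic behind the two derived stats is small enough to show directly. The helper below is a hypothetical illustration of that arithmetic and is not the body of FAResult.global_stats().

```python
import math


def derive_sum_and_std(count: int, mean: float, variance: float) -> dict:
    """Hypothetical helper: derive sum and std from already-aggregated stats."""
    return {
        "sum": mean * count,         # sum = mean * count
        "std": math.sqrt(variance),  # std = square root of the variance
    }


derived = derive_sum_and_std(count=4, mean=2.5, variance=9.0)
```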

FederatedAnalytics is the entry point for all analytics requests. It handles caching and delegates network I/O to FARequestJob. The API is split into two methods depending on whether the requested statistic needs extra parameters.

Use fetch_stats when no computation arguments are needed — statistics are identified by name, and an optional dataset_schema restricts which columns or modalities are included:

fetch_stats(
    stats: str | list[str] | None = None,     # one or more Stats string values; None → ["count", "mean", "variance"]
    dataset_schema: str | list | None = None, # column/modality filter; None → whole schema
) -> FAResult

Use fetch_stats_with_args when the statistic requires parameters (e.g. histogram bin edges). Because the parameters are specific to individual columns or modalities, schema selection is encoded directly inside stats_args — there is no separate dataset_schema:

fetch_stats_with_args(
    stats_args: dict,   # encodes both schema selection and computation parameters
) -> FAResult

Both methods cache their results. fetch_stats keys the cache by (node_ids, dataset_schema); fetch_stats_with_args keys it by (node_ids, stats_args). In both cases, an identical call is served from the cache without a network round-trip.
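
The two cache keys can be illustrated with a toy cache. Everything in the sketch is an assumption: ToyAnalyticsCache, the fetch callable standing in for the network round-trip, and the way list and dict arguments are turned into hashable keys. It also leaves out the per-stat check for missing statistics, and the dicts passed to fetch_stats_with_args are placeholders rather than the real stats_args layout.

```python
import json


class ToyAnalyticsCache:
    """Hypothetical cache; `fetch` stands in for the network round-trip."""

    def __init__(self, node_ids, fetch):
        self._node_ids = tuple(sorted(node_ids))
        self._fetch = fetch
        self._cache = {}
        self.round_trips = 0

    def _get(self, key, payload):
        if key not in self._cache:
            self.round_trips += 1
            self._cache[key] = self._fetch(payload)
        return self._cache[key]

    def fetch_stats(self, dataset_schema=None):
        # Lists are unhashable, so a list schema is frozen into a tuple.
        frozen = tuple(dataset_schema) if isinstance(dataset_schema, list) else dataset_schema
        return self._get(("schema", self._node_ids, frozen), dataset_schema)

    def fetch_stats_with_args(self, stats_args):
        # Canonical JSON makes the key independent of dict insertion order.
        frozen = json.dumps(stats_args, sort_keys=True)
        return self._get(("args", self._node_ids, frozen), stats_args)


cache = ToyAnalyticsCache(["node_b", "node_a"], fetch=lambda payload: {"echo": payload})
cache.fetch_stats(["price", "mileage"])
cache.fetch_stats(["price", "mileage"])        # identical call: served from the cache
cache.fetch_stats(["price"])                   # different schema: new round-trip
cache.fetch_stats_with_args({"a": 1, "b": 2})  # placeholder arguments
cache.fetch_stats_with_args({"b": 2, "a": 1})  # same content: served from the cache
```

Five calls produce three round-trips here, because two of them repeat an earlier key.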

Convenience methods are thin wrappers around fetch_stats and FAResult.global_stats() for the most common stats:

exp.analytics.mean(dataset_schema=None)
# → {'year': 2016.96, 'price': 16235.20, 'mileage': 23908.94, 'mpg': 55.25, ...}

exp.analytics.mean(dataset_schema=["price", "mileage"])
# → {'price': 16235.20, 'mileage': 23908.94}

No stats_args in convenience methods

Convenience methods do not accept stats_args. For histogram, which requires bin_edges in stats_args, call fetch_stats_with_args directly.

Stat Dependencies

The orchestrator resolves dependencies automatically before building the accumulator tree. This means requesting variance will also compute mean and count on the node even if they are not listed explicitly in stats. Dependencies and required arguments for each stat are declared in fedbiomed/common/analytics/accumulators/_registry.py.
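
Dependency resolution of this kind amounts to a depth-first walk over a small table. In the sketch below, the DEPENDENCIES table mirrors the dependencies listed for count, mean, and variance; the table layout and the resolve_dependencies function are assumptions and do not reproduce _registry.py.

```python
# Assumed layout: each stat maps to the stats it needs computed first.
DEPENDENCIES = {
    "count": [],
    "mean": ["count"],
    "variance": ["mean", "count"],
}


def resolve_dependencies(requested):
    """Return the requested stats plus their dependencies, dependencies first."""
    resolved = []

    def visit(stat):
        if stat in resolved:
            return
        for dependency in DEPENDENCIES[stat]:
            visit(dependency)
        resolved.append(stat)

    for stat in requested:
        visit(stat)
    return resolved
```

With this table, resolve_dependencies(["variance"]) expands to count, mean, and variance, in that order.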

Cross-Node Aggregation

Once all nodes have replied, FAResult calls AGGREGATORS_MAP to combine per-node partial results into a single global value per modality or column. Each function is registered via the @aggregator(stat) decorator; its parameter names match Stats string values.
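
One way to read "parameter names match Stats string values" is that the registry can inspect an aggregator's signature to decide which per-node values to pass in. The sketch below works under that assumption. The names aggregator and AGGREGATORS_MAP are reused from the text, but the bodies, the aggregate helper, and the reply layout are hypothetical.

```python
import inspect

AGGREGATORS_MAP = {}


def aggregator(stat):
    """Register the decorated function as the cross-node aggregator for `stat`."""
    def decorator(func):
        AGGREGATORS_MAP[stat] = func
        return func
    return decorator


@aggregator("count")
def aggregate_count(count):
    return sum(count)


@aggregator("mean")
def aggregate_mean(mean, count):
    # Weighted mean: sum(mean * count) / sum(count)
    return sum(m * c for m, c in zip(mean, count)) / sum(count)


def aggregate(stat, node_replies):
    """Hypothetical dispatcher: collect the inputs named in the signature."""
    func = AGGREGATORS_MAP[stat]
    needed = inspect.signature(func).parameters
    kwargs = {name: [reply[name] for reply in node_replies] for name in needed}
    return func(**kwargs)


replies = [
    {"count": 10, "mean": 2.0},
    {"count": 30, "mean": 4.0},
]
global_count = aggregate("count", replies)
global_mean = aggregate("mean", replies)
```

The larger node pulls the global mean toward its own value: (10 * 2.0 + 30 * 4.0) / 40 gives 3.5 rather than the unweighted 3.0.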

| Stat | Aggregation logic | Status |
| --- | --- | --- |
| count | sum (scalar int, or dict of per-key counts) | Enabled |
| sum | Σ(mean × count) per node | Derived (researcher side) |
| mean | weighted mean: Σ(mean × count) / Σcount | Enabled |
| variance | combined sample variance via SS-within + SS-between | Enabled |
| std | √variance (derived; never sent from nodes) | Derived (researcher side) |
| histogram | element-wise count sum (bin edges must match across nodes) | Under validation |
| quantile | linear interpolation on the aggregated histogram | Under validation |
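
The variance row is the least obvious one, so here is a minimal sketch of the SS-within + SS-between decomposition, checked against the variance of the pooled data. The function name combine_variance is hypothetical, and the sketch assumes every node holds at least two samples, so that each per-node sample variance is defined.

```python
import statistics


def combine_variance(counts, means, variances):
    """Pool per-node sample variances into one global sample variance."""
    total = sum(counts)
    global_mean = sum(m * n for m, n in zip(means, counts)) / total
    # SS-within: each node's own sum of squared deviations, (n - 1) * variance.
    ss_within = sum((n - 1) * v for n, v in zip(counts, variances))
    # SS-between: spread of the node means around the global mean.
    ss_between = sum(n * (m - global_mean) ** 2 for m, n in zip(means, counts))
    return (ss_within + ss_between) / (total - 1)


node_a = [1.0, 2.0, 3.0, 4.0]
node_b = [10.0, 12.0, 14.0]

pooled = combine_variance(
    counts=[len(node_a), len(node_b)],
    means=[statistics.mean(node_a), statistics.mean(node_b)],
    variances=[statistics.variance(node_a), statistics.variance(node_b)],
)
centralised = statistics.variance(node_a + node_b)
```

pooled and centralised agree up to floating-point rounding, which is why only count, mean, and variance need to travel from each node.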

Component Responsibilities

Common layer

| File | Responsibility |
| --- | --- |
| fedbiomed/common/constants.py | Stats enum: the single source of truth for valid stat names |
| fedbiomed/common/message.py | FARequest / FAReply wire schemas (add fields here when the protocol changes) |
| fedbiomed/common/analytics/_aggregators.py | AGGREGATORS_MAP: maps each stat name to its cross-node aggregation function |
| fedbiomed/common/analytics/_orchestrator.py | AnalyticsOrchestrator: drives per-node stat computation; builds accumulator trees from the dataset schema |
| fedbiomed/common/analytics/accumulators/_registry.py | Links stat names ↔ accumulator classes and element types; update here to register a new stat |
| fedbiomed/common/analytics/accumulators/_operations.py | Primitive accumulator implementations (sum, count, histogram, quantile, …) |
| fedbiomed/common/analytics/accumulators/_row.py | Vectorised accumulator for tabular / row data |
| fedbiomed/common/analytics/accumulators/_image.py | Accumulator for N-D array data (not yet supported; image datasets are not covered by FA) |
| fedbiomed/common/analytics/accumulators/_base.py | Accumulator abstract base class |

Node layer

| File | Responsibility |
| --- | --- |
| fedbiomed/node/config.py | allow_federated_analytics flag: guards FA on a per-node basis |
| fedbiomed/node/node.py | Routes incoming FARequest messages to FAJob |
| fedbiomed/node/jobs/_fa_job.py | FAJob: validates the request, calls dataset.compute_stats(), returns FAReply or ErrorMessage |
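
The node-side guard can be pictured as a short chain of checks before any computation runs. The sketch below is an assumption about the shape of that logic: ToyReply, run_fa_job, and the exact validation rules are hypothetical and do not reproduce FAJob, FAReply, or ErrorMessage.

```python
from dataclasses import dataclass, field

ENABLED_STATS = {"count", "mean", "variance"}


@dataclass
class ToyReply:
    """Hypothetical stand-in for a reply or error message."""

    success: bool
    stats: dict = field(default_factory=dict)
    error: str = ""


def run_fa_job(allow_federated_analytics, requested_stats, compute_stats):
    # 1. The per-node flag is checked before anything touches the data.
    if not allow_federated_analytics:
        return ToyReply(success=False, error="Federated analytics is disabled on this node")
    # 2. Unknown or disabled stats are rejected with an error reply.
    unsupported = sorted(set(requested_stats) - ENABLED_STATS)
    if unsupported:
        return ToyReply(success=False, error=f"Unsupported stats: {unsupported}")
    # 3. Only then is the computation delegated to the dataset.
    return ToyReply(success=True, stats=compute_stats(requested_stats))


denied = run_fa_job(False, ["mean"], compute_stats=lambda stats: {})
rejected = run_fa_job(True, ["median"], compute_stats=lambda stats: {})
accepted = run_fa_job(True, ["count"], compute_stats=lambda stats: {"count": 3})
```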

Researcher layer

| File | Responsibility |
| --- | --- |
| fedbiomed/researcher/federated_workflows/_federated_analytics.py | FederatedAnalytics (API, cache) and FAResult (per-node storage + cross-node aggregation) |
| fedbiomed/researcher/federated_workflows/_federated_workflow.py | Instantiates FederatedAnalytics as experiment.analytics |
| fedbiomed/researcher/federated_workflows/jobs/_fa_request_job.py | FARequestJob: broadcasts FARequest to nodes and collects FAReply responses |

Adding a New Statistic

  1. Add the name to Stats in fedbiomed/common/constants.py.
  2. Implement the accumulator in fedbiomed/common/analytics/accumulators/_operations.py.
  3. Register it in fedbiomed/common/analytics/accumulators/_registry.py (element type, dependencies, required args).
  4. Add the cross-node aggregator to AGGREGATORS_MAP in fedbiomed/common/analytics/_aggregators.py.
  5. (Optional) Add a convenience method in FederatedAnalytics (fedbiomed/researcher/federated_workflows/_federated_analytics.py).
  6. Add tests in tests/test_analytics/.
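
To make steps 2 to 4 concrete, the toy example below wires a made-up "min" statistic through an accumulator, a registry entry, and a cross-node aggregator. All names here (ToyAccumulator, MinAccumulator, TOY_REGISTRY, TOY_AGGREGATORS, node_partial) are hypothetical; the real base class, registry format, and aggregator signatures live in the files listed in the steps and may differ.

```python
from abc import ABC, abstractmethod


class ToyAccumulator(ABC):
    """Hypothetical base class standing in for accumulators/_base.py."""

    @abstractmethod
    def update(self, value): ...

    @abstractmethod
    def result(self): ...


# Step 2 analogue: the accumulator keeps only what the stat needs.
class MinAccumulator(ToyAccumulator):
    def __init__(self):
        self._minimum = None

    def update(self, value):
        if self._minimum is None or value < self._minimum:
            self._minimum = value

    def result(self):
        return self._minimum


# Step 3 analogue: registry entry with dependencies and required args.
TOY_REGISTRY = {
    "min": {"accumulator": MinAccumulator, "depends_on": [], "required_args": []},
}

# Step 4 analogue: the global minimum is the minimum of the node minima.
TOY_AGGREGATORS = {"min": min}


def node_partial(stat, samples):
    """Run the registered accumulator over one node's samples."""
    accumulator = TOY_REGISTRY[stat]["accumulator"]()
    for value in samples:
        accumulator.update(value)
    return accumulator.result()


partials = [node_partial("min", [5, 3, 8]), node_partial("min", [7, 2, 9])]
global_min = TOY_AGGREGATORS["min"](partials)
```

A stat fits this pattern when its per-node partial results can be merged without access to the raw samples, as is the case for a minimum.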

Test Coverage

| Test file | What it covers |
| --- | --- |
| tests/test_analytics/test_federated_analytics.py | FederatedAnalytics and FAResult (cache, merge, aggregation) |
| tests/test_analytics/test_node_fa_job.py | FAJob: permission checks, error paths, compute_stats delegation |
| tests/test_analytics/test_fa_request_job.py | FARequestJob: request broadcast and reply collection |
| tests/test_analytics/test_analytics_orchestrator.py | AnalyticsOrchestrator: schema parsing, accumulator wiring |
| tests/test_analytics/test_aggregators.py | AGGREGATORS_MAP aggregator functions |
| tests/test_analytics/test_accumulators_row.py | RowAccumulator |
| tests/test_analytics/test_accumulators_image.py | ImageAccumulator |
| tests/test_analytics/test_accumulators_operations.py | Primitive accumulator operations |
| tests/test_analytics/test_accumulators_registry.py | AnalyticsRegistry |
| tests/test_message.py | FARequest / FAReply message creation |
| tests/test_node.py | Node routing of FARequest to FAJob |