Secagg

Classes

AdditiveSecret

AdditiveSecret(secret)

Manages additive secret.

Parameters:

Name Type Description Default
secret Union[int, List[int]]

The secret to be shared, either an integer or a list of integers.

required

Raises:

Type Description
FedbiomedValueError

If the secret is not an int or a list of integers.

Source code in fedbiomed/common/secagg/_additive_ss.py
def __init__(self, secret: Union[int, List[int]]) -> None:
    """
    Stores the value that will later be split into additive shares.

    Args:
        secret: The secret to be shared, either an integer or a list of integers.

    Raises:
        FedbiomedValueError: If the secret is not an int or a list of integers.
    """
    # A secret is accepted either as a single integer or as a list made
    # exclusively of integers (an empty list passes the check).
    is_scalar = isinstance(secret, int)
    is_vector = isinstance(secret, list) and all(
        isinstance(item, int) for item in secret
    )
    if not is_scalar and not is_vector:
        raise FedbiomedValueError("AdditiveSecret must be an int or a list of int")

    self._secret = secret

Attributes

secret property
secret

Getter for secret

Returns:

Type Description
Union[List, int]

Secret value

Functions

split
split(num_shares, bit_length=None)

Splits the secret into the specified number of shares using additive secret sharing. The sum of the shares will equal the original secret.

Parameters:

Name Type Description Default
num_shares int

The number of shares to generate.

required
bit_length Optional[int]

The bit length of the shares. Defaults to None.

None

Returns:

Type Description
AdditiveShares

AdditiveShares object representing the split shares.

Raises:

Type Description
FedbiomedValueError

If the number of shares is less than or equal to 0.

Source code in fedbiomed/common/secagg/_additive_ss.py
def split(
    self, num_shares: int, bit_length: Optional[int] = None
) -> "AdditiveShares":
    """
    Splits the secret into `num_shares` additive shares.

    The sum of the shares is meant to equal the original secret; the actual
    share generation is delegated to `self._shares_int`.

    Args:
        num_shares: The number of shares to generate.
        bit_length: The bit length of the shares. Defaults to None.

    Returns:
        AdditiveShares object representing the split shares.

    Raises:
        FedbiomedValueError: If the number of shares is less than or equal to 0.
    """
    if num_shares <= 0:
        raise FedbiomedValueError("Number of shares must be greater than 0")

    if isinstance(self._secret, int):
        share_values = self._shares_int(self._secret, num_shares, bit_length)
    else:
        # One row per element of the secret, each row holding the pieces of
        # that element (presumably `num_shares` of them -- see `_shares_int`).
        per_element = [
            self._shares_int(element, num_shares, bit_length)
            for element in self._secret
        ]
        # Transpose: row k now gathers the k-th piece of every element, i.e.
        # the full vector that makes up share k.
        share_values = [list(column) for column in zip(*per_element)]

    return AdditiveShares([AdditiveShare(item) for item in share_values])

AdditiveShare

AdditiveShare(value)

AdditiveShare class to be used after dividing a secret into multiple shares.

Parameters:

Name Type Description Default
value Union[int, List[int]]

The value of the share, either an integer or a list of integers.

required

Raises:

Type Description
FedbiomedTypeError

If the value is neither an int nor a list of integers.

Source code in fedbiomed/common/secagg/_additive_ss.py
def __init__(self, value: Union[int, List[int]]) -> None:
    """
    Wraps a single additive secret share.

    Args:
        value: The value of the share, either an integer or a list of integers.

    Raises:
        FedbiomedTypeError: If the value is neither an int nor a list of integers.
    """
    # Accept a plain integer, or a list whose items are all integers.
    is_scalar = isinstance(value, int)
    is_vector = isinstance(value, list) and all(
        isinstance(item, int) for item in value
    )
    if not is_scalar and not is_vector:
        raise FedbiomedTypeError(
            "AdditiveShare value must be an int or a list of int"
        )

    self._value = value

Attributes

value property
value

Gets the share's value.

Returns:

Type Description
Union[int, List[int]]

Union[int, List[int]]: The value of the share.

AdditiveShares

AdditiveShares(shares)

Bases: list

A class to represent a collection of AdditiveShare objects.

Parameters:

Name Type Description Default
shares List[AdditiveShare]

A list of AdditiveShare objects.

required

Raises:

Type Description
FedbiomedTypeError

If the shares are not of type AdditiveShare.

Source code in fedbiomed/common/secagg/_additive_ss.py
def __init__(self, shares: List[AdditiveShare]) -> None:
    """
    Builds the collection from a list of AdditiveShare objects.

    Args:
        shares: A list of AdditiveShare objects.

    Raises:
        FedbiomedTypeError: If any element is not an AdditiveShare.
    """
    # Reject the whole input as soon as one element has the wrong type.
    if any(not isinstance(candidate, AdditiveShare) for candidate in shares):
        raise FedbiomedTypeError("All shares must be of type Share")

    # The class derives from `list`; let it store the validated elements.
    super().__init__(shares)

Functions

reconstruct
reconstruct()

Reconstructs the secret from the shares.

Returns:

Type Description
Union[int, List[int]]

Union[int, List[int]]: The reconstructed secret.

Source code in fedbiomed/common/secagg/_additive_ss.py
def reconstruct(self) -> Union[int, List[int]]:
    """
    Reconstructs the secret by summing the shares held in this collection.

    Integer shares are summed directly. List shares are summed position by
    position, using the length of the first share as the vector length.

    Returns:
        The reconstructed secret.

    Raises:
        FedbiomedTypeError: If the shares are not all integers or all lists.
    """
    values = [share.value for share in self]

    if all(isinstance(item, int) for item in values):
        return sum(values)

    if all(isinstance(item, list) for item in values):
        width = len(values[0])
        return [sum(item[pos] for item in values) for pos in range(width)]

    raise FedbiomedTypeError("Shares must be of the same type")
to_list
to_list()

Gets the values of the shares.

Returns:

Type Description
List[Union[int, List[int]]]

List[Union[int, List[int]]]: The values of the shares.

Source code in fedbiomed/common/secagg/_additive_ss.py
def to_list(self) -> List[Union[int, List[int]]]:
    """
    Collects the raw value of every share, in iteration order.

    Returns:
        The values of the shares.
    """
    values = []
    for share in self:
        values.append(share.value)
    return values

DHKey

DHKey(private_key_pem=None, public_key_pem=None)

Key handling for ephemeral-ephemeral ECDH, a session is defined for each experiment.

This class handles the generation and importing of ECC keys using the P-256 curve. It provides methods to export the private and public keys in PEM format as bytes.

Attributes:

Name Type Description
private_key

The user's private ECC key.

public_key

The user's public ECC key.

Parameters:

Name Type Description Default
private_key_pem bytes | None

Optional. The private key in PEM format.

None
public_key_pem bytes | None

Optional. The public key in PEM format.

None
Source code in fedbiomed/common/secagg/_dh.py
def __init__(
    self,
    private_key_pem: bytes | None = None,
    public_key_pem: bytes | None = None
) -> None:
    """
    Sets up the key pair, importing the provided PEM keys or generating a new pair.

    - A private key PEM is imported when given.
    - When neither key is given, a fresh private key is generated on the
      SECP256R1 curve.
    - When only a public key PEM is given, `private_key` is set to `None`.

    The public key is imported from `public_key_pem` when given, otherwise it
    is derived from the private key.

    Args:
        private_key_pem: Optional. The private key in PEM format.
        public_key_pem: Optional. The public key in PEM format.
    """
    if private_key_pem:
        private_key = self._import_key(
            serialization.load_pem_private_key,
            data=private_key_pem,
            password=None,
            backend=default_backend()
        )
    elif public_key_pem:
        # Only the public key is loaded: there is no private part
        private_key = None
    else:
        private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
    self.private_key = private_key

    if public_key_pem:
        public_key = self._import_key(
            serialization.load_pem_public_key,
            data=public_key_pem,
            backend=default_backend()
        )
    else:
        # Reached only when a private key exists (imported or generated)
        public_key = self.private_key.public_key()
    self.public_key = public_key

Attributes

private_key instance-attribute
private_key = _import_key(load_pem_private_key, data=private_key_pem, password=None, backend=default_backend())
public_key instance-attribute
public_key = _import_key(load_pem_public_key, data=public_key_pem, backend=default_backend())

Functions

export_private_key
export_private_key()

Exports the private key to PEM format.

Returns:

Type Description

The private key in PEM format. Returns None if private key is not loaded.

Source code in fedbiomed/common/secagg/_dh.py
def export_private_key(self):
    """
    Serializes the private key as unencrypted PKCS8 PEM.

    Returns:
        The private key in PEM format. Returns `None` if private key
            is not loaded.
    """
    key = self.private_key
    if key:
        return key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    # No private key available (e.g. only a public key was loaded)
    return None
export_public_key
export_public_key()

Exports the public key to PEM format.

Returns:

Type Description

The public key in PEM format.

Source code in fedbiomed/common/secagg/_dh.py
def export_public_key(self):
    """
    Serializes the public key as SubjectPublicKeyInfo PEM.

    Returns:
        The public key in PEM format.
    """
    pem_encoding = serialization.Encoding.PEM
    spki_format = serialization.PublicFormat.SubjectPublicKeyInfo
    return self.public_key.public_bytes(encoding=pem_encoding, format=spki_format)

DHKeyAgreement

DHKeyAgreement(node_u_id, node_u_dh_key, session_salt)

Key Agreement with ephemeral-ephemeral ECDH, a session is defined for each experiment.

This class handles the key agreement process using the private ECC key of the user. It imports the private key, performs the key exchange, and derives the shared secret using a KDF.

Attributes:

Name Type Description
private_key

The user's private ECC key.

Parameters:

Name Type Description Default
node_u_id

The ID of the node.

required
node_u_dh_key DHKey

The keypair of the node.

required
session_salt

A session-specific salt.

required
Source code in fedbiomed/common/secagg/_dh.py
def __init__(self, node_u_id, node_u_dh_key: DHKey, session_salt):
    """
    Initializes the DHKeyAgreement instance.

    The arguments are only stored here; no key material is processed until
    a key agreement is requested.

    Args:
        node_u_id: The ID of the node.
        node_u_dh_key: The keypair of the node.
        session_salt: A session-specific salt.
    """
    # Identity and key pair of the local node are kept private to the instance
    self._node_u_id = node_u_id
    self._dh_key = node_u_dh_key
    # The salt is exposed as a public attribute
    self.session_salt = session_salt

Attributes

session_salt instance-attribute
session_salt = session_salt

Functions

agree
agree(node_v_id, public_key_pem)

Performs the key agreement and derives the shared secret.

Parameters:

Name Type Description Default
node_v_id

The ID of the other node (node_v).

required
public_key_pem

The public key of the other node in PEM format as bytes.

required

Returns:

Type Description

The derived shared secret.

Source code in fedbiomed/common/secagg/_dh.py
def agree(self, node_v_id, public_key_pem):
    """
    Runs the ECDH exchange with another node and derives the shared secret.

    Args:
        node_v_id: The ID of the other node (node_v).
        public_key_pem: The public key of the other node in PEM format as bytes.

    Returns:
        The derived shared secret.
    """
    # Wrap the peer's PEM so that its public key object can be used in the exchange
    peer = DHKey(public_key_pem=public_key_pem)

    # Raw ECDH output between the local private key and the peer's public key
    raw_secret = self._dh_key.private_key.exchange(ec.ECDH(), peer.public_key)

    # The raw secret is passed through the KDF together with the peer's ID
    return self._kdf(raw_secret, node_v_id)

EncryptedNumber

EncryptedNumber(param, ciphertext)

Bases: object

A number encrypted with one of the user keys.

Attributes:

Name Type Description
param

The public parameters

ciphertext

The integer value of the ciphertext

ciphertext: The integer value of the ciphertext
Source code in fedbiomed/common/secagg/_jls.py
def __init__(self, param: PublicParam, ciphertext: int):
    """Stores a ciphertext together with the public parameters it belongs to.

    Args:
        param: The public parameters.
        ciphertext: The integer value of the ciphertext

    """
    self.public_param = param
    # The ciphertext is converted to `mpz` before being stored
    self.ciphertext = mpz(ciphertext)

Attributes

ciphertext instance-attribute
ciphertext = mpz(ciphertext)
public_param instance-attribute
public_param = param

JoyeLibert

JoyeLibert()

The Joye-Libert scheme. This class provides two of its Probabilistic Polynomial Time algorithms: Protect and Agg.

Attributes:

Name Type Description
_vector_encoder

The vector encoding/decoding scheme

VEParameters.TARGET_RANGE + VEParameters.WEIGHT_RANGE should be equal or less than 2**32

Source code in fedbiomed/common/secagg/_jls.py
def __init__(self):
    """Constructs the class and its vector encoder.

    The encoder is sized from `SAParameters`: `ptsize` is half of `KEY_SIZE`
    and `valuesize` is the rounded-up sum of the base-2 logarithms of
    `TARGET_RANGE` and `WEIGHT_RANGE`.

    VEParameters.TARGET_RANGE + VEParameters.WEIGHT_RANGE should be
    equal or less than 2**32
    (NOTE(review): the code reads these constants from `SAParameters`.)
    """
    pt_size = SAParameters.KEY_SIZE // 2
    value_size = ceil(
        log2(SAParameters.TARGET_RANGE) + log2(SAParameters.WEIGHT_RANGE)
    )
    self._vector_encoder = VES(ptsize=pt_size, valuesize=value_size)

Functions

aggregate
aggregate(sk_0, tau, list_y_u_tau, num_expected_params)

Aggregates users protected inputs with the server's secret key

\(X_{\tau} \gets \textbf{JL.Agg}(public_param, sk_0,\tau, \{y_{u,\tau}\}_{u \in \{1,..,n\}})\)

This algorithm aggregates the \(n\) ciphers received at time period \(\tau\) to obtain \(y_{\tau} = \prod_1^n{y_{u,\tau}}\) and decrypts the result. It obtains the sum of the private inputs ( \( X_{\tau} = \sum_{1}^n{x_{u,\tau}} \) ) as follows:

\[V_{\tau} = H(\tau)^{sk_0} \cdot y_{\tau} \qquad \qquad X_{\tau} = \frac{V_{\tau}-1}{N} \mod N\]

Parameters:

Name Type Description Default
sk_0 ServerKey

The server's secret key \(sk_0\)

required
tau int

The time period \(\tau\)

required
list_y_u_tau List[List[EncryptedNumber]]

A list of the users' protected inputs \(\{y_{u,\tau}\}_{u \in \{1,..,n\}}\)

required
num_expected_params int

Number of parameters to decode from the decrypted vectors

required

Returns:

Type Description
List[int]

The sum of the users' inputs of type int

Raises:

Type Description
ValueError

bad argument value

Source code in fedbiomed/common/secagg/_jls.py
def aggregate(
        self,
        sk_0: ServerKey,
        tau: int,
        list_y_u_tau: List[List[EncryptedNumber]],
        num_expected_params: int
) -> List[int]:
    """Aggregates users protected inputs with the server's secret key


    \\(X_{\\tau} \\gets \\textbf{JL.Agg}(public_param, sk_0,\\tau, \\{y_{u,\\tau}\\}_{u \\in \\{1,..,n\\}})\\)

    The \\(n\\) ciphers received at time period \\(\\tau\\) are combined into
    \\(y_{\\tau} = \\prod_1^n{y_{u,\\tau}}\\), which is then decrypted to obtain the sum of
    the private inputs ( \\( X_{\\tau} = \\sum_{1}^n{x_{u,\\tau}} \\) ):

    $$V_{\\tau} = H(\\tau)^{sk_0} \\cdot y_{\\tau} \\qquad \\qquad X_{\\tau} = \\frac{V_{\\tau}-1}{N} \\mod N$$

    Args:
        sk_0: The server's secret key \\(sk_0\\)
        tau: The time period \\(\\tau\\)
        list_y_u_tau: A list of the users' protected inputs \\(\\{y_{u,\\tau}\\}_{u \\in \\{1,..,n\\}}\\)
        num_expected_params: Number of parameters to decode from the decrypted vectors

    Returns:
        The sum of the users' inputs of type `int`

    Raises:
        ValueError: bad argument value
    """
    if not isinstance(sk_0, ServerKey):
        raise ValueError("Key must be an instance of `ServerKey`")

    if not (isinstance(list_y_u_tau, list) and list_y_u_tau):
        raise ValueError("list_y_u_tau should be a non-empty list.")

    # Only the first user's entry is inspected to validate the nesting
    first_vector = list_y_u_tau[0]
    if not isinstance(first_vector, list):
        raise ValueError("list_y_u_tau should be a list that contains list of encrypted numbers")

    # Combine the users' ciphers position by position; `sum` relies on the
    # addition defined by the cipher objects themselves.
    combined: List[EncryptedNumber] = []
    for same_position in zip(*list_y_u_tau):
        combined.append(sum(same_position))

    plain_vector = sk_0.decrypt(combined, tau)

    # The decoder is told how many vectors were added together (one per user)
    return self._vector_encoder.decode(
        plain_vector,
        add_ops=len(list_y_u_tau),
        v_expected=num_expected_params
    )
protect
protect(public_param, user_key, tau, x_u_tau, n_users)

Protect user input with the user's secret key:

\(y_{u,\tau} \gets \textbf{JL.Protect}(public_param,sk_u,\tau,x_{u,\tau})\)

This algorithm encrypts private inputs \(x_{u,\tau} \in \mathbb{Z}_N\) for time period \(\tau\) using secret key \(sk_u \in \mathbb{Z}_N^2\) . It outputs cipher \(y_{u,\tau}\) such that:

\[y_{u,\tau} = (1 + x_{u,\tau} N) H(\tau)^{sk_u} \mod N^2\]

Parameters:

Name Type Description Default
public_param PublicParam

The public parameters \(public_param\)

required
user_key UserKey

The user's secret key \(sk_u\)

required
tau int

The time period \(\tau\)

required
x_u_tau List[int]

The user's input \(x_{u,\tau}\)

required
n_users int

Number of nodes/users that participate in the secure aggregation

required

Returns:

Type Description
List[mpz]

The protected input of type EncryptedNumber or a list of EncryptedNumber

Raises:

Type Description
TypeError

bad argument type

ValueError

bad argument value

Source code in fedbiomed/common/secagg/_jls.py
def protect(self,
            public_param: PublicParam,
            user_key: UserKey,
            tau: int,
            x_u_tau: List[int],
            n_users: int,
            ) -> List[mpz]:
    """ Protect user input with the user's secret key:

    \\(y_{u,\\tau} \\gets \\textbf{JL.Protect}(public_param,sk_u,\\tau,x_{u,\\tau})\\)

    Private inputs \\(x_{u,\\tau} \\in \\mathbb{Z}_N\\) are encrypted for time period
    \\(\\tau\\) with secret key \\(sk_u \\in \\mathbb{Z}_N^2\\) . The output cipher
    \\(y_{u,\\tau}\\) is such that:

    $$y_{u,\\tau} = (1 + x_{u,\\tau} N) H(\\tau)^{sk_u} \\mod N^2$$

    The input vector is first passed through the vector encoder, then the
    encoded vector is encrypted with the user key.

    Args:
        public_param: The public parameters \\(public_param\\)
        user_key: The user's secret key \\(sk_u\\)
        tau: The time period \\(\\tau\\)
        x_u_tau: The user's input \\(x_{u,\\tau}\\)
        n_users: Number of nodes/users that participates secure aggregation

    Returns:
            The protected input of type `EncryptedNumber` or a list of `EncryptedNumber`

    Raises:
            TypeError: bad argument type
            ValueError: bad argument value
    """
    if not isinstance(user_key, UserKey):
        key_type = type(user_key)
        raise TypeError(f"Expected key for encryption type is UserKey. but got {key_type}")

    # The key must have been created for the very same public parameters
    if user_key.public_param != public_param:
        raise ValueError(
            "Bad public parameter. The public parameter of user key does not match the "
            "one given for encryption"
        )

    if not isinstance(x_u_tau, list):
        vector_type = type(x_u_tau)
        raise TypeError(
            "Bad vector for encryption. Excepted argument `x_u_tau` type list but "
            f"got {vector_type}"
        )

    # The encoder is told how many users will have their vectors added together
    encoded_vector = self._vector_encoder.encode(V=x_u_tau, add_ops=n_users)

    return user_key.encrypt(encoded_vector, tau)

LOM

LOM(nonce=None)

Lightweight Obfuscation Mechanism (LOM) class for protecting and aggregating data.

Attributes:

Name Type Description
_prf PRF

An instance of the PRF class.

_vector_dtype str

The data type of the vector.

Source code in fedbiomed/common/secagg/_lom.py
def __init__(
    self,
    nonce: bytes | None = None
) -> None:
    """
    Sets up the PRF and the integer type used for the protected vectors.

    Args:
        nonce: Nonce handed to the PRF. When missing (or empty), 16 random
            bytes are generated.
    """
    # Fall back to a fresh random nonce when none is supplied
    self._prf: PRF = PRF(nonce or secrets.token_bytes(16))

    self._vector_dtype: str = 'uint32'
    # Bit width of one vector element; should be equal to 32 bit
    self._values_bit = np.iinfo(np.dtype(self._vector_dtype)).bits

Functions

aggregate
aggregate(list_y_u_tau)

Aggregates multiple vectors into a single vector.

Parameters:

Name Type Description Default
list_y_u_tau List[List[int]]

A list of vectors from different nodes.

required

Returns:

Type Description
List[int]

The aggregated vector.

Source code in fedbiomed/common/secagg/_lom.py
def aggregate(self, list_y_u_tau: List[List[int]]) -> List[int]:
    """
    Aggregates multiple vectors into a single vector.

    The vectors are summed element-wise and the result is cast back to the
    instance's vector dtype, so sums wrap around modulo the range of that
    dtype.

    Args:
        list_y_u_tau: A list of vectors from different nodes.

    Returns:
        The aggregated vector.
    """
    stacked = np.array(list_y_u_tau, dtype=self._vector_dtype)
    summed = np.sum(stacked, axis=0)
    # Cast with the configured dtype (instead of a hard-coded uint32) so that
    # aggregation stays consistent with the dtype used for protection.
    summed = summed.astype(self._vector_dtype)

    return summed.tolist()
protect
protect(node_id, pairwise_secrets, tau, x_u_tau, node_ids)

Protects the input vector by applying a mask based on pairwise secrets.

Parameters:

Name Type Description Default
node_id str

Id of the node that applies encryption

required
pairwise_secrets Dict[str, bytes]

DH agreed secrets between node that applies encryption and others

required
tau int

The current round number.

required
x_u_tau List[int]

The input vector to be protected.

required
node_ids List[str]

A list of the IDs of the nodes participating in the aggregation.

required

Raises:

Type Description
FedbiomedSecaggError

raised if a value of the input vector x_u_tau needs more than 32 - ceil(log_2(number_of_nodes)) bits, where 32 is the number of bits of each value (uint32). Not respecting this condition can lead to computation overflow.

Returns:

Type Description
List[int]

The protected (masked) vector.

Source code in fedbiomed/common/secagg/_lom.py
def protect(
    self,
    node_id: str,
    pairwise_secrets: Dict[str, bytes],
    tau: int,
    x_u_tau: List[int],
    node_ids: List[str]
) -> List[int]:
    """
    Protects the input vector by applying a mask based on pairwise secrets.

    For every other participating node, a pseudorandom vector is derived from
    the secret shared with that node. The vector is added to the mask when the
    other node's ID compares lower than `node_id` and subtracted otherwise, so
    the two nodes of a pair use opposite signs. All arithmetic is done on
    fixed-width unsigned integers and wraps around.

    An empty `x_u_tau` yields an empty protected vector.

    Args:
        node_id: Id of the node that applies encryption
        pairwise_secrets: DH agreed secrets between node that applies encryption and others
        tau: The current round number.
        x_u_tau: The input vector to be protected.
        node_ids: A list of node IDs participates aggregation.

    Raises:
        FedbiomedSecaggError: raised if a value of the input vector `x_u_tau`
            needs more than `32 - ceil(log_2(number_of_nodes))` bits, where `32`
            is the number of bits of each value (`uint32`). Not respecting this
            condition can lead to computation overflow.

    Returns:
        The protected (masked) vector.
    """

    num_nodes = len(node_ids)
    # `default=0` keeps an empty input vector from crashing `max()`
    _max_param_bits = max((val.bit_length() for val in x_u_tau), default=0)
    # Extra bits needed so that adding `num_nodes` values cannot overflow
    _node_bits = math.ceil(math.log2(num_nodes))

    if _max_param_bits > self._values_bit - _node_bits:
        _max_nodes = 2**(self._values_bit - _max_param_bits)
        _missing_bits = _max_param_bits + _node_bits - self._values_bit
        raise FedbiomedSecaggError(
            f"{ErrorNumbers.FB417.value}: Computation overflow using LOM secagg "
            f"with current settings. Cannot use more than {_max_nodes} nodes or need to "
            f"reduce sample number on this node to use {_missing_bits} bit(s) less."
        )

    x_u_tau = np.array(x_u_tau, dtype=self._vector_dtype)
    mask = np.zeros(len(x_u_tau), dtype=self._vector_dtype)
    for pair_id in node_ids:

        # A node shares no secret with itself
        if pair_id == node_id:
            continue

        secret = pairwise_secrets[pair_id]

        # Round-specific seed derived from the pairwise secret
        pairwise_seed = self._prf.eval_key(
            pairwise_secret=secret,
            tau=tau)

        pairwise_vector = self._prf.eval_vector(
            seed=pairwise_seed,
            tau=tau,
            input_size=len(x_u_tau))

        # Reinterpret the pseudorandom bytes as a vector of unsigned integers
        pairwise_vector = np.frombuffer(pairwise_vector, dtype=self._vector_dtype)

        # The sign depends on the ordering of the two node IDs
        if pair_id < node_id:
            mask += pairwise_vector
        else:
            mask -= pairwise_vector

    encrypted_params = mask + x_u_tau

    return encrypted_params.tolist()

PRF

PRF(nonce)

Pseudorandom Function (PRF) class using the ChaCha20 stream cipher.

Attributes:

Name Type Description
nonce bytes

The nonce passed to the ChaCha20 cipher (LOM generates a 16-byte value when none is supplied).

Source code in fedbiomed/common/secagg/_lom.py
def __init__(self, nonce: bytes) -> None:
    """
    Stores the nonce used for every cipher created by this PRF.

    Args:
        nonce: Nonce handed to the ChaCha20 cipher.
            NOTE(review): the expected length is not checked here -- confirm
            against the cipher's requirements.
    """
    self._nonce = nonce

Functions

eval_key
eval_key(pairwise_secret, tau)

Evaluates a pseudorandom key for a given round.

Parameters:

Name Type Description Default
pairwise_secret bytes

A secret key shared between nodes.

required
tau int

The current round number.

required

Returns:

Name Type Description
bytes bytes

A 32-byte pseudorandom key.

Source code in fedbiomed/common/secagg/_lom.py
def eval_key(self, pairwise_secret: bytes, tau: int) -> bytes:
    """
    Evaluates a pseudorandom key for a given round.

    The round number is encoded on 16 bytes (big-endian) and encrypted with
    ChaCha20 keyed by the pairwise secret; the 16-byte output is then
    zero-padded to 32 bytes.

    Args:
        pairwise_secret: A secret key shared between nodes.
        tau: The current round number.

    Returns:
        A 32-byte pseudorandom key.

    Raises:
        FedbiomedSecaggError: If the cipher cannot be created from the given
            secret and nonce.
    """
    tau_bytes = tau.to_bytes(16, 'big')
    try:
        encryptor = Cipher(
            algorithms.ChaCha20(pairwise_secret, self._nonce),
            mode=None,
            backend=default_backend()
        ).encryptor()
    except ValueError as ve:
        # Chain explicitly so the original cause stays attached to the error
        raise FedbiomedSecaggError(
            f"{ErrorNumbers.FB417.value}: Error while ciphering: got exception {ve}"
        ) from ve
    c = encryptor.update(tau_bytes) + encryptor.finalize()
    # the output is a 16 bytes string, pad it to 32 bytes
    c = c + b'\x00' * 16

    return c
eval_vector
eval_vector(seed, tau, input_size)

Evaluates a pseudorandom vector based on the seed.

Parameters:

Name Type Description Default
seed bytes

A 32-byte seed for generating the vector.

required
tau int

The current round number.

required
input_size int

The size of the input vector.

required

Returns:

Name Type Description
bytes bytes

A pseudorandom vector of the specified size.

Source code in fedbiomed/common/secagg/_lom.py
def eval_vector(self, seed: bytes, tau: int, input_size: int) -> bytes:
    """
    Evaluates a pseudorandom vector based on the seed.

    Args:
        seed: A 32-byte seed for generating the vector.
        tau: The current round number.
        input_size: The size of the input vector.

    Returns:
        The pseudorandom bytes, 4 bytes per element of the input vector.
    """
    cipher = Cipher(
        algorithms.ChaCha20(seed, self._nonce),
        mode=None,
        backend=default_backend()
    )
    encryptor = cipher.encryptor()

    # TODO: Better handling limits for secure aggregation
    if input_size + _MAX_ROUND > 2**32:
        raise FedbiomedSecaggError(
            f"{ErrorNumbers.FB417.value}: Can not perform encryiton due to large input vector. input_size "
            f"({input_size}) + MAX_ROUND ({_MAX_ROUND}) allowed is greater than 2**32"
        )

    # Plaintext: every index from 0 to input_size - 1, offset by tau and
    # encoded as a 4-byte big-endian integer
    plaintext = b''.join(
        (index + tau).to_bytes(4, 'big') for index in range(input_size)
    )
    return encryptor.update(plaintext) + encryptor.finalize()

SecaggCrypter

SecaggCrypter()

Secure aggregation encryption and decryption manager.

This class is responsible for encrypting model parameters using the Joye-Libert secure aggregation scheme. It also aggregates encrypted model parameters and decrypts them to retrieve the final model parameters as a vector. This vector can be loaded into a model by converting it to the proper format for the framework.

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def __init__(self) -> None:
    """Constructs ParameterEncrypter"""
    self._jls = JoyeLibert()

Functions

aggregate
aggregate(current_round, num_nodes, params, key, biprime, total_sample_size, clipping_range=None, num_expected_params=1)

Decrypt given parameters

Parameters:

Name Type Description Default
current_round int

The round that the aggregation will be done

required
params List[List[int]]

Aggregated/Summed encrypted parameters

required
num_nodes int

number of nodes

required
key int

The key that will be used for decryption

required
biprime int

Biprime number of PublicParam

required
total_sample_size int

sum of number of samples from all nodes

required
clipping_range Union[int, None]

Clipping range for reverse-quantization, should be the same clipping range used for quantization

None
num_expected_params int

number of parameters to decode from the params

1

Returns: Aggregated parameters decrypted and structured

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

aggregation issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def aggregate(
        self,
        current_round: int,
        num_nodes: int,
        params: List[List[int]],
        key: int,
        biprime: int,
        total_sample_size: int,
        clipping_range: Union[int, None] = None,
        num_expected_params: int = 1
) -> List[float]:
    """Aggregates and decrypts the encrypted parameters received from the nodes.

    The per-node encrypted vectors are aggregated and decrypted with the
    Joye-Libert scheme, averaged over the total sample size, then
    reverse-quantized back to floats.

    Args:
        current_round: The round that the aggregation will be done
        num_nodes: number of nodes
        params: Encrypted parameters, one list of integers per node
        key: The key that will be used for decryption
        biprime: Biprime number of `PublicParam`
        total_sample_size: sum of number of samples from all nodes
        clipping_range: Clipping range for reverse-quantization, should be the
            same clipping range used for quantization
        num_expected_params: number of parameters to decode from the `params`

    Returns:
        Aggregated parameters decrypted and structured

    Raises:
         FedbiomedSecaggCrypterError: bad parameters
         FedbiomedSecaggCrypterError: aggregation issue
    """
    start = time.process_time()

    # Validate the container types first so that `len()` below is always safe
    if not isinstance(params, list) or not all(isinstance(p, list) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The parameters to aggregate should be a "
                                          f"list containing list of parameters")

    if len(params) != num_nodes:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Num of parameters that are received from nodes "
            f"does not match the number of nodes has been set for the encrypter. There might "
            f"be some nodes did not answered to training request or num of clients of "
            "`ParameterEncrypter` has not been set properly before train request.")

    if not all(all(isinstance(p_, int) for p_ in p) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: Invalid parameter type. The parameters "
                                          f"should be of type of integers.")

    # TODO provide dynamically created biprime. Biprime that is used
    #  on the node-side should matched the one used for decryption
    public_param = self._setup_public_param(biprime=biprime)
    key = ServerKey(public_param, key)

    params = self._convert_to_encrypted_number(params, public_param)

    try:
        sum_of_weights = self._jls.aggregate(
            sk_0=key,
            tau=current_round,  # The time period \\(\\tau\\)
            list_y_u_tau=params,
            num_expected_params=num_expected_params
        )
    except (ValueError, TypeError) as e:
        # Chain explicitly so the original cause stays attached to the error
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The aggregation of encrypted parameters "
                                          f"is not successful: {e}") from e

    # Reverse quantize and division (averaging)
    logger.info(f"Aggregating {len(params)} parameters from {num_nodes} nodes.")
    aggregated_params = self._apply_average(sum_of_weights, total_sample_size)

    aggregated_params: List[float] = reverse_quantize(
        aggregated_params,
        clipping_range=clipping_range
    )
    time_elapsed = time.process_time() - start
    logger.debug(f"Aggregation is completed in {round(time_elapsed, ndigits=2)} seconds.")

    return aggregated_params
encrypt
encrypt(num_nodes, current_round, params, key, biprime, clipping_range=None, weight=None)

Encrypts model parameters.

Parameters:

Name Type Description Default
num_nodes int

Number of nodes that is expected to encrypt parameters for aggregation

required
current_round int

Current round of federated training

required
params List[float]

List of flattened parameters

required
key int

Key to encrypt

required
biprime int

Prime number to create public parameter

required
weight Optional[int]

Weight for the params

None
clipping_range Union[int, None]

Clipping range for quantization of float model parameters. The clipping range must be greater than the minimum model parameter.

None

Returns:

Type Description
List[int]

List of encrypted parameters

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

encryption issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def encrypt(
    self,
    num_nodes: int,
    current_round: int,
    params: List[float],
    key: int,
    biprime: int,
    clipping_range: Union[int, None] = None,
    weight: Optional[int] = None,
) -> List[int]:
    """Quantize, optionally weight, and encrypt a flat list of model parameters.

    Args:
        num_nodes: Number of nodes that are expected to encrypt parameters
            for aggregation.
        current_round: Current round of federated training; used as the
            time period of the encryption.
        params: Flattened model parameters. Must be a `list` whose items
            are all `float`.
        key: Integer key used to build the user key for encryption.
        biprime: Number from which the public parameter is set up.
        clipping_range: Clipping range forwarded to `quantize`. It must be
            greater than the minimum model parameter.
        weight: Optional weight applied to the quantized parameters. It is
            rejected when `2**weight.bit_length()` exceeds
            `SAParameters.WEIGHT_RANGE`.

    Returns:
        The encrypted parameters, each converted to a plain `int`.

    Raises:
        FedbiomedSecaggCrypterError: `params` is not a list of floats, `key`
            is not an integer, or `weight` is too large.
        FedbiomedSecaggCrypterError: the underlying encryption raised a
            `TypeError` or a `ValueError`.
    """
    started_at = time.process_time()

    # --- Argument validation -------------------------------------------
    if not isinstance(params, list):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Expected argument `params` type list but got {type(params)}"
        )

    if any(not isinstance(value, float) for value in params):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The parameters to encrypt should list of floats. "
            f"There are one or more than a value that is not type of float."
        )

    if not isinstance(key, int):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The argument `key` must be integer"
        )

    # --- Quantization and optional weighting ---------------------------
    # Floats are mapped to integers first; the weight (when given) is
    # applied on the quantized values.
    plain_values = quantize(weights=params, clipping_range=clipping_range)

    if weight is not None:
        weight_exceeds_range = 2**weight.bit_length() > SAParameters.WEIGHT_RANGE
        if weight_exceeds_range:
            raise FedbiomedSecaggCrypterError(
                f"{ErrorNumbers.FB624.value}: The weight is too large. The weight should be less than "
                f"{SAParameters.WEIGHT_RANGE}, but got {weight}"
            )
        plain_values = self._apply_weighting(plain_values, weight)

    # --- Encryption ----------------------------------------------------
    public_param = self._setup_public_param(biprime=biprime)
    user_key = UserKey(public_param, key)

    try:
        ciphertexts: List[mpz] = self._jls.protect(
            public_param=public_param,
            user_key=user_key,
            tau=current_round,
            x_u_tau=plain_values,
            n_users=num_nodes,
        )
    except (TypeError, ValueError) as exp:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value} Error during parameter encryption. {exp}") from exp

    time_elapsed = time.process_time() - started_at
    logger.debug(f"Encryption of the parameters took {time_elapsed} seconds.")

    # Convert the encrypted values to built-in integers before returning.
    return list(map(int, ciphertexts))

SecaggLomCrypter

SecaggLomCrypter(nonce=None)

Bases: SecaggCrypter

Low-Overhead Masking secure aggregation controller

Parameters:

Name Type Description Default
nonce str | None

nonce to use in encryption. Needs to be the same between the parties of the LOM computation. Can be disclosed (public). Must not be re-used.

None
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def __init__(
    self,
    nonce: str | None = None
):
    """Set up the LOM secure aggregation used to encrypt and aggregate.

    Args:
        nonce: `nonce` to use in encryption. Needs to be the same between
            the parties of the LOM computation. Can be disclosed (public).
            Must not be re-used. When empty or `None`, it is passed to
            `LOM` unchanged.
    """
    if nonce:
        # Security relies on the nonce never being reused, and the nonce
        # must be exactly 128 bits (16 bytes). Zero-padding is sufficient:
        # filling with `random()` would add no security.
        encoded = str.encode(nonce)
        padded = encoded.zfill(16)
        nonce = padded[:16]

    self._lom = LOM(nonce)

Functions

aggregate
aggregate(params, total_sample_size, clipping_range=None)

Decrypt given parameters

Parameters:

Name Type Description Default
params List[List[int]]

Aggregated/Summed encrypted parameters

required
total_sample_size int

sum of number of samples from all nodes

required
clipping_range Union[int, None]

Clipping range for reverse-quantization, should be the same clipping range used for quantization

None

Returns: Aggregated parameters decrypted and structured

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

aggregation issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def aggregate(
        self,
        params: List[List[int]],
        total_sample_size: int,
        clipping_range: Union[int, None] = None,
) -> List[float]:
    """Aggregate encrypted parameters, then average and de-quantize them.

    Args:
        params: Encrypted parameters to aggregate, as a list containing one
            list of integers per contributing node.
        total_sample_size: Sum of the number of samples from all nodes;
            passed to `self._apply_average` for the averaging step.
        clipping_range: Clipping range for reverse-quantization. Should be
            the same clipping range that was used for quantization.

    Returns:
        Aggregated parameters after averaging and reverse-quantization.

    Raises:
        FedbiomedSecaggCrypterError: `params` is not a list of lists of
            integers.
        FedbiomedSecaggCrypterError: the underlying aggregation raised a
            `ValueError` or a `TypeError`.
    """
    start = time.process_time()

    # NOTE: error messages use `ErrorNumbers.FB624.value`, consistently with
    # the other errors raised in this module, so that the message carries
    # the error-code text rather than the enum member's representation.
    if not isinstance(params, list) or not all(isinstance(p, list) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The parameters to aggregate should be a "
                                          f"list containing list of parameters")

    if not all(all(isinstance(p_, int) for p_ in p) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: Invalid parameter type. The parameters "
                                          f"should be of type of integers.")

    # One inner list per node, so the outer length is the node count.
    num_nodes = len(params)

    try:
        sum_of_weights = self._lom.aggregate(
            list_y_u_tau=params,
        )
    except (ValueError, TypeError) as e:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The aggregation of encrypted parameters "
            f"is not successful: {e}") from e

    # Averaging (division) first, then reverse quantization back to floats
    logger.info(f"Aggregating {len(params)} parameters from {num_nodes} nodes.")
    aggregated_params = self._apply_average(sum_of_weights, total_sample_size)

    aggregated_params: List[float] = reverse_quantize(
        aggregated_params,
        clipping_range=clipping_range
    )
    time_elapsed = time.process_time() - start
    logger.debug(f"Aggregation is completed in {round(time_elapsed, ndigits=2)} seconds.")

    return aggregated_params
encrypt
encrypt(current_round, node_id, params, pairwise_secrets, node_ids, clipping_range=None, weight=None)

Encrypts model parameters.

Parameters:

Name Type Description Default
current_round int

Current round of federated training

required
node_id str

ID of the node that applies encryption

required
params List[float]

List of flattened parameters

required
pairwise_secrets Dict[str, bytes]

DH agreed secrets between node that applies encryption and others

required
node_ids List[str]

All nodes that participate in secure aggregation

required
weight Optional[int]

Weight for the params

None
clipping_range Union[int, None]

Clipping range for quantization of float model parameters. The clipping range must be greater than the minimum model parameter

None

Returns:

Type Description
List[int]

List of encrypted parameters

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

encryption issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def encrypt(
    self,
    current_round: int,
    node_id: str,
    params: List[float],
    pairwise_secrets: Dict[str, bytes],
    node_ids: List[str],
    clipping_range: Union[int, None] = None,
    weight: Optional[int] = None,
) -> List[int]:
    """Quantize, optionally weight, and encrypt a flat list of model parameters.

    Args:
        current_round: Current round of federated training; used as the
            time period of the encryption.
        node_id: ID of the node that applies encryption.
        params: Flattened model parameters. Must be a `list` whose items
            are all `float`.
        pairwise_secrets: DH agreed secrets between the node that applies
            encryption and the other nodes.
        node_ids: All nodes that participate in secure aggregation.
        clipping_range: Clipping range forwarded to `quantize`. It must be
            greater than the minimum model parameter.
        weight: Optional weight applied to the quantized parameters. It is
            rejected when `2**weight.bit_length()` exceeds
            `SAParameters.WEIGHT_RANGE`.

    Returns:
        List of encrypted parameters.

    Raises:
        FedbiomedSecaggCrypterError: `params` is not a list of floats, or
            `weight` is too large.
        FedbiomedSecaggCrypterError: the underlying encryption raised a
            `TypeError` or a `ValueError`.
    """
    started_at = time.process_time()

    # --- Argument validation -------------------------------------------
    if not isinstance(params, list):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Expected argument `params` type list but got {type(params)}"
        )

    if any(not isinstance(value, float) for value in params):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The parameters to encrypt should list of floats. "
            f"There are one or more than a value that is not type of float."
        )

    # --- Quantization and optional weighting ---------------------------
    plain_values = quantize(weights=params, clipping_range=clipping_range)

    if weight is not None:
        weight_exceeds_range = 2**weight.bit_length() > SAParameters.WEIGHT_RANGE
        if weight_exceeds_range:
            raise FedbiomedSecaggCrypterError(
                f"{ErrorNumbers.FB624.value}: The weight is too large. The weight should be less than "
                f"{SAParameters.WEIGHT_RANGE}."
            )
        plain_values = self._apply_weighting(plain_values, weight)

    # --- Encryption ----------------------------------------------------
    try:
        encrypted_params: List[int] = self._lom.protect(
            pairwise_secrets=pairwise_secrets,
            node_id=node_id,
            tau=current_round,
            x_u_tau=plain_values,
            node_ids=node_ids,
        )
    except (TypeError, ValueError) as exp:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value} Error during parameter encryption. {exp}") from exp

    time_elapsed = time.process_time() - started_at
    logger.debug(f"Encryption of the parameters took {time_elapsed} seconds.")

    return encrypted_params