Secagg

Classes

DHKey

DHKey(private_key_pem=None, public_key_pem=None)

Key handling for ephemeral-ephemeral ECDH, a session is defined for each experiment.

This class handles the generation and importing of ECC keys using the P-256 curve. It provides methods to export the private and public keys in PEM format as bytes.

Attributes:

Name Type Description
private_key

The user's private ECC key.

public_key

The user's public ECC key.

Parameters:

Name Type Description Default
private_key_pem bytes | None

Optional. The private key in PEM format.

None
public_key_pem bytes | None

Optional. The public key in PEM format.

None
Source code in fedbiomed/common/secagg/_dh.py
def __init__(
    self,
    private_key_pem: bytes | None = None,
    public_key_pem: bytes | None = None
) -> None:
    """
    Builds the key holder from PEM material, or creates a fresh P-256 key pair.

    Three situations are handled:

    - a private key is given: it is loaded; the public key is loaded as well
      when given, otherwise it is derived from the private key;
    - only a public key is given: it is loaded and `private_key` is `None`;
    - nothing is given: a new SECP256R1 private key is generated and the
      public key is derived from it.

    Args:
        private_key_pem: Optional. The private key in PEM format.
        public_key_pem: Optional. The public key in PEM format.
    """
    if private_key_pem:
        own_key = self._import_key(
            serialization.load_pem_private_key,
            data=private_key_pem,
            password=None,
            backend=default_backend(),
        )
    elif public_key_pem:
        # Public-only holder: there is no private part to keep.
        own_key = None
    else:
        own_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
    self.private_key = own_key

    if public_key_pem:
        self.public_key = self._import_key(
            serialization.load_pem_public_key,
            data=public_key_pem,
            backend=default_backend(),
        )
    else:
        self.public_key = self.private_key.public_key()

Attributes

private_key instance-attribute
private_key = _import_key(load_pem_private_key, data=private_key_pem, password=None, backend=default_backend())
public_key instance-attribute
public_key = _import_key(load_pem_public_key, data=public_key_pem, backend=default_backend())

Functions

export_private_key
export_private_key()

Exports the private key to PEM format.

Returns:

Type Description

The private key in PEM format. Returns None if private key is not loaded.

Source code in fedbiomed/common/secagg/_dh.py
def export_private_key(self):
    """
    Serializes the private key as unencrypted PKCS8 PEM.

    Returns:
        The private key in PEM format, or `None` when this instance
            holds no private key.
    """
    key = self.private_key
    if key:
        return key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    return None
export_public_key
export_public_key()

Exports the public key to PEM format.

Returns:

Type Description

The public key in PEM format.

Source code in fedbiomed/common/secagg/_dh.py
def export_public_key(self):
    """
    Serializes the public key as SubjectPublicKeyInfo PEM.

    Returns:
        The public key in PEM format.
    """
    pem_options = {
        "encoding": serialization.Encoding.PEM,
        "format": serialization.PublicFormat.SubjectPublicKeyInfo,
    }
    return self.public_key.public_bytes(**pem_options)

DHKeyAgreement

DHKeyAgreement(node_u_id, node_u_dh_key, session_salt)

Key Agreement with ephemeral-ephemeral ECDH, a session is defined for each experiment.

This class handles the key agreement process using the private ECC key of the user. It imports the private key, performs the key exchange, and derives the shared secret using a KDF.

Attributes:

Name Type Description
private_key

The user's private ECC key.

Parameters:

Name Type Description Default
node_u_id

The ID of the node.

required
node_u_dh_key DHKey

The keypair of the node.

required
session_salt

A session-specific salt.

required
Source code in fedbiomed/common/secagg/_dh.py
def __init__(self, node_u_id, node_u_dh_key: DHKey, session_salt):
    """
    Stores what is needed to run key agreements on behalf of a node.

    Args:
        node_u_id: The ID of the node.
        node_u_dh_key: The keypair of the node.
        session_salt: A session-specific salt.
    """
    self._node_u_id, self._dh_key = node_u_id, node_u_dh_key
    self.session_salt = session_salt

Attributes

session_salt instance-attribute
session_salt = session_salt

Functions

agree
agree(node_v_id, public_key_pem)

Performs the key agreement and derives the shared secret.

Parameters:

Name Type Description Default
node_v_id

The ID of the other node (node_v).

required
public_key_pem

The public key of the other node in PEM format as bytes.

required

Returns:

Type Description

The derived shared secret.

Source code in fedbiomed/common/secagg/_dh.py
def agree(self, node_v_id, public_key_pem):
    """
    Runs ECDH with another node's public key and derives the shared secret.

    Args:
        node_v_id: The ID of the other node (node_v).
        public_key_pem: The public key of the other node in PEM format as bytes.

    Returns:
        The derived shared secret.
    """
    # Load the peer's public key, exchange with our private key, then
    # pass the raw ECDH output through the KDF together with the peer ID.
    peer_public_key = DHKey(public_key_pem=public_key_pem).public_key
    raw_secret = self._dh_key.private_key.exchange(ec.ECDH(), peer_public_key)
    return self._kdf(raw_secret, node_v_id)

EncryptedNumber

EncryptedNumber(param, ciphertext)

Bases: object

A number encrypted by one of the user keys.

Attributes:

Name Type Description
param

The public parameters

ciphertext

The integer value of the ciphertext

ciphertext: The integer value of the ciphertext
Source code in fedbiomed/common/secagg/_jls.py
def __init__(self, param: PublicParam, ciphertext: int):
    """Wraps a ciphertext together with the public parameters it belongs to.

    Args:
        param: The public parameters.
        ciphertext: The integer value of the ciphertext
    """
    self.public_param = param
    # The ciphertext is kept as an `mpz` value rather than a plain `int`.
    big_ciphertext = mpz(ciphertext)
    self.ciphertext = big_ciphertext

Attributes

ciphertext instance-attribute
ciphertext = mpz(ciphertext)
public_param instance-attribute
public_param = param

JoyeLibert

JoyeLibert()

The Joye-Libert scheme. It consists of three Probabilistic Polynomial Time algorithms: Setup, Protect, and Agg.

Attributes:

Name Type Description
_vector_encoder

The vector encoding/decoding scheme

VEParameters.TARGET_RANGE + VEParameters.WEIGHT_RANGE should be equal or less than 2**32

Source code in fedbiomed/common/secagg/_jls.py
def __init__(self):
    """Constructs the class

    The vector encoder is configured with a plaintext size of half
    `SAParameters.KEY_SIZE` and a value size equal to the rounded-up sum
    of `log2(SAParameters.TARGET_RANGE)` and `log2(SAParameters.WEIGHT_RANGE)`.

    The target range and the weight range taken together should not
    exceed 2**32; this constraint is not enforced here.
    """
    value_bits = ceil(log2(SAParameters.TARGET_RANGE) + log2(SAParameters.WEIGHT_RANGE))
    self._vector_encoder = VES(ptsize=SAParameters.KEY_SIZE // 2, valuesize=value_bits)

Functions

aggregate
aggregate(sk_0, tau, list_y_u_tau, num_expected_params)

Aggregates users protected inputs with the server's secret key

\(X_{\tau} \gets \textbf{JL.Agg}(public_param, sk_0,\tau, \{y_{u,\tau}\}_{u \in \{1,..,n\}})\)

This algorithm aggregates the \(n\) ciphers received at time period \(\tau\) to obtain \(y_{\tau} = \prod_1^n{y_{u,\tau}}\) and decrypts the result. It obtains the sum of the private inputs ( \( X_{\tau} = \sum_{1}^n{x_{u,\tau}} \) ) as follows:

\[V_{\tau} = H(\tau)^{sk_0} \cdot y_{\tau} \qquad \qquad X_{\tau} = \frac{V_{\tau}-1}{N} \mod N\]

Parameters:

Name Type Description Default
sk_0 ServerKey

The server's secret key \(sk_0\)

required
tau int

The time period \(\tau\)

required
list_y_u_tau List[List[EncryptedNumber]]

A list of the users' protected inputs \(\{y_{u,\tau}\}_{u \in \{1,..,n\}}\)

required
num_expected_params int

Number of parameters to decode from the decrypted vectors

required

Returns:

Type Description
List[int]

The sum of the users' inputs of type int

Raises:

Type Description
ValueError

bad argument value

Source code in fedbiomed/common/secagg/_jls.py
def aggregate(
        self,
        sk_0: ServerKey,
        tau: int,
        list_y_u_tau: List[List[EncryptedNumber]],
        num_expected_params: int
) -> List[int]:
    """Sums the users' protected inputs and decrypts the total with the server key

    \\(X_{\\tau} \\gets \\textbf{JL.Agg}(public_param, sk_0,\\tau, \\{y_{u,\\tau}\\}_{u \\in \\{1,..,n\\}})\\)

    The \\(n\\) ciphers received for time period \\(\\tau\\) are combined into
    \\(y_{\\tau} = \\prod_1^n{y_{u,\\tau}}\\), which is then decrypted to recover
    the sum of the private inputs ( \\( X_{\\tau} = \\sum_{1}^n{x_{u,\\tau}} \\) ):

    $$V_{\\tau} = H(\\tau)^{sk_0} \\cdot y_{\\tau} \\qquad \\qquad X_{\\tau} = \\frac{V_{\\tau}-1}{N} \\mod N$$

    Args:
        sk_0: The server's secret key \\(sk_0\\)
        tau: The time period \\(\\tau\\)
        list_y_u_tau: A list of the users' protected inputs \\(\\{y_{u,\\tau}\\}_{u \\in \\{1,..,n\\}}\\)
        num_expected_params: Number of parameters to decode from the decrypted vectors

    Returns:
        The sum of the users' inputs of type `int`

    Raises:
        ValueError: bad argument value
    """
    if not isinstance(sk_0, ServerKey):
        raise ValueError("Key must be an instance of `ServerKey`")

    if not (isinstance(list_y_u_tau, list) and list_y_u_tau):
        raise ValueError("list_y_u_tau should be a non-empty list.")

    # Only the first entry is inspected to confirm the nested-list layout.
    if not isinstance(list_y_u_tau[0], list):
        raise ValueError("list_y_u_tau should be a list that contains list of encrypted numbers")

    # Combine the users' vectors position by position.
    combined = []
    for same_position in zip(*list_y_u_tau):
        combined.append(sum(same_position))

    plain_vector = sk_0.decrypt(combined, tau)

    return self._vector_encoder.decode(
        plain_vector,
        add_ops=len(list_y_u_tau),
        v_expected=num_expected_params,
    )
protect
protect(public_param, user_key, tau, x_u_tau, n_users)

Protect user input with the user's secret key:

\(y_{u,\tau} \gets \textbf{JL.Protect}(public_param,sk_u,\tau,x_{u,\tau})\)

This algorithm encrypts private inputs \(x_{u,\tau} \in \mathbb{Z}_N\) for time period \(\tau\) using secret key \(sk_u \in \mathbb{Z}_N^2\) . It outputs cipher \(y_{u,\tau}\) such that:

\[y_{u,\tau} = (1 + x_{u,\tau} N) H(\tau)^{sk_u} \mod N^2\]

Parameters:

Name Type Description Default
public_param PublicParam

The public parameters \(public_param\)

required
user_key UserKey

The user's secret key \(sk_u\)

required
tau int

The time period \(\tau\)

required
x_u_tau List[int]

The user's input \(x_{u,\tau}\)

required
n_users int

Number of nodes/users that participate in secure aggregation

required

Returns:

Type Description
List[mpz]

The protected input of type EncryptedNumber or a list of EncryptedNumber

Raises:

Type Description
TypeError

bad argument type

ValueError

bad argument value

Source code in fedbiomed/common/secagg/_jls.py
def protect(self,
            public_param: PublicParam,
            user_key: UserKey,
            tau: int,
            x_u_tau: List[int],
            n_users: int,
            ) -> List[mpz]:
    """ Encrypts a user's input vector with that user's secret key:

    \\(y_{u,\\tau} \\gets \\textbf{JL.Protect}(public_param,sk_u,\\tau,x_{u,\\tau})\\)

    The private inputs \\(x_{u,\\tau} \\in \\mathbb{Z}_N\\) are encrypted for
    time period \\(\\tau\\) with the secret key \\(sk_u \\in \\mathbb{Z}_N^2\\).
    The resulting cipher \\(y_{u,\\tau}\\) satisfies:

    $$y_{u,\\tau} = (1 + x_{u,\\tau} N) H(\\tau)^{sk_u} \\mod N^2$$

    The input vector is first packed by the vector encoder, then encrypted.

    Args:
        public_param: The public parameters \\(public_param\\)
        user_key: The user's secret key \\(sk_u\\)
        tau: The time period \\(\\tau\\)
        x_u_tau: The user's input \\(x_{u,\\tau}\\)
        n_users: Number of nodes/users that participates secure aggregation

    Returns:
            The protected input of type `EncryptedNumber` or a list of `EncryptedNumber`

    Raises:
            TypeError: bad argument type
            ValueError: bad argument value
    """
    if not isinstance(user_key, UserKey):
        key_type = type(user_key)
        raise TypeError(f"Expected key for encryption type is UserKey. but got {key_type}")

    # The key must have been created for the very same public parameters.
    if user_key.public_param != public_param:
        raise ValueError("Bad public parameter. The public parameter of user key does not match the "
                         "one given for encryption")

    if not isinstance(x_u_tau, list):
        vector_type = type(x_u_tau)
        raise TypeError(f"Bad vector for encryption. Excepted argument `x_u_tau` type list but "
                        f"got {vector_type}")

    encoded_vector = self._vector_encoder.encode(V=x_u_tau, add_ops=n_users)
    return user_key.encrypt(encoded_vector, tau)

LOM

LOM(nonce=None)

Lightweight Obfuscation Mechanism (LOM) class for protecting and aggregating data.

Attributes:

Name Type Description
_prf PRF

An instance of the PRF class.

_vector_dtype str

The data type of the vector.

Source code in fedbiomed/common/secagg/_lom.py
def __init__(
    self,
    nonce: bytes | None = None
) -> None:
    """Creates the PRF and fixes the integer type used for masking.

    Args:
        nonce: Nonce handed to the PRF. When missing or empty, 16 random
            bytes are drawn with `secrets.token_bytes`.
    """
    self._prf: PRF = PRF(nonce if nonce else secrets.token_bytes(16))
    self._vector_dtype: str = 'uint32'
    # Bit width of the vector dtype (32 for `uint32`).
    dtype_info = np.iinfo(np.dtype(self._vector_dtype))
    self._values_bit = dtype_info.bits

Functions

aggregate
aggregate(list_y_u_tau)

Aggregates multiple vectors into a single vector.

Parameters:

Name Type Description Default
list_y_u_tau List[int]

A list of vectors, one from each node.

required

Returns:

Type Description
List[int]

The aggregated vector.

Source code in fedbiomed/common/secagg/_lom.py
def aggregate(self, list_y_u_tau: List[int]) -> List[int]:
    """
    Sums the vectors received from the nodes, element by element.

    The vectors are stacked into an array of the LOM vector dtype, summed
    along the first axis, and the totals are cast to `int32` before being
    returned as a plain Python list.

    Args:
        list_y_u_tau: The vectors from the different nodes, one entry per node.

    Returns:
        The aggregated vector.
    """
    stacked = np.array(list_y_u_tau, dtype=self._vector_dtype)
    return stacked.sum(axis=0).astype(np.int32).tolist()
protect
protect(node_id, pairwise_secrets, tau, x_u_tau, node_ids)

Protects the input vector by applying a mask based on pairwise secrets.

Parameters:

Name Type Description Default
node_id str

Id of the node that applies encryption

required
pairwise_secrets Dict[str, bytes]

DH agreed secrets between node that applies encryption and others

required
tau int

The current round number.

required
x_u_tau List[int]

The input vector to be protected.

required
node_ids List[str]

A list of the IDs of the nodes participating in the aggregation.

required

Raises:

Type Description
FedbiomedSecaggError

Raised if the bit length of any value of the input vector x_u_tau exceeds 32 - log_2(number_of_nodes), where 32 is the number of bits of each value (uint32). Not respecting this condition can lead to computation overflow.

Returns:

Type Description
List[int]

The protected (masked) vector.

Source code in fedbiomed/common/secagg/_lom.py
def protect(
    self,
    node_id: str,
    pairwise_secrets: Dict[str, bytes],
    tau: int,
    x_u_tau: List[int],
    node_ids: List[str]
) -> List[int]:
    """
    Masks the input vector with pseudorandom vectors derived from pairwise secrets.

    For every other node listed in `node_ids`, a pseudorandom vector is
    derived from the secret shared with that node and from `tau`. That
    vector is added to the mask when the other node's ID compares lower
    than `node_id`, and subtracted otherwise. The mask is finally added to
    the input. All arithmetic is done on arrays of the LOM vector dtype.

    Args:
        node_id: Id of the node that applies encryption
        pairwise_secrets: DH agreed secrets between node that applies encryption and others
        tau: The current round number.
        x_u_tau: The input vector to be protected.
        node_ids: A list of node IDs participates aggregation.

    Raises:
        FedbiomedSecaggError: if the bit length of any value of `x_u_tau`
            exceeds `values_bit - log_2(number_of_nodes)`, where `values_bit`
            is the bit width of the vector dtype. Not respecting this
            condition can lead to computation overflow.

    Returns:
        The protected (masked) vector.
    """
    # NOTE: bit lengths are integers, so comparing them against this
    # possibly fractional bound gives the same outcome as comparing against
    # `values_bit - ceil(log2(n))`; no explicit ceiling is required.
    bit_budget = self._values_bit - math.log2(len(node_ids))
    for value in x_u_tau:
        if value.bit_length() > bit_budget:
            raise FedbiomedSecaggError(
                f"{ErrorNumbers.FB417.value}: Bit length of one or more values of input vector has more bits "
                f"that {bit_budget}. This could lead to computation overflow"
            )

    plain = np.array(x_u_tau, dtype=self._vector_dtype)
    n_values = len(plain)
    mask = np.zeros(n_values, dtype=self._vector_dtype)

    for peer_id in node_ids:
        if peer_id != node_id:
            round_seed = self._prf.eval_key(
                pairwise_secret=pairwise_secrets[peer_id],
                tau=tau,
            )
            stream = self._prf.eval_vector(
                seed=round_seed,
                tau=tau,
                input_size=n_values,
            )
            peer_mask = np.frombuffer(stream, dtype=self._vector_dtype)

            # The sign depends on the ordering of the two node IDs.
            if peer_id < node_id:
                mask += peer_mask
            else:
                mask -= peer_mask

    return (mask + plain).tolist()

PRF

PRF(nonce)

Pseudorandom Function (PRF) class using the ChaCha20 stream cipher.

Attributes:

Name Type Description
nonce bytes

A 16-byte nonce used for encryption.

Source code in fedbiomed/common/secagg/_lom.py
def __init__(self, nonce: bytes) -> None:
    """Stores the nonce used by this PRF.

    Args:
        nonce: Nonce passed to `algorithms.ChaCha20` by `eval_key` and
            `eval_vector`.
    """
    self._nonce = nonce

Functions

eval_key
eval_key(pairwise_secret, tau)

Evaluates a pseudorandom key for a given round.

Parameters:

Name Type Description Default
pairwise_secret bytes

A secret key shared between nodes.

required
tau int

The current round number.

required

Returns:

Name Type Description
bytes bytes

A 32-byte pseudorandom key.

Source code in fedbiomed/common/secagg/_lom.py
def eval_key(self, pairwise_secret: bytes, tau: int) -> bytes:
    """
    Evaluates a pseudorandom key for a given round.

    `tau` is serialized as a 16-byte big-endian integer and encrypted with
    ChaCha20 keyed by `pairwise_secret` (with the instance nonce). The
    16-byte result is then zero-padded to 32 bytes.

    Args:
        pairwise_secret (bytes): A secret key shared between nodes.
        tau (int): The current round number.

    Returns:
        bytes: A 32-byte pseudorandom key.

    Raises:
        FedbiomedSecaggError: if building the cipher raises `ValueError`.
    """
    tau_bytes = tau.to_bytes(16, 'big')
    try:
        encryptor = Cipher(
            algorithms.ChaCha20(pairwise_secret, self._nonce),
            mode=None,
            backend=default_backend()
        ).encryptor()
    except ValueError as ve:
        # Chain explicitly so the original cause is kept in the traceback.
        raise FedbiomedSecaggError(
            f"{ErrorNumbers.FB417.value}: Error while ciphering: got exception {ve}"
        ) from ve
    c = encryptor.update(tau_bytes) + encryptor.finalize()

    # the output is a 16 bytes string, pad it to 32 bytes
    return c + b'\x00' * 16
eval_vector
eval_vector(seed, tau, input_size)

Evaluates a pseudorandom vector based on the seed.

Parameters:

Name Type Description Default
seed bytes

A 32-byte seed for generating the vector.

required
tau int

The current round number.

required
input_size int

The size of the input vector.

required

Returns:

Name Type Description
bytes bytes

A pseudorandom vector of the specified size.

Source code in fedbiomed/common/secagg/_lom.py
def eval_vector(self, seed: bytes, tau: int, input_size: int) -> bytes:
    """
    Produces a pseudorandom byte string from a seed, for masking a vector.

    The values `tau, tau + 1, ..., tau + input_size - 1` are each written
    as a 4-byte big-endian integer, concatenated, and encrypted with
    ChaCha20 keyed by `seed` (with the instance nonce).

    Args:
        seed (bytes): A 32-byte seed for generating the vector.
        tau (int): The current round number.
        input_size (int): The size of the input vector.

    Returns:
        bytes: A pseudorandom vector of the specified size.

    Raises:
        FedbiomedSecaggError: if `input_size + _MAX_ROUND` is greater than 2**32.
    """
    cipher = Cipher(
        algorithms.ChaCha20(seed, self._nonce),
        mode=None,
        backend=default_backend()
    )
    encryptor = cipher.encryptor()

    # TODO: Better handling limits for secure aggregation
    if input_size + _MAX_ROUND > 2**32:
        raise FedbiomedSecaggError(
            f"{ErrorNumbers.FB417.value}: Can not perform encryiton due to large input vector. input_size "
            f"({input_size}) + MAX_ROUND ({_MAX_ROUND}) allowed is greater than 2**32"
        )

    # One 4-byte block per vector position: the position index offset by tau.
    blocks = b''.join((i + tau).to_bytes(4, 'big') for i in range(input_size))
    encrypted = encryptor.update(blocks)
    return encrypted + encryptor.finalize()

SecaggCrypter

SecaggCrypter()

Secure aggregation encryption and decryption manager.

This class is responsible for encrypting model parameters using the Joye-Libert secure aggregation scheme. It also aggregates encrypted model parameters and decrypts them to retrieve the final model parameters as a vector. This vector can be loaded into a model by converting it to the proper format for the framework.

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def __init__(self) -> None:
    """Constructs the crypter and the Joye-Libert scheme instance it uses."""
    self._jls = JoyeLibert()

Functions

aggregate
aggregate(current_round, num_nodes, params, key, biprime, total_sample_size, clipping_range=None, num_expected_params=1)

Decrypt given parameters

Parameters:

Name Type Description Default
current_round int

The round that the aggregation will be done

required
params List[List[int]]

Aggregated/Summed encrypted parameters

required
num_nodes int

number of nodes

required
key int

The key that will be used for decryption

required
biprime int

Biprime number of PublicParam

required
total_sample_size int

sum of number of samples from all nodes

required
clipping_range Union[int, None]

Clipping range for reverse-quantization, should be the same clipping range used for quantization

None
num_expected_params int

number of parameters to decode from the params

1

Returns: Aggregated parameters decrypted and structured

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

aggregation issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def aggregate(
        self,
        current_round: int,
        num_nodes: int,
        params: List[List[int]],
        key: int,
        biprime: int,
        total_sample_size: int,
        clipping_range: Union[int, None] = None,
        num_expected_params: int = 1
) -> List[float]:
    """Decrypts, sums and averages the encrypted parameters received from nodes

    Args:
        current_round: The round that the aggregation will be done
        params: Encrypted parameters, one list of integers per node
        num_nodes: number of nodes
        key: The key that will be used for decryption
        biprime: Biprime number of `PublicParam`
        total_sample_size: sum of number of samples from all nodes
        clipping_range: Clipping range for reverse-quantization, should be the
            same clipping range used for quantization
        num_expected_params: number of parameters to decode from the `params`
    Returns:
        Aggregated parameters decrypted and structured

    Raises:
         FedbiomedSecaggCrypterError: bad parameters
         FedbiomedSecaggCrypterError: aggregation issue
    """
    start = time.process_time()

    # Check the container types before calling `len()`, so that a malformed
    # `params` is reported as a crypter error instead of a raw `TypeError`.
    if not isinstance(params, list) or not all(isinstance(p, list) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The parameters to aggregate should be a "
                                          f"list containing list of parameters")

    if len(params) != num_nodes:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Num of parameters that are received from nodes "
            f"does not match the number of nodes has been set for the encrypter. There might "
            f"be some nodes did not answered to training request or num of clients of "
            "`ParameterEncrypter` has not been set properly before train request.")

    if not all(all(isinstance(p_, int) for p_ in p) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: Invalid parameter type. The parameters "
                                          f"should be of type of integers.")

    # TODO provide dynamically created biprime. Biprime that is used
    #  on the node-side should matched the one used for decryption
    public_param = self._setup_public_param(biprime=biprime)
    server_key = ServerKey(public_param, key)

    encrypted_params = self._convert_to_encrypted_number(params, public_param)

    try:
        sum_of_weights = self._jls.aggregate(
            sk_0=server_key,
            tau=current_round,  # the time period of the scheme
            list_y_u_tau=encrypted_params,
            num_expected_params=num_expected_params
        )
    except (ValueError, TypeError) as e:
        # Chain explicitly so the underlying cause stays in the traceback.
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The aggregation of encrypted parameters "
                                          f"is not successful: {e}") from e

    # Reverse quantize and division (averaging)
    logger.info(f"Aggregating {len(encrypted_params)} parameters from {num_nodes} nodes.")
    aggregated_params = self._apply_average(sum_of_weights, total_sample_size)

    aggregated_params: List[float] = reverse_quantize(
        aggregated_params,
        clipping_range=clipping_range
    )
    time_elapsed = time.process_time() - start
    logger.debug(f"Aggregation is completed in {round(time_elapsed, ndigits=2)} seconds.")

    return aggregated_params
encrypt
encrypt(num_nodes, current_round, params, key, biprime, clipping_range=None, weight=None)

Encrypts model parameters.

Parameters:

Name Type Description Default
num_nodes int

Number of nodes that is expected to encrypt parameters for aggregation

required
current_round int

Current round of federated training

required
params List[float]

List of flatten parameters

required
key int

Key to encrypt

required
biprime int

Biprime number used to create the public parameter

required
weight Optional[int]

Weight for the params

None
clipping_range Union[int, None]

Clipping range for quantization of float model parameters. The clipping range must be greater than the minimum of the model parameters

None

Returns:

Type Description
List[int]

List of encrypted parameters

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

encryption issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def encrypt(
        self,
        num_nodes: int,
        current_round: int,
        params: List[float],
        key: int,
        biprime: int,
        clipping_range: Union[int, None] = None,
        weight: Optional[int] = None,
) -> List[int]:
    """Quantizes, optionally weights, and encrypts model parameters.

    Args:
        num_nodes: Number of nodes that is expected to encrypt parameters for aggregation
        current_round: Current round of federated training
        params: List of flatten parameters
        key: Key to encrypt
        biprime: Biprime number used to create the public parameter
        weight: Weight for the params
        clipping_range: Clipping range for quantization of float model parameters.
            The clipping range must be greater than the minimum model parameters

    Returns:
        List of encrypted parameters

    Raises:
        FedbiomedSecaggCrypterError: bad parameters
        FedbiomedSecaggCrypterError: encryption issue
    """
    started_at = time.process_time()

    if not isinstance(params, list):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Expected argument `params` type list but got {type(params)}"
        )

    if any(not isinstance(p, float) for p in params):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The parameters to encrypt should list of floats. "
            f"There are one or more than a value that is not type of float."
        )

    # The raw key must be an integer before it is wrapped in a `UserKey`.
    if not isinstance(key, int):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The argument `key` must be integer"
        )

    # Quantization maps the float parameters onto integers in the target range.
    quantized = quantize(weights=params, clipping_range=clipping_range)

    # Weighting multiplies the quantized values, so the weight itself has to
    # stay within the weight range for the product to remain encodable.
    if weight is not None:
        weight_too_large = 2**weight.bit_length() > SAParameters.WEIGHT_RANGE
        if weight_too_large:
            raise FedbiomedSecaggCrypterError(
                f"{ErrorNumbers.FB624.value}: The weight is too large. The weight should be less than "
                f"{SAParameters.WEIGHT_RANGE}, but got {weight}"
            )
        quantized = self._apply_weighting(quantized, weight)

    public_param = self._setup_public_param(biprime=biprime)
    user_key = UserKey(public_param, key)

    try:
        protected = self._jls.protect(
            public_param=public_param,
            user_key=user_key,
            tau=current_round,
            x_u_tau=quantized,
            n_users=num_nodes
        )
    except (TypeError, ValueError) as exp:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value} Error during parameter encryption. {exp}") from exp

    time_elapsed = time.process_time() - started_at
    logger.debug(f"Encryption of the parameters took {time_elapsed} seconds.")

    # Hand back plain Python integers.
    return list(map(int, protected))

SecaggLomCrypter

SecaggLomCrypter(nonce=None)

Bases: SecaggCrypter

Low-Overhead Masking secure aggregation controller

Parameters:

Name Type Description Default
nonce str | None

nonce to use in encryption

None
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def __init__(
    self,
    nonce: str | None = None
):
    """LOM Secure aggregation to encrypt and aggregate

    When a `nonce` is given, it seeds a pseudorandom generator from which
    128 bits are drawn; those 16 bytes are what is handed to `LOM`. When
    no `nonce` is given, `None` is passed through to `LOM`.

    Args:
        nonce: `nonce` to use in encryption
    """
    if nonce:
        # A private generator yields the same bytes as seeding the
        # module-level one with `nonce`, but leaves the process-wide
        # `random` state untouched.
        nonce = random.Random(nonce).getrandbits(128).to_bytes(16, 'big')

    self._lom = LOM(nonce)

Functions

aggregate
aggregate(params, total_sample_size, clipping_range=None)

Decrypt given parameters

Parameters:

Name Type Description Default
params List[List[int]]

Aggregated/Summed encrypted parameters

required
total_sample_size int

sum of number of samples from all nodes

required
clipping_range Union[int, None]

Clipping range for reverse-quantization, should be the same clipping range used for quantization

None

Returns: Aggregated parameters decrypted and structured

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

aggregation issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def aggregate(
        self,
        params: List[List[int]],
        total_sample_size: int,
        clipping_range: Union[int, None] = None,
) -> List[float]:
    """Decrypt, average and de-quantize the encrypted parameters of all nodes.

    The per-node encrypted vectors are combined by `self._lom.aggregate`, the
    result is averaged over `total_sample_size` with `self._apply_average`,
    and finally mapped back to floats with `reverse_quantize`.

    Args:
        params: Encrypted parameters, one inner list of integers per node
        total_sample_size: sum of number of samples from all nodes
        clipping_range: Clipping range for reverse-quantization, should be the
            same clipping range used for quantization
    Returns:
        Aggregated parameters decrypted and structured

    Raises:
         FedbiomedSecaggCrypterError: bad parameters
         FedbiomedSecaggCrypterError: aggregation issue
    """
    start = time.process_time()

    # Use `.value` so the message carries the error text, consistent with the
    # other errors raised by this crypter.
    if not isinstance(params, list) or not all(isinstance(p, list) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The parameters to aggregate should be a "
                                          f"list containing list of parameters")

    if not all(all(isinstance(p_, int) for p_ in p) for p in params):
        raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: Invalid parameter type. The parameters "
                                          f"should be of type of integers.")

    # The outer list has one entry per node; the inner lists hold the
    # parameters themselves, so the parameter count is read from the first one.
    num_nodes = len(params)
    num_params = len(params[0]) if params else 0

    try:
        sum_of_weights = self._lom.aggregate(
            list_y_u_tau=params,
        )
    except (ValueError, TypeError) as e:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The aggregation of encrypted parameters "
            f"is not successful: {e}") from e

    # Division (averaging) first, then reverse quantization back to floats
    logger.info(f"Aggregating {num_params} parameters from {num_nodes} nodes.")
    averaged_params = self._apply_average(sum_of_weights, total_sample_size)

    aggregated_params: List[float] = reverse_quantize(
        averaged_params,
        clipping_range=clipping_range
    )
    time_elapsed = time.process_time() - start
    logger.debug(f"Aggregation is completed in {round(time_elapsed, ndigits=2)} seconds.")

    return aggregated_params
encrypt
encrypt(current_round, node_id, params, pairwise_secrets, node_ids, clipping_range=None, weight=None)

Encrypts model parameters.

Parameters:

Name Type Description Default
current_round int

Current round of federated training

required
node_id str

ID of the node that applies encryption

required
params List[float]

List of flattened parameters

required
pairwise_secrets Dict[str, bytes]

DH agreed secrets between node that applies encryption and others

required
node_ids List[str]

All nodes that participate in secure aggregation

required
weight Optional[int]

Weight for the params

None
clipping_range Union[int, None]

Clipping range for quantization of float model parameters. The clipping range must be greater than the minimum model parameter

None

Returns:

Type Description
List[int]

List of encrypted parameters

Raises:

Type Description
FedbiomedSecaggCrypterError

bad parameters

FedbiomedSecaggCrypterError

encryption issue

Source code in fedbiomed/common/secagg/_secagg_crypter.py
def encrypt(
    self,
    current_round: int,
    node_id: str,
    params: List[float],
    pairwise_secrets: Dict[str, bytes],
    node_ids: List[str],
    clipping_range: Union[int, None] = None,
    weight: Optional[int] = None,
) -> List[int]:
    """Quantize, optionally weight, and encrypt the model parameters of a node.

    Args:
        current_round: Current round of federated training
        node_id: ID of the node that applies encryption
        params: List of flattened parameters
        pairwise_secrets: DH agreed secrets between the node that applies
            encryption and the other nodes
        node_ids: All nodes that participate in secure aggregation
        weight: Weight for the params
        clipping_range: Clipping range for quantization of float model parameters.
            The clipping range must be greater than the minimum model parameter

    Returns:
        List of encrypted parameters

    Raises:
        FedbiomedSecaggCrypterError: bad parameters
        FedbiomedSecaggCrypterError: encryption issue
    """
    t_begin = time.process_time()

    # --- Input validation -------------------------------------------------
    if not isinstance(params, list):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: Expected argument `params` type list but got {type(params)}"
        )

    if any(not isinstance(value, float) for value in params):
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value}: The parameters to encrypt should list of floats. "
            f"There are one or more than a value that is not type of float."
        )

    # --- Quantization and optional weighting ------------------------------
    quantized = quantize(weights=params, clipping_range=clipping_range)

    if weight is not None:
        # Smallest power of two strictly above the weight's magnitude
        weight_bound = 2 ** weight.bit_length()
        if weight_bound > SAParameters.WEIGHT_RANGE:
            raise FedbiomedSecaggCrypterError(
                f"{ErrorNumbers.FB624.value}: The weight is too large. The weight should be less than "
                f"{SAParameters.WEIGHT_RANGE}."
            )
        quantized = self._apply_weighting(quantized, weight)

    # --- Encryption -------------------------------------------------------
    try:
        protected: List[int] = self._lom.protect(
            pairwise_secrets=pairwise_secrets,
            node_id=node_id,
            tau=current_round,
            x_u_tau=quantized,
            node_ids=node_ids,
        )
    except (TypeError, ValueError) as err:
        raise FedbiomedSecaggCrypterError(
            f"{ErrorNumbers.FB624.value} Error during parameter encryption. {err}") from err

    elapsed = time.process_time() - t_begin
    logger.debug(f"Encryption of the parameters took {elapsed} seconds.")

    return protected