Classes
DHKey
DHKey(private_key_pem=None, public_key_pem=None)
Key handling for ephemeral-ephemeral ECDH, a session is defined for each experiment.
This class handles the generation and importing of ECC keys using the P-256 curve. It provides methods to export the private and public keys in PEM format as bytes.
Attributes:
Name | Type | Description |
---|---|---|
private_key | The user's private ECC key. | |
public_key | The user's public ECC key. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
private_key_pem | bytes | None | Optional. The private key in PEM format. | None |
public_key_pem | bytes | None | Optional. The public key in PEM format. | None |
Source code in fedbiomed/common/secagg/_dh.py
def __init__(
self,
private_key_pem: bytes | None = None,
public_key_pem: bytes | None = None
) -> None:
"""
Initializes the DHKey instance by generating a new key pair or importing the provided keys.
Args:
private_key_pem: Optional. The private key in PEM format.
public_key_pem: Optional. The public key in PEM format.
"""
if private_key_pem:
self.private_key = self._import_key(
serialization.load_pem_private_key,
data=private_key_pem,
password=None,
backend=default_backend()
)
elif not public_key_pem:
self.private_key = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
else:
# Means that only public key is loaded
self.private_key = None
if public_key_pem:
self.public_key = self._import_key(serialization.load_pem_public_key,
data=public_key_pem,
backend=default_backend()
)
else:
self.public_key = self.private_key.public_key()
Attributes
private_key instance-attribute
private_key = _import_key(load_pem_private_key, data=private_key_pem, password=None, backend=default_backend())
public_key instance-attribute
public_key = _import_key(load_pem_public_key, data=public_key_pem, backend=default_backend())
Functions
export_private_key
export_private_key()
Exports the private key to PEM format.
Returns:
Type | Description |
---|---|
The private key in PEM format. Returns |
Source code in fedbiomed/common/secagg/_dh.py
def export_private_key(self):
"""
Exports the private key to PEM format.
Returns:
The private key in PEM format. Returns `None` if private key
is not loaded.
"""
if not self.private_key:
return None
return self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
export_public_key
export_public_key()
Exports the public key to PEM format.
Returns:
Type | Description |
---|---|
The public key in PEM format. |
Source code in fedbiomed/common/secagg/_dh.py
def export_public_key(self):
"""
Exports the public key to PEM format.
Returns:
The public key in PEM format.
"""
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
DHKeyAgreement
DHKeyAgreement(node_u_id, node_u_dh_key, session_salt)
Key Agreement with ephemeral-ephemeral ECDH, a session is defined for each experiment.
This class handles the key agreement process using the private ECC key of the user. It imports the private key, performs the key exchange, and derives the shared secret using a KDF.
Attributes:
Name | Type | Description |
---|---|---|
private_key | The user's private ECC key. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_u_id | The ID of the node. | required | |
node_u_dh_key | DHKey | The keypair of the node. | required |
session_salt | A session-specific salt. | required |
Source code in fedbiomed/common/secagg/_dh.py
def __init__(self, node_u_id, node_u_dh_key: DHKey, session_salt):
"""
Initializes the DHKeyAgreement instance.
Args:
node_u_id: The ID of the node.
node_u_dh_key: The keypair of the node.
session_salt: A session-specific salt.
"""
self._node_u_id = node_u_id
self._dh_key = node_u_dh_key
self.session_salt = session_salt
Attributes
session_salt instance-attribute
session_salt = session_salt
Functions
agree
agree(node_v_id, public_key_pem)
Performs the key agreement and derives the shared secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_v_id | The ID of the other node (node_v). | required | |
public_key_pem | The public key of the other node in PEM format as bytes. | required |
Returns:
Type | Description |
---|---|
The derived shared secret. |
Source code in fedbiomed/common/secagg/_dh.py
def agree(self, node_v_id, public_key_pem):
"""
Performs the key agreement and derives the shared secret.
Args:
node_v_id: The ID of the other node (node_v).
public_key_pem: The public key of the other node in PEM format as bytes.
Returns:
The derived shared secret.
"""
dh_v_key = DHKey(public_key_pem=public_key_pem)
shared_secret = self._dh_key.private_key.exchange(ec.ECDH(), dh_v_key.public_key)
derived_key = self._kdf(shared_secret, node_v_id)
return derived_key
EncryptedNumber
EncryptedNumber(param, ciphertext)
Bases: object
An encrypted number by one of the user keys .
Attributes:
Name | Type | Description |
---|---|---|
param | The public parameters | |
ciphertext | The integer value of the ciphertext |
ciphertext: The integer value of the ciphertext
Source code in fedbiomed/common/secagg/_jls.py
def __init__(self, param: PublicParam, ciphertext: int):
"""
Args:
param: The public parameters.
ciphertext: The integer value of the ciphertext
"""
self.public_param = param
self.ciphertext = mpz(ciphertext)
Attributes
ciphertext instance-attribute
ciphertext = mpz(ciphertext)
public_param instance-attribute
public_param = param
JoyeLibert
JoyeLibert()
The Joye-Libert scheme. It consists of three Probabilistic Polynomial Time algorithms: Protect
, and Agg
.
Attributes:
Name | Type | Description |
---|---|---|
_vector_encoder | The vector encoding/decoding scheme |
VEParameters.TARGET_RANGE + VEParameters.WEIGHT_RANGE should be equal or less than 2**32
Source code in fedbiomed/common/secagg/_jls.py
def __init__(self):
"""Constructs the class
VEParameters.TARGET_RANGE + VEParameters.WEIGHT_RANGE should be
equal or less than 2**32
"""
self._vector_encoder = VES(
ptsize=SAParameters.KEY_SIZE // 2,
valuesize=ceil(log2(SAParameters.TARGET_RANGE) + log2(SAParameters.WEIGHT_RANGE))
)
Functions
aggregate
aggregate(sk_0, tau, list_y_u_tau, num_expected_params)
Aggregates users protected inputs with the server's secret key
\(X_{\tau} \gets \textbf{JL.Agg}(public_param, sk_0,\tau, \{y_{u,\tau}\}_{u \in \{1,..,n\}})\)
This algorithm aggregates the \(n\) ciphers received at time period \(\tau\) to obtain \(y_{\tau} = \prod_1^n{y_{u,\tau}}\) and decrypts the result. It obtains the sum of the private inputs ( \( X_{\tau} = \sum_{1}^n{x_{u,\tau}} \) ) as follows:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sk_0 | ServerKey | The server's secret key \(sk_0\) | required |
tau | int | The time period \(\tau\) | required |
list_y_u_tau | List[List[EncryptedNumber]] | A list of the users' protected inputs \(\{y_{u,\tau}\}_{u \in \{1,..,n\}}\) | required |
num_expected_params | int | Number of parameters to decode from the decrypted vectors | required |
Returns:
Type | Description |
---|---|
List[int] | The sum of the users' inputs of type |
Raises:
Type | Description |
---|---|
ValueError | bad argument value |
Source code in fedbiomed/common/secagg/_jls.py
def aggregate(
self,
sk_0: ServerKey,
tau: int,
list_y_u_tau: List[List[EncryptedNumber]],
num_expected_params: int
) -> List[int]:
"""Aggregates users protected inputs with the server's secret key
\\(X_{\\tau} \\gets \\textbf{JL.Agg}(public_param, sk_0,\\tau, \\{y_{u,\\tau}\\}_{u \\in \\{1,..,n\\}})\\)
This algorithm aggregates the \\(n\\) ciphers received at time period \\(\\tau\\) to obtain
\\(y_{\\tau} = \\prod_1^n{y_{u,\\tau}}\\) and decrypts the result.
It obtains the sum of the private inputs ( \\( X_{\\tau} = \\sum_{1}^n{x_{u,\\tau}} \\) )
as follows:
$$V_{\\tau} = H(\\tau)^{sk_0} \\cdot y_{\\tau} \\qquad \\qquad X_{\\tau} = \\frac{V_{\\tau}-1}{N} \\mod N$$
Args:
sk_0: The server's secret key \\(sk_0\\)
tau: The time period \\(\\tau\\)
list_y_u_tau: A list of the users' protected inputs \\(\\{y_{u,\\tau}\\}_{u \\in \\{1,..,n\\}}\\)
num_expected_params: Number of parameters to decode from the decrypted vectors
Returns:
The sum of the users' inputs of type `int`
Raises:
ValueError: bad argument value
"""
if not isinstance(sk_0, ServerKey):
raise ValueError("Key must be an instance of `ServerKey`")
if not isinstance(list_y_u_tau, list) or not list_y_u_tau:
raise ValueError("list_y_u_tau should be a non-empty list.")
if not isinstance(list_y_u_tau[0], list):
raise ValueError("list_y_u_tau should be a list that contains list of encrypted numbers")
n_user = len(list_y_u_tau)
sum_of_vectors: List[EncryptedNumber] = [sum(ep) for ep in zip(*list_y_u_tau)]
decrypted_vector = sk_0.decrypt(sum_of_vectors, tau)
return self._vector_encoder.decode(decrypted_vector, add_ops=n_user, v_expected=num_expected_params)
protect
protect(public_param, user_key, tau, x_u_tau, n_users)
Protect user input with the user's secret key:
\(y_{u,\tau} \gets \textbf{JL.Protect}(public_param,sk_u,\tau,x_{u,\tau})\)
This algorithm encrypts private inputs \(x_{u,\tau} \in \mathbb{Z}_N\) for time period \(\tau\) using secret key \(sk_u \in \mathbb{Z}_N^2\) . It outputs cipher \(y_{u,\tau}\) such that:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
public_param | PublicParam | The public parameters \(public_param\) | required |
user_key | UserKey | The user's secret key \(sk_u\) | required |
tau | int | The time period \(\tau\) | required |
x_u_tau | List[int] | The user's input \(x_{u,\tau}\) | required |
n_users | int | Number of nodes/users that participates secure aggregation | required |
Returns:
Type | Description |
---|---|
List[mpz] | The protected input of type |
Raises:
Type | Description |
---|---|
TypeError | bad argument type |
ValueError | bad argument value |
Source code in fedbiomed/common/secagg/_jls.py
def protect(self,
public_param: PublicParam,
user_key: UserKey,
tau: int,
x_u_tau: List[int],
n_users: int,
) -> List[mpz]:
""" Protect user input with the user's secret key:
\\(y_{u,\\tau} \\gets \\textbf{JL.Protect}(public_param,sk_u,\\tau,x_{u,\\tau})\\)
This algorithm encrypts private inputs
\\(x_{u,\\tau} \\in \\mathbb{Z}_N\\) for time period \\(\\tau\\)
using secret key \\(sk_u \\in \\mathbb{Z}_N^2\\) . It outputs cipher \\(y_{u,\\tau}\\) such that:
$$y_{u,\\tau} = (1 + x_{u,\\tau} N) H(\\tau)^{sk_u} \\mod N^2$$
Args:
public_param: The public parameters \\(public_param\\)
user_key: The user's secret key \\(sk_u\\)
tau: The time period \\(\\tau\\)
x_u_tau: The user's input \\(x_{u,\\tau}\\)
n_users: Number of nodes/users that participates secure aggregation
Returns:
The protected input of type `EncryptedNumber` or a list of `EncryptedNumber`
Raises:
TypeError: bad argument type
ValueError: bad argument value
"""
if not isinstance(user_key, UserKey):
raise TypeError(f"Expected key for encryption type is UserKey. but got {type(user_key)}")
if user_key.public_param != public_param:
raise ValueError("Bad public parameter. The public parameter of user key does not match the "
"one given for encryption")
if not isinstance(x_u_tau, list):
raise TypeError(f"Bad vector for encryption. Excepted argument `x_u_tau` type list but "
f"got {type(x_u_tau)}")
x_u_tau = self._vector_encoder.encode(
V=x_u_tau,
add_ops=n_users
)
return user_key.encrypt(x_u_tau, tau)
LOM
LOM(nonce=None)
Lightweight Obfuscation Mechanism (LOM) class for protecting and aggregating data.
Attributes:
Name | Type | Description |
---|---|---|
_prf | PRF | An instance of the PRF class. |
_vector_dtype | str | The data type of the vector. |
Source code in fedbiomed/common/secagg/_lom.py
def __init__(
self,
nonce: bytes | None = None
) -> None:
if not nonce:
nonce = secrets.token_bytes(16)
self._prf: PRF = PRF(nonce)
self._vector_dtype: str = 'uint32'
self._values_bit = np.iinfo(np.dtype(self._vector_dtype)).bits # should be equal to 32 bit
Functions
aggregate
aggregate(list_y_u_tau)
Aggregates multiple vectors into a single vector.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
list_y_u_tau | List[int] | A dictionary of vectors from different nodes. | required |
Returns:
Type | Description |
---|---|
List[int] | The aggregated vector. |
Source code in fedbiomed/common/secagg/_lom.py
def aggregate(self, list_y_u_tau: List[int]) -> List[int]:
"""
Aggregates multiple vectors into a single vector.
Args:
list_y_u_tau: A dictionary of vectors from different nodes.
Returns:
The aggregated vector.
"""
list_y_u_tau = np.array(list_y_u_tau, dtype=self._vector_dtype)
decrypted_vector = np.sum(list_y_u_tau, axis=0)
decrypted_vector = decrypted_vector.astype(np.int32)
decrypted_vector = decrypted_vector.tolist()
return decrypted_vector
protect
protect(node_id, pairwise_secrets, tau, x_u_tau, node_ids)
Protects the input vector by applying a mask based on pairwise secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_id | str | Id of the node that applies encryption | required |
pairwise_secrets | Dict[str, bytes] | DH agreed secrets between node that applies encryption and others | required |
tau | int | The current round number. | required |
x_u_tau | List[int] | The input vector to be protected. | required |
node_ids | List[str] | A list of node IDs participates aggregation. | required |
Raises:
Type | Description |
---|---|
FedBioMedError | raises if the input vector |
Returns:
Type | Description |
---|---|
List[int] | The protected (masked) vector. |
Source code in fedbiomed/common/secagg/_lom.py
def protect(
self,
node_id: str,
pairwise_secrets: Dict[str, bytes],
tau: int,
x_u_tau: List[int],
node_ids: List[str]
) -> List[int]:
"""
Protects the input vector by applying a mask based on pairwise secrets.
Args:
node_id: Id of the node that applies encryption
pairwise_secrets: DH agreed secrets between node that applies encryption and others
tau: The current round number.
x_u_tau: The input vector to be protected.
node_ids: A list of node IDs participates aggregation.
Raises:
FedBioMedError: raises if the input vector `x_u_tau` contains any
values that exceed `32 - log_2(numner_of_nodes)`, where `32` is
the number of bit for each value (`uint32`). Not respecting the above condition
can lead to computation overflow.
Returns:
The protected (masked) vector.
"""
num_nodes = len(node_ids)
_max_bit_length = self._values_bit - math.log2(num_nodes)
# FIXME: is a ceiling missing in this equation (ceil(math.log2(num_nodes)))
if any(val.bit_length() > _max_bit_length for val in x_u_tau):
raise FedbiomedSecaggError(
f"{ErrorNumbers.FB417.value}: Bit length of one or more values of input vector has more bits "
f"that {_max_bit_length}. This could lead to computation overflow"
)
x_u_tau = np.array(x_u_tau, dtype=self._vector_dtype)
mask = np.zeros(len(x_u_tau), dtype=self._vector_dtype)
for pair_id in node_ids:
if pair_id == node_id:
continue
secret = pairwise_secrets[pair_id]
pairwise_seed = self._prf.eval_key(
pairwise_secret=secret,
tau=tau)
# print(len(pairwise_seed))
pairwise_vector = self._prf.eval_vector(
seed=pairwise_seed,
tau=tau,
input_size=len(x_u_tau))
pairwise_vector = np.frombuffer(pairwise_vector, dtype=self._vector_dtype)
if pair_id < node_id:
mask += pairwise_vector
else:
mask -= pairwise_vector
encrypted_params = mask + x_u_tau
return encrypted_params.tolist()
PRF
PRF(nonce)
Pseudorandom Function (PRF) class using the ChaCha20 stream cipher.
Attributes:
Name | Type | Description |
---|---|---|
nonce | bytes | A 12-byte nonce used for encryption. |
Source code in fedbiomed/common/secagg/_lom.py
def __init__(self, nonce: bytes) -> None:
self._nonce = nonce
Functions
eval_key
eval_key(pairwise_secret, tau)
Evaluates a pseudorandom key for a given round.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pairwise_secret | bytes | A secret key shared between nodes. | required |
round | int | The current round number. | required |
Returns:
Name | Type | Description |
---|---|---|
bytes | bytes | A 32-byte pseudorandom key. |
Source code in fedbiomed/common/secagg/_lom.py
def eval_key(self, pairwise_secret: bytes, tau: int) -> bytes:
"""
Evaluates a pseudorandom key for a given round.
Args:
pairwise_secret (bytes): A secret key shared between nodes.
round (int): The current round number.
Returns:
bytes: A 32-byte pseudorandom key.
"""
tau = tau.to_bytes(16, 'big')
try:
encryptor = Cipher(
algorithms.ChaCha20(pairwise_secret, self._nonce),
mode=None,
backend=default_backend()
).encryptor()
except ValueError as ve:
raise FedbiomedSecaggError(f"{ErrorNumbers.FB417.value}: Error while ciphering: got exception {ve}")
c = encryptor.update(tau) + encryptor.finalize()
# the output is a 16 bytes string, pad it to 32 bytes
c = c + b'\x00' * 16
return c
eval_vector
eval_vector(seed, tau, input_size)
Evaluates a pseudorandom vector based on the seed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
seed | bytes | A 32-byte seed for generating the vector. | required |
tau | int | The current round number. | required |
input_size | int | The size of the input vector. | required |
Returns:
Name | Type | Description |
---|---|---|
bytes | bytes | A pseudorandom vector of the specified size. |
Source code in fedbiomed/common/secagg/_lom.py
def eval_vector(self, seed: bytes, tau: int, input_size: int) -> bytes:
"""
Evaluates a pseudorandom vector based on the seed.
Args:
seed (bytes): A 32-byte seed for generating the vector.
tau (int): The current round number.
input_size (int): The size of the input vector.
Returns:
bytes: A pseudorandom vector of the specified size.
"""
encryptor = Cipher(
algorithms.ChaCha20(seed, self._nonce),
mode=None,
backend=default_backend()
).encryptor()
# TODO: Better handling limits for secure aggregation
if not (input_size + _MAX_ROUND) <= 2**32:
raise FedbiomedSecaggError(
f"{ErrorNumbers.FB417.value}: Can not perform encryiton due to large input vector. input_size "
f"({input_size}) + MAX_ROUND ({_MAX_ROUND}) allowed is greater than 2**32"
)
# create a list of indices from 0 to input_size where each element is concatenated with tau
taus = b''.join([(i + tau).to_bytes(4, 'big') for i in range(input_size)])
return encryptor.update(taus) + encryptor.finalize()
SecaggCrypter
SecaggCrypter()
Secure aggregation encryption and decryption manager.
This class is responsible for encrypting model parameters using Joye-Libert secure aggregation scheme. It also aggregates encrypted model parameters and decrypts to retrieve final model parameters as vector. This vector can be loaded into model by converting it proper format for the framework.
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def __init__(self) -> None:
"""Constructs ParameterEncrypter"""
self._jls = JoyeLibert()
Functions
aggregate
aggregate(current_round, num_nodes, params, key, biprime, total_sample_size, clipping_range=None, num_expected_params=1)
Decrypt given parameters
Parameters:
Name | Type | Description | Default |
---|---|---|---|
current_round | int | The round that the aggregation will be done | required |
params | List[List[int]] | Aggregated/Summed encrypted parameters | required |
num_nodes | int | number of nodes | required |
key | int | The key that will be used for decryption | required |
biprime | int | Biprime number of | required |
total_sample_size | int | sum of number of samples from all nodes | required |
clipping_range | Union[int, None] | Clipping range for reverse-quantization, should be the same clipping range used for quantization | None |
num_expected_params | int | number of parameters to decode from the | 1 |
Returns: Aggregated parameters decrypted and structured
Raises:
Type | Description |
---|---|
FedbiomedSecaggCrypterError | bad parameters |
FedbiomedSecaggCrypterError | aggregation issue |
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def aggregate(
self,
current_round: int,
num_nodes: int,
params: List[List[int]],
key: int,
biprime: int,
total_sample_size: int,
clipping_range: Union[int, None] = None,
num_expected_params: int = 1
) -> List[float]:
"""Decrypt given parameters
Args:
current_round: The round that the aggregation will be done
params: Aggregated/Summed encrypted parameters
num_nodes: number of nodes
key: The key that will be used for decryption
biprime: Biprime number of `PublicParam`
total_sample_size: sum of number of samples from all nodes
clipping_range: Clipping range for reverse-quantization, should be the
same clipping range used for quantization
num_expected_params: number of parameters to decode from the `params`
Returns:
Aggregated parameters decrypted and structured
Raises:
FedbiomedSecaggCrypterError: bad parameters
FedbiomedSecaggCrypterError: aggregation issue
"""
start = time.process_time()
if len(params) != num_nodes:
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: Num of parameters that are received from nodes "
f"does not match the number of nodes has been set for the encrypter. There might "
f"be some nodes did not answered to training request or num of clients of "
"`ParameterEncrypter` has not been set properly before train request.")
if not isinstance(params, list) or not all(isinstance(p, list) for p in params):
raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624}: The parameters to aggregate should be a "
f"list containing list of parameters")
if not all(all(isinstance(p_, int) for p_ in p) for p in params):
raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624}: Invalid parameter type. The parameters "
f"should be of type of integers.")
# TODO provide dynamically created biprime. Biprime that is used
# on the node-side should matched the one used for decryption
public_param = self._setup_public_param(biprime=biprime)
key = ServerKey(public_param, key)
params = self._convert_to_encrypted_number(params, public_param)
try:
sum_of_weights = self._jls.aggregate(
sk_0=key,
tau=current_round, # The time period \\(\\tau\\)
list_y_u_tau=params,
num_expected_params=num_expected_params
)
except (ValueError, TypeError) as e:
raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624.value}: The aggregation of encrypted parameters "
f"is not successful: {e}")
# Reverse quantize and division (averaging)
logger.info(f"Aggregating {len(params)} parameters from {num_nodes} nodes.")
aggregated_params = self._apply_average(sum_of_weights, total_sample_size)
aggregated_params: List[float] = reverse_quantize(
aggregated_params,
clipping_range=clipping_range
)
time_elapsed = time.process_time() - start
logger.debug(f"Aggregation is completed in {round(time_elapsed, ndigits=2)} seconds.")
return aggregated_params
encrypt
encrypt(num_nodes, current_round, params, key, biprime, clipping_range=None, weight=None)
Encrypts model parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_nodes | int | Number of nodes that is expected to encrypt parameters for aggregation | required |
current_round | int | Current round of federated training | required |
params | List[float] | List of flatten parameters | required |
key | int | Key to encrypt | required |
biprime | int | Prime number to create public parameter | required |
weight | Optional[int] | Weight for the params | None |
clipping_range | Union[int, None] | Clipping-range for quantization of float model parameters. Clipping range must grater than minimum model parameters | None |
Returns:
Type | Description |
---|---|
List[int] | List of encrypted parameters |
Raises:
Type | Description |
---|---|
FedbiomedSecaggCrypterError | bad parameters |
FedbiomedSecaggCrypterError | encryption issue |
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def encrypt(
self,
num_nodes: int,
current_round: int,
params: List[float],
key: int,
biprime: int,
clipping_range: Union[int, None] = None,
weight: Optional[int] = None,
) -> List[int]:
"""Encrypts model parameters.
Args:
num_nodes: Number of nodes that is expected to encrypt parameters for aggregation
current_round: Current round of federated training
params: List of flatten parameters
key: Key to encrypt
biprime: Prime number to create public parameter
weight: Weight for the params
clipping_range: Clipping-range for quantization of float model parameters. Clipping range
must grater than minimum model parameters
Returns:
List of encrypted parameters
Raises:
FedbiomedSecaggCrypterError: bad parameters
FedbiomedSecaggCrypterError: encryption issue
"""
start = time.process_time()
if not isinstance(params, list):
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: Expected argument `params` type list but got {type(params)}"
)
if not all([isinstance(p, float) for p in params]):
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: The parameters to encrypt should list of floats. "
f"There are one or more than a value that is not type of float."
)
# Make use the key is instance of
if not isinstance(key, int):
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: The argument `key` must be integer"
)
# first we quantize the parameters, and we get params in the range [0, 2^VEParameters.TARGET_RANGE]
params = quantize(weights=params,
clipping_range=clipping_range)
# We multiply the parameters with the weight, and we get params in
# the range [0, 2^(log2(VEParameters.TARGET_RANGE) + log2(VEParameters.WEIGHT_RANGE)) - 1]
# Check if weight if num_bits of weight is less than VEParameters.WEIGHT_RANGE
if weight is not None:
if 2**weight.bit_length() > SAParameters.WEIGHT_RANGE:
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: The weight is too large. The weight should be less than "
f"{SAParameters.WEIGHT_RANGE}, but got {weight}"
)
params = self._apply_weighting(params, weight)
public_param = self._setup_public_param(biprime=biprime)
# Instantiates UserKey object
key = UserKey(public_param, key)
try:
# Encrypt parameters
encrypted_params: List[mpz] = self._jls.protect(
public_param=public_param,
user_key=key,
tau=current_round,
x_u_tau=params,
n_users=num_nodes
)
except (TypeError, ValueError) as exp:
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value} Error during parameter encryption. {exp}") from exp
time_elapsed = time.process_time() - start
logger.debug(f"Encryption of the parameters took {time_elapsed} seconds.")
return [int(e_p) for e_p in encrypted_params]
SecaggLomCrypter
SecaggLomCrypter(nonce=None)
Bases: SecaggCrypter
Low-Overhead Masking secure aggregation controller
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nonce | str | None |
| None |
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def __init__(
self,
nonce: str | None = None
):
"""LOM Secure aggregation to encrypt and aggregate
Args:
nonce: `nonce` to use in encryption
"""
if nonce:
random.seed(nonce)
nonce = random.getrandbits(128).to_bytes(16, 'big')
self._lom = LOM(nonce)
Functions
aggregate
aggregate(params, total_sample_size, clipping_range=None)
Decrypt given parameters
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params | List[List[int]] | Aggregated/Summed encrypted parameters | required |
total_sample_size | int | sum of number of samples from all nodes | required |
clipping_range | Union[int, None] | Clipping range for reverse-quantization, should be the same clipping range used for quantization | None |
Returns: Aggregated parameters decrypted and structured
Raises:
Type | Description |
---|---|
FedbiomedSecaggCrypterError | bad parameters |
FedbiomedSecaggCrypterError | aggregation issue |
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def aggregate(
self,
params: List[List[int]],
total_sample_size: int,
clipping_range: Union[int, None] = None,
) -> List[float]:
"""Decrypt given parameters
Args:
params: Aggregated/Summed encrypted parameters
total_sample_size: sum of number of samples from all nodes
clipping_range: Clipping range for reverse-quantization, should be the
same clipping range used for quantization
Returns:
Aggregated parameters decrypted and structured
Raises:
FedbiomedSecaggCrypterError: bad parameters
FedbiomedSecaggCrypterError: aggregation issue
"""
start = time.process_time()
if not isinstance(params, list) or not all(isinstance(p, list) for p in params):
raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624}: The parameters to aggregate should be a "
f"list containing list of parameters")
if not all(all(isinstance(p_, int) for p_ in p) for p in params):
raise FedbiomedSecaggCrypterError(f"{ErrorNumbers.FB624}: Invalid parameter type. The parameters "
f"should be of type of integers.")
num_nodes = len(params)
try:
sum_of_weights = self._lom.aggregate(
list_y_u_tau=params,
)
except (ValueError, TypeError) as e:
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: The aggregation of encrypted parameters "
f"is not successful: {e}") from e
# Reverse quantize and division (averaging)
logger.info(f"Aggregating {len(params)} parameters from {num_nodes} nodes.")
aggregated_params = self._apply_average(sum_of_weights, total_sample_size)
aggregated_params: List[float] = reverse_quantize(
aggregated_params,
clipping_range=clipping_range
)
time_elapsed = time.process_time() - start
logger.debug(f"Aggregation is completed in {round(time_elapsed, ndigits=2)} seconds.")
return aggregated_params
encrypt
encrypt(current_round, node_id, params, pairwise_secrets, node_ids, clipping_range=None, weight=None)
Encrypts model parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
current_round | int | Current round of federated training | required |
node_id | str | ID of the node applies encryption | required |
params | List[float] | List of flatten parameters | required |
pairwise_secrets | Dict[str, bytes] | DH agreed secrets between node that applies encryption and others | required |
node_ids | List[str] | All nodes that participates secure aggregation | required |
weight | Optional[int] | Weight for the params | None |
clipping_range | Union[int, None] | Clipping-range for quantization of float model parameters. Clipping range must grater than minimum model parameters | None |
Returns:
Type | Description |
---|---|
List[int] | List of encrypted parameters |
Raises:
Type | Description |
---|---|
FedbiomedSecaggCrypterError | bad parameters |
FedbiomedSecaggCrypterError | encryption issue |
Source code in fedbiomed/common/secagg/_secagg_crypter.py
def encrypt(
self,
current_round: int,
node_id: str,
params: List[float],
pairwise_secrets: Dict[str, bytes],
node_ids: List[str],
clipping_range: Union[int, None] = None,
weight: Optional[int] = None,
) -> List[int]:
"""Encrypts model parameters.
Args:
current_round: Current round of federated training
node_id: ID of the node applies encryption
params: List of flatten parameters
pairwise_secrets: DH agreed secrets between node that applies encryption and others
node_ids: All nodes that participates secure aggregation
weight: Weight for the params
clipping_range: Clipping-range for quantization of float model parameters. Clipping range
must grater than minimum model parameters
Returns:
List of encrypted parameters
Raises:
FedbiomedSecaggCrypterError: bad parameters
FedbiomedSecaggCrypterError: encryption issue
"""
start = time.process_time()
if not isinstance(params, list):
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: Expected argument `params` type list but got {type(params)}"
)
if not all(isinstance(p, float) for p in params):
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: The parameters to encrypt should list of floats. "
f"There are one or more than a value that is not type of float."
)
params = quantize(weights=params,
clipping_range=clipping_range)
if weight is not None:
if 2**weight.bit_length() > SAParameters.WEIGHT_RANGE:
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value}: The weight is too large. The weight should be less than "
f"{SAParameters.WEIGHT_RANGE}."
)
params = self._apply_weighting(params, weight)
try:
# Encrypt parameters
encrypted_params: List[int] = self._lom.protect(
pairwise_secrets=pairwise_secrets,
node_id=node_id,
tau=current_round,
x_u_tau=params,
node_ids=node_ids
)
except (TypeError, ValueError) as exp:
raise FedbiomedSecaggCrypterError(
f"{ErrorNumbers.FB624.value} Error during parameter encryption. {exp}") from exp
time_elapsed = time.process_time() - start
logger.debug(f"Encryption of the parameters took {time_elapsed} seconds.")
return encrypted_params