Attributes
Classes
CertificateManager
CertificateManager(db_path=None)
Certificate manager to manage certificates of parties
Attrs
_db: TinyDB database to store certificates
Parameters:
Name | Type | Description | Default |
---|---|---|---|
db_path | str | The path of the DB file to connect through TinyDB. If `None`, no database is attached until `set_db` is called. | None |
Source code in fedbiomed/common/certificate_manager.py
def __init__(self, db_path: Optional[str] = None):
    """Constructs certificate manager

    Args:
        db_path: Path of the DB file to connect through TinyDB. When `None`, no database is
            attached at construction time and `_db` stays `None` until `set_db` is called.
    """

    # Table that stores the certificates; remains `None` while no DB path has been given
    self._db: Union[Table, None] = None
    self._query: Query = Query()

    if db_path is not None:
        self.set_db(db_path)
Functions
delete
delete(party_id)
Deletes given party from table
Parameters:
Name | Type | Description | Default |
---|---|---|---|
party_id | Party id | required |
Returns:
Type | Description |
---|---|
List[int] | The document IDs of deleted certificates |
Source code in fedbiomed/common/certificate_manager.py
def delete(self, party_id) -> List[int]:
    """Removes every certificate registered for the given party

    Args:
        party_id: Party id

    Returns:
        The document IDs of deleted certificates
    """
    party_matches = self._query.party_id == party_id
    removed_ids = self._db.remove(party_matches)
    return removed_ids
generate_self_signed_ssl_certificate staticmethod
generate_self_signed_ssl_certificate(certificate_folder, certificate_name=MPSPDZ_certificate_prefix, component_id='unknown', subject=None)
Creates self-signed certificates
Parameters:
Name | Type | Description | Default |
---|---|---|---|
certificate_folder | | The path where certificate files `.pem` and `.key` will be saved. Path should be absolute. | required |
certificate_name | str | Name of the certificate file. | MPSPDZ_certificate_prefix |
component_id | str | ID of the component | 'unknown' |
Returns:
Name | Type | Description |
---|---|---|
private_key | str | Private key file |
public_key | str | Certificate file |
Raises:
Type | Description |
---|---|
FedbiomedCertificateError | If certificate directory is invalid or an error occurs while writing certificate files in given path. |
Certificate files
Certificate files will be saved in the given directory as `<certificate_name>.key` for the private key and
`<certificate_name>.pem` for the public key (certificate).
Source code in fedbiomed/common/certificate_manager.py
@staticmethod
def generate_self_signed_ssl_certificate(
        certificate_folder,
        certificate_name: str = MPSPDZ_certificate_prefix,
        component_id: str = "unknown",
        subject: Optional[Dict[str, str]] = None
) -> Tuple[str, str]:
    """Creates self-signed certificates

    Args:
        certificate_folder: The path where certificate files `.pem` and `.key` will be saved. Path should be
            absolute.
        certificate_name: Name of the certificate file (without extension).
        component_id: ID of the component. Used as organization name when `subject` does not provide
            `OrganizationName`.
        subject: Optional subject fields of the certificate. `CommonName` (default `*`) and
            `OrganizationName` (default `component_id`) are the keys that are read.

    Returns:
        private_key: Private key file
        public_key: Certificate file

    Raises:
        FedbiomedCertificateError: If certificate directory is invalid or an error occurs while writing certificate
            files in given path.

    !!! info "Certificate files"
            Certificate files will be saved in the given directory as `<certificate_name>.key` for private key
            `<certificate_name>.pem` for public key.
    """
    subject = subject or {}

    # `os.path.abspath` returns a (always truthy) string, so it can not validate anything;
    # `os.path.isabs` is the actual "is this path absolute" predicate.
    if not os.path.isabs(certificate_folder):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Certificate path should be absolute: {certificate_folder}"
        )

    if not os.path.isdir(certificate_folder):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Certificate path is not valid: {certificate_folder}"
        )

    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    cn = subject.get('CommonName', '*')
    on = subject.get('OrganizationName', component_id)

    x509 = crypto.X509()
    x509_subject = x509.get_subject()
    x509_subject.commonName = cn
    x509_subject.organizationName = on
    # Self-signed: issuer and subject are the same entity
    x509.set_issuer(x509_subject)

    # When the common name is an IP address, also expose it as subjectAltName
    extensions = []
    try:
        ipaddress.ip_address(cn)
    except ValueError:
        # Common name is not an IP address: no extension to add
        pass
    else:
        extensions.append(crypto.X509Extension(
            type_name=b'subjectAltName',
            critical=False,
            value=f'IP:{cn}'.encode('ASCII')))

    if extensions:
        x509.add_extensions(extensions)

    x509.gmtime_adj_notBefore(0)
    # Valid for 5 years (expressed in seconds)
    x509.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60)
    x509.set_pubkey(pkey)
    # Start at 1: certificate serial numbers are expected to be positive
    x509.set_serial_number(random.randrange(1, 100000))
    # Version value 2 corresponds to X.509 v3
    x509.set_version(2)
    x509.sign(pkey, 'SHA256')

    # Certificate names
    key_file = os.path.join(certificate_folder, f"{certificate_name}.key")
    pem_file = os.path.join(certificate_folder, f"{certificate_name}.pem")

    try:
        with open(key_file, "wb") as f:
            f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    except Exception as e:
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Can not write private key: {e}"
        ) from e

    try:
        with open(pem_file, "wb") as f:
            f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509))
    except Exception as e:
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Can not write public key: {e}"
        ) from e

    return key_file, pem_file
get
get(party_id)
Gets certificate/public key of given party
Parameters:
Name | Type | Description | Default |
---|---|---|---|
party_id | str | ID of the party which certificate will be retrieved from DB | required |
Returns:
Type | Description |
---|---|
Union[dict, None] | Certificate, dict like TinyDB document |
Source code in fedbiomed/common/certificate_manager.py
def get(self, party_id: str) -> Union[dict, None]:
    """Retrieves the certificate entry registered for a party

    Args:
        party_id: ID of the party which certificate will be retrieved from DB

    Returns:
        Result of the table lookup for `party_id`: certificate, dict like TinyDB document
    """
    return self._db.get(self._query.party_id == party_id)
insert
insert(certificate, party_id, component, ip, port, upsert=False)
Inserts new certificate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
certificate | str | Public-key for the FL parties | required |
party_id | str | ID of the party | required |
component | str | Node or researcher, | required |
ip | str | MP-SPDZ IP of the component which the certificate will be registered | required |
port | str | MP-SPDZ Port of the component which the certificate will be registered | required |
upsert | bool | Update document with new data if it is existing | False |
Returns:
Type | Description |
---|---|
Union[int, list[int]] | Document ID of inserted certificate |
Raises:
Type | Description |
---|---|
FedbiomedCertificateError | party is already registered |
Source code in fedbiomed/common/certificate_manager.py
def insert(
        self,
        certificate: str,
        party_id: str,
        component: str,
        ip: str,
        port: Union[int, str],
        upsert: bool = False
) -> Union[int, List[int]]:
    """ Inserts new certificate

    Args:
        certificate: Public-key for the FL parties
        party_id: ID of the party
        component: Node or researcher,
        ip: MP-SPDZ IP of the component which the certificate will be registered
        port: MP-SPDZ Port of the component which the certificate will be registered
        upsert: Update document with new data if it is existing

    Returns:
        Document ID of inserted certificate, or the list of document IDs returned by the
            upsert when the party was already registered and `upsert` is `True`.

    Raises:
        FedbiomedCertificateError: party is already registered
    """
    certificate_ = self.get(party_id=party_id)

    # Same document content is stored whether it is inserted or upserted
    document = dict(
        certificate=certificate,
        party_id=party_id,
        component=component,
        ip=ip,
        port=port
    )

    if not certificate_:
        return self._db.insert(document)

    if upsert:
        return self._db.upsert(document, self._query.party_id == party_id)

    raise FedbiomedCertificateError(
        f"{ErrorNumbers.FB619.value}: Party {party_id} already registered. Please use `upsert=True` or "
        f"'--upsert' option through CLI")
list
list(verbose=False)
Lists registered certificates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
verbose | bool | Prints list of registered certificates in tabular format | False |
Returns:
Type | Description |
---|---|
List[dict] | List of certificate objects registered in DB |
Source code in fedbiomed/common/certificate_manager.py
def list(self, verbose: bool = False) -> List[dict]:
    """ Returns all registered certificates, optionally printing them.

    Args:
        verbose: Prints list of registered certificates in tabular format

    Returns:
        List of certificate objects registered in DB
    """
    certificates = self._db.all()

    if not verbose:
        return certificates

    # Print from a deep copy so the returned documents keep their `certificate` field
    printable = copy.deepcopy(certificates)
    for entry in printable:
        del entry['certificate']
    print(tabulate(printable, headers='keys'))

    return certificates
register_certificate
register_certificate(certificate_path, party_id, ip, port, upsert=False)
Registers certificate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
certificate_path | str | Path where certificate/key file stored | required |
party_id | str | ID of the FL party which the certificate will be registered | required |
ip | str | MP-SPDZ IP address of the party where MP-SPDZ create communication | required |
port | int | MP-SPDZ port of the party | required |
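upsert | bool | If `True`, overwrites the existing certificate for the specified party. If `False` and a certificate for the specified party already exists, an error is raised. | False |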
Returns:
Type | Description |
---|---|
Union[int, List[int]] | The document ID of registered certificated. |
Raises:
Type | Description |
---|---|
FedbiomedCertificateError | If certificate file is not existing in file system |
Source code in fedbiomed/common/certificate_manager.py
def register_certificate(
        self,
        certificate_path: str,
        party_id: str,
        ip: str,
        port: int,
        upsert: bool = False
) -> Union[int, List[int]]:
    """ Reads a certificate file and stores its content for the given party

    Args:
        certificate_path: Path where certificate/key file stored
        party_id: ID of the FL party which the certificate will be registered
        ip: MP-SPDZ IP address of the party where MP-SPDZ create communication
        port: MP-SPDZ port of the party
        upsert: If `True` overwrites existing certificate for specified party. If `False` and the certificate for
            the specified party already existing it raises error.

    Returns:
        The document ID of registered certificated.

    Raises:
        FedbiomedCertificateError: If certificate file is not existing in file system
    """
    certificate_exists = os.path.isfile(certificate_path)
    if not certificate_exists:
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Certificate path does not represents a file."
        )

    # Load the certificate text that will be stored in the database
    with open(certificate_path) as certificate_file:
        certificate_content = certificate_file.read()

    # The component type is derived from the prefix of the party ID
    if party_id.startswith(NODE_PREFIX):
        component = ComponentType.NODE.name
    else:
        component = ComponentType.RESEARCHER.name

    return self.insert(
        certificate=certificate_content,
        party_id=party_id,
        ip=ip,
        port=port,
        component=component,
        upsert=upsert,
    )
set_db
set_db(db_path)
Sets database
Parameters:
Name | Type | Description | Default |
---|---|---|---|
db_path | str | The path of the DB file where the `Certificates` table is stored | required |
Source code in fedbiomed/common/certificate_manager.py
def set_db(self, db_path: str) -> None:
    """Attaches the manager to the `Certificates` table of a TinyDB file

    Args:
        db_path: The path of DB file where `Certificates` table are stored
    """
    database = TinyDB(db_path)
    # Table class has to be set before the table is requested
    database.table_class = DBTable
    self._db: Table = database.table("Certificates")
write_mpc_certificates_for_experiment
write_mpc_certificates_for_experiment(parties, path_certificates, path_ips, self_id, self_ip, self_port, self_private_key, self_public_key)
Writes certificates into given directory respecting the order
Certificate Naming Convention
MP-SPDZ requires saving certificates respecting the naming convention `P<PARTY_ID>.pem`. Party ID should
be integer in the order of [0,1, ...]. Therefore, the order of parties are critical in the sense of
naming files in given folder path. Files will be named as `P[ORDER].pem` to make it compatible with
MP-SPDZ.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parties | List[str] | IDs of the parties (nodes/researchers) that will join the FL experiment. | required |
path_certificates | str | The path where certificate files will be written | required |
path_ips | str | Path where ip addresses will be saved | required |
self_id | str | ID of the component that will launch MP-SPDZ protocol | required |
self_ip | str | IP of the component that will launch MP-SPDZ protocol | required |
self_port | int | Port of the component that will launch MP-SPDZ protocol | required |
self_private_key | str | Path to MP-SPDZ private key | required |
self_public_key | str | Path to MP-SPDZ public key | required |
Returns:
Type | Description |
---|---|
str | Path where ip addresses are saved |
List[str] | List of written certificates files (paths). |
Raises:
Type | Description |
---|---|
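FedbiomedCertificateError | If the certificate for a given party does not exist in the database, or if a given path is not a directory |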
Source code in fedbiomed/common/certificate_manager.py
def write_mpc_certificates_for_experiment(
        self,
        parties: List[str],
        path_certificates: str,
        path_ips: str,
        self_id: str,
        self_ip: str,
        self_port: int,
        self_private_key: str,
        self_public_key: str
) -> Tuple[str, List[str]]:
    """ Writes certificates into given directory respecting the order

    !!! info "Certificate Naming Convention"
            MP-SPDZ requires saving certificates respecting the naming convention `P<PARTY_ID>.pem`. Party ID should
            be integer in the order of [0,1, ...]. Therefore, the order of parties are critical in the sense of
            naming files in given folder path. Files will be named as `P[ORDER].pem` to make it compatible with
            MP-SPDZ.

    Args:
        parties: IDs of the parties (nodes/researchers) that will join FL experiment.
        path_certificates: The path where certificate files will be written
        path_ips: Path where ip addresses will be saved
        self_id: ID of the component that will launch MP-SPDZ protocol
        self_ip: IP of the component that will launch MP-SPDZ protocol
        self_port: Port of the component that will launch MP-SPDZ protocol
        self_private_key: Path to MPSPDZ private key
        self_public_key: Path to MPSPDZ public key

    Returns:
        Path where ip addresses are saved
        List of written certificates files (paths).

    Raises:
        FedbiomedCertificateError: - If certificate for given party is not existing in the database
            - If given path is not a directory
            - If any other error occurs while reading or writing the files; files written so far
              are removed before raising
    """

    if not os.path.isdir(path_certificates):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Specified `path_certificates` argument should be a directory. "
            f"`path_certificates` is not a directory or it is not existing. {path_certificates}"
        )

    if not os.path.isdir(path_ips):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Specified `path_ips` argument should be a directory. `path_ips` is not a "
            f"directory or it is not existing. {path_ips}"
        )

    path_certificates = os.path.abspath(path_certificates)
    path_ips = os.path.abspath(path_ips)
    self_private_key = os.path.abspath(self_private_key)
    self_public_key = os.path.abspath(self_public_key)

    ip_addresses = os.path.join(path_ips, "ip_addresses")

    # Files already written into directory
    writen_certificates = []

    # Function remove written files in case of error
    def remove_writen_files():
        for wf in writen_certificates:
            if os.path.isfile(wf):
                os.remove(wf)
        if os.path.isfile(ip_addresses):
            os.remove(ip_addresses)

    # Registers the path BEFORE writing, so that a file left behind by a failing write
    # (including a copy of the private key) is removed by `remove_writen_files`
    def write_tracked(path, content):
        writen_certificates.append(path)
        self._write_certificate_file(path, content)

    # Start from a clean IP address file, entries are appended party by party
    if os.path.isfile(ip_addresses):
        os.remove(ip_addresses)

    # Get certificate for each party
    try:
        for index, party in enumerate(parties):

            # Own certificate is read from the key files instead of the database
            if party == self_id:
                self_certificate_key = read_file(self_private_key)
                self_certificate_pem = read_file(self_public_key)

                key = os.path.join(path_certificates, f"P{index}.key")
                pem = os.path.join(path_certificates, f"P{index}.pem")

                write_tracked(key, self_certificate_key)
                write_tracked(pem, self_certificate_pem)

                self._append_new_ip_address(ip_addresses, self_ip, self_port, self_id)
                continue

            # Remote parties
            party_object = self.get(party)
            if not party_object:
                # Clean-up is done once, by the `except FedbiomedCertificateError` handler below
                raise FedbiomedCertificateError(
                    f"{ErrorNumbers.FB619.value}: Certificate for {party} is not existing. Certificates of "
                    f"each federated training participant should be present. {self_id} should register certificate "
                    f"of {party}."
                )

            path_ = os.path.join(path_certificates, f"P{index}.pem")
            write_tracked(path_, party_object["certificate"])

            self._append_new_ip_address(
                ip_addresses,
                party_object["ip"],
                party_object["port"],
                party_object["party_id"]
            )

    except FedbiomedCertificateError:
        # Remove all written files and propagate the original error unchanged
        remove_writen_files()
        raise

    except FedbiomedError as e:
        remove_writen_files()
        raise FedbiomedCertificateError(f"{ErrorNumbers.FB619.value}: {e}") from e

    except Exception as e:
        remove_writen_files()
        raise FedbiomedCertificateError(f"{ErrorNumbers.FB619.value}: Undetermined error: {e}") from e

    return ip_addresses, writen_certificates
Functions
generate_certificate
generate_certificate(root, component_id, prefix=None, subject=None)
Generates certificates
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id | ID of the component for which the certificate will be generated | required |
Returns:
Name | Type | Description |
---|---|---|
key_file | str | The path where private key file is saved |
pem_file | str | The path where public key file is saved |
Raises:
Type | Description |
---|---|
ValueError | If the certificate directory for the component already contains `certificate.pem` or `certificate.key` files. |
Source code in fedbiomed/common/certificate_manager.py
def generate_certificate(
        root,
        component_id,
        prefix: Optional[str] = None,
        subject: Optional[Dict[str, str]] = None
) -> Tuple[str, str]:
    """Generates certificates

    Certificates are written in `<root>/<CERTS_FOLDER_NAME>/cert_<component_id>`; the directory is
    created if it does not exist.

    Args:
        root: Root directory under which the certificate folder is located
        component_id: ID of the component for which the certificate will be generated
        prefix: Name (without extension) used for the generated `.key` and `.pem` files. When
            empty or `None`, an empty name is passed on, i.e. the files are named `.key` and `.pem`.
        subject: Optional certificate subject fields, forwarded to
            `CertificateManager.generate_self_signed_ssl_certificate`

    Returns:
        key_file: The path where private key file is saved
        pem_file: The path where public key file is saved

    Raises:
        ValueError: If certificate directory for the component has already `certificate.pem` or
            `certificate.key` files generated, or already contains the files that would be written
            for the given `prefix`.
    """

    certificate_path = os.path.join(root, CERTS_FOLDER_NAME, f"cert_{component_id}")
    certificate_name = prefix if prefix else ''

    # Guard the historical default names AND the names that are really going to be written,
    # otherwise existing `<prefix>.key` / `<prefix>.pem` files would be silently overwritten.
    guarded_files = (
        "certificate.key",
        "certificate.pem",
        f"{certificate_name}.key",
        f"{certificate_name}.pem",
    )
    if any(os.path.isfile(os.path.join(certificate_path, name)) for name in guarded_files):
        raise ValueError(f"Certificate generation is aborted. Directory {certificate_path} has already "
                         f"certificates. Please remove those files to regenerate")

    os.makedirs(certificate_path, exist_ok=True)

    key_file, pem_file = CertificateManager.generate_self_signed_ssl_certificate(
        certificate_folder=certificate_path,
        certificate_name=certificate_name,
        component_id=component_id,
        subject=subject
    )

    return key_file, pem_file
retrieve_ip_and_port
retrieve_ip_and_port(root, new=False, increment=None)
Source code in fedbiomed/common/certificate_manager.py
def retrieve_ip_and_port(
        root: str,
        new: bool = False,
        increment: Union[int, None] = None
) -> Tuple[str, int]:
    """Retrieves the MP-SPDZ IP and the next port, persisting the port in a counter file

    The last attributed port is stored in `<root>/<CONFIG_FOLDER_NAME>/port_increment`. Each call
    that finds a usable counter returns the stored value plus one and writes it back.

    Args:
        root: Root directory that contains the configuration folder
        new: If `True`, the existing counter file is ignored and a fresh counter is written
        increment: Port to store when a fresh counter is written. If `None`, the value of the
            `MPSPDZ_PORT` environment variable is used (default 14000).

    Returns:
        ip: Value of the `MPSPDZ_IP` environment variable (default `localhost`)
        port: Port written in the counter file, always as an integer
    """

    # Get IP from global environment
    ip = os.getenv('MPSPDZ_IP', "localhost")
    increment_file = os.path.join(root, CONFIG_FOLDER_NAME, 'port_increment')

    port = None
    if not new and os.path.isfile(increment_file):
        with open(increment_file, "r") as file:
            stored_port = file.read().strip()
        # An empty counter file is handled like a missing one
        if stored_port != "":
            port = int(stored_port) + 1

    if port is None:
        # `os.getenv` returns a string when the variable is set: normalize to int so that the
        # returned port has the same type on every code path
        port = int(os.getenv('MPSPDZ_PORT', 14000)) if increment is None else increment

    # Renew port in the file
    with open(increment_file, "w") as file:
        file.write(f"{port}")

    return ip, port