Certificate Manager

Attributes

Classes

CertificateManager

CertificateManager(db_path=None)

Certificate manager to manage certificates of parties

Attrs

_db: TinyDB database to store certificates

Parameters:

Name Type Description Default
db

The name of the DB file to connect through TinyDB

required
Source code in fedbiomed/common/certificate_manager.py
def __init__(self, db_path: str = None):
    """Constructs certificate manager

    Args:
        db: The name of the DB file to connect through TinyDB
    """

    self._db: Union[Table, None] = None
    self._query: Query = Query()

    if db_path is not None:
        self.set_db(db_path)

Functions

delete
delete(party_id)

Deletes given party from table

Parameters:

Name Type Description Default
party_id

Party id

required

Returns:

Type Description
List[int]

The document IDs of deleted certificates

Source code in fedbiomed/common/certificate_manager.py
def delete(
        self,
        party_id
) -> List[int]:
    """Deletes given party from table

    Args:
        party_id: Party id

    Returns:
        The document IDs of deleted certificates
    """

    return self._db.remove(self._query.party_id == party_id)
generate_self_signed_ssl_certificate staticmethod
generate_self_signed_ssl_certificate(certificate_folder, certificate_name=MPSPDZ_certificate_prefix, component_id='unknown', subject=None)

Creates self-signed certificates

Parameters:

Name Type Description Default
certificate_folder

The path where certificate files .pem and .key will be saved. Path should be absolute.

required
certificate_name str

Name of the certificate file.

MPSPDZ_certificate_prefix
component_id str

ID of the component

'unknown'

Returns:

Name Type Description
private_key str

Private key file

public_key str

Certificate file

Raises:

Type Description
FedbiomedCertificateError

If certificate directory is invalid or an error occurs while writing certificate files in given path.

Certificate files

Certificate files will be saved in the given directory as `certificates.key` for private key
`certificate.pem` for public key.
Source code in fedbiomed/common/certificate_manager.py
@staticmethod
def generate_self_signed_ssl_certificate(
        certificate_folder,
        certificate_name: str = MPSPDZ_certificate_prefix,
        component_id: str = "unknown",
        subject: Optional[Dict[str, str]] = None
) -> Tuple[str, str]:
    """Creates self-signed certificates

    Args:
        certificate_folder: The path where certificate files `.pem` and `.key` will be saved. Path should be
            absolute.
        certificate_name: Name of the certificate file.
        component_id: ID of the component

    Returns:
        private_key: Private key file
        public_key: Certificate file

    Raises:
        FedbiomedCertificateError: If certificate directory is invalid or an error occurs while writing certificate
            files in given path.

    !!! info "Certificate files"
            Certificate files will be saved in the given directory as `certificates.key` for private key
            `certificate.pem` for public key.
    """
    subject = subject or {}

    if not os.path.abspath(certificate_folder):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Certificate path should be absolute: {certificate_folder}"
        )

    if not os.path.isdir(certificate_folder):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Certificate path is not valid: {certificate_folder}"
        )

    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)


    cn = subject.get('CommonName', '*')
    on = subject.get('OrganizationName', component_id)

    x509 = crypto.X509()
    subject = x509.get_subject()
    subject.commonName = cn
    subject.organizationName = on
    x509.set_issuer(subject)

    extensions = []
    try:
        if ipaddress.ip_address(cn):
            extensions.append(crypto.X509Extension(
                type_name=b'subjectAltName',
                critical=False,
                value=f'IP:{cn}'.encode('ASCII')))
    except ValueError:
        pass
    if extensions:
        x509.add_extensions(extensions)

    x509.gmtime_adj_notBefore(0)
    x509.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60)
    x509.set_pubkey(pkey)
    x509.set_serial_number(random.randrange(100000))
    x509.set_version(2)
    x509.sign(pkey, 'SHA256')

    # Certificate names
    key_file = os.path.join(certificate_folder, f"{certificate_name}.key")
    pem_file = os.path.join(certificate_folder, f"{certificate_name}.pem")

    try:
        with open(key_file, "wb") as f:
            f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
            f.close()
    except Exception as e:
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Can not write public key: {e}"
        )

    try:
        with open(pem_file, "wb") as f:
            f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, x509))
            f.close()
    except Exception as e:
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Can not write public key: {e}"
        )

    return key_file, pem_file
get
get(party_id)

Gets certificate/public key of given party

Parameters:

Name Type Description Default
party_id str

ID of the party which certificate will be retrieved from DB

required

Returns:

Type Description
Union[dict, None]

Certificate, dict like TinyDB document

Source code in fedbiomed/common/certificate_manager.py
def get(
        self,
        party_id: str
) -> Union[dict, None]:
    """Gets certificate/public key  of given party

    Args:
        party_id: ID of the party which certificate will be retrieved from DB

    Returns:
        Certificate, dict like TinyDB document
    """

    v = self._db.get(self._query.party_id == party_id)
    return v
insert
insert(certificate, party_id, component, ip, port, upsert=False)

Inserts new certificate

Parameters:

Name Type Description Default
certificate str

Public-key for the FL parties

required
party_id str

ID of the party

required
component str

Node or researcher,

required
ip str

MP-SPDZ IP of the component which the certificate will be registered

required
port str

MP-SPDZ Port of the component which the certificate will be registered

required
upsert bool

Update document with new data if it is existing

False

Returns:

Type Description
Union[int, list[int]]

Document ID of inserted certificate

Raises:

Type Description
FedbiomedCertificateError

party is already registered

Source code in fedbiomed/common/certificate_manager.py
def insert(
        self,
        certificate: str,
        party_id: str,
        component: str,
        ip: str,
        port: str,
        upsert: bool = False
) -> Union[int, list[int]]:
    """ Inserts new certificate

    Args:
        certificate: Public-key for the FL parties
        party_id: ID of the party
        component: Node or researcher,
        ip: MP-SPDZ IP of the component which the certificate will be registered
        port: MP-SPDZ Port of the component which the certificate will be registered
        upsert: Update document with new data if it is existing

    Returns:
        Document ID of inserted certificate

    Raises:
        FedbiomedCertificateError: party is already registered
    """
    certificate_ = self.get(party_id=party_id)
    if not certificate_:
        return self._db.insert(dict(
            certificate=certificate,
            party_id=party_id,
            component=component,
            ip=ip,
            port=port
        ))

    elif upsert:
        return self._db.upsert(
            dict(
                certificate=certificate,
                component=component,
                party_id=party_id,
                ip=ip,
                port=port
            ),
            self._query.party_id == party_id
        )
    else:
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Party {party_id} already registered. Please use `upsert=True` or "
            f"'--upsert' option through CLI")
list
list(verbose=False)

Lists registered certificates.

Parameters:

Name Type Description Default
verbose bool

Prints list of registered certificates in tabular format

False

Returns:

Type Description
List[dict]

List of certificate objects registered in DB

Source code in fedbiomed/common/certificate_manager.py
def list(self, verbose: bool = False) -> List[dict]:
    """ Lists registered certificates.

    Args:
        verbose: Prints list of registered certificates in tabular format

    Returns:
        List of certificate objects registered in DB
    """
    certificates = self._db.all()

    if verbose:
        to_print = copy.deepcopy(certificates)
        for doc in to_print:
            doc.pop('certificate')

        print(tabulate(to_print, headers='keys'))

    return certificates
register_certificate
register_certificate(certificate_path, party_id, ip, port, upsert=False)

Registers certificate

Parameters:

Name Type Description Default
certificate_path str

Path where certificate/key file stored

required
party_id str

ID of the FL party which the certificate will be registered

required
ip str

MP-SPDZ IP address of the party where MP-SPDZ create communication

required
port int

MP-SPDZ port of the party

required
upsert bool

If True overwrites existing certificate for specified party. If False and the certificate for the specified party already existing it raises error.

False

Returns:

Type Description
Union[int, List[int]]

The document ID of registered certificated.

Raises:

Type Description
FedbiomedCertificateError

If certificate file is not existing in file system

Source code in fedbiomed/common/certificate_manager.py
def register_certificate(
        self,
        certificate_path: str,
        party_id: str,
        ip: str,
        port: int,
        upsert: bool = False
) -> Union[int, List[int]]:
    """ Registers certificate

    Args:
        certificate_path: Path where certificate/key file stored
        party_id: ID of the FL party which the certificate will be registered
        ip:  MP-SPDZ IP address of the party where MP-SPDZ create communication
        port: MP-SPDZ port of the party
        upsert: If `True` overwrites existing certificate for specified party. If `False` and the certificate for
            the specified party already existing it raises error.

    Returns:
        The document ID of registered certificated.

    Raises:
        FedbiomedCertificateError: If certificate file is not existing in file system
    """

    if not os.path.isfile(certificate_path):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Certificate path does not represents a file."
        )

    # Read certificate content
    with open(certificate_path) as file:
        certificate_content = file.read()
        file.close()

    # Save certificate in database
    component = ComponentType.NODE.name if party_id.startswith(NODE_PREFIX) else ComponentType.RESEARCHER.name

    return self.insert(
        certificate=certificate_content,
        party_id=party_id,
        ip=ip,
        port=port,
        component=component,
        upsert=upsert,
    )
set_db
set_db(db_path)

Sets database

Parameters:

Name Type Description Default
db_path str

The path of DB file where Certificates table are stored

required
Source code in fedbiomed/common/certificate_manager.py
def set_db(self, db_path: str) -> None:
    """Sets database

    Args:
        db_path: The path of DB file where `Certificates` table are stored
    """
    db = TinyDB(db_path)
    db.table_class = DBTable
    self._db: Table = db.table("Certificates")
write_mpc_certificates_for_experiment
write_mpc_certificates_for_experiment(parties, path_certificates, path_ips, self_id, self_ip, self_port, self_private_key, self_public_key)

Writes certificates into given directory respecting the order

Certificate Naming Convention

MP-SPDZ requires saving certificates respecting the naming convention `P<PARTY_ID>.pem`. Party ID should
be integer in the order of [0,1, ...].  Therefore, the order of parties are critical in the sense of
naming files in given folder path. Files will be named as `P[ORDER].pem` to make it compatible with
MP-SPDZ.

Parameters:

Name Type Description Default
parties List[str]

ID of the parties (nodes/researchers) will join FL experiment.y

required
path_certificates str

The path where certificate files will be writen

required
path_ips str

Path where ip addresses will be saved

required
self_id str

ID of the component that will launch MP-SPDZ protocol

required
self_ip str

IP of the component that will launch MP-SPDZ protocol

required
self_port int

Port of the component that will launch MP-SPDZ protocol

required
self_private_key str

Path to MPSPDZ public key

required
self_public_key str

Path to MPSDPZ private key

required

Returns:

Type Description
str

Path where ip addresses are saved

List[str]

List of written certificates files (paths).

Raises:

Type Description
FedbiomedCertificateError
  • If certificate for given party is not existing in the database
  • If given path is not a directory
Source code in fedbiomed/common/certificate_manager.py
def write_mpc_certificates_for_experiment(
        self,
        parties: List[str],
        path_certificates: str,
        path_ips: str,
        self_id: str,
        self_ip: str,
        self_port: int,
        self_private_key: str,
        self_public_key: str
) -> Tuple[str, List[str]]:
    """ Writes certificates into given directory respecting the order

    !!! info "Certificate Naming Convention"
            MP-SPDZ requires saving certificates respecting the naming convention `P<PARTY_ID>.pem`. Party ID should
            be integer in the order of [0,1, ...].  Therefore, the order of parties are critical in the sense of
            naming files in given folder path. Files will be named as `P[ORDER].pem` to make it compatible with
            MP-SPDZ.

    Args:
        parties: ID of the parties (nodes/researchers) will join FL experiment.y
        path_certificates: The path where certificate files will be writen
        path_ips: Path where ip addresses will be saved
        self_id: ID of the component that will launch MP-SPDZ protocol
        self_ip: IP of the component that will launch MP-SPDZ protocol
        self_port: Port of the component that will launch MP-SPDZ protocol
        self_private_key: Path to MPSPDZ public key
        self_public_key: Path to MPSDPZ private key

    Returns:
        Path where ip addresses are saved
        List of written certificates files (paths).

    Raises:
        FedbiomedCertificateError: - If certificate for given party is not existing in the database
            - If given path is not a directory
    """

    if not os.path.isdir(path_certificates):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Specified `path_certificates` argument should be a directory. "
            f"`path_certificates` is not a directory or it is not existing. {path_certificates}"
        )

    if not os.path.isdir(path_ips):
        raise FedbiomedCertificateError(
            f"{ErrorNumbers.FB619.value}: Specified `path_ips` argument should be a directory. `path_ips` is not a "
            f"directory or it is not existing. {path_ips}"
        )


    path_certificates = os.path.abspath(path_certificates)
    path_ips = os.path.abspath(path_ips)
    self_private_key = os.path.abspath(self_private_key)
    self_public_key = os.path.abspath(self_public_key)

    ip_addresses = os.path.join(path_ips, "ip_addresses")
    # Files already writen into directory
    writen_certificates = []

    # Function remove writen files in case of error
    def remove_writen_files():
        for wf in writen_certificates:
            if os.path.isfile(wf):
                os.remove(wf)

        if os.path.isfile(ip_addresses):
            os.remove(ip_addresses)

    if os.path.isfile(ip_addresses):
        os.remove(ip_addresses)

    # Get certificate for each party
    try:
        for index, party in enumerate(parties):

            # Self certificate requires to
            if party == self_id:
                self_certificate_key = read_file(self_private_key)
                self_certificate_pem = read_file(self_public_key)

                key = os.path.join(path_certificates, f"P{index}.key")
                pem = os.path.join(path_certificates, f"P{index}.pem")

                self._write_certificate_file(key, self_certificate_key)
                self._write_certificate_file(pem, self_certificate_pem)
                self._append_new_ip_address(ip_addresses, self_ip, self_port, self_id)
                writen_certificates.extend([key, pem])

                continue

            # Remote parties
            party_object = self.get(party)
            if not party_object:
                remove_writen_files()
                raise FedbiomedCertificateError(
                    f"{ErrorNumbers.FB619.value}: Certificate for {party} is not existing. Certificates  of "
                    f"each federated training participant should be present. {self_id} should register certificate "
                    f"of {party}."
                )

            path_ = os.path.join(path_certificates, f"P{index}.pem")
            self._write_certificate_file(path_, party_object["certificate"])
            writen_certificates.append(path_)

            self._append_new_ip_address(
                ip_addresses,
                party_object["ip"],
                party_object["port"],
                party_object["party_id"]
            )

    except FedbiomedCertificateError as e:
        # Remove all writen file in case of an error
        remove_writen_files()
        raise FedbiomedCertificateError(e)

    except FedbiomedError as e:
        remove_writen_files()
        raise FedbiomedCertificateError(f"{ErrorNumbers.FB619.value}: {e}")

    except Exception as e:
        remove_writen_files()
        raise FedbiomedCertificateError(f"{ErrorNumbers.FB619.value}: Undetermined error: {e}")

    return ip_addresses, writen_certificates

Functions

generate_certificate

generate_certificate(root, component_id, prefix=None, subject=None)

Generates certificates

Parameters:

Name Type Description Default
component_id

ID of the component for which the certificate will be generated

required

Returns:

Name Type Description
key_file str

The path where private key file is saved

pem_file str

The path where public key file is saved

Raises:

Type Description
FedbiomedEnvironError

If certificate directory for the component has already certificate.pem or certificate.key files generated.

Source code in fedbiomed/common/certificate_manager.py
def generate_certificate(
    root,
    component_id,
    prefix: Optional[str] = None,
    subject: Optional[Dict[str, str]] = None
) -> Tuple[str, str]:
    """Generates certificates

    Args:
        component_id: ID of the component for which the certificate will be generated

    Returns:
        key_file: The path where private key file is saved
        pem_file: The path where public key file is saved

    Raises:
        FedbiomedEnvironError: If certificate directory for the component has already `certificate.pem` or
            `certificate.key` files generated.
    """

    certificate_path = os.path.join(root, CERTS_FOLDER_NAME, f"cert_{component_id}")

    if os.path.isdir(certificate_path) \
            and (os.path.isfile(os.path.join(certificate_path, "certificate.key")) or
                 os.path.isfile(os.path.join(certificate_path, "certificate.pem"))):

        raise ValueError(f"Certificate generation is aborted. Directory {certificate_path} has already "
                         f"certificates. Please remove those files to regenerate")
    else:
        os.makedirs(certificate_path, exist_ok=True)

    key_file, pem_file = CertificateManager.generate_self_signed_ssl_certificate(
        certificate_folder=certificate_path,
        certificate_name=prefix if prefix else '',
        component_id=component_id,
        subject=subject
    )

    return key_file, pem_file

retrieve_ip_and_port

retrieve_ip_and_port(root, new=False, increment=None)
Source code in fedbiomed/common/certificate_manager.py
def retrieve_ip_and_port(
        root: str, 
        new: bool = False,
        increment: Union[int, None] = None
) -> Tuple[str, str]:

    # Get IP from global environment
    ip = os.getenv('MPSPDZ_IP', "localhost")

    increment_file = os.path.join(root, CONFIG_FOLDER_NAME, 'port_increment')

    if os.path.isfile(increment_file) and new is False:
        with open(increment_file, "r+") as file:
            port_increment = file.read()
            if port_increment != "":
                port = int(port_increment) + 1
                file.truncate(0)
                file.close()

                # Renew port in the  file
                _ = retrieve_ip_and_port(
                    root,
                    new=True,
                    increment=port)
            else:
                _, port = retrieve_ip_and_port(
                    root,
                    new=True)
    else:
        with open(increment_file, "w") as file:
            port = os.getenv('MPSPDZ_PORT', 14000) if increment is None else increment
            file.write(f"{port}")
            file.close()

    return ip, port