Source code for quantum_safe.kem.core

"""
quantum_safe.kem.core
~~~~~~~~~~~~~~~~~~~~~

The KEM class: single-algorithm Key Encapsulation Mechanism.

Most users should use HybridKEM instead โ€” it gives you classical + PQC
security without any extra work.  KEM is for when you specifically need
a single PQC algorithm, e.g.:

  - Benchmarking a specific algorithm
  - Protocol testing against a reference implementation
  - Cases where key size is critical and hybrid overhead matters

Usage::

    from quantum_safe import KEM

    kem = KEM("ML-KEM-768")
    kp  = kem.generate_keypair()
    ct, ss = kem.encapsulate(kp.public)
    ss2    = kem.decapsulate(kp.secret, ct)
    assert ss == ss2
"""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from quantum_safe.backends import get_kem_backend
from quantum_safe.exceptions import InsecureOperationError, UnsupportedAlgorithm
from quantum_safe.kem.algorithms import (
    MINIMUM_NIST_LEVEL,
    get_algorithm_spec,
)
from quantum_safe.types import (
    CipherText,
    KeyPair,
    MigrationState,
    PublicKey,
    SecretKey,
    SharedSecret,
)

if TYPE_CHECKING:
    from quantum_safe.backends.base import AbstractKEMBackend


[docs] class KEM: """Single-algorithm KEM. Wraps a backend and presents a typed, safe interface. Key decisions: - Key generation always returns a KeyPair (not raw bytes). - Encapsulate takes a PublicKey, returns (CipherText, SharedSecret). - Decapsulate takes (SecretKey, CipherText), returns SharedSecret. - All inputs/outputs are typed โ€” you can't accidentally pass a shared secret where a ciphertext is expected. Args: algorithm: PQC algorithm name. Defaults to ML-KEM-768. backend: Backend name: "auto", "liboqs", "rustcrypto". "auto" tries rustcrypto first, then liboqs. allow_low_security: If True, allows NIST level 1 algorithms without warning. Default False (warns on L1). strict: If True, raises instead of warning for non-standard configurations. Default False. """ def __init__( self, algorithm: str = "ML-KEM-768", backend: str = "auto", allow_low_security: bool = False, strict: bool = False, ) -> None: self._algorithm = algorithm self._strict = strict self._spec = get_algorithm_spec(algorithm) self._backend: AbstractKEMBackend = get_kem_backend(backend) # Security level check if self._spec.nist_level < MINIMUM_NIST_LEVEL: msg = ( f"Algorithm '{algorithm}' is at NIST level {self._spec.nist_level}, " f"below the minimum recommended level {MINIMUM_NIST_LEVEL}." ) if strict: raise InsecureOperationError(msg, algorithm=algorithm) warnings.warn(msg, stacklevel=2) if not self._spec.is_nist_standard: msg = ( f"Algorithm '{algorithm}' is not a NIST standardized algorithm. " f"It may not be suitable for production use." ) if strict: raise InsecureOperationError(msg, algorithm=algorithm) warnings.warn(msg, stacklevel=2) @property def algorithm(self) -> str: """The PQC algorithm name.""" return self._algorithm @property def backend_name(self) -> str: """Name of the backend being used.""" return self._backend.name
[docs] def generate_keypair(self) -> KeyPair: """Generate a fresh key pair for this algorithm. Returns: KeyPair with .public (PublicKey) and .secret (SecretKey). Example:: kp = kem.generate_keypair() print(kp.public.fingerprint()) """ pub_bytes, sec_bytes = self._backend.keygen(self._algorithm) pub = PublicKey( raw=pub_bytes, algorithm=self._algorithm, migration_state=MigrationState.PQC_ONLY, backend_tag=self._backend.name, ) sec = SecretKey( raw=sec_bytes, algorithm=self._algorithm, migration_state=MigrationState.PQC_ONLY, backend_tag=self._backend.name, ) return KeyPair(public=pub, secret=sec)
[docs] def encapsulate(self, public_key: PublicKey) -> tuple[CipherText, SharedSecret]: """Encapsulate a shared secret under the recipient's public key. Args: public_key: The recipient's public key. Must be for the same algorithm as this KEM instance. Returns: (ct, ss): CipherText to send to the recipient, SharedSecret for the sender to use. Raises: UnsupportedAlgorithm: if the key's algorithm doesn't match. """ if public_key.algorithm != self._algorithm: raise UnsupportedAlgorithm( public_key.algorithm, available=[self._algorithm], ) ct_bytes, ss_bytes = self._backend.encapsulate(self._algorithm, public_key.raw_bytes) ct = CipherText(data=ct_bytes, algorithm=self._algorithm) # SS must be exactly 32 bytes for NIST-standard algorithms ss = SharedSecret(data=ss_bytes[:32], algorithm=self._algorithm, is_hybrid=False) return ct, ss
[docs] def decapsulate(self, secret_key: SecretKey, ciphertext: CipherText) -> SharedSecret: """Decapsulate: recover the shared secret from a ciphertext. Args: secret_key: The recipient's secret key. ciphertext: The CipherText from the sender. Returns: SharedSecret matching the one the sender derived. Raises: DecapsulationError: if decapsulation fails. Note that ML-KEM implements implicit rejection (FIPS 203 ยง6.3), so malformed ciphertexts produce a pseudorandom value rather than raising. The error is raised only for structural failures. """ if secret_key.algorithm != self._algorithm: raise UnsupportedAlgorithm( secret_key.algorithm, available=[self._algorithm], ) ss_bytes = self._backend.decapsulate( self._algorithm, secret_key.raw_bytes, bytes(ciphertext) ) return SharedSecret(data=ss_bytes[:32], algorithm=self._algorithm, is_hybrid=False)
def __repr__(self) -> str: return f"KEM(algo={self._algorithm!r}, backend={self._backend.name!r})"