"""
quantum_safe.protocols.envelope
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Authenticated encryption envelope: KEM-derived key + AES-256-GCM.
This is what you use when you want to encrypt data to a recipient's
public key such that:
- Only the holder of the corresponding secret key can decrypt it.
- Any tampering with the ciphertext is detected.
- The algorithm used is recorded in the envelope for future migration.
- The format survives algorithm upgrades via the version field.
Construction
------------
For a given recipient public key pk:
1. Run HybridKEM.encapsulate(pk) → (kem_ct, shared_secret)
2. Derive enc_key = HKDF(shared_secret, info="qs-envelope-enc-v1")
3. Derive mac_key = HKDF(shared_secret, info="qs-envelope-mac-v1")
(mac_key is currently unused — AES-GCM provides authentication —
but we derive it for forward compatibility with MAC-then-encrypt schemes)
4. Generate a random 12-byte nonce.
5. aead_ct = AES-256-GCM.encrypt(enc_key, nonce, plaintext, aad=metadata)
where metadata = version || algo || kem_ct_fingerprint
6. Store: SealedMessage{version, algo, kem_ct, nonce, aead_ct, aad}
The metadata is authenticated but not encrypted (AAD in GCM). This means
the algorithm name and version are visible without decryption — useful for
operational tools that need to inspect ciphertexts without access to the key.
Wire format (CBOR-encoded SealedMessage)
-----------------------------------------
{
"v": 1, # envelope format version
"algo": "X25519+ML-KEM-768", # KEM algorithm used
"kct": <bytes>, # KEM ciphertext (HybridCipherText.to_bytes())
"n": <bytes>, # 12-byte AES-GCM nonce
"ct": <bytes>, # AES-256-GCM ciphertext + 16-byte tag
"aad": <bytes>, # additional authenticated data (visible, not encrypted)
}
Migration
---------
When you want to upgrade from one KEM algorithm to another:
1. Decrypt the existing SealedMessage (you need the old secret key for this).
2. Re-seal the plaintext with the new algorithm.
3. Delete the old SealedMessage.
The version field and algorithm name make it straightforward to identify
which sealed messages need upgrading — scan for version=1, algo=old_algo.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import Any
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from quantum_safe._internal import serialization as _ser
from quantum_safe.exceptions import (
DecapsulationError,
KeyParseError,
)
from quantum_safe.kem.hybrid import HybridKEM
from quantum_safe.types import HybridCipherText, PublicKey, SecretKey
# Envelope format version. Bump for backward-incompatible changes.
_ENVELOPE_VERSION = 1
# AES-GCM parameters
_NONCE_LEN = 12 # bytes — GCM standard nonce
_KEY_LEN = 32 # bytes — AES-256
_GCM_TAG_LEN = 16 # bytes — appended to ciphertext by AESGCM
# HKDF info strings for key derivation — version-pinned for domain separation
_ENC_KEY_INFO = b"qs-envelope-enc-v1"
_MAC_KEY_INFO = b"qs-envelope-mac-v1"
[docs]
@dataclass
class SealedMessage:
"""A ciphertext envelope produced by Envelope.seal().
All fields are needed to decrypt. The `aad` field is authenticated
but not encrypted — it's safe to inspect without the decryption key.
Attributes:
version: Envelope format version.
algorithm: KEM algorithm used (e.g. "X25519+ML-KEM-768").
kem_ct: KEM ciphertext bytes (HybridCipherText wire format).
nonce: 12-byte AES-GCM nonce. Never reused.
ciphertext: AES-256-GCM encrypted payload including 16-byte tag.
aad: Additional authenticated data. Visible but authenticated.
"""
version: int
algorithm: str
kem_ct: bytes
nonce: bytes
ciphertext: bytes
aad: bytes = field(default=b"")
def __post_init__(self) -> None:
if len(self.nonce) != _NONCE_LEN:
raise ValueError(f"nonce must be exactly {_NONCE_LEN} bytes, got {len(self.nonce)}")
# ------------------------------------------------------------------
# Serialization
# ------------------------------------------------------------------
[docs]
def to_bytes(self) -> bytes:
"""Serialize to CBOR (or JSON-b64 fallback) bytes."""
return _ser.dumps(
{
"v": self.version,
"algo": self.algorithm,
"kct": self.kem_ct,
"n": self.nonce,
"ct": self.ciphertext,
"aad": self.aad,
}
)
[docs]
@classmethod
def from_bytes(cls, data: bytes) -> SealedMessage:
"""Deserialize from bytes produced by to_bytes()."""
try:
d = _ser.loads(data)
except Exception as exc:
raise KeyParseError("envelope", f"CBOR/JSON decode failed: {exc}") from exc
required = ("v", "algo", "kct", "n", "ct")
for key in required:
if key not in d:
raise KeyParseError("envelope", f"missing field '{key}'")
return cls(
version=d["v"],
algorithm=d["algo"],
kem_ct=bytes(d["kct"]),
nonce=bytes(d["n"]),
ciphertext=bytes(d["ct"]),
aad=bytes(d.get("aad", b"")),
)
[docs]
def to_hex(self) -> str:
"""Convenience: serialize to a hex string."""
return self.to_bytes().hex()
[docs]
@classmethod
def from_hex(cls, hex_str: str) -> SealedMessage:
"""Deserialize from a hex string."""
try:
return cls.from_bytes(bytes.fromhex(hex_str))
except Exception as exc:
raise KeyParseError("envelope", f"hex decode failed: {exc}") from exc
[docs]
def inspect(self) -> dict[str, Any]:
"""Return a dict of visible (non-secret) metadata for logging/debugging.
Never logs ciphertext or key material. Safe to include in structured logs.
"""
return {
"version": self.version,
"algorithm": self.algorithm,
"kem_ct_size": len(self.kem_ct),
"ciphertext_size": len(self.ciphertext),
"aad": self.aad.hex() if self.aad else "",
}
def __repr__(self) -> str:
return (
f"SealedMessage("
f"v={self.version}, "
f"algo={self.algorithm!r}, "
f"ct_size={len(self.ciphertext)}B)"
)
[docs]
class Envelope:
"""Authenticated encryption using hybrid KEM + AES-256-GCM.
This is a class with only class methods — you don't instantiate it.
Think of it as a namespace for seal() and open().
Example::
# Sender:
sealed = Envelope.seal(b"top secret", recipient_public_key)
wire = sealed.to_bytes() # send this over the network
# Recipient:
msg = sealed.from_bytes(wire)
plain = Envelope.open(msg, recipient_secret_key)
"""
[docs]
@classmethod
def seal(
cls,
plaintext: bytes,
recipient_public_key: PublicKey,
aad: bytes = b"",
kem: HybridKEM | None = None,
) -> SealedMessage:
"""Encrypt plaintext to the recipient's public key.
Args:
plaintext: The data to encrypt. No size limit.
recipient_public_key: Recipient's HybridKEM public key.
aad: Additional authenticated data. Included
unencrypted in the envelope but authenticated
by GCM — any modification fails decryption.
Use for metadata you want visible but protected
(e.g. recipient ID, timestamp, content type).
kem: HybridKEM instance to use. If None, creates
a default HybridKEM() matching the key's
algorithm.
Returns:
SealedMessage that can be decrypted with Envelope.open().
Raises:
UnsupportedAlgorithm: if the key's algorithm isn't a known
hybrid KEM combination.
"""
if kem is None:
kem = cls._kem_for_key(recipient_public_key)
# KEM encapsulation — this is the core of the construction
kem_ct, shared_secret = kem.encapsulate(recipient_public_key)
# Derive AES key from the shared secret
enc_key = shared_secret.derive_key(
length=_KEY_LEN,
info=_ENC_KEY_INFO,
)
# Random nonce — 12 bytes for AES-GCM
nonce = os.urandom(_NONCE_LEN)
# Build AAD: version byte || algo string, plus any caller-supplied aad.
# This binds the version and algorithm into the authentication tag.
built_aad = cls._build_aad(
version=_ENVELOPE_VERSION,
algorithm=recipient_public_key.algorithm,
extra=aad,
)
# Encrypt + authenticate
aes = AESGCM(enc_key)
ciphertext = aes.encrypt(nonce, plaintext, built_aad)
return SealedMessage(
version=_ENVELOPE_VERSION,
algorithm=recipient_public_key.algorithm,
kem_ct=kem_ct.to_bytes(),
nonce=nonce,
ciphertext=ciphertext,
aad=aad, # store the caller's aad (not the built one)
)
[docs]
@classmethod
def open(
cls,
sealed: SealedMessage,
recipient_secret_key: SecretKey,
kem: HybridKEM | None = None,
) -> bytes:
"""Decrypt a SealedMessage.
Args:
sealed: SealedMessage from Envelope.seal() or
deserialized from the wire.
recipient_secret_key: The recipient's HybridKEM secret key.
kem: HybridKEM instance. If None, auto-created
from the envelope's algorithm field.
Returns:
Original plaintext bytes.
Raises:
DecapsulationError: if KEM decapsulation fails.
cryptography.exceptions.InvalidTag: if the ciphertext is tampered
or the wrong key is used (GCM authentication
failure). We let this propagate from
cryptography directly — don't catch it.
"""
if kem is None:
kem = cls._kem_for_algorithm(sealed.algorithm)
# Reconstruct the HybridCipherText from wire bytes
try:
kem_ct = HybridCipherText.from_bytes(sealed.kem_ct, algorithm=sealed.algorithm)
except Exception as exc:
raise DecapsulationError(algo=sealed.algorithm) from exc
# KEM decapsulation — recovers the shared secret
shared_secret = kem.decapsulate(recipient_secret_key, kem_ct)
# Derive the same AES key
enc_key = shared_secret.derive_key(
length=_KEY_LEN,
info=_ENC_KEY_INFO,
)
# Rebuild AAD — must match what was used during seal()
built_aad = cls._build_aad(
version=sealed.version,
algorithm=sealed.algorithm,
extra=sealed.aad,
)
# Decrypt + verify authentication tag
# AESGCM.decrypt() raises InvalidTag if tampered — let it propagate.
aes = AESGCM(enc_key)
return aes.decrypt(sealed.nonce, sealed.ciphertext, built_aad)
@staticmethod
def _build_aad(version: int, algorithm: str, extra: bytes) -> bytes:
"""Build the full AAD passed to AES-GCM.
We always include version and algorithm so these fields are
authenticated even when the caller doesn't pass any extra aad.
Format: version_byte || algo_len_byte || algo_bytes || extra_aad
"""
algo_bytes = algorithm.encode("ascii")
if len(algo_bytes) > 255:
raise ValueError("algorithm name too long for AAD encoding")
return bytes([version, len(algo_bytes)]) + algo_bytes + extra
@staticmethod
def _kem_for_key(public_key: PublicKey) -> HybridKEM:
"""Create a HybridKEM matching the algorithm of the given key."""
return Envelope._kem_for_algorithm(public_key.algorithm)
@staticmethod
def _kem_for_algorithm(algorithm: str) -> HybridKEM:
"""Parse a hybrid algorithm string and create a matching HybridKEM."""
from quantum_safe.kem.algorithms import parse_hybrid_name
try:
classical, pqc = parse_hybrid_name(algorithm)
except ValueError as exc:
from quantum_safe.exceptions import UnsupportedAlgorithm
raise UnsupportedAlgorithm(
algorithm,
available=["X25519+ML-KEM-768", "X25519+ML-KEM-1024"],
) from exc
return HybridKEM(classical=classical, pqc=pqc)