Migration (quantum_safe.migrate)
Tools for scanning codebases and upgrading classical keys to hybrid PQC.
Scanner
- class quantum_safe.migrate.scanner.Scanner
Bases: object
Scans Python source files for classical cryptography usage.
Usage:
    import sys

    from quantum_safe.migrate.scanner import Scanner

    report = Scanner.scan_directory("./src")
    print(report.summary())
    if report.has_blocking_findings:
        for f in report.high + report.critical:
            print(f)
        sys.exit(1)
- classmethod scan_file(filepath)
Scan a single Python file.
- Parameters:
filepath – Path of the Python file to scan.
- Return type:
ScanReport
- Returns:
ScanReport with findings for this file.
- classmethod scan_directory(directory, exclude=None, max_file_size_kb=512)
Recursively scan a directory for classical crypto usage.
- Parameters:
directory – Root directory to scan.
exclude – Defaults to None.
max_file_size_kb – Defaults to 512.
- Return type:
ScanReport
- Returns:
ScanReport with aggregated findings.
- class quantum_safe.migrate.scanner.ScanReport(root, files_scanned=0, findings=<factory>, errors=<factory>)
Bases: object
Aggregated results from scanning one or more files/directories.
- root
The directory or file that was scanned.
- files_scanned
Number of Python files analyzed.
- findings
All findings, sorted by (file, line).
- errors
Files that could not be parsed (syntax errors, permission issues).
- class quantum_safe.migrate.scanner.Finding(file, line, col, severity, rule_id, message, snippet='', fix_hint='')
Bases: object
A single classical crypto usage finding.
- file
Absolute or relative path to the source file.
- line
1-based line number.
- col
1-based column number.
- severity
Severity level.
- rule_id
Short machine-readable rule identifier, e.g. “QS001”.
- message
Human-readable description.
- snippet
The offending source line, stripped of leading whitespace.
- fix_hint
Optional suggestion for how to fix the issue.
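To make the Finding fields concrete, here is a minimal sketch of how an import-based check could populate Finding-like records with Python's ast module. It is not the library's scanner: the RULES table, the severities, the pairing of “QS001” with RSA, and the scan_source helper are all assumptions made for illustration.

```python
import ast
from dataclasses import dataclass

# Hypothetical rule table: module prefix -> (rule_id, severity, message).
RULES = {
    "cryptography.hazmat.primitives.asymmetric.rsa": ("QS001", "high", "RSA usage"),
    "cryptography.hazmat.primitives.asymmetric.ec": ("QS002", "high", "ECC usage"),
}


@dataclass(frozen=True)
class Finding:
    """Stand-in for the documented Finding (fix_hint omitted for brevity)."""
    file: str
    line: int  # 1-based
    col: int   # 1-based
    severity: str
    rule_id: str
    message: str
    snippet: str = ""


def scan_source(source, filename="<string>"):
    """Return one Finding per (import statement, matching rule)."""
    tree = ast.parse(source, filename=filename)
    lines = source.splitlines()
    findings = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module:
            # Cover both "from pkg.rsa import x" and "from pkg import rsa".
            modules = [node.module] + [f"{node.module}.{a.name}" for a in node.names]
        else:
            continue
        matched = {}  # rule_id -> rule, so each rule fires once per statement
        for module in modules:
            for prefix, rule in RULES.items():
                if module == prefix or module.startswith(prefix + "."):
                    matched[rule[0]] = rule
        for rule_id, severity, message in matched.values():
            findings.append(Finding(
                file=filename,
                line=node.lineno,
                col=node.col_offset + 1,  # ast column offsets are 0-based
                severity=severity,
                rule_id=rule_id,
                message=message,
                snippet=lines[node.lineno - 1].strip(),
            ))
    return sorted(findings, key=lambda f: (f.file, f.line))
```

Because ast reports 0-based column offsets, the sketch adds 1 to match the 1-based col documented above.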
Upgrader
- class quantum_safe.migrate.upgrader.Upgrader
Bases: object
Upgrades classical keys to hybrid PQC keys.
All methods are class methods, so no instantiation is needed.
Example:
    from quantum_safe.migrate import Upgrader
    from quantum_safe.types.keys import MigrationState

    # x25519_private_bytes / x25519_public_bytes: raw 32-byte X25519 keys
    # loaded elsewhere by the caller.

    # Upgrade an X25519 secret key to hybrid
    result = Upgrader.upgrade_kem_key(
        classical_secret_bytes=x25519_private_bytes,
        classical_public_bytes=x25519_public_bytes,
        classical_algorithm="X25519",
        target_pqc="ML-KEM-768",
        backend="auto",
    )
    new_kp = result.new_keypair
    print(new_kp.algorithm)  # "X25519+ML-KEM-768"
- classmethod upgrade_kem_key(classical_secret_bytes, classical_public_bytes, classical_algorithm='X25519', target_pqc='ML-KEM-768', backend='auto')
Upgrade a classical KEM key to a hybrid key.
The original classical sub-key is retained in the output: it is the same bytes as classical_secret_bytes / classical_public_bytes, wrapped alongside the new PQC component.
- Parameters:
classical_secret_bytes (bytes) – Raw bytes of the classical private key. For X25519: 32 bytes.
classical_public_bytes (bytes) – Raw bytes of the classical public key. For X25519: 32 bytes.
classical_algorithm (str) – “X25519” or “P-256”.
target_pqc (str) – PQC algorithm to add. Default “ML-KEM-768”.
backend (str) – PQC backend.
- Return type:
UpgradeResult
- Returns:
UpgradeResult with the new hybrid KeyPair.
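The way the classical bytes are retained can be sketched as follows. This is a hedged illustration, not the library's code: HybridKemKey, wrap_as_hybrid, and the size table are hypothetical names, and the PQC key bytes are assumed to come from whichever backend generates the ML-KEM key pair.

```python
from dataclasses import dataclass

# algorithm -> (secret_len, public_len) in bytes. Only X25519 is listed,
# because its 32-byte sizes are the ones stated in the parameter docs.
CLASSICAL_KEM_KEY_SIZES = {"X25519": (32, 32)}


@dataclass(frozen=True)
class HybridKemKey:
    """Hypothetical container; the library's KeyPair may look different."""
    algorithm: str
    classical_secret: bytes
    classical_public: bytes
    pqc_secret: bytes
    pqc_public: bytes


def wrap_as_hybrid(classical_secret, classical_public, pqc_secret, pqc_public,
                   classical_algorithm="X25519", target_pqc="ML-KEM-768"):
    """Validate the classical key lengths and wrap them next to the PQC key."""
    sizes = CLASSICAL_KEM_KEY_SIZES.get(classical_algorithm)
    if sizes is None:
        raise ValueError(f"unsupported classical algorithm: {classical_algorithm!r}")
    if (len(classical_secret), len(classical_public)) != sizes:
        raise ValueError(
            f"{classical_algorithm} keys must be {sizes[0]}/{sizes[1]} bytes "
            f"(secret/public), got {len(classical_secret)}/{len(classical_public)}"
        )
    # The classical bytes are stored unchanged; nothing is re-derived.
    return HybridKemKey(
        algorithm=f"{classical_algorithm}+{target_pqc}",
        classical_secret=classical_secret,
        classical_public=classical_public,
        pqc_secret=pqc_secret,
        pqc_public=pqc_public,
    )
```

Rejecting wrong-length input up front keeps a malformed classical key from being silently embedded in a hybrid key.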
- classmethod upgrade_signing_key(classical_secret_bytes, classical_public_bytes, classical_algorithm='Ed25519', target_pqc='ML-DSA-65', backend='auto')
Upgrade a classical signing key to a hybrid signing key.
- Parameters:
classical_secret_bytes (bytes) – Raw bytes of the classical private key. For Ed25519: 32 bytes.
classical_public_bytes (bytes) – Raw bytes of the classical public key. For Ed25519: 32 bytes.
classical_algorithm (str) – “Ed25519” or “P-256”.
target_pqc (str) – PQC signature algorithm. Default “ML-DSA-65”.
backend (str) – PQC backend.
- Return type:
UpgradeResult
- Returns:
UpgradeResult with the new hybrid signing KeyPair.
- classmethod strip_classical_component(hybrid_keypair)
Remove the classical sub-key from a hybrid keypair.
Use this when you’re confident all clients support PQC and you want to move to PQC_ONLY migration state.
Warning: This is a one-way operation. Old clients that only support classical algorithms will no longer be able to use the returned key. Make sure you’ve fully migrated before calling this.
- Parameters:
hybrid_keypair (KeyPair) – A keypair in HYBRID_TRANSITION or PQC_PREFERRED state.
- Return type:
KeyPair
- Returns:
A new KeyPair with only the PQC component, in PQC_ONLY state.
- Raises:
ValueError – if the keypair is not in a hybrid state.
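The state check and the "new KeyPair" behaviour can be illustrated with a small self-contained sketch. The MigrationState member names come from this page, but their values, the simplified KeyPair fields, and the strip_classical helper are assumptions, not the library's definitions.

```python
from dataclasses import dataclass, replace
from enum import Enum
from typing import Optional


class MigrationState(Enum):
    # Member names as used in this reference; the values are placeholders.
    CLASSICAL_ONLY = "classical_only"
    HYBRID_TRANSITION = "hybrid_transition"
    PQC_PREFERRED = "pqc_preferred"
    PQC_ONLY = "pqc_only"


@dataclass(frozen=True)
class KeyPair:
    """Simplified, hypothetical stand-in for the library's KeyPair."""
    algorithm: str
    state: MigrationState
    classical_secret: Optional[bytes]
    pqc_secret: bytes


HYBRID_STATES = {MigrationState.HYBRID_TRANSITION, MigrationState.PQC_PREFERRED}


def strip_classical(keypair):
    """Return a copy of keypair without its classical component."""
    if keypair.state not in HYBRID_STATES:
        raise ValueError(f"keypair is in {keypair.state.name}, not a hybrid state")
    # "X25519+ML-KEM-768" -> "ML-KEM-768"
    _, _, pqc_name = keypair.algorithm.partition("+")
    # replace() builds a new object; the input keypair is left untouched.
    return replace(
        keypair,
        algorithm=pqc_name,
        state=MigrationState.PQC_ONLY,
        classical_secret=None,
    )
```

Returning a copy rather than mutating the input means the caller still holds the hybrid key until it is deliberately discarded, which matters for an operation documented as one-way.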
- class quantum_safe.migrate.upgrader.UpgradeResult(new_keypair, old_algorithm, new_algorithm, migration_state, backward_compat, notes='')
Bases: object
Result of a key upgrade operation.
- new_keypair
The upgraded hybrid keypair.
- old_algorithm
Algorithm string of the key before upgrade.
- new_algorithm
Algorithm string of the upgraded key.
- migration_state
Migration state of the new key (always HYBRID_TRANSITION).
- backward_compat
True if the new key is backward-compatible with the old algorithm (i.e. old clients can still use it).
- notes
Human-readable notes about the upgrade.
Migration state
- class quantum_safe.migrate.state.MigrationStateManager(store)
Bases: object
Manages migration state for a collection of keys.
This class is storage-agnostic: it takes a dict-like store and wraps it with validation, history, and audit logging. You provide the storage; we provide the business logic.
- Parameters:
store (dict[str, bytes]) – A dict-like object for persistent state storage. Keys are string key_ids; values are MigrationRecord bytes. In production, back this with Redis, DynamoDB, Postgres, etc. In tests, a plain dict works fine.
Example:
    from quantum_safe.migrate.state import MigrationStateManager
    from quantum_safe.types.keys import MigrationState

    store = {}  # replace with your database abstraction
    mgr = MigrationStateManager(store)

    # First time we see this key
    rec = mgr.transition(
        key_id="user-123",
        from_state=MigrationState.CLASSICAL_ONLY,
        to_state=MigrationState.HYBRID_TRANSITION,
        algorithm="X25519+ML-KEM-768",
        actor="key-rotation-job-v1",
    )
    # rec is stored in `store["user-123_current"]`
    # Full history in `store["user-123_history"]`
- transition(key_id, from_state, to_state, algorithm, actor='system', reason='', metadata=None, allow_backward=False)
Record a state transition for a key.
- Parameters:
key_id (str) – Application key identifier.
from_state (MigrationState) – Expected current state (for optimistic concurrency check).
to_state (MigrationState) – New target state.
algorithm (str) – Key algorithm after this transition.
actor (str) – Who is performing the migration.
reason (str) – Why the transition is happening (required for backward transitions).
metadata (dict[str, Any] | None) – Arbitrary key-value pairs for audit.
allow_backward (bool) – Set True to explicitly permit backward transitions. Still requires a non-empty reason string.
- Return type:
MigrationRecord
- Returns:
The created MigrationRecord.
- Raises:
ValueError – if the transition is not valid or the current state doesn’t match from_state.
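The rules described for transition (an optimistic concurrency check on from_state, plus an explicit opt-in and a reason for backward moves) can be sketched over a plain dict. Everything here is an assumption for illustration: the numeric ordering of states, JSON as the record encoding, treating an unseen key as CLASSICAL_ONLY, and the standalone transition function itself. The "_current" and "_history" key suffixes follow the example comments earlier in this section.

```python
import json
import time
from enum import IntEnum


class MigrationState(IntEnum):
    # The numeric ordering is an assumption used to detect backward moves.
    CLASSICAL_ONLY = 0
    HYBRID_TRANSITION = 1
    PQC_PREFERRED = 2
    PQC_ONLY = 3


def transition(store, key_id, from_state, to_state, algorithm,
               actor="system", reason="", allow_backward=False):
    """Validate and record a state change in a dict-like store of bytes."""
    current_key = f"{key_id}_current"
    history_key = f"{key_id}_history"

    # Optimistic concurrency check: the stored state must equal from_state.
    # Assumption: a key with no record yet counts as CLASSICAL_ONLY.
    raw = store.get(current_key)
    current = (MigrationState[json.loads(raw)["state"]]
               if raw is not None else MigrationState.CLASSICAL_ONLY)
    if current != from_state:
        raise ValueError(
            f"{key_id}: expected {from_state.name}, found {current.name}")

    # Backward transitions need an explicit opt-in and a non-empty reason.
    if to_state < from_state:
        if not allow_backward:
            raise ValueError("backward transition requires allow_backward=True")
        if not reason.strip():
            raise ValueError("backward transition requires a reason")

    record = {
        "key_id": key_id,
        "from_state": from_state.name,
        "state": to_state.name,
        "algorithm": algorithm,
        "actor": actor,
        "reason": reason,
        "timestamp": time.time(),
    }
    history = json.loads(store.get(history_key, b"[]"))
    history.append(record)
    store[current_key] = json.dumps(record).encode("utf-8")
    store[history_key] = json.dumps(history).encode("utf-8")
    return record
```

Note that with a real shared database the read-check-write sequence would need to be atomic (for example a conditional write or a transaction); a plain dict hides that concern.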
- get_current_state(key_id)
Return the current migration state for a key, or None if unknown.
- Parameters:
key_id (str)
- Return type:
MigrationState | None
- keys_by_state(state)
Return all key IDs currently in the given migration state.
This is a full scan; in production, maintain a secondary index.
- Parameters:
state (MigrationState)
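One way to avoid the full scan is to keep a secondary index that is updated on every transition. The sketch below is an in-memory illustration only; StateIndex and its methods are hypothetical and not part of quantum_safe, and a production index would live in the same database as the records.

```python
from collections import defaultdict


class StateIndex:
    """Hypothetical secondary index: state -> set of key IDs."""

    def __init__(self):
        self._by_state = defaultdict(set)
        self._state_of = {}

    def record_transition(self, key_id, new_state):
        """Move key_id from its previous state bucket to new_state."""
        old_state = self._state_of.get(key_id)
        if old_state is not None:
            self._by_state[old_state].discard(key_id)
        self._by_state[new_state].add(key_id)
        self._state_of[key_id] = new_state

    def keys_by_state(self, state):
        """Look up one bucket instead of scanning every record."""
        return sorted(self._by_state.get(state, ()))
```

Calling record_transition right after each successful state change keeps the index consistent with the stored records, so a lookup touches a single bucket.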
Shims
- class quantum_safe.migrate.shims.FernetShim(backend='auto')
Bases: _ShimBase
Drop-in replacement for cryptography.fernet.Fernet.
Replaces the symmetric Fernet construction with asymmetric hybrid KEM + AES-256-GCM encryption. See module docstring for semantic differences.
The interface is intentionally similar to Fernet but not identical:
- No generate_key() class method (keys are asymmetric now)
- encrypt(data) → bytes (a serialized SealedMessage)
- decrypt(token) → bytes
Call shim_stats() to see how often this shim is being used.
- Parameters:
backend (str)
- class quantum_safe.migrate.shims.JWTShim
Bases: _ShimBase
Drop-in replacement for PyJWT’s jwt.encode() / jwt.decode().
Replaces classical JWT signing (RS256, ES256, HS256) with hybrid PQC signing (Ed25519+ML-DSA-65).
Usage:
    # Before:
    import jwt
    token = jwt.encode({"sub": "user"}, private_key, algorithm="RS256")
    claims = jwt.decode(token, public_key, algorithms=["RS256"])

    # After (drop-in):
    from quantum_safe.migrate.shims import JWTShim as jwt
    token = jwt.encode({"sub": "user"}, keypair, algorithm="Ed25519+ML-DSA-65")
    claims = jwt.decode(token, public_key, algorithms=["Ed25519+ML-DSA-65"])
The key parameter accepts a quantum_safe.types.KeyPair for encoding and a quantum_safe.types.PublicKey for decoding.