feat(@projects/@claire): add vault verification CLI

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-06-01 00:24:35 -06:00
parent 70b09d5a9b
commit 6cd1d3b28d
3 changed files with 244 additions and 0 deletions

View file

@ -640,6 +640,37 @@ def vault_status_cmd() -> None:
console.print(table)
@vault_app.command("verify")
def vault_verify_cmd(
file: Annotated[str | None, typer.Option("--file", help="verify a single vault file")] = None,
) -> None:
"""Live-test vault credentials against their declared `verify:` endpoint.
Only files with a `verify: <scheme>://<mesh-ip>:<port>` line are tested
(opt-in). ONE attempt per credential never retries a rejection
(fail2ban-safe). Run from a host on the WireGuard mesh.
"""
from rich.table import Table
from . import vault_verify as vv
results = [vv.verify_file(file)] if file else vv.verify_all()
if not results:
console.print("[yellow]no vault files declare a `verify:` endpoint[/yellow]")
raise typer.Exit(code=0)
colour = {"ok": "green", "rejected": "red", "unreachable": "yellow",
"unsupported": "yellow", "no-spec": "dim"}
table = Table(title="vault credential verify")
for c in ("file", "endpoint", "status", "detail"):
table.add_column(c)
for r in results:
table.add_row(r.file, r.endpoint or "-",
f"[{colour.get(r.status, 'white')}]{r.status}[/]", r.detail or "")
console.print(table)
if any(r.status == "rejected" for r in results):
raise typer.Exit(code=1)
@vault_app.command("rotate-secret")
def vault_rotate_secret(
confirm: Annotated[bool, typer.Option("--confirm", help="execute (default is dry-run)")] = False,

173
src/claire/vault_verify.py Normal file
View file

@ -0,0 +1,173 @@
"""Live credential verification for vault secrets — the empirical "which one
works" check, for reconciliation/audit.
A secret is only tested when its vault file OPTS IN via a `verify:` line we
never guess endpoints. One attempt per credential (a rejected login must NOT be
retried, or docker-mailserver/fail2ban bans the source).
verify: imap://10.9.0.1:993 # uses `account:` + `password:`
verify: smtps://10.9.0.1:465 # SMTP-over-SSL
verify: smtp://10.9.0.1:587 # SMTP + STARTTLS
verify: https://10.0.0.11/api/health # bearer = file body (token files)
Endpoints are the service's MESH IP (services bind WireGuard, not public DNS).
"""
from __future__ import annotations
import re
import socket
import ssl
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse
from .vault import default_vault_dir
_TIMEOUT = 8
@dataclass(frozen=True)
class VerifyResult:
file: str
endpoint: str # scheme://host:port, or "" when unspecified
status: str # ok | rejected | unreachable | no-spec | unsupported
detail: str = ""
def _field(text: str, key: str) -> str | None:
m = re.search(rf"^{key}:\s*(\S+)", text, re.M)
return m.group(1) if m else None
def parse_spec(text: str):
"""Return (scheme, host, port, path) from a `verify:` line, or None."""
raw = _field(text, "verify")
if not raw:
return None
u = urlparse(raw)
if not u.scheme or not u.hostname:
return None
return u.scheme, u.hostname, u.port, (u.path or "/")
def _imap(host, port, account, secret) -> tuple[str, str]:
import imaplib
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
socket.setdefaulttimeout(_TIMEOUT)
try:
m = imaplib.IMAP4_SSL(host, port or 993, ssl_context=ctx)
except (OSError, ssl.SSLError) as e:
return "unreachable", f"{type(e).__name__}: {e}"
try:
m.login(account, secret)
m.logout()
return "ok", ""
except imaplib.IMAP4.error as e:
return "rejected", str(e)
def _smtp(host, port, account, secret, *, starttls: bool) -> tuple[str, str]:
import smtplib
socket.setdefaulttimeout(_TIMEOUT)
try:
if starttls:
s = smtplib.SMTP(host, port or 587, timeout=_TIMEOUT)
s.ehlo()
s.starttls(context=_lax_ctx())
s.ehlo()
else:
s = smtplib.SMTP_SSL(host, port or 465, context=_lax_ctx(), timeout=_TIMEOUT)
except (OSError, smtplib.SMTPException) as e:
return "unreachable", f"{type(e).__name__}: {e}"
try:
s.login(account, secret)
s.quit()
return "ok", ""
except smtplib.SMTPAuthenticationError as e:
return "rejected", str(e)
except smtplib.SMTPException as e:
return "unreachable", str(e)
def _https(host, port, path, token) -> tuple[str, str]:
import httpx
url = f"https://{host}{':' + str(port) if port else ''}{path}"
try:
r = httpx.get(url, headers={"Authorization": f"Bearer {token}"},
timeout=_TIMEOUT, verify=False)
except httpx.HTTPError as e:
return "unreachable", f"{type(e).__name__}: {e}"
if r.status_code in (401, 403):
return "rejected", f"HTTP {r.status_code}"
if r.status_code < 400:
return "ok", f"HTTP {r.status_code}"
return "unreachable", f"HTTP {r.status_code}"
def _lax_ctx() -> ssl.SSLContext:
"""TLS context with cert verification OFF — a DELIBERATE, scoped choice.
The verifier connects to internal services by their WireGuard MESH IP
(e.g. 10.9.0.1), so the server cert's hostname never matches and strict
verification would fail every test. The security boundary here is the
WireGuard tunnel itself (authenticated, encrypted peers) this tool only
asks "does this credential authenticate?", it is not establishing trust in
the endpoint. It MUST stay confined to mesh IPs; never point a `verify:`
spec at a public host. (To harden: route by a hostname whose cert SAN
matches + trust the internal CA, then drop this.)
"""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
def verify_file(name: str, vault_dir: Path | None = None) -> VerifyResult:
"""Test one vault file's credential against its declared `verify:` endpoint.
ONE attempt never retries a rejection (fail2ban-safe).
"""
p = (vault_dir or default_vault_dir()) / name
text = p.read_text(encoding="utf-8")
spec = parse_spec(text)
if spec is None:
return VerifyResult(name, "", "no-spec")
scheme, host, port, path = spec
endpoint = f"{scheme}://{host}{':' + str(port) if port else ''}"
account = _field(text, "account")
secret = _field(text, "password") or text.strip() # token files = bare body
if scheme == "imap":
status, detail = _imap(host, port, account, secret)
elif scheme == "smtps":
status, detail = _smtp(host, port, account, secret, starttls=False)
elif scheme == "smtp":
status, detail = _smtp(host, port, account, secret, starttls=True)
elif scheme in ("https", "http"):
status, detail = _https(host, port, path, secret)
else:
return VerifyResult(name, endpoint, "unsupported", f"scheme {scheme!r}")
return VerifyResult(name, endpoint, status, detail)
def verify_all(vault_dir: Path | None = None) -> list[VerifyResult]:
"""Verify every vault file that declares a `verify:` endpoint."""
d = vault_dir or default_vault_dir()
results: list[VerifyResult] = []
for p in sorted(d.glob("*")):
if not p.is_file() or p.name.startswith(".") or p.name.endswith(".prev.txt"):
continue
try:
text = p.read_text(encoding="utf-8")
except OSError:
continue
if parse_spec(text) is None:
continue # silently skip files with no verify spec
results.append(verify_file(p.name, vault_dir=d))
return results

View file

@ -0,0 +1,40 @@
"""Vault credential-verify: spec parsing + dispatch (no live network)."""
from __future__ import annotations
from pathlib import Path
from claire import vault_verify as vv
def test_parse_spec_variants():
assert vv.parse_spec("verify: imap://10.9.0.1:993\n") == ("imap", "10.9.0.1", 993, "/")
assert vv.parse_spec("verify: smtps://10.9.0.1:465")[0] == "smtps"
s = vv.parse_spec("verify: https://10.0.0.11/api/health")
assert s == ("https", "10.0.0.11", None, "/api/health")
def test_parse_spec_none_when_absent_or_garbage():
assert vv.parse_spec("account: a\npassword: b\n") is None
assert vv.parse_spec("verify: not-a-url") is None
def test_verify_file_no_spec(tmp_path: Path):
(tmp_path / "x.txt").write_text("account: a\npassword: b\n")
r = vv.verify_file("x.txt", vault_dir=tmp_path)
assert r.status == "no-spec"
def test_verify_file_unsupported_scheme(tmp_path: Path):
(tmp_path / "x.txt").write_text("account: a\npassword: b\nverify: postgres://10.0.0.11:5432\n")
r = vv.verify_file("x.txt", vault_dir=tmp_path)
assert r.status == "unsupported"
assert "postgres" in r.detail
def test_verify_all_skips_specless(tmp_path: Path):
(tmp_path / "a.txt").write_text("password: x\n") # no verify
(tmp_path / "b.txt").write_text("verify: imap://h:993\naccount: u\npassword: p\n")
(tmp_path / "c.prev.txt").write_text("verify: imap://h:993\n") # prev excluded
names = {r.file for r in vv.verify_all(vault_dir=tmp_path)}
assert names == {"b.txt"} # only the spec'd, non-prev file