claire/tests/test_supervisor.py
Natalie 24c6f24f43 feat(@projects/@claire): supervisor auto-resume of dead worker sessions
When a local worker pane dies (crash, OOM, host power-cycle), its JSONL persists
and is resumable. The agent supervisor now detects dead-but-recent local
sessions, resumes them with `claude --resume <uuid>`, and then sends a re-orient kick so the
session re-determines its OWN state (done vs pending vs finished) before acting
— mirrors the orchestrator's rehydrate-on-startup.

- rclaude.Rclaude.resume(): spawn `claude --resume <uuid>` via RCLAUDE_RESUME_ID
  (verified empirically against a real dead session on apricot).
- supervisor.select_resume_candidates(): pure, guarded selection — recency
  window, supersession (skip if a LIVE session shares the cwd), orchestrator-
  workspace exclusion, per-session retry cap, per-tick global ceiling (the
  first-wake token-storm guard). 7 unit tests.
- AgentConfig.auto_resume off|dry-run|on (default off) + max/per_tick/window.
  Ships off; roll out via dry-run, then on — same pattern as auto_continue.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 01:12:33 -07:00

155 lines
5.7 KiB
Python

"""Pure classification logic for the agent supervisor — no rclaude/process."""
from __future__ import annotations
from uuid import uuid4
from claire.agent.supervisor import (
_is_orchestrator_cwd,
detect_wedged_and_orphaned,
select_resume_candidates,
should_auto_continue,
)
from claire.rclaude import SessionRow, TmuxRow
# Fixed "current time" handed to the supervisor functions as ``now=NOW``; the
# session fixtures derive their mtime from it, so ages are deterministic.
NOW = 1_000_000.0
def _sess(uuid, *, host="local", age_s=0, cwd="/x"):
    """Build a ``SessionRow`` whose mtime lies ``age_s`` seconds before ``NOW``."""
    last_modified = int(NOW) - age_s
    return SessionRow(
        host=host,
        uuid=uuid,
        snippet="",
        cwd=cwd,
        mtime_epoch=last_modified,
    )
def _tmux(resumed_uuid, *, host="local", name="claude-x-1"):
    """Build a ``TmuxRow`` fixture carrying the given ``resumed_uuid``."""
    pane_fields = {
        "host": host,
        "session_name": name,
        "detail": "1 windows",
        "resumed_uuid": resumed_uuid,
    }
    return TmuxRow(**pane_fields)
def test_wedged_when_live_pane_and_stale_mtime():
    """A session with a matching pane and an age past the threshold is wedged."""
    session_id = uuid4()
    stale_row = _sess(session_id, age_s=400)
    pane_row = _tmux(session_id)

    wedged, orphaned = detect_wedged_and_orphaned(
        [stale_row], [pane_row], wedge_threshold_s=300, now=NOW
    )

    assert [row.uuid for row in wedged] == [session_id]
    assert orphaned == []
def test_not_wedged_when_fresh():
    """A session with a matching pane and an age under the threshold is left alone."""
    session_id = uuid4()
    fresh_row = _sess(session_id, age_s=10)
    pane_row = _tmux(session_id)

    wedged, orphaned = detect_wedged_and_orphaned(
        [fresh_row], [pane_row], wedge_threshold_s=300, now=NOW
    )

    assert wedged == []
    assert orphaned == []
def test_orphaned_when_no_live_pane():
    """With no tmux rows at all, the session is reported as orphaned, not wedged."""
    session_id = uuid4()
    paneless_row = _sess(session_id, age_s=9999)

    wedged, orphaned = detect_wedged_and_orphaned(
        [paneless_row], [], wedge_threshold_s=300, now=NOW
    )

    assert wedged == []
    assert [row.uuid for row in orphaned] == [session_id]
def test_remote_sessions_not_supervised():
    """A session whose host is not "local" appears in neither result list."""
    # host != "local" → another machine's session, skip it
    remote_row = _sess(uuid4(), host="apricot", age_s=9999)

    wedged, orphaned = detect_wedged_and_orphaned(
        [remote_row], [], wedge_threshold_s=300, now=NOW
    )

    assert wedged == []
    assert orphaned == []
def test_auto_continue_gate():
    """``should_auto_continue`` allows only un-parked states that are under the cap."""
    cap = 3

    # Not parked and under the cap → continuable.
    assert should_auto_continue(None, 0, cap) is True
    assert should_auto_continue("in_progress", 2, cap) is True

    # Count has reached the cap → refused.
    assert should_auto_continue("in_progress", cap, cap) is False

    # Parked states never auto-continue, even with a count of 0.
    parked_states = ("blocked", "user_review", "claire_review", "done")
    for state in parked_states:
        assert should_auto_continue(state, 0, cap) is False
def test_no_resumed_uuid_means_no_wedge_classification():
    """A pane whose ``resumed_uuid`` is None cannot mark any session as wedged."""
    # Older rclaude omits resumed_uuid → can't correlate → never act blind.
    session_id = uuid4()
    stale_row = _sess(session_id, age_s=9999)
    uncorrelated_pane = _tmux(None)

    wedged, orphaned = detect_wedged_and_orphaned(
        [stale_row], [uncorrelated_pane], wedge_threshold_s=300, now=NOW
    )

    # Without correlation the session is not classified as wedged ...
    assert wedged == []
    # ... and since no live pane matched it, it is reported as orphaned.
    assert [row.uuid for row in orphaned] == [session_id]
# --- auto-resume selection (pure) ------------------------------------------
# Resume recency window used in these tests, in seconds (86_400 s = 24 h);
# the _resume helper passes it to select_resume_candidates as ``window_s``.
_W = 86_400
def _resume(sessions, tmux_rows, *, attempts=None, max_attempts=3, max_per_tick=3):
    """Call ``select_resume_candidates`` with the test-wide window (``_W``) and clock (``NOW``).

    A missing or empty ``attempts`` mapping is passed through as ``{}``.
    """
    prior_attempts = attempts or {}
    return select_resume_candidates(
        sessions,
        tmux_rows,
        window_s=_W,
        now=NOW,
        attempts=prior_attempts,
        max_attempts=max_attempts,
        max_per_tick=max_per_tick,
    )
def test_is_orchestrator_cwd():
    """``_is_orchestrator_cwd`` accepts orchestrator paths and rejects others and None."""
    orchestrator_paths = (
        "/var/home/lilith/.local/share/claire/orchestrator",
        "/home/x/.local/share/claire/orchestrator/",  # trailing slash
    )
    other_paths = (
        "/home/x/Code/@projects/@lilith/lilith-platform.live",
        None,
    )

    for path in orchestrator_paths:
        assert _is_orchestrator_cwd(path)
    for path in other_paths:
        assert not _is_orchestrator_cwd(path)
def test_auto_resume_recency_window():
    """Only sessions whose age is inside the window are selected for resume."""
    fresh, old = uuid4(), uuid4()
    recent_row = _sess(fresh, age_s=10, cwd="/a")
    expired_row = _sess(old, age_s=_W + 5, cwd="/b")

    # No live panes are supplied.
    to_resume, _ = _resume([recent_row, expired_row], [])
    selected = {str(row.uuid) for row in to_resume}

    assert str(fresh) in selected  # recently alive → candidate
    assert str(old) not in selected  # beyond window → ignored (graveyard)
def test_auto_resume_supersession_guard():
    """A dead session is skipped when a LIVE session already occupies its cwd."""
    dead, live = uuid4(), uuid4()
    dead_row = _sess(dead, age_s=30, cwd="/shared")
    live_row = _sess(live, age_s=5, cwd="/shared")
    # Only `live` has a pane; `dead` does not.
    panes = [_tmux(str(live))]

    to_resume, skipped = _resume([dead_row, live_row], panes)

    assert [str(row.uuid) for row in to_resume] == []
    reasons_for_dead = [
        reason for row, reason in skipped if str(row.uuid) == str(dead)
    ]
    assert any(
        reason == "superseded-by-live-session-in-cwd" for reason in reasons_for_dead
    )
def test_auto_resume_excludes_orchestrator_workspace():
    """A session in the orchestrator workspace is skipped with that reason."""
    orchestrator_row = _sess(
        uuid4(), age_s=20, cwd="/home/x/.local/share/claire/orchestrator"
    )

    to_resume, skipped = _resume([orchestrator_row], [])

    assert to_resume == []
    skip_reasons = [reason for _, reason in skipped]
    assert any(reason == "orchestrator-workspace" for reason in skip_reasons)
def test_auto_resume_per_session_retry_cap():
    """A session whose attempt count equals ``max_attempts`` is not resumed again."""
    capped = uuid4()
    capped_row = _sess(capped, age_s=15, cwd="/a")
    prior_attempts = {str(capped): 3}

    to_resume, skipped = _resume(
        [capped_row], [], attempts=prior_attempts, max_attempts=3
    )

    assert to_resume == []
    skip_reasons = [reason for _, reason in skipped]
    assert any(reason == "retry-cap-reached" for reason in skip_reasons)
def test_auto_resume_per_tick_global_ceiling():
    """With five candidates and ``max_per_tick=2``, two are taken and three deferred."""
    sessions = [_sess(uuid4(), age_s=10, cwd=f"/cwd/{i}") for i in range(5)]

    to_resume, skipped = _resume(sessions, [], max_per_tick=2)

    assert len(to_resume) == 2  # ceiling enforced
    deferred = [
        reason for _, reason in skipped if reason == "per-tick-ceiling-deferred"
    ]
    assert len(deferred) == 3
def test_auto_resume_ignores_remote_and_live():
    """Only the local session without a pane is selected; remote and live ones are not."""
    local_dead, remote_dead, local_live = uuid4(), uuid4(), uuid4()
    local_dead_row = _sess(local_dead, age_s=10, cwd="/a")
    remote_dead_row = _sess(remote_dead, host="apricot", age_s=10, cwd="/b")
    local_live_row = _sess(local_live, age_s=10, cwd="/c")
    panes = [_tmux(str(local_live))]

    to_resume, _ = _resume([local_dead_row, remote_dead_row, local_live_row], panes)
    selected = {str(row.uuid) for row in to_resume}

    # Not remote (not ours), not live.
    assert selected == {str(local_dead)}