diff --git a/src/claire/cli.py b/src/claire/cli.py index ec58728..803f1dc 100644 --- a/src/claire/cli.py +++ b/src/claire/cli.py @@ -137,10 +137,21 @@ def agent_run( (sync now; supervisor + telemetry later). Deployed as a systemd --user unit on apricot/black. """ + import logging + import uvicorn from .web.app import create_app + # Surface the agent loops' INFO logs to stdout → journald. Without this, + # uvicorn only configures its own loggers and the `claire.*` loop logs + # (sync/supervisor/telemetry, incl. auto-continue dry-run) are invisible. + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + logging.getLogger("claire").setLevel(logging.INFO) + cfg = load_or_init() bind_port = port or cfg.agent.port console.print( diff --git a/src/claire/pull.py b/src/claire/pull.py index 44f9984..bc0e941 100644 --- a/src/claire/pull.py +++ b/src/claire/pull.py @@ -19,7 +19,7 @@ import re as _re import sqlite3 from collections import Counter from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from uuid import UUID from . import events, scheduler @@ -28,6 +28,16 @@ from .hlc import HLCGenerator from .rclaude import Rclaude, RclaudeError +# A session must be `closed` AND silent for at least this long before its +# active assignments are reaped. `liveness` is a heuristic (freshest-N-panes +# by tmux-name match) and plum's view of *remote* tmux can transiently +# misflag a live worker as closed; a live worker writes to its JSONL +# constantly, so a fresh `last_seen_mtime` protects it from a misfire. The +# window only has to exceed the longest a genuinely-live session stays silent +# between transcript writes — 30 minutes is comfortably above that. +_REAP_GRACE_MINUTES = 30 + + @dataclass(frozen=True) class PullStats: sessions_observed: int @@ -35,6 +45,7 @@ class PullStats: errors: list[str] sessions_alive: int = 0 sessions_closed: int = 0 + assignments_reaped: int = 0 # --------------------------------------------------------------------------- @@ -42,6 +53,63 @@ class PullStats: # --------------------------------------------------------------------------- +def _reap_stale_assignments( + conn: sqlite3.Connection, + generator: HLCGenerator, + *, + now: datetime | None = None, + grace_minutes: int = _REAP_GRACE_MINUTES, +) -> int: + """Deactivate active assignments whose worker session is gone for good. + + When a worker dies its tmux pane vanishes and `pull`'s liveness pass marks + the session `closed` — but nothing ever flipped the assignment to inactive, + so the task kept *looking* assigned (and `list_fleet`, which only shows + alive sessions, could never reveal the orphan). The scheduler treats a + todo/in_progress task with no active assignment as dispatchable work + (`scheduler.suggest_assignments`), so deactivating the dead assignment is + all that's needed to re-surface the task — no task-state change, which + keeps the two-gate state machine intact (`in_progress → todo` is not a + legal transition by design). + + Gated on staleness, not `closed` alone, to survive a liveness misfire on a + quiet-but-live worker (see `_REAP_GRACE_MINUTES`). Idempotent: an already + inactive assignment is never re-selected, so re-running is a no-op. + """ + now = now or datetime.now(timezone.utc) + cutoff = now - timedelta(minutes=grace_minutes) + rows = conn.execute( + """ + SELECT a.id AS assignment_id, s.last_seen_mtime AS last_seen + FROM assignments a + JOIN sessions s ON s.uuid = a.session_uuid + WHERE a.active = 1 AND s.liveness = 'closed' + """ + ).fetchall() + + reaped = 0 + for row in rows: + last_seen = row["last_seen"] + if not last_seen: + # No timestamp to prove staleness — refuse to reap (conservative). + continue + try: + seen_at = datetime.fromisoformat(last_seen) + except ValueError: + continue + if seen_at.tzinfo is None: + seen_at = seen_at.replace(tzinfo=timezone.utc) + if seen_at >= cutoff: + continue # silent, but not long enough — could be a live misfire + events.append( + conn, + generator, + events.AssignmentDeactivated(assignment_id=UUID(row["assignment_id"])), + ) + reaped += 1 + return reaped + + def _session_snapshot( conn: sqlite3.Connection, ) -> dict[UUID, tuple[str, str | None, str | None]]: @@ -244,6 +312,17 @@ def pull( else: sessions_closed += 1 + # --- Reap ------------------------------------------------------------ + # A worker that died left its assignment marked active; deactivate any + # whose session is now closed-and-stale so the orphaned task re-surfaces + # as dispatchable work. Must run AFTER the liveness pass above so it sees + # this cycle's fresh `closed` flags. Never break a pull — count as error. + assignments_reaped = 0 + try: + assignments_reaped = _reap_stale_assignments(conn, generator) + except Exception as exc: # noqa: BLE001 — defensive: never break pull + errors.append(f"reap: {exc}") + # --- Triage ----------------------------------------------------------- try: triage_rows = rclaude.triage() @@ -299,6 +378,7 @@ def pull( errors=errors, sessions_alive=sessions_alive, sessions_closed=sessions_closed, + assignments_reaped=assignments_reaped, ) diff --git a/src/claire/web/api.py b/src/claire/web/api.py index 82d0e6a..415e16c 100644 --- a/src/claire/web/api.py +++ b/src/claire/web/api.py @@ -408,7 +408,13 @@ def fleet_considered_get(cg: Dep) -> dict[str, Any]: def pull_now(cg: Dep) -> dict[str, Any]: conn, gen = cg stats = service.pull(conn, gen) - return {"sessions_observed": stats.sessions_observed, "errors": stats.errors} + return { + "sessions_observed": stats.sessions_observed, + "sessions_alive": stats.sessions_alive, + "sessions_closed": stats.sessions_closed, + "assignments_reaped": stats.assignments_reaped, + "errors": stats.errors, + } @router.get("/budget") diff --git a/tests/test_pull_reap.py b/tests/test_pull_reap.py new file mode 100644 index 0000000..ce559f3 --- /dev/null +++ b/tests/test_pull_reap.py @@ -0,0 +1,118 @@ +"""Reap pass: a dead worker's assignment is deactivated so its task re-surfaces. + +When a worker dies its tmux pane vanishes and the liveness pass marks the +session `closed`, but nothing ever flipped the assignment inactive — so the +task kept *looking* assigned and the scheduler never re-offered it. The reap +pass deactivates active assignments whose session is closed-and-stale, gated +on a grace window so a momentarily-misflagged live worker is never reaped. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import UUID + +from claire.domain import TaskStatus +from claire.pull import pull +from claire.rclaude import SessionRow, TmuxRow +from claire.web import service + +_CWD = "/var/home/lilith/Code/@projects/@claire" +_SLUG = "var-home-lilith-Code--projects--claire" + + +class _FakeRclaude: + def __init__(self, sessions: list[SessionRow], tmux: list[TmuxRow]): + self._sessions = sessions + self._tmux = tmux + + def list_sessions(self) -> list[SessionRow]: + return list(self._sessions) + + def list_tmux(self) -> list[TmuxRow]: + return list(self._tmux) + + def triage(self) -> list: + return [] + + +def _session(uuid_hex: str, mtime: int) -> SessionRow: + return SessionRow( + host="apricot", uuid=UUID(uuid_hex), snippet="x", cwd=_CWD, mtime_epoch=mtime, + ) + + +def _active(conn, assignment_id: UUID) -> bool: + row = conn.execute( + "SELECT active FROM assignments WHERE id = ?", (str(assignment_id),) + ).fetchone() + return bool(row["active"]) + + +def _setup_in_progress_task(conn, gen) -> UUID: + service.create_project(conn, gen, name="proj") + task = service.add_task(conn, gen, project="proj", title="t", priority=1) + service.transition_task_state( + conn, gen, task_id=task.id, to_state=TaskStatus.IN_PROGRESS + ) + return task.id + + +def test_reaps_only_closed_and_stale_assignment(conn, gen) -> None: + """Three workers on one in-progress task: dead-stale reaped; closed-fresh + and alive both protected.""" + now = int(datetime.now(timezone.utc).timestamp()) + dead = "11111111-1111-1111-1111-111111111111" # old → closed + stale + fresh_closed = "22222222-2222-2222-2222-222222222222" # closed but within grace + alive = "33333333-3333-3333-3333-333333333333" # live pane → alive + + task_id = _setup_in_progress_task(conn, gen) + a_dead = service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead)) + a_fresh = service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(fresh_closed)) + a_alive = service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(alive)) + + fake = _FakeRclaude( + sessions=[ + _session(dead, 1_700_000_000), # years old + _session(fresh_closed, now - 300), # 5 min ago — inside the 30-min grace + _session(alive, now - 60), # freshest → claims the one live pane + ], + # One live pane at the workspace → only the freshest session is alive. + tmux=[TmuxRow(host="apricot", session_name=f"claude-natalie-{_SLUG}-1779320000", detail="")], + ) + + stats = pull(conn, gen, rclaude=fake) + + assert stats.assignments_reaped == 1 + assert _active(conn, a_dead.id) is False # dead + stale → reaped + assert _active(conn, a_fresh.id) is True # closed but fresh → protected + assert _active(conn, a_alive.id) is True # alive → protected + + +def test_reap_is_idempotent(conn, gen) -> None: + """A second pull over unchanged fleet reaps nothing — the dead assignment + is already inactive.""" + dead = "44444444-4444-4444-4444-444444444444" + task_id = _setup_in_progress_task(conn, gen) + service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead)) + fake = _FakeRclaude(sessions=[_session(dead, 1_700_000_000)], tmux=[]) + + first = pull(conn, gen, rclaude=fake) + second = pull(conn, gen, rclaude=fake) + + assert first.assignments_reaped == 1 + assert second.assignments_reaped == 0 + + +def test_reaped_task_becomes_dispatchable_again(conn, gen) -> None: + """After reaping, the in-progress task has no active assignment, which is + exactly what the scheduler treats as unassigned open work.""" + dead = "55555555-5555-5555-5555-555555555555" + task_id = _setup_in_progress_task(conn, gen) + service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead)) + fake = _FakeRclaude(sessions=[_session(dead, 1_700_000_000)], tmux=[]) + + pull(conn, gen, rclaude=fake) + + active_for_task = service.read.list_assignments(conn, task_id=task_id, active_only=True) + assert active_for_task == [] diff --git a/uv.lock b/uv.lock index a193785..b7b10ad 100644 --- a/uv.lock +++ b/uv.lock @@ -181,6 +181,7 @@ dependencies = [ { name = "httpx" }, { name = "jinja2" }, { name = "mcp" }, + { name = "psutil" }, { name = "pydantic" }, { name = "python-multipart" }, { name = "rich" }, @@ -204,6 +205,7 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1" }, { name = "mcp", specifier = ">=1.27.1" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.10" }, + { name = "psutil", specifier = ">=5.9" }, { name = "pydantic", specifier = ">=2.7" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" }, @@ -776,6 +778,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "psutil" +version = "7.2.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/aa/c6/d1ddf4abb55e93cebc4f2ed8b5d6dbad109ecb8d63748dd2b20ab5e57ebe/psutil-7.2.2.tar.gz", hash = "sha256:0746f5f8d406af344fd547f1c8daa5f5c33dbc293bb8d6a16d80b4bb88f59372", size = 493740, upload-time = "2026-01-28T18:14:54.428Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/51/08/510cbdb69c25a96f4ae523f733cdc963ae654904e8db864c07585ef99875/psutil-7.2.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:2edccc433cbfa046b980b0df0171cd25bcaeb3a68fe9022db0979e7aa74a826b", size = 130595, upload-time = "2026-01-28T18:14:57.293Z" }, + { url = "https://files.pythonhosted.org/packages/d6/f5/97baea3fe7a5a9af7436301f85490905379b1c6f2dd51fe3ecf24b4c5fbf/psutil-7.2.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:e78c8603dcd9a04c7364f1a3e670cea95d51ee865e4efb3556a3a63adef958ea", size = 131082, upload-time = "2026-01-28T18:14:59.732Z" }, + { url = "https://files.pythonhosted.org/packages/37/d6/246513fbf9fa174af531f28412297dd05241d97a75911ac8febefa1a53c6/psutil-7.2.2-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1a571f2330c966c62aeda00dd24620425d4b0cc86881c89861fbc04549e5dc63", size = 181476, upload-time = "2026-01-28T18:15:01.884Z" }, + { url = "https://files.pythonhosted.org/packages/b8/b5/9182c9af3836cca61696dabe4fd1304e17bc56cb62f17439e1154f225dd3/psutil-7.2.2-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:917e891983ca3c1887b4ef36447b1e0873e70c933afc831c6b6da078ba474312", size = 184062, upload-time = "2026-01-28T18:15:04.436Z" }, + { url = "https://files.pythonhosted.org/packages/16/ba/0756dca669f5a9300d0cbcbfae9a4c30e446dfc7440ffe43ded5724bfd93/psutil-7.2.2-cp313-cp313t-win_amd64.whl", hash = "sha256:ab486563df44c17f5173621c7b198955bd6b613fb87c71c161f827d3fb149a9b", size = 139893, upload-time = "2026-01-28T18:15:06.378Z" }, + { url = "https://files.pythonhosted.org/packages/1c/61/8fa0e26f33623b49949346de05ec1ddaad02ed8ba64af45f40a147dbfa97/psutil-7.2.2-cp313-cp313t-win_arm64.whl", hash = "sha256:ae0aefdd8796a7737eccea863f80f81e468a1e4cf14d926bd9b6f5f2d5f90ca9", size = 135589, upload-time = "2026-01-28T18:15:08.03Z" }, + { url = "https://files.pythonhosted.org/packages/81/69/ef179ab5ca24f32acc1dac0c247fd6a13b501fd5534dbae0e05a1c48b66d/psutil-7.2.2-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:eed63d3b4d62449571547b60578c5b2c4bcccc5387148db46e0c2313dad0ee00", size = 130664, upload-time = "2026-01-28T18:15:09.469Z" }, + { url = "https://files.pythonhosted.org/packages/7b/64/665248b557a236d3fa9efc378d60d95ef56dd0a490c2cd37dafc7660d4a9/psutil-7.2.2-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:7b6d09433a10592ce39b13d7be5a54fbac1d1228ed29abc880fb23df7cb694c9", size = 131087, upload-time = "2026-01-28T18:15:11.724Z" }, + { url = "https://files.pythonhosted.org/packages/d5/2e/e6782744700d6759ebce3043dcfa661fb61e2fb752b91cdeae9af12c2178/psutil-7.2.2-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1fa4ecf83bcdf6e6c8f4449aff98eefb5d0604bf88cb883d7da3d8d2d909546a", size = 182383, upload-time = "2026-01-28T18:15:13.445Z" }, + { url = "https://files.pythonhosted.org/packages/57/49/0a41cefd10cb7505cdc04dab3eacf24c0c2cb158a998b8c7b1d27ee2c1f5/psutil-7.2.2-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e452c464a02e7dc7822a05d25db4cde564444a67e58539a00f929c51eddda0cf", size = 185210, upload-time = "2026-01-28T18:15:16.002Z" }, + { url = "https://files.pythonhosted.org/packages/dd/2c/ff9bfb544f283ba5f83ba725a3c5fec6d6b10b8f27ac1dc641c473dc390d/psutil-7.2.2-cp314-cp314t-win_amd64.whl", hash = "sha256:c7663d4e37f13e884d13994247449e9f8f574bc4655d509c3b95e9ec9e2b9dc1", size = 141228, upload-time = "2026-01-28T18:15:18.385Z" }, + { url = "https://files.pythonhosted.org/packages/f2/fc/f8d9c31db14fcec13748d373e668bc3bed94d9077dbc17fb0eebc073233c/psutil-7.2.2-cp314-cp314t-win_arm64.whl", hash = "sha256:11fe5a4f613759764e79c65cf11ebdf26e33d6dd34336f8a337aa2996d71c841", size = 136284, upload-time = "2026-01-28T18:15:19.912Z" }, + { url = "https://files.pythonhosted.org/packages/e7/36/5ee6e05c9bd427237b11b3937ad82bb8ad2752d72c6969314590dd0c2f6e/psutil-7.2.2-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:ed0cace939114f62738d808fdcecd4c869222507e266e574799e9c0faa17d486", size = 129090, upload-time = "2026-01-28T18:15:22.168Z" }, + { url = "https://files.pythonhosted.org/packages/80/c4/f5af4c1ca8c1eeb2e92ccca14ce8effdeec651d5ab6053c589b074eda6e1/psutil-7.2.2-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:1a7b04c10f32cc88ab39cbf606e117fd74721c831c98a27dc04578deb0c16979", size = 129859, upload-time = "2026-01-28T18:15:23.795Z" }, + { url = "https://files.pythonhosted.org/packages/b5/70/5d8df3b09e25bce090399cf48e452d25c935ab72dad19406c77f4e828045/psutil-7.2.2-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:076a2d2f923fd4821644f5ba89f059523da90dc9014e85f8e45a5774ca5bc6f9", size = 155560, upload-time = "2026-01-28T18:15:25.976Z" }, + { url = "https://files.pythonhosted.org/packages/63/65/37648c0c158dc222aba51c089eb3bdfa238e621674dc42d48706e639204f/psutil-7.2.2-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b0726cecd84f9474419d67252add4ac0cd9811b04d61123054b9fb6f57df6e9e", size = 156997, upload-time = "2026-01-28T18:15:27.794Z" }, + { url = "https://files.pythonhosted.org/packages/8e/13/125093eadae863ce03c6ffdbae9929430d116a246ef69866dad94da3bfbc/psutil-7.2.2-cp36-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:fd04ef36b4a6d599bbdb225dd1d3f51e00105f6d48a28f006da7f9822f2606d8", size = 148972, upload-time = "2026-01-28T18:15:29.342Z" }, + { url = "https://files.pythonhosted.org/packages/04/78/0acd37ca84ce3ddffaa92ef0f571e073faa6d8ff1f0559ab1272188ea2be/psutil-7.2.2-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b58fabe35e80b264a4e3bb23e6b96f9e45a3df7fb7eed419ac0e5947c61e47cc", size = 148266, upload-time = "2026-01-28T18:15:31.597Z" }, + { url = "https://files.pythonhosted.org/packages/b4/90/e2159492b5426be0c1fef7acba807a03511f97c5f86b3caeda6ad92351a7/psutil-7.2.2-cp37-abi3-win_amd64.whl", hash = "sha256:eb7e81434c8d223ec4a219b5fc1c47d0417b12be7ea866e24fb5ad6e84b3d988", size = 137737, upload-time = "2026-01-28T18:15:33.849Z" }, + { url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", size = 134617, upload-time = "2026-01-28T18:15:36.514Z" }, +] + [[package]] name = "pycparser" version = "3.0"