diff --git a/pyproject.toml b/pyproject.toml
index 88cfeaa..02c516e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
     "python-multipart>=0.0.9",
     "anthropic>=0.102.0",
     "mcp>=1.27.1",
+    "psutil>=5.9",
 ]
 
 [project.scripts]
diff --git a/src/claire/db.py b/src/claire/db.py
index 574093f..fc47627 100644
--- a/src/claire/db.py
+++ b/src/claire/db.py
@@ -291,6 +291,28 @@ _MIGRATIONS: list[tuple[str, str]] = [
         # reintroduce the old value.
         "",
     ),
+    (
+        "0011_host_telemetry",
+        # Per-host CPU/mem/load/disk snapshot from the Linux `claire agent`.
+        # One row per host (last-write-wins by hlc). The partial index keeps
+        # `prune-telemetry` (DELETE old host_telemetry_reported events) fast.
+        """
+        CREATE TABLE IF NOT EXISTS host_telemetry (
+            host TEXT PRIMARY KEY,
+            cpu_percent REAL NOT NULL,
+            mem_used_bytes INTEGER NOT NULL,
+            mem_total_bytes INTEGER NOT NULL,
+            load_1 REAL NOT NULL,
+            load_5 REAL NOT NULL,
+            load_15 REAL NOT NULL,
+            disk_used_bytes INTEGER NOT NULL,
+            disk_total_bytes INTEGER NOT NULL,
+            updated_hlc TEXT NOT NULL
+        );
+        CREATE INDEX IF NOT EXISTS events_type_telemetry
+            ON events(event_type) WHERE event_type = 'host_telemetry_reported';
+        """,
+    ),
 ]
 
 
diff --git a/src/claire/events.py b/src/claire/events.py
index 11de95f..be1f048 100644
--- a/src/claire/events.py
+++ b/src/claire/events.py
@@ -367,6 +367,28 @@ class DecisionRecorded(_Payload):
     task_id: UUID | None = None
 
 
+# ---------------------------------------------------------------------------
+# Host telemetry (migration 0011_host_telemetry). The Linux `claire agent`
+# samples local CPU/mem/load/disk per interval and emits one of these; it
+# syncs to plum and surfaces in `fleet_load` so dispatch sees real host
+# capacity, not just live-session counts. Last-write-wins projection
+# (one row per host) — safe to prune old samples.
+# ---------------------------------------------------------------------------
+
+
+class HostTelemetryReported(_Payload):
+    kind: Literal["host_telemetry_reported"] = "host_telemetry_reported"
+    host: str
+    cpu_percent: float
+    mem_used_bytes: int
+    mem_total_bytes: int
+    load_1: float
+    load_5: float
+    load_15: float
+    disk_used_bytes: int
+    disk_total_bytes: int
+
+
 EventPayload: TypeAlias = Annotated[
     ProjectCreated
     | ProjectUpdated
@@ -401,7 +423,8 @@ EventPayload: TypeAlias = Annotated[
     | TaskMetaSet
     | AgentStatusReported
     | UsageRecorded
-    | DecisionRecorded,
+    | DecisionRecorded
+    | HostTelemetryReported,
     Field(discriminator="kind"),
 ]
 
@@ -618,6 +641,7 @@ def replay(conn: sqlite3.Connection) -> int:
         "task_tags",
         "task_state_history",
         "agent_status",
+        "host_telemetry",
         "usage",
         "updates",
         "assignments",
@@ -1098,6 +1122,33 @@ def _apply_payload(conn: sqlite3.Connection, hlc: HLC, payload: EventPayload) ->
                     h,
                 ),
             )
+        case HostTelemetryReported():
+            # One current-snapshot row per host; last-write-wins by hlc.
+            # The WHERE guard makes that hold even if samples are applied
+            # out of order: an older sample never overwrites a newer row.
+            # NOTE(review): assumes `h` text sorts in HLC order — confirm.
+            conn.execute(
+                "INSERT INTO host_telemetry "
+                "(host, cpu_percent, mem_used_bytes, mem_total_bytes, "
+                "load_1, load_5, load_15, disk_used_bytes, disk_total_bytes, updated_hlc) "
+                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
+                "ON CONFLICT(host) DO UPDATE SET "
+                "cpu_percent=excluded.cpu_percent, "
+                "mem_used_bytes=excluded.mem_used_bytes, "
+                "mem_total_bytes=excluded.mem_total_bytes, "
+                "load_1=excluded.load_1, load_5=excluded.load_5, "
+                "load_15=excluded.load_15, "
+                "disk_used_bytes=excluded.disk_used_bytes, "
+                "disk_total_bytes=excluded.disk_total_bytes, "
+                "updated_hlc=excluded.updated_hlc "
+                "WHERE excluded.updated_hlc >= host_telemetry.updated_hlc",
+                (
+                    payload.host, payload.cpu_percent,
+                    payload.mem_used_bytes, payload.mem_total_bytes,
+                    payload.load_1, payload.load_5, payload.load_15,
+                    payload.disk_used_bytes, payload.disk_total_bytes, h,
+                ),
+            )
 
     _fanout_system_chat(conn, h, payload)
 
diff --git a/src/claire/web/service.py b/src/claire/web/service.py
index 52742d9..2e4386b 100644
--- a/src/claire/web/service.py
+++ b/src/claire/web/service.py
@@ -720,6 +720,22 @@ def fleet_load(conn: sqlite3.Connection) -> dict:
     # the operator has registered but with no live sessions still appears
     # in the UI (otherwise it'd be invisible until something lands there).
     host_names = set(load.keys()) | {h.name for h in cfg.known_hosts}
+    # Latest CPU/mem/load/disk snapshot per host, reported by the Linux
+    # `claire agent` and synced in. `None` for hosts with no sample yet.
+    telemetry = {
+        row["host"]: {
+            "cpu_percent": row["cpu_percent"],
+            "mem_used_bytes": row["mem_used_bytes"],
+            "mem_total_bytes": row["mem_total_bytes"],
+            "load_1": row["load_1"],
+            "load_5": row["load_5"],
+            "load_15": row["load_15"],
+            "disk_used_bytes": row["disk_used_bytes"],
+            "disk_total_bytes": row["disk_total_bytes"],
+            "updated_hlc": row["updated_hlc"],
+        }
+        for row in conn.execute("SELECT * FROM host_telemetry").fetchall()
+    }
     return {
         # The default cap; per-host resolved caps are on each `hosts` entry.
         "per_host_max": limits.per_host_max,
@@ -731,6 +747,7 @@ def fleet_load(conn: sqlite3.Connection) -> dict:
                 "live_sessions": load.get(h, 0),
                 "cap": limits.cap_for(h),
                 "has_capacity": load.get(h, 0) < limits.cap_for(h),
+                "telemetry": telemetry.get(h),
             }
             for h in sorted(host_names)
         ],
diff --git a/tests/test_telemetry_event.py b/tests/test_telemetry_event.py
new file mode 100644
index 0000000..2446e68
--- /dev/null
+++ b/tests/test_telemetry_event.py
@@ -0,0 +1,66 @@
+"""HostTelemetryReported event + host_telemetry projection (migration 0011).
+
+The Linux `claire agent` emits these; they sync to plum and surface in
+fleet_load. The projection is one row per host, last-write-wins by hlc.
+"""
+
+from __future__ import annotations
+
+from claire import events as ev
+from claire.db import migrate, open_db
+from claire.hlc import HLCGenerator
+
+
+def _conn():
+    conn = open_db(":memory:")
+    migrate(conn)
+    return conn
+
+
+def _sample(host: str, cpu: float) -> ev.HostTelemetryReported:
+    return ev.HostTelemetryReported(
+        host=host,
+        cpu_percent=cpu,
+        mem_used_bytes=8_000_000_000,
+        mem_total_bytes=16_000_000_000,
+        load_1=1.5,
+        load_5=1.2,
+        load_15=0.9,
+        disk_used_bytes=100_000_000_000,
+        disk_total_bytes=500_000_000_000,
+    )
+
+
+def test_telemetry_projection_upserts() -> None:
+    conn = _conn()
+    gen = HLCGenerator("test-machine")
+    ev.append(conn, gen, _sample("apricot", 12.5))
+    row = conn.execute(
+        "SELECT host, cpu_percent FROM host_telemetry WHERE host = 'apricot'"
+    ).fetchone()
+    assert row is not None
+    assert row["cpu_percent"] == 12.5
+
+
+def test_telemetry_last_write_wins_one_row_per_host() -> None:
+    conn = _conn()
+    gen = HLCGenerator("test-machine")
+    ev.append(conn, gen, _sample("apricot", 10.0))
+    ev.append(conn, gen, _sample("apricot", 99.0))
+    rows = conn.execute(
+        "SELECT cpu_percent FROM host_telemetry WHERE host = 'apricot'"
+    ).fetchall()
+    assert len(rows) == 1  # upsert, not append
+    assert rows[0]["cpu_percent"] == 99.0  # latest wins
+
+
+def test_telemetry_survives_replay() -> None:
+    """Projection rebuilds identically from the event log."""
+    conn = _conn()
+    gen = HLCGenerator("test-machine")
+    ev.append(conn, gen, _sample("black", 42.0))
+    ev.replay(conn)
+    row = conn.execute(
+        "SELECT cpu_percent FROM host_telemetry WHERE host = 'black'"
+    ).fetchone()
+    assert row is not None and row["cpu_percent"] == 42.0