feat(@projects/@claire): add host telemetry migration and reporting

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-05-31 18:08:43 -06:00
parent 8d8c1e32e9
commit bda5dad83d
5 changed files with 154 additions and 1 deletions

View file

@ -15,6 +15,7 @@ dependencies = [
"python-multipart>=0.0.9",
"anthropic>=0.102.0",
"mcp>=1.27.1",
"psutil>=5.9",
]
[project.scripts]

View file

@ -291,6 +291,28 @@ _MIGRATIONS: list[tuple[str, str]] = [
# reintroduce the old value.
"<sentinel:apply_0010_role_clare_to_claire>",
),
(
"0011_host_telemetry",
# Per-host CPU/mem/load/disk snapshot from the Linux `claire agent`.
# One row per host (last-write-wins by hlc). The partial index keeps
# `prune-telemetry` (DELETE old host_telemetry_reported events) fast.
"""
CREATE TABLE IF NOT EXISTS host_telemetry (
host TEXT PRIMARY KEY,
cpu_percent REAL NOT NULL,
mem_used_bytes INTEGER NOT NULL,
mem_total_bytes INTEGER NOT NULL,
load_1 REAL NOT NULL,
load_5 REAL NOT NULL,
load_15 REAL NOT NULL,
disk_used_bytes INTEGER NOT NULL,
disk_total_bytes INTEGER NOT NULL,
updated_hlc TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS events_type_telemetry
ON events(event_type) WHERE event_type = 'host_telemetry_reported';
""",
),
]

View file

@ -367,6 +367,28 @@ class DecisionRecorded(_Payload):
task_id: UUID | None = None
# ---------------------------------------------------------------------------
# Host telemetry (migration 0011_host_telemetry). The Linux `claire agent`
# samples local CPU/mem/load/disk per interval and emits one of these; it
# syncs to plum and surfaces in `fleet_load` so dispatch sees real host
# capacity, not just live-session counts. Last-write-wins projection
# (one row per host) — safe to prune old samples.
# ---------------------------------------------------------------------------
class HostTelemetryReported(_Payload):
kind: Literal["host_telemetry_reported"] = "host_telemetry_reported"
host: str
cpu_percent: float
mem_used_bytes: int
mem_total_bytes: int
load_1: float
load_5: float
load_15: float
disk_used_bytes: int
disk_total_bytes: int
EventPayload: TypeAlias = Annotated[
ProjectCreated
| ProjectUpdated
@ -401,7 +423,8 @@ EventPayload: TypeAlias = Annotated[
| TaskMetaSet
| AgentStatusReported
| UsageRecorded
| DecisionRecorded,
| DecisionRecorded
| HostTelemetryReported,
Field(discriminator="kind"),
]
@ -618,6 +641,7 @@ def replay(conn: sqlite3.Connection) -> int:
"task_tags",
"task_state_history",
"agent_status",
"host_telemetry",
"usage",
"updates",
"assignments",
@ -1098,6 +1122,29 @@ def _apply_payload(conn: sqlite3.Connection, hlc: HLC, payload: EventPayload) ->
h,
),
)
case HostTelemetryReported():
# One current-snapshot row per host; last-write-wins by hlc.
conn.execute(
"INSERT INTO host_telemetry "
"(host, cpu_percent, mem_used_bytes, mem_total_bytes, "
"load_1, load_5, load_15, disk_used_bytes, disk_total_bytes, updated_hlc) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
"ON CONFLICT(host) DO UPDATE SET "
"cpu_percent=excluded.cpu_percent, "
"mem_used_bytes=excluded.mem_used_bytes, "
"mem_total_bytes=excluded.mem_total_bytes, "
"load_1=excluded.load_1, load_5=excluded.load_5, "
"load_15=excluded.load_15, "
"disk_used_bytes=excluded.disk_used_bytes, "
"disk_total_bytes=excluded.disk_total_bytes, "
"updated_hlc=excluded.updated_hlc",
(
payload.host, payload.cpu_percent,
payload.mem_used_bytes, payload.mem_total_bytes,
payload.load_1, payload.load_5, payload.load_15,
payload.disk_used_bytes, payload.disk_total_bytes, h,
),
)
_fanout_system_chat(conn, h, payload)

View file

@ -720,6 +720,22 @@ def fleet_load(conn: sqlite3.Connection) -> dict:
# the operator has registered but with no live sessions still appears
# in the UI (otherwise it'd be invisible until something lands there).
host_names = set(load.keys()) | {h.name for h in cfg.known_hosts}
# Latest CPU/mem/load/disk snapshot per host, reported by the Linux
# `claire agent` and synced in. `None` for hosts with no sample yet.
telemetry = {
row["host"]: {
"cpu_percent": row["cpu_percent"],
"mem_used_bytes": row["mem_used_bytes"],
"mem_total_bytes": row["mem_total_bytes"],
"load_1": row["load_1"],
"load_5": row["load_5"],
"load_15": row["load_15"],
"disk_used_bytes": row["disk_used_bytes"],
"disk_total_bytes": row["disk_total_bytes"],
"updated_hlc": row["updated_hlc"],
}
for row in conn.execute("SELECT * FROM host_telemetry").fetchall()
}
return {
# The default cap; per-host resolved caps are on each `hosts` entry.
"per_host_max": limits.per_host_max,
@ -731,6 +747,7 @@ def fleet_load(conn: sqlite3.Connection) -> dict:
"live_sessions": load.get(h, 0),
"cap": limits.cap_for(h),
"has_capacity": load.get(h, 0) < limits.cap_for(h),
"telemetry": telemetry.get(h),
}
for h in sorted(host_names)
],

View file

@ -0,0 +1,66 @@
"""HostTelemetryReported event + host_telemetry projection (migration 0011).
The Linux `claire agent` emits these; they sync to plum and surface in
fleet_load. The projection is one row per host, last-write-wins by hlc.
"""
from __future__ import annotations
from claire import events as ev
from claire.db import migrate, open_db
from claire.hlc import HLCGenerator
def _conn():
conn = open_db(":memory:")
migrate(conn)
return conn
def _sample(host: str, cpu: float) -> ev.HostTelemetryReported:
return ev.HostTelemetryReported(
host=host,
cpu_percent=cpu,
mem_used_bytes=8_000_000_000,
mem_total_bytes=16_000_000_000,
load_1=1.5,
load_5=1.2,
load_15=0.9,
disk_used_bytes=100_000_000_000,
disk_total_bytes=500_000_000_000,
)
def test_telemetry_projection_upserts() -> None:
conn = _conn()
gen = HLCGenerator("test-machine")
ev.append(conn, gen, _sample("apricot", 12.5))
row = conn.execute(
"SELECT host, cpu_percent FROM host_telemetry WHERE host = 'apricot'"
).fetchone()
assert row is not None
assert row["cpu_percent"] == 12.5
def test_telemetry_last_write_wins_one_row_per_host() -> None:
conn = _conn()
gen = HLCGenerator("test-machine")
ev.append(conn, gen, _sample("apricot", 10.0))
ev.append(conn, gen, _sample("apricot", 99.0))
rows = conn.execute(
"SELECT cpu_percent FROM host_telemetry WHERE host = 'apricot'"
).fetchall()
assert len(rows) == 1 # upsert, not append
assert rows[0]["cpu_percent"] == 99.0 # latest wins
def test_telemetry_survives_replay() -> None:
"""Projection rebuilds identically from the event log."""
conn = _conn()
gen = HLCGenerator("test-machine")
ev.append(conn, gen, _sample("black", 42.0))
ev.replay(conn)
row = conn.execute(
"SELECT cpu_percent FROM host_telemetry WHERE host = 'black'"
).fetchone()
assert row is not None and row["cpu_percent"] == 42.0