session-tools/bin/_claude-triage

334 lines
12 KiB
Text
Raw Normal View History

#!/usr/bin/env python3
"""rclaude triage helper — Haiku-powered per-session summary + prioritization.
Iterates ~/.claude/projects/, builds a compressed transcript per session, and
runs them through claude-code-batch-sdk (Haiku by default) with on-disk
caching keyed on session UUID + file mtime. Sessions whose mtime hasn't
changed are served from cache without re-reading the transcript.
Output (TSV, one row per session, sorted by priority number ascending — P0 = most urgent first — then mtime descending):
<mtime>\\t<uuid>\\t<cwd>\\t<priority>\\t<status>\\t<summary>\\t<next_action>
Env tuning:
RCLAUDE_TRIAGE_MODEL Claude model (default: haiku)
RCLAUDE_TRIAGE_CONCURRENT Concurrent claude subprocesses (default: 4)
RCLAUDE_TRIAGE_BATCH Sessions per CLI call (default: 8)
RCLAUDE_TRIAGE_LIMIT Max sessions to consider (default: 100)
RCLAUDE_TRIAGE_CTX_BYTES Characters of transcript kept per session (default: 3000)
CLI flags:
--limit N override session cap
--uuids U... restrict to specific session UUIDs (prefix match ok)
--refresh bypass cache for selected sessions
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import signal
import sys
from pathlib import Path
# Allow piped output to truncate without traceback (e.g. `... | head`).
# SIGPIPE only exists on Unix; guard the lookup so importing this script on a
# platform without it does not die with AttributeError.
if hasattr(signal, "SIGPIPE"):
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
try:
from claude_code_batch_sdk import (
ClaudeClient,
GenerationItem,
ResponseCache,
run_batched,
)
except ImportError as e:
print(
f"_claude-triage: claude-code-batch-sdk not installed for this python ({sys.executable}): {e}",
file=sys.stderr,
)
sys.exit(2)
# Where session transcripts live and where cached triage responses are kept.
ROOT = Path.home().joinpath(".claude", "projects")
CACHE_DIR = Path.home().joinpath(".claude", ".cache", "rclaude-triage")

# Tunables, each overridable through the environment (see module docstring).
MODEL = os.getenv("RCLAUDE_TRIAGE_MODEL", "haiku")
MAX_CONCURRENT = int(os.getenv("RCLAUDE_TRIAGE_CONCURRENT", "4"))
BATCH_SIZE = int(os.getenv("RCLAUDE_TRIAGE_BATCH", "8"))
CTX_PER_SESSION = int(os.getenv("RCLAUDE_TRIAGE_CTX_BYTES", "3000"))
# Leading markers of user-role entries that this script treats as
# system-generated noise rather than something the person typed.
SYSTEM_PREFIXES = (
    "<command-name>",
    "<command-message>",
    "<system-reminder>",
    "<local-command-",
    "Caveat:",
    "<bash-input>",
    "<bash-stdout>",
    "[task-persistence]",
    "[tts-state]",
    "This session is being continued",
    "Please continue",
    "<task-notification>",
    "[Request interrupted",
)


def is_system_user(text: str) -> bool:
    """Return True when *text* is empty/blank or begins with a system marker.

    Leading whitespace is ignored before the prefix comparison; a falsy
    *text* (including None) counts as system noise.
    """
    stripped = text.lstrip() if text else ""
    if not stripped:
        return True
    return stripped.startswith(SYSTEM_PREFIXES)
def get_text(entry: dict) -> str:
    """Flatten the message content of one transcript entry into plain text.

    Args:
        entry: One decoded JSONL record. Its ``message["content"]`` may be a
            plain string or a list of content blocks.

    Returns:
        The string content as-is, or the space-joined text of ``text`` blocks
        with ``tool_use`` blocks rendered as ``[tool:<name>]``. Returns ``""``
        when there is no usable message or content.
    """
    msg = entry.get("message")
    # A missing, null or non-object "message" carries no text; bail out
    # instead of calling .get() on something that is not a dict.
    if not isinstance(msg, dict):
        return ""
    content = msg.get("content")
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    parts: list[str] = []
    for block in content:
        if not isinstance(block, dict):
            continue
        kind = block.get("type")
        if kind == "text":
            text = block.get("text")
            # A null or non-string "text" would make str.join raise; treat it
            # like the missing-key case (empty string).
            parts.append(text if isinstance(text, str) else "")
        elif kind == "tool_use":
            parts.append(f"[tool:{block.get('name','?')}]")
    return " ".join(parts)
def read_cwd_only(jsonl: Path) -> str:
    """Cheap version of compress_session that stops at the first cwd field.

    Args:
        jsonl: Path to a session transcript (one JSON document per line).

    Returns:
        The first non-empty string ``cwd`` found, or ``""`` when the file
        cannot be read or no record carries one.
    """
    try:
        with jsonl.open(encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line can be valid JSON without being an object (a bare
                # number, string or array); those have no .get().
                if not isinstance(entry, dict):
                    continue
                cwd = entry.get("cwd")
                if cwd and isinstance(cwd, str):
                    return cwd
    except OSError:
        return ""
    return ""
def compress_session(jsonl: Path) -> tuple[str, str, str]:
    """Return (cwd, first_user_text, transcript_excerpt) for one session file.

    Args:
        jsonl: Path to a session transcript (one JSON document per line).

    Returns:
        ``cwd`` is the first non-empty ``cwd`` seen; ``first_user_text`` is
        the first non-system user turn (whitespace-collapsed, max 400 chars);
        ``transcript_excerpt`` is the last 20 kept turns joined by newlines
        and cut to the trailing ``CTX_PER_SESSION`` characters. All three are
        ``""`` if the file cannot be read.
    """
    first_user = ""
    cwd = ""
    turns: list[str] = []
    try:
        with jsonl.open(encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Valid JSON that is not an object (number, string, array)
                # has no fields to inspect.
                if not isinstance(entry, dict):
                    continue
                if not cwd and entry.get("cwd"):
                    cwd = entry["cwd"]
                message = entry.get("message")
                # No message object means no text to extract; skipping here
                # also keeps .get() off non-dict values.
                if not isinstance(message, dict):
                    continue
                role = message.get("role") or entry.get("type")
                if role not in ("user", "assistant"):
                    continue
                text = get_text(entry)
                if not text.strip():
                    continue
                if role == "user" and is_system_user(text):
                    continue
                flat = re.sub(r"\s+", " ", text).strip()
                if role == "user" and not first_user:
                    first_user = flat[:400]
                turns.append(f"[{role}] {flat[:300]}")
    except OSError:
        return ("", "", "")
    # Keep recent turns; head-truncate to budget. A plain [-N:] slice is wrong
    # for N <= 0 ([-0:] returns everything), so a non-positive budget is
    # handled explicitly and yields an empty excerpt.
    recent = "\n".join(turns[-20:])
    transcript = recent[-CTX_PER_SESSION:] if CTX_PER_SESSION > 0 else ""
    return (cwd, first_user, transcript)
# System prompt handed to run_batched() in main_async. The field list here
# must stay in step with validate() (which requires summary/status/priority/
# next_action) and with index_key="ref_index" used to match replies to inputs.
SYSTEM_PROMPT = """You triage Claude Code coding sessions. For each session in the input batch, emit one JSON object with these fields:
- ref_index: integer matching the input's ref_index
- summary: ONE short sentence describing what's happening
- status: one of done, in_progress, blocked, waiting_on_user, abandoned
- priority: integer 0-4 (0 = critical to resume now, 4 = abandonable). Follows the P0/P1 incident convention — LOWER number means HIGHER priority.
- next_action: ONE short imperative phrase, or empty string if status is done/abandoned
Output ONLY a JSON array. No markdown, no prose."""
def build_prompt(batch: list[GenerationItem]) -> str:
    """Render one user prompt covering every session in *batch*.

    Each item contributes a ``ref_index`` header (its position in the batch),
    its initial request and its recent transcript, read from
    ``item.metadata``. A reply-format reminder closes the prompt.
    """
    header = "Triage these sessions. Respond with JSON array.\n"
    footer = (
        '\n\nReply with: [{"ref_index": 0, "summary": "...", '
        '"status": "...", "priority": N, "next_action": "..."}, ...]'
    )
    sections = [header]
    for index, item in enumerate(batch):
        meta = item.metadata
        sections.extend(
            (
                f"\n=== ref_index={index} ===",
                f"INITIAL REQUEST: {meta['first_user']}",
                "RECENT TRANSCRIPT:",
                meta["transcript"],
            )
        )
    sections.append(footer)
    return "\n".join(sections)
def validate(result: dict) -> bool:
    """Return True when *result* looks like a usable triage record.

    A usable record is a dict containing ``summary``, ``status``, ``priority``
    and ``next_action``, whose ``priority`` can be converted with ``int()``.
    """
    if not isinstance(result, dict):
        return False
    required = ("summary", "status", "priority", "next_action")
    if any(key not in result for key in required):
        return False
    try:
        int(result["priority"])
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")) — json.loads yields inf for
        # "Infinity" or an over-large literal such as 1e999.
        return False
    return True
def enrich(result: dict, item: GenerationItem) -> dict:
    """Return a copy of *result* with uuid, cwd and mtime taken from *item*.

    Values from ``item.metadata`` win over same-named keys already present
    in *result*; *result* itself is not modified.
    """
    meta = item.metadata
    enriched = dict(result)
    enriched.update(uuid=meta["uuid"], cwd=meta["cwd"], mtime=meta["mtime"])
    return enriched
def collect_candidates(limit: int) -> list[tuple[int, Path]]:
    """List session transcripts under ROOT, newest first, capped at *limit*.

    Returns ``(mtime, path)`` pairs for every ``*.jsonl`` directly inside a
    project directory of ROOT. Files that cannot be stat'ed are skipped; a
    missing ROOT yields an empty list.
    """
    if not ROOT.is_dir():
        return []
    found: list[tuple[int, Path]] = []
    project_dirs = (entry for entry in ROOT.iterdir() if entry.is_dir())
    for directory in project_dirs:
        for transcript in directory.glob("*.jsonl"):
            try:
                stamp = int(transcript.stat().st_mtime)
            except OSError:
                continue
            found.append((stamp, transcript))
    # Order by mtime only, most recent first.
    found.sort(key=lambda pair: pair[0], reverse=True)
    return found[:limit]
async def main_async(args: argparse.Namespace) -> None:
    """Triage the selected sessions and print one TSV row per session.

    Args:
        args: Parsed CLI namespace; ``limit``, ``uuids`` and ``refresh`` are
            read.

    Sessions with a cache hit are re-enriched locally; the rest are
    compressed and sent through ``run_batched``. Rows go to stdout sorted by
    priority number ascending, then mtime descending; a cache statistics
    line goes to stderr. Returns silently when no session is selected.
    """
    if args.uuids:
        # Match --uuids against every session on disk and only then apply the
        # cap. Filtering after truncating to the newest ``limit`` sessions
        # made it impossible to select an older session by UUID.
        # collect_candidates stats every file regardless of the limit, so
        # asking for all of them costs nothing extra.
        wanted = tuple(args.uuids)
        candidates = [
            (m, j)
            for (m, j) in collect_candidates(sys.maxsize)
            if j.stem.startswith(wanted)  # prefix match; covers exact match too
        ][: args.limit]
    else:
        candidates = collect_candidates(args.limit)
    if not candidates:
        return
    # Cache key is (uuid, mtime) only — mtime is sufficient since a session's
    # content can't change without its mtime advancing, and this lets us check
    # the cache BEFORE reading the JSONL (the expensive part for a triage run
    # over hundreds of sessions where most are unchanged).
    cache = ResponseCache(CACHE_DIR)
    # v2: priority scale flipped to P0=critical / P4=abandonable convention.
    template_id = "rclaude-triage-v2"
    cached_results: list[dict] = []
    uncached: list[tuple[int, Path, str]] = []
    for mtime, jsonl in candidates:
        cache_key = f"{jsonl.stem}:{mtime}"
        hit = None if args.refresh else cache.get(template_id, cache_key)
        if hit is not None:
            # The SDK cache stores the raw LLM response (priority/status/
            # summary/next_action); enrichment (uuid/cwd/mtime) is added at
            # read time by the SDK. Since we bypass run_batched for cached
            # items, re-enrich here. cwd needs a tiny disk read (first cwd
            # line of the jsonl) but no LLM call.
            cwd = hit.get("cwd") or read_cwd_only(jsonl)
            if not cwd:
                # Couldn't determine cwd → skip; rclaude can't resume without it.
                continue
            cached_results.append({
                **hit,
                "uuid": hit.get("uuid") or jsonl.stem,
                "mtime": hit.get("mtime") or mtime,
                "cwd": cwd,
            })
            continue
        uncached.append((mtime, jsonl, cache_key))
    if args.refresh:
        # With --refresh every selected session is in ``uncached``; remove
        # their cache files before regenerating.
        # NOTE(review): relies on private ResponseCache helpers
        # (_hash_key / _cache_path) — confirm they are stable in the SDK.
        for _, _, ck in uncached:
            key_hash = cache._hash_key(template_id, ck)
            path = cache._cache_path(template_id, key_hash)
            if path.exists():
                try:
                    path.unlink()
                except OSError:
                    # Best effort: a file that cannot be removed is left behind.
                    pass
    items: list[GenerationItem] = []
    for mtime, jsonl, cache_key in uncached:
        cwd, first_user, transcript = compress_session(jsonl)
        if not cwd:
            continue
        items.append(
            GenerationItem(
                template_id=template_id,
                cache_key=cache_key,
                metadata={
                    "uuid": jsonl.stem,
                    "cwd": cwd,
                    "mtime": mtime,
                    "first_user": first_user,
                    "transcript": transcript,
                },
            )
        )
    results = list(cached_results)
    if items:
        client = ClaudeClient(model=MODEL, max_concurrent=MAX_CONCURRENT)
        try:
            new_results = await run_batched(
                client=client,
                cache=cache,
                items=items,
                system_prompt=SYSTEM_PROMPT,
                build_batch_prompt=build_prompt,
                validate_result=validate,
                enrich_result=enrich,
                index_key="ref_index",
                batch_size=BATCH_SIZE,
                description="rclaude-triage",
            )
        finally:
            await client.close()
        results.extend(new_results)

    # Lower priority number = higher importance (P0 convention). Sort
    # ascending by priority, descending by mtime so the most-recent within
    # each priority bucket floats up.
    def sort_key(r: dict) -> tuple[int, int]:
        try:
            prio = int(r.get("priority", 9))
        except (TypeError, ValueError):
            prio = 9
        return (prio, -int(r.get("mtime", 0)))

    for r in sorted(results, key=sort_key):
        line = "\t".join(
            [
                str(r.get("mtime", 0)),
                str(r.get("uuid", "")),
                str(r.get("cwd", "")),
                str(r.get("priority", 0)),
                str(r.get("status", "")),
                # Free-text fields: collapse whitespace so they cannot break
                # the one-row-per-session TSV layout, and cap their length.
                re.sub(r"\s+", " ", str(r.get("summary", "")))[:200],
                re.sub(r"\s+", " ", str(r.get("next_action", "")))[:200],
            ]
        )
        print(line)
    print(f"# cache {cache.stats_summary()}", file=sys.stderr)
def main() -> None:
    """Parse command-line flags and run the async triage pipeline."""
    parser = argparse.ArgumentParser(description="Triage Claude Code sessions with Haiku.")
    # The cap defaults to RCLAUDE_TRIAGE_LIMIT when set, else 100.
    env_limit = int(os.environ.get("RCLAUDE_TRIAGE_LIMIT", "100"))
    parser.add_argument(
        "--limit",
        type=int,
        default=env_limit,
        help="Max sessions to consider (default: 100, env: RCLAUDE_TRIAGE_LIMIT)",
    )
    parser.add_argument("--uuids", nargs="*", help="Restrict to specific UUIDs (prefix ok)")
    parser.add_argument("--refresh", action="store_true", help="Bypass cache")
    parsed = parser.parse_args()
    asyncio.run(main_async(parsed))


if __name__ == "__main__":
    main()