From 474acb7b48ee3bb51580a5433532aeae56db5972 Mon Sep 17 00:00:00 2001 From: Natalie Date: Wed, 20 May 2026 03:29:47 -0700 Subject: [PATCH] =?UTF-8?q?feat(orchestrator):=20=E2=9C=A8=20add=20proacti?= =?UTF-8?q?ve=20fleet=20scanning=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- src/clare/config.py | 7 +++++++ src/clare/web/app.py | 45 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/src/clare/config.py b/src/clare/config.py index 26840c1..52f7eaf 100644 --- a/src/clare/config.py +++ b/src/clare/config.py @@ -49,6 +49,11 @@ class OrchestratorConfig(_Strict): # caller would resolve to the wrong account. cwd: str | None = None reply_timeout_s: int = Field(default=180, ge=1, le=3600) + # Periodic-rounds scheduler. When > 0 the supervisor posts a "do rounds" + # prompt to the orchestrator every N seconds so it proactively scans + # the fleet, identifies blockers, and surfaces next-step recommendations + # — closing the loop on "reduce required user interaction." 0 disables. + rounds_interval_s: int = Field(default=0, ge=0, le=86400) class PeerConfig(_Strict): @@ -126,6 +131,8 @@ def _serialize(cfg: ClareConfig) -> str: if orch.cwd is not None: lines.append(f'cwd = "{orch.cwd}"') lines.append(f"reply_timeout_s = {orch.reply_timeout_s}") + if orch.rounds_interval_s != 0: + lines.append(f"rounds_interval_s = {orch.rounds_interval_s}") for peer in cfg.peers: lines.append("") lines.append("[[peers]]") diff --git a/src/clare/web/app.py b/src/clare/web/app.py index dd117d1..2c64322 100644 --- a/src/clare/web/app.py +++ b/src/clare/web/app.py @@ -98,14 +98,59 @@ def create_app( except Exception as exc: # noqa: BLE001 logger.warning("orchestrator heartbeat raised: %s", exc) + async def _rounds_loop() -> None: + """Periodic fleet-rounds. Posts a `DO ROUNDS` prompt to the + orchestrator on a cadence so Clare proactively scans the fleet, + flags blockers, and surfaces next-step recommendations without + waiting on a user turn. Disabled when rounds_interval_s == 0. + """ + from ..config import load_or_init + from ..db import migrate, open_db + from ..hlc import HLCGenerator + from ..domain import ChatScope + from .chat.handler import handle_input + + cfg = load_or_init(config_path) + interval = cfg.orchestrator.rounds_interval_s + if interval <= 0: + return # disabled + logger.info("orchestrator rounds loop enabled (every %ds)", interval) + prompt = ( + "DO ROUNDS: Call list_fleet, list_recent_events, then summarize " + "(1) what each active agent is working on, (2) any session that " + "looks stuck/idle/blocked, (3) up to 3 concrete next-step " + "recommendations. Reply in 8 short lines max. Before replying, " + "call report_status with your own one-line summary." + ) + while True: + try: + await asyncio.sleep(interval) + def _post() -> None: + conn = open_db(db_path) + migrate(conn) + gen = HLCGenerator(cfg.machine_id) + handle_input( + conn, gen, + scope=ChatScope.ORCHESTRATOR, scope_ref=None, + body=prompt, + ) + conn.close() + await asyncio.to_thread(_post) + except asyncio.CancelledError: + return + except Exception as exc: # noqa: BLE001 + logger.warning("rounds loop tick raised: %s", exc) + # Enter the MCP sub-app's lifespan so its streamable HTTP task # group is started before any request lands. async with mcp_app.router.lifespan_context(mcp_app): initial = asyncio.create_task(_bootstrap_once()) heartbeat = asyncio.create_task(_heartbeat()) + rounds = asyncio.create_task(_rounds_loop()) try: yield finally: + rounds.cancel() heartbeat.cancel() initial.cancel()