feat(orchestrator): ✨ add proactive fleet scanning loop
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
9f9c1f0cd5
commit
474acb7b48
2 changed files with 52 additions and 0 deletions
|
|
@ -49,6 +49,11 @@ class OrchestratorConfig(_Strict):
|
|||
# caller would resolve to the wrong account.
|
||||
cwd: str | None = None
|
||||
reply_timeout_s: int = Field(default=180, ge=1, le=3600)
|
||||
# Periodic-rounds scheduler. When > 0 the supervisor posts a "do rounds"
|
||||
# prompt to the orchestrator every N seconds so it proactively scans
|
||||
# the fleet, identifies blockers, and surfaces next-step recommendations
|
||||
# — closing the loop on "reduce required user interaction." 0 disables.
|
||||
rounds_interval_s: int = Field(default=0, ge=0, le=86400)
|
||||
|
||||
|
||||
class PeerConfig(_Strict):
|
||||
|
|
@ -126,6 +131,8 @@ def _serialize(cfg: ClareConfig) -> str:
|
|||
if orch.cwd is not None:
|
||||
lines.append(f'cwd = "{orch.cwd}"')
|
||||
lines.append(f"reply_timeout_s = {orch.reply_timeout_s}")
|
||||
if orch.rounds_interval_s != 0:
|
||||
lines.append(f"rounds_interval_s = {orch.rounds_interval_s}")
|
||||
for peer in cfg.peers:
|
||||
lines.append("")
|
||||
lines.append("[[peers]]")
|
||||
|
|
|
|||
|
|
@ -98,14 +98,59 @@ def create_app(
|
|||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("orchestrator heartbeat raised: %s", exc)
|
||||
|
||||
async def _rounds_loop() -> None:
|
||||
"""Periodic fleet-rounds. Posts a `DO ROUNDS` prompt to the
|
||||
orchestrator on a cadence so Clare proactively scans the fleet,
|
||||
flags blockers, and surfaces next-step recommendations without
|
||||
waiting on a user turn. Disabled when rounds_interval_s == 0.
|
||||
"""
|
||||
from ..config import load_or_init
|
||||
from ..db import migrate, open_db
|
||||
from ..hlc import HLCGenerator
|
||||
from ..domain import ChatScope
|
||||
from .chat.handler import handle_input
|
||||
|
||||
cfg = load_or_init(config_path)
|
||||
interval = cfg.orchestrator.rounds_interval_s
|
||||
if interval <= 0:
|
||||
return # disabled
|
||||
logger.info("orchestrator rounds loop enabled (every %ds)", interval)
|
||||
prompt = (
|
||||
"DO ROUNDS: Call list_fleet, list_recent_events, then summarize "
|
||||
"(1) what each active agent is working on, (2) any session that "
|
||||
"looks stuck/idle/blocked, (3) up to 3 concrete next-step "
|
||||
"recommendations. Reply in 8 short lines max. Before replying, "
|
||||
"call report_status with your own one-line summary."
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(interval)
|
||||
def _post() -> None:
|
||||
conn = open_db(db_path)
|
||||
migrate(conn)
|
||||
gen = HLCGenerator(cfg.machine_id)
|
||||
handle_input(
|
||||
conn, gen,
|
||||
scope=ChatScope.ORCHESTRATOR, scope_ref=None,
|
||||
body=prompt,
|
||||
)
|
||||
conn.close()
|
||||
await asyncio.to_thread(_post)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("rounds loop tick raised: %s", exc)
|
||||
|
||||
# Enter the MCP sub-app's lifespan so its streamable HTTP task
|
||||
# group is started before any request lands.
|
||||
async with mcp_app.router.lifespan_context(mcp_app):
|
||||
initial = asyncio.create_task(_bootstrap_once())
|
||||
heartbeat = asyncio.create_task(_heartbeat())
|
||||
rounds = asyncio.create_task(_rounds_loop())
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
rounds.cancel()
|
||||
heartbeat.cancel()
|
||||
initial.cancel()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue