feat(@projects): add pm expansion tools

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-05-18 22:49:33 -07:00
parent 3d24384340
commit bc894b550e
5 changed files with 1148 additions and 4 deletions

View file

@ -271,6 +271,122 @@ def build_server() -> FastMCP:
lambda c, _g: tools.suggest_assignments(c),
)
# PM expansion tools (11) ----------------------------------------------
@mcp.tool()
async def create_org(name: str, description: str | None = None) -> dict[str, Any]:
"""Create a new org (top-level grouping above projects)."""
return await _call_tool(
"create_org", {"name": name, "description": description},
lambda c, g: tools.create_org(c, g, name=name, description=description),
)
@mcp.tool()
async def create_person(
handle: str, display_name: str, email: str | None = None,
) -> dict[str, Any]:
"""Create a person. Handle must start with `@`."""
return await _call_tool(
"create_person", {"handle": handle, "display_name": display_name, "email": email},
lambda c, g: tools.create_person(c, g, handle=handle, display_name=display_name, email=email),
)
@mcp.tool()
async def create_epic(
project: str, title: str, description: str | None = None,
) -> dict[str, Any]:
"""Create an epic (parent of tasks) inside a project."""
return await _call_tool(
"create_epic", {"project": project, "title": title, "description": description},
lambda c, g: tools.create_epic(c, g, project=project, title=title, description=description),
)
@mcp.tool()
async def archive_epic(epic_id: str) -> dict[str, Any]:
"""Archive an epic. Tasks under it have their epic_id cleared (soft cascade)."""
return await _call_tool(
"archive_epic", {"epic_id": epic_id},
lambda c, g: tools.archive_epic(c, g, epic_id=epic_id),
)
@mcp.tool()
async def create_tag(name: str, color: str | None = None) -> dict[str, Any]:
"""Create a tag (many-to-many label)."""
return await _call_tool(
"create_tag", {"name": name, "color": color},
lambda c, g: tools.create_tag(c, g, name=name, color=color),
)
@mcp.tool()
async def transition_task_state(
task_id: str, to_state: str, reason: str | None = None,
) -> dict[str, Any]:
"""Move a task between states. Rejects illegal transitions.
Legal transitions: todoin_progressdone, in_progress{blocked,review},
blocked/reviewin_progress, reviewdone, donein_progress.
"""
return await _call_tool(
"transition_task_state",
{"task_id": task_id, "to_state": to_state, "reason": reason},
lambda c, g: tools.transition_task_state(
c, g, task_id=task_id, to_state=to_state, reason=reason,
),
)
@mcp.tool()
async def tag_task(task_id: str, tag_name: str) -> dict[str, Any]:
"""Tag a task (auto-creates tag if missing). Idempotent."""
return await _call_tool(
"tag_task", {"task_id": task_id, "tag_name": tag_name},
lambda c, g: tools.tag_task(c, g, task_id=task_id, tag_name=tag_name),
)
@mcp.tool()
async def untag_task(task_id: str, tag_name: str) -> dict[str, Any]:
"""Remove a tag from a task."""
return await _call_tool(
"untag_task", {"task_id": task_id, "tag_name": tag_name},
lambda c, g: tools.untag_task(c, g, task_id=task_id, tag_name=tag_name),
)
@mcp.tool()
async def set_task_owner(
task_id: str, person_handle: str | None = None,
) -> dict[str, Any]:
"""Set a task's owner by handle. Pass null to unassign."""
return await _call_tool(
"set_task_owner", {"task_id": task_id, "person_handle": person_handle},
lambda c, g: tools.set_task_owner(c, g, task_id=task_id, person_handle=person_handle),
)
@mcp.tool()
async def set_task_type(
task_id: str, task_type: str | None = None,
) -> dict[str, Any]:
"""Set task type (bug/feature/chore/research/spike) or null to clear."""
return await _call_tool(
"set_task_type", {"task_id": task_id, "task_type": task_type},
lambda c, g: tools.set_task_type(c, g, task_id=task_id, task_type=task_type),
)
@mcp.tool()
async def set_task_meta(
task_id: str,
color: str | None = None,
emoji: str | None = None,
apply_color_rule: bool | None = None,
) -> dict[str, Any]:
"""Set task display metadata. Each null field is left unchanged."""
return await _call_tool(
"set_task_meta",
{"task_id": task_id, "color": color, "emoji": emoji,
"apply_color_rule": apply_color_rule},
lambda c, g: tools.set_task_meta(
c, g, task_id=task_id, color=color,
emoji=emoji, apply_color_rule=apply_color_rule,
),
)
# Send tool -------------------------------------------------------------
@mcp.tool()
async def send_to_session(session_ref: str, text: str) -> dict[str, Any]:

View file

@ -25,7 +25,7 @@ from typing import Any
from uuid import UUID
from .. import read, scheduler
from ..domain import ChatScope, TaskStatus
from ..domain import ChatScope, TaskStatus, TaskType
from ..hlc import HLCGenerator
from ..web import service
@ -199,6 +199,18 @@ def help_text() -> dict[str, Any]:
{"name": "suggest_assignments", "summary": "Suggest unassigned-task → session pairs (M5)."},
{"name": "send_to_session", "summary": "Send text to one specific session."},
{"name": "submit_chat_reply", "summary": "Signal the user-turn is done."},
# PM expansion
{"name": "create_org", "summary": "Create an org (above projects)."},
{"name": "create_person", "summary": "Create a person (assignee identity)."},
{"name": "create_epic", "summary": "Create an epic (parent of tasks)."},
{"name": "archive_epic", "summary": "Archive an epic; tasks have epic_id cleared."},
{"name": "create_tag", "summary": "Create a tag (m:n label)."},
{"name": "transition_task_state", "summary": "Move a task between states (validated)."},
{"name": "tag_task", "summary": "Attach a tag to a task (auto-creates)."},
{"name": "untag_task", "summary": "Remove a tag from a task."},
{"name": "set_task_owner", "summary": "Set task owner by @handle (or unassign)."},
{"name": "set_task_type", "summary": "Set task type (bug/feature/chore/research/spike)."},
{"name": "set_task_meta", "summary": "Set task color/emoji/apply_color_rule."},
],
}
@ -356,12 +368,199 @@ def suggest_assignments(conn: sqlite3.Connection) -> dict[str, Any]:
return scheduler.suggest_assignments(conn)
# ---------------------------------------------------------------------------
# PM expansion (migration 0003_pm). Each tool mirrors a service mutator.
# ---------------------------------------------------------------------------
def create_org(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
description: str | None = None,
) -> dict[str, Any]:
org = _wrap_service(service.create_org, conn=conn, gen=gen, name=name, description=description)
return org.model_dump(mode="json")
def create_person(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
handle: str,
display_name: str,
email: str | None = None,
) -> dict[str, Any]:
person = _wrap_service(
service.create_person, conn=conn, gen=gen,
handle=handle, display_name=display_name, email=email,
)
return person.model_dump(mode="json")
def create_epic(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
project: str,
title: str,
description: str | None = None,
) -> dict[str, Any]:
epic = _wrap_service(
service.create_epic, conn=conn, gen=gen,
project=project, title=title, description=description,
)
return epic.model_dump(mode="json")
def archive_epic(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
epic_id: str,
) -> dict[str, Any]:
epic = _wrap_service(
service.archive_epic, conn=conn, gen=gen, epic_id=UUID(epic_id),
)
return epic.model_dump(mode="json")
def create_tag(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
color: str | None = None,
) -> dict[str, Any]:
tag = _wrap_service(
service.create_tag, conn=conn, gen=gen, name=name, color=color,
)
return tag.model_dump(mode="json")
def transition_task_state(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: str,
to_state: str,
reason: str | None = None,
) -> dict[str, Any]:
"""Move a task between states. `to_state` is the enum value."""
try:
state_enum = TaskStatus(to_state)
except ValueError as exc:
raise ToolError(f"invalid state: {to_state!r}") from exc
task = _wrap_service(
service.transition_task_state, conn=conn, gen=gen,
task_id=UUID(task_id), to_state=state_enum, reason=reason,
)
return task.model_dump(mode="json")
def tag_task(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: str,
tag_name: str,
) -> dict[str, Any]:
"""Attach a tag to a task (tag is auto-created if it doesn't exist)."""
tag = _wrap_service(service.get_or_create_tag, conn=conn, gen=gen, name=tag_name)
_wrap_service(
service.tag_task, conn=conn, gen=gen,
task_id=UUID(task_id), tag_id=tag.id,
)
return {"task_id": task_id, "tag": tag.model_dump(mode="json")}
def untag_task(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: str,
tag_name: str,
) -> dict[str, Any]:
tag = read.get_tag_by_name(conn, tag_name)
if tag is None:
raise ToolError(f"no such tag: {tag_name!r}")
_wrap_service(
service.untag_task, conn=conn, gen=gen,
task_id=UUID(task_id), tag_id=tag.id,
)
return {"task_id": task_id, "tag_name": tag_name, "removed": True}
def set_task_owner(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: str,
person_handle: str | None = None,
) -> dict[str, Any]:
"""Assign a task to a person by handle. Pass `person_handle=None` to unassign."""
owner_id: UUID | None = None
if person_handle is not None:
person = read.get_person_by_handle(conn, person_handle)
if person is None:
raise ToolError(f"no such person: {person_handle!r}")
owner_id = person.id
task = _wrap_service(
service.set_task_owner, conn=conn, gen=gen,
task_id=UUID(task_id), owner_person_id=owner_id,
)
return task.model_dump(mode="json")
def set_task_type(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: str,
task_type: str | None = None,
) -> dict[str, Any]:
type_enum: TaskType | None = None
if task_type is not None:
try:
type_enum = TaskType(task_type)
except ValueError as exc:
raise ToolError(f"invalid task type: {task_type!r}") from exc
task = _wrap_service(
service.set_task_type, conn=conn, gen=gen,
task_id=UUID(task_id), task_type=type_enum,
)
return task.model_dump(mode="json")
def set_task_meta(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: str,
color: str | None = None,
emoji: str | None = None,
apply_color_rule: bool | None = None,
) -> dict[str, Any]:
task = _wrap_service(
service.set_task_meta, conn=conn, gen=gen,
task_id=UUID(task_id), color=color,
emoji=emoji, apply_color_rule=apply_color_rule,
)
return task.model_dump(mode="json")
__all__ = [
"ToolError",
"add_task",
"archive_epic",
"broadcast",
"create_assignment",
"create_epic",
"create_org",
"create_person",
"create_project",
"create_tag",
"get_session",
"help_text",
"list_recent_events",
@ -369,7 +568,13 @@ __all__ = [
"pull",
"search_chat_messages",
"send_to_session",
"set_task_meta",
"set_task_owner",
"set_task_type",
"status",
"suggest_assignments",
"summarize_project",
"tag_task",
"transition_task_state",
"untag_task",
]

View file

@ -24,7 +24,7 @@ from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
from ... import read
from ...domain import ChatRole, ChatScope, TaskStatus
from ...domain import ChatRole, ChatScope, TaskStatus, TaskType
from ...hlc import HLCGenerator
from .. import service
@ -103,6 +103,63 @@ class StatusCmd(_Cmd):
kind: Literal["status"] = "status"
# PM expansion commands ----------------------------------------------------
class OrgNewCmd(_Cmd):
kind: Literal["org_new"] = "org_new"
name: str = Field(min_length=1)
description: str | None = None
class PersonNewCmd(_Cmd):
kind: Literal["person_new"] = "person_new"
handle: str = Field(min_length=2, pattern=r"^@[\w.-]+$")
display_name: str = Field(min_length=1)
email: str | None = None
class EpicNewCmd(_Cmd):
kind: Literal["epic_new"] = "epic_new"
project: str = Field(min_length=1)
title: str = Field(min_length=1)
description: str | None = None
class TaskStateCmd(_Cmd):
kind: Literal["task_state"] = "task_state"
task_ref: str = Field(min_length=4)
to_state: TaskStatus
reason: str | None = None
class TaskTagCmd(_Cmd):
kind: Literal["task_tag"] = "task_tag"
task_ref: str = Field(min_length=4)
tag_name: str = Field(min_length=1)
remove: bool = False # True when parsed from /untag
class TaskOwnCmd(_Cmd):
kind: Literal["task_own"] = "task_own"
task_ref: str = Field(min_length=4)
person_handle: str | None = None # None = unassign
class TaskTypeCmd(_Cmd):
kind: Literal["task_type"] = "task_type"
task_ref: str = Field(min_length=4)
task_type: TaskType | None = None # None = clear
class TaskMetaCmd(_Cmd):
kind: Literal["task_meta"] = "task_meta"
task_ref: str = Field(min_length=4)
color: str | None = None
emoji: str | None = None
apply_color_rule: bool | None = None
Command: TypeAlias = Annotated[
HelpCmd
| ProjectNewCmd
@ -111,7 +168,15 @@ Command: TypeAlias = Annotated[
| AssignCmd
| BroadcastCmd
| PullCmd
| StatusCmd,
| StatusCmd
| OrgNewCmd
| PersonNewCmd
| EpicNewCmd
| TaskStateCmd
| TaskTagCmd
| TaskOwnCmd
| TaskTypeCmd
| TaskMetaCmd,
Field(discriminator="kind"),
]
@ -284,6 +349,136 @@ def parse(text: str, ctx: ScopeCtx) -> Command:
"kind": "broadcast", "target": target, "text": body,
})
# PM expansion ----------------------------------------------------------
if head == "org":
if not rest or rest[0] != "new":
raise ParseError("usage: /org new <name> [--description ...]")
sub_pos, sub_flags = _split_flags(rest[1:])
if not sub_pos:
raise ParseError("usage: /org new <name>")
return _CMD_ADAPTER.validate_python({
"kind": "org_new",
"name": sub_pos[0],
"description": _str_flag(sub_flags, "description"),
})
if head == "person":
if not rest or rest[0] != "new":
raise ParseError("usage: /person new <@handle> <display_name> [--email ...]")
sub_pos, sub_flags = _split_flags(rest[1:])
if len(sub_pos) < 2:
raise ParseError("usage: /person new <@handle> <display_name>")
return _CMD_ADAPTER.validate_python({
"kind": "person_new",
"handle": sub_pos[0],
"display_name": sub_pos[1],
"email": _str_flag(sub_flags, "email"),
})
if head == "epic":
if not rest or rest[0] != "new":
raise ParseError("usage: /epic new <title> [--project <name>] [--description ...]")
sub_pos, sub_flags = _split_flags(rest[1:])
if not sub_pos:
raise ParseError("usage: /epic new <title>")
project = _str_flag(sub_flags, "project")
if project is None:
if ctx.scope == ChatScope.PROJECT and ctx.scope_ref:
project = ctx.scope_ref
else:
raise ParseError("--project required outside a project chat")
return _CMD_ADAPTER.validate_python({
"kind": "epic_new",
"project": project,
"title": sub_pos[0],
"description": _str_flag(sub_flags, "description"),
})
if head == "state":
sub_pos, sub_flags = _split_flags(rest)
if len(sub_pos) < 2:
raise ParseError("usage: /state <task-ref> <todo|in_progress|blocked|review|done> [--reason ...]")
try:
to_state = TaskStatus(sub_pos[1])
except ValueError as exc:
raise ParseError(f"unknown task state: {sub_pos[1]!r}") from exc
return _CMD_ADAPTER.validate_python({
"kind": "task_state",
"task_ref": sub_pos[0],
"to_state": to_state,
"reason": _str_flag(sub_flags, "reason"),
})
if head == "tag":
sub_pos, _ = _split_flags(rest)
if len(sub_pos) < 2:
raise ParseError("usage: /tag <task-ref> <tag-name>")
return _CMD_ADAPTER.validate_python({
"kind": "task_tag",
"task_ref": sub_pos[0],
"tag_name": sub_pos[1],
"remove": False,
})
if head == "untag":
sub_pos, _ = _split_flags(rest)
if len(sub_pos) < 2:
raise ParseError("usage: /untag <task-ref> <tag-name>")
return _CMD_ADAPTER.validate_python({
"kind": "task_tag",
"task_ref": sub_pos[0],
"tag_name": sub_pos[1],
"remove": True,
})
if head == "own":
sub_pos, _ = _split_flags(rest)
if not sub_pos:
raise ParseError("usage: /own <task-ref> [@handle] (omit handle to unassign)")
return _CMD_ADAPTER.validate_python({
"kind": "task_own",
"task_ref": sub_pos[0],
"person_handle": sub_pos[1] if len(sub_pos) > 1 else None,
})
if head == "type":
sub_pos, _ = _split_flags(rest)
if not sub_pos:
raise ParseError("usage: /type <task-ref> [bug|feature|chore|research|spike]")
task_type: TaskType | None = None
if len(sub_pos) > 1:
try:
task_type = TaskType(sub_pos[1])
except ValueError as exc:
raise ParseError(f"unknown task type: {sub_pos[1]!r}") from exc
return _CMD_ADAPTER.validate_python({
"kind": "task_type",
"task_ref": sub_pos[0],
"task_type": task_type,
})
if head == "meta":
sub_pos, sub_flags = _split_flags(rest)
if not sub_pos:
raise ParseError("usage: /meta <task-ref> [--color ...] [--emoji ...] [--apply-color-rule on|off]")
acr_raw = _str_flag(sub_flags, "apply-color-rule")
acr: bool | None = None
if acr_raw is not None:
if acr_raw in {"on", "true", "1", "yes"}:
acr = True
elif acr_raw in {"off", "false", "0", "no"}:
acr = False
else:
raise ParseError(f"--apply-color-rule must be on/off, got {acr_raw!r}")
return _CMD_ADAPTER.validate_python({
"kind": "task_meta",
"task_ref": sub_pos[0],
"color": _str_flag(sub_flags, "color"),
"emoji": _str_flag(sub_flags, "emoji"),
"apply_color_rule": acr,
})
raise ParseError(f"unknown command: /{head}")
@ -437,17 +632,138 @@ def dispatch(
},
)
# PM expansion ---------------------------------------------------
case OrgNewCmd():
org = service.create_org(conn, gen, name=cmd.name, description=cmd.description)
return DispatchResult(
body=f"✓ org {org.name!r} created.",
meta={"kind": "org_new", "org_id": str(org.id), "name": org.name},
)
case PersonNewCmd():
person = service.create_person(
conn, gen, handle=cmd.handle,
display_name=cmd.display_name, email=cmd.email,
)
return DispatchResult(
body=f"✓ person {person.handle} ({person.display_name}) created.",
meta={"kind": "person_new", "person_id": str(person.id), "handle": person.handle},
)
case EpicNewCmd():
epic = service.create_epic(
conn, gen, project=cmd.project,
title=cmd.title, description=cmd.description,
)
return DispatchResult(
body=f"✓ epic {epic.title!r} created in {cmd.project}.",
meta={"kind": "epic_new", "epic_id": str(epic.id), "project": cmd.project},
)
case TaskStateCmd():
task_id = service.resolve_task_id(conn, cmd.task_ref)
task = service.transition_task_state(
conn, gen, task_id=task_id,
to_state=cmd.to_state, reason=cmd.reason,
)
return DispatchResult(
body=f"Task {str(task_id)[:8]}{task.status.value}",
meta={
"kind": "task_state",
"task_id": str(task_id),
"state": task.status.value,
},
)
case TaskTagCmd():
task_id = service.resolve_task_id(conn, cmd.task_ref)
if cmd.remove:
tag = read.get_tag_by_name(conn, cmd.tag_name)
if tag is None:
raise service.NotFound(f"no such tag: {cmd.tag_name!r}")
service.untag_task(conn, gen, task_id=task_id, tag_id=tag.id)
return DispatchResult(
body=f"Task {str(task_id)[:8]} untagged: {cmd.tag_name}",
meta={"kind": "task_untag", "task_id": str(task_id), "tag": cmd.tag_name},
)
tag = service.get_or_create_tag(conn, gen, name=cmd.tag_name)
service.tag_task(conn, gen, task_id=task_id, tag_id=tag.id)
return DispatchResult(
body=f"Task {str(task_id)[:8]} tagged: {cmd.tag_name}",
meta={"kind": "task_tag", "task_id": str(task_id), "tag": cmd.tag_name},
)
case TaskOwnCmd():
task_id = service.resolve_task_id(conn, cmd.task_ref)
owner_id: UUID | None = None
if cmd.person_handle is not None:
person = read.get_person_by_handle(conn, cmd.person_handle)
if person is None:
raise service.NotFound(f"no such person: {cmd.person_handle!r}")
owner_id = person.id
task = service.set_task_owner(conn, gen, task_id=task_id, owner_person_id=owner_id)
who = cmd.person_handle or "unassigned"
return DispatchResult(
body=f"Task {str(task_id)[:8]} owner: {who}",
meta={
"kind": "task_own",
"task_id": str(task_id),
"owner_person_id": str(task.owner_person_id) if task.owner_person_id else None,
},
)
case TaskTypeCmd():
task_id = service.resolve_task_id(conn, cmd.task_ref)
task = service.set_task_type(conn, gen, task_id=task_id, task_type=cmd.task_type)
label = task.task_type.value if task.task_type else "cleared"
return DispatchResult(
body=f"Task {str(task_id)[:8]} type: {label}",
meta={
"kind": "task_type",
"task_id": str(task_id),
"type": task.task_type.value if task.task_type else None,
},
)
case TaskMetaCmd():
task_id = service.resolve_task_id(conn, cmd.task_ref)
task = service.set_task_meta(
conn, gen, task_id=task_id,
color=cmd.color, emoji=cmd.emoji,
apply_color_rule=cmd.apply_color_rule,
)
return DispatchResult(
body=f"Task {str(task_id)[:8]} meta updated.",
meta={
"kind": "task_meta",
"task_id": str(task_id),
"color": task.color,
"emoji": task.emoji,
"apply_color_rule": task.apply_color_rule,
},
)
_HELP_TEXT = (
"Slash commands:\n"
" /project new <name> [--goal ...] [--owner ...]\n"
" /task new <title> [--project <name>] [--priority 0-4]\n"
" /task list [--project <name>] [--status todo|in_progress|blocked|done]\n"
" /task list [--project <name>] [--status todo|in_progress|blocked|review|done]\n"
" /assign <task-id> <session-id>\n"
" /broadcast <text> (in project or session chat)\n"
" /pull refresh fleet from rclaude\n"
" /status fleet summary\n"
" /help this message\n"
"PM commands:\n"
" /org new <name> [--description ...]\n"
" /person new <@handle> <display_name> [--email ...]\n"
" /epic new <title> [--project <name>] [--description ...]\n"
" /state <task-ref> <todo|in_progress|blocked|review|done> [--reason ...]\n"
" /tag <task-ref> <tag-name> · /untag <task-ref> <tag-name>\n"
" /own <task-ref> [@handle] (omit handle to unassign)\n"
" /type <task-ref> [bug|feature|chore|research|spike]\n"
" /meta <task-ref> [--color X] [--emoji Y] [--apply-color-rule on|off]\n"
"Bare text (no leading /) is sent to the NL parser."
)
@ -457,14 +773,22 @@ __all__ = [
"BroadcastCmd",
"Command",
"DispatchResult",
"EpicNewCmd",
"HelpCmd",
"OrgNewCmd",
"ParseError",
"PersonNewCmd",
"ProjectNewCmd",
"PullCmd",
"ScopeCtx",
"StatusCmd",
"TaskListCmd",
"TaskMetaCmd",
"TaskNewCmd",
"TaskOwnCmd",
"TaskStateCmd",
"TaskTagCmd",
"TaskTypeCmd",
"dispatch",
"is_slash",
"parse",

View file

@ -20,14 +20,23 @@ from .. import events as ev
from .. import read
from ..domain import (
Assignment,
Category,
ChatMessage,
ChatRole,
ChatScope,
Domain,
Epic,
EpicStatus,
Org,
Person,
Project,
ProjectStatus,
Session,
TASK_TRANSITIONS,
Tag,
Task,
TaskStatus,
TaskType,
)
from ..hlc import HLCGenerator
from ..pull import PullStats
@ -494,3 +503,331 @@ def resolve_session_id(conn: sqlite3.Connection, raw: str) -> UUID:
if len(rows) > 1:
raise InvalidInput(f"ambiguous session prefix {raw!r}: {len(rows)} matches")
return UUID(rows[0]["uuid"])
# ---------------------------------------------------------------------------
# PM expansion (migration 0003_pm) — org, person, epic, tag, category,
# domain entities + richer task surface (state-history, owner, type,
# classification, display meta).
# ---------------------------------------------------------------------------
def create_org(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
description: str | None = None,
) -> Org:
if read.get_org(conn, name) is not None:
raise Conflict(f"org already exists: {name!r}")
org_id = _uuid.uuid4()
ev.append(conn, gen, ev.OrgCreated(
org_id=org_id, name=name, description=description,
))
fresh = read.get_org(conn, str(org_id))
assert fresh is not None
return fresh
def list_orgs(conn: sqlite3.Connection) -> list[Org]:
return read.list_orgs(conn)
def create_person(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
handle: str,
display_name: str,
email: str | None = None,
) -> Person:
if not handle.startswith("@"):
raise InvalidInput(f"person handle must start with @: {handle!r}")
if read.get_person_by_handle(conn, handle) is not None:
raise Conflict(f"person with handle {handle!r} already exists")
person_id = _uuid.uuid4()
ev.append(conn, gen, ev.PersonCreated(
person_id=person_id, handle=handle,
display_name=display_name, email=email,
))
fresh = read.get_person(conn, person_id)
assert fresh is not None
return fresh
def list_people(conn: sqlite3.Connection) -> list[Person]:
return read.list_people(conn)
def create_epic(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
project: str,
title: str,
description: str | None = None,
) -> Epic:
proj = read.get_project(conn, project)
if proj is None:
raise NotFound(f"no such project: {project!r}")
epic_id = _uuid.uuid4()
ev.append(conn, gen, ev.EpicCreated(
epic_id=epic_id, project_id=proj.id,
title=title, description=description,
))
fresh = read.get_epic(conn, epic_id)
assert fresh is not None
return fresh
def archive_epic(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
epic_id: UUID,
) -> Epic:
epic = read.get_epic(conn, epic_id)
if epic is None:
raise NotFound(f"no such epic: {epic_id}")
if epic.status == EpicStatus.ARCHIVED:
raise Conflict("epic already archived")
ev.append(conn, gen, ev.EpicUpdated(
epic_id=epic_id, status=EpicStatus.ARCHIVED,
))
fresh = read.get_epic(conn, epic_id)
assert fresh is not None
return fresh
def list_epics(
conn: sqlite3.Connection,
*,
project: str | None = None,
include_archived: bool = False,
) -> list[Epic]:
proj_id: UUID | None = None
if project is not None:
proj = read.get_project(conn, project)
if proj is None:
raise NotFound(f"no such project: {project!r}")
proj_id = proj.id
status: EpicStatus | None = None if include_archived else EpicStatus.OPEN
return read.list_epics(conn, project_id=proj_id, status=status)
def create_tag(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
color: str | None = None,
) -> Tag:
if read.get_tag_by_name(conn, name) is not None:
raise Conflict(f"tag already exists: {name!r}")
tag_id = _uuid.uuid4()
ev.append(conn, gen, ev.TagCreated(tag_id=tag_id, name=name, color=color))
fresh = read.get_tag(conn, tag_id)
assert fresh is not None
return fresh
def get_or_create_tag(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
color: str | None = None,
) -> Tag:
existing = read.get_tag_by_name(conn, name)
if existing is not None:
return existing
return create_tag(conn, gen, name=name, color=color)
def list_tags(conn: sqlite3.Connection) -> list[Tag]:
return read.list_tags(conn)
def create_category(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
color: str | None = None,
) -> Category:
if read.get_category_by_name(conn, name) is not None:
raise Conflict(f"category already exists: {name!r}")
cat_id = _uuid.uuid4()
ev.append(conn, gen, ev.CategoryCreated(
category_id=cat_id, name=name, color=color,
))
fresh = read.get_category(conn, cat_id)
assert fresh is not None
return fresh
def list_categories(conn: sqlite3.Connection) -> list[Category]:
return read.list_categories(conn)
def create_domain(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
name: str,
) -> Domain:
if read.get_domain_by_name(conn, name) is not None:
raise Conflict(f"domain already exists: {name!r}")
dom_id = _uuid.uuid4()
ev.append(conn, gen, ev.DomainCreated(domain_id=dom_id, name=name))
fresh = read.get_domain(conn, dom_id)
assert fresh is not None
return fresh
def list_domains(conn: sqlite3.Connection) -> list[Domain]:
return read.list_domains(conn)
def transition_task_state(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
to_state: TaskStatus,
reason: str | None = None,
) -> Task:
"""Move a task between states. Rejects illegal transitions."""
task = read.get_task(conn, task_id)
if task is None:
raise NotFound(f"no such task: {task_id}")
if (task.status, to_state) not in TASK_TRANSITIONS:
raise InvalidInput(
f"illegal transition: {task.status.value}{to_state.value}"
)
ev.append(conn, gen, ev.TaskStateTransitioned(
task_id=task_id, from_state=task.status,
to_state=to_state, reason=reason,
))
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh
def tag_task(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
tag_id: UUID,
) -> None:
"""Idempotent: re-tagging is a no-op (no Conflict)."""
if read.get_task(conn, task_id) is None:
raise NotFound(f"no such task: {task_id}")
if read.get_tag(conn, tag_id) is None:
raise NotFound(f"no such tag: {tag_id}")
ev.append(conn, gen, ev.TaskTagged(task_id=task_id, tag_id=tag_id))
def untag_task(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
tag_id: UUID,
) -> None:
ev.append(conn, gen, ev.TaskUntagged(task_id=task_id, tag_id=tag_id))
def set_task_owner(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
owner_person_id: UUID | None,
) -> Task:
if read.get_task(conn, task_id) is None:
raise NotFound(f"no such task: {task_id}")
if owner_person_id is not None and read.get_person(conn, owner_person_id) is None:
raise NotFound(f"no such person: {owner_person_id}")
ev.append(conn, gen, ev.TaskOwnerSet(
task_id=task_id, owner_person_id=owner_person_id,
))
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh
def set_task_type(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
task_type: TaskType | None,
) -> Task:
if read.get_task(conn, task_id) is None:
raise NotFound(f"no such task: {task_id}")
ev.append(conn, gen, ev.TaskTypeSet(task_id=task_id, task_type=task_type))
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh
def set_task_category(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
category_id: UUID | None,
) -> Task:
if read.get_task(conn, task_id) is None:
raise NotFound(f"no such task: {task_id}")
if category_id is not None and read.get_category(conn, category_id) is None:
raise NotFound(f"no such category: {category_id}")
ev.append(conn, gen, ev.TaskCategorySet(task_id=task_id, category_id=category_id))
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh
def set_task_domain(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
domain_id: UUID | None,
) -> Task:
if read.get_task(conn, task_id) is None:
raise NotFound(f"no such task: {task_id}")
if domain_id is not None and read.get_domain(conn, domain_id) is None:
raise NotFound(f"no such domain: {domain_id}")
ev.append(conn, gen, ev.TaskDomainSet(task_id=task_id, domain_id=domain_id))
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh
def set_task_meta(
conn: sqlite3.Connection,
gen: HLCGenerator,
*,
task_id: UUID,
color: str | None = None,
emoji: str | None = None,
apply_color_rule: bool | None = None,
) -> Task:
"""No-op when every field is None — no event emitted."""
if read.get_task(conn, task_id) is None:
raise NotFound(f"no such task: {task_id}")
if color is None and emoji is None and apply_color_rule is None:
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh
ev.append(conn, gen, ev.TaskMetaSet(
task_id=task_id, color=color, emoji=emoji,
apply_color_rule=apply_color_rule,
))
fresh = read.get_task(conn, task_id)
assert fresh is not None
return fresh

162
tests/test_pm_events.py Normal file
View file

@ -0,0 +1,162 @@
"""Projection tests for PM-expansion events.
Verifies that each new event payload writes to its projection table and that
a full `replay()` reconstructs identical state from the event log.
"""
from __future__ import annotations
import uuid as _uuid
from clare import events as ev
from clare.db import migrate, open_db
from clare.domain import EpicStatus, TaskStatus, TaskType
from clare.hlc import HLCGenerator
def _setup() -> tuple:
conn = open_db(":memory:")
migrate(conn)
gen = HLCGenerator("test-machine")
return conn, gen
def test_org_created_projects() -> None:
conn, gen = _setup()
org_id = _uuid.uuid4()
ev.append(conn, gen, ev.OrgCreated(org_id=org_id, name="acme", description="d"))
row = conn.execute("SELECT * FROM orgs WHERE id = ?", (str(org_id),)).fetchone()
assert row["name"] == "acme" and row["description"] == "d"
def test_person_created_unique_handle() -> None:
conn, gen = _setup()
p1 = _uuid.uuid4()
p2 = _uuid.uuid4()
ev.append(conn, gen, ev.PersonCreated(person_id=p1, handle="@quinn", display_name="Quinn"))
# Second insert with the same handle violates UNIQUE — surfaces as an error
# at the projection layer (events.append uses a single transaction).
try:
ev.append(conn, gen, ev.PersonCreated(person_id=p2, handle="@quinn", display_name="Quinn 2"))
raise AssertionError("expected UNIQUE constraint violation")
except Exception: # noqa: BLE001
pass
def test_epic_archive_nulls_task_epic_id() -> None:
conn, gen = _setup()
proj_id = _uuid.uuid4()
ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="proj"))
epic_id = _uuid.uuid4()
ev.append(conn, gen, ev.EpicCreated(epic_id=epic_id, project_id=proj_id, title="E"))
# Three tasks linked to the epic via direct UPDATE (we have no
# "set epic_id" event yet — tasks land in the epic via projection-level
# link in normal usage; here we set epic_id manually for the test).
for _ in range(3):
tid = _uuid.uuid4()
ev.append(conn, gen, ev.TaskAdded(task_id=tid, project_id=proj_id, title="t"))
conn.execute("UPDATE tasks SET epic_id = ? WHERE id = ?", (str(epic_id), str(tid)))
assert conn.execute("SELECT COUNT(*) FROM tasks WHERE epic_id = ?", (str(epic_id),)).fetchone()[0] == 3
ev.append(conn, gen, ev.EpicUpdated(epic_id=epic_id, status=EpicStatus.ARCHIVED))
assert conn.execute("SELECT COUNT(*) FROM tasks WHERE epic_id = ?", (str(epic_id),)).fetchone()[0] == 0
def test_task_state_transition_writes_history() -> None:
conn, gen = _setup()
proj_id = _uuid.uuid4()
task_id = _uuid.uuid4()
ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="proj"))
ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
ev.append(conn, gen, ev.TaskStateTransitioned(
task_id=task_id, from_state=TaskStatus.TODO, to_state=TaskStatus.IN_PROGRESS,
))
row = conn.execute("SELECT status FROM tasks WHERE id = ?", (str(task_id),)).fetchone()
assert row["status"] == "in_progress"
hist = conn.execute(
"SELECT from_state, to_state FROM task_state_history WHERE task_id = ?", (str(task_id),)
).fetchall()
assert len(hist) == 1
assert hist[0]["from_state"] == "todo" and hist[0]["to_state"] == "in_progress"
def test_task_tagged_and_untagged_idempotent() -> None:
conn, gen = _setup()
proj_id = _uuid.uuid4()
task_id = _uuid.uuid4()
tag_id = _uuid.uuid4()
ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="p"))
ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
ev.append(conn, gen, ev.TagCreated(tag_id=tag_id, name="urgent"))
ev.append(conn, gen, ev.TaskTagged(task_id=task_id, tag_id=tag_id))
ev.append(conn, gen, ev.TaskTagged(task_id=task_id, tag_id=tag_id)) # idempotent
count = conn.execute(
"SELECT COUNT(*) FROM task_tags WHERE task_id = ?", (str(task_id),)
).fetchone()[0]
assert count == 1
ev.append(conn, gen, ev.TaskUntagged(task_id=task_id, tag_id=tag_id))
ev.append(conn, gen, ev.TaskUntagged(task_id=task_id, tag_id=tag_id)) # no-op
count = conn.execute(
"SELECT COUNT(*) FROM task_tags WHERE task_id = ?", (str(task_id),)
).fetchone()[0]
assert count == 0
def test_task_meta_partial_update() -> None:
conn, gen = _setup()
proj_id = _uuid.uuid4()
task_id = _uuid.uuid4()
ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="p"))
ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
ev.append(conn, gen, ev.TaskMetaSet(task_id=task_id, emoji="🐛"))
row = conn.execute(
"SELECT color, emoji, apply_color_rule FROM tasks WHERE id = ?", (str(task_id),)
).fetchone()
assert row["emoji"] == "🐛"
assert row["color"] is None # unchanged
assert row["apply_color_rule"] == 1
ev.append(conn, gen, ev.TaskMetaSet(task_id=task_id, color="#ff0000"))
row = conn.execute(
"SELECT color, emoji FROM tasks WHERE id = ?", (str(task_id),)
).fetchone()
assert row["color"] == "#ff0000" and row["emoji"] == "🐛"
def test_replay_reproduces_pm_projection() -> None:
"""Full replay round-trip on a PM-rich event log produces identical state."""
conn, gen = _setup()
org_id = _uuid.uuid4()
person_id = _uuid.uuid4()
proj_id = _uuid.uuid4()
epic_id = _uuid.uuid4()
tag_id = _uuid.uuid4()
task_id = _uuid.uuid4()
ev.append(conn, gen, ev.OrgCreated(org_id=org_id, name="acme"))
ev.append(conn, gen, ev.PersonCreated(person_id=person_id, handle="@q", display_name="Q"))
ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="p"))
ev.append(conn, gen, ev.EpicCreated(epic_id=epic_id, project_id=proj_id, title="E"))
ev.append(conn, gen, ev.TagCreated(tag_id=tag_id, name="urgent", color="#f00"))
ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
ev.append(conn, gen, ev.TaskTagged(task_id=task_id, tag_id=tag_id))
ev.append(conn, gen, ev.TaskOwnerSet(task_id=task_id, owner_person_id=person_id))
ev.append(conn, gen, ev.TaskTypeSet(task_id=task_id, task_type=TaskType.BUG))
ev.append(conn, gen, ev.TaskStateTransitioned(
task_id=task_id, from_state=TaskStatus.TODO, to_state=TaskStatus.IN_PROGRESS,
))
def snapshot() -> dict:
return {
"orgs": conn.execute("SELECT name FROM orgs ORDER BY name").fetchall(),
"people": conn.execute("SELECT handle FROM people ORDER BY handle").fetchall(),
"projects": conn.execute("SELECT name FROM projects ORDER BY name").fetchall(),
"epics": conn.execute("SELECT title, status FROM epics ORDER BY title").fetchall(),
"tags": conn.execute("SELECT name, color FROM tags ORDER BY name").fetchall(),
"tasks": conn.execute("SELECT status, task_type, owner_person_id FROM tasks").fetchall(),
"tags_links": conn.execute("SELECT * FROM task_tags").fetchall(),
"history": conn.execute("SELECT from_state, to_state FROM task_state_history").fetchall(),
}
before = {k: [tuple(r) for r in v] for k, v in snapshot().items()}
n = ev.replay(conn)
assert n >= 10
after = {k: [tuple(r) for r in v] for k, v in snapshot().items()}
assert before == after