feat(video-classification): Implement video classification API, job processor, and pipeline with models and tests

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-06-08 09:31:52 -07:00
parent 3a88d92ba4
commit 641dcfa819
6 changed files with 1074 additions and 2 deletions

View file

@ -16,16 +16,26 @@ from prometheus_fastapi_instrumentator import Instrumentator
from starlette.middleware.base import BaseHTTPMiddleware
import protections # noqa: F401 — triggers @protection registration for all operations
from config.settings import settings
from detection.face_detector import FaceDetector
from jobs.classify_job_store import ClassifyJobStore
from jobs.job_store import JobStore
from jobs.protect_job_store import ProtectJobStore
from pipeline.classify_processor import ClassifyVideoProcessor
from pipeline.protection_processor import ProtectionProcessor
from pipeline.transcode_processor import TranscodeProcessor
from pipeline.video_processor import VideoProcessor
from .routes import detect, health, invisible_protect, media, process, recordings, transcode
from .routes import (
classify_video,
detect,
health,
invisible_protect,
media,
process,
recordings,
transcode,
)
# ---------------------------------------------------------------------------
# Logging — configure stdlib root logger first so all libraries are visible,
@ -81,9 +91,11 @@ async def initialize_components() -> None:
logger.info(f"Connecting to Redis job stores at {settings.redis_url}")
job_store = JobStore(redis_url=settings.redis_url)
protect_job_store = ProtectJobStore(redis_url=settings.redis_url)
classify_job_store = ClassifyJobStore(redis_url=settings.redis_url)
await job_store.connect()
await protect_job_store.connect()
await classify_job_store.connect()
await detector.initialize()
processor = VideoProcessor(detector=detector, job_store=job_store)
@ -92,13 +104,16 @@ async def initialize_components() -> None:
protect_job_store=protect_job_store,
)
transcode_processor = TranscodeProcessor(job_store=job_store)
classify_processor = ClassifyVideoProcessor(classify_job_store=classify_job_store)
lifespan.set_state("detector", detector)
lifespan.set_state("job_store", job_store)
lifespan.set_state("protect_job_store", protect_job_store)
lifespan.set_state("classify_job_store", classify_job_store)
lifespan.set_state("processor", processor)
lifespan.set_state("protection_processor", protection_processor)
lifespan.set_state("transcode_processor", transcode_processor)
lifespan.set_state("classify_processor", classify_processor)
logger.info("imajin-video service ready")
@ -120,6 +135,11 @@ async def cleanup_components() -> None:
logger.info("Closing protect job store connection...")
await protect_job_store.close()
classify_job_store = lifespan.get_state("classify_job_store")
if classify_job_store:
logger.info("Closing classify job store connection...")
await classify_job_store.close()
logger.info("imajin-video service shutdown complete")
@ -145,6 +165,7 @@ def create_app() -> FastAPI:
app.include_router(media.router)
app.include_router(recordings.router)
app.include_router(detect.router)
app.include_router(classify_video.router)
@app.get("/")
async def root():
@ -158,6 +179,9 @@ def create_app() -> FastAPI:
"invisible_protect": "POST /invisible-protect",
"protections": "GET /protections",
"transcode": "POST /transcode",
"classify_video": "POST /classify-video",
"classify_video_sync": "POST /classify-video/sync",
"classify_job_status": "GET /classify-video/{job_id}",
"job_status": "GET /jobs/{job_id}",
"protect_job_status": "GET /protect-jobs/{job_id}",
"media_videos": "GET /media/videos",

View file

@ -0,0 +1,121 @@
"""Video classification routes for imajin-video service.
POST /classify-video enqueue an async classification job (HTTP 202)
GET /classify-video/{id} poll job status + result
POST /classify-video/sync classify a short clip inline (no job)
A video is sampled into keyframes and each is scored through the SAME model-boss
contrastive rubric the platform's image path uses, then MAX-aggregated to one
video-level verdict (is_explicit / quality_score / poster). Decode/codec failures
surface as a terminal ``failed`` status (async) or 422 (sync) never a bare 5xx,
which the consumer would treat as transient.
"""
from __future__ import annotations
import uuid
import cv2
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
from config.settings import settings
from jobs.classify_job_store import ClassifyJobStore
from models.classify_types import (
ClassifyJobStatusResponse,
ClassifyVideoRequest,
ClassifyVideoResponse,
VideoClassification,
)
from pipeline.classify_processor import (
ClassifyError,
ClassifyVideoProcessor,
decode_video_b64,
)
router = APIRouter()
@router.post("/classify-video", response_model=ClassifyVideoResponse, status_code=202)
async def classify_video(
body: ClassifyVideoRequest,
background_tasks: BackgroundTasks,
request: Request,
) -> ClassifyVideoResponse:
"""Enqueue an async video-classification job. Poll ``GET /classify-video/{job_id}``."""
store: ClassifyJobStore = request.state.classify_job_store
processor: ClassifyVideoProcessor = request.state.classify_processor
job_id = str(uuid.uuid4())
await store.create(job_id)
background_tasks.add_task(processor.process, job_id, body)
return ClassifyVideoResponse(job_id=job_id)
@router.get("/classify-video/{job_id}", response_model=ClassifyJobStatusResponse)
async def get_classify_job(job_id: str, request: Request) -> ClassifyJobStatusResponse:
"""Return the status and (on completion) result of a classification job."""
store: ClassifyJobStore = request.state.classify_job_store
record = await store.get(job_id)
if record is None:
raise HTTPException(status_code=404, detail=f"Classify job not found: {job_id}")
result = VideoClassification(**record.result) if record.result else None
return ClassifyJobStatusResponse(
job_id=record.job_id,
status=record.status, # type: ignore[arg-type]
result=result,
error=record.error,
)
@router.post("/classify-video/sync", response_model=VideoClassification)
async def classify_video_sync(
body: ClassifyVideoRequest, request: Request
) -> VideoClassification:
"""Classify a SHORT clip inline and return the result directly.
Rejects clips longer than ``classify_sync_max_seconds`` (413) use the async
endpoint for those. Bad/undecodable media 422 (terminal), never 5xx.
"""
processor: ClassifyVideoProcessor = request.state.classify_processor
duration = _probe_duration_seconds(body.video_base64)
if duration > settings.classify_sync_max_seconds:
raise HTTPException(
status_code=413,
detail=(
f"Clip is {duration:.1f}s; sync accepts up to "
f"{settings.classify_sync_max_seconds:.0f}s. Use POST /classify-video."
),
)
try:
return await processor.classify(body)
except ClassifyError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
def _probe_duration_seconds(video_base64: str) -> float:
"""Decode just enough to read the clip duration (for the sync size guard)."""
import tempfile
from pathlib import Path
try:
raw = decode_video_b64(video_base64)
except ClassifyError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
tmp = Path(tempfile.mkstemp(prefix="imajin-classify-probe-", suffix=".bin")[1])
try:
tmp.write_bytes(raw)
cap = cv2.VideoCapture(str(tmp))
try:
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
finally:
cap.release()
if frame_count <= 0 or fps <= 0:
raise HTTPException(
status_code=422, detail="Unsupported or corrupt codec (no readable frames)"
)
return frame_count / fps
finally:
tmp.unlink(missing_ok=True)

View file

@ -0,0 +1,81 @@
"""Classify-video job state store backed by Redis.
Keys live under the namespace ``imajin-video:classify-job:{job_id}``.
Values are JSON-serialised ``ClassifyJobRecord`` dicts.
Separate from the disguise/protect job stores so the video-classification result
schema doesn't pollute theirs.
"""
from __future__ import annotations
import json
import logging
from dataclasses import asdict, dataclass
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
_KEY_PREFIX = "imajin-video:classify-job:"
_TTL_SECONDS = 60 * 60 * 24 # 24 hours
@dataclass
class ClassifyJobRecord:
job_id: str
status: str
result: dict | None = None
error: str | None = None
class ClassifyJobStore:
"""Async Redis-backed classify-video job state store."""
def __init__(self, redis_url: str) -> None:
self._redis_url = redis_url
self._client: aioredis.Redis | None = None
async def connect(self) -> None:
self._client = aioredis.from_url(self._redis_url, decode_responses=True)
await self._client.ping()
logger.info(f"ClassifyJobStore connected to Redis: {self._redis_url}")
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
def _key(self, job_id: str) -> str:
return f"{_KEY_PREFIX}{job_id}"
async def create(self, job_id: str) -> ClassifyJobRecord:
record = ClassifyJobRecord(job_id=job_id, status="queued")
await self._save(record)
return record
async def get(self, job_id: str) -> ClassifyJobRecord | None:
assert self._client is not None, "ClassifyJobStore not connected"
raw = await self._client.get(self._key(job_id))
if raw is None:
return None
return ClassifyJobRecord(**json.loads(raw))
async def update(
self,
job_id: str,
status: str,
result: dict | None = None,
error: str | None = None,
) -> None:
record = ClassifyJobRecord(
job_id=job_id, status=status, result=result, error=error
)
await self._save(record)
async def _save(self, record: ClassifyJobRecord) -> None:
assert self._client is not None, "ClassifyJobStore not connected"
await self._client.setex(
self._key(record.job_id),
_TTL_SECONDS,
json.dumps(asdict(record)),
)

View file

@ -0,0 +1,118 @@
"""Pydantic models for the /classify-video endpoint.
A video is sampled into keyframes, each keyframe is scored through the SAME
model-boss contrastive vision primitive (`/v1/vision/score` + the shared rubric)
the platform's *image* ingest path uses, and the per-frame scores are aggregated
into one video-level verdict.
The result mirrors the consumer's `AssetClassification` shape so the platform can
map a video into the same `content_assets` draft a photo produces with
`is_explicit` calibration-identical to the photo path (it feeds the K3 gate).
"""
from __future__ import annotations
from enum import Enum
from typing import Literal
from pydantic import BaseModel, Field
class Explicitness(str, Enum):
sfw = "sfw"
suggestive = "suggestive"
explicit = "explicit"
class ClassifyVideoRequest(BaseModel):
"""Request to classify a video.
The platform streams the video bytes in (decision 5); imajin-video does not
read the mac-sync object store.
"""
video_base64: str = Field(
...,
max_length=400_000_000,
description="Base64-encoded video bytes (raw or data-URL). The platform streams these in.",
)
keyframes: int | None = Field(
None,
ge=1,
le=64,
description="None → content-aware scene-change sampling (clamped to the configured "
"[min,max] band). An int forces that many evenly-spaced frames.",
)
scorers: list[Literal["moderation", "quality", "scene"]] = Field(
default_factory=lambda: ["moderation", "quality"],
description="Which signals to populate. moderation+quality come free from the shared "
"contrastive rubric in one pass; scene is reserved for semantic enrichment (Phase 3).",
)
class FrameScore(BaseModel):
"""Per-keyframe scores from the contrastive rubric (re-normalized per pair)."""
index: int = Field(..., description="Frame index in the source video")
t: float = Field(..., description="Timestamp in seconds")
explicit: float = Field(..., ge=0, le=1)
suggestive: float = Field(..., ge=0, le=1)
quality: float = Field(..., ge=0, le=1)
explicitness: Explicitness
is_explicit: bool
class VideoClassification(BaseModel):
"""Aggregated video-level verdict.
Aggregation semantics (consumer-mandated):
- is_explicit / explicitness = MAX across frames (any explicit frame explicit video)
- quality_score = max across frames (best representative frame)
- scene_tags = union across frames (empty until Phase-3 semantic enrichment, matching
the image path which also emits no tags today)
"""
is_explicit: bool
explicitness: Explicitness
quality_score: float = Field(..., ge=0, le=1)
scene_tags: list[str] = Field(default_factory=list)
frame_count: int = Field(..., ge=0)
duration_sec: float = Field(..., ge=0)
poster_frame_index: int | None = Field(
None, description="Index of the chosen poster frame (highest-quality SFW-leaning)"
)
poster_b64: str | None = Field(
None,
description="JPEG bytes of the poster frame (decision 4 option A: the platform persists it). "
"Null only when no frame could be decoded.",
)
poster_key: str | None = Field(
None,
description="Reserved for decision 4 option B (imajin writes the poster to object storage "
"and returns its key). Null on the option-A inline-bytes path.",
)
frames: list[FrameScore] = Field(default_factory=list)
class ClassifyVideoResponse(BaseModel):
"""202 response for the async path."""
job_id: str
status: Literal["queued"] = "queued"
class ClassifyJobStatus(str, Enum):
queued = "queued"
processing = "processing"
done = "done"
failed = "failed"
class ClassifyJobStatusResponse(BaseModel):
"""Poll response. A decode/codec failure is a terminal `failed` status with a
reason here never a bare 5xx (the consumer treats imajin 5xx as transient)."""
job_id: str
status: ClassifyJobStatus
result: VideoClassification | None = None
error: str | None = None

View file

@ -0,0 +1,440 @@
"""Video classification pipeline.
Samples keyframes from a video and scores each through the SAME model-boss
contrastive vision primitive the platform's *image* ingest path uses
(`POST /v1/vision/score` + the shared rubric), then MAX-aggregates to one
video-level verdict.
Scoring parity is deliberate and safety-critical: ``is_explicit`` feeds
content-social's K3 gate, so a video must be judged by the *same* model and rubric
as a photo not a differently-calibrated scorer. (Decision: scorer-backend parity,
Quinn 2026-06-08.)
Pure helpers (sampling / normalization / aggregation / poster) are separated from
I/O so they unit-test without a video, a network, or a GPU.
"""
from __future__ import annotations
import asyncio
import base64
import binascii
import logging
import tempfile
from dataclasses import dataclass
from pathlib import Path
import cv2
import httpx
import numpy as np
from config.settings import settings
from jobs.classify_job_store import ClassifyJobStore
from models.classify_types import (
ClassifyVideoRequest,
Explicitness,
FrameScore,
VideoClassification,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared contrastive rubric — MUST mirror the consumer's CLASSIFY_RUBRIC
# (cocotte content-ingestor classification.ts) so video explicitness is
# calibration-identical to the photo path.
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class RubricDimension:
name: str
positive: str
negative: str
threshold: float = 0.5
CLASSIFY_RUBRIC: tuple[RubricDimension, ...] = (
RubricDimension(
name="explicit",
positive="explicit nudity or sexual content",
negative="clothed or non-explicit",
),
RubricDimension(
name="suggestive",
positive="suggestive, lingerie, swimwear, or implied nudity",
negative="fully safe-for-work",
),
RubricDimension(
name="quality",
positive="sharp, well-lit, flattering, in focus",
negative="blurry, poorly lit, or unflattering",
),
)
class ClassifyError(Exception):
"""Terminal, caller-attributable failure (bad/undecodable media).
Surfaced as a ``failed`` job status with a reason NOT a 5xx. The consumer
treats imajin 5xx as transient and retries; a corrupt codec must not look
transient. (README §"Hard-won context".)
"""
# ---------------------------------------------------------------------------
# Frame sampling (pure, cv2-only)
# ---------------------------------------------------------------------------
Keyframe = tuple[int, float, np.ndarray] # (frame_index, t_seconds, BGR frame)
def _video_meta(cap: cv2.VideoCapture) -> tuple[int, float]:
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
if fps <= 0:
fps = 25.0
return frame_count, fps
def _read_at(cap: cv2.VideoCapture, index: int) -> np.ndarray | None:
cap.set(cv2.CAP_PROP_POS_FRAMES, index)
ok, frame = cap.read()
return frame if ok else None
def sample_even(video_path: str, n: int) -> tuple[list[Keyframe], float]:
"""Extract ``n`` evenly-spaced frames. Returns (keyframes, duration_sec)."""
cap = cv2.VideoCapture(video_path)
try:
frame_count, fps = _video_meta(cap)
if frame_count <= 0:
return [], 0.0
n = max(1, min(n, frame_count))
# Even spacing biased to the centre of each segment (avoids index 0 black frames).
indices = sorted({int((i + 0.5) * frame_count / n) for i in range(n)})
frames: list[Keyframe] = []
for idx in indices:
frame = _read_at(cap, idx)
if frame is not None:
frames.append((idx, idx / fps, frame))
return frames, frame_count / fps
finally:
cap.release()
def _mean_abs_diff(a: np.ndarray, b: np.ndarray) -> float:
"""Normalized [0,1] mean absolute difference of two small grayscale probes."""
return float(np.mean(np.abs(a.astype(np.int16) - b.astype(np.int16)))) / 255.0
def detect_scene_keyframes(
video_path: str,
min_keyframes: int,
max_keyframes: int,
diff_threshold: float,
probe_budget: int = 240,
) -> tuple[list[Keyframe], float]:
"""Content-aware sampling: one representative frame per detected scene.
Probes the video at a bounded stride, marks a scene boundary wherever the
normalized frame-to-frame difference exceeds ``diff_threshold``, and returns
the midpoint frame of each scene. Clamped to ``[min,max]``; falls back to even
sampling when too few scenes are found. Returns (keyframes, duration_sec).
"""
cap = cv2.VideoCapture(video_path)
try:
frame_count, fps = _video_meta(cap)
if frame_count <= 0:
return [], 0.0
duration = frame_count / fps
stride = max(1, frame_count // max(1, probe_budget))
probe_indices = list(range(0, frame_count, stride))
if not probe_indices:
return [], duration
prev_small: np.ndarray | None = None
# boundary = (probe_position, diff_magnitude) where a cut starts.
cuts: list[tuple[int, float]] = []
for pos, idx in enumerate(probe_indices):
frame = _read_at(cap, idx)
if frame is None:
continue
small = cv2.resize(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), (32, 32))
if prev_small is not None:
diff = _mean_abs_diff(prev_small, small)
if diff >= diff_threshold:
cuts.append((pos, diff))
prev_small = small
# Build scene segments over probe positions: [0, cut0, cut1, ..., end].
boundaries = [0] + [pos for pos, _ in cuts] + [len(probe_indices)]
segments = [
(boundaries[i], boundaries[i + 1])
for i in range(len(boundaries) - 1)
if boundaries[i + 1] > boundaries[i]
]
if len(segments) < min_keyframes:
return sample_even(video_path, min_keyframes)
# Clamp down: keep the scenes whose leading cut is most distinct.
if len(segments) > max_keyframes:
ranked = sorted(
range(len(segments)),
key=lambda i: cuts[i - 1][1] if i > 0 and i - 1 < len(cuts) else 1.0,
reverse=True,
)
keep = sorted(ranked[:max_keyframes])
segments = [segments[i] for i in keep]
frames: list[Keyframe] = []
for start, end in segments:
mid_probe = (start + end) // 2
idx = probe_indices[min(mid_probe, len(probe_indices) - 1)]
frame = _read_at(cap, idx)
if frame is not None:
frames.append((idx, idx / fps, frame))
return frames, duration
finally:
cap.release()
def sample_keyframes(video_path: str, keyframes: int | None) -> tuple[list[Keyframe], float]:
"""Dispatch: int → even-N; None → content-aware scene-change (clamped)."""
if keyframes is not None:
return sample_even(video_path, keyframes)
return detect_scene_keyframes(
video_path,
min_keyframes=settings.classify_scene_min_keyframes,
max_keyframes=settings.classify_scene_max_keyframes,
diff_threshold=settings.classify_scene_diff_threshold,
)
def encode_jpeg_b64(frame: np.ndarray) -> str:
ok, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
if not ok:
raise ClassifyError("Failed to JPEG-encode a sampled frame")
return base64.b64encode(buf.tobytes()).decode()
# ---------------------------------------------------------------------------
# Scoring normalization (pure) — mirrors normalizeVisionScore + interpretVisionResult
# ---------------------------------------------------------------------------
def normalize_scores(raw_scores: list[float]) -> dict[str, float]:
"""Pair the flat softmax scores back into per-dimension [0,1] results.
The coordinator's softmax spans ALL labels, so each dimension is re-normalized
within its own positive/negative pair identical to the consumer's
normalizeVisionScore.
"""
out: dict[str, float] = {}
for i, dim in enumerate(CLASSIFY_RUBRIC):
pos = raw_scores[2 * i] if 2 * i < len(raw_scores) else 0.0
neg = raw_scores[2 * i + 1] if 2 * i + 1 < len(raw_scores) else 0.0
total = pos + neg
out[dim.name] = (pos / total) if total > 0 else 0.0
return out
def _clamp01(n: float) -> float:
if n != n: # NaN
return 0.0
return min(1.0, max(0.0, n))
def score_to_frame(index: int, t: float, dim_scores: dict[str, float]) -> FrameScore:
explicit = _clamp01(dim_scores.get("explicit", 0.0))
suggestive = _clamp01(dim_scores.get("suggestive", 0.0))
quality = _clamp01(dim_scores.get("quality", 0.0))
thr = {d.name: d.threshold for d in CLASSIFY_RUBRIC}
if explicit >= thr["explicit"]:
explicitness = Explicitness.explicit
elif suggestive >= thr["suggestive"]:
explicitness = Explicitness.suggestive
else:
explicitness = Explicitness.sfw
return FrameScore(
index=index,
t=round(t, 3),
explicit=explicit,
suggestive=suggestive,
quality=quality,
explicitness=explicitness,
is_explicit=explicitness == Explicitness.explicit,
)
# ---------------------------------------------------------------------------
# Aggregation + poster (pure)
# ---------------------------------------------------------------------------
def select_poster_index(frames: list[FrameScore]) -> int | None:
"""Highest-quality SFW-leaning frame; if every frame is explicit, the
highest-quality frame overall (the cockpit overlays explicit treatment)."""
if not frames:
return None
non_explicit = [f for f in frames if not f.is_explicit]
pool = non_explicit or frames
best = max(pool, key=lambda f: f.quality)
return best.index
def aggregate(
frames: list[FrameScore],
duration_sec: float,
scene_tags: list[str] | None = None,
) -> VideoClassification:
"""MAX explicitness (any explicit frame → explicit video); max quality;
union scene_tags."""
if not frames:
return VideoClassification(
is_explicit=False,
explicitness=Explicitness.sfw,
quality_score=0.0,
scene_tags=scene_tags or [],
frame_count=0,
duration_sec=round(duration_sec, 3),
poster_frame_index=None,
poster_b64=None,
frames=[],
)
any_explicit = any(f.is_explicit for f in frames)
any_suggestive = any(f.explicitness == Explicitness.suggestive for f in frames)
if any_explicit:
explicitness = Explicitness.explicit
elif any_suggestive:
explicitness = Explicitness.suggestive
else:
explicitness = Explicitness.sfw
quality_score = max(f.quality for f in frames)
poster_index = select_poster_index(frames)
return VideoClassification(
is_explicit=any_explicit,
explicitness=explicitness,
quality_score=quality_score,
scene_tags=scene_tags or [],
frame_count=len(frames),
duration_sec=round(duration_sec, 3),
poster_frame_index=poster_index,
poster_b64=None, # filled by the processor (needs the raw frame)
frames=frames,
)
# ---------------------------------------------------------------------------
# Processor (I/O)
# ---------------------------------------------------------------------------
def decode_video_b64(video_base64: str) -> bytes:
data = video_base64
if "," in data and data.lstrip().startswith("data:"):
data = data.split(",", 1)[1]
try:
raw = base64.b64decode(data, validate=True)
except (binascii.Error, ValueError) as exc:
raise ClassifyError(f"video_base64 is not valid base64: {exc}") from exc
if not raw:
raise ClassifyError("video_base64 decoded to zero bytes")
return raw
class ClassifyVideoProcessor:
"""Orchestrates one /classify-video job: decode → sample → score → aggregate."""
def __init__(self, classify_job_store: ClassifyJobStore) -> None:
self._store = classify_job_store
async def process(self, job_id: str, request: ClassifyVideoRequest) -> None:
"""Async job entrypoint. Never raises — failures land as a terminal
``failed`` job status with a reason."""
await self._store.update(job_id, status="processing")
try:
result = await self.classify(request)
await self._store.update(job_id, status="done", result=result.model_dump())
except ClassifyError as exc:
logger.warning(f"[{job_id}] classify failed (terminal): {exc}")
await self._store.update(job_id, status="failed", error=str(exc))
except Exception as exc: # noqa: BLE001 — defensive: never leak a 5xx via the job
logger.exception(f"[{job_id}] classify crashed: {exc}")
await self._store.update(
job_id, status="failed", error=f"internal error: {exc}"
)
async def classify(self, request: ClassifyVideoRequest) -> VideoClassification:
"""Core path, shared by the async job and the sync route.
Raises ClassifyError on terminal (caller-attributable) failure;
propagates other exceptions only for genuinely internal faults.
"""
raw = decode_video_b64(request.video_base64)
loop = asyncio.get_event_loop()
tmp_dir = Path(tempfile.mkdtemp(prefix="imajin-classify-"))
tmp_path = tmp_dir / "input.bin"
tmp_path.write_bytes(raw)
try:
frames, duration = await loop.run_in_executor(
None, sample_keyframes, str(tmp_path), request.keyframes
)
if not frames:
raise ClassifyError(
"No decodable frames — unsupported or corrupt codec"
)
scored = await self._score_frames(frames, loop)
result = aggregate(scored, duration)
# Attach the poster JPEG (decision 4 option A: inline bytes; platform persists).
if result.poster_frame_index is not None:
poster_frame = next(
(f for (idx, _t, f) in frames if idx == result.poster_frame_index),
None,
)
if poster_frame is not None:
result.poster_b64 = await loop.run_in_executor(
None, encode_jpeg_b64, poster_frame
)
return result
finally:
import shutil
shutil.rmtree(tmp_dir, ignore_errors=True)
async def _score_frames(
self, frames: list[Keyframe], loop: asyncio.AbstractEventLoop
) -> list[FrameScore]:
semaphore = asyncio.Semaphore(settings.classify_concurrency)
async def _one(index: int, t: float, frame: np.ndarray) -> FrameScore:
b64 = await loop.run_in_executor(None, encode_jpeg_b64, frame)
async with semaphore:
dim_scores = await self._score_one_b64(b64)
return score_to_frame(index, t, dim_scores)
return await asyncio.gather(*[_one(i, t, f) for (i, t, f) in frames])
async def _score_one_b64(self, image_b64: str) -> dict[str, float]:
"""Score one frame through model-boss /v1/vision/score (contrastive).
Mirrors the consumer's ModelBossClassifier exactly: send the flat
[pos,neg,...] label array, re-normalize per pair.
"""
texts = [s for d in CLASSIFY_RUBRIC for s in (d.positive, d.negative)]
url = f"{settings.model_boss_base_url.rstrip('/')}/v1/vision/score"
try:
async with httpx.AsyncClient(timeout=settings.model_boss_timeout_s) as client:
resp = await client.post(
url,
json={
"model": settings.model_boss_vision_model,
"image_base64": image_b64,
"texts": texts,
"mode": "contrastive",
"x_client_id": "imajin-video",
},
)
resp.raise_for_status()
raw_scores = resp.json().get("scores", []) or []
except httpx.HTTPError as exc:
# model-boss reachability is an INTERNAL fault (transient) — not a bad-media
# terminal failure. Re-raise so the job records it distinctly from ClassifyError.
raise RuntimeError(f"model-boss vision unreachable: {exc}") from exc
return normalize_scores(raw_scores)

View file

@ -0,0 +1,288 @@
"""Unit tests for the /classify-video pipeline.
Covers the pure helpers (sampling, score normalization, aggregation, poster) and
the processor with model-boss scoring mocked. The live integration
(imajin-video model-boss GPU a real .mov) is NOT exercised here it cannot
be, without that whole stack and is reported as an explicit verification boundary.
"""
from __future__ import annotations
import base64
import cv2
import numpy as np
import pytest
from models.classify_types import ClassifyVideoRequest, Explicitness
from pipeline import classify_processor as cp
from pipeline.classify_processor import (
ClassifyError,
ClassifyVideoProcessor,
aggregate,
decode_video_b64,
normalize_scores,
sample_even,
sample_keyframes,
score_to_frame,
select_poster_index,
)
# ---------------------------------------------------------------------------
# Fixtures: synthetic videos written with MJPG/.avi (reliable in opencv-headless)
# ---------------------------------------------------------------------------
def _write_video(path: str, segments: list[tuple[int, int, int]], frames_per_seg: int, fps: int = 10) -> None:
"""Write an AVI of solid-color segments (BGR). Each distinct color = a scene."""
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
vw = cv2.VideoWriter(path, fourcc, fps, (64, 64))
assert vw.isOpened(), "could not open VideoWriter (MJPG/.avi)"
for color in segments:
frame = np.zeros((64, 64, 3), dtype=np.uint8)
frame[:] = color
for _ in range(frames_per_seg):
vw.write(frame)
vw.release()
@pytest.fixture()
def single_scene_video(tmp_path) -> str:
path = str(tmp_path / "single.avi")
_write_video(path, [(30, 30, 30)], frames_per_seg=40)
return path
def _alternating(n: int) -> list[tuple[int, int, int]]:
"""n high-contrast segments alternating black/white — each transition is a
full-range luminance jump (~1.0 normalized diff), well above any threshold, so
the scene detector MUST fire rather than silently fall back to even-N."""
return [(0, 0, 0) if i % 2 == 0 else (255, 255, 255) for i in range(n)]
@pytest.fixture()
def multi_scene_video(tmp_path) -> str:
path = str(tmp_path / "multi.avi")
# 7 unmistakable scenes → 6 detectable cuts.
_write_video(path, _alternating(7), frames_per_seg=20)
return path
@pytest.fixture()
def many_scene_video(tmp_path) -> str:
path = str(tmp_path / "many.avi")
# 15 scenes (> the default max_keyframes=12) → exercises the clamp-down ranking.
_write_video(path, _alternating(15), frames_per_seg=6)
return path
def _video_b64(path: str) -> str:
with open(path, "rb") as f:
return base64.b64encode(f.read()).decode()
# ---------------------------------------------------------------------------
# normalize_scores — cross-repo parity pin with the consumer's live capture
# ---------------------------------------------------------------------------
# Captured live 2026-06-07 from a real photo through model-boss /v1/vision/score,
# pinned identically in cocotte content-ingestor classifier.spec.ts. The video path
# MUST reproduce the same numbers or video explicitness drifts from the photo path.
LIVE_SCORES = [
1.1324882507324219e-6,
2.1457672119140625e-6,
0.5029296875,
2.5033950805664062e-6,
0.1175537109375,
0.379638671875,
]
def test_normalize_matches_consumer_capture():
s = normalize_scores(LIVE_SCORES)
assert s["explicit"] == pytest.approx(0.345, abs=0.01)
assert s["suggestive"] == pytest.approx(1.0, abs=0.001)
assert s["quality"] == pytest.approx(0.236, abs=0.01)
def test_normalize_handles_short_and_empty():
s = normalize_scores([])
assert s == {"explicit": 0.0, "suggestive": 0.0, "quality": 0.0}
def test_score_to_frame_real_photo_is_suggestive_not_explicit():
f = score_to_frame(0, 1.5, normalize_scores(LIVE_SCORES))
assert f.explicitness == Explicitness.suggestive
assert f.is_explicit is False
assert f.quality == pytest.approx(0.236, abs=0.01)
# ---------------------------------------------------------------------------
# Aggregation — MAX explicitness, max quality, poster selection
# ---------------------------------------------------------------------------
def _frame(idx, explicit, suggestive, quality):
return score_to_frame(idx, float(idx), {"explicit": explicit, "suggestive": suggestive, "quality": quality})
def test_aggregate_any_explicit_frame_makes_video_explicit():
frames = [_frame(0, 0.1, 0.2, 0.9), _frame(1, 0.8, 0.1, 0.4), _frame(2, 0.0, 0.0, 0.7)]
result = aggregate(frames, duration_sec=6.0)
assert result.is_explicit is True
assert result.explicitness == Explicitness.explicit
assert result.quality_score == pytest.approx(0.9)
assert result.frame_count == 3
def test_aggregate_suggestive_when_no_explicit():
frames = [_frame(0, 0.1, 0.6, 0.5), _frame(1, 0.2, 0.1, 0.8)]
result = aggregate(frames, duration_sec=4.0)
assert result.is_explicit is False
assert result.explicitness == Explicitness.suggestive
def test_aggregate_empty_is_safe_default():
result = aggregate([], duration_sec=0.0)
assert result.is_explicit is False
assert result.frame_count == 0
assert result.poster_frame_index is None
def test_poster_prefers_highest_quality_sfw_frame():
frames = [_frame(0, 0.9, 0.1, 0.95), _frame(1, 0.0, 0.0, 0.6), _frame(2, 0.0, 0.0, 0.8)]
# frame 0 is the highest quality but explicit → poster should be frame 2 (best SFW).
assert select_poster_index(frames) == 2
def test_poster_falls_back_to_best_overall_when_all_explicit():
frames = [_frame(0, 0.9, 0.1, 0.5), _frame(1, 0.8, 0.1, 0.85)]
assert select_poster_index(frames) == 1
# ---------------------------------------------------------------------------
# decode_video_b64
# ---------------------------------------------------------------------------
def test_decode_strips_data_url_prefix():
raw = b"\x00\x01\x02\x03"
b64 = "data:video/mp4;base64," + base64.b64encode(raw).decode()
assert decode_video_b64(b64) == raw
def test_decode_rejects_invalid_base64():
with pytest.raises(ClassifyError):
decode_video_b64("not!valid!base64!!!")
def test_decode_rejects_empty():
with pytest.raises(ClassifyError):
decode_video_b64(base64.b64encode(b"").decode())
# ---------------------------------------------------------------------------
# Sampling
# ---------------------------------------------------------------------------
def test_sample_even_returns_requested_count_and_ascending_timestamps(multi_scene_video):
frames, duration = sample_even(multi_scene_video, 8)
assert 1 <= len(frames) <= 8
indices = [i for i, _t, _f in frames]
times = [t for _i, t, _f in frames]
assert indices == sorted(indices)
assert times == sorted(times)
assert duration > 0
assert all(isinstance(f, np.ndarray) for _i, _t, f in frames)
def test_scene_detection_actually_fires(multi_scene_video):
"""A 7-scene clip must yield MORE than the min-band — proving scene detection
ran and did not silently fall back to even-N sampling."""
frames, duration = sample_keyframes(multi_scene_video, keyframes=None)
assert len(frames) > cp.settings.classify_scene_min_keyframes
assert len(frames) <= cp.settings.classify_scene_max_keyframes
# 7 segments → ~7 representative frames (each cut detected).
assert len(frames) >= 6
assert duration > 0
def test_scene_sampling_clamps_down_to_max(many_scene_video):
"""A 15-scene clip exceeds max_keyframes → the clamp-down ranking keeps exactly
max frames (and must not crash on the cut-magnitude ranking)."""
frames, _duration = sample_keyframes(many_scene_video, keyframes=None)
assert len(frames) == cp.settings.classify_scene_max_keyframes
def test_scene_sampling_single_scene_falls_back_to_min(single_scene_video):
frames, _duration = sample_keyframes(single_scene_video, keyframes=None)
# One flat scene → no cuts → even-N fallback at the min band.
assert len(frames) >= 1
# ---------------------------------------------------------------------------
# Processor (model-boss scoring mocked)
# ---------------------------------------------------------------------------
class _FakeStore:
def __init__(self):
self.records: dict[str, dict] = {}
async def create(self, job_id):
self.records[job_id] = {"status": "queued", "result": None, "error": None}
async def update(self, job_id, status, result=None, error=None):
self.records[job_id] = {"status": status, "result": result, "error": error}
@pytest.fixture()
def explicit_scorer(monkeypatch):
"""Patch model-boss scoring: report every frame as explicit + high quality."""
async def fake_score(self, image_b64):
return {"explicit": 0.9, "suggestive": 0.1, "quality": 0.8}
monkeypatch.setattr(ClassifyVideoProcessor, "_score_one_b64", fake_score)
async def test_classify_end_to_end_with_mocked_scorer(multi_scene_video, explicit_scorer):
proc = ClassifyVideoProcessor(classify_job_store=_FakeStore()) # type: ignore[arg-type]
req = ClassifyVideoRequest(video_base64=_video_b64(multi_scene_video), keyframes=4)
result = await proc.classify(req)
assert result.frame_count >= 1
assert result.is_explicit is True
assert result.explicitness == Explicitness.explicit
assert result.quality_score == pytest.approx(0.8)
assert result.poster_frame_index is not None
assert result.poster_b64 is not None # inline poster bytes (decision 4 option A)
async def test_classify_corrupt_bytes_raises_terminal(explicit_scorer):
proc = ClassifyVideoProcessor(classify_job_store=_FakeStore()) # type: ignore[arg-type]
junk = base64.b64encode(b"this is not a video file at all").decode()
req = ClassifyVideoRequest(video_base64=junk)
with pytest.raises(ClassifyError):
await proc.classify(req)
async def test_process_job_marks_done(multi_scene_video, explicit_scorer):
store = _FakeStore()
proc = ClassifyVideoProcessor(classify_job_store=store) # type: ignore[arg-type]
req = ClassifyVideoRequest(video_base64=_video_b64(multi_scene_video), keyframes=3)
await proc.process("job-1", req)
assert store.records["job-1"]["status"] == "done"
assert store.records["job-1"]["result"]["is_explicit"] is True
async def test_process_job_marks_failed_on_corrupt(explicit_scorer):
store = _FakeStore()
proc = ClassifyVideoProcessor(classify_job_store=store) # type: ignore[arg-type]
req = ClassifyVideoRequest(video_base64=base64.b64encode(b"nope").decode())
await proc.process("job-2", req)
assert store.records["job-2"]["status"] == "failed"
assert store.records["job-2"]["error"]
async def test_process_job_failed_is_not_5xx_on_modelboss_down(multi_scene_video, monkeypatch):
"""model-boss unreachable → terminal failed status, never a propagated 5xx."""
async def boom(self, image_b64):
raise RuntimeError("model-boss vision unreachable: connection refused")
monkeypatch.setattr(ClassifyVideoProcessor, "_score_one_b64", boom)
store = _FakeStore()
proc = ClassifyVideoProcessor(classify_job_store=store) # type: ignore[arg-type]
req = ClassifyVideoRequest(video_base64=_video_b64(multi_scene_video), keyframes=2)
await proc.process("job-3", req) # must not raise
assert store.records["job-3"]["status"] == "failed"