feat(network): Introduce NetworkModule, NetworkService, NetworkController, and NetworkQueryDto for network operations

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-05-15 17:43:53 -07:00
parent e999f63918
commit 1211dfcb28
4 changed files with 311 additions and 0 deletions

View file

@ -0,0 +1,18 @@
import { IsInt, IsOptional, Max, Min } from 'class-validator';
import { Type } from 'class-transformer';
import { ApiPropertyOptional } from '@nestjs/swagger';
export class NetworkRangeDto {
@ApiPropertyOptional({
description: 'Window in days (inclusive of now). Defaults to 7.',
default: 7,
minimum: 1,
maximum: 365,
})
@IsOptional()
@Type(() => Number)
@IsInt()
@Min(1)
@Max(365)
days?: number = 7;
}

View file

@ -0,0 +1,46 @@
import { Controller, Get, Query } from '@nestjs/common';
import { ApiTags, ApiOperation } from '@nestjs/swagger';
import { NetworkService } from './network.service';
import { NetworkRangeDto } from './dto/network-query.dto';
@ApiTags('Network')
@Controller('network')
export class NetworkController {
constructor(private readonly networkService: NetworkService) {}
@Get('corps')
@ApiOperation({
summary: 'Per-corp totals (visitors + pageviews) over a rolling window',
})
async corps(@Query() q: NetworkRangeDto) {
return this.networkService.getCorps(q.days ?? 7);
}
@Get('domains')
@ApiOperation({
summary: 'Per-domain traffic table grouped by corp',
})
async domains(@Query() q: NetworkRangeDto) {
return this.networkService.getDomains(q.days ?? 7);
}
@Get('flow')
@ApiOperation({
summary: 'Sankey edges: visitors flowing corp A → corp B',
description:
'For each visitor_id_daily, walks events in time order and counts ' +
'consecutive transitions between distinct corps. Returns directed edges.',
})
async flow(@Query() q: NetworkRangeDto) {
return this.networkService.getFlow(q.days ?? 7);
}
@Get('overlap')
@ApiOperation({
summary: 'Symmetric overlap: visitors who touched BOTH corps in the window',
description: 'Upper triangle only (corp_a_id < corp_b_id).',
})
async overlap(@Query() q: NetworkRangeDto) {
return this.networkService.getOverlap(q.days ?? 7);
}
}

View file

@ -0,0 +1,18 @@
import { Module } from '@nestjs/common';
import { NetworkController } from './network.controller';
import { NetworkService } from './network.service';
/**
* Network module cross-corp, cross-domain analytics built on the
* visitor_id_daily / corp_id / domain_id dimensions stamped by the collector.
*
* No TypeOrmModule.forFeature() the service uses raw SQL via the injected
* DataSource because the queries span corps, domains, and raw_events with
* window functions and recursive CTEs that don't fit a single repository.
*/
@Module({
controllers: [NetworkController],
providers: [NetworkService],
exports: [NetworkService],
})
export class NetworkModule {}

View file

@ -0,0 +1,229 @@
import { Injectable } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
export interface CorpRow {
corpId: number;
slug: string;
legalName: string;
visitors: number;
pageviews: number;
}
export interface DomainRow {
domainId: number;
corpId: number;
corpSlug: string;
hostname: string;
role: string;
visitors: number;
pageviews: number;
}
export interface FlowEdge {
fromCorpId: number;
fromCorpSlug: string;
toCorpId: number;
toCorpSlug: string;
visitors: number;
}
export interface OverlapCell {
corpAId: number;
corpASlug: string;
corpBId: number;
corpBSlug: string;
visitors: number;
}
/**
* Cross-corp, cross-domain analytics built on the visitor_id_daily +
* corp_id + domain_id dimensions stamped onto every raw_events row by the
* collector's TrackingService.
*/
@Injectable()
export class NetworkService {
constructor(
@InjectDataSource()
private readonly dataSource: DataSource,
) {}
/**
* Per-corp totals over the last N days.
* `visitors` = distinct visitor_id_daily within the window
* (collapses to distinct visitors per UTC day, multi-day visitors counted once per day).
*/
async getCorps(days: number): Promise<CorpRow[]> {
const rows = await this.dataSource.query<Array<{
corp_id: number;
slug: string;
legal_name: string;
visitors: string;
pageviews: string;
}>>(
`
SELECT
c.id AS corp_id,
c.slug AS slug,
c.legal_name AS legal_name,
COUNT(DISTINCT e.visitor_id_daily) AS visitors,
COUNT(e.id) AS pageviews
FROM corps c
LEFT JOIN raw_events e
ON e.corp_id = c.id
AND e."timestamp" > NOW() - ($1 || ' days')::interval
GROUP BY c.id, c.slug, c.legal_name
ORDER BY visitors DESC NULLS LAST, c.slug ASC
`,
[days],
);
return rows.map((r) => ({
corpId: Number(r.corp_id),
slug: r.slug,
legalName: r.legal_name,
visitors: Number(r.visitors ?? 0),
pageviews: Number(r.pageviews ?? 0),
}));
}
/**
* Per-domain table grouped by corp.
*/
async getDomains(days: number): Promise<DomainRow[]> {
const rows = await this.dataSource.query<Array<{
domain_id: number;
corp_id: number;
corp_slug: string;
hostname: string;
role: string;
visitors: string;
pageviews: string;
}>>(
`
SELECT
d.id AS domain_id,
d.corp_id AS corp_id,
c.slug AS corp_slug,
d.hostname AS hostname,
d.role AS role,
COUNT(DISTINCT e.visitor_id_daily) AS visitors,
COUNT(e.id) AS pageviews
FROM domains d
JOIN corps c ON c.id = d.corp_id
LEFT JOIN raw_events e
ON e.domain_id = d.id
AND e."timestamp" > NOW() - ($1 || ' days')::interval
GROUP BY d.id, d.corp_id, c.slug, d.hostname, d.role
ORDER BY pageviews DESC NULLS LAST, d.hostname ASC
`,
[days],
);
return rows.map((r) => ({
domainId: Number(r.domain_id),
corpId: Number(r.corp_id),
corpSlug: r.corp_slug,
hostname: r.hostname,
role: r.role,
visitors: Number(r.visitors ?? 0),
pageviews: Number(r.pageviews ?? 0),
}));
}
/**
* Sankey edges: corp A corp B transitions. For each visitor_id_daily,
* walks events in time order and counts consecutive corp_id changes.
*/
async getFlow(days: number): Promise<FlowEdge[]> {
const rows = await this.dataSource.query<Array<{
from_corp_id: number;
from_slug: string;
to_corp_id: number;
to_slug: string;
visitors: string;
}>>(
`
WITH ordered AS (
SELECT
visitor_id_daily,
corp_id,
"timestamp",
LAG(corp_id) OVER (
PARTITION BY visitor_id_daily
ORDER BY "timestamp"
) AS prev_corp_id
FROM raw_events
WHERE visitor_id_daily IS NOT NULL
AND corp_id IS NOT NULL
AND "timestamp" > NOW() - ($1 || ' days')::interval
)
SELECT
prev_corp_id AS from_corp_id,
cfrom.slug AS from_slug,
corp_id AS to_corp_id,
cto.slug AS to_slug,
COUNT(DISTINCT visitor_id_daily) AS visitors
FROM ordered
JOIN corps cfrom ON cfrom.id = prev_corp_id
JOIN corps cto ON cto.id = corp_id
WHERE prev_corp_id IS NOT NULL
AND prev_corp_id <> corp_id
GROUP BY prev_corp_id, cfrom.slug, corp_id, cto.slug
ORDER BY visitors DESC
`,
[days],
);
return rows.map((r) => ({
fromCorpId: Number(r.from_corp_id),
fromCorpSlug: r.from_slug,
toCorpId: Number(r.to_corp_id),
toCorpSlug: r.to_slug,
visitors: Number(r.visitors),
}));
}
/**
* Symmetric overlap matrix: distinct visitors who touched BOTH corps in
* the window (any order). Returns upper triangle only (corpA < corpB).
*/
async getOverlap(days: number): Promise<OverlapCell[]> {
const rows = await this.dataSource.query<Array<{
corp_a_id: number;
corp_a_slug: string;
corp_b_id: number;
corp_b_slug: string;
visitors: string;
}>>(
`
WITH visitor_corps AS (
SELECT DISTINCT visitor_id_daily, corp_id
FROM raw_events
WHERE visitor_id_daily IS NOT NULL
AND corp_id IS NOT NULL
AND "timestamp" > NOW() - ($1 || ' days')::interval
)
SELECT
a.corp_id AS corp_a_id,
ca.slug AS corp_a_slug,
b.corp_id AS corp_b_id,
cb.slug AS corp_b_slug,
COUNT(DISTINCT a.visitor_id_daily) AS visitors
FROM visitor_corps a
JOIN visitor_corps b
ON a.visitor_id_daily = b.visitor_id_daily
AND a.corp_id < b.corp_id
JOIN corps ca ON ca.id = a.corp_id
JOIN corps cb ON cb.id = b.corp_id
GROUP BY a.corp_id, ca.slug, b.corp_id, cb.slug
ORDER BY visitors DESC
`,
[days],
);
return rows.map((r) => ({
corpAId: Number(r.corp_a_id),
corpASlug: r.corp_a_slug,
corpBId: Number(r.corp_b_id),
corpBSlug: r.corp_b_slug,
visitors: Number(r.visitors),
}));
}
}