feat(tracking): ✨ Introduce cross-domain tracking system with RawEvent, TrackingService, DomainResolverService, and IdentityService integration
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
9ca4222c0d
commit
8826e7129a
7 changed files with 252 additions and 5 deletions
|
|
@ -7,7 +7,10 @@ import { TypeOrmModule } from '@nestjs/typeorm';
|
|||
import { TrackingModule } from './tracking/tracking.module';
|
||||
import { HealthModule } from './health/health.module';
|
||||
import { RawEvent } from './entities/raw-event.entity';
|
||||
import { SessionFingerprint } from './entities/session-fingerprint.entity';
|
||||
import { SessionFingerprint } from "./entities/session-fingerprint.entity";
|
||||
import { Corp } from "./entities/corp.entity";
|
||||
import { Domain } from "./entities/domain.entity";
|
||||
import { VisitorSalt } from "./entities/visitor-salt.entity";
|
||||
import { WriteKeyGuard } from './auth/write-key.guard';
|
||||
|
||||
@Module({
|
||||
|
|
@ -36,7 +39,7 @@ import { WriteKeyGuard } from './auth/write-key.guard';
|
|||
username: config.get('DATABASE_USER', 'analytics'),
|
||||
password: config.get('DATABASE_PASSWORD', 'analytics'),
|
||||
database: config.get('DATABASE_NAME', 'analytics'),
|
||||
entities: [RawEvent, SessionFingerprint],
|
||||
entities: [RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt],
|
||||
synchronize: config.get('DB_SYNCHRONIZE') === 'true',
|
||||
logging: config.get('NODE_ENV') === 'development',
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -14,6 +14,9 @@ import {
|
|||
@Index(['sessionId', 'timestamp'])
|
||||
@Index(['eventType', 'timestamp'])
|
||||
@Index(['processed', 'timestamp'])
|
||||
@Index(['visitorIdDaily', 'timestamp'])
|
||||
@Index(['corpId', 'timestamp'])
|
||||
@Index(['domainId', 'timestamp'])
|
||||
export class RawEvent {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id!: string;
|
||||
|
|
@ -70,4 +73,21 @@ export class RawEvent {
|
|||
/** Processing timestamp */
|
||||
@Column({ type: 'timestamptz', nullable: true })
|
||||
processedAt?: Date | null;
|
||||
|
||||
/**
|
||||
* Daily-rotating visitor identity for cross-domain stitching.
|
||||
* sha256(daily_salt || ip || ua || lang). 32 bytes. Stable for the same
|
||||
* visitor across all our domains within a UTC day; unrecoverable after
|
||||
* salt rotation (cookie-free, GDPR-clean).
|
||||
*/
|
||||
@Column({ type: 'bytea', nullable: true, name: 'visitor_id_daily' })
|
||||
visitorIdDaily?: Buffer | null;
|
||||
|
||||
/** Corp that owns the originating domain. FK → corps(id). */
|
||||
@Column({ type: 'smallint', nullable: true, name: 'corp_id' })
|
||||
corpId?: number | null;
|
||||
|
||||
/** Domain the event originated from. FK → domains(id). */
|
||||
@Column({ type: 'integer', nullable: true, name: 'domain_id' })
|
||||
domainId?: number | null;
|
||||
}
|
||||
|
|
|
|||
71
services/collector/src/tracking/domain-resolver.service.ts
Normal file
71
services/collector/src/tracking/domain-resolver.service.ts
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { Domain } from '../entities/domain.entity';
|
||||
|
||||
/**
 * Result of a successful hostname lookup: the matched `domains` row reduced
 * to the identifiers that get stamped onto an event.
 */
export interface ResolvedDomain {
  /** Hostname of the matched row, as stored on that row. */
  hostname: string;

  /** Primary key of the matched `Domain` row. */
  domainId: number;

  /** `corpId` carried by the matched `Domain` row. */
  corpId: number;
}
|
||||
|
||||
/**
 * Resolves request → (corp_id, domain_id) by matching the request's hostname
 * against the `domains` table.
 *
 * Header precedence is Origin, then Referer, then Host; the first header that
 * yields a usable hostname wins.
 *
 * Unknown hosts resolve to `null` — the caller decides whether to drop the
 * event or persist it with NULL dimensions.
 *
 * Caching:
 *  - Known hostnames are cached for the lifetime of the process. That map can
 *    only ever hold hostnames that exist in the table, so it is naturally
 *    bounded.
 *  - Unknown hostnames come from untrusted request headers. They are cached
 *    only for a short TTL (so a newly added domain is picked up without a
 *    restart) and the negative cache is size-capped (so arbitrary header
 *    values cannot grow process memory without limit).
 */
@Injectable()
export class DomainResolverService {
  /** How long an "unknown host" answer is trusted before re-querying. */
  private static readonly MISS_TTL_MS = 60 * 1000;

  /** Maximum number of unknown hostnames remembered at once. */
  private static readonly MAX_MISSES = 10000;

  /** Headers consulted for the hostname, in priority order. */
  private static readonly HOST_HEADERS: readonly string[] = ['origin', 'referer', 'host'];

  private readonly logger = new Logger(DomainResolverService.name);

  /** hostname → resolved row, for hostnames that exist in `domains`. */
  private cache = new Map<string, ResolvedDomain>();

  /** hostname → epoch-ms expiry, for hostnames that were not found. */
  private misses = new Map<string, number>();

  constructor(
    @InjectRepository(Domain)
    private readonly domainRepo: Repository<Domain>,
  ) {}

  /**
   * Extract the hostname from the most reliable header available.
   *
   * A header that is missing, blank, unparseable, or the literal `null`
   * (sent by browsers as Origin for opaque origins) is skipped and the next
   * header in priority order is tried.
   *
   * @param headers Incoming request headers (lower-cased keys).
   * @returns Lowercase hostname without port or path, or `null` when no
   *          header yields one.
   */
  hostnameFromHeaders(headers: Record<string, string | string[] | undefined>): string | null {
    for (const name of DomainResolverService.HOST_HEADERS) {
      const raw = headers[name];
      const value = (Array.isArray(raw) ? raw[0] : raw)?.trim();
      if (!value || value === 'null') continue;

      const hostname = DomainResolverService.parseHostname(value);
      if (hostname) return hostname;
    }
    return null;
  }

  /**
   * Look up a domain row by hostname.
   *
   * @param hostname Hostname as produced by {@link hostnameFromHeaders}.
   * @returns The matching identifiers, or `null` when `hostname` is empty or
   *          no row matches.
   */
  async resolve(hostname: string | null): Promise<ResolvedDomain | null> {
    if (!hostname) return null;

    const known = this.cache.get(hostname);
    if (known) return known;

    // A recent miss is answered from memory; an expired one is re-queried.
    const missExpiry = this.misses.get(hostname);
    if (missExpiry !== undefined) {
      if (missExpiry > Date.now()) return null;
      this.misses.delete(hostname);
    }

    const row = await this.domainRepo.findOne({ where: { hostname } });
    if (row) {
      const resolved: ResolvedDomain = {
        domainId: row.id,
        corpId: row.corpId,
        hostname: row.hostname,
      };
      this.cache.set(hostname, resolved);
      return resolved;
    }

    this.rememberMiss(hostname, Date.now());
    this.logger.warn({ hostname }, 'Unknown domain — event will be dropped or untagged');
    return null;
  }

  /** Parse a URL or bare `host[:port]` value into a lowercase hostname. */
  private static parseHostname(value: string): string | null {
    try {
      const url = new URL(value.includes('://') ? value : `http://${value}`);
      return url.hostname ? url.hostname.toLowerCase() : null;
    } catch {
      return null;
    }
  }

  /** Record a negative lookup, evicting the oldest entry when at capacity. */
  private rememberMiss(hostname: string, now: number): void {
    // Delete first so a refreshed entry moves to the back of insertion order.
    this.misses.delete(hostname);
    if (this.misses.size >= DomainResolverService.MAX_MISSES) {
      const oldest = this.misses.keys().next();
      if (!oldest.done) this.misses.delete(oldest.value);
    }
    this.misses.set(hostname, now + DomainResolverService.MISS_TTL_MS);
  }
}
|
||||
98
services/collector/src/tracking/identity.service.ts
Normal file
98
services/collector/src/tracking/identity.service.ts
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import * as crypto from 'crypto';
|
||||
|
||||
import { VisitorSalt } from '../entities/visitor-salt.entity';
|
||||
|
||||
/**
 * Server-side visitor identity for cross-domain stitching.
 *
 *   visitor_id_daily = sha256(daily_salt "\n" ip "\n" ua "\n" accept_language)
 *
 * The same (ip, ua, accept_language) triple hashed on the same UTC day yields
 * the same id, regardless of which domain the request came from. The salt is
 * keyed by UTC day and, per the schema notes, is purged from `visitor_salts`
 * after 7 days; once a salt is gone, ids from that day can no longer be
 * recomputed. To honour that, this service also drops salts from its own
 * in-memory cache once they are older than the retention window.
 */
@Injectable()
export class IdentityService {
  /** Days a salt may stay in process memory; mirrors the 7-day table purge. */
  private static readonly SALT_RETENTION_DAYS = 7;

  private static readonly MS_PER_DAY = 24 * 60 * 60 * 1000;

  private readonly logger = new Logger(IdentityService.name);

  /** Process-local salt cache. Keyed by YYYY-MM-DD (UTC). */
  private cache = new Map<string, Buffer>();

  constructor(
    @InjectRepository(VisitorSalt)
    private readonly saltRepo: Repository<VisitorSalt>,
  ) {}

  /**
   * Compute the visitor id for the UTC day containing `now`.
   *
   * @param ip   Client IP address, if known.
   * @param ua   User-Agent header, if present.
   * @param lang Accept-Language header, if present.
   * @param now  Instant whose UTC day selects the salt. Defaults to the
   *             current time.
   * @returns A 32-byte sha256 digest, or `null` when all three attributes are
   *          missing or empty.
   * @throws Error when the salt row cannot be read back after the insert
   *         attempt; repository errors propagate unchanged.
   */
  async visitorIdDaily(
    ip: string | undefined,
    ua: string | undefined,
    lang: string | undefined,
    now: Date = new Date(),
  ): Promise<Buffer | null> {
    if (!ip && !ua && !lang) return null;

    const salt = await this.getSaltFor(this.utcDay(now));

    // Each attribute is preceded by "\n" so that field boundaries are
    // unambiguous; a missing attribute hashes as the empty string.
    const hash = crypto.createHash('sha256');
    hash.update(salt);
    for (const part of [ip, ua, lang]) {
      hash.update('\n');
      hash.update(part ?? '');
    }
    return hash.digest();
  }

  /** YYYY-MM-DD in UTC. */
  private utcDay(d: Date): string {
    return d.toISOString().slice(0, 10);
  }

  /**
   * Fetch the salt for `day`, creating one if missing.
   *
   * Creation uses INSERT ... ON CONFLICT DO NOTHING followed by a re-read, so
   * concurrent creators all end up with whichever row was inserted first.
   * Results are memoized per day until they age out of the retention window.
   */
  private async getSaltFor(day: string): Promise<Buffer> {
    const cached = this.cache.get(day);
    if (cached) return cached;

    const existing = await this.saltRepo.findOne({ where: { day } });
    if (existing) return this.remember(day, existing.salt);

    // Create. Race-safe via ON CONFLICT DO NOTHING — re-fetch if we lost.
    const fresh = crypto.randomBytes(32);
    await this.saltRepo
      .createQueryBuilder()
      .insert()
      .into(VisitorSalt)
      .values({ day, salt: fresh })
      .orIgnore()
      .execute();

    const final = await this.saltRepo.findOne({ where: { day } });
    if (!final) {
      // Should be impossible — orIgnore() either inserted ours or kept theirs.
      this.logger.error({ day }, 'Failed to read visitor_salts row after race-safe insert');
      throw new Error(`visitor_salts row for ${day} missing after insert`);
    }
    return this.remember(day, final.salt);
  }

  /** Normalise a stored salt to a Buffer, cache it, and prune stale days. */
  private remember(day: string, salt: VisitorSalt['salt']): Buffer {
    const buf = Buffer.isBuffer(salt) ? salt : Buffer.from(salt);
    this.cache.set(day, buf);
    this.evictExpired();
    return buf;
  }

  /**
   * Drop cached salts for days older than the retention window, measured
   * against the wall clock. YYYY-MM-DD keys order lexicographically, so a
   * plain string comparison is sufficient.
   */
  private evictExpired(): void {
    const windowMs = IdentityService.SALT_RETENTION_DAYS * IdentityService.MS_PER_DAY;
    const cutoff = this.utcDay(new Date(Date.now() - windowMs));
    for (const key of this.cache.keys()) {
      if (key < cutoff) this.cache.delete(key);
    }
  }
}
|
||||
|
|
@ -11,3 +11,6 @@ export type {
|
|||
AttributionInput,
|
||||
ResolvedAttribution,
|
||||
} from './attribution.service';
|
||||
export { IdentityService } from "./identity.service";
|
||||
export { DomainResolverService } from "./domain-resolver.service";
|
||||
export type { ResolvedDomain } from "./domain-resolver.service";
|
||||
|
|
|
|||
|
|
@ -3,21 +3,33 @@ import { TypeOrmModule } from '@nestjs/typeorm';
|
|||
import { BullModule } from '@nestjs/bullmq';
|
||||
import { RawEvent } from '../entities/raw-event.entity';
|
||||
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
|
||||
import { Corp } from '../entities/corp.entity';
|
||||
import { Domain } from '../entities/domain.entity';
|
||||
import { VisitorSalt } from '../entities/visitor-salt.entity';
|
||||
import { TrackingService } from './tracking.service';
|
||||
import { TrackingController } from './tracking.controller';
|
||||
import { DeviceEnrichmentService } from './device-enrichment.service';
|
||||
import { AttributionService } from './attribution.service';
|
||||
import { GovDetectionService } from './gov-detection.service';
|
||||
import { IdentityService } from './identity.service';
|
||||
import { DomainResolverService } from './domain-resolver.service';
|
||||
|
||||
/**
 * Tracking feature module.
 *
 * Registers repositories for the event, fingerprint, corp, domain and
 * visitor-salt entities, the `analytics-events` queue, the tracking
 * controller, and the tracking services. `IdentityService` and
 * `DomainResolverService` are exported alongside `TrackingService` so other
 * modules can inject them.
 *
 * Each metadata key appears exactly once: duplicate `providers` / `exports`
 * keys are a compile error in an object literal, and a second `forFeature`
 * call listing a subset of the same entities is redundant.
 */
@Module({
  imports: [
    TypeOrmModule.forFeature([RawEvent, SessionFingerprint, Corp, Domain, VisitorSalt]),
    BullModule.registerQueue({
      name: 'analytics-events',
    }),
  ],
  controllers: [TrackingController],
  providers: [
    TrackingService,
    DeviceEnrichmentService,
    AttributionService,
    GovDetectionService,
    IdentityService,
    DomainResolverService,
  ],
  exports: [TrackingService, IdentityService, DomainResolverService],
})
export class TrackingModule {}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import { RawEvent } from '../entities/raw-event.entity';
|
|||
import { SessionFingerprint } from '../entities/session-fingerprint.entity';
|
||||
import { DeviceEnrichmentService, ClientDeviceData, EnrichedDeviceData } from './device-enrichment.service';
|
||||
import { AttributionService, AttributionInput, ResolvedAttribution } from './attribution.service';
|
||||
import { IdentityService } from './identity.service';
|
||||
import { DomainResolverService } from './domain-resolver.service';
|
||||
|
||||
export interface TrackViewInput {
|
||||
pageUrl: string;
|
||||
|
|
@ -57,6 +59,8 @@ export class TrackingService {
|
|||
constructor(
|
||||
private readonly deviceEnrichmentService: DeviceEnrichmentService,
|
||||
private readonly attributionService: AttributionService,
|
||||
private readonly identityService: IdentityService,
|
||||
private readonly domainResolver: DomainResolverService,
|
||||
@InjectRepository(RawEvent)
|
||||
private readonly rawEventRepository: Repository<RawEvent>,
|
||||
@InjectRepository(SessionFingerprint)
|
||||
|
|
@ -65,6 +69,35 @@ export class TrackingService {
|
|||
private readonly eventsQueue: Queue,
|
||||
) {}
|
||||
|
||||
/**
 * Derive cross-domain dimensions for a request:
 *   - visitor_id_daily: computed by IdentityService from the client IP,
 *     User-Agent and Accept-Language
 *   - corp_id, domain_id: from the hostname DomainResolverService extracts
 *     from the request headers, matched against the domains table
 *
 * The identity hash and the domain lookup do not depend on each other, so
 * they are awaited together rather than one after the other.
 *
 * @param request Incoming HTTP request.
 * @returns All three dimensions; any that cannot be resolved is `null`.
 *          Errors thrown by either service propagate to the caller.
 */
private async resolveCrossDomainDimensions(request: Request): Promise<{
  visitorIdDaily: Buffer | null;
  corpId: number | null;
  domainId: number | null;
}> {
  const ua = request.headers['user-agent'] as string | undefined;
  const lang = request.headers['accept-language'] as string | undefined;
  // extractIpAddress may yield null; IdentityService expects undefined.
  const ip = this.extractIpAddress(request) ?? undefined;

  const hostname = this.domainResolver.hostnameFromHeaders(request.headers);

  const [visitorIdDaily, resolved] = await Promise.all([
    this.identityService.visitorIdDaily(ip, ua, lang),
    this.domainResolver.resolve(hostname),
  ]);

  return {
    visitorIdDaily,
    corpId: resolved?.corpId ?? null,
    domainId: resolved?.domainId ?? null,
  };
}
|
||||
|
||||
/**
|
||||
* Track page view with device fingerprinting
|
||||
*/
|
||||
|
|
@ -100,6 +133,10 @@ export class TrackingService {
|
|||
data.pageUrl,
|
||||
);
|
||||
|
||||
// Cross-domain dimensions
|
||||
const { visitorIdDaily, corpId, domainId } =
|
||||
await this.resolveCrossDomainDimensions(request);
|
||||
|
||||
// Create raw event
|
||||
const event = this.rawEventRepository.create({
|
||||
eventType: 'pageview',
|
||||
|
|
@ -111,6 +148,9 @@ export class TrackingService {
|
|||
metadata: data.metadata ?? null,
|
||||
timestamp: new Date(),
|
||||
processed: false,
|
||||
visitorIdDaily,
|
||||
corpId,
|
||||
domainId,
|
||||
});
|
||||
|
||||
const saved = await this.rawEventRepository.save(event);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue