feat(processor): Add support for new event processing logic in the event processor

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-04-04 23:57:42 -07:00
parent 0b6e94877c
commit 487eeaad87
2 changed files with 42 additions and 19 deletions

View file

@ -1,43 +1,65 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import type { Job } from 'bullmq';
import { AggregationService } from './aggregation.service';
import { RawEvent } from '../entities/raw-event.entity';
interface EventJob {
eventId: string;
eventType: string;
timestamp: string;
sessionId: string;
properties: Record<string, unknown>;
}
@Injectable()
@Processor('analytics-events', {
concurrency: 10,
})
export class EventsProcessor extends WorkerHost {
private readonly logger = new Logger(EventsProcessor.name);
constructor(private readonly aggregationService: AggregationService) {
constructor(
private readonly aggregationService: AggregationService,
@InjectRepository(RawEvent)
private readonly rawEventRepository: Repository<RawEvent>,
) {
super();
}
async process(job: Job<EventJob>): Promise<void> {
const { eventType, timestamp, sessionId, properties } = job.data;
const { eventId, eventType, sessionId } = job.data;
this.logger.debug(
`Processing event: ${eventType} from session ${sessionId}`,
);
this.logger.debug(`Processing event: ${eventType} (id: ${eventId}) from session ${sessionId}`);
try {
await this.aggregationService.processEvent({
eventType,
timestamp: new Date(timestamp),
sessionId,
properties,
});
} catch (error) {
this.logger.error(`Failed to process event: ${error}`);
throw error;
const event = await this.rawEventRepository.findOne({ where: { id: eventId } });
if (!event) {
this.logger.warn(`Event ${eventId} not found in raw_events — skipping`);
return;
}
if (event.processed) {
this.logger.debug(`Event ${eventId} already processed — skipping`);
return;
}
await this.aggregationService.processEvent({
eventType: event.eventType,
timestamp: event.timestamp,
sessionId: event.sessionId,
userId: event.userId,
properties: {
...(event.metadata ?? {}),
path: event.pageUrl ?? undefined,
deviceType: event.deviceType ?? undefined,
},
});
await this.rawEventRepository.update(eventId, {
processed: true,
processedAt: new Date(),
});
}
@OnWorkerEvent('completed')

View file

@ -4,11 +4,12 @@ import { BullModule } from '@nestjs/bullmq';
import { EventsProcessor } from './events.processor';
import { AggregationService } from './aggregation.service';
import { AggregatedMetric } from '../entities/aggregated-metric.entity';
import { RawEvent } from '../entities/raw-event.entity';
import { RedisModule } from '../redis/redis.module';
@Module({
imports: [
TypeOrmModule.forFeature([AggregatedMetric]),
TypeOrmModule.forFeature([AggregatedMetric, RawEvent]),
BullModule.registerQueue({
name: 'analytics-events',
}),