From 487eeaad8780cb9bfdf17bd08a604d4ca2219691 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Sat, 4 Apr 2026 23:57:42 -0700 Subject: [PATCH] =?UTF-8?q?feat(processor):=20=E2=9C=A8=20Add=20support=20?= =?UTF-8?q?for=20new=20event=20processing=20logic=20in=20the=20event=20pro?= =?UTF-8?q?cessor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- .../src/processors/events.processor.ts | 58 +++++++++++++------ .../src/processors/processors.module.ts | 3 +- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/services/processor/src/processors/events.processor.ts b/services/processor/src/processors/events.processor.ts index 2a039bc..8b6772c 100644 --- a/services/processor/src/processors/events.processor.ts +++ b/services/processor/src/processors/events.processor.ts @@ -1,43 +1,65 @@ import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; -import { Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; import type { Job } from 'bullmq'; import { AggregationService } from './aggregation.service'; +import { RawEvent } from '../entities/raw-event.entity'; interface EventJob { + eventId: string; eventType: string; - timestamp: string; sessionId: string; - properties: Record; } +@Injectable() @Processor('analytics-events', { concurrency: 10, }) export class EventsProcessor extends WorkerHost { private readonly logger = new Logger(EventsProcessor.name); - constructor(private readonly aggregationService: AggregationService) { + constructor( + private readonly aggregationService: AggregationService, + @InjectRepository(RawEvent) + private readonly rawEventRepository: Repository, + ) { super(); } async process(job: Job): Promise { - const { eventType, timestamp, sessionId, properties } = job.data; + const { eventId, eventType, sessionId } = job.data; - this.logger.debug( - `Processing event: ${eventType} from session ${sessionId}`, - ); + this.logger.debug(`Processing event: ${eventType} (id: ${eventId}) from session ${sessionId}`); - try { - await this.aggregationService.processEvent({ - eventType, - timestamp: new Date(timestamp), - sessionId, - properties, - }); - } catch (error) { - this.logger.error(`Failed to process event: ${error}`); - throw error; + const event = await this.rawEventRepository.findOne({ where: { id: eventId } }); + + if (!event) { + this.logger.warn(`Event ${eventId} not found in raw_events — skipping`); + return; } + + if (event.processed) { + this.logger.debug(`Event ${eventId} already processed — skipping`); + return; + } + + await this.aggregationService.processEvent({ + eventType: event.eventType, + timestamp: event.timestamp, + sessionId: event.sessionId, + userId: event.userId, + properties: { + ...(event.metadata ?? {}), + path: event.pageUrl ?? undefined, + deviceType: event.deviceType ?? undefined, + }, + }); + + await this.rawEventRepository.update(eventId, { + processed: true, + processedAt: new Date(), + }); } @OnWorkerEvent('completed') diff --git a/services/processor/src/processors/processors.module.ts b/services/processor/src/processors/processors.module.ts index a5c534a..4083716 100644 --- a/services/processor/src/processors/processors.module.ts +++ b/services/processor/src/processors/processors.module.ts @@ -4,11 +4,12 @@ import { BullModule } from '@nestjs/bullmq'; import { EventsProcessor } from './events.processor'; import { AggregationService } from './aggregation.service'; import { AggregatedMetric } from '../entities/aggregated-metric.entity'; +import { RawEvent } from '../entities/raw-event.entity'; import { RedisModule } from '../redis/redis.module'; @Module({ imports: [ - TypeOrmModule.forFeature([AggregatedMetric]), + TypeOrmModule.forFeature([AggregatedMetric, RawEvent]), BullModule.registerQueue({ name: 'analytics-events', }),