From f6a8d17a9e9a7232ba067750b2bbf3e72fca9a72 Mon Sep 17 00:00:00 2001 From: Matteo Virgilio Date: Thu, 9 Apr 2026 19:40:17 +0200 Subject: [PATCH 1/2] Add custom stop conditions (stopWhen) and release v0.4.0 Add stopWhen option to SimulationEngineOptions for custom termination conditions evaluated after each event. When the callback returns true, the simulation ends with status 'stopConditionMet'. Useful for optimisation, steady-state detection, and Monte Carlo convergence. Breaking: SimulationEngineOptions now takes generics. --- CHANGELOG.md | 16 +++ README.md | 10 +- docs/simloop-general-specs.md | 19 +++- examples/stop-condition/main.ts | 67 ++++++++++++ examples/stop-condition/tsconfig.json | 14 +++ examples/store-counter/tsconfig.json | 14 +++ package.json | 5 +- src/engine.test.ts | 150 ++++++++++++++++++++++++++ src/engine.ts | 19 +++- src/types.ts | 13 ++- 10 files changed, 316 insertions(+), 11 deletions(-) create mode 100644 examples/stop-condition/main.ts create mode 100644 examples/stop-condition/tsconfig.json create mode 100644 examples/store-counter/tsconfig.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 87dfbea..d0d8453 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/), and this project adheres to [Semantic Versioning](https://semver.org/). +## [0.4.0] - 2026-04-09 + +### Added + +- `stopWhen` option in `SimulationEngineOptions` — custom stop condition evaluated after each + processed event; when it returns `true` the simulation ends with status `'stopConditionMet'`. + Useful for optimisation, steady-state detection, and Monte Carlo convergence. +- `'stopConditionMet'` value added to `SimulationEndStatus` + +### Changed + +- **Breaking**: `SimulationEngineOptions` now takes two generic parameters + `` (previously only ``). Bare usage and inline options are unaffected; + only explicit standalone typing needs updating. +- `stop-condition` example demonstrating `stopWhen` with coefficient of variation convergence + ## [0.3.0] - 2026-04-08 ### Added diff --git a/README.md b/README.md index bf40cc1..fce5ffc 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Simloop provides a minimal, type-safe API for building simulations of real-world - **Deterministic** — seeded PRNG ensures reproducible results - **Simple API** — define handlers with `sim.on()`, schedule events with `ctx.schedule()` - **Probability distributions** — uniform, gaussian, exponential, poisson, bernoulli, zipf, triangular, weibull, lognormal, erlang, geometric -- **Context-bound distributions** — `ctx.dist.exponential(rate)()` eliminates repetitive RNG wiring +- **Custom stop conditions** — `stopWhen` callback to halt the simulation when an arbitrary condition is met - **Warm-up period** — `warmUpTime` option auto-resets statistics after transient phase for steady-state analysis - **Lifecycle management** — run, pause, resume, stop, reset - **Built-in statistics** — online mean, variance, min, max, count @@ -166,6 +166,8 @@ const sim = new SimulationEngine({ name: 'MySim', // log prefix (default: 'Simulation') realTimeDelay: 100, // ms delay between events in runAsync (default: 0) warmUpTime: 500, // reset stats after this sim-time (default: undefined) + stopWhen: (ctx) => // custom stop condition (default: undefined) + ctx.stats.get('queue').mean > threshold, store: { ... }, // initial global store value (default: {}) }); ``` @@ -182,7 +184,7 @@ result.totalEventsCancelled // number of cancelled events skipped result.finalClock // final simulation time result.wallClockMs // real-world execution time in ms result.stats // Record -result.status // 'finished' | 'stopped' | 'maxTimeReached' | 'maxEventsReached' +result.status // 'finished' | 'stopped' | 'maxTimeReached' | 'maxEventsReached' | 'stopConditionMet' result.store // TStore — final state of the global store ``` @@ -239,11 +241,13 @@ See the [examples/](examples/) directory: - **[store-counter](examples/store-counter/)** — minimal example showing `ctx.store` usage - **[coffee-shop](examples/coffee-shop/)** — multi-barista coffee shop with customer patience, drink types, and queue management - **[network-packets](examples/network-packets/)** — network router simulation using all six probability distributions +- **[stop-condition](examples/stop-condition/)** — Monte Carlo convergence using `stopWhen` to halt when the coefficient of variation is low enough ```bash npm run example:store-counter npm run example:coffee-shop npm run example:network-packets +npm run example:stop-condition ``` ## Probability Distributions @@ -316,7 +320,7 @@ console.log(sampler()); // sample from exponential - `SimContext` — handler context - `EventHandler` — handler function signature - `SimulationResult` — run result -- `SimulationEngineOptions` — engine configuration +- `SimulationEngineOptions` — engine configuration - `ResourceOptions` / `RequestOptions` / `RequestHandle` / `ResourceSnapshot` — Resource types - `StatsCollector` / `StatsSummary` — statistics interfaces - `DistributionHelper` — interface for the `ctx.dist` object diff --git a/docs/simloop-general-specs.md b/docs/simloop-general-specs.md index fbb542f..4749e3d 100644 --- a/docs/simloop-general-specs.md +++ b/docs/simloop-general-specs.md @@ -125,7 +125,7 @@ The main class that users instantiate and configure. ```typescript class SimulationEngine, TStore = Record> { - constructor(options?: SimulationEngineOptions); + constructor(options?: SimulationEngineOptions); /** Register a handler for an event type */ on( @@ -220,7 +220,7 @@ interface SimulationResult> { readonly finalClock: number; readonly wallClockMs: number; readonly stats: Record; - readonly status: 'finished' | 'stopped' | 'maxTimeReached' | 'maxEventsReached'; + readonly status: 'finished' | 'stopped' | 'maxTimeReached' | 'maxEventsReached' | 'stopConditionMet'; readonly store: TStore; } ``` @@ -228,7 +228,10 @@ interface SimulationResult> { ### 3.9 Configuration ```typescript -interface SimulationEngineOptions> { +interface SimulationEngineOptions< + TEventMap extends Record = Record, + TStore = Record, +> { /** PRNG seed for reproducibility. Default: Date.now() */ seed?: number; @@ -255,6 +258,12 @@ interface SimulationEngineOptions> { * Default: undefined (no warm-up). */ warmUpTime?: number; + /** Custom stop condition evaluated after each processed event. When it returns `true` + * the simulation ends with status `'stopConditionMet'`. + * Useful for optimisation, steady-state detection, and Monte Carlo convergence. + * Default: undefined (no custom stop condition). */ + stopWhen?: (ctx: SimContext) => boolean; + /** Initial value for the global simulation store. Deep-cloned on init and on reset(). Default: {} */ store?: TStore; } @@ -296,6 +305,10 @@ while queue is not empty run all afterEach hooks increment processedCount + if stopWhen is defined AND stopWhen(context) is true: + mark stopConditionMet + break + run all onEnd hooks return SimulationResult ``` diff --git a/examples/stop-condition/main.ts b/examples/stop-condition/main.ts new file mode 100644 index 0000000..15fad5c --- /dev/null +++ b/examples/stop-condition/main.ts @@ -0,0 +1,67 @@ +/** + * Stop Condition Example + * + * Demonstrates the `stopWhen` option to halt a simulation when a custom + * condition is met. Here we model a simple sampling process: each event + * draws a random value and records it as a statistic. The simulation + * stops as soon as the running mean stabilises — i.e., when enough + * samples have been collected (count >= 200) and the coefficient of + * variation (CV = stddev / mean) drops below a threshold. + */ +import { SimulationEngine } from '../../src/index.js'; + +// --- Types --- + +type Events = { + sample: Record; +}; + +// --- Configuration --- + +const CV_THRESHOLD = 0.35; // stop when CV < 35 % (theoretical CV for this lognormal ≈ 0.31) +const MIN_SAMPLES = 200; + +const sim = new SimulationEngine({ + seed: 123, + logLevel: 'silent', + + stopWhen: (ctx) => { + const s = ctx.stats.get('value'); + if (s.count < MIN_SAMPLES) return false; + const cv = Math.sqrt(s.variance) / Math.abs(s.mean); + return cv < CV_THRESHOLD; + }, +}); + +// --- Handlers --- + +sim.on('sample', (_event, ctx) => { + // Draw from a lognormal distribution (right-skewed, mean ≈ e^(mu + sigma²/2)) + const value = ctx.dist.lognormal(2, 0.3)(); + ctx.stats.record('value', value); + + // Schedule next sample at t + 1 + ctx.schedule('sample', ctx.clock + 1, {}); +}); + +// --- Init --- + +sim.init((ctx) => { + ctx.schedule('sample', 0, {}); +}); + +// --- Run --- + +const result = sim.run(); + +const stats = result.stats['value']; +const cv = Math.sqrt(stats.variance) / Math.abs(stats.mean); + +console.log('=== Stop Condition Example ==='); +console.log(`Status : ${result.status}`); +console.log(`Samples : ${stats.count}`); +console.log(`Mean : ${stats.mean.toFixed(4)}`); +console.log(`Std dev : ${Math.sqrt(stats.variance).toFixed(4)}`); +console.log(`CV : ${(cv * 100).toFixed(2)} %`); +console.log(`Sim time : ${result.finalClock}`); +console.log(`Wall clock : ${result.wallClockMs.toFixed(1)} ms`); diff --git a/examples/stop-condition/tsconfig.json b/examples/stop-condition/tsconfig.json new file mode 100644 index 0000000..64291b5 --- /dev/null +++ b/examples/stop-condition/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ES2022", + "moduleResolution": "bundler", + "lib": ["ES2022"], + "types": ["node"], + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "outDir": "./dist" + }, + "include": ["."] +} diff --git a/examples/store-counter/tsconfig.json b/examples/store-counter/tsconfig.json new file mode 100644 index 0000000..64291b5 --- /dev/null +++ b/examples/store-counter/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ES2022", + "moduleResolution": "bundler", + "lib": ["ES2022"], + "types": ["node"], + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "outDir": "./dist" + }, + "include": ["."] +} diff --git a/package.json b/package.json index 6b81f65..47e01b9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "simloop", - "version": "0.3.0", + "version": "0.4.0", "description": "A general-purpose discrete event simulation framework for Node.js", "type": "module", "sideEffects": false, @@ -30,7 +30,8 @@ "prepublishOnly": "npm run typecheck && npm run test && npm run build", "example:store-counter": "tsx examples/store-counter/main.ts", "example:coffee-shop": "tsx examples/coffee-shop/main.ts", - "example:network-packets": "tsx examples/network-packets/main.ts" + "example:network-packets": "tsx examples/network-packets/main.ts", + "example:stop-condition": "tsx examples/stop-condition/main.ts" }, "keywords": [ "simulation", diff --git a/src/engine.test.ts b/src/engine.test.ts index cda5bbc..6095ad4 100644 --- a/src/engine.test.ts +++ b/src/engine.test.ts @@ -461,4 +461,154 @@ describe('SimulationEngine', () => { expect(flags).toEqual([false, true, false, true]); }); }); + + describe('custom stop conditions (stopWhen)', () => { + it('should stop when stopWhen returns true', () => { + const sim = new SimulationEngine({ + stopWhen: (ctx) => ctx.stats.get('count').sum >= 3, + }); + + sim.on('tick', (_e, ctx) => { + ctx.stats.record('count', 1); + ctx.schedule('tick', ctx.clock + 1, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result = sim.run(); + expect(result.status).toBe('stopConditionMet'); + expect(result.totalEventsProcessed).toBe(3); + }); + + it('should provide updated context to stopWhen', () => { + const clocks: number[] = []; + + const sim = new SimulationEngine({ + store: { total: 0 }, + stopWhen: (ctx) => { + clocks.push(ctx.clock); + return ctx.store.total >= 30; + }, + }); + + sim.on('tick', (_e, ctx) => { + ctx.store.total += 10; + ctx.schedule('tick', ctx.clock + 5, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result = sim.run(); + expect(result.status).toBe('stopConditionMet'); + expect(clocks).toEqual([0, 5, 10]); + expect(result.store.total).toBe(30); + }); + + it('should return maxTimeReached when maxTime triggers before stopWhen', () => { + const sim = new SimulationEngine({ + maxTime: 20, + stopWhen: () => false, // never triggers + }); + + sim.on('tick', (_e, ctx) => { + ctx.schedule('tick', ctx.clock + 10, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result = sim.run(); + expect(result.status).toBe('maxTimeReached'); + }); + + it('should return maxEventsReached when maxEvents triggers before stopWhen', () => { + const sim = new SimulationEngine({ + maxEvents: 2, + stopWhen: () => false, + }); + + sim.on('tick', (_e, ctx) => { + ctx.schedule('tick', ctx.clock + 1, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result = sim.run(); + expect(result.status).toBe('maxEventsReached'); + }); + + it('should process the event that triggers the stop condition', () => { + let processed = 0; + + const sim = new SimulationEngine({ + stopWhen: (ctx) => ctx.clock >= 10, + }); + + sim.on('tick', (_e, ctx) => { + processed++; + ctx.schedule('tick', ctx.clock + 5, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result = sim.run(); + expect(result.status).toBe('stopConditionMet'); + // Events at t=0,5,10 — the event at t=10 IS processed + expect(processed).toBe(3); + expect(result.finalClock).toBe(10); + }); + + it('should work with runAsync', async () => { + const sim = new SimulationEngine({ + stopWhen: (ctx) => ctx.stats.get('count').sum >= 3, + }); + + sim.on('tick', (_e, ctx) => { + ctx.stats.record('count', 1); + ctx.schedule('tick', ctx.clock + 1, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result = await sim.runAsync(); + expect(result.status).toBe('stopConditionMet'); + expect(result.totalEventsProcessed).toBe(3); + }); + + it('should reset stopConditionMet flag on reset', () => { + const sim = new SimulationEngine({ + stopWhen: (ctx) => ctx.clock >= 5, + }); + + sim.on('tick', (_e, ctx) => { + ctx.schedule('tick', ctx.clock + 5, {}); + }); + + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result1 = sim.run(); + expect(result1.status).toBe('stopConditionMet'); + + sim.reset(); + sim.init((ctx) => { + ctx.schedule('tick', 0, {}); + }); + + const result2 = sim.run(); + expect(result2.status).toBe('stopConditionMet'); + }); + }); }); diff --git a/src/engine.ts b/src/engine.ts index b02148b..af3d9d3 100644 --- a/src/engine.ts +++ b/src/engine.ts @@ -62,6 +62,8 @@ export class SimulationEngine, TStore private readonly maxEvents: number; private readonly realTimeDelay: number; private readonly warmUpTime: number | undefined; + private readonly stopWhen?: (ctx: SimContext) => boolean; + private _stopConditionMet = false; private _warmUpCompleted = false; private _store: TStore; @@ -70,12 +72,13 @@ export class SimulationEngine, TStore private eventIdCounter = 0; private context!: SimContext; - constructor(private readonly options: SimulationEngineOptions = {}) { + constructor(private readonly options: SimulationEngineOptions = {}) { this.seed = options.seed ?? Date.now(); this.maxTime = options.maxTime ?? Infinity; this.maxEvents = options.maxEvents ?? Infinity; this.realTimeDelay = options.realTimeDelay ?? 0; this.warmUpTime = options.warmUpTime; + this.stopWhen = options.stopWhen; this.rng = new SeededRandom(this.seed); this._stats = new DefaultStatsCollector(); @@ -210,6 +213,7 @@ export class SimulationEngine, TStore this.rng.reset(this.seed); this._store = structuredClone(this._initialStore); this._warmUpCompleted = false; + this._stopConditionMet = false; this._status = 'idle'; this.buildContext(); this.logInternal('debug', 'Simulation reset'); @@ -247,6 +251,11 @@ export class SimulationEngine, TStore } this._eventsProcessed++; + + if (this.stopWhen && this.stopWhen(this.context)) { + this._stopConditionMet = true; + break; + } } this.finalize(); @@ -285,6 +294,12 @@ export class SimulationEngine, TStore } this._eventsProcessed++; + + if (this.stopWhen && this.stopWhen(this.context)) { + this._stopConditionMet = true; + break; + } + batchCount++; // Yield to the Node.js event loop periodically @@ -338,6 +353,8 @@ export class SimulationEngine, TStore endStatus = 'maxEventsReached'; } else if (this._clock >= this.maxTime) { endStatus = 'maxTimeReached'; + } else if (this._stopConditionMet) { + endStatus = 'stopConditionMet'; } else { endStatus = 'finished'; } diff --git a/src/types.ts b/src/types.ts index 04c1969..1bb7dfb 100644 --- a/src/types.ts +++ b/src/types.ts @@ -95,7 +95,7 @@ export type EventHandler< export type SimulationStatus = 'idle' | 'running' | 'paused' | 'stopped' | 'finished'; /** Result termination reason */ -export type SimulationEndStatus = 'finished' | 'stopped' | 'maxTimeReached' | 'maxEventsReached'; +export type SimulationEndStatus = 'finished' | 'stopped' | 'maxTimeReached' | 'maxEventsReached' | 'stopConditionMet'; /** Result returned after a simulation run */ export interface SimulationResult> { @@ -109,7 +109,10 @@ export interface SimulationResult> { } /** Engine configuration options */ -export interface SimulationEngineOptions> { +export interface SimulationEngineOptions< + TEventMap extends Record = Record, + TStore = Record, +> { seed?: number; maxTime?: number; maxEvents?: number; @@ -123,5 +126,11 @@ export interface SimulationEngineOptions> { * Default: undefined (no warm-up). */ warmUpTime?: number; + /** Custom stop condition evaluated after each event. When it returns `true` the + * simulation ends with status `'stopConditionMet'`. + * Useful for optimisation, steady-state detection, and Monte Carlo convergence. + * Default: undefined (no custom stop condition). */ + stopWhen?: (ctx: SimContext) => boolean; + store?: TStore; } From 47063fb4e9a2157cfdf521a838bcb23b45074afc Mon Sep 17 00:00:00 2001 From: Matteo Virgilio Date: Fri, 10 Apr 2026 20:00:12 +0200 Subject: [PATCH 2/2] Add Queue primitive with bounded capacity and overflow policies Standalone FIFO/priority queue for modeling buffers, pipelines, and WIP limits. Supports maxCapacity with drop/block overflow policies and auto-collected statistics (enqueued, dequeued, dropped, blocked, blockTime, waitTime, queueLength, throughput). --- CHANGELOG.md | 7 + README.md | 38 ++++ docs/queue-spec.md | 264 +++++++++++++++++++++ examples/queue-buffer/main.ts | 76 +++++++ examples/queue-buffer/tsconfig.json | 14 ++ package.json | 3 +- src/index.ts | 2 + src/queue.test.ts | 341 ++++++++++++++++++++++++++++ src/queue.ts | 236 +++++++++++++++++++ 9 files changed, 980 insertions(+), 1 deletion(-) create mode 100644 docs/queue-spec.md create mode 100644 examples/queue-buffer/main.ts create mode 100644 examples/queue-buffer/tsconfig.json create mode 100644 src/queue.test.ts create mode 100644 src/queue.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d0d8453..0ad4d9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/). processed event; when it returns `true` the simulation ends with status `'stopConditionMet'`. Useful for optimisation, steady-state detection, and Monte Carlo convergence. - `'stopConditionMet'` value added to `SimulationEndStatus` +- `Queue` — standalone FIFO/priority queue with bounded capacity, overflow policies + (drop/block), and auto-collected statistics (enqueued, dequeued, dropped, blocked, + blockTime, waitTime, queueLength, throughput) +- `QueueOptions`, `EnqueueOptions`, `QueueSnapshot` — exported types +- `docs/queue-spec.md` — full API specification with overflow policies, statistics reference, + and edge case documentation +- `queue-buffer` example demonstrating bounded production line with drop-on-overflow ### Changed diff --git a/README.md b/README.md index fce5ffc..e1fd3b8 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Simloop provides a minimal, type-safe API for building simulations of real-world - **Deterministic** — seeded PRNG ensures reproducible results - **Simple API** — define handlers with `sim.on()`, schedule events with `ctx.schedule()` - **Probability distributions** — uniform, gaussian, exponential, poisson, bernoulli, zipf, triangular, weibull, lognormal, erlang, geometric +- **Queue primitive** — standalone FIFO/priority queue with bounded capacity, overflow policies (drop/block), and auto-collected stats - **Custom stop conditions** — `stopWhen` callback to halt the simulation when an arbitrary condition is met - **Warm-up period** — `warmUpTime` option auto-resets statistics after transient phase for steady-state analysis - **Lifecycle management** — run, pause, resume, stop, reset @@ -234,6 +235,39 @@ Auto-collected statistics: `resource.{name}.waitTime`, `queueLength`, `utilizati For the full API — priority queuing, cancellation, edge cases, and M/M/c examples — see [docs/resource-spec.md](docs/resource-spec.md). +## Queue + +`Queue` is a standalone FIFO/priority queue for modeling buffers, pipelines, conveyor belts, and WIP limits. Supports bounded capacity with overflow policies (drop or block). + +```typescript +import { SimulationEngine, Queue } from 'simloop'; + +type Events = { + 'item:produce': { itemId: number }; + 'item:consume': Record; +}; + +const sim = new SimulationEngine({ seed: 42 }); +const buffer = new Queue('buffer', { maxCapacity: 5 }); + +sim.on('item:produce', (event, ctx) => { + buffer.enqueue(ctx, event.payload.itemId); + ctx.schedule('item:produce', ctx.clock + ctx.dist.exponential(1)(), { + itemId: event.payload.itemId + 1, + }); +}); + +sim.on('item:consume', (_e, ctx) => { + const item = buffer.dequeue(ctx); + if (item !== undefined) ctx.stats.increment('consumed'); + ctx.schedule('item:consume', ctx.clock + ctx.dist.exponential(0.8)(), {}); +}); +``` + +Auto-collected statistics: `queue.{name}.enqueued`, `dequeued`, `dropped`, `blocked`, `blockTime`, `waitTime`, `queueLength`, `throughput`. + +For the full API — overflow policies, priority queuing, blocking, and edge cases — see [docs/queue-spec.md](docs/queue-spec.md). + ## Examples See the [examples/](examples/) directory: @@ -242,12 +276,14 @@ See the [examples/](examples/) directory: - **[coffee-shop](examples/coffee-shop/)** — multi-barista coffee shop with customer patience, drink types, and queue management - **[network-packets](examples/network-packets/)** — network router simulation using all six probability distributions - **[stop-condition](examples/stop-condition/)** — Monte Carlo convergence using `stopWhen` to halt when the coefficient of variation is low enough +- **[queue-buffer](examples/queue-buffer/)** — bounded production line with drop-on-overflow buffer ```bash npm run example:store-counter npm run example:coffee-shop npm run example:network-packets npm run example:stop-condition +npm run example:queue-buffer ``` ## Probability Distributions @@ -293,6 +329,7 @@ console.log(sampler()); // sample from exponential - `SimulationEngine` — main simulation engine - `Resource` — seize/delay/release primitive for shared resources +- `Queue` — standalone FIFO/priority queue with bounded capacity and overflow policies - `SimulationError` — error thrown for invalid operations - `ConsoleLogger` — default logger implementation - `DefaultStatsCollector` — default statistics collector @@ -322,6 +359,7 @@ console.log(sampler()); // sample from exponential - `SimulationResult` — run result - `SimulationEngineOptions` — engine configuration - `ResourceOptions` / `RequestOptions` / `RequestHandle` / `ResourceSnapshot` — Resource types +- `QueueOptions` / `EnqueueOptions` / `QueueSnapshot` — Queue types - `StatsCollector` / `StatsSummary` — statistics interfaces - `DistributionHelper` — interface for the `ctx.dist` object - `SimLogger` / `LogLevel` — logging interfaces diff --git a/docs/queue-spec.md b/docs/queue-spec.md new file mode 100644 index 0000000..83e1643 --- /dev/null +++ b/docs/queue-spec.md @@ -0,0 +1,264 @@ +# Queue — simloop primitive + +## 1. Overview + +### Problem + +Many simulations need to model buffers, pipelines, conveyor belts, and work-in-progress (WIP) limits. These are fundamentally different from `Resource` (which models capacity-constrained servers with seize/delay/release) — a `Queue` holds **items** that flow through the system. + +### Queue vs Resource + +| | Queue | Resource | +|---|---|---| +| **Models** | Buffers, pipelines, WIP limits | Servers, machines, staff | +| **Pattern** | enqueue item → wait → dequeue item | seize slot → delay → release slot | +| **Items** | Typed values (`Queue`) | Anonymous slots | +| **Capacity** | Bounded or unbounded | Always bounded (>= 1) | +| **Overflow** | Drop or block | Always queues | + +### Reference + +- GPSS QUEUE/DEPART blocks +- SimPy Store/FilterStore +- Arena QUEUE module + +--- + +## 2. Quick Start + +### Bounded buffer (drop on overflow) + +```ts +import { SimulationEngine, Queue } from 'simloop'; + +type Events = { + 'item:produce': { itemId: number }; + 'item:consume': Record; +}; + +const sim = new SimulationEngine({ seed: 42, maxTime: 100 }); +const buffer = new Queue('buffer', { maxCapacity: 5 }); + +sim.on('item:produce', (event, ctx) => { + const accepted = buffer.enqueue(ctx, event.payload.itemId); + if (!accepted) ctx.log('warn', `Item ${event.payload.itemId} dropped`); + + ctx.schedule('item:produce', ctx.clock + ctx.dist.exponential(1)(), { + itemId: event.payload.itemId + 1, + }); +}); + +sim.on('item:consume', (_e, ctx) => { + const item = buffer.dequeue(ctx); + if (item !== undefined) ctx.stats.increment('consumed'); + ctx.schedule('item:consume', ctx.clock + ctx.dist.exponential(0.8)(), {}); +}); + +sim.init((ctx) => { + ctx.schedule('item:produce', 0, { itemId: 1 }); + ctx.schedule('item:consume', 1, {}); +}); + +const result = sim.run(); +console.log('Consumed:', result.stats['consumed']?.count); +console.log('Dropped:', result.stats['queue.buffer.dropped']?.count ?? 0); +console.log('Avg wait:', result.stats['queue.buffer.waitTime']?.mean.toFixed(2)); +``` + +### Unbounded queue (default) + +```ts +const pipeline = new Queue('pipeline'); // maxCapacity = Infinity +``` + +### Blocking buffer + +```ts +const belt = new Queue('belt', { maxCapacity: 10, overflowPolicy: 'block' }); +// Items blocked when full are automatically admitted when dequeue() frees a slot +``` + +--- + +## 3. API Reference + +### `new Queue(name, options?)` + +```ts +const queue = new Queue(name: string, options?: QueueOptions); +``` + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `maxCapacity` | `number` | `Infinity` | Maximum number of items. Must be > 0. | +| `overflowPolicy` | `'drop' \| 'block'` | `'drop'` | Behaviour when enqueueing to a full queue. | +| `statsPrefix` | `string` | `name` | Prefix for all auto-collected stat keys. | + +Throws `SimulationError` if `maxCapacity <= 0`. + +--- + +### `queue.enqueue(ctx, item, options?)` + +```ts +const accepted = queue.enqueue(ctx, item, { priority: 0 }); +``` + +- If the queue has space: the item is inserted in priority order and returns `true`. +- If the queue is full and `overflowPolicy` is `'drop'`: returns `false`, item is discarded. +- If the queue is full and `overflowPolicy` is `'block'`: returns `false`, item is held in a waiting list and automatically admitted when `dequeue()` frees a slot. + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `priority` | `number` | `0` | Lower value = higher precedence. Ties broken by arrival order (FIFO). | + +--- + +### `queue.dequeue(ctx)` + +```ts +const item = queue.dequeue(ctx); // T | undefined +``` + +Removes and returns the front item (highest-priority, or oldest for same priority). + +Returns `undefined` if the queue is empty. + +If blocked items are waiting and the dequeue frees a slot, the oldest blocked item is automatically admitted. + +--- + +### `queue.peek()` + +```ts +const item = queue.peek(); // T | undefined +``` + +Returns the front item without removing it. Does not record any stats. + +--- + +### `queue.snapshot()` + +```ts +const snap = queue.snapshot(); +// { name, maxCapacity, length, items: readonly T[] } +``` + +Returns a plain object with the current state. Useful for logging and assertions. + +--- + +### `queue.reset()` + +```ts +queue.reset(); +``` + +Clears all items, blocked entries, and the insertion counter. **Must be called after `engine.reset()`** before re-running the simulation. + +--- + +### Accessors + +| Accessor | Type | Description | +|----------|------|-------------| +| `queue.name` | `string` | Name given at construction | +| `queue.maxCapacity` | `number` | Maximum capacity | +| `queue.overflowPolicy` | `'drop' \| 'block'` | Overflow behaviour | +| `queue.length` | `number` | Current number of items | +| `queue.isFull` | `boolean` | `length >= maxCapacity` | +| `queue.isEmpty` | `boolean` | `length === 0` | + +--- + +## 4. Overflow Policies + +### Drop (default) + +When the queue is full, `enqueue()` returns `false` and the item is silently discarded. The `queue.{name}.dropped` counter is incremented. + +Use this for systems where items can be lost (e.g., network packet buffers, production lines with no backpressure). + +### Block + +When the queue is full, the item is held in an internal waiting list. When `dequeue()` removes an item and frees a slot, the oldest blocked item is automatically admitted to the queue. The `queue.{name}.blocked` counter is incremented on block, and `queue.{name}.blockTime` records the time from block to admission. + +Use this for systems with backpressure (e.g., conveyor belts, bounded producer-consumer). + +--- + +## 5. Statistics Reference + +All stat keys are prefixed with `queue.{statsPrefix}.` (default: `queue.{name}.`). + +| Key | Collected via | Description | +|-----|---------------|-------------| +| `queue.{n}.enqueued` | `stats.increment` | Total successful enqueue operations | +| `queue.{n}.dequeued` | `stats.increment` | Total successful dequeue operations | +| `queue.{n}.throughput` | `stats.increment` | Alias for dequeued (useful for throughput reporting) | +| `queue.{n}.dropped` | `stats.increment` | Items discarded due to overflow (policy='drop') | +| `queue.{n}.blocked` | `stats.increment` | Items blocked waiting for space (policy='block') | +| `queue.{n}.blockTime` | `stats.record` | Time from block to admission | +| `queue.{n}.waitTime` | `stats.record` | Time from enqueue to dequeue per item | +| `queue.{n}.queueLength` | `stats.record` | Queue depth snapshot after each enqueue/dequeue | + +**Reading stats from `SimulationResult`:** + +```ts +const result = sim.run(); +const wt = result.stats['queue.buffer.waitTime']; +console.log(`Mean wait: ${wt.mean.toFixed(2)}, max: ${wt.max.toFixed(2)}`); + +const dropped = result.stats['queue.buffer.dropped']?.count ?? 0; +console.log(`Drop rate: ${(dropped / total * 100).toFixed(1)}%`); +``` + +--- + +## 6. Priority Queuing + +By default, all items have `priority = 0` and are served in FIFO order. + +```ts +queue.enqueue(ctx, item, { priority: 1 }); // high priority +queue.enqueue(ctx, item, { priority: 10 }); // low priority +``` + +Within the same priority level, items are served in the order `enqueue()` was called (FIFO). The insertion counter is internal and never resets until `queue.reset()`. + +**Negative priorities are allowed.** Priority is a plain `number`; the minimum value wins. + +--- + +## 7. Edge Cases + +### 7.1 Dequeue from empty queue + +Returns `undefined`. No stats are recorded. + +### 7.2 `maxCapacity = Infinity` (default) + +The queue is never full — `isFull` always returns `false`, `overflowPolicy` is irrelevant. + +### 7.3 Queue state after `engine.reset()` + +Like `Resource`, `Queue` is external to the engine. After resetting, call `queue.reset()` before the next run: + +```ts +sim.run(); +sim.reset(); +queue.reset(); // ← required +sim.init((ctx) => { /* re-init */ }); +sim.run(); +``` + +### 7.4 Multiple queues with the same name + +Stats will be recorded under the same prefix and will be merged — producing incorrect statistics. Names (or `statsPrefix` values) must be unique per simulation instance. + +--- + +## 8. Full Example: Production Line Buffer + +See the full annotated source: [examples/queue-buffer/main.ts](../examples/queue-buffer/main.ts) diff --git a/examples/queue-buffer/main.ts b/examples/queue-buffer/main.ts new file mode 100644 index 0000000..c289b4c --- /dev/null +++ b/examples/queue-buffer/main.ts @@ -0,0 +1,76 @@ +/** + * Queue Buffer Example + * + * A bounded production line: a producer generates items at random intervals + * and pushes them into a buffer (Queue with maxCapacity=5). A consumer + * dequeues items for processing at a slightly slower rate. When the buffer + * is full, items are dropped. + * + * At the end, the simulation reports throughput, drop rate, and wait times. + */ +import { SimulationEngine, Queue } from '../../src/index.js'; + +// --- Types --- + +type Events = { + 'item:produce': { itemId: number }; + 'item:consume': Record; +}; + +// --- Setup --- + +const sim = new SimulationEngine({ + seed: 42, + maxTime: 500, + logLevel: 'silent', +}); + +const buffer = new Queue('buffer', { maxCapacity: 5, overflowPolicy: 'drop' }); + +// --- Handlers --- + +sim.on('item:produce', (event, ctx) => { + buffer.enqueue(ctx, event.payload.itemId); + + // Schedule next production (mean inter-arrival = 1.0) + ctx.schedule('item:produce', ctx.clock + ctx.dist.exponential(1.0)(), { + itemId: event.payload.itemId + 1, + }); +}); + +sim.on('item:consume', (_e, ctx) => { + const item = buffer.dequeue(ctx); + if (item !== undefined) { + ctx.stats.increment('consumed'); + } + + // Schedule next consumption (mean inter-service = 1.2, slightly slower than production) + ctx.schedule('item:consume', ctx.clock + ctx.dist.exponential(1 / 1.2)(), {}); +}); + +// --- Init --- + +sim.init((ctx) => { + ctx.schedule('item:produce', 0, { itemId: 1 }); + ctx.schedule('item:consume', 0.5, {}); +}); + +// --- Run --- + +const result = sim.run(); + +const enqueued = result.stats['queue.buffer.enqueued']?.count ?? 0; +const dequeued = result.stats['queue.buffer.dequeued']?.count ?? 0; +const dropped = result.stats['queue.buffer.dropped']?.count ?? 0; +const waitTime = result.stats['queue.buffer.waitTime']; + +console.log('=== Queue Buffer Example ==='); +console.log(`Sim time : ${result.finalClock.toFixed(1)}`); +console.log(`Produced : ${enqueued + dropped}`); +console.log(`Enqueued : ${enqueued}`); +console.log(`Consumed : ${dequeued}`); +console.log(`Dropped : ${dropped}`); +console.log(`Drop rate : ${(dropped / (enqueued + dropped) * 100).toFixed(1)} %`); +console.log(`Avg wait : ${waitTime?.mean.toFixed(2) ?? 'N/A'}`); +console.log(`Max wait : ${waitTime?.max.toFixed(2) ?? 'N/A'}`); +console.log(`Wall clock : ${result.wallClockMs.toFixed(1)} ms`); diff --git a/examples/queue-buffer/tsconfig.json b/examples/queue-buffer/tsconfig.json new file mode 100644 index 0000000..64291b5 --- /dev/null +++ b/examples/queue-buffer/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ES2022", + "moduleResolution": "bundler", + "lib": ["ES2022"], + "types": ["node"], + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "outDir": "./dist" + }, + "include": ["."] +} diff --git a/package.json b/package.json index 47e01b9..a08b799 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,8 @@ "example:store-counter": "tsx examples/store-counter/main.ts", "example:coffee-shop": "tsx examples/coffee-shop/main.ts", "example:network-packets": "tsx examples/network-packets/main.ts", - "example:stop-condition": "tsx examples/stop-condition/main.ts" + "example:stop-condition": "tsx examples/stop-condition/main.ts", + "example:queue-buffer": "tsx examples/queue-buffer/main.ts" }, "keywords": [ "simulation", diff --git a/src/index.ts b/src/index.ts index 3df3c51..07f3a2a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,5 +22,7 @@ export type { } from './types.js'; export { Resource } from './resource.js'; +export { Queue } from './queue.js'; export type { ResourceOptions, RequestOptions, RequestHandle, ResourceSnapshot } from './resource.js'; +export type { QueueOptions, EnqueueOptions, QueueSnapshot } from './queue.js'; diff --git a/src/queue.test.ts b/src/queue.test.ts new file mode 100644 index 0000000..8392e43 --- /dev/null +++ b/src/queue.test.ts @@ -0,0 +1,341 @@ +import { describe, it, expect } from 'vitest'; +import { SimulationEngine } from './engine.js'; +import { Queue } from './queue.js'; + +type TestEvents = { + produce: { item: string }; + consume: Record; +}; + +describe('Queue', () => { + // Helper: run a single event that calls `fn` with ctx, then return result + function withCtx(fn: (ctx: any) => void): void { + const sim = new SimulationEngine({ logLevel: 'silent' }); + sim.on('produce', (_e, ctx) => fn(ctx)); + sim.init((ctx) => ctx.schedule('produce', 0, { item: '' })); + sim.run(); + } + + describe('basic FIFO', () => { + it('should dequeue items in insertion order', () => { + const q = new Queue('buf'); + const items: string[] = []; + + withCtx((ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); + q.enqueue(ctx, 'c'); + items.push(q.dequeue(ctx)!); + items.push(q.dequeue(ctx)!); + items.push(q.dequeue(ctx)!); + }); + + expect(items).toEqual(['a', 'b', 'c']); + }); + }); + + describe('priority ordering', () => { + it('should dequeue lower priority number first', () => { + const q = new Queue('buf'); + const items: string[] = []; + + withCtx((ctx) => { + q.enqueue(ctx, 'low', { priority: 10 }); + q.enqueue(ctx, 'high', { priority: 1 }); + q.enqueue(ctx, 'mid', { priority: 5 }); + items.push(q.dequeue(ctx)!); + items.push(q.dequeue(ctx)!); + items.push(q.dequeue(ctx)!); + }); + + expect(items).toEqual(['high', 'mid', 'low']); + }); + + it('should use FIFO within same priority', () => { + const q = new Queue('buf'); + const items: string[] = []; + + withCtx((ctx) => { + q.enqueue(ctx, 'first', { priority: 1 }); + q.enqueue(ctx, 'second', { priority: 1 }); + q.enqueue(ctx, 'third', { priority: 1 }); + items.push(q.dequeue(ctx)!); + items.push(q.dequeue(ctx)!); + items.push(q.dequeue(ctx)!); + }); + + expect(items).toEqual(['first', 'second', 'third']); + }); + }); + + describe('peek', () => { + it('should return front item without removing it', () => { + const q = new Queue('buf'); + + withCtx((ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); + expect(q.peek()).toBe('a'); + expect(q.length).toBe(2); + }); + }); + + it('should return undefined on empty queue', () => { + const q = new Queue('buf'); + expect(q.peek()).toBeUndefined(); + }); + }); + + describe('accessors', () => { + it('should track length, isEmpty, isFull', () => { + const q = new Queue('buf', { maxCapacity: 2 }); + + expect(q.isEmpty).toBe(true); + expect(q.isFull).toBe(false); + expect(q.length).toBe(0); + + withCtx((ctx) => { + q.enqueue(ctx, 1); + expect(q.length).toBe(1); + expect(q.isEmpty).toBe(false); + expect(q.isFull).toBe(false); + + q.enqueue(ctx, 2); + expect(q.length).toBe(2); + expect(q.isFull).toBe(true); + }); + }); + }); + + describe('unbounded queue', () => { + it('should never be full with default maxCapacity', () => { + const q = new Queue('buf'); + + withCtx((ctx) => { + for (let i = 0; i < 1000; i++) { + expect(q.enqueue(ctx, i)).toBe(true); + } + expect(q.isFull).toBe(false); + expect(q.length).toBe(1000); + }); + }); + }); + + describe('bounded + drop policy', () => { + it('should drop items when full and return false', () => { + const q = new Queue('buf', { maxCapacity: 2, overflowPolicy: 'drop' }); + + withCtx((ctx) => { + expect(q.enqueue(ctx, 'a')).toBe(true); + expect(q.enqueue(ctx, 'b')).toBe(true); + expect(q.enqueue(ctx, 'c')).toBe(false); // dropped + expect(q.length).toBe(2); + }); + }); + + it('should increment dropped stat', () => { + const sim = new SimulationEngine({ seed: 1, logLevel: 'silent' }); + const q = new Queue('buf', { maxCapacity: 1 }); + + sim.on('produce', (_e, ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); // dropped + q.enqueue(ctx, 'c'); // dropped + }); + + sim.init((ctx) => ctx.schedule('produce', 0, { item: '' })); + const result = sim.run(); + + expect(result.stats['queue.buf.dropped'].count).toBe(2); + }); + }); + + describe('bounded + block policy', () => { + it('should block items when full and admit on dequeue', () => { + const q = new Queue('buf', { maxCapacity: 2, overflowPolicy: 'block' }); + + withCtx((ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); + expect(q.enqueue(ctx, 'c')).toBe(false); // blocked + expect(q.length).toBe(2); + + const item = q.dequeue(ctx); + expect(item).toBe('a'); + // 'c' should now be admitted + expect(q.length).toBe(2); // 'b' + 'c' + expect(q.dequeue(ctx)).toBe('b'); + expect(q.dequeue(ctx)).toBe('c'); + }); + }); + + it('should record blockTime stat', () => { + const sim = new SimulationEngine({ seed: 1, logLevel: 'silent' }); + const q = new Queue('buf', { maxCapacity: 1, overflowPolicy: 'block' }); + + sim.on('produce', (event, ctx) => { + if (event.payload.item === 'first') { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); // blocked at t=0 + ctx.schedule('produce', ctx.clock + 10, { item: 'second' }); + } else { + q.dequeue(ctx); // at t=10, admits 'b', blockTime = 10 + } + }); + + sim.init((ctx) => ctx.schedule('produce', 0, { item: 'first' })); + const result = sim.run(); + + expect(result.stats['queue.buf.blocked'].count).toBe(1); + expect(result.stats['queue.buf.blockTime'].mean).toBe(10); + }); + }); + + describe('stats collection', () => { + it('should record all stats correctly', () => { + const sim = new SimulationEngine({ seed: 1, logLevel: 'silent' }); + const q = new Queue('buf'); + + sim.on('produce', (_e, ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); + ctx.schedule('consume', ctx.clock + 5, {}); + }); + + sim.on('consume', (_e, ctx) => { + q.dequeue(ctx); + }); + + sim.init((ctx) => ctx.schedule('produce', 0, { item: '' })); + const result = sim.run(); + + expect(result.stats['queue.buf.enqueued'].count).toBe(2); + expect(result.stats['queue.buf.dequeued'].count).toBe(1); + expect(result.stats['queue.buf.throughput'].count).toBe(1); + }); + }); + + describe('waitTime', () => { + it('should record time between enqueue and dequeue', () => { + const sim = new SimulationEngine({ seed: 1, logLevel: 'silent' }); + const q = new Queue('buf'); + + sim.on('produce', (_e, ctx) => { + q.enqueue(ctx, 'a'); + ctx.schedule('consume', ctx.clock + 7, {}); + }); + + sim.on('consume', (_e, ctx) => { + q.dequeue(ctx); + }); + + sim.init((ctx) => ctx.schedule('produce', 0, { item: '' })); + const result = sim.run(); + + expect(result.stats['queue.buf.waitTime'].mean).toBe(7); + }); + }); + + describe('dequeue from empty', () => { + it('should return undefined', () => { + const q = new Queue('buf'); + + withCtx((ctx) => { + expect(q.dequeue(ctx)).toBeUndefined(); + }); + }); + }); + + describe('snapshot', () => { + it('should return correct state', () => { + const q = new Queue('buf', { maxCapacity: 5 }); + + withCtx((ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); + + const snap = q.snapshot(); + expect(snap.name).toBe('buf'); + expect(snap.maxCapacity).toBe(5); + expect(snap.length).toBe(2); + expect(snap.items).toEqual(['a', 'b']); + }); + }); + }); + + describe('reset', () => { + it('should clear all internal state', () => { + const q = new Queue('buf', { maxCapacity: 2, overflowPolicy: 'block' }); + + withCtx((ctx) => { + q.enqueue(ctx, 'a'); + q.enqueue(ctx, 'b'); + q.enqueue(ctx, 'c'); // blocked + }); + + expect(q.length).toBe(2); + + q.reset(); + + expect(q.length).toBe(0); + expect(q.isEmpty).toBe(true); + expect(q.peek()).toBeUndefined(); + }); + }); + + describe('custom statsPrefix', () => { + it('should use custom prefix for stats', () => { + const sim = new SimulationEngine({ seed: 1, logLevel: 'silent' }); + const q = new Queue('buf', { statsPrefix: 'myprefix' }); + + sim.on('produce', (_e, ctx) => { + q.enqueue(ctx, 'a'); + q.dequeue(ctx); + }); + + sim.init((ctx) => ctx.schedule('produce', 0, { item: '' })); + const result = sim.run(); + + expect(result.stats['queue.myprefix.enqueued'].count).toBe(1); + expect(result.stats['queue.myprefix.dequeued'].count).toBe(1); + }); + }); + + describe('constructor validation', () => { + it('should throw for invalid maxCapacity', () => { + expect(() => new Queue('buf', { maxCapacity: 0 })).toThrow(); + expect(() => new Queue('buf', { maxCapacity: -1 })).toThrow(); + }); + }); + + describe('integration', () => { + it('should work within a full simulation run', () => { + const sim = new SimulationEngine({ seed: 42, maxEvents: 20, logLevel: 'silent' }); + const q = new Queue('pipeline', { maxCapacity: 3, overflowPolicy: 'drop' }); + const consumed: string[] = []; + + sim.on('produce', (event, ctx) => { + q.enqueue(ctx, event.payload.item); + const nextId = `item-${ctx.stats.get('queue.pipeline.enqueued').count + ctx.stats.get('queue.pipeline.dropped').count + 1}`; + ctx.schedule('produce', ctx.clock + 1, { item: nextId }); + }); + + sim.on('consume', (_e, ctx) => { + const item = q.dequeue(ctx); + if (item !== undefined) consumed.push(item); + ctx.schedule('consume', ctx.clock + 3, {}); + }); + + sim.init((ctx) => { + ctx.schedule('produce', 0, { item: 'item-1' }); + ctx.schedule('consume', 1, {}); + }); + + const result = sim.run(); + + expect(result.totalEventsProcessed).toBe(20); + expect(consumed.length).toBeGreaterThan(0); + expect(result.stats['queue.pipeline.enqueued']).toBeDefined(); + }); + }); +}); diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..c74841c --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,236 @@ +import type { SimContext } from './types.js'; +import { SimulationError } from './engine.js'; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +export interface QueueOptions { + /** Maximum number of items. Default: Infinity (unbounded) */ + maxCapacity?: number; + /** What happens when enqueue is called on a full queue. Default: 'drop' */ + overflowPolicy?: 'drop' | 'block'; + /** Prefix for all auto-collected stat keys. Default: queue name */ + statsPrefix?: string; +} + +export interface EnqueueOptions { + /** Lower value = higher precedence. Ties broken by arrival order (FIFO). Default: 0 */ + priority?: number; +} + +export interface QueueSnapshot { + name: string; + maxCapacity: number; + length: number; + items: readonly T[]; +} + +// --------------------------------------------------------------------------- +// Internal types +// --------------------------------------------------------------------------- + +interface QueueEntry { + readonly item: T; + readonly enqueuedAt: number; + readonly priority: number; + readonly insertionOrder: number; +} + +interface BlockedEntry { + readonly item: T; + readonly blockedAt: number; + readonly priority: number; + readonly insertionOrder: number; +} + +// --------------------------------------------------------------------------- +// Queue class +// --------------------------------------------------------------------------- + +/** + * A standalone FIFO/priority queue with optional bounded capacity, overflow + * policies (drop/block), and auto-collected statistics. + * + * Unlike `Resource` (seize/delay/release for capacity-constrained servers), + * `Queue` models buffers, pipelines, conveyor belts, and WIP limits. + * + * @example + * ```ts + * const buffer = new Queue('buffer', { maxCapacity: 10 }); + * + * sim.on('item:produce', (event, ctx) => { + * buffer.enqueue(ctx, event.payload.itemId); + * }); + * + * sim.on('item:consume', (_event, ctx) => { + * const item = buffer.dequeue(ctx); + * if (item !== undefined) { ... } + * }); + * ``` + */ +export class Queue { + readonly name: string; + readonly maxCapacity: number; + readonly overflowPolicy: 'drop' | 'block'; + + private readonly _statsPrefix: string; + private readonly _items: QueueEntry[] = []; + private readonly _blocked: BlockedEntry[] = []; + private _insertionCounter = 0; + + constructor(name: string, options: QueueOptions = {}) { + const maxCapacity = options.maxCapacity ?? Infinity; + if (maxCapacity <= 0) { + throw new SimulationError(`Queue '${name}': maxCapacity must be > 0, got ${maxCapacity}`); + } + this.name = name; + this.maxCapacity = maxCapacity; + this.overflowPolicy = options.overflowPolicy ?? 'drop'; + this._statsPrefix = options.statsPrefix ?? name; + } + + // --------------------------------------------------------------------------- + // Accessors + // --------------------------------------------------------------------------- + + /** Current number of items in the queue */ + get length(): number { + return this._items.length; + } + + /** True when length >= maxCapacity */ + get isFull(): boolean { + return this._items.length >= this.maxCapacity; + } + + /** True when length === 0 */ + get isEmpty(): boolean { + return this._items.length === 0; + } + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** + * Add an item to the queue. + * + * - If the queue is not full, the item is inserted in priority order (FIFO + * within the same priority). + * - If the queue is full and `overflowPolicy` is `'drop'`, returns `false` + * and the item is discarded. + * - If the queue is full and `overflowPolicy` is `'block'`, the item is held + * in a waiting list and automatically admitted when `dequeue()` frees a slot. + * + * @returns `true` if the item was enqueued, `false` if dropped or blocked. + */ + enqueue, TStore>(ctx: SimContext, item: T, options: EnqueueOptions = {}): boolean { + const prefix = this._statsPrefix; + const priority = options.priority ?? 0; + const insertionOrder = ++this._insertionCounter; + + if (this._items.length < this.maxCapacity) { + this._insertSorted({ item, enqueuedAt: ctx.clock, priority, insertionOrder }); + ctx.stats.increment(`queue.${prefix}.enqueued`); + ctx.stats.record(`queue.${prefix}.queueLength`, this._items.length); + return true; + } + + if (this.overflowPolicy === 'drop') { + ctx.stats.increment(`queue.${prefix}.dropped`); + return false; + } + + // block + this._blocked.push({ item, blockedAt: ctx.clock, priority, insertionOrder }); + ctx.stats.increment(`queue.${prefix}.blocked`); + return false; + } + + /** + * Remove and return the highest-priority (or oldest) item from the queue. + * + * If blocked items are waiting and the dequeue frees a slot, the oldest + * blocked item is automatically admitted. + * + * @returns The item, or `undefined` if the queue is empty. + */ + dequeue, TStore>(ctx: SimContext): T | undefined { + if (this._items.length === 0) return undefined; + + const entry = this._items.shift()!; + const prefix = this._statsPrefix; + + ctx.stats.increment(`queue.${prefix}.dequeued`); + ctx.stats.increment(`queue.${prefix}.throughput`); + ctx.stats.record(`queue.${prefix}.waitTime`, ctx.clock - entry.enqueuedAt); + ctx.stats.record(`queue.${prefix}.queueLength`, this._items.length); + + // Admit oldest blocked item if space is available + if (this._blocked.length > 0 && this._items.length < this.maxCapacity) { + const blocked = this._blocked.shift()!; + const blockTime = ctx.clock - blocked.blockedAt; + ctx.stats.record(`queue.${prefix}.blockTime`, blockTime); + + this._insertSorted({ + item: blocked.item, + enqueuedAt: ctx.clock, + priority: blocked.priority, + insertionOrder: blocked.insertionOrder, + }); + ctx.stats.increment(`queue.${prefix}.enqueued`); + ctx.stats.record(`queue.${prefix}.queueLength`, this._items.length); + } + + return entry.item; + } + + /** + * Look at the front item without removing it. + * + * @returns The item, or `undefined` if the queue is empty. + */ + peek(): T | undefined { + return this._items.length > 0 ? this._items[0].item : undefined; + } + + /** + * Return a plain snapshot of the queue's current state. + */ + snapshot(): QueueSnapshot { + return { + name: this.name, + maxCapacity: this.maxCapacity, + length: this._items.length, + items: this._items.map((e) => e.item), + }; + } + + /** + * Reset internal state. Call after engine.reset() before re-running. + */ + reset(): void { + this._items.length = 0; + this._blocked.length = 0; + this._insertionCounter = 0; + } + + // --------------------------------------------------------------------------- + // Private helpers + // --------------------------------------------------------------------------- + + /** Insert entry maintaining sort order: (priority ASC, insertionOrder ASC) */ + private _insertSorted(entry: QueueEntry): void { + let i = this._items.length; + while ( + i > 0 && + (this._items[i - 1].priority > entry.priority || + (this._items[i - 1].priority === entry.priority && + this._items[i - 1].insertionOrder > entry.insertionOrder)) + ) { + i--; + } + this._items.splice(i, 0, entry); + } +}