Files
Imhotep/packages/imhotep-bench/src/parallel.ts
T

284 lines
7.4 KiB
TypeScript

// parallel.ts - Worker parallelism utilities for Imhotep bench harness
// Supports both worker_thread pools and lightweight Promise-based pools.
import { Worker } from 'node:worker_threads'
import { EventEmitter } from 'node:events'
/**
* One unit of work for WorkerPool: an id, a payload, and the script that runs it.
* TOutput is not referenced by any field here; it only carries the expected
* result type through to the Promise returned by WorkerPool.execute.
*/
export interface WorkerTask<TInput, TOutput> {
/** Unique task identifier; posted to the worker and passed with every pool event */
id: string
/** Task payload, posted to the worker alongside the id */
input: TInput
/** Worker script path for worker_thread execution */
workerScript: string
}
/** Configuration passed to the WorkerPool constructor. */
export interface WorkerPoolOptions {
/** Maximum concurrent workers; no new task starts while this many are active */
maxWorkers: number
/** Per-task timeout in milliseconds; a task exceeding it is rejected */
taskTimeoutMs: number
}
/** Counter snapshot returned by getMetrics() on both WorkerPool and PromisePool. */
export interface WorkerPoolMetrics {
/** Tasks accepted but not yet started */
queued: number
/** Tasks currently executing */
running: number
/** Tasks that finished successfully */
completed: number
/** Tasks that failed or timed out */
failed: number
}
/**
* Internal bookkeeping record WorkerPool keeps for each submitted task:
* the task itself plus the settle callbacks of the promise handed to the caller.
*/
interface PendingTask<TInput, TOutput> {
/** The task as submitted to execute() */
task: WorkerTask<TInput, TOutput>
/** Resolves the caller's promise with the worker result */
resolve: (value: TOutput) => void
/** Rejects the caller's promise */
reject: (reason: Error) => void
/** Timeout handle; only set once the task has been started */
timer?: ReturnType<typeof setTimeout>
}
/**
 * WorkerPool runs queued tasks on worker threads with a concurrency cap.
 *
 * Each task gets its own short-lived `Worker`, created from the task's
 * `workerScript` when a slot is free and terminated as soon as the task
 * settles (result, error, timeout or shutdown). At most `options.maxWorkers`
 * workers are alive at any time.
 *
 * Worker protocol: the pool posts `{ id, input }` and expects a single reply
 * shaped like `{ success: boolean, result?: unknown, error?: string }`.
 *
 * Events, each emitted with the task id: `queued`, `completed`, `failed`,
 * `timeout`.
 */
export class WorkerPool extends EventEmitter {
  private queue: Array<PendingTask<unknown, unknown>> = []
  // Keyed by the pending record (object identity) rather than the task id, so
  // two tasks that share an id cannot overwrite each other's slot.
  private activeTasks = new Map<PendingTask<unknown, unknown>, Worker>()
  private metrics = { queued: 0, running: 0, completed: 0, failed: 0 }
  private shuttingDown = false

  constructor(private options: WorkerPoolOptions) {
    super()
  }

  /**
   * Queue a task and start it as soon as a worker slot is free.
   *
   * @param task - Task id, payload and worker script path.
   * @returns A promise that resolves with the worker's `result`, or rejects if
   *   the worker reports failure, raises an error, cannot be started, exceeds
   *   `taskTimeoutMs`, or the pool is shut down first.
   */
  execute<TInput, TOutput>(
    task: WorkerTask<TInput, TOutput>
  ): Promise<TOutput> {
    if (this.shuttingDown) {
      return Promise.reject(new Error('WorkerPool is shutting down'))
    }
    return new Promise<TOutput>((resolve, reject) => {
      this.queue.push({
        task,
        resolve: resolve as (value: unknown) => void,
        reject,
      })
      this.metrics.queued++
      this.emit('queued', task.id)
      this._pump()
    })
  }

  /**
   * Execute multiple tasks in parallel, returning results in input order.
   * Rejects with the first failure (`Promise.all` semantics); the remaining
   * tasks still run to completion.
   */
  async executeAll<TInput, TOutput>(
    tasks: WorkerTask<TInput, TOutput>[]
  ): Promise<TOutput[]> {
    return Promise.all(tasks.map(t => this.execute(t)))
  }

  /** Current pool metrics snapshot (a copy; mutating it has no effect). */
  getMetrics(): WorkerPoolMetrics {
    return { ...this.metrics }
  }

  /**
   * Reject every queued and running task, terminate all live workers and
   * refuse further submissions.
   *
   * Cancelled tasks are rejected with `WorkerPool shutdown`; they are not
   * counted in `failed` and no `failed` event is emitted for them.
   *
   * @returns A promise that settles once every worker has been terminated.
   */
  async shutdown(): Promise<void> {
    this.shuttingDown = true

    const abandoned = this.queue
    this.queue = []
    this.metrics.queued = 0
    for (const pending of abandoned) {
      pending.reject(new Error('WorkerPool shutdown'))
    }

    // Clearing the map first turns any late worker event into a no-op
    // (see `release` in `_start`).
    const active = [...this.activeTasks]
    this.activeTasks.clear()
    this.metrics.running = 0
    const terminations = active.map(([pending, worker]) => {
      clearTimeout(pending.timer)
      pending.reject(new Error('WorkerPool shutdown'))
      return worker.terminate()
    })
    await Promise.allSettled(terminations)
  }

  /** Start queued tasks until the queue is empty or every slot is taken. */
  private _pump(): void {
    while (
      !this.shuttingDown &&
      this.queue.length > 0 &&
      this.activeTasks.size < this.options.maxWorkers
    ) {
      const pending = this.queue.shift()
      if (pending === undefined) return
      this.metrics.queued--
      this._start(pending)
    }
  }

  /** Spawn the worker for one task and wire up its result, error and timeout handling. */
  private _start(pending: PendingTask<unknown, unknown>): void {
    const taskId = pending.task.id

    const fail = (reason: unknown, event: 'failed' | 'timeout'): void => {
      this.metrics.failed++
      pending.reject(
        reason instanceof Error ? reason : new Error(String(reason))
      )
      this.emit(event, taskId)
    }

    let worker: Worker
    try {
      worker = new Worker(pending.task.workerScript)
    } catch (err) {
      // The Worker constructor can throw synchronously (e.g. an invalid script
      // path). The task never occupied a slot, so only record the failure.
      fail(err, 'failed')
      return
    }

    this.metrics.running++
    this.activeTasks.set(pending, worker)

    // Frees the slot exactly once. Returns false when the task was already
    // settled by another path (timeout, error, message or shutdown), which
    // keeps late events from double-counting metrics.
    const release = (): boolean => {
      if (!this.activeTasks.delete(pending)) return false
      clearTimeout(pending.timer)
      this.metrics.running--
      void worker.terminate()
      return true
    }

    pending.timer = setTimeout(() => {
      if (!release()) return
      fail(
        new Error(
          `Task ${taskId} timed out after ${this.options.taskTimeoutMs}ms`
        ),
        'timeout'
      )
      this._pump()
    }, this.options.taskTimeoutMs)

    worker.once(
      'message',
      (
        message:
          | { success: boolean; result?: unknown; error?: string }
          | null
          | undefined
      ) => {
        if (!release()) return
        if (message?.success) {
          this.metrics.completed++
          pending.resolve(message.result)
          this.emit('completed', taskId)
        } else {
          // `||` on purpose: an empty error string is as useless as none.
          fail(new Error(message?.error || 'Worker task failed'), 'failed')
        }
        this._pump()
      }
    )

    worker.once('error', (err: unknown) => {
      if (!release()) return
      fail(err, 'failed')
      this._pump()
    })

    try {
      worker.postMessage({ id: taskId, input: pending.task.input })
    } catch (err) {
      // postMessage throws synchronously when the input cannot be cloned.
      if (release()) fail(err, 'failed')
    }
  }
}
/** One unit of work for PromisePool: an in-process function plus its argument. */
export interface PromiseTask<TInput, TOutput> {
/** Task identifier passed with every pool event and used in timeout messages */
id: string
/** Argument handed to fn */
input: TInput
/** Work to perform; may return a value directly or a promise for it */
fn: (input: TInput) => TOutput | Promise<TOutput>
}
/** Configuration passed to the PromisePool constructor. */
export interface PromisePoolOptions {
/** Maximum number of tasks counted as running at once */
maxConcurrency: number
/** Per-task timeout in milliseconds; a task exceeding it is rejected */
taskTimeoutMs: number
}
/**
 * PromisePool provides lightweight parallelism without worker threads.
 * It runs task functions in-process and keeps at most
 * `options.maxConcurrency` of them counted as running.
 *
 * A promise cannot be cancelled: when a task exceeds `taskTimeoutMs` its
 * caller is rejected and its slot is freed, but the underlying function keeps
 * running in the background. Its eventual outcome is ignored.
 *
 * Events, each emitted with the task id: `queued`, `completed`, `failed`,
 * `timeout`.
 */
export class PromisePool extends EventEmitter {
  private queue: Array<{
    task: PromiseTask<unknown, unknown>
    resolve: (value: unknown) => void
    reject: (reason: Error) => void
  }> = []
  private running = 0
  private metrics = { queued: 0, completed: 0, failed: 0 }
  private shuttingDown = false

  constructor(private options: PromisePoolOptions) {
    super()
  }

  /**
   * Queue a task and run it as soon as a concurrency slot is free.
   *
   * @param task - Task id, input and the function to invoke with that input.
   * @returns A promise that resolves with the function's result, or rejects if
   *   the function throws or rejects, exceeds `taskTimeoutMs`, or the pool is
   *   shut down while the task is still queued.
   */
  execute<TInput, TOutput>(
    task: PromiseTask<TInput, TOutput>
  ): Promise<TOutput> {
    if (this.shuttingDown) {
      return Promise.reject(new Error('PromisePool is shutting down'))
    }
    return new Promise<TOutput>((resolve, reject) => {
      this.queue.push({
        task: task as PromiseTask<unknown, unknown>,
        resolve: resolve as (value: unknown) => void,
        reject,
      })
      this.metrics.queued++
      this.emit('queued', task.id)
      this._pump()
    })
  }

  /**
   * Execute multiple tasks with bounded concurrency, returning results in
   * input order. Rejects with the first failure (`Promise.all` semantics).
   */
  async executeAll<TInput, TOutput>(
    tasks: PromiseTask<TInput, TOutput>[]
  ): Promise<TOutput[]> {
    return Promise.all(tasks.map(t => this.execute(t)))
  }

  /** Current pool metrics snapshot. */
  getMetrics(): WorkerPoolMetrics {
    return {
      queued: this.metrics.queued,
      running: this.running,
      completed: this.metrics.completed,
      failed: this.metrics.failed,
    }
  }

  /**
   * Reject every task still waiting in the queue and refuse new submissions.
   * Tasks that are already running are left to finish normally.
   */
  async shutdown(): Promise<void> {
    this.shuttingDown = true
    const abandoned = this.queue
    this.queue = []
    this.metrics.queued = 0
    for (const item of abandoned) {
      item.reject(new Error('PromisePool shutdown'))
    }
  }

  /** Start queued tasks until the queue is empty or every slot is taken. */
  private _pump(): void {
    while (
      this.queue.length > 0 &&
      this.running < this.options.maxConcurrency
    ) {
      const item = this.queue.shift()
      if (item === undefined) return
      this.metrics.queued--
      this.running++
      this._run(item)
    }
  }

  /** Invoke one task and settle it exactly once: result, error or timeout. */
  private _run(item: {
    task: PromiseTask<unknown, unknown>
    resolve: (value: unknown) => void
    reject: (reason: Error) => void
  }): void {
    const { task } = item
    let settled = false

    // Frees the slot exactly once. Returns false when the task was already
    // settled, so a function finishing after its timeout cannot touch the
    // counters a second time.
    const release = (): boolean => {
      if (settled) return false
      settled = true
      clearTimeout(timer)
      this.running--
      return true
    }

    const timer = setTimeout(() => {
      if (!release()) return
      this.metrics.failed++
      item.reject(
        new Error(
          `Task ${task.id} timed out after ${this.options.taskTimeoutMs}ms`
        )
      )
      this.emit('timeout', task.id)
      this._pump()
    }, this.options.taskTimeoutMs)

    // Calling fn inside a Promise executor keeps the call synchronous while
    // turning a synchronous throw into an ordinary rejection.
    new Promise<unknown>(resolve => resolve(task.fn(task.input))).then(
      result => {
        if (!release()) return
        this.metrics.completed++
        item.resolve(result)
        this.emit('completed', task.id)
        this._pump()
      },
      // Second argument of then() rather than a chained catch(): an exception
      // thrown by the success branch must not re-run the failure bookkeeping.
      (err: unknown) => {
        if (!release()) return
        this.metrics.failed++
        item.reject(err instanceof Error ? err : new Error(String(err)))
        this.emit('failed', task.id)
        this._pump()
      }
    )
  }
}