284 lines
7.4 KiB
TypeScript
284 lines
7.4 KiB
TypeScript
|
|
// parallel.ts - Worker parallelism utilities for Imhotep bench harness
|
||
|
|
// Supports both `worker_threads`-based pools and lightweight Promise-based pools.
|
||
|
|
|
||
|
|
import { Worker } from 'node:worker_threads'
|
||
|
|
import { EventEmitter } from 'node:events'
|
||
|
|
|
||
|
|
/**
 * One unit of work for `WorkerPool`.
 *
 * `TOutput` does not appear in any field: it only carries the expected result
 * type through to `WorkerPool.execute`, which returns `Promise<TOutput>`.
 */
export interface WorkerTask<TInput, TOutput> {
  /** Caller-chosen identifier; the pool passes it to its events and error messages. */
  id: string
  /** Payload posted to the worker (alongside `id`). */
  input: TInput
  /** Script path handed to `new Worker(...)` from `node:worker_threads`. */
  workerScript: string
}
|
||
|
|
|
||
|
|
/** Construction-time settings for `WorkerPool`. */
export interface WorkerPoolOptions {
  /** Upper bound on worker threads running at the same time. */
  maxWorkers: number
  /** How long (ms) a started task may run before it is failed as timed out. */
  taskTimeoutMs: number
}
|
||
|
|
|
||
|
|
/** Point-in-time counters reported by a pool's `getMetrics()`. */
export interface WorkerPoolMetrics {
  /** Tasks waiting for a free slot. */
  queued: number
  /** Tasks currently executing. */
  running: number
  /** Tasks that finished successfully. */
  completed: number
  /** Tasks that failed, including timeouts. */
  failed: number
}
|
||
|
|
|
||
|
|
/**
 * Internal bookkeeping for a task submitted to `WorkerPool`: the task itself,
 * the callbacks that settle the caller's promise, and the timeout handle.
 */
interface PendingTask<TInput, TOutput> {
  task: WorkerTask<TInput, TOutput>
  /** Fulfils the caller's promise with the worker's result. */
  resolve: (value: TOutput) => void
  /** Rejects the caller's promise. */
  reject: (reason: Error) => void
  /** Timeout handle; only assigned once the task has been started. */
  timer?: ReturnType<typeof setTimeout>
}
|
||
|
|
|
||
|
|
/**
 * WorkerPool runs `WorkerTask`s on `worker_threads`. It spawns one fresh
 * `Worker` per task and keeps at most `options.maxWorkers` alive at a time.
 * Tasks submitted beyond that limit wait in a FIFO queue.
 *
 * Worker protocol: the script receives `{ id, input }` via `postMessage` and
 * replies with `{ success: true, result }` or `{ success: false, error }`.
 * The first outcome settles the task: a reply, a worker `error`, the worker
 * exiting, or the timeout. The worker is always terminated afterwards.
 *
 * Emits `queued`, `completed`, `failed` and `timeout`, each with the task id.
 */
export class WorkerPool extends EventEmitter {
  /** Tasks waiting for a free slot, oldest first. */
  private queue: Array<PendingTask<unknown, unknown>> = []

  /**
   * Tasks that currently own a worker, each mapped to a function that aborts
   * it (rejects the task and terminates its worker). Keyed by the pending
   * record rather than the task id, so tasks that share an id stay independent.
   */
  private activeTasks = new Map<PendingTask<unknown, unknown>, (reason: Error) => Promise<void>>()

  private metrics = { queued: 0, running: 0, completed: 0, failed: 0 }

  private shuttingDown = false

  /** True while `_pump` is looping; nested `_pump` calls return immediately. */
  private pumping = false

  constructor(private options: WorkerPoolOptions) {
    super()
  }

  /**
   * Queue a task and run it on a worker as soon as a slot is free.
   *
   * @returns a promise for the `result` the worker replies with. It rejects if
   * the pool is shutting down, the worker replies with `success: false`, emits
   * `error`, exits before replying, or does not reply within
   * `options.taskTimeoutMs`.
   */
  execute<TInput, TOutput>(task: WorkerTask<TInput, TOutput>): Promise<TOutput> {
    if (this.shuttingDown) {
      return Promise.reject(new Error('WorkerPool is shutting down'))
    }

    return new Promise<TOutput>((resolve, reject) => {
      const pending: PendingTask<unknown, unknown> = {
        task,
        resolve: resolve as (value: unknown) => void,
        reject,
      }
      this.queue.push(pending)
      this.metrics.queued++
      this.emit('queued', task.id)
      this._pump()
    })
  }

  /**
   * Run several tasks through the pool (still bounded by `maxWorkers`).
   * Results come back in input order. Like `Promise.all`, this rejects with
   * the first failure, and the other tasks keep running.
   */
  async executeAll<TInput, TOutput>(
    tasks: WorkerTask<TInput, TOutput>[]
  ): Promise<TOutput[]> {
    return Promise.all(tasks.map(task => this.execute(task)))
  }

  /** Current pool metrics snapshot (a copy; safe to mutate). */
  getMetrics(): WorkerPoolMetrics {
    return { ...this.metrics }
  }

  /**
   * Stop accepting work. Rejects every queued and running task with
   * `Error('WorkerPool shutdown')`, then waits until all running workers have
   * been terminated. These rejections are not counted as `failed` and emit no
   * events.
   */
  async shutdown(): Promise<void> {
    this.shuttingDown = true

    const queued = this.queue
    this.queue = []
    this.metrics.queued = 0
    for (const pending of queued) {
      pending.reject(new Error('WorkerPool shutdown'))
    }

    // Snapshot first: each abort removes its own entry from activeTasks.
    const aborts = [...this.activeTasks.values()]
    await Promise.all(aborts.map(abort => abort(new Error('WorkerPool shutdown'))))
  }

  /** Start queued tasks until the queue is empty or every worker slot is busy. */
  private _pump(): void {
    // A task can settle synchronously while this loop runs (e.g. the Worker
    // constructor throws). Its settle() calls _pump again. Returning early
    // there lets this loop pick up the freed slot without recursing.
    if (this.pumping) return
    this.pumping = true
    try {
      while (
        !this.shuttingDown &&
        this.queue.length > 0 &&
        this.activeTasks.size < this.options.maxWorkers
      ) {
        const pending = this.queue.shift()
        if (pending === undefined) break
        this.metrics.queued--
        this._start(pending)
      }
    } finally {
      this.pumping = false
    }
  }

  /**
   * Spawn a worker for `pending`, wire up its events, arm the timeout and post
   * the input. Every way the task can end funnels through `settle`.
   */
  private _start(pending: PendingTask<unknown, unknown>): void {
    const { task } = pending
    const toError = (err: unknown): Error =>
      err instanceof Error ? err : new Error(String(err))

    type Outcome =
      | { kind: 'completed'; value: unknown }
      | { kind: 'failed' | 'timeout' | 'aborted'; error: Error }

    let worker: Worker | undefined
    let settled = false

    // The first outcome wins. Later signals, such as a reply after the timeout
    // or the 'exit' that follows terminate(), are ignored. This keeps metrics
    // and events at exactly one update per task.
    const settle = (outcome: Outcome): Promise<void> => {
      if (settled) return Promise.resolve()
      settled = true

      if (pending.timer !== undefined) clearTimeout(pending.timer)
      this.activeTasks.delete(pending)
      this.metrics.running--

      // Terminate on every path, including timeout. Otherwise a hung worker
      // keeps running and keeps the process alive.
      const terminated = worker
        ? worker.terminate().then(
            () => undefined,
            () => undefined
          )
        : Promise.resolve()

      if (outcome.kind === 'completed') {
        this.metrics.completed++
        pending.resolve(outcome.value)
        this.emit('completed', task.id)
      } else {
        pending.reject(outcome.error)
        // Shutdown aborts are neither counted nor announced.
        if (outcome.kind !== 'aborted') {
          this.metrics.failed++
          this.emit(outcome.kind, task.id)
        }
      }

      this._pump()
      return terminated
    }

    this.activeTasks.set(pending, reason => settle({ kind: 'aborted', error: reason }))
    this.metrics.running++

    pending.timer = setTimeout(() => {
      void settle({
        kind: 'timeout',
        error: new Error(
          `Task ${pending.task.id} timed out after ${this.options.taskTimeoutMs}ms`
        ),
      })
    }, this.options.taskTimeoutMs)

    try {
      worker = new Worker(task.workerScript)
    } catch (err) {
      // e.g. a malformed script path makes the constructor throw synchronously
      void settle({ kind: 'failed', error: toError(err) })
      return
    }

    worker.once('message', (message: unknown) => {
      // Tolerate non-object replies instead of throwing inside the handler.
      const reply = (typeof message === 'object' && message !== null ? message : {}) as {
        success?: unknown
        result?: unknown
        error?: unknown
      }
      if (reply.success) {
        void settle({ kind: 'completed', value: reply.result })
      } else {
        void settle({
          kind: 'failed',
          error: new Error(reply.error ? String(reply.error) : 'Worker task failed'),
        })
      }
    })

    // Keep an 'error' listener for the worker's whole lifetime. EventEmitter
    // throws an 'error' event that has no listener.
    worker.on('error', (err: Error) => {
      void settle({ kind: 'failed', error: err })
    })

    // A worker that exits without replying (e.g. process.exit(), or a script
    // that never answers and ends) would otherwise hang until the timeout.
    worker.once('exit', (code: number) => {
      void settle({
        kind: 'failed',
        error: new Error(`Worker for task ${task.id} exited with code ${code} before responding`),
      })
    })

    try {
      worker.postMessage({ id: task.id, input: task.input })
    } catch (err) {
      // e.g. the input cannot be structured-cloned
      void settle({ kind: 'failed', error: toError(err) })
    }
  }
}
|
||
|
|
|
||
|
|
/** One unit of work for `PromisePool`: `fn` is invoked with `input`. */
export interface PromiseTask<TInput, TOutput> {
  /** Caller-chosen identifier; the pool passes it to its events and error messages. */
  id: string
  /** Argument handed to `fn`. */
  input: TInput
  /** The work itself; may be synchronous or return a promise. */
  fn: (input: TInput) => TOutput | Promise<TOutput>
}
|
||
|
|
|
||
|
|
/** Construction-time settings for `PromisePool`. */
export interface PromisePoolOptions {
  /** Upper bound on task functions in flight at the same time. */
  maxConcurrency: number
  /** How long (ms) a started task may take before it is failed as timed out. */
  taskTimeoutMs: number
}
|
||
|
|
|
||
|
|
/**
 * PromisePool provides lightweight parallelism without worker threads. It
 * runs task functions on the current thread with at most
 * `options.maxConcurrency` of them in flight. Extra tasks wait in a FIFO queue.
 *
 * A timeout rejects the task and frees its slot, but cannot stop `fn`. The
 * function keeps running in the background, and whatever it eventually
 * produces is ignored.
 *
 * Emits `queued`, `completed`, `failed` and `timeout`, each with the task id.
 */
export class PromisePool extends EventEmitter {
  /** Tasks waiting for a free slot, oldest first. */
  private queue: Array<{
    task: PromiseTask<unknown, unknown>
    resolve: (value: unknown) => void
    reject: (reason: Error) => void
  }> = []

  /** Number of started tasks whose slot has not been released yet. */
  private running = 0

  private metrics = { queued: 0, completed: 0, failed: 0 }

  private shuttingDown = false

  constructor(private options: PromisePoolOptions) {
    super()
  }

  /**
   * Queue a task and run `task.fn(task.input)` as soon as a slot is free.
   *
   * @returns a promise for `fn`'s (awaited) return value. It rejects if the
   * pool is shutting down, `fn` throws or rejects, or `fn` does not settle
   * within `options.taskTimeoutMs`.
   */
  execute<TInput, TOutput>(task: PromiseTask<TInput, TOutput>): Promise<TOutput> {
    if (this.shuttingDown) {
      return Promise.reject(new Error('PromisePool is shutting down'))
    }

    return new Promise<TOutput>((resolve, reject) => {
      this.queue.push({
        task: task as PromiseTask<unknown, unknown>,
        resolve: resolve as (value: unknown) => void,
        reject,
      })
      this.metrics.queued++
      this.emit('queued', task.id)
      this._pump()
    })
  }

  /**
   * Run several tasks through the pool (still bounded by `maxConcurrency`).
   * Results come back in input order. Like `Promise.all`, this rejects with
   * the first failure, and the other tasks keep running.
   */
  async executeAll<TInput, TOutput>(
    tasks: PromiseTask<TInput, TOutput>[]
  ): Promise<TOutput[]> {
    return Promise.all(tasks.map(task => this.execute(task)))
  }

  /** Current pool metrics snapshot. */
  getMetrics(): WorkerPoolMetrics {
    return {
      queued: this.metrics.queued,
      running: this.running,
      completed: this.metrics.completed,
      failed: this.metrics.failed,
    }
  }

  /**
   * Stop accepting work and reject every queued task with
   * `Error('PromisePool shutdown')`. Tasks that are already running are not
   * interrupted; they finish or time out normally.
   */
  async shutdown(): Promise<void> {
    this.shuttingDown = true
    const abandoned = this.queue
    this.queue = []
    this.metrics.queued = 0
    for (const item of abandoned) {
      item.reject(new Error('PromisePool shutdown'))
    }
  }

  /** Start queued tasks until the queue is empty or every slot is busy. */
  private _pump(): void {
    while (this.queue.length > 0 && this.running < this.options.maxConcurrency) {
      const item = this.queue.shift()
      if (item === undefined) break
      this.metrics.queued--
      this.running++
      this._start(item)
    }
  }

  /** Invoke one task's `fn` and race it against the per-task timeout. */
  private _start(item: {
    task: PromiseTask<unknown, unknown>
    resolve: (value: unknown) => void
    reject: (reason: Error) => void
  }): void {
    const { task } = item
    let settled = false

    // Runs at most once per task: whichever of {fn settles, timer fires}
    // happens first wins. Without this guard, a task that times out and later
    // finishes would release its slot twice and break the concurrency cap.
    const settle = (event: 'completed' | 'failed' | 'timeout', deliver: () => void): void => {
      if (settled) return
      settled = true
      clearTimeout(timer)
      this.running--
      if (event === 'completed') this.metrics.completed++
      else this.metrics.failed++
      deliver()
      this.emit(event, task.id)
      this._pump()
    }

    const timer = setTimeout(() => {
      settle('timeout', () =>
        item.reject(
          new Error(`Task ${item.task.id} timed out after ${this.options.taskTimeoutMs}ms`)
        )
      )
    }, this.options.taskTimeoutMs)

    // fn is still called synchronously. A synchronous throw becomes a
    // rejection, so it is reported for *this* task instead of escaping _pump.
    let outcome: Promise<unknown>
    try {
      outcome = Promise.resolve(task.fn(task.input))
    } catch (err) {
      outcome = Promise.reject(err)
    }

    // Two-argument then(): a throw inside the success path must not be
    // re-reported as this task failing.
    void outcome.then(
      value => settle('completed', () => item.resolve(value)),
      (err: unknown) =>
        settle('failed', () => item.reject(err instanceof Error ? err : new Error(String(err))))
    )
  }
}
|