@@ -1,42 +1,39 @@
-import { defineConfig } from './config'
+import { Worker } from 'node:worker_threads'
 import { Stats } from 'node:fs'
-import { haveTimeout, pendingPromise } from './cross'
-import { stat } from 'fs/promises'
+import { pendingPromise } from './cross'
 
-const fileTimeout = defineConfig('file_timeout', 3, x => x * 1000)
+// all stat requests for the same worker are serialized, potentially introducing extra latency
 
-// since nodejs' UV_THREADPOOL_SIZE is limited, avoid using multiple slots for the same UNC host, and always leave one free for local operations
-const poolSize = Number(process.env.UV_THREADPOOL_SIZE || 4)
-const previous = new Map<string, Promise<Stats>>() // wrapped promises with haveTimeout
-const working = new Set<Promise<Stats>>() // plain stat's promise
-export async function statWithTimeout(path: string) {
-    const uncHost = /^\\\\([^\\]+)\\/.exec(path)?.[1]
-    if (!uncHost)
-        return haveTimeout(fileTimeout.compiled(), stat(path))
-    const busy = process.env.HFS_PARALLEL_UNC ? null : previous.get(uncHost) // by default we serialize requests on the same UNC host, to keep threadpool usage low
-    const ret = pendingPromise<Stats>()
-    previous.set(uncHost, ret) // reserve the slot before starting the operation
-    const err = await busy?.then(() => false, e => e.message === 'timeout' && e) // only the timeout error is shared with pending requests
-    if (err) {
-        if (previous.get(uncHost) === ret) // but we don't want to block forever, only involve those that were already waiting
-            previous.delete(uncHost)
-        ret.reject(err)
+const pool = new Map<string, (path: string) => Promise<Stats>>()
+
+export function getStatWorker(key: string) {
+    const existing = pool.get(key)
+    if (existing)
+        return existing
+    const worker = new Worker(__dirname + '/statWorker.js')
+    worker.unref()
+    const requests = new Map()
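+    // replies arrive as { path, result } or { path, error }; structured clone strips the
+    // Stats prototype in transit, so it is restored before resolving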
+    worker.on('message', (msg: any) => {
+        requests.get(msg.path)?.resolve(msg.error
+            ? Promise.reject(new Error(msg.error))
+            : Object.setPrototypeOf(msg.result, Stats.prototype)
+        )
+        requests.delete(msg.path)
+    })
+    worker.on('error', (err) => {
+        for (const p of requests.values())
+            p.reject(err)
+        requests.clear()
+        worker.terminate().catch(() => {})
+        pool.delete(key)
+    })
+    pool.set(key, query)
+    return query
+
+    function query(path: string) {
+        const ret = pendingPromise<Stats>()
+        requests.set(path, ret)
+        worker.postMessage(path)
         return ret
     }
-    while (working.size >= poolSize - 1) // always leave one slot free for local operations
-        await Promise.race(working.values()).catch(() => {}) // we are assuming UV_THREADPOOL_SIZE > 1, otherwise race() will deadlock
-    const op = stat(path)
-    working.add(op)
-    try {
-        ret.resolve(await haveTimeout(fileTimeout.compiled(),
-            op.finally(() => working.delete(op))))
-    }
-    catch (e) {
-        ret.reject(e)
-    }
-    finally {
-        if (previous.get(uncHost) === ret)
-            previous.delete(uncHost)
-    }
-    return ret
 }
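
The worker file loaded above, statWorker.js, is not part of this hunk. Here is a minimal sketch of what it plausibly contains, inferred only from the message handler (a bare path string in, a { path, result } or { path, error } message out); the real file may differ:

import { parentPort } from 'node:worker_threads'
import { stat } from 'node:fs/promises'

parentPort!.on('message', async (path: string) => {
    try {
        // stat blocks only this worker's queue, so a hung host can't starve the main threadpool
        parentPort!.postMessage({ path, result: await stat(path) })
    }
    catch (e: any) {
        parentPort!.postMessage({ path, error: e.message })
    }
})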
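A hypothetical call site, keying workers by UNC host as the replaced statWithTimeout did (the regex is taken from the removed code; the surrounding helper is assumed, not shown in this commit):

import { stat } from 'fs/promises'

async function safeStat(path: string) {
    const uncHost = /^\\\\([^\\]+)\\/.exec(path)?.[1]
    // local paths keep using the shared libuv threadpool; each UNC host gets its own worker,
    // so one misbehaving host no longer exhausts UV_THREADPOOL_SIZE slots
    return uncHost ? getStatWorker(uncHost)(path) : stat(path)
}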