从 Promise.race 到并发任务调度器:实现可控的文件上传
在实现文件上传功能时,很多时候会使用 Promise.race 来控制并发:
const pool = new Set();
for (const file of files) {
const task = upload(file);
pool.add(task);
task.finally(() => pool.delete(task));
if (pool.size >= limit) {
await Promise.race(pool);
}
}
这段代码确实能限制同时上传几个任务,但它本质上只是一个”信号灯”——它只知道”有一个任务完成了”,却不知道是哪 一个,也不知道当前队列的真实状态。
当我们需要实现暂停、恢复、取消、状态追踪等功能时,Promise.race 就力不从心了。
1.Promise.race 的局限性
| 功能 | Promise.race | 任务调度系统 |
|---|---|---|
| 并发数量控制 | 支持 | 支持 |
| 任务管理(增删改查) | 不支持 | 支持 |
| 状态管理 | 不支持 | 支持 |
| 暂停/恢复 | 不支持 | 支持 |
| 失败重试策略 | 不支持 | 支持 |
| 优先级调度 | 不支持 | 支持 |
核心问题:Promise.race 无法追踪任务状态,无法干预任务执行。
2.任务调度系统的设计
2.1 系统架构

核心流程:
- 任务进入等待队列
- 调度器从队列取出任务
- 固定数量的 Worker 消费任务
- 任务完成后,Worker 空闲,继续消费队列
2.2 任务状态定义
// 定义任务的所有可能状态
const TASK_STATUS = {
WAITING: 'waiting', // 等待中
UPLOADING: 'uploading', // 上传中
SUCCESS: 'success', // 上传成功
ERROR: 'error', // 上传失败
PAUSED: 'paused', // 已暂停
};
// 上传任务类
class UploadTask {
constructor(file) {
this.id = crypto.randomUUID(); // 唯一标识
this.file = file; // 文件对象
this.status = TASK_STATUS.WAITING; // 当前状态
this.progress = 0; // 进度 0-1
this.retryCount = 0; // 重试次数
this.maxRetry = 3; // 最大重试次数
this.controller = new AbortController(); // 取消控制器
}
}
2.3 调度器核心实现
class UploadScheduler {
constructor(concurrency = 3) {
this.concurrency = concurrency; // 最大并发数
this.queue = []; // 等待队列
this.running = new Map(); // 正在运行的任务 Map<id, task>
this.paused = false; // 全局暂停标志
}
// 添加任务
addTask(file) {
const task = new UploadTask(file);
this.queue.push(task);
this.schedule();
return task;
}
// 核心调度逻辑
schedule() {
// 暂停时不调度
if (this.paused) return;
// 只要还有空闲 worker 且队列还有任务
while (
this.running.size < this.concurrency &&
this.queue.length > 0
) {
const task = this.queue.shift();
this.runTask(task);
}
}
// 执行任务
async runTask(task) {
// 放入运行池
this.running.set(task.id, task);
task.status = TASK_STATUS.UPLOADING;
try {
await this.uploadFile(task);
task.status = TASK_STATUS.SUCCESS;
console.log(`${task.file.name} 上传成功`);
// 成功时从 running 删除
this.running.delete(task.id);
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
// 用户主动暂停,不删除!保持 running 中的引用
task.status = TASK_STATUS.PAUSED;
} else {
console.error(`${task.file.name} 上传失败`, err);
// 真实错误,触发重试逻辑
if (task.retryCount < task.maxRetry) {
task.retryCount++;
console.log(`${task.file.name} 重试第 ${task.retryCount} 次`);
task.status = TASK_STATUS.WAITING;
this.running.delete(task.id); // 重试入队,从 running 删除
this.queue.push(task); // 重新入队
} else {
task.status = TASK_STATUS.ERROR;
this.running.delete(task.id); // 失败耗尽,从 running 删除
}
}
}
// 继续调度
this.schedule();
}
// 上传文件
async uploadFile(task) {
const formData = new FormData();
formData.append('file', task.file);
const response = await fetch('/upload', {
method: 'POST',
body: formData,
signal: task.controller.signal, // 关联取消控制器
});
return await response.json();
}
}
代码解读:
- 任务池:使用
runningMap 追踪正在运行的任务 - 调度循环:
while循环保证只要有空闲 worker 就会继续调度 - 自动释放:finally 块确保任务结束后 worker 会被释放
- 重试机制:失败任务重新入队,而不是直接标记失败
2.4 暂停与恢复
// 暂停全部
pause() {
this.paused = true;
// 中止所有正在运行的请求
for (const task of this.running.values()) {
task.controller.abort();
// 注意:status 会在 runTask 的 catch 中被设为 PAUSED
}
}
// 恢复全部
resume() {
this.paused = false;
// 将暂停的任务重新入队
for (const task of this.running.values()) {
if (task.status === TASK_STATUS.PAUSED) {
// 创建新的 AbortController(一个只能 abort 一次)
task.controller = new AbortController();
task.status = TASK_STATUS.WAITING;
this.queue.push(task);
}
}
this.schedule();
}
// 暂停单个任务
pauseTask(taskId) {
const task = this.running.get(taskId);
if (task) {
task.controller.abort();
// status 会在 runTask 的 catch 中被设为 PAUSED
}
}
// 恢复单个任务
resumeTask(taskId) {
const task = this.running.get(taskId);
if (task && task.status === TASK_STATUS.PAUSED) {
// 创建新的 AbortController(一个只能 abort 一次)
task.controller = new AbortController();
task.status = TASK_STATUS.WAITING;
this.queue.push(task);
this.schedule();
}
}
2.5 取消任务
// 取消指定任务
cancel(taskId) {
const task = this.running.get(taskId);
if (!task) return;
// 终止请求
task.controller.abort();
// 注意:status 会在 runTask 的 catch 中被设为 ERROR
this.running.delete(taskId);
this.schedule();
}
2.6 任务状态流转
任务调度系统本质上是一个状态机,不同状态之间会根据任务执行结果发生流转。

状态流转说明
WAITING:任务进入等待队列,尚未开始上传UPLOADING:任务被调度器分配给 Worker,正在上传PAUSED:用户主动暂停,通过AbortController中断请求ERROR:上传失败,若未超过最大重试次数会重新进入等待队列SUCCESS:任务上传完成,生命周期结束
3. 结合分片上传的任务调度
在实际的文件上传场景中,每个大文件会被分成多个 chunk 上传。我们需要在任务调度的基础上,记录每个 chunk 的状态。
3.1 支持分片的任务
class UploadTask {
constructor(file) {
this.id = crypto.randomUUID();
this.file = file;
this.status = TASK_STATUS.WAITING;
this.progress = 0;
// 分片相关
this.chunks = []; // 分片数组
this.chunkStates = []; // 每个分片的状态
this.totalChunks = 0;
this.doneChunks = 0;
// 取消控制器
this.controller = new AbortController();
}
}
3.2 分片上传逻辑
async uploadChunks(task) {
// 生成分片
const chunks = this.buildChunks(task.file);
task.chunks = chunks;
task.totalChunks = chunks.length;
task.chunkStates = new Array(task.totalChunks).fill('pending');
// 逐个上传分片
for (let i = 0; i < task.totalChunks; i++) {
// 检查是否被暂停/取消
if (task.controller.signal.aborted) {
return; // 用户暂停,直接返回,不抛错误
}
// 跳过已完成的分片(支持恢复)
if (task.chunkStates[i] === 'done') {
continue;
}
task.chunkStates[i] = 'uploading';
try {
await this.uploadChunk(task, i);
task.chunkStates[i] = 'done';
task.doneChunks++;
task.progress = task.doneChunks / task.totalChunks;
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
return; // 用户暂停,不抛错误,不触发外层重试
}
// 真实错误
task.chunkStates[i] = 'error';
throw err; // 抛出给外层处理
}
}
// 合并分片
await this.mergeChunks(task);
}
// 生成分片
buildChunks(file) {
const chunks = [];
for (let i = 0; i < file.size; i += CHUNK_SIZE) {
chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size)));
}
return chunks;
}
3.3 支持恢复的分片上传
恢复上传的关键是:跳过已完成的分片。
async resumeUpload(task) {
// 创建新的 AbortController
task.controller = new AbortController();
task.status = TASK_STATUS.UPLOADING;
// 从上次中断的地方继续
for (let i = 0; i < task.totalChunks; i++) {
if (task.controller.signal.aborted) return;
// 跳过已完成和失败的分片
if (task.chunkStates[i] !== 'done') {
if (task.chunkStates[i] === 'error') {
// 重试失败的 chunk
task.chunkStates[i] = 'uploading';
try {
await this.uploadChunk(task, i);
task.chunkStates[i] = 'done';
task.doneChunks++;
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
return; // 用户暂停,不抛错误
}
task.chunkStates[i] = 'error';
throw err; // 真实错误抛给外层
}
}
}
task.progress = task.doneChunks / task.totalChunks;
}
await this.mergeChunks(task);
}
3.4 完整示例
class UploadScheduler {
constructor(concurrency = 3) {
this.concurrency = concurrency;
this.queue = [];
this.running = new Map();
this.paused = false;
}
addTask(file) {
const task = new UploadTask(file);
this.queue.push(task);
this.schedule();
return task;
}
schedule() {
if (this.paused) return;
while (
this.running.size < this.concurrency &&
this.queue.length > 0
) {
const task = this.queue.shift();
this.runTask(task);
}
}
async runTask(task) {
this.running.set(task.id, task);
task.status = TASK_STATUS.UPLOADING;
try {
await this.uploadChunks(task);
task.status = TASK_STATUS.SUCCESS;
// 成功或重试入队后,才从 running 删除
this.running.delete(task.id);
} catch (err) {
if (err.name === 'AbortError') {
// 用户暂停:不删除!保持 running 中的引用
task.status = TASK_STATUS.PAUSED;
// 注意:不在这里删除 task,让它在 running 中保留
// resume 时可以找到并重新入队
} else {
// 真实错误
if (task.retryCount < task.maxRetry) {
task.retryCount++;
task.status = TASK_STATUS.WAITING;
this.running.delete(task.id); // 重试入队,从 running 删除
this.queue.push(task);
} else {
task.status = TASK_STATUS.ERROR;
this.running.delete(task.id); // 失败耗尽,从 running 删除
}
}
}
this.schedule();
}
async uploadChunks(task) {
const chunks = this.buildChunks(task.file);
task.chunks = chunks;
task.totalChunks = chunks.length;
task.chunkStates = new Array(task.totalChunks).fill('pending');
task.doneChunks = 0;
for (let i = 0; i < task.totalChunks; i++) {
if (task.controller.signal.aborted) return;
if (task.chunkStates[i] === 'done') continue;
task.chunkStates[i] = 'uploading';
try {
await this.uploadChunk(task, i);
task.chunkStates[i] = 'done';
task.doneChunks++;
task.progress = task.doneChunks / task.totalChunks;
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
return;
}
task.chunkStates[i] = 'error';
throw err;
}
}
await this.mergeChunks(task);
}
buildChunks(file) {
const chunks = [];
for (let i = 0; i < file.size; i += CHUNK_SIZE) {
chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size)));
}
return chunks;
}
async uploadChunk(task, chunkIndex) {
const formData = new FormData();
formData.append('chunk', task.chunks[chunkIndex]);
formData.append('hash', task.hash);
const response = await fetch('/upload', {
method: 'POST',
body: formData,
signal: task.controller.signal,
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
}
async mergeChunks(task) {
if (DEMO_MODE) return;
const response = await fetch('/upload/merge', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ hash: task.hash, name: task.file.name, totalChunks: task.totalChunks }),
signal: task.controller.signal,
});
if (!response.ok) throw new Error('Merge failed');
}
pause() {
this.paused = true;
for (const task of this.running.values()) {
task.controller.abort();
// status 由 runTask 的 catch 设置为 PAUSED
}
}
resume() {
this.paused = false;
for (const task of this.running.values()) {
if (task.status === TASK_STATUS.PAUSED) {
task.controller = new AbortController();
task.status = TASK_STATUS.WAITING;
this.queue.push(task);
}
}
this.schedule();
}
cancel(taskId) {
const task = this.running.get(taskId);
if (task) {
task.controller.abort();
// status 由 runTask 的 catch 设置为 ERROR(因为不是 AbortError)
}
}
getStats() {
return {
queueLength: this.queue.length,
runningCount: this.running.size,
paused: this.paused,
tasks: Array.from(this.running.values()).map(t => ({
id: t.id,
name: t.file.name,
status: t.status,
progress: t.progress,
})),
};
}
}
4. 总结
任务调度系统的核心优势
| 特性 | 实现方式 |
|---|---|
| 并发控制 | running.size < concurrency 条件判断 |
| 任务追踪 | Map<id, task> 维护所有任务 |
| 状态管理 | 任务状态机 + 状态流转图 |
| 暂停恢复 | AbortController + 重新入队 |
| 失败重试 | 失败任务重新入队 + 重试计数器 |
| 统一调度 | schedule() 方法集中处理 |
关键设计思想
- 池化思想:Worker 池复用,而不是每次创建新 worker
- 状态驱动:任务状态决定行为,而不是流程控制
- 可中断:所有异步操作都关联
AbortController - 自动调度:
finally块确保 worker 始终被释放
相比 Promise.race 的”只管放行,不管回收”,任务调度系统提供了完整的生命周期管理和干预能力。