从 Promise.race 到并发任务调度器:实现可控的文件上传

在实现文件上传功能时,很多时候会使用 Promise.race 来控制并发:

const pool = new Set();

for (const file of files) {
  const task = upload(file);
  pool.add(task);
  task.finally(() => pool.delete(task));

  if (pool.size >= limit) {
    await Promise.race(pool);
  }
}

这段代码确实能限制同时上传几个任务,但它本质上只是一个”信号灯”——它只知道”有一个任务完成了”,却不知道是哪 一个,也不知道当前队列的真实状态。

当我们需要实现暂停、恢复、取消、状态追踪等功能时,Promise.race 就力不从心了。

1.Promise.race 的局限性

功能 Promise.race 任务调度系统
并发数量控制 支持 支持
任务管理(增删改查) 不支持 支持
状态管理 不支持 支持
暂停/恢复 不支持 支持
失败重试策略 不支持 支持
优先级调度 不支持 支持

核心问题:Promise.race 无法追踪任务状态,无法干预任务执行

2.任务调度系统的设计

2.1 系统架构

核心流程

  1. 任务进入等待队列
  2. 调度器从队列取出任务
  3. 固定数量的 Worker 消费任务
  4. 任务完成后,Worker 空闲,继续消费队列

2.2 任务状态定义

// 定义任务的所有可能状态
const TASK_STATUS = {
  WAITING: 'waiting',     // 等待中
  UPLOADING: 'uploading',  // 上传中
  SUCCESS: 'success',      // 上传成功
  ERROR: 'error',          // 上传失败
  PAUSED: 'paused',       // 已暂停
};

// 上传任务类
class UploadTask {
  constructor(file) {
    this.id = crypto.randomUUID();    // 唯一标识
    this.file = file;                  // 文件对象
    this.status = TASK_STATUS.WAITING; // 当前状态
    this.progress = 0;                // 进度 0-1
    this.retryCount = 0;              // 重试次数
    this.maxRetry = 3;               // 最大重试次数
    this.controller = new AbortController(); // 取消控制器
  }
}

2.3 调度器核心实现

class UploadScheduler {
  constructor(concurrency = 3) {
    this.concurrency = concurrency;     // 最大并发数
    this.queue = [];                   // 等待队列
    this.running = new Map();         // 正在运行的任务 Map<id, task>
    this.paused = false;               // 全局暂停标志
  }

  // 添加任务
  addTask(file) {
    const task = new UploadTask(file);
    this.queue.push(task);
    this.schedule();
    return task;
  }

  // 核心调度逻辑
  schedule() {
    // 暂停时不调度
    if (this.paused) return;

    // 只要还有空闲 worker 且队列还有任务
    while (
      this.running.size < this.concurrency &&
      this.queue.length > 0
    ) {
      const task = this.queue.shift();
      this.runTask(task);
    }
  }

  // 执行任务
  async runTask(task) {
    // 放入运行池
    this.running.set(task.id, task);
    task.status = TASK_STATUS.UPLOADING;

    try {
      await this.uploadFile(task);
      task.status = TASK_STATUS.SUCCESS;
      console.log(`${task.file.name} 上传成功`);
      // 成功时从 running 删除
      this.running.delete(task.id);
    } catch (err) {
      // 关键:区分是用户暂停还是真实错误
      if (err.name === 'AbortError') {
        // 用户主动暂停,不删除!保持 running 中的引用
        task.status = TASK_STATUS.PAUSED;
      } else {
        console.error(`${task.file.name} 上传失败`, err);
        // 真实错误,触发重试逻辑
        if (task.retryCount < task.maxRetry) {
          task.retryCount++;
          console.log(`${task.file.name} 重试第 ${task.retryCount} 次`);
          task.status = TASK_STATUS.WAITING;
          this.running.delete(task.id);  // 重试入队,从 running 删除
          this.queue.push(task);  // 重新入队
        } else {
          task.status = TASK_STATUS.ERROR;
          this.running.delete(task.id);  // 失败耗尽,从 running 删除
        }
      }
    }

    // 继续调度
    this.schedule();
  }

  // 上传文件
  async uploadFile(task) {
    const formData = new FormData();
    formData.append('file', task.file);

    const response = await fetch('/upload', {
      method: 'POST',
      body: formData,
      signal: task.controller.signal,  // 关联取消控制器
    });

    return await response.json();
  }
}

代码解读

  1. 任务池:使用 running Map 追踪正在运行的任务
  2. 调度循环while 循环保证只要有空闲 worker 就会继续调度
  3. 自动释放:finally 块确保任务结束后 worker 会被释放
  4. 重试机制:失败任务重新入队,而不是直接标记失败

2.4 暂停与恢复

// 暂停全部
pause() {
  this.paused = true;

  // 中止所有正在运行的请求
  for (const task of this.running.values()) {
    task.controller.abort();
    // 注意:status 会在 runTask 的 catch 中被设为 PAUSED
  }
}

// 恢复全部
resume() {
  this.paused = false;

  // 将暂停的任务重新入队
  for (const task of this.running.values()) {
    if (task.status === TASK_STATUS.PAUSED) {
      // 创建新的 AbortController(一个只能 abort 一次)
      task.controller = new AbortController();
      task.status = TASK_STATUS.WAITING;
      this.queue.push(task);
    }
  }

  this.schedule();
}

// 暂停单个任务
pauseTask(taskId) {
  const task = this.running.get(taskId);
  if (task) {
    task.controller.abort();
    // status 会在 runTask 的 catch 中被设为 PAUSED
  }
}

// 恢复单个任务
resumeTask(taskId) {
  const task = this.running.get(taskId);
  if (task && task.status === TASK_STATUS.PAUSED) {
    // 创建新的 AbortController(一个只能 abort 一次)
    task.controller = new AbortController();
    task.status = TASK_STATUS.WAITING;
    this.queue.push(task);
    this.schedule();
  }
}

2.5 取消任务

// 取消指定任务
cancel(taskId) {
  const task = this.running.get(taskId);

  if (!task) return;

  // 终止请求
  task.controller.abort();
  // 注意:status 会在 runTask 的 catch 中被设为 ERROR

  this.running.delete(taskId);
  this.schedule();
}

2.6 任务状态流转

任务调度系统本质上是一个状态机,不同状态之间会根据任务执行结果发生流转。

状态流转说明

  • WAITING:任务进入等待队列,尚未开始上传
  • UPLOADING:任务被调度器分配给 Worker,正在上传
  • PAUSED:用户主动暂停,通过 AbortController 中断请求
  • ERROR:上传失败,若未超过最大重试次数会重新进入等待队列
  • SUCCESS:任务上传完成,生命周期结束

3. 结合分片上传的任务调度

在实际的文件上传场景中,每个大文件会被分成多个 chunk 上传。我们需要在任务调度的基础上,记录每个 chunk 的状态。

3.1 支持分片的任务

class UploadTask {
  constructor(file) {
    this.id = crypto.randomUUID();
    this.file = file;
    this.status = TASK_STATUS.WAITING;
    this.progress = 0;

    // 分片相关
    this.chunks = [];           // 分片数组
    this.chunkStates = [];      // 每个分片的状态
    this.totalChunks = 0;
    this.doneChunks = 0;

    // 取消控制器
    this.controller = new AbortController();
  }
}

3.2 分片上传逻辑

async uploadChunks(task) {
  // 生成分片
  const chunks = this.buildChunks(task.file);
  task.chunks = chunks;
  task.totalChunks = chunks.length;
  task.chunkStates = new Array(task.totalChunks).fill('pending');

  // 逐个上传分片
  for (let i = 0; i < task.totalChunks; i++) {
    // 检查是否被暂停/取消
    if (task.controller.signal.aborted) {
      return;  // 用户暂停,直接返回,不抛错误
    }

    // 跳过已完成的分片(支持恢复)
    if (task.chunkStates[i] === 'done') {
      continue;
    }

    task.chunkStates[i] = 'uploading';

    try {
      await this.uploadChunk(task, i);
      task.chunkStates[i] = 'done';
      task.doneChunks++;
      task.progress = task.doneChunks / task.totalChunks;
    } catch (err) {
      // 关键:区分是用户暂停还是真实错误
      if (err.name === 'AbortError') {
        return;  // 用户暂停,不抛错误,不触发外层重试
      }
      // 真实错误
      task.chunkStates[i] = 'error';
      throw err;  // 抛出给外层处理
    }
  }

  // 合并分片
  await this.mergeChunks(task);
}

// 生成分片
buildChunks(file) {
  const chunks = [];
  for (let i = 0; i < file.size; i += CHUNK_SIZE) {
    chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size)));
  }
  return chunks;
}

3.3 支持恢复的分片上传

恢复上传的关键是:跳过已完成的分片

async resumeUpload(task) {
  // 创建新的 AbortController
  task.controller = new AbortController();
  task.status = TASK_STATUS.UPLOADING;

  // 从上次中断的地方继续
  for (let i = 0; i < task.totalChunks; i++) {
    if (task.controller.signal.aborted) return;

    // 跳过已完成和失败的分片
    if (task.chunkStates[i] !== 'done') {
      if (task.chunkStates[i] === 'error') {
        // 重试失败的 chunk
        task.chunkStates[i] = 'uploading';
        try {
          await this.uploadChunk(task, i);
          task.chunkStates[i] = 'done';
          task.doneChunks++;
        } catch (err) {
          // 关键:区分是用户暂停还是真实错误
          if (err.name === 'AbortError') {
            return;  // 用户暂停,不抛错误
          }
          task.chunkStates[i] = 'error';
          throw err;  // 真实错误抛给外层
        }
      }
    }

    task.progress = task.doneChunks / task.totalChunks;
  }

  await this.mergeChunks(task);
}

3.4 完整示例

class UploadScheduler {
  constructor(concurrency = 3) {
    this.concurrency = concurrency;
    this.queue = [];
    this.running = new Map();
    this.paused = false;
  }

  addTask(file) {
    const task = new UploadTask(file);
    this.queue.push(task);
    this.schedule();
    return task;
  }

  schedule() {
    if (this.paused) return;

    while (
      this.running.size < this.concurrency &&
      this.queue.length > 0
    ) {
      const task = this.queue.shift();
      this.runTask(task);
    }
  }

  async runTask(task) {
    this.running.set(task.id, task);
    task.status = TASK_STATUS.UPLOADING;

    try {
      await this.uploadChunks(task);
      task.status = TASK_STATUS.SUCCESS;
      // 成功或重试入队后,才从 running 删除
      this.running.delete(task.id);
    } catch (err) {
      if (err.name === 'AbortError') {
        // 用户暂停:不删除!保持 running 中的引用
        task.status = TASK_STATUS.PAUSED;
        // 注意:不在这里删除 task,让它在 running 中保留
        // resume 时可以找到并重新入队
      } else {
        // 真实错误
        if (task.retryCount < task.maxRetry) {
          task.retryCount++;
          task.status = TASK_STATUS.WAITING;
          this.running.delete(task.id);  // 重试入队,从 running 删除
          this.queue.push(task);
        } else {
          task.status = TASK_STATUS.ERROR;
          this.running.delete(task.id);  // 失败耗尽,从 running 删除
        }
      }
    }

    this.schedule();
  }

  async uploadChunks(task) {
    const chunks = this.buildChunks(task.file);
    task.chunks = chunks;
    task.totalChunks = chunks.length;
    task.chunkStates = new Array(task.totalChunks).fill('pending');
    task.doneChunks = 0;

    for (let i = 0; i < task.totalChunks; i++) {
      if (task.controller.signal.aborted) return;

      if (task.chunkStates[i] === 'done') continue;

      task.chunkStates[i] = 'uploading';

      try {
        await this.uploadChunk(task, i);
        task.chunkStates[i] = 'done';
        task.doneChunks++;
        task.progress = task.doneChunks / task.totalChunks;
      } catch (err) {
        // 关键:区分是用户暂停还是真实错误
        if (err.name === 'AbortError') {
          return;
        }
        task.chunkStates[i] = 'error';
        throw err;
      }
    }

    await this.mergeChunks(task);
  }

  buildChunks(file) {
    const chunks = [];
    for (let i = 0; i < file.size; i += CHUNK_SIZE) {
      chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size)));
    }
    return chunks;
  }

  async uploadChunk(task, chunkIndex) {
    const formData = new FormData();
    formData.append('chunk', task.chunks[chunkIndex]);
    formData.append('hash', task.hash);

    const response = await fetch('/upload', {
      method: 'POST',
      body: formData,
      signal: task.controller.signal,
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }

  async mergeChunks(task) {
    if (DEMO_MODE) return;
    const response = await fetch('/upload/merge', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ hash: task.hash, name: task.file.name, totalChunks: task.totalChunks }),
      signal: task.controller.signal,
    });
    if (!response.ok) throw new Error('Merge failed');
  }

  pause() {
    this.paused = true;
    for (const task of this.running.values()) {
      task.controller.abort();
      // status 由 runTask 的 catch 设置为 PAUSED
    }
  }

  resume() {
    this.paused = false;
    for (const task of this.running.values()) {
      if (task.status === TASK_STATUS.PAUSED) {
        task.controller = new AbortController();
        task.status = TASK_STATUS.WAITING;
        this.queue.push(task);
      }
    }
    this.schedule();
  }

  cancel(taskId) {
    const task = this.running.get(taskId);
    if (task) {
      task.controller.abort();
      // status 由 runTask 的 catch 设置为 ERROR(因为不是 AbortError)
    }
  }

  getStats() {
    return {
      queueLength: this.queue.length,
      runningCount: this.running.size,
      paused: this.paused,
      tasks: Array.from(this.running.values()).map(t => ({
        id: t.id,
        name: t.file.name,
        status: t.status,
        progress: t.progress,
      })),
    };
  }
}

4. 总结

任务调度系统的核心优势

特性 实现方式
并发控制 running.size < concurrency 条件判断
任务追踪 Map<id, task> 维护所有任务
状态管理 任务状态机 + 状态流转图
暂停恢复 AbortController + 重新入队
失败重试 失败任务重新入队 + 重试计数器
统一调度 schedule() 方法集中处理

关键设计思想

  1. 池化思想:Worker 池复用,而不是每次创建新 worker
  2. 状态驱动:任务状态决定行为,而不是流程控制
  3. 可中断:所有异步操作都关联 AbortController
  4. 自动调度finally 块确保 worker 始终被释放

相比 Promise.race 的”只管放行,不管回收”,任务调度系统提供了完整的生命周期管理和干预能力。