作为一门单线程语言,JS有锁这种概念吗。诚然,因为JS单线程运行,而我们一般对变量的并发操作都是同步的,但如果是异步场景,那就两说了。

并发问题构造

以最经典的 counter 问题为例子,来看下,每个worker都会从server拿取数据,并加到全局的counter上。同时启动100个worker,查看最终的counter值。

const delay = () => new Promise(resolve => setTimeout(resolve, 0))
const fetchFromServer = () => new Promise(resolve => setTimeout(() => resolve(10), Math.random() * 1000))

async function updateObj(patch) {
  const cnt = globalObj.counter
  await delay()
  globalObj.counter = cnt + patch
}

const globalObj = {
  counter: 0,
}

async function worker() {
  const patch = await fetchFromServer()
  globalObj.counter += patch // 并发读写操作
}

Promise.all(Array.from({length: 100}, _ => worker()))
  .then(() => console.log(globalObj)) // => 1000

上面的并发代码为什么能输出正确的结果呢?仔细分析,我们的并发读写操作只有一行,那就是 globalObj.counter += patch 这一自增操作,而正因为JS是单线程的,所以这个自增操作不会被拆开,所以多个Promise去修改变量,实际上也是串行的操作。那如果我们进行构造,让这个自增动作的中间,让出CPU,那就使得自增不是原子操作。这里我们使用 await 来实现

const delay = () => new Promise(resolve => setTimeout(resolve, 0))
const fetchFromServer = () => new Promise(resolve => setTimeout(() => resolve(10), Math.random() * 1000))

async function updateObj(patch) {
  const cnt = globalObj.counter
  await delay()
  globalObj.counter = cnt + patch
}

const globalObj = {
  counter: 0,
}

async function worker() {
  const patch = await fetchFromServer()
  await updateObj(patch)
}

Promise.all(Array.from({length: 100}, _ => worker()))
  .then(() => console.log(globalObj)) // => what?

这样一来,输出就不固定了,也就复现出了其他多线程语言的counter问题。所以回到最前面的问题,JS需要锁吗?答案是需要的

预期效果

实现一个互斥锁 Mutex ,它拥有获取和释放两个方法

interface IMutex {
  acquire: () => Promise<void>; // 不断轮询直到获取锁为止
  release: () => void; // 释放锁
}

从而能够让第二个例子的输出始终为 1000

const mutex = new Mutex()

async function worker() {
  const patch = await fetchFromServer()
  await mutex.acquire()
  await updateObj(patch)
  mutex.release()
}
// the rest
Promise.all(Array.from({length: 100}, _ => worker()))
  .then(() => console.log(globalObj)) // => always 1000

锁的实现

轮询

其他语言最简单的实现锁的方法就是自旋锁(spin lock),通过不停的 while 循环并查看锁的状态,吃光CPU,这对于单线程的JS肯定是不能这么玩的。

在JS上,最简单的实现轮询方式那就是 setInterval,把查询操作塞到 eventloop 里,我们来改造一下

class SpinMutex implements IMutex {
  private _locked = false 
  acquire() {
    return new Promise<void>(resolve => {
      const timer = setInterval(() => {
        if (!this._locked) {
          this._locked = true
          clearInterval(timer)
          resolve()
        }
      }, 0)
    })
  }
  release() {
    this._locked = false
  }
}
// print 1000

事件驱动

使用轮询会把大量重复的 callback 塞到 eventloop 里,显然这部分冗余是可以消除的,那就是自己实现一个任务队列,把 进入临界区 这一操作push到队列中,当目前的任务执行完毕,便再从队头的任务取出来执行。如此,便取代了轮询这一低效的操作。事实上,因为使用了队列,所以还可以做到控制并发量的效果(Mutex相当于最大并发量为1的特殊情况)

class EventMutex implements IMutex {
  private _queue: Function[] = [];
  private _maxConcurrency = 1; // 最大并发量为1
  async acquire() {
    if (this._maxConcurrency <= 0) {
      await new Promise<void>(resolve => this._queue.push(resolve))
    }
    this._maxConcurrency--
  }

  release() {
    this._maxConcurrency++;
    this._queue.shift()?.();
  }
}
// print 1000

参考

  1. TS playground
  2. https://blog.mayflower.de/6369-javascript-mutex-synchronizing-async-operations.html
  3. https://blog.jcoglan.com/2016/07/12/mutexes-and-javascript/


经验分享      JS

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!