作为一门单线程语言,JS有锁这种概念吗。诚然,因为JS单线程运行,而我们一般对变量的并发操作都是同步的,但如果是异步场景,那就两说了。
并发问题构造
以最经典的 counter
问题为例子,来看下,每个worker都会从server拿取数据,并加到全局的counter上。同时启动100个worker,查看最终的counter值。
const delay = () => new Promise(resolve => setTimeout(resolve, 0))
const fetchFromServer = () => new Promise(resolve => setTimeout(() => resolve(10), Math.random() * 1000))
async function updateObj(patch) {
const cnt = globalObj.counter
await delay()
globalObj.counter = cnt + patch
}
const globalObj = {
counter: 0,
}
async function worker() {
const patch = await fetchFromServer()
globalObj.counter += patch // 并发读写操作
}
Promise.all(Array.from({length: 100}, _ => worker()))
.then(() => console.log(globalObj)) // => 1000
上面的并发代码为什么能输出正确的结果呢?仔细分析,我们的并发读写操作只有一行,那就是 globalObj.counter += patch
这一自增操作,而正因为JS是单线程的,所以这个自增操作不会被拆开,所以多个Promise去修改变量,实际上也是串行的操作。那如果我们进行构造,让这个自增动作的中间,让出CPU,那就使得自增不是原子操作。这里我们使用 await
来实现
const delay = () => new Promise(resolve => setTimeout(resolve, 0))
const fetchFromServer = () => new Promise(resolve => setTimeout(() => resolve(10), Math.random() * 1000))
async function updateObj(patch) {
const cnt = globalObj.counter
await delay()
globalObj.counter = cnt + patch
}
const globalObj = {
counter: 0,
}
async function worker() {
const patch = await fetchFromServer()
await updateObj(patch)
}
Promise.all(Array.from({length: 100}, _ => worker()))
.then(() => console.log(globalObj)) // => what?
这样一来,输出就不固定了,也就复现出了其他多线程语言的counter问题。所以回到最前面的问题,JS需要锁吗?答案是需要的
预期效果
实现一个互斥锁 Mutex
,它拥有获取和释放两个方法
interface IMutex {
acquire: () => Promise<void>; // 不断轮询直到获取锁为止
release: () => void; // 释放锁
}
从而能够让第二个例子的输出始终为 1000
const mutex = new Mutex()
async function worker() {
const patch = await fetchFromServer()
await mutex.acquire()
await updateObj(patch)
mutex.release()
}
// the rest
Promise.all(Array.from({length: 100}, _ => worker()))
.then(() => console.log(globalObj)) // => always 1000
锁的实现
轮询
其他语言最简单的实现锁的方法就是自旋锁(spin lock
),通过不停的 while
循环并查看锁的状态,吃光CPU,这对于单线程的JS肯定是不能这么玩的。
在JS上,最简单的实现轮询方式那就是 setInterval
,把查询操作塞到 eventloop
里,我们来改造一下
class SpinMutex implements IMutex {
private _locked = false
acquire() {
return new Promise<void>(resolve => {
const timer = setInterval(() => {
if (!this._locked) {
this._locked = true
clearInterval(timer)
resolve()
}
}, 0)
})
}
release() {
this._locked = false
}
}
// print 1000
事件驱动
使用轮询会把大量重复的 callback
塞到 eventloop
里,显然这部分冗余是可以消除的,那就是自己实现一个任务队列,把 进入临界区
这一操作push到队列中,当目前的任务执行完毕,便再从队头的任务取出来执行。如此,便取代了轮询这一低效的操作。事实上,因为使用了队列,所以还可以做到控制并发量的效果(Mutex相当于最大并发量为1的特殊情况)
class EventMutex implements IMutex {
private _queue: Function[] = [];
private _maxConcurrency = 1; // 最大并发量为1
async acquire() {
if (this._maxConcurrency <= 0) {
await new Promise<void>(resolve => this._queue.push(resolve))
}
this._maxConcurrency--
}
release() {
this._maxConcurrency++;
this._queue.shift()?.();
}
}
// print 1000
参考
- TS playground
- https://blog.mayflower.de/6369-javascript-mutex-synchronizing-async-operations.html
- https://blog.jcoglan.com/2016/07/12/mutexes-and-javascript/
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!