源代码地址:https://github.com/GoogleChromeLabs/comlink ;惊艳!
何为Comlink
Comlink
是用来解决 web worker通信大量模板代码的一种方案,通过RPC调用的思路来对上层使用者屏蔽通信逻辑
如果不使用 comlink
,我们可能需要这么写。。
// main.js
const worer = new Worker('worker.js');
worker.addEventListener('message', e => {
// Do something...
});
worker.postMessage({
// ...
});
// worker.js
onMessage = (e) => {
if (matchCond1(e)) {
congsole.log('.....');
postMessage('...');
}
if (matchCond2()) {
congsole.log('.....');
postMessage('...');
}
}
整体流程图
前置知识
MessageChannel [Class]
https://developer.mozilla.org/zh-CN/docs/Web/API/MessageChannel
可以创建一个消息传输的channel,channel两端是双向的,用来传数据
new MessageChannel()` 返回两个 `MessagePort
MessagePort [Interface]
https://developer.mozilla.org/zh-CN/docs/Web/API/MessagePort
EventTarget的超集,拥有 postMessage、start(开始发送端口中的消息)、close(断开端口)的能力
Transferable [Interface]
https://developer.mozilla.org/zh-CN/docs/Web/API/Transferable
用在 postMessage
里的transfers入参,转移所有权到接收方(move语义)
满足 Transferable的类型:
- MessagePort
- ImageBitMap
- ArrayBuffer
简易Demo:https://stackblitz.com/edit/web-platform-fceuqj?file=myWorker.js
RPC请求与响应
Message会有如下几个Type,使用id来确保请求与响应的一一对应
- GET:从Worker中获取值
- SET:修改Worker中的值
- APPLY:调用Worker中的函数
- CONSTRUCT:new一个 Worker中的Class
- ENDPOINT:创建一个新的ENDPOINT用于通信
- RELEASE:释放该proxy,再请求会throw error
EndPoint - 消息传递的媒介
就是具备发送消息,接收消息能力的端口,重新做了下抽象。
Worker线程的EndPoint是worker线程中的全局对象self即可
主线程的EndPoint是主线程的worker对象
MessagePort 也是 EndPoint类型,所以可以通过 new MessageChannel() 来创建任意多个端口进行消息的传递,而不局限于主线程与Worker线程之间
export interface Endpoint {
addEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: {}
): void;
removeEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: {}
): void;
postMessage(message: any, transfer?: Transferable[]): void;
start?: () => void;
}
Worker - 响应端
使用 expose
来包装,接收不同的 message 来执行对应的操作
export function expose(obj: any, ep: Endpoint = self as any) {
ep.addEventListener( message , function callback(ev: MessageEvent) {
if (!ev || !ev.data) {
return;
}
const { id, type, path } = {
path: [] as string[],
...(ev.data as Message),
};
const argumentList = (ev.data.argumentList || []).map(fromWireValue);
let returnValue;
try {
// 把序列化的prop path重新组合成完成的object
const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
const rawValue = path.reduce((obj, prop) => obj[prop], obj);
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parent[path.slice(-1)[0]] = fromWireValue(ev.data.value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parent, argumentList);
}
break;
// other cases
default:
return;
}
} catch (value) {
// 处理错误,标上特殊的symbol prop,之后 throwhandler会对齐进行处理
returnValue = { value, [throwMarker]: 0 };
}
Promise.resolve(returnValue)
.catch((value) => {
return { value, [throwMarker]: 0 };
})
.then((returnValue) => {
const [wireValue, transferables] = toWireValue(returnValue);
ep.postMessage({ ...wireValue, id }, transferables);
});
} as any);
if (ep.start) {
ep.start();
}
}
Main - 请求端
使用 requestResponseMessage
来发送消息(类型就是上面那些)
function requestResponseMessage(
ep: Endpoint,
msg: Message,
transfers?: Transferable[]
): Promise<WireValue> {
return new Promise((resolve) => {
const id = generateUUID();
// 接收响应回来的消息
ep.addEventListener( message , function l(ev: MessageEvent) {
if (!ev.data || !ev.data.id || ev.data.id !== id) {
return;
}
ep.removeEventListener( message , l as any);
resolve(ev.data);
} as any);
if (ep.start) {
ep.start();
}
// 发送消息
ep.postMessage({ id, ...msg }, transfers);
});
}
Proxy代理
使用 createProxy
来包装 new Worker()
的结果,处理 get, set, apply, construct等钩子(与Message Type对应),根据不同的操作类型,发送不同的message到Worker,并处理对应的返回。
get处理
对远程object进行操作,最后一定会调用await获取异步最终值,这里使用的await,其实可以被get钩子捕获到,其对应的props为 then
,所以只要识别到 then
这个props,然后进行RPC请求操作即可
function createProxy<T>(
ep: Endpoint,
path: (string | number | symbol)[] = [],
target: object = function () {}
) {
const proxy = new Proxy(target, {
get(_target, prop) {
if (prop === 'then' ) {
if (path.length === 0) {
// 如果访问的是根对象,直接返回即可
return { then: () => proxy };
}
const r = requestResponseMessage(ep, {
type: MessageType.GET,
path: path.map((p) => p.toString()),
}).then(fromWireValue);
return r.then.bind(r);
}
// proxy的钩子只针对第一层,对于嵌套的object(path数组的length > 0),
// 也需要嵌套地调用createProxy
return createProxy(ep, [...path, prop]);
},
});
return proxy;
}
proxy的释放
proxy不用了要及时释放,不然会一直无法被gc,释放很简单,调用特殊prop即可
// 主线程
get(_target, prop) {
if (prop === releaseProxy) {
return () => {
return requestResponseMessage(ep, {
type: MessageType.RELEASE,
path: path.map((p) => p.toString()),
}).then(() => {
closeEndPoint(ep);
isProxyReleased = true;
});
};
}
},
// worker侧
Promise.resolve(returnValue)
.catch((value) => {
return { value, [throwMarker]: 0 };
})
.then((returnValue) => {
const [wireValue, transferables] = toWireValue(returnValue);
ep.postMessage({ ...wireValue, id }, transferables);
if (type === MessageType.RELEASE) {
// detach and deactive after sending release response above.
ep.removeEventListener( message , callback as any);
closeEndPoint(ep);
}
});
数据传递
线程通信,其中的数据需要进行序列化传递
转换方式
- 传入channel:ToWireValue
- 从channel中取出:FromWireValue
WireValue分为两种
- RAW:无需转换,直接在channel里传入传出
- HANDLER:需要对应的transferhandler来进行 转换
export const enum WireValueType {
RAW = RAW ,
PROXY = PROXY ,
THROW = THROW ,
HANDLER = HANDLER ,
}
export interface RawWireValue {
id?: string;
type: WireValueType.RAW;
value: {};
}
export interface HandlerWireValue {
id?: string;
type: WireValueType.HANDLER;
name: string; // 记录应该用哪一种handler进行反序列化
value: unknown;
}
export type WireValue = RawWireValue | HandlerWireValue;
内部实现
- TransferHandler,进行类型的转换(原始类型 <-> 序列化类型)
export interface TransferHandler<T, S> {
// 判断是否可以处理该原始类型T
canHandle(value: unknown): value is T;
// 将原始类型T 序列化为 序列化类型S
serialize(value: T): [S, Transferable[]];
// 将序列化类型S 反解为 原始类型T
deserialize(value: S): T;
}
读取方式
- Clone (Plain Object)[https://github.com/GoogleChromeLabs/comlink/blob/main/structured-clone-table.md]
- Proxy (Method, Function, Class)
- Transfer (ArrayBuffer, BitMap, MessagePort)
函数类型
函数类型无法被序列化,所以不能在线程中传递,需要再采用 proxy 的 形式来进行,(这里套娃了)
// 将对应属性标记为true,这样可以在proxyTransferHandler里对其进行proxy处理
export function proxy<T>(obj: T): T & ProxyMarked {
return Object.assign(obj, { [proxyMarker]: true }) as any;
}
/**
* Internal transfer handle to handle objects marked to proxy.
*/
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
canHandle: (val): val is ProxyMarked =>
isObject(val) && (val as ProxyMarked)[proxyMarker],
serialize(obj) {
// 因为这个proxy关系是新的,所以要新建一对MessagePort来进行消息通信
const { port1, port2 } = new MessageChannel();
expose(obj, port1); // port1留在线程1,并进行消息接收的封装
return [port2, [port2]]; // 把port2送往线程2
},
deserialize(port) { // 拿到传过来的port2,创建proxy与port1开始通信
port.start();
return wrap(port);
},
};
所有权转移
对于像图像,文件等二进制数据,因为底层是 ArrayBuffer
,所以使用 transfer 的方式去传递数据是更优的
const transferCache = new WeakMap<any, Transferable[]>();
// 标记为transfer,会把底层数据放到cache里
export function transfer<T>(obj: T, transfers: Transferable[]): T {
transferCache.set(obj, transfers);
return obj;
}
function toWireValue(value: any): [WireValue, Transferable[]] {
for (const [name, handler] of transferHandlers) {
if (handler.canHandle(value)) {
const [serializedValue, transferables] = handler.serialize(value);
return [
{
type: WireValueType.HANDLER,
name,
value: serializedValue,
},
transferables,
];
}
}
// 进行 toWireValue 序列化时,从cache里拿出obj对应的底层数据,传递给其他线程
return [
{
type: WireValueType.RAW,
value,
},
transferCache.get(value) || [],
];
}
throw错误
throw出的错误为了能在主线程去throw,需要特殊处理
/**
* Internal transfer handler to handle thrown exceptions.
*/
const throwTransferHandler: TransferHandler<
ThrownValue,
SerializedThrownValue
> = {
canHandle: (value): value is ThrownValue =>
isObject(value) && throwMarker in value,
serialize({ value }) {
let serialized: SerializedThrownValue;
if (value instanceof Error) {
serialized = {
isError: true,
value: {
message: value.message,
name: value.name,
stack: value.stack,
},
};
} else {
serialized = { isError: false, value };
}
return [serialized, []];
},
deserialize(serialized) {
if (serialized.isError) {
throw Object.assign(
new Error(serialized.value.message),
serialized.value
);
}
throw serialized.value;
},
};
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!