源代码地址:https://github.com/GoogleChromeLabs/comlink ;惊艳!

Comlink 是用来解决 web worker通信大量模板代码的一种方案,通过RPC调用的思路来对上层使用者屏蔽通信逻辑

img

如果不使用 comlink ,我们可能需要这么写。。

// main.js
const worer = new Worker('worker.js');
worker.addEventListener('message', e => {
    // Do something...
});
worker.postMessage({
    // ...
});



// worker.js
onMessage = (e) => {
    if (matchCond1(e)) {
        congsole.log('.....');
        postMessage('...');
    }
    if (matchCond2()) {
        congsole.log('.....');
        postMessage('...');
    }
}

整体流程图

流程图一览

前置知识

MessageChannel [Class]

https://developer.mozilla.org/zh-CN/docs/Web/API/MessageChannel

可以创建一个消息传输的channel,channel两端是双向的,用来传数据

new MessageChannel()` 返回两个 `MessagePort

MessagePort [Interface]

https://developer.mozilla.org/zh-CN/docs/Web/API/MessagePort

EventTarget的超集,拥有 postMessage、start(开始发送端口中的消息)、close(断开端口)的能力

Transferable [Interface]

https://developer.mozilla.org/zh-CN/docs/Web/API/Transferable

用在 postMessage 里的transfers入参,转移所有权到接收方(move语义)

满足 Transferable的类型:

  1. MessagePort
  2. ImageBitMap
  3. ArrayBuffer

简易Demo:https://stackblitz.com/edit/web-platform-fceuqj?file=myWorker.js

RPC请求与响应

Message会有如下几个Type,使用id来确保请求与响应的一一对应

  1. GET:从Worker中获取值
  2. SET:修改Worker中的值
  3. APPLY:调用Worker中的函数
  4. CONSTRUCT:new一个 Worker中的Class
  5. ENDPOINT:创建一个新的ENDPOINT用于通信
  6. RELEASE:释放该proxy,再请求会throw error

EndPoint - 消息传递的媒介

就是具备发送消息,接收消息能力的端口,重新做了下抽象。

  • Worker线程的EndPoint是worker线程中的全局对象self即可

  • 主线程的EndPoint是主线程的worker对象

MessagePort 也是 EndPoint类型,所以可以通过 new MessageChannel() 来创建任意多个端口进行消息的传递,而不局限于主线程与Worker线程之间

export interface Endpoint  {
  addEventListener(
    type: string,
    listener: EventListenerOrEventListenerObject,
    options?: {}
  ): void;

  removeEventListener(
    type: string,
    listener: EventListenerOrEventListenerObject,
    options?: {}
  ): void;

  postMessage(message: any, transfer?: Transferable[]): void;

  start?: () => void;
}

Worker - 响应端

使用 expose 来包装,接收不同的 message 来执行对应的操作

export function expose(obj: any, ep: Endpoint = self as any) {
  ep.addEventListener( message , function callback(ev: MessageEvent) {
    if (!ev || !ev.data) {
      return;
    }
    const { id, type, path } = {
      path: [] as string[],
      ...(ev.data as Message),
    };
    const argumentList = (ev.data.argumentList || []).map(fromWireValue);
    let returnValue;
    try {
      // 把序列化的prop path重新组合成完成的object
      const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
      const rawValue = path.reduce((obj, prop) => obj[prop], obj);
      switch (type) {
        case MessageType.GET:
          {
            returnValue = rawValue;
          }
          break;
        case MessageType.SET:
          {
            parent[path.slice(-1)[0]] = fromWireValue(ev.data.value);
            returnValue = true;
          }
          break;
        case MessageType.APPLY:
          {
            returnValue = rawValue.apply(parent, argumentList);
          }
          break;
        // other cases
        default:
          return;
      }
    } catch (value) {
      // 处理错误,标上特殊的symbol prop,之后 throwhandler会对齐进行处理
      returnValue = { value, [throwMarker]: 0 };
    }
    Promise.resolve(returnValue)
      .catch((value) => {
        return { value, [throwMarker]: 0 };
      })
      .then((returnValue) => {
        const [wireValue, transferables] = toWireValue(returnValue);
        ep.postMessage({ ...wireValue, id }, transferables);
      });
  } as any);
  if (ep.start) {
    ep.start();
  }
}

Main - 请求端

使用 requestResponseMessage 来发送消息(类型就是上面那些)

function requestResponseMessage(
  ep: Endpoint,
  msg: Message,
  transfers?: Transferable[]
): Promise<WireValue> {
  return new Promise((resolve) => {
    const id = generateUUID();
    // 接收响应回来的消息
    ep.addEventListener( message , function l(ev: MessageEvent) {
      if (!ev.data || !ev.data.id || ev.data.id !== id) {
        return;
      }
      ep.removeEventListener( message , l as any);
      resolve(ev.data);
    } as any);
    if (ep.start) {
      ep.start();
    }
    // 发送消息
    ep.postMessage({ id, ...msg }, transfers);
  });
}

Proxy代理

使用 createProxy 来包装 new Worker() 的结果,处理 get, set, apply, construct等钩子(与Message Type对应),根据不同的操作类型,发送不同的message到Worker,并处理对应的返回。

get处理

对远程object进行操作,最后一定会调用await获取异步最终值,这里使用的await,其实可以被get钩子捕获到,其对应的props为 then,所以只要识别到 then 这个props,然后进行RPC请求操作即可

function createProxy<T>(
  ep: Endpoint,
  path: (string | number | symbol)[] = [],
  target: object = function () {}
) {
    const proxy = new Proxy(target, {
    get(_target, prop) {
      if (prop === 'then' ) {
        if (path.length === 0) {
          // 如果访问的是根对象,直接返回即可
          return { then: () => proxy };
        }
        const r = requestResponseMessage(ep, {
          type: MessageType.GET,
          path: path.map((p) => p.toString()),
        }).then(fromWireValue);
        return r.then.bind(r);
      }
      // proxy的钩子只针对第一层,对于嵌套的object(path数组的length > 0),
      // 也需要嵌套地调用createProxy
      return createProxy(ep, [...path, prop]);
    },
  });
  return proxy;
}

proxy的释放

proxy不用了要及时释放,不然会一直无法被gc,释放很简单,调用特殊prop即可

// 主线程
get(_target, prop) {
  if (prop === releaseProxy) {
      return () => {
        return requestResponseMessage(ep, {
          type: MessageType.RELEASE,
          path: path.map((p) => p.toString()),
        }).then(() => {
          closeEndPoint(ep);
          isProxyReleased = true;
        });
    };
  }    
},

// worker侧
Promise.resolve(returnValue)
  .catch((value) => {
    return { value, [throwMarker]: 0 };
  })
  .then((returnValue) => {
    const [wireValue, transferables] = toWireValue(returnValue);
    ep.postMessage({ ...wireValue, id }, transferables);
    if (type === MessageType.RELEASE) {
      // detach and deactive after sending release response above.
      ep.removeEventListener( message , callback as any);
      closeEndPoint(ep);
    }
});

数据传递

线程通信,其中的数据需要进行序列化传递

转换方式

  1. 传入channel:ToWireValue
  2. 从channel中取出:FromWireValue

WireValue分为两种

  1. RAW:无需转换,直接在channel里传入传出
  2. HANDLER:需要对应的transferhandler来进行 转换
export const enum WireValueType {
  RAW =  RAW ,
  PROXY =  PROXY ,
  THROW =  THROW ,
  HANDLER =  HANDLER ,
}

export interface RawWireValue {
  id?: string;
  type: WireValueType.RAW;
  value: {};
}

export interface HandlerWireValue {
  id?: string;
  type: WireValueType.HANDLER;
  name: string; // 记录应该用哪一种handler进行反序列化
  value: unknown;
}

export type WireValue = RawWireValue | HandlerWireValue;

内部实现

  1. TransferHandler,进行类型的转换(原始类型 <-> 序列化类型)
export interface TransferHandler<T, S> {
  // 判断是否可以处理该原始类型T
  canHandle(value: unknown): value is T;
  // 将原始类型T 序列化为 序列化类型S
  serialize(value: T): [S, Transferable[]];
  // 将序列化类型S 反解为 原始类型T
  deserialize(value: S): T;
}

读取方式

  1. Clone (Plain Object)[https://github.com/GoogleChromeLabs/comlink/blob/main/structured-clone-table.md]
  2. Proxy (Method, Function, Class)
  3. Transfer (ArrayBuffer, BitMap, MessagePort)

函数类型

函数类型无法被序列化,所以不能在线程中传递,需要再采用 proxy 的 形式来进行,(这里套娃了)

// 将对应属性标记为true,这样可以在proxyTransferHandler里对其进行proxy处理
export function proxy<T>(obj: T): T & ProxyMarked {
  return Object.assign(obj, { [proxyMarker]: true }) as any;
}

/**
 * Internal transfer handle to handle objects marked to proxy.
 */
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
  canHandle: (val): val is ProxyMarked =>
    isObject(val) && (val as ProxyMarked)[proxyMarker],
  serialize(obj) {
    // 因为这个proxy关系是新的,所以要新建一对MessagePort来进行消息通信
    const { port1, port2 } = new MessageChannel();
    expose(obj, port1); // port1留在线程1,并进行消息接收的封装
    return [port2, [port2]]; // 把port2送往线程2
  },
  deserialize(port) { // 拿到传过来的port2,创建proxy与port1开始通信
    port.start();
    return wrap(port);
  },
};

所有权转移

对于像图像,文件等二进制数据,因为底层是 ArrayBuffer ,所以使用 transfer 的方式去传递数据是更优的

const transferCache = new WeakMap<any, Transferable[]>();
// 标记为transfer,会把底层数据放到cache里
export function transfer<T>(obj: T, transfers: Transferable[]): T {
  transferCache.set(obj, transfers);
  return obj;
}


function toWireValue(value: any): [WireValue, Transferable[]] {
  for (const [name, handler] of transferHandlers) {
    if (handler.canHandle(value)) {
      const [serializedValue, transferables] = handler.serialize(value);
      return [
        {
          type: WireValueType.HANDLER,
          name,
          value: serializedValue,
        },
        transferables,
      ];
    }
  }
  // 进行 toWireValue 序列化时,从cache里拿出obj对应的底层数据,传递给其他线程
  return [
    {
      type: WireValueType.RAW,
      value,
    },
    transferCache.get(value) || [],
  ];
}

throw错误

throw出的错误为了能在主线程去throw,需要特殊处理

/**
 * Internal transfer handler to handle thrown exceptions.
 */
const throwTransferHandler: TransferHandler<
  ThrownValue,
  SerializedThrownValue
> = {
  canHandle: (value): value is ThrownValue =>
    isObject(value) && throwMarker in value,
  serialize({ value }) {
    let serialized: SerializedThrownValue;
    if (value instanceof Error) {
      serialized = {
        isError: true,
        value: {
          message: value.message,
          name: value.name,
          stack: value.stack,
        },
      };
    } else {
      serialized = { isError: false, value };
    }
    return [serialized, []];
  },
  deserialize(serialized) {
    if (serialized.isError) {
      throw Object.assign(
        new Error(serialized.value.message),
        serialized.value
      );
    }
    throw serialized.value;
  },
};


经验分享      JS

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!