X

曜彤.手记

随记,关于互联网技术、产品与创业

  1. 一个例子
  2. Shared Linear Memory
  3. Atomic Memory Accesses
  4. Wait and Notify operators
  5. Fence operator

WebAssembly - Threading

本篇来看的提案是 - “Threading”,GitHub 链接在这里。该提案为 Wasm 提供了用于支持多线程处理的相关概念和指令,包括可以被多个模块实例共享的 “shared memory”;用于操作内存的原子指令;用于线程等待和唤醒的 waitnotify 指令;以及与内存序相关的 fence 指令等。Wasm 的多线程模型实现依赖于其所处的具体执行环境,比如在 Web 环境中,线程即对应 worker。

一个例子

我们来看提案中给出的例子:

(module
  (import "env" "memory" (memory 1 1 shared)) 
  (func $tryLockMutex (export "tryLockMutex") (param $mutexAddr i32) (result i32)
    (i32.atomic.rmw.cmpxchg
      (local.get $mutexAddr)
      (i32.const 0) 
      (i32.const 1)) 
    (i32.eqz)
  )

  ;; Lock a mutex at the given address, retrying until successful.
  (func (export "lockMutex") (param $mutexAddr i32)
    (block $done
      (loop $retry
        (call $tryLockMutex (local.get $mutexAddr))
        (br_if $done)

        ;; Wait for the other agent to finish with mutex.
        (memory.atomic.wait32
          (local.get $mutexAddr) ;; mutex address.
          (i32.const 1)          ;; expected value (1 => locked).
          (i64.const -1))        ;; infinite timeout.
        (drop)

        ;; Try to acquire the lock again.
        (br $retry)
      )
    )
  )

  ;; Unlock a mutex at the given address.
  (func (export "unlockMutex") (param $mutexAddr i32)
    ;; Unlock the mutex.
    (i32.atomic.store
      (local.get $mutexAddr) 
      (i32.const 0)) 

    ;; Notify one agent that is waiting on this lock.
    (drop
      (memory.atomic.notify
        (local.get $mutexAddr) 
        (i32.const 1)))   ;; Count of waiter.
  )
)

这个例子在 Wasm 中模拟了 mutex 同步原语,并借由相关指令实现了「条件变量」的部分功能。先来看 tryLockMutex 函数,该函数用于尝试为一个互斥量上锁,互斥量本身的值被直接存放在线性内存中,偏移位置由参数 $mutexAddr 指定。内存段通过添加标记 “shared” 被设置为可在多个 Wasm 实例间共享,这使得对内存段数据的任何修改都可以同时被多个 agent 观测到。为互斥量上锁的过程是通过 RMW( Read-Modify-Write)指令 cmpxchg(Compare-and-Exchange)实现的,这是一种常见的 mutex lock 实现方式。cmpxchg 指令是原子性的,它接收一个内存偏移地址、期望值、以及替换值。这里,若内存中互斥量的值为 0(unlocked),则表示其没有上锁,该指令会将互斥量更改为 1(locked),并返回值更新前的原始值(0);否则,指令直接返回读到的值(1)。函数最后的指令 i32.eqz 用于对 cmpxchg 指令返回的结果取反,以正确表示函数的执行状态(即返回 1 表示上锁成功,否则失败)。

函数 lockMutex 实现了完整的互斥量上锁过程。其内部的 loop 循环会重复调用 tryLockMutex 函数尝试上锁,若上锁失败(返回 0),则使用 wait 指令将当前线程阻塞直到被其他线程通过对应的 notify 指令唤醒,这是常见的条件变量使用方式。若上锁成功,则通过 br 指令跳出当前 block。指令 wait32 也接收三个参数,一个内存偏移地址、期望值、以及阻塞超时时间。这里的内存地址便为互斥量的存放地址,值为 “-1” 的超时时间意味着永不超时,即线程会一直阻塞直到被再次唤醒。

函数 unlockMutex 的内容比较简单,它原子性地修改互斥量所在的内存值,并通过调用 notify 指令唤醒指定数量的,等待为同一个互斥量加锁的线程。

上述 Wasm 函数在 JavaScript 宿主上的具体使用方式可以参考如下 Node.js 代码:

import { readFileSync } from 'node:fs'
import { fileURLToPath } from 'url'
import { Worker, isMainThread, workerData } from 'node:worker_threads'

const wasmFilePath = '...'  // Path to the Wasm module file.
const wasmBinary = readFileSync(wasmFilePath)
const memory = new WebAssembly.Memory({ initial: 1, maximum: 1, shared: true, })
const mutexAddr = 0

const doSomeBusyStuff = (msg) => {
  return new Promise(resolve => {
    console.log(`${msg} is working ... (Got the lock)`)
    setTimeout(() => {
      console.log(`${msg} is done ...`)
      resolve()
    }, 2000)
  })
}

if (isMainThread) {  // Main thread.
  new Worker(fileURLToPath(import.meta.url), { workerData: memory, })
  const { instance } = await WebAssembly.instantiate(wasmBinary, { env: { memory, } })
  if (instance.exports.tryLockMutex(mutexAddr)) {  // Try to get the lock.
    await doSomeBusyStuff('Main thread')
    instance.exports.unlockMutex(mutexAddr)  // Release the lock, and wake other threads.
  }
} else {  // Worker thread.
  const { instance } = await WebAssembly.instantiate(wasmBinary, { env: { memory: workerData, } })
  instance.exports.lockMutex(mutexAddr)  // Blocking on the mutex, waiting to be woken.
  await doSomeBusyStuff('Worker thread')
  instance.exports.unlockMutex(mutexAddr)
}

Shared Linear Memory

  • 需要显式指定内存段的最大尺寸;
  • 可以通过指令 memory.grow 扩展内存段的大小;
  • 可以通过指令 memory.size 获得当前共享内存段的大小;
  • 指令是“顺序一致(sequentially consistent)”的,即对于每个线程来说,指令的实际运行顺序与代码保持一致。

Atomic Memory Accesses

  • 指令可以应用在共享以及非共享线性内存段上;
  • 非对齐(misaligned)地址上的原子内存访问会导致 trap,比如:
(module
  (memory 1 1)
  (func (export "foo")
    (i32.atomic.store (i32.const 1) (i32.const 1))
  )
)  
  • 指令是“顺序一致”的。

Wait and Notify operators

  • 非对齐(misaligned)地址上的操作会导致 trap;
  • 非共享内存段上的 wait 操作会导致 trap;
  • Web 平台上可通过 Atomics.waitAtomics.notify 阻塞或唤醒线程(worker)。

Fence operator

Fence 指令 atomic.fence 主要用于防止编译器对 Wasm 指令进行重排序,一般应用在对非原子指令执行顺序有要求的场景,目前仅支持“顺序一致”。




评论 | Comments


Loading ...