Vastiny

Nov 27, 2019

分析异步转同步库 deasync

最近看到一个实现异步方法转换为同步方法的库 deasync。正好看到有些场景可以用到,所以分析一下库的源代码。
这个库的源码在这里 deasync。最好是看了我上一篇的介绍 NodeJS debug C++ 的那篇文章,同样的内容我就不注释了。

源代码

这其实是一个 NodeJS 的 C++ 的插件。核心代码只有下面的一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <uv.h>
#include <v8.h>
#include <napi.h>
#include <uv.h>

using namespace Napi;

Napi::Value Run(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
uv_run(uv_default_loop(), UV_RUN_ONCE);
return env.Undefined();
}

static Napi::Object init(Napi::Env env, Napi::Object exports) {
exports.Set(Napi::String::New(env, "run"), Napi::Function::New(env, Run));
return exports;
}

NODE_API_MODULE(deasync, init)

这里面用的是 N-API 的接口,但这个库里面的代码不能正常的编译出 Debug 模块,所以把这核心代码拿出来又写了一个新的库,然后重新编译即可。

源码分析

直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <uv.h>
#include <v8.h>
#include <napi.h>
#include <node.h>

using namespace Napi;

/**
* CallbackInfo 是一个参数类,如果有调用方法有传参,可以通过 info[0]、info[1]、info.Length() 来获取
*/
Napi::Value Run(const Napi::CallbackInfo &info)
{
// 每个调用方法都能通过 CallbackInfo 类获取到 env,也可以获取 Context ,通过 env 获取到值和操作
Napi::Env env = info.Env();

// HandleScope 句柄作用域,控制对象的生命周期,是 napi_open_handle_scope 的封装
// 当一个句柄作用域对象被删除时,或者这个 Run 函数结束了,作用域内的对象就算被别的地方引用,也会被垃圾回收器随时释放
Napi::HandleScope scope(env);

// uv_runL 运行 Event Loop,这里加了 UV_RUN_ONCE,说明只轮询 I/O 一次
// 如果是 UV_RUN_NOWAIT 标识,轮询 I/O 一次但不会阻塞
// 如果 uv_run 里面没有待执行的任务回调,就会等待,算出 timeout 时间,这里就是异步转同步的关键
// 然后调用 epoll_pwait 去让整个程序等待
// 返回是否成功
// -----
// uv_default_loop: 返回初始化过的默认 event loop
// 返回一个 loop 结构体,记录了初始化过的 loop 信息
uv_run(uv_default_loop(), UV_RUN_ONCE);

// 返回 js value: undefined
return env.Undefined();
}

static Napi::Object init(Napi::Env env, Napi::Object exports)
{
// 给 Object 对象 exports 设置属性, exports 相当于 js 中的 object {},相当于
// const exports = {}
// exports.run = function Run() { ... }
exports.Set(Napi::String::New(env, "run"), Napi::Function::New(env, Run));
return exports;
}

NODE_API_MODULE(NODE_GYP_MODULE_NAME, init)

uv_run 显式的运行一次 EventLoop, 并且暴露 run 方法,然后用在下面的 js 代码中:

1
2
3
4
5
6
7
const deasyncAddon = require('./build/Debug/hello.node')
const loopWhile = function (pred) {
while (pred()) {
process._tickCallback()
if (pred()) deasyncAddon.run()
}
}

先不管 process._tickCallback(),看起来就像是写了一个阻塞的循环代码,只有 pred() 或者 isEnd() 结束后才会中断:

1
2
3
4
5
const wait = function (time) {
const start = Date.now()
const isEnd = () => (Date.now() - start) < time
while (isEnd()) {}
}

但几乎完全不一样。pred 这个回调方法完成后,就会终止,而不像 wait 方法让整个线程都阻塞,干不了其它事情。

在 loopWhite 方法中, process._tickCallback() 的使用也很巧妙,在 NodeJS 源码中有说:

1
2
3
4
5
process.nextTick = nextTick;
// Used to emulate a tick manually in the JS land.
// A better name for this function would be `runNextTicks` but
// it has been exposed to the process object so we keep this legacy name
process._tickCallback = runNextTicks;

在正常的代码中出现 _tickCallback(),则马上执行之前所有的 nextTicks 任务,接着执行同步任务。但我看 runNextTicks 源码的时候,发现其实是执行 runMicrotasks 😂。所以会把 Process.nextTick、Promise.then catch finally、MutationObserver 这些全部执行完毕后,才会进入下一步。

所以如果代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
setImmediate(() => { console.log('setImmediate 1')})
process.nextTick(() => { console.log('nextTick 1') })
Promise.resolve().then(() => { console.log('promise then 1')})
process._tickCallback()
console.log('sync 1')

// result:
// nextTick 1
// promise then 1
// sync 1
// setImmediate 1

为什么会这样呢?因为 nextTicks 的名字有误,nodejs 网站上面有说, setImmediate 意思应该互换 nextTick。nextTick 才是在同一个阶段立即执行, 而 setImmediate 是在 EventLoop 接下来执行,或者在 tick 上触发。(文末参考链接)

使用例子

首先 deasync 库把 loopWhile 封装成一个等待方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function deasync(fn) {
return function () {
let done = false
let args = Array.prototype.slice.apply(arguments).concat(cb)
let err
let res

fn.apply(this, args)
loopWhile(() => !done)
if (err) throw err

return res

function cb(e, r) {
err = e
res = r
done = true
}
}
}

然后可以写一个简单的阻塞 sleep 方法:

1
2
3
4
5
const sleep = deasync(function (timeout, done) {
setTimeout(done, timeout)
})
sleep(1000) // 阻塞 1s
console.log('run here!')

更多可以去看看 deasync 中的例子。

un promisify

deasync 默认只处理带 callback 的方法。如果是想要处理 promise 的方法,需要包裹一层转化一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const unPromisify = fn => {
return (...args) => {
if (args.length<1) throw new Error('unPromisify arguments length must at least 2.')
const cb = args.pop()
fn(...args).then(data => cb(null, data)).catch(err => cb(err, null))
}
}

async function say(e,e2) {
console.log('good', e, e2)
return 'r'
}

const saySync = deasync(unPromisify(say))
try {
const c = saySync('allen', 'alex')
console.log('return:', c)
} catch (e) {
console.error(e)
}
console.log('done')

同样类型的库

还有一个类似功能的库:node-fibers,不过它的实现原理不同,并且复杂得多。

参考

OLDER > < NEWER