Node异步流程控制实践与总结

从 “async” 到 async

Posted by Ray on 2018-03-06

Node的异步概念

理解异步非阻塞

提到Node,异步非阻塞会是第一个需要你理解的概念。很多人会把这实际上是两个概念的词混为一谈,认为异步就是非阻塞的,而同步就是阻塞的。从实际的效果出发,异步IO和非阻塞IO实际上都能达到我们对于IO繁重的网络应用并行IO的追求。但是实际上这是两个很不一样的概念。

从操作系统的内核角度出发,I/O调用只有两种方式,阻塞和非阻塞。二者的区别在于,对于使用阻塞IO调用,应用程序需要等待IO的整个过程都全部完成,即完成整个IO目的,此期间CPU进行等待,无法得到充分的利用。而对于使用非阻塞IO调用来说,应用程序发起IO请求之后不等待数据就立即返回,接下来的CPU时间片可用于其他任务,由于整个IO的过程并没有完成,所以还需要使用轮询技术去试探数据是否完整准备好。关于轮询技术细节和发展,此处不过多赘述,很推荐朴灵老师《深入浅出NodeJs》的第三章。

不难理解,从应用程序的角度出发,我不管你操作系统内核是阻塞的IO调用还是非阻塞的IO调用,只要是我要的数据并没有给我,那么这就是同步的,因为我依旧是在等数据。所以对于这种情况下,应用程序的那“一根筋”就可以选择用同步还是异步的方式去面对该情况。同步即等待操作系统给到数据再进行下面的代码(单线程),异步即发出请求之后也立即返回,用某一种方式注册未完成的任务(回调函数)然后继续往下执行代码。

理解进程,线程,协程

为了使多个程序能够并发(同一时刻只有一个在运行,时间维度稍微拉长,就会感觉起来像多个同时运行)便有了这个在操作系统中能够独立运行并作为资源分配的基本单位

进程是资源分配的基本单位,进程的调度涉及到的内容比较多(存储空间,CPU,I/O资源等,进程现场保护),调度开销较大,在并发的切换过程效率较低。为了更高效的进行调度,提出了比进程更轻量的独立运行和调度的基本单位线程。最主要的一点同一个进程的多个线程共享进程的资源,这就会暴露出一个多线程编程中需要加入多线程的锁机制来控制资源的互斥性(同时写变量冲突)。线程调度能大幅度减小调度的成本(相对于进程来说),线程的切换不会引起进程的切换,但是毕竟还是有成本。

面对着线程相关的问题,出现了协程。协程是用户模式下的轻量级线程操作系统内核对协程一无所知,协程的调度完全有应用程序来控制,操作系统不管这部分的调度。

协程的特点在于是一个线程执行,因此最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。第二大优势就是不需要多线程的锁机制,因为只有一个线程,就也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

依据上述概念本身我们可能可以得出一种暂时性的结论:考虑到利用多核CPU,并且充分发挥协程的高效率,又可获得极高的性能,面向开发人员最简单的方法是多进程+协程,既充分利用多核

在Node中利用多核CPU的子进程文档

回调函数问题

在Node中每一个异步的IO回调函数并不是由开发人员所控制主动执行的。

那么对于Node的异步IO,在我们最常使用的异步回调的形式下,我们发出调用到回调函数执行这中间发生了什么?

整个过程可简单的抽象成四个基本要素:IO线程池观察者请求对象,以及事件循环,盗用《深入浅出NodeJS》的Windows借用IOCP实现异步回调过程的一张图片:

其中所要执行的异步回调函数以及相关的所有状态参数会被封装成一个请求对象然后被推入到IO线程池中,当操作系统执行完IO得到结果之后会将数据放入请求对象中,并归还当前线程至线程池,通知IOCP完成了IO过程,然后事件循环IO观察者中得到已经可以执行的请求对象中的回调,灌注IO数据结果开始执行。

Node本身是多线程的,开发人员的JS代码单线程化身为一个老板,实现高效的异步逻辑依靠的是Node机制内部的各个线程池,模拟出了一个异步非阻塞的特点。呈现在开发人员面前的是表现形式为各种各样的callback组成的一个原生编程风格

异步编程与“回调地狱”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const fs = require('fs')

fs.readFile("./test1.txt", "utf-8", function(err,content1){
if (err) {
console.log(err)
} else {
fs.readFile(content1, "utf-8", function(err,content2){
if (err) {
console.log(err);
} else {
fs.readFile(content2, "utf-8", function(err,content3){
if (err) {
console.log(err);
} else {
console.log(content3)
}
});
}
});
}
});

console.log('主线程')


try {
console.log(content3)
} catch(e) {
console.log("还没有获取到content3!");
}

读取的每一个 .txt 文件中的内容是要读取的下一个文件的路径地址,最后一个txt文件(test3.txt)中的内容是“callback hell is not finished……”

打印结果:

1
2
3
主线程
还没有获取到content3!
callback hell is not finished......

可以理解为Node代码一根筋的往下想尽快结束所谓的主线程,所以遇到设计异步的就自动忽略并跳过为了往下执行,所以出现了第一句非异步的打印操作,打印“主线程”,再往下执行遇到需要打印 content3 这个变量的时候,主线程就“懵”了,因为命名空间内并没有获取到任何 content3 的数据,甚至在主线程命名空间内都没有定义这个变量,如果不用 try-catch 那么应该会报 “content3 is not defined”的错误。

此外,callback hell 一览无余,一味地因为依赖而采用嵌套回调函数的方式,哪怕是上述代码那么简单的一个原子性的操作都会被这种“横向发展”的代码和无休止的大括号嵌套让业务逻辑代码丧失掉可维护性和可读性。

为了避免这种回调地狱,解决问题的方案和第三方模块就开始层出不穷百花齐放了。

这个async不是ES2017的async

async是一个十分强大,功能十分全面提供异步编程解决法案的一个第三方npm模块。也是我所接触的公司中的项目中大范围使用的。下面是关于这个模块的常用函数使用介绍,先感受一下。

流程控制函数

  • async.parallel(tasks,callback)
    • tasks 可以是一个数组也可以是个对象,他的数组元素值或者对象的属性值就是一个一个异步的方法。

parallel方法用于并行执行多个方法,所有传入的方法都是立即执行,方法之间没有数据传递。传递给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//以数组形式传入需要执行的多个方法
async.parallel([
function(callback){//每个function均需要传入一个错误优先的callback
// 异步函数1,比如 fs.readFile(path,callback)
},
function(callback){
// 异步函数2
}
],
//最终回调
function(err, results){
// 当tasks中的任一方法发生错误,即回调形式为callback('错误信息')时,错误将被传递给err参数,未发生错误err参数为空
if(err){
console.log(err)
}else{
let one = results[0];
let two = results[1];
//你的各种操作
}
// results中为数组中,两个方法的结果数组:[异步1的结果, 异步2的结果] ,即使第二个方法先执行完成,其结果也是在第一个方法结果之后
});

//以object对象形式传入需要执行的多个方法
async.parallel({
one: function(callback){
// 异步函数1
},
two: function(callback){
// 异步函数2
}
},
function(err, results) {
// 当tasks中的任一方法发生错误,即回调形式为callback('错误信息')时,错误将被传递给err参数,未发生错误err参数为空
// // results 现在等于: {one: 异步1的结果, two: 异步2的结果}
});
  • 使用时所要注意的事项:
    • 当tasks中的任一方法发生错误时,错误将被传递给最终回调函数的err参数,未发生错误err参数为空。
    • tasks用数组的写法,即使第二个方法先执行完成,其结果也是在第一个方法结果之后,两个方法的结果数组:[异步1的结果, 异步2的结果]

个人感受:这个方法的大量使用让我觉得当一个要展示很多方面的信息的首页时,解耦成了代码可读性的最关键因素,亲身体会的是使用这个方法在企业业务逻辑中理想情况是在 tasks 中注册的并行任务得到的结果最好能够直接使用,而不是在第一个async.parallel的最终回调中依旧需要依赖得到的结果再进行下个系列的异步操作,因为这样导致的结果直接就变成了代码继续向着横向发展,比原生的 callback hell 并没有要好到哪里去。篇幅原因就不展示实际代码了,总之虽然结果流程得到了一个较为明确的控制,但是依旧没有良好的可读性

  • async.series(tasks,callback)

series方法用于依次执行多个方法,一个方法执行完毕后才会进入下一方法,方法之间没有数据传递!!

参数和形式与上面的 async.parallel(tasks,callback)一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//以数组形式传入需要执行的多个方法
async.series([
function(callback){
fs.readFile(path1,callback)
},
function(callback){
fs.readFile(path2,callback)
}
],
// 可选的最终回调
function(err, results){
// 当tasks中的任一方法发生错误,即回调形式为callback('错误信息')时,错误将被传递给err参数,未发生错误err参数为空
// results中为数组中两个方法的结果数组:['one', 'two']
});

这个方法在 tasks 中注册的异步函数之间虽然没有数据传递,但是这个方法控制了这些个异步方法的执行顺序,并且只要一个函数执行失败了接下来的函数就不会再执行了,并且把 err 传递到最终的回调函数中的 err 参数中。正如它的名字 “series”所说,这个方法有点数据库中的事务控制的意思,只不过原生不支持回滚罢了。

  • async.waterfall(tasks,callback)

waterfall方法与series方法类似用于依次执行多个方法,一个方法执行完毕后才会进入下一方法,不同与series方法的是,waterfall之间有数据传递,前一个函数的输出为后一个函数的输入。waterfall的多个方法只能以数组形式传入,不支持object对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async.waterfall([
function(callback) {
callback(null, 'one', 'two');
},
function(arg1, arg2, callback) {
// arg1 现在是 'one', arg2 现在是 'two'
callback(null, 'three');
},
function(arg1, callback) {
// arg1 现在是 'three'
callback(null, 'done');
}
], function (err, result) {
//执行的任务中方法回调err参数时,将被传递至本方法的err参数
// 参数result为最后一个方法的回调结果'done'
});

因为 tasks 中注册的异步函数数组中前一个函数的输出作为后一个输入,很自然的就可以想到可以通过前一个函数传递“处理成功信号”在第二个函数中进行判断来进行一系列完整的简单类似于事务控制的逻辑操作。

  • async.auto(tasks,callback)

auto方法根据传入的任务类型选择最佳的执行方式。不依赖于其它任务的方法将并发执行,依赖于其它任务的方法将在其执行完成后执行。类似于“依赖注入”概念。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async.auto({
getData: function(callback){
//一个取数据的方法
// 与makeFolder方法并行执行
callback(null, 'data', 'converted to array');
},
makeFolder: function(callback){
// 一个创建文件夹的方法
// 与make_folder方法并行执行
callback(null, 'folder');
},
writeFile: ['getData', 'makeFolder', function(callback, results){
// 此方法在等待getData方法和makeFolder执行完成后执行,并且在results中拿到依赖函数的数据
callback(null, 'filename');
}],
sendEmail: ['writeFile', function(callback, results){
// 等待writeFile执行完成后执行,results中拿到依赖项的数据
callback(null, {'file':results.writeFile, 'email':'[email protected]'});
}]
}, function(err, results) {
console.log('err = ', err);
console.log('results = ', results);
});

个人评价:喜欢这种方法,有清晰的可读性,依赖规则以及控制一目了然,很可惜的是在我们的代码里面并没有使用。缺点是相比较我们的最终解决方案的优雅,这个还是会有可能嵌套很多层的大括号的方式有它本身的劣势。

异步集合操作

  • async.each(arr,iterator(item, callback),callback)

对数组arr中的每一项执行iterator操作。iterator方法中会传一个当前执行的项及一个回调方法。each方法中所有对象是并行执行的。对数组中每一项进行 iterator 函数处理,如果有一项出错则最终的回调的 err 就回事该 err。但是,出错并不会影响到其他的数组元素执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const async = require('async')
const fs = require('fs')
let arr = ['./Test/file1.txt',"./Test/file2.txt","./Test/file3.txt"]
let iterator = (item,callback)=>{
fs.readFile(item,"utf-8",(err,results)=>{
if(item === "./Test/file2.txt"){
callback(new Error('wrong'))
}else{
console.log(results);
callback(null,results)
}
})
}
async.each(arr,iterator,function(err){
if(err){
console.log(err)
}
})

打印结果:

1
2
3
4
5
3
Error: wrong
at fs.readFile (/Users/liulei/Desktop/asyncEach/test.js:10:26)
at FSReqWrap.readFileAfterClose [as oncomplete] (fs.js:511:3)
1

可见,由于并发的原因,即是第二项出错,也不会影响其余的元素执行。如果想要让数组中的元素按照顺序执行,并且一旦一个出错,后面的数组元素都将不会执行的情况应该用另一个函数 async.eachSeeries(arr,iterator(item, callback),callback),用法什么的都一样,这里就不赘述了。

此外,each方法的最终回调函数可以看出来的是,并不会被传入任何结果,所以最终的回调函数就只有一个参数那就是 err,如果想要向最终回调函数中传入某些结果那么还要用到接下来介绍的 asycnc.map()

  • async.map(arr,iterator(item, callback),callback)

map方法使用方式和each完全一样,与each方法不同的是,map方法用于操作对象的转换,转换后的新的结果集会被传递至最终回调方法中(不出错的情况下)呈现一个新的数组的形似。

同样的是,map也是并行操作,如需按顺序并且出错就停止则需要使用 async.mapSeries

向Promise的过渡

Promise基础简要介绍

一个简单清晰的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const fs = require('fs')

fs.readFile("./Test/file1.txt", "utf-8", (err, content) => {
if (err) {
console.log(err);
} else {
console.log(content);
}
})

let readFile = () => {
return new Promise((resolve, reject) => {
fs.readFile("./Test/file2.txt", "utf-8", (err, content) => {
if (err) {
reject(err)
} else {
resolve(content);
}
})
})
}

readFile()
.then((res) => {
console.log(res);
})
.catch((err) => {
console.log(err);
})

只是比原生的callback形式的异步函数多了一步封装包裹的过程。Promise是一个对象,可以把它看做是一个包含着异步函数可能出现的结果(成功或者失败(err))的“异步状态小球”。得到了这个小球你就能用 then 去弄他,用 catch 去捕获它的失败。简单的概括,也仅此而已。基于这个小球,我们就能得到所谓的“现代异步处理方案”了,后话。

前端 Promisify Ajax请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
let btn = document.getElementById("btn")
let getData = (api) => {
return new Promise((resolve,reject)=>{
let req = new XMLHttpRequest();
req.open("GET",api,true)
req.onload = () => {
if (req.status === 200) {
resolve(req.responseText)
} else {
reject(new Error(req.statusText))
}
}

req.onerror = () => {
reject(new Error(req.statusText))
}
req.send()
})
}

btn.onclick = function(e) {
getData('/api')
.then((res) => {
let content=JSON.parse(res).msg
document.getElementById("content").innerText = content
})
.catch((err) => {
console.log(err);
})
}

Node提供的原生模块的API基本上都是基于一个 callback 形式的函数,我们想用 Promise ,难不成甚至原生的这些最原始的函数都要我们手动去进行 return 一个 Promise 对象的改造?其实不是这样的,Node 风格的 callback 都遵从着“错误优先”的回到函数方案,即形如(err,res)=>{},并且回调函数都是最后一个参数,他们的形式都是一致的。所以Node的原生util模块提供了一个方便我们将函数 Promisfy 的工具——util.promisfy(origin)

1
2
3
4
5
6
7
8
9
let readFileSeccond = util.promisify(fs.readFile)

readFileSeccond("./Test/file3.txt","utf-8")
.then((res) => {
console.log(res);
})
.catch((err) => {
console.log(err);
})

注意,这个原生工具会对原生回调的结果进行封装,如果在最后的回调函数中除了 err 参数之外,还有不止一个结果的情况,那么 util.promisify 会将结果都统一封装进一个对象之中。

用Promise提供方法应对不同的情况

实际代码逻辑中我们可能会面对各种异步流程控制的情况,像是之前介绍 async 模块一样,一种很常见的情况就是有很多的异步方法是可以同时并发发起请求的,即互相不依赖对方的结果,async.parallel的效果那样。Promise 除了封装异步之外还未我们提供了一些原生方法去面对类似这样的情况:

知识准备

  • Promise.resolve(value)

它是下面这段代码的语法糖:

1
2
3
new Promise((resolve)=>{
resolve(value)
})

注意点,在 then 调用的时候即便一个promise对象是立即进入完成状态的,那Promise的 then 调用也是异步的,这是为了避免同步和异步之间状态出现了模糊。所以你可以认为,Promise 只能是异步的,用接下的代码说明:

1
2
3
4
5
6
7
8
9
10
11
let promiseA = new Promise((resolve) => {
console.log("1.构造Promise函数");
resolve("ray is handsome")
})

promiseA.then((res) => {
console.log("2.成功态");
console.log(res);
})

console.log("3.最后书写");

上面的代码,打印的结果如下:

1
2
3
4
1.构造Promise函数
3.最后书写
2.成功态
ray is handsome

promise 可以链式 then ,每一个 then 之后都会产生一个新的 promise 对象,在 then 链中前一个 then 这种可以通过 return的方式想下一个 then 传递值,这个值会自动调用 promise.resolve()转化成一个promise对象,代码说明吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const fs = require('fs')
let promise = Promise.resolve(1)
promise
.then((value) => {
console.log(value)
return value+1
})
.then((value) => {
console.log(`first那里传下来的${value}`);
return value+1
})
.then((value) => {
console.log(`second那里传下来的${value}`);
console.log(value)
})
.catch((err) => {
console.log(err);
})

上面的代码答应的结果:

1
2
3
4
1
first那里传下来的2
second那里传下来的3
3

此外 then 链中应该添加 catch 捕获异常,某一个 then 中出现了错误则执行链会跳过后来的 then 直接进入 catch

得到 async.parallel同样的效果

Promise 提供了一个原生方法 Promise.all(arr),其中arr是一个由 promise 对象组成的一个数组。该方法可以实现让传入该方法的数组中的 promise 同时执行,并在所有的 promise 都有了最终的状态之后,才会调用接下来的 then 方法,并且得到的结果和在数组中注册的结果保持一致。看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const fs = require('fs')
const util = require('util')

let readFile = util.promisify(fs.readFile)

let files = [readFile("../../Test/file1.txt","utf-8"),
readFile("../../Test/file2.txt","utf-8"),
readFile("../../Test/file3.txt","utf-8"),]

Promise.all(files)
.then((res) => {
console.log(res)
})
.catch((err) => {
console.log(err);
})

上面的代码最终会打印,即是按顺序的三个txt文件里面的内容组成的数组:

1
[‘1’,‘2’,‘3’]

对比 async.parallel的用法,发现得到相同的结果。

此外,与 Promise.all方法相对应的还有一个Promise.race,该方法与all用法相同,同样是传入一个由 promise 对象组成的数组,你可以把上面的代码中的 all 直接换成 race 看看是什么效果。没错,对于指导 race 这个英文单词意思的可能已经猜出来了,race 竞争,赛跑,就是只要数组中有一个 promise 到达最终态,该方法的 then 就会执行。所以该代码有可能会出现’1’,’2’,’3’中的任何一个字符串。

至此,我们解决了要改造的代码的第一个问题,那就是多异步的同时执行,那么之前 async 模块介绍的其他的的功能在实际运用中也很常见的几个场景,类似顺序执行异步函数,异步集合操作要怎么使用新的方案模拟出来呢?真正的原生 async要登场了。

所谓的异步流程控制的“终极解决方案”————async

在开始介绍 async 之前,想先聊一种情况。

基于 Promise 的这一套看似可以让代码“竖着写”,可以很好的解决“callbackHell”回调地狱的窘境,但是上述所有的例子都是简单场景下。在基于 Promise 的 then 链中我们不难发现,虽然一层层往下的 then 链可以向下一层传递本层处理好的数据,但是这种链条并不能跨层使用数据,就是说如果第3层的 then 想直接使用第一层的结果必须有一个前提就是第二层不仅将自己处理好的数据 return 给第三层,同时还要把第一层传下来的再一次传给第三层使用。不然还有一种方式,那就是我们从回调地狱陷入另一种地狱 “Promise地狱”。

借用这篇博客 的一个操作 mongoDB 场景例子说明:

1
2
3
4
5
6
7
8
9
MongoClient.connect(url + db_name).then(db => {
return db.collection('blogs');
}).then(coll => {
return coll.find().toArray();
}).then(blogs => {
console.log(blogs.length);
}).catch(err => {
console.log(err);
})

如果我想要在最后一个 then 中得到 db 对象用来执行 db.close()关闭数据库操作,我只能选择让每一层都传递这个 db 对象直至我使用操作 then 的尽头,像下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MongoClient.connect(url + db_name).then(db => {
return {db:db,coll:db.collection('blogs')};
}).then(result => {
return {db:result.db,blogs:result.coll.find().toArray()};
}).then(result => {
return result.blogs.then(blogs => { //注意这里,result.coll.find().toArray()返回的是一个Promise,因此这里需要再解析一层
return {db:result.db,blogs:blogs}
})
}).then(result => {
console.log(result.blogs.length);
result.db.close();
}).catch(err => {
console.log(err);
});

下面陷入 “Promise地狱”:

1
2
3
4
5
6
7
8
9
10
11
MongoClient.connect(url + db_name).then(db => {
let coll = db.collection('blogs');
coll.find().toArray().then(blogs => {
console.log(blogs.length);
db.close();
}).catch(err => {
console.log(err);
});
}).catch(err => {
console.log(err);
})

看上去不是那么明显,但是已经出现了 then 里面嵌套 then 了,操作一多直接一夜回到解放前,再一次丧失了让人想看代码的欲望。OK,用传说中的 async 呢

1
2
3
4
5
6
7
8
9
(async function(){
let db = await MongoClient.connect(url + db_name);
let coll = db.collection('blogs');
let blogs = await coll.find().toArray();
console.log(blogs.length);
db.close();
})().catch(err => {
console.log(err);
});

各种异步写的像同步了,async(异步)关键字声明,告诉读代码的这是一个包含了各种异步操作的函数,await(得等它)关键字说明后面的是个异步操作,卡死了等他执行完再往下。这个语义以及视觉确实没法否认这可能是“最好的”异步解决方案了吧。

不得不提的 co 模块

众所周知的是 async 函数式 generator 的语法糖,generator 在异步流程控制中的执行依赖于执行器,co 模块就是一个 generator 的执行器,在真正介绍和使用 async 解决法案之前有必要简单了解一下大名鼎鼎的 co 模块。

什么是 generator,详细请参考Ecmascript6 入门

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var fs = require('fs');

var readFile = function (fileName){
return new Promise(function (resolve, reject){
fs.readFile(fileName, function(error, data){
if (error) reject(error);
resolve(data);
});
});
};

var gen = function* (){
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
// 执行生成器,返回一个生成器内部的指针
var g = gen();
//手动 generator 执行器
g.next().value.then(function(data){
g.next(data).value.then(function(data){
g.next(data);
});
})

上述代码采用 generator 的方式在 yeild 关键字后面封装了异步操作并通过 next()去手动执行它。调用 g.next() 是去执行 yield 后面的异步,这个方案就是经典的异步的“协程”(多个线程互相协作,完成异步任务)处理方案。

协程执行步骤:

  1. 协程A开始执行。
  2. 协程A执行到一半,进入暂停,执行权转移到协程B。
  3. (一段时间后)协程B交还执行权。
  4. 协程A恢复执行。

协程遇到 yield 命令就暂停 等到执行权返回,再从暂停的地方继续往后执行。

翻译上述代码:

  • gen()执行后返回一个生成器的内部执行指针,gen 生成器就是一个协程。
  • gen.next()让生成器内部开始执行代码到遇到 yield 执行 yield 后,就暂停该协程,并且交出执行权,此时执行权落到了JS主线程的手里,即开始执行 Promise 的 then 解析。
  • then 的回调里取得了该异步数据结果,调用g.next(data)通过网next()函数传参的形式,将结果返回给生成器的f1变量。
  • 依次回调类推。

说明:

  • g.next()返回一个对象,形如{ value: 一个Promise, done: false }到生成器内部代码执行完毕返回{ value: undefined, done: true }

引出一个问题: 我们不能每一次用 generator 处理异步都要手写 generator 的 then 回调执行器,该格式相同,每次都是调用.next(),所以可以用递归函数封装成一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function run(gen){
var g = gen();

function next(data){
var result = g.next(data);
if (result.done) return result.value;
result.value.then(function(data){
next(data);
});
}

next();
}

run(gen);

上述执行器的函数编写 co 模块考虑周全的写好了,co模块源码

你只需要:

1
2
3
4
5
6
7
8
const co = require('co')
co(function* () {
var res = yield [
Promise.resolve(1),
Promise.resolve(2)
];
console.log(res);
}).catch(onerror);

yield 后面的是并发。

此时我们来对比 async 写法:)

1
2
3
4
5
6
7
async function(){
var res = await [
Promise.resolve(1),
Promise.resolve(2)
]
console.log(res);
}().catch(onerror);

async 函数就是将 Generator 函数的星号()替换成 async,将 yield 替换成 await,仅此而已。并且它不需要额外的执行器,因为它*自带 Generator 执行器

本质上其实并没有脱离“协程”异步的处理方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const fs = require('fs')
const util = require('util')


let readFile = util.promisify(fs.readFile);

(async function fn() {
var a = await readFile('./test1.txt',"utf-8")
var b = await readFile('./test2.txt',"utf-8")
console.log(a)
console.log(b)
})()
.catch((e)=>{
console.log("出错了")
})



console.log('主线程')

打印结果会先输出“主线程”。

async 解决方案

前文我们通过 Promise.all()解决了 async.paralle()的功能,现在我们来看看用 Promise 配合原生 async 来达到“async”模块的其他功能。

  • 实现 async.series 顺序执行异步函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//源代码
async.series([
function(callback) {
if (version.other_parameters != otherParams) { // 更新其他参数
var newVersion = {
id: version.id,
other_parameters: otherParams,
};
CVersion.update(newVersion, callback);
} else {
callback(null, null);
}
},
function(callback) {
cVersionModel.removeParams(version.id, toBeRemovedParams, callback);
},
function(callback) {
cVersionModel.addParams(version.id, toBeAddedParams, callback);
},
function(callback) {
CVersion.get(version.id, callback);
},
], function(err, results) {
if (err) {
logger.error("更新电路图参数失败!");
logger.error(version);
logger.error(tagNames);
logger.error(err);
callback(err);
} else {
callback(null, results[3].parameters);
}
});


//新代码

(async function(){
if (version.other_parameters != otherParams) { // 更新其参数
var newVersion = {
id: version.id,
other_parameters: otherParams,
};
await CVersion.update(newVersion);
} else {
return null
}
await cVersionModel.removeParams(version.id, toBeRemovedParams)
await cVersionModel.addParams(version.id, toBeAddedParams)
let result = await CVersion.get(version.id)
return result
})()
..catch((err)=>{
logger.error("更新参数失败!");
logger.error(version);
logger.error(tagNames);
logger.error(err);
})
  • 实现 async.each 的遍历集合每一个元素实现异步操作功能:
1
2
3
4
5
6
7
8
9
10
11
12
13
//源代码
Notification.newNotifications= function(notifications, callback) {
function iterator(notification, callback) {
Notification.newNotification(notification, function(err, results) {
logger.error(err);
callback(err);
});
}

async.each(notifications, iterator, function(err) {
callback(err, null);
});
}

新代码:

1
2
3
4
5
6
7
8
9
10
11
//新代码
Notification.newNotifications= function(notifications){
notifications.forEach(async function(notification){
try{
await Notification.newNotification(notification)//异步操作
} catch (err) {
logger.error(err);
return err;
}
});
}

上述代码需要说明的情况是,在forEach 体内的每一个元素的 await 都是并发执行的,因为这正好满足了 async.each 的特点,如果你希望的是数组元素继发执行异步操作,也就是前文所提到的 async.eachSeries 的功能,你需要协程一个 for 循环而不是 forEach 的形式,类似如下代码:

1
2
3
4
5
6
7
async function dbFuc(db) {
let docs = [{}, {}, {}];

for (let doc of docs) {
await db.post(doc);//异步数据库操作
}
}

如果你觉得上述并发集合操作使用 forEach 的方式依旧不太直观,也可以改为配合Promise.all的形式:

1
2
3
4
5
6
7
async function dbFuc(db) {
let docs = [{}, {}, {}];
let promises = docs.map((doc) => db.post(doc));

let results = await Promise.all(promises);
console.log(results);
}

上述代码现先对数组元素进行遍历,将传入了数组元素参数的一步操作封装成为一个数组,通过await Promise.all(promises)的形式进行并发操作。Tips: Promise.all 有自动将数组的每个元素变成Promise对象的能力。

本文为原创文章作为学习交流笔记,如有错误请您评论指教
转载请注明来源:https://isliulei.com/article/Node异步流程控制实践与总结/