Javascript异步编程2

Generators

Generator 介绍

我们可能都不自觉的有一个认识,那就是JavaScript的函数有一个特点,run to completion。意思是说在一个时间点,只会有一个函数在运行。当这个函数运行结束前,没有任何一个外部函数可以从外部抢占执行的权利。

Generator是在ES6引入的。Generator函数可以在执行的过程中让出执行的机会,自己暂停下来。在需要的时候,外部主动调用Generator的next方法继续执行剩下的部分。
可以参考阮一峰博客。这里先给一个简单的代码示例。

1
2
3
4
5
6
7
8
9
function* gen(){
console.log("hello");
yield;
console.log("world");
}

var it = gen();
it.next(); // hello
it.next(); // world

创建一个Generator需要用function* 的关键字,以区别于普通的function定义。还有一个新的关键字是yield。它的功能就像我们的播放器上的暂停按钮一样,暂停了Generator函数的执行。注意,Generator函数暂停的点是由函数定义的时候制定好的。外部无法主动地去设置Generator停在哪一行。我们把这个叫Coorperate concurrency,而不是Preemptive concurrency。Preemptive意思就是抢占式的。在调用gen()时,只是创建一个generator对象,函数并没有开始执行。调用Generator产生一个iterator,iterator就是用来遍历数据的。我们在第8行调用next函数,这个Generator由暂停状态变为执行状态,等到运行到yield的时候,这个Generator再次变为暂停状态。第9行再次调用next,函数继续执行。

Generator 消息

1
2
3
4
5
6
7
8
9
10
11
12
13
function *main() {
yield 1;
yield 2;
yield 3;
}

var it = main();

it.next(); // { value: 1, done: false}
it.next(); // { value: 2, done: false}
it.next(); // { value: 3, done: false}

it.next(); // { value: undifined, done: true}

我们之前的例子yield没有返回任何值,那么他返回的就是undifined。上边这个例子中我们yield反回了1,2,3,可以看出,
yield 1时,也就是我们的第9行执行的时候,返回了value为1,done为false的object。yield 2时,执行第10行,返回了value为2,done为false的object。
yield 3时,执行第11行,返回了value为2,看起来我们的Generator没有其他可以再执行的了,但是这个Generator还没有还行完毕,所以我们的done还是false。
当执行最后一次next时,我们才得到done为true,value是undefined。那么undefined是哪里来的呢?这是因为所以得JavaScript函数如果没有return语句的话,那么返回值就是undefined。如果我们的Generator最后return返回了一个值,例如42,那么这里的值就是value:42, done: true。
ES6引入了for of循环,来遍历任何一个iterator。所以我们可以用一个循环来执行Generator,直到到结束。

以下内容可能产生不适。。因为如果从来没有用过generator的话会很不习惯。我们先定义一个generator的执行器。

1
2
3
4
5
6
function coroutine(g){
var it = g();
return function(){
return it.next.apply(it, arguments);
}
}

coroutine函数其实就是一个generator的wrapper,第2行我们把传入的generator直接初始化,每当我们的调用由coroutine返回的函数时,其实就是调用generator的next函数。

下面看这个代码

1
2
3
4
5
6
7
8
9
var run =  coroutine(function*(){
var x = 1 + (yield);
var y = 1 + (yield);
yield (x + y);
});

run();
run(10);
console.log("Meaning of life: " + run(30).value);

我们调用coroutine来包装起来我们的generator函数,返回一个run方法,初始化这个generator。每一次调用run都是在调用next方法。

第7行调用run,开始执行generator,运行第2行,执行语句var x = 1 +, 然后我们遇到了yield。generator要暂停,把执行权返回到第8行。generator只有等待外部的输入值时才可以继续计算完这个表达式。第7行,我们继续调用run,也就是next,传入10,完成第一个表达式的计算的到x=11;generator继续执行var y = 1 +,我们遇到yield,让出执行权到第8行。run(30), 将30传入generator,完成y的计算y=31,generator继续执行,yield(x+y),generator暂停,把42作为value传出去。

从generator外部看,也就是第7行开始,我们每运行一次next,就会暂停下来,把值传入到generator,让generator继续运行到下一个yield的地方。
从generator内部看,每一次yield,其实是generator缺少一个依赖的值,不能继续计算,而暂停下来。只有等待外部的传入的值和继续执行的信号,才能继续。
每一个yield,可以理解为,我这个地方需要一个value,我会一直等待,直到有人在外边给我传入一个值。

异步Generator

我们改造一下上边的例子,做一个异步的Generator

1
2
3
4
5
6
7
8
9
10
11
12
function getData(d){
setTimeout(function(){ run(d); }, 1000);
}

var run = coroutine(function*(){
var x = 1 + (yield getData(10));
var y = 1 + (yield getData(30));
var answer = (yield getData("Meaning of life: " + (x + y)));
console.log(answer);
});

run();

先看getData方法,我们注册一个timeout,从而异步的在第2行调用run方法,让generator继续执行。
我们在generator里,yield了一个表达式,也就是getData。在执行var x = 1 + (yield getData(10))时,generator暂停,把执行权交给外部,同时getData开始运行。当getData中的回调函数运行,执行run,传入10,这样让generator继续完成表达式的计算,x=11。继续执行下一行var y = 1 + (yield getData(30)),遇到yield暂停,并且执行getData(30)。同样等timeout的回调执行时,把30传回generator,继续完成y的计算,y=41;同理下一行计算answer,yield暂停,等timeout回来再继续完成计算。最后console输出。

在上边这个例子的generator中我们的代码看上去是同步的,背后的getData是异步的。这样的代码结构跟我们的思维方式是一样的,代码可读性增强了很多,比之前的promise还要好。

Promise and Generator

我们之前的代码中还是有一些缺点。就是我们的run方法被放在了callback中执行,这就是inversion of control。程序继续的调用的控制不够清晰。
而且我们之前的代码中yield都是undifined。所以我们考虑能不能yield一个promise,也就是让getData返回一个promise,等这个promise完成时,也就是getData完成时,调用Generator的next方法继续执行。等到下一个语句再yield另一个promise,等promise完成继续调用next,如此下去,等到next返回done为true时,也就是我们的generator执行完毕的时候。
后边的流程很像递归。也是一个跟业务逻辑无关的流程控制器。外边的lib有很多开源实现,就是一个返回promise的generator的执行器。下边的代码是我自己随意实现了的一个执行器,里边没有包含出错处理这方面的代码,只是为了给出一个大概的样子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function getData(d){
return new Promise(function(resolve, reject){
setTimeout(function(){ resolve(d); }, 1000);
})
}

function* gen(){
var x = 1 + (yield getData(10));
var y = 1 + (yield getData(30));
var answer = (yield getData("Meaning of life: " + (x + y)));
console.log(answer);
}

var it = gen();
function handleNext(value){
var next = it.next(value);
if( next.done ){
return Promise.resolve(next.value);
} else {
return next.value.then(handleNext);
}
}

handleNext();

我们把这个通用的执行器runner方法提出来以后也可以用
1
2
3
4
5
6
7
8
9
10
11
12
13
function runner(gen){
var it = gen();
function handleNext(value){
var next = it.next(value);
if( next.done ){
return Promise.resolve(next.value);
} else {
return next.value.then(handleNext);
}
}

handleNext();
}

Generator练习

还是之前的情景,我们来思考一下用Generator是怎么解决的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************

function getFile(file) {
return new Promise(function(resolve){
fakeAjax(file,resolve);
});
}

// Request all files at once in
// "parallel" via `getFile(..)`.
//
// Render as each one finishes,
// but only once previous rendering
// is done.

// ???

function* gen(){
var p1 = getFile("file1");
var p2 = getFile("file2");
var p3 = getFile("file3");
output(yield p1);
output(yield p2);
output(yield p3);
output('completed');
}
runner(gen);

//use map
function* gen(){
var promises = ["file1", "file2", "file3"].map(function(fname){
return getFile(fname);
});
for(p of promises){
output((yield p));
}
output('completed');
}
runner(gen);

有了之前的例子,这里的的示例代码更容易写出来,其实就是利用Promise和Generator,我们的解也分为用普通的流程和用list的map和循环来写。
可以看出来这样的代码更为清晰易懂,比之前用promise解决可读性更胜一筹。

Observable

Events and promises

前端的代码中的并发,异步,我们之前也说过,核心问题是流程控制的管理。比如之前说过的promise,处理的是单个请求和单个回复的情况。
如果我们要处理一个消息流,也就是连续不断地同类event,该如何?Promise还能胜任吗?
而且我们在编程中大部分要出里的异步问题,是面向event stream的。在UI中的,用户的操作,比如点击;服务器发来的数据处理,都是面向event的。在这种情况下我们把promise套如,可能会有些问题。
举个例子,我的页面上可能有一些处理用户点击的代码。我想记录一些btn点击的log。我知道promise怎么创建,套用到这个情景中应该是这样的,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var p1 = new Promise(function(resolve, rejct){
$('#btn').click(function(evt){
var className = evt.target.className;
if(/foobar/.test(className)){
resolve(className);
}else{
reject();
}
});
});

p1.then(function(className){
console.log(className);
});

可是上面的代码有一个问题,就是promise只能resolve一次。我创建了一个promise,但是只能响应一次用户点击。我们之前介绍了那么多工具,promise,generator的高级工具,可是遇到这个最最常见的情景,好像不是很好使。我们该怎么办?

可能你会这么想,我反一下,把promise的创建搬到event的listener里边,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
$("#btn").click(function(evt){
var className = evt.target.className;
var p1 = new Promise(function(resolve,reject){
if(/foobar/.test(className)){
resolve(className);
}else{
reject();
}
});
p1.then(function(className){
console.log(className);
})
})

我们每次在click的callback中都创建一个新的promise,这样就可以每次都resolve了。这样看起来挺不错,可是,我们为什么要这样做?这与我们一开始的callback hell不是一样了吗?我们立即resolve了一个promise,并且调用了then。我们之前的代码还可以在一个地方设置好我的消息源,而在我程序的另外一个地方处理这个消息。现在全部都回到了一起,我们把两个不同的任务混在了一个地方。

其实问题在于我们的promise并不是特别适合于一个面向event的环境。我们需要更好地工具。我们需要把两个任务分开,也就是消息源的设置,以及消息的处理。

Observables

现在的Obserable还不是JavaScript原生支持的。也就是说ES6中没有这个东西,我们都需要用第三方的lib。但未来Observable很可能会成为JavaScript的一部分。现在外部有一个很好的Observable的lib,也就是Rxjs。

Concept

在Excel中,我们知道有计算单元格这个东西。就是说有些单元格的数据是通过计算其他源单元格的数据得到的。例如求和,求平均值,甚至更复杂的计算。当源数据发生改变的时候,这些计算单元数据也会相应的变化。假如我们的源数据被多个计算单元格依赖,那么就会触发所有这些计算单元格的更新。源数据单元的变化可能有好多次,变化的事件就像流一样。而依赖这个数据源的计算单元格都需要订阅这个变化,并且执行相应的步骤来更新自己的数据。

Observable或者Reactive Programming(响应式编程)就是类似的概念。Observable是一个事件的转换器,它连接在事件的源上边,每一次有一个新的事件发生,它就产生一个新的promise。重点在于,我可以在一个地方设置好这个转换器,而在程序的另一个地方甚至多个不同的地方,再定义如何处理这个事件流。我们不用和之前举得反例一样把事件的订阅初始化代码和处理代码放在同一个地方。

Rxjs例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var obsv = Rx.Observable.fromEvent(btn, "click");

obsv
.map(function mapper(evt){
return evt.target.className;
})
.filter(function filterer(className){
return /foobar/.test(className);
})
.distinctUntilChanged()
.subscribe(function(data){
var className = data[1];
console.log(className);
});

例子中的第一行我们创建了一个Observable,使用了Rx.Observable.fromEvent这个常用的工具。它可以把一个dom元素上的事件变成一个Observable。Rxjs还有很多强大的工具可以使用。
在程序的其他地方,我可以定义我想如何处理这个事件流。这里的map和filter等等,都是Observable的工具,用来对这个事件流进行一些转换和处理,也是十分易于理解使用。最后调用subscribe,来操作转换好的事件。我们就好像定义好了一系列的步骤来处理这个流过来的事件一样。我们把Observable甚至可以当做数组一样对待,数组其实也是一个数据流。

这里比较有意思的一个工具是distinctUntilChanged,这个是说加入现在连续来了5个event,都是hello,那么只有第一个可以进入下一步,其他4个都被过滤,因为重复了。然后来了连续的5个world,那么只有第一个world可以流入下一步。这是distinct的意思。那么接着,又来了5个hello,那么还是只有第一个hello可以通过。这是untilchanged的意思。

RxMarbles有一个很好的这些工具的图示化介绍。

Rxjs常用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// From an event
Rx.Observable.fromEvent(document.querySelector('button'), 'click');

// From array of values
Rx.Observable.from([1,2,3]);

// From one or multiple values
Rx.Observable.of('foo', 'bar')

// Externally produce new events.
var myObservable = new Rx.Subject();
myObservable.subscribe(value => console.log(value));
myObservable.next('foo');

// Merge Observables
// Creates an output Observable which concurrently emits all values from every given input Observable.
var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2)

// Zip
// Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.
let age$ = Observable.of(27, 25, 29);
let name$ = Observable.of('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of(true, true, false);

Observable
.zip(age$,
name$,
isDev$,
(age, name, isDev) => ({ age, name, isDev }))
.subscribe(x => console.log(x));

Observable练习

这次是一个新的的练习。需求是对页面上的btn点击行为进行采样。注意不是debounce,同一个btn在短时间内点击多次,例如每一秒内,我只响应一次点击。
我在下面的codepen中,已经引用了jQeury和Rxjs。有兴趣的可以打开试着写一写。

有人可能会考虑用zip来解决这个问题,但是zip其实不能完成采样的任务。那样的话,我们的clicks的队列会不断地增长,而不是真正的采样。我在下边给出两份参考代码
Solution1:

Solution2:

可以看出来Rxjs中的工具还是非常的多和实用的。我这里是抛砖引玉,更多的内容都在官网上有。掌握或者了解Rxjs的各种工具的适用场景,我认为是成为一个合格的Reactive Programmer的基本。

我在刚刚接触Observable的时候,也是理解了好几次,特别是event stream。看官方的文档,资料,也看了几次。看多了用多了也就有一些感觉,代码的质量也就能提升。

我们的工具箱中又多了一个工具。没有任何一个工具是万金油。不同的场景有不同的合适的方法,对每个的特点都理解,不要用的太死板是我们的该做到的。

CSP (Communicating Sequential Processes)/ Channels

CSP的目的是用Channel来设计并发程序。和我们之前所介绍的各种方法一样。

CSP的提出在这里,说实话我也没怎么看过这篇文章,作者Hoare现在仍在更新CSP的理论。CSP的理念在Go语言和Clojure Script中使用的很多

Channel

那么Channel是什么?Channel有些像Stream,也类似于管道Pipe。如果了解Actor模型的话(Scala中大量使用的并发模型),Channel确实和Actor很类似。但Channel有一个重点在于,默认情况下他没有缓冲区,因此Channel自然有了这样一个特点就是,反向压力(Back Pressure)。举个生活中的例子,我拿着水管接在水龙头上边给外边的花草浇水,我打开洒水口的开关就行了;当我浇完了我直接把洒水口的开关关掉就行了,我并不需要告诉源头我要开还是要关,我只需要操作我手边的开关就可以了。这是反向的从消费者到生产者的一个交流,告诉上边的生产者说,我不需要水了。Channel的Back Presse,也就是说Channnel的send动作是阻塞的。只有当Channel在另一方,调用了取消息take后,才会解除阻塞,反之亦然。而Actor的send是异步的,也就是非阻塞的。

在我们的程序中,我们有这样的代码,我们的生产者和消费者之间没有直接的联系,因为我们需要这么做,以达到职责清晰,代码的局部性更好。我们需要生产者和消费者之间有一个交流,但是如果我们不想引入一个类似于全局变量的东西,告诉生产者需不需要继续生产,别给我发了。(Rxjs Observable中有Hot Cold)。
另外一种设计方式就是用我们的Channel,利用Back Pressure,可以做到,你不能往Channel放更多地东西,直到我准备好了可以继续取;你不能继续取东西,直到我可以再往里边放东西。

我们提到了阻塞,你可能会联想到JavaScript唯一有阻塞功能的工具,就是我们刚才讲过的Generator函数。确实如此,CSP的JavaScript版本需要用到Generator。

看点代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var ch = chan();

function *process1(){
yield put(ch, "hello"); // 1. put hello into ch and pause proc1 until someone take it from the ch
var msg = yield take(ch); // 2. resume proc1 because someone take hello out, and wait for someone to put something into the ch
console.log(msg); // 3. someone put something into ch to let proc1 to resume.
}

function *process2(){
var greeting = yield take(ch); // 1. pause proc2 until someone put something into ch
yield put(ch, greeting + " world"); // 2. resume proc2 because someone has put data in ch, and put data into ch, then pause proc2 again until someone have take that from ch
console.log("done"); // 3. resume proc2 because someone take the data from ch
}

// hello world
// done

chan, put, take我们先当做现有的函数来理解这段代码。我们用到了yield,因为我们需要阻塞或者说暂停我们的函数。 假设proc1和proc2在两个不同的线程运行,他们通过ch就可以协调消费者生产者的运行时交流与相互控制。

Blocking Channel

再看一个例子,我们用了一个csp的lib,里边用到了go函数,因为是模仿了Go语言的方式调用类似线程运行的功能。csp.timeout返回一个channel,而我们并不需要关心这个channel到底在哪,我们只需要知道过500ms他会在里边放一个消息,让我们的yield的地方继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var ch = chan();

csp.go(function*(){
while(true) {
yield csp.put(ch, Math.random()); // repeately put random number in ch, pause when blocking
}
})

csp.go(function*(){
while(true){
yield csp.take( csp.timeout(500) ); // wait 500ms
var num = yield csp.take(ch); // take data from ch
console.log(num);
}
})

下面的例子alts,功能比较像promise的race。

1
2
3
4
5
6
csp.go(function*(){
while(true){
var msg = yield csp.alts(ch1, ch2, ch3);
console.log(msg);
}
})

再来一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
csp.go(function*(){
var table = csp.chan();

csp.go(player, ["ping", table]); // setup ping 'proc'
csp.go(player, ["pong", table]); // setup pong 'proc'

yield csp.put(table, {hits: 0}); // start the game by put msg into channel table
yield csp.timeout(1000); // wait for 1000ms to close the channel table
table.close();
})

function* player(name, table) {
while(true) {
var ball = yield csp.take(table); // try to get the ball from table
if( ball === csp.CLOSED) {
console.log(name + ": table's gone");
return;
}
ball.hits += 1;
console.log(name + " " + ball.hits);
yield csp.timeout(100);
yield csp.put(table, ball); // put updated ball back to ch
}
}

Event Channel

我们在Observable有一个工具是fromEvent,我们这里用csp的方法设计一个自己的fromEvent。这里用了putAsync,我想从字面也能理解这个是非阻塞的put。我为什么用putAsnc呢,因为这个函数不是一个Generator,而是一个普通的JavaScript函数。putAyns会返回一个promise,当这个put成功是resolve。但是我这里并不关心她什么时候put成功。这里是不是很像我们在Observable中不停地调用next函数?csp与Observable的核心区别在于有Back Pressure。如果Channel数据没有被取走,生产者无法继续往队列里放数据。连续不停地调用putAsync,并不会覆盖Channel的数据,每一个putAsync都会返回promise,他们排着队等待往Channel里放数据。直到有人从Channel中开始取数据时,这个队列才会向前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function fromEvent(el, eventType){
var ch = csp.chan();
$(el).bind(eventType, function(evt){
csp.putAsync(ch, evt);
});
return ch;
}

csp.go(function*(){
var ch = fromEvent(el, "mousemove");
while(true){
var evt = yield csp.take(ch);
console.log(evt.clientX + "," + evt.clientY);
}
})

CSP练习

我认为下面这个练习的情景有些生硬,不是特别适合Channel的场景,没能完全体现出Channel的优势。而且我认为原来培训中的参考答案并不完全正确的。但是还是可以参考一下这个题目吧。需求跟Observable的btn采样是一样的。我给直接给出了我的代码。我们最后还是用到了shouldWrite变量来控制写,而且好像没有更好地办法。

这里我引用了js-csp。对于看过Rethinking的,我一直没有使用Trainer的ASQ库是因为我觉得他的lib东西太多了,什么都可以做,而且使用什么都要在前面加ASQ,看上去有些奇怪。不过他确实很厉害,自己写了这么多工具的实现。

大练习 A tale of three lists

Trainer的这个练习还是很赞的,给出地址git,有兴趣的可以clone下来,用我们讲过的各种工具(Callback, thunk, promise, generator, obserable, csp)都实现以下这个练习。还是很有挑战的。

小结

两篇文章介绍了各种JavaScript异步编程的工具。我的理解是,没有一个万金油可以解决所有的问题。如果你听身边有人说Observable可以解决一切异步编程的问题,或者说CSP可以搞定所有的异步编程设计,那么他一定是过分沉浸在自己的世界了,所谓生搬硬套。我们需要做的就是掌握这个工具箱里的所有工具,在适合的时候拿出来,写出漂亮的代码,解决这个问题。

最后推荐一些我写第二篇时看到的一些有关的好文章的连接

https://juejin.im/entry/56f480ccc4c9710051bffd2b
https://www.zhihu.com/question/26192499
http://lucasmreis.github.io/blog/quick-introduction-to-csp-in-javascript/