Roger Guo's Blog


  • 首页

  • 关于

  • 归档

Java并发编程-性能与活跃性以及并发程序测试

发表于 2018-02-11

活跃性问题

我们通过加锁来确保线程安全,但是如果过度使用加锁,可能引起死锁。我们通过信号量和线程池,来限制对有限资源的使用,但是在有些情况下,这也可能造成死锁。
Java程序无法自动从死锁状态恢复,所以我们在设计程序时候,要找到可能导致死锁的条件。

死锁

经典的死锁问题就是哲学家进餐问题。
Dining philosophers problem
有些筷子管理算法能够让每个人都吃到东西,但有些算法可能导致一部分或者所有哲学家都饿死。

后一种情况就是死锁:每个人拥有其他人需要的资源,同时又等待其他人已经拥有的资源,并且每个人在获得所需要的所有资源前都不会放弃已经拥有的资源。

顺序死锁

Left Right Deadlock
如果所有线程都按照固定的顺序来获得锁,那么就不会出现顺序死锁问题。
如果想要验证锁的顺序的一致性,需要对程序中的加锁行为进行全局分析。

动态的顺序死锁问题

有时候并不是可以很清楚的知道是否会发生顺序死锁。

动态的顺序死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
// DeadLock!
public void transferMoney(Account fromAccount, Account toAccount, DollarAmout amount) throws InsufficientFundsException {
synchronized (fromAccount) {
synchronized (toAccount) {
if (fromAccount.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException();
} else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
}

transferMoney在什么时候会发生死锁?看起来线程是按照相同的顺序来加锁的,但事实上,加锁的顺序取决于传入参数的顺序。
A: transferMoney(myAccount, yourAccount, 10);
B: transferMoney(yourAccount, myAccount, 20);

要解决上面的问题,则一定要定义锁的顺序。比较直观的,就是在要加锁的所有参数上做一个排序。可以使用System.identityHashCode方法,该方法返回Object.hashCode的返回值。

下边是一个经典的解决方法

通过合理顺序来避免死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static final Object tieLock = new Object();

public void transferMoney(final Account fromAcct, final Account toAcct, DollarAmount amount) throws InsufficientFundsException {
class Helper {
public void transfer() throws InsufficientFundsException {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}
int fromHash = System.identityHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);

if(fromHash < toHash) {
synchronized (fromAcct) {
synchronized(toAcct) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAcct) {
synchronized(fromAcct) {
new Helper().transfer();
}
}
} else {
synchronized(tieLock) { // 极少数情况下,两个对象可能有相同的HashCode,通过一个‘加时赛’锁,保证每次只有一个线程会处理这种情况,避免死锁
synchronized (fromAcct) {
synchronized(toAcct) {
new Helper().transfer();
}
}
}
}
}

协作对象之间发生死锁

有些获取多个锁的操作并不像之前的例子那么明显,两个锁可能不是在同一个方法中被获得的。在持有所锁的情况下调用外部的方法,需要警惕死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// DeadLock! Don't do this
class Taxi {
@GuardedBy("this") private Point location, destination;

public Taxi(Dispatcher dispather) {
this.dispather = dispather;
}

public synchronized Point getLocation(){
return this.location;
}

public synchronized void setLocation(Point location) {
this.location = location;
if(location.equals(destination))
dispathcher.notifyAvaiable(this);
}
}

class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;

public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}

public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}

public synchronized Image getImage() {
Image image = new Image();
for(Taxi t: taxis)
image.drawMarker(t.getLocation());
return image;
}
}

尽管没有任何方法会显示的获取两个锁,但是setLocation和getImage等方法的调用者都会获得两个锁。但是他们的顺序是相反的。所有可能产生死锁。

如果在持有锁的情况下调用外部方法,那么有可能会出现活跃性问题。如果这个外部方法去获得其他的锁,或者阻塞时间过长,那么会导致其他的线程无法获得当前被持有的锁。

解决方法。调用这种依赖外部方法的函数时,使用开放调用。也就是不对整个方法使用synchronized,而是对需要涉及共享状态的操作加锁,来保证加锁的顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// DeadLock! Don't do this
class Taxi {
...
public synchronized Point getLocation(){
return this.location;
}

public void setLocation(Point location) {
boolean reachedLocation;
synchronized(this) {
this.location = location;
reachedLocation = location.equals(desitination);
}
if(reachedLocation)
dispathcher.notifyAvaiable(this);
}
}

class Dispatcher {
...

public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}

public synchronized Image getImage() {
Set<Taxi> copy;
sychronized(this) {
copy = new HashSet<Taxi>(taxis);
}
Image image = new Image();
for(Taxi t: taxis)
image.drawMarker(t.getLocation());
return image;
}
}

使用开放调用,更容易分析加锁顺序,找出死锁的可能性。

有时候,把原来的同步代码块用开放调用的形式重写后,会发生一些意想不到的结果。因为可能会失去一些原子性。多数情况可以接受失去原子性。而某些情况下,失去原子性会引发错误。这需要通过一些调用的协议,而非加锁来解决。

资源死锁

例如几个服务需要两个数据库连接来完成任务,而数据库连接池很小,会发生资源死锁。

避免死锁

如果一个程序在执行每个任务时只能获得一个锁,那么永远都不会产生因为锁顺序引起的死锁。当然这样并不现实。
如果必须获得多个锁,那么在设计时一定要考虑锁的顺序问题:减少潜在的加锁交互的数量,将获取锁时需要遵循的协议写入文档。
在使用细粒度的加锁时,首先找出什么地方会获得多个锁,然后进行全局分析,确保顺序一致。
尽可能的使用开放调用,这可以简化分析过程。

可以使用指定超时时限的锁,tryLock,超时后会返回一个失败信息。可以记录这个失败信息,然后进行分析,或者可以retry。
通过JVM的Thread Dump来帮助识别死锁的发生。Dump的信息包括运行中的各个线程的栈信息,还包括加锁信息,例如每个线程持有哪些锁,在那些被阻塞的线程正在等待哪个锁。

在Unix平台上出发Thread Dump,是向JVM进行发送SIGQUIT信号(kill-3),或者Ctrl-\按键。Windows下Ctrl-Break。

其他的活跃性问题

饥饿

最多的例子是因为优先级问题,有些任务永远得不到执行。
尽量不去使用线程的优先级。在大多数并发应用中,都可以使用默认的优先级。

响应性

CPU密集型的后台任务可能会对性能造成影响,因为它们会与事件线程/主线程共同竞争CPU时钟周期。

活锁

线程中出现了谦让,到这线程之前都无法完成任务,时间都消耗在不断的拿锁,放弃,拿锁,放弃。
类似于过独木桥,两个人在桥上遇到后堵住了对方,然后都退后,过相同的时间后双方有重试,周而复始,永远没有人能前进。

性能与可伸缩性

性能

想要通过并发来获得更好的性能,需要做好两件事情:

  1. 更有效的利用现有的计算与处理资源。
  2. 在添加了新的计算和处理资源后,尽可能的利用这些新的资源。

从性能监测的角度来看,CPU需要一直处于忙碌状态。如果程序无法使现有的CPU保持忙碌,那么再添加新的CPU也没用。

可伸缩性是指当计算资源增加时(CPU,内存,I/O带宽等),程序的吞吐量或者处理能力能够相应的增加。

对于服务器应用来说,“多少”这个方面————可伸缩性,吞吐量,生产量,往往要比“多快”这个方面更受重视。

性能权衡

在几乎所有的工程决策中都会涉及某些形式的权衡。在建设桥梁时,使用更粗的钢筋可以提高桥的负载能力和安全性,但是也会提高建造成本。在软件工程中谁绕路会涉及资金和人身安全,但是做出正确的权衡时也需要考虑很多方面的信息。
不要乱优化。首先要使程序正确,如果运行的还不够快,再去设法提高他的运行速度。
在任何与性能相关的决策时,都应该考虑以下问题:

  1. 怎么理解更快,可以有数字比较吗?
  2. 在什么环境和情况下这个方法可以运行的更快?能否有实验数据支持?
  3. 这些情况发生的频率有多高?
  4. 其他情况下不同环境中能否重用这些代码?
  5. 有没有其他隐形的代价?例如开发风险和维护风险?

要以测试为基准,不可以猜测。利用现有的成熟的工具,perbar等,可以监测CPU的忙碌程度。

Amdahl定律

有些任务并行可以提高效率,有些并不能。

Amdahl定律描述的是,再增加计算资源的情况下,程序在理论上能够实现的最高加速比。F是必须串行与所有任务的比值,N是处理器数量
Amdahl
当N区域无穷大时,最高的加速比时1/F,因此加入有50%的计算需要串行执行,那么最高的加速比是2。

下图可以看出,处理器的利用率在不同串行比例(0.25%,0.5%……)以及处理器数量下的变化曲线。可以明显的看到串行比例对CPU利用率的影响是很大的。
Amdahl-curv

Amdahl定律的作用,主要是量化当有更多资源时,可以加速的比率。更重要的时对出现的可伸缩性局限有清醒的认识。

线程引入的开销

上下文切换

在大多数通用的处理器中,上下文切换的开销相当于5000~10000个时钟周期,也就是几微妙。
UNIX系统的vmstat命令和Windows的perfmon工具都能够查看上下文切换次数以及在内核中执行时间所占比例等信息。如果内核占用率较高(超过10%),那么表示调度活动发生的很频繁,可能是由I/O或竞争锁导致的阻塞引起的。

内存同步

在synchronized和volatile提供的可见性保证中,会使用一些特殊指令,也就是内存栅栏Memory Barrier,来刷新缓存。内存栅栏时缓存无效刷新硬件的写缓冲。
所以内存栅栏可能会对性能带来间接的影响,因为它在抑制一些优化,包括编译器的优化,重排序。
现代的JVM能够通过优化来去掉一些不会发生竞争的锁,从而减少不必要的同步开销。

阻塞

竞争的同步可能需要操作系统接入,从而增加开销。在锁上发生竞争后,竞争失败的线程肯定会阻塞。JVM在实现阻塞行为时,可以采用自旋等待,或者通过操作系统挂起被阻塞的线程。大多数JVM在等待锁时都是将线程挂起。

减少锁的竞争

  1. 缩小锁的范围,快进快出,在访问完共享资源后尽快释放

  2. 减小锁的粒度,只锁自己需要的资源

  3. 锁分段,Lock Striping, 例如ConcurrentHashMap

  4. 避免热点域,例如每个操作都会修改一个变量,HashMap的Size方法

  5. 使用ReadWriteLock,原子变量来代替独占锁

  6. 避免使用对象池

并发程序测试

由于并发程序有一些不确定性,这和串行程序比,要增加

Java并发编程-结构化并发应用程序

发表于 2018-02-07

任务执行

大多数并发应用程序都是围绕任务执行来构造的。任务,在理想状态下,是一些离散的工作单元。通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的并行结构来提升工作的并发性。

在线程中执行任务

当围绕任务执行来设计应用程序的结构时,第一步就是要找到清晰的任务边界。大多数服务器应用程序都提供一种自然的任务边界选择方式:以独立的客户请求为边界。Web服务器,邮件服务器,文件服务器,数据库服务器等。将独立的请求作为任务表边界,既可以实现任务的独立性,又可以实现合理的任务规模。

串行执行

Serialized Web Server
1
2
3
4
5
6
7
8
9
class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}
}

串行的处理请求,一般来说无法提供高吞吐率或快速的响应。服务器的资源利用率会非常低,多数时间都耗费在IO操作上。

显示的创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Don't do this way
class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
whilte(true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
}
new Thread(task).start();
}
}
}

不停创建线程的缺点

  1. 线程生命周期的开销非常高
  2. 资源消耗,大量的空闲线程会占用很多内存.
  3. 稳定性,OutOfMemoryError

    Executor框架

    1
    2
    3
    public interface Executor {
    void execute(Runnable command);
    }
    Executor提供了一种标准的方法把任务提交与任务执行解耦开来。Executor基于生产者消费者模式,提交任务的操作是生产者,执行人物的消费者。

    例,基于Executor的Web服务器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
    ServerSocket socket = new ServerSocket(80);
    while(true) {
    final Socket connection = socket.accept();
    Runnable task = new Runnable() {
    public void run() {
    handleRequest(connection);
    }
    };
    exec.execute(task);
    }
    }
    }
    通过使用Executor,将请求处理任务的提交与任务的实际执行解耦。只要改变Executor的实现,便可改变任务的执行方式,策略和服务器的行为。

    执行策略

    不同的执行策略,是不同的资源管理方式,最好的策略要根据当时可用的计算资源以及对服务质量的需求而定。
    通过限制并发线程数量,可以避免程序因为过多的请求而耗尽服务器资源,最终导致失败。或者由于资源短缺,发生竞争,而导致性能问题。
    把任务提交,与执行策略分离开来,有助于在部署时选择与可用资源最匹配的策略,发挥最大效能。

    线程池

    线程池,管理一组同类型工作线程的资源池。线程池与工作队列密切相关。工作队列保存了所有等待执行的任务。

java.util.concurrent.Executors中提供了常用的静态工厂方法,来创建线程池。

1
2
3
4
5
6
7
8
9
10
11
newFixedThreadPool(int nThreads)
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.

newCachedThreadPool()
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.

newSingleThreadExecutor()
Creates an Executor that uses a single worker thread operating off an unbounded queue.

newScheduledThreadPool(int corePoolSize)
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.

其中前三个方法都是对java.util.concurrent.ThreadPoolExecutor这个类的包装使用,而ThreadPoolExecutor实现了ExecutorService接口。

Executor生命周期

Executor的生命周期有三种状态:运行,关闭,已终止。

Life cycle management API of ExecutorService
1
2
3
4
5
6
7
8
public interface ExecutorService entends Executor {
void shutdown(); // Stop accept new request
List<Runnable> shutdownNow(); // Stop accept, and immediately stop all the executing threads
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // wait until it is terminated.
// ...
}

延迟任务与周期任务

通过使用ScheduledThreadPoolExecutor来执行延迟任务或者周期任务。

可利用并行性

大多数服务器应用程序,明显的任务边界是每个客户的请求。在单个客户请求中仍可以发掘并发的可能性。

例,串行页面渲染器

1
2
3
4
5
6
7
8
9
10
public class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo: scanForImageInfo(source))
imageData.add(imageInfo.downloadImage()); // During this loop, the CPU is almost in idle state due to I/O, not utilize the resources fully.
for (ImageData data: imageData)
renderImage(data);
}
}

有结果的任务Callable与Future

Runnable的最大局限在于,他不能返回一个值,或者抛出受检查的异常。
Callable是一种更好的抽象。
Future表示一个任务的生命周期,可以判断任务是否完成或者已经取消。
ExecutorService中的submit方法会返回一个Future。或者可以显示的为某个指定的Runnable或者Callable实例化一个FutureTask,用newTaskFor方法或者其构造函数。

1
2
3
4
5
6
7
8
9
10
11
public interface Callable<V> {
V call() throws Exception;
}

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCanceleed();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}

提交Runnable或者Callable时,就是把Runnable从任务提交的线程,安全的发布到最终执行任务的线程,等待执行。
同样的在给Future的结果赋值时,也是把结果从计算线程,安全的发布到调用get方法的线程中。

例,使用Future实现页面渲染器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FutureRenderer {
private final ExecutorService executor = ...;

void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result = new ArrayList<ImageData>();
for(ImageInfo imageInfo: imageinfos)
result.add(imageInfo.downloadImage());
return result;
}
}

Future<List<ImageData>> future = executor.submit(task);
renderText(source);

try {
List<ImageData> imageData = future.get();
for(ImageData data: imageData)
renderImage(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}

上面的代码把渲染文字与下载图像分开,稍微提升了一点性能。但是问题在于,必须要等所有图像都下载完毕,Future才能完成,才可以渲染图像。
我们可以使用CompletionService来更好地利用并行资源。CompletionService把Executor和BlockingQueue的功能融合,可以通过submit来提交Callable,也可以使用take和poll方法,来获得已经完成的任务结果。
ExecutorCompletionService的实现也很简单。他需要一个BlockingQueue来保存结果。当提交任务时,我们把任务包装成一个叫QueueingFuture的类,这个类继承FutureTask,并且改写protected的方法done,在这方法中加入把结果放入BlockingQueue的逻辑。当一个计算任务结束时,就调用FutureTask的done方法。

1
2
3
4
5
6
7
8
private class QueueingFuture<V> extends FutureTask<V> {
QueueingFuture(Callable<V> c) { super(c); }
QueueingFuture(Runnable t, V r) { super(t, r); }

protected void done() {
completionQueue.add(this);
}
}

使用CompletionService的页面渲染器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Renderer {
private final ExecutorService executor;

Renderer(ExecutorService executor) { this.executor = executor; }

void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageImage(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for(final ImageInfo imageInfo: info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);

try{
for(int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData data = f.get();
renderImage(data);
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
} catch(ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}

取消与关闭

把任务和线程安全、快速、可靠的停止下来,不是一件容易的事情。
中断是一种协作机制,可以使一个线程种植另一个线程的当前工作。这种协作模式的好处在于,当需要停止时,它们会首先清除当前正在执行的工作,然后在结束。

任务取消

取消一个任务的原因有,
用户请求取消,例如用户点击了取消按钮;
有时间限制的操作,例如超时;
应用程序事件,例如算法中可能并行的去计算多个空间,当其中一个得出结果时,可以取消其他的分支;
错误,例如写入文件是空间不足会报错;
关闭,当程序要关闭时,需要处理正在运行和在等待的任务。

Java中没有一种安全的抢占式的停止方式来终止一个任务线程,而协作模式是更好的一种方式。

一个可取消的任务必须有它的取消策略。这个策略中详细的定义了其他代码如何取消该任务,任务在何时检查是否已经请求了取消,以及响应取消时会执行哪些操作。

一个原始的办法时任务在执行过程中不断的检查一个自定义变量,在被设置后退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@ThreadSafe
public class PrimeGeneraor implements Runnable {
@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;

public void run() {
BigInteger p = BigIntger.ONE;
while(!cancelled) {
p = p.nextProbalePrime();
synchronized(this) {
primes.add(p);
}
}
}

public void cancel() { cancelled = true; }

public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}

中断

使用检查自定义变量的方法的问题在于,当执行一个阻塞方法时,例如阻塞队列put方法调用时队列满,就不会再有机会检查到这个标志位。

每个线程都有一个boolean类型的中断状态。当线程中断时,这个线程的中断状态将被设置为true。

1
2
3
4
5
public class Thread {
public void interrupt() { ... } // 中断目标线程
public boolean isInterrupted() { ... } // 返回目标线程是否中断
public static boolean interrupted() { ... } // 清除当前线程的中断状态
}

对中断操作的正确理解是,它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由下一个适合的时刻中断自己。
使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态。如果调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则一定要处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while(!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch(InterrunptedException consumed) {
// exit...
}
}

public void cancel() { interrupt(); }
}

中断策略

中断策略规定线程如何解释某个中断请求————当发现中断请求时,应该做哪些工作,那些工作单元对于终端来说是原子操作,以及以多块的速度来响应中断。
最合理的中断策略是某种形式的线程级取消操作或者服务级取消操作:尽快推出,在必要时清理,通知某个所有者该线程已经退出。
由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

响应中断

当调用可中断的阻塞函数时,有两种使用的策略处理中断异常:

  1. 传递异常
  2. 恢复中断状态

如果不想传递或无法传递InterruptedException,那么需要寻找另一种方式来保存中断请求。一个方法就是通过再次调用interrupt来恢复中断状态。
只有实现了线程中断策略的代码才可以屏蔽中断请求。常规的任务和库函数代码中都不应屏蔽中断请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while(true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// 如果在这里恢复中断状态,会引起无限循环
}
}
} finally {
if(interrupted)
Thread.currentThread().interrupt(); // 在返回前恢复中断状态
}
}

例,计时运行

下面的代码,是一个任意的Runnable在指定时间内运行完成的例子。在调用这个timedRun方法的线程中运行这个任务,并安排了一个取消操作。

scheduling an Interrupt on a Borrowed Thread
1
2
3
4
5
6
7
8
9
10
11
private static final ScheduledExecutorService cancelExec = ...;

public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
r.run();
}

这个方法非常简单,但是破坏了规则,就是在中断线程前,应该了解他的中断策略。因为timedRun可以从任意一个线程中调用,它无法知道这个调用线程的中断策略。如果run方法在超时限制前完成,那么线程中断操作是什么代码我们就不得而知了。反过来,如果任务不响应中断,那么timedRun会在任务结束时才返回。会给调用者带来负面影响。

通过Future实现取消

将任务提交交给一个ExecutorService,并通过一个定时的get方法获得结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void timedRun(Runnable r, long timeout, Timeunit unit) throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 接下来任务将被取消
} catch (ExecutionException e) {
// 如果在任务中抛出了异常,那么重新抛出该异常
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经结束,那么执行取消操作也没有任何负面影响
// 如果任务正在执行,那么会被中断
task.cancel(true);
}
}

处理不可中断的阻塞

并非所有的可阻塞方法或者阻塞机制都能响应中断。如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置现成的中断状态,除此之外没有其他任何作用。
对于由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些县城,但要求我们必须知道线程阻塞的原因。

  1. 同步的Socket I/O。虽然InputStream和OutputStream中的read,write都不会响应中断,但是通过关闭底层的Socket,可以使线程抛出一个SocketException。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class ReaderThread extends Thread {
    private final Socket socket;
    ...

    public void interrupt() {
    try {
    socket.close();
    }
    catch (IOException ignored) {}
    finally {
    super.interrupt();
    }
    }
    }
  2. Java.io 中的同步I/O。Channel会派出AsynchronousCloseException。
  3. 异步的Socket I/O,也是调用close或者wakeup方法。
  4. 获取某个锁。无法响应。但是通过Lock类中提供的lockInterruptibly方法,运行在等待一个锁时仍能响应中断。

停止基于线程的服务

包括了LogWrite,LogService,关闭ExecutorService,Poison Pill,等例子。

处理非正常的线程终止

任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对他的行为保持怀疑。

典型的线程池Worker线程结构
1
2
3
4
5
6
7
8
9
10
11
public void run() {
Throwable thrown = null;
try {
while(!isInterrupted())
runTask(getTaskFromQueue());
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}

未捕获异常处理,UncaughtExceptionHandler。

JVM关闭

Shutdown Hook

Daemon Thread的使用场景与局限

避免使用finalize

线程池的使用

在任务与执行策略之间的隐耦合

包括

  1. 依赖性任务。两个并行的任务之间有依赖。
  2. 使用线程封闭机制的任务。
  3. 对时间响应铭感的任务,GUI程序。
  4. 使用ThreadLocal的任务。线程池中不该使用。

线程池大小

根据CPU个数,目标使用率等参数决定。
ThreadPoolExecutor的设计。

GUI界面的程序

为什么GUI是单线程的

因为简单。对线程的GUI系统容易死锁。

当前的GUI框架使用了专门的事件分发线程(Event Dispath Thread, EDT)来处理GUI事件。也就是主线程。

GUI应用程序需要处理一些细粒度的时间,例如点击鼠标、按下键盘或定时器超时等。因为只有单个线程来处理所有的GUI任务,因此会采用串行处理的方式。串行任务处理的不利之处在于,如果某个任务执行的时间很长,那么其他任务必须等到该任务执行结束。
在事件线程中执行的任务必须尽快的把控制权交还给事件线程。

短时间的GIU任务和长时间的GUI任务,处理有区别。

Java并发編程实践读书笔记1

发表于 2017-12-28

并发程序

“编写正确的程序很难,而编写正确的并发程序则难上加难。”这句话是《Java并发编程实践》里的第一句话。我很喜欢。

线程是Java语言的重要功能。它将复杂的异步代码变的更容易。

并发的好处

  1. 资源利用率, 在IO等待时去做其他的任务来提高资源利用率。
  2. 公平。时间片,让不同的用户和程序公平的使用计算资源,而不是一个程序从头跑到尾。
  3. 便利性。在同时完成多个任务时,编写多线程能相比于写一个单一线程的程序要简单。

线程带来的问题

  1. 安全性。 不正确的结果。
  2. 活跃性。死锁与饥饿,活锁
  3. 性能。响应不及时,频繁的上下文切换开销

多线程的应用广泛

  • JVM本身就是多线程的,主线程main,后台线程垃圾回收,终结操作等。
  • Servlet, RMI
  • Swing,AWT

线程的安全性

线程安全性的定义。核心就是正确性。

  • 核心是正确性
  • 线程安全类

原子性

读取 修改 写入的序列。依赖于之前的状态。
  • Race condition
  • 延迟初始化的例子
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Don't do this
    @NotThreadSafe
    public class LazyInitRace {
    private ExpensiveObject instance = null;

    public ExpensiveObject getInstance() {
    if( instance == null)
    instance = new ExpensiveObject();
    return instance;
    }
    }
  • 复合操作/原子类

    加锁机制

    在单个原子操作中更新所有相关变量
  • 内置锁(Intrisic Lock)。Synchronized
  • 重入。获取锁的操作粒度是线程而不是调用
  • 用锁来保护状态

    活跃性与性能

  • 同步块的代码大小分割。分得过细,可能带来开销问题。
  • 分的过大 锁持有时间长。活跃性不好。

对象的共享

编写正确的并发程序,关键在于访问共享的可变状态时,要进行正确的管理。Synchronized关键字除了实现原子性,还有内存可见性。

可见性

没有同步的情况下 编译器 处理器 运行时 都可能重排序。

  • 失效数据
  • 非原子的64位操作
  • 加锁与可见性
    Image of visibilty and lock
    加锁的含义不仅仅局限于护持行为,还包括内存可见性。
  • volatile变量
    volatile变量对可见性的影响比volatile变量本身更为重要。

发布、逸出(Escape)

发布是指对象能在当前作用于之外的代码中访问使用。

1
2
3
4
5
6
7
// Don't do this
class UnsafeStates {
private String[] states = new String[] {
"AK", "AL", ...
};
public String[] getStates() { return states; }
}

如果按照上边的代码发布states,就会出现问题,因为任何调用者都可以修改这个数组的内容。

  • 定义。某个不该发布的对象被发布。
  • 对象逸出后必须假定外部会误用该对象。
  • 封装能够简化程序的正确性分析。
  • this隐式逸出,引用一个未构造完的对象。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // Don't do this
    public class ThisEscape {
    public ThisEscape(EventSource source) {
    source.registerLisenter(
    new EventListener() {
    public void onEvent(Event e) {
    doSomething(e); // source may use 'this' before constructor finished.
    }
    }
    )
    }
    }
  • 用私有构造函数+工厂方法来避免逸出。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class SafeListender {
    private final EventListener listerner;

    private SafeListener() {
    listener = new EventListener() {
    public void onEvent(Event e) {
    doSomething(e);
    }
    }
    };

    public static SafeListener newInstance(EventSource source) {
    SafeListener safe = new SafeListener();
    source.registerListener(safe.listener);
    return safe;
    }
    }

线程封闭

  • adhoc线程封闭。维护线程的封闭性完全由程序去实现。非常脆弱。
  • 栈封闭。有点functional的感觉 线程之间独立。
  • ThreadLocal类。为每个使用该变量的线程都存储一个独立的副本。

不变性(Immutable)

不变性的条件

  • 对象创建后其状态就不能修改
  • 对象所有的域都是final类型
  • 对象是正确创建的。this没有在构造期间逸出
  • final域
  • 用volatile类型来发布不可变对象

安全发布,针对可变对象

  • 不正确的发布,会使正确的对象被破坏。
    1
    2
    3
    4
    5
    6
    7
    8
    public class Holder {
    private int n;
    public Holder(int n){ this.n = n; }
    public void assertSanity(){
    if(n != n)
    throw new AssertionError("This statement is false.");
    }
    }
    如果上边的Holder类没有被正确发布,那么另一个线程调用assertSanity就可能会抛出AssertionError。问题不在Holder本身,而在于Holder没有被正确的发布。如果n是final类型,那么没有问题。
    Object的构造函数会在Holder构造函数运行前先将默认值写入n,也就是0.因此默认值可能被视为失效的。
  • 任何线程都可以在不需要额外同步的情况下安全的访问不可变对象。
  • 安全发布的常用模式。
    (首先对象构造要正确,也就是this没有逸出。)
    在静态出书画函数中初始化对象的引用
    将对象引用保存到volatile类型的域
    将对象引用保存到某个正确构造对象的final域
    将对象引用保存到一个由锁保护的域中
  • 事实不可变对象
  • 可变对象 需要同步或者线程安全的方式来发布以及修改可变对象
  • 安全的共享对象

对象组合

如何设计线程安全的类?如何判断一个类是否是线程安全的?

  1. 分析构成对象的状态的所有变量;分为基本类型和引用类型,被引用对象的域。例如LinkedList的状态就包括该链表中所有节点的对象的状态。
  2. 约束状态的不变条件;例如最大值最小值等独立限制。
  3. 可变状态的并发访问管理策略。也就是同步策略,对其状态的访问操作进行协同。

设计线程安全类的步骤

  1. 收集同步需求
    final类型的域使用的越多,就能简化对象可能状态的分析过程。
    • 首先确保不可变条件不会破坏。例如一个计数器一定是正数。
    • 后验条件。例如Counter的当前状态是17,那么下一个有效状态只能是18。
    • 状态转换的约束,需要复合操作。只适用于下一个状态需要依赖当前状态的情况。
  2. 依赖状态的操作
    • 某个操作中包含有基于状态的先验条件。例如删除元素前,队列必须是非空。
    • 简单的方法是使用现有的类库,例如阻塞队列,信号量等。
  3. 状态的所有权
    • 所有权和封装性相关联。对象封装他拥有的状态,即对他封装的状态有所有权。封装的好,则分析起来简单。
    • 如果发布了某个可变状态的引用,则不再拥有独占的控制权。
    • 容器类通常表现出所有权分离的形式。

实例封闭

针对非线程安全的对象,要使用别的招数来让它安全。

  1. 当一个对象被封装在一个类里,那么能够访问这个对象的所有代码都是已知的。更易于分析。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @ThreadSafe
    public class PersonSet {
    @GuardedBy("this")
    private final Set<Person> mySet = new HashSet<Person>();

    public synchronized void addPerson(Person p){
    mySet.add(p);
    }

    public synchronized boolean containsPerson(Person p) {
    return mySet.contains(p);
    }
    }
  2. 实例封闭是构建线程安全类的一个最简单方式。
  3. Java类库中线程封闭实例。Collections.synchronizedList。

线程安全性的委托

大多数对象都是组合对象。如果类中的各个组件都已经是线程安全的,是否需要再额外的加一个线程安全层要视情况而定。
不可变类一定时线程安全的。
CopyOnWriteArrayList是一个线程安全的链表。

在现有的线程安全类中添加功能

通过继承扩展方法比直接将代码添加到类中更脆弱,因为现在的同步策略实现被分不到多个单独维护的源代码文件中。

客户端加锁

组合

将同步策略文档化

客户以及维护人员可以了解线程安全方面的策略。

基础构建模块

同步容器类

同步容器类的问题

同步容器类包括Vector和Hashtable… 这些类实现线程安全的方法是将它的状态封装起来,并且对public方法同步,所以每次都只有一个线程能访问。

问题在于复合操作,包括迭代,跳转,条件计算(如putIfAbsent)。我们在自定义符合操作时,要知道要在哪个对象上加锁。

迭代器与ConcurrentModificationException

要想避免出现ConcurrentModificationException,就必须在迭代过程持有容器的锁

隐藏迭代器

例子:隐式调用集合类Set的toString方法会出发调用迭代器

并发容器

通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

ConcurrentHashMap

粒度更细的加锁机制,称为分段锁(Lock Striping)。可以在并发环境下实现更高的吞吐量。

ConcurrentHashMap返回的迭代器如有弱一致性,意思就是容忍并发的修改。size返回的结果是一个估计值。

size与isEmpty的弱一致性换得了其他更重要操作的性能。

额外的原子Map操作
1
2
3
4
5
6
7
8
9
public interface ConcurrentHashMap<K, V> extends Map<K, V> {
V putIfAbsent(K key, V value);

boolean remove(K key, V value);

boolean replace(K key, V oldValue, V newValue);

V replace(K key, V newValue);
}

CopyOnWriteArrayList

每次修改时都会创建并重新发布一个新的容器的副本。

仅当迭代操作远远多于修改操作时,才应该使用CopyOnWrite容器。适用于事件通知系统,因为多数情况下,注册和撤销注册listener的操作要远远少于接受事件通知的操作。

阻塞队列和生产者-消费者模式

生产者消费者·模式能简化开发过程,因为它消除了生产者和消费者类之间的代码依赖性。

BlockingQueue简化了生产者消费者设计的实现过程,它支持任意数量的生产者消费者。一种常见的例子就是线程池和工作队列的组合,Executor任务执行框架中就体现了这种模式。阻塞特点,让编码更为简单。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具,他们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下更加健壮。

BlockingQueue的实现包括LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue.

SynchronousQueue是一个特殊的实现,实际上它不是一个真正的队列,因为它不会为队列中的元素维护空间。它维护一组线程,这些线程在等待着把元素加入或移除队列。由于是线程直接交付数据,put和take会一直阻塞。

阻塞方法与中断方法

阻塞方法的执行线程在被阻塞后,必须等待某个不受他控制的事件发生后才能继续执行。
BlockingQueue的put和take方法会抛出Checked Exception,InterruptedException,这与类库中其他的一些方法做法相同,例如Thread.sleep。当某个方法抛出InterruptedException时,表示该方法是一个阻塞方法,如果这个方法被中断,那么他将努力提前结束阻塞状态。

当一个方法调用了阻塞方法时,他本身也变成了阻塞方法,并且必须要处理中断响应。一般有两种处理方式,一是传递,二是恢复中断。

同步工具类

闭锁

CountDownLatch

FutureTask

信号量 Semaphore

栅栏 CyclicBarrier

构建高效且伸缩的结果缓存

Javascript异步编程2

发表于 2017-12-03

Generators

Generator 介绍

我们可能都不自觉的有一个认识,那就是JavaScript的函数有一个特点,run to completion。意思是说在一个时间点,只会有一个函数在运行。当这个函数运行结束前,没有任何一个外部函数可以从外部抢占执行的权利。

Generator是在ES6引入的。Generator函数可以在执行的过程中让出执行的机会,自己暂停下来。在需要的时候,外部主动调用Generator的next方法继续执行剩下的部分。
可以参考阮一峰博客。这里先给一个简单的代码示例。

1
2
3
4
5
6
7
8
9
function* gen(){
console.log("hello");
yield;
console.log("world");
}

var it = gen();
it.next(); // hello
it.next(); // world

创建一个Generator需要用function* 的关键字,以区别于普通的function定义。还有一个新的关键字是yield。它的功能就像我们的播放器上的暂停按钮一样,暂停了Generator函数的执行。注意,Generator函数暂停的点是由函数定义的时候制定好的。外部无法主动地去设置Generator停在哪一行。我们把这个叫Coorperate concurrency,而不是Preemptive concurrency。Preemptive意思就是抢占式的。在调用gen()时,只是创建一个generator对象,函数并没有开始执行。调用Generator产生一个iterator,iterator就是用来遍历数据的。我们在第8行调用next函数,这个Generator由暂停状态变为执行状态,等到运行到yield的时候,这个Generator再次变为暂停状态。第9行再次调用next,函数继续执行。

Generator 消息

1
2
3
4
5
6
7
8
9
10
11
12
13
function *main() {
yield 1;
yield 2;
yield 3;
}

var it = main();

it.next(); // { value: 1, done: false}
it.next(); // { value: 2, done: false}
it.next(); // { value: 3, done: false}

it.next(); // { value: undifined, done: true}

我们之前的例子yield没有返回任何值,那么他返回的就是undifined。上边这个例子中我们yield反回了1,2,3,可以看出,
yield 1时,也就是我们的第9行执行的时候,返回了value为1,done为false的object。yield 2时,执行第10行,返回了value为2,done为false的object。
yield 3时,执行第11行,返回了value为2,看起来我们的Generator没有其他可以再执行的了,但是这个Generator还没有还行完毕,所以我们的done还是false。
当执行最后一次next时,我们才得到done为true,value是undefined。那么undefined是哪里来的呢?这是因为所以得JavaScript函数如果没有return语句的话,那么返回值就是undefined。如果我们的Generator最后return返回了一个值,例如42,那么这里的值就是value:42, done: true。
ES6引入了for of循环,来遍历任何一个iterator。所以我们可以用一个循环来执行Generator,直到到结束。

以下内容可能产生不适。。因为如果从来没有用过generator的话会很不习惯。我们先定义一个generator的执行器。

1
2
3
4
5
6
function coroutine(g){
var it = g();
return function(){
return it.next.apply(it, arguments);
}
}

coroutine函数其实就是一个generator的wrapper,第2行我们把传入的generator直接初始化,每当我们的调用由coroutine返回的函数时,其实就是调用generator的next函数。

下面看这个代码

1
2
3
4
5
6
7
8
9
var run =  coroutine(function*(){
var x = 1 + (yield);
var y = 1 + (yield);
yield (x + y);
});

run();
run(10);
console.log("Meaning of life: " + run(30).value);

我们调用coroutine来包装起来我们的generator函数,返回一个run方法,初始化这个generator。每一次调用run都是在调用next方法。

第7行调用run,开始执行generator,运行第2行,执行语句var x = 1 +, 然后我们遇到了yield。generator要暂停,把执行权返回到第8行。generator只有等待外部的输入值时才可以继续计算完这个表达式。第7行,我们继续调用run,也就是next,传入10,完成第一个表达式的计算的到x=11;generator继续执行var y = 1 +,我们遇到yield,让出执行权到第8行。run(30), 将30传入generator,完成y的计算y=31,generator继续执行,yield(x+y),generator暂停,把42作为value传出去。

从generator外部看,也就是第7行开始,我们每运行一次next,就会暂停下来,把值传入到generator,让generator继续运行到下一个yield的地方。
从generator内部看,每一次yield,其实是generator缺少一个依赖的值,不能继续计算,而暂停下来。只有等待外部的传入的值和继续执行的信号,才能继续。
每一个yield,可以理解为,我这个地方需要一个value,我会一直等待,直到有人在外边给我传入一个值。

异步Generator

我们改造一下上边的例子,做一个异步的Generator

1
2
3
4
5
6
7
8
9
10
11
12
function getData(d){
setTimeout(function(){ run(d); }, 1000);
}

var run = coroutine(function*(){
var x = 1 + (yield getData(10));
var y = 1 + (yield getData(30));
var answer = (yield getData("Meaning of life: " + (x + y)));
console.log(answer);
});

run();

先看getData方法,我们注册一个timeout,从而异步的在第2行调用run方法,让generator继续执行。
我们在generator里,yield了一个表达式,也就是getData。在执行var x = 1 + (yield getData(10))时,generator暂停,把执行权交给外部,同时getData开始运行。当getData中的回调函数运行,执行run,传入10,这样让generator继续完成表达式的计算,x=11。继续执行下一行var y = 1 + (yield getData(30)),遇到yield暂停,并且执行getData(30)。同样等timeout的回调执行时,把30传回generator,继续完成y的计算,y=41;同理下一行计算answer,yield暂停,等timeout回来再继续完成计算。最后console输出。

在上边这个例子的generator中我们的代码看上去是同步的,背后的getData是异步的。这样的代码结构跟我们的思维方式是一样的,代码可读性增强了很多,比之前的promise还要好。

Promise and Generator

我们之前的代码中还是有一些缺点。就是我们的run方法被放在了callback中执行,这就是inversion of control。程序继续的调用的控制不够清晰。
而且我们之前的代码中yield都是undifined。所以我们考虑能不能yield一个promise,也就是让getData返回一个promise,等这个promise完成时,也就是getData完成时,调用Generator的next方法继续执行。等到下一个语句再yield另一个promise,等promise完成继续调用next,如此下去,等到next返回done为true时,也就是我们的generator执行完毕的时候。
后边的流程很像递归。也是一个跟业务逻辑无关的流程控制器。外边的lib有很多开源实现,就是一个返回promise的generator的执行器。下边的代码是我自己随意实现了的一个执行器,里边没有包含出错处理这方面的代码,只是为了给出一个大概的样子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function getData(d){
return new Promise(function(resolve, reject){
setTimeout(function(){ resolve(d); }, 1000);
})
}

function* gen(){
var x = 1 + (yield getData(10));
var y = 1 + (yield getData(30));
var answer = (yield getData("Meaning of life: " + (x + y)));
console.log(answer);
}

var it = gen();
function handleNext(value){
var next = it.next(value);
if( next.done ){
return Promise.resolve(next.value);
} else {
return next.value.then(handleNext);
}
}

handleNext();

我们把这个通用的执行器runner方法提出来以后也可以用
1
2
3
4
5
6
7
8
9
10
11
12
13
function runner(gen){
var it = gen();
function handleNext(value){
var next = it.next(value);
if( next.done ){
return Promise.resolve(next.value);
} else {
return next.value.then(handleNext);
}
}

handleNext();
}

Generator练习

还是之前的情景,我们来思考一下用Generator是怎么解决的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************

function getFile(file) {
return new Promise(function(resolve){
fakeAjax(file,resolve);
});
}

// Request all files at once in
// "parallel" via `getFile(..)`.
//
// Render as each one finishes,
// but only once previous rendering
// is done.

// ???

function* gen(){
var p1 = getFile("file1");
var p2 = getFile("file2");
var p3 = getFile("file3");
output(yield p1);
output(yield p2);
output(yield p3);
output('completed');
}
runner(gen);

//use map
function* gen(){
var promises = ["file1", "file2", "file3"].map(function(fname){
return getFile(fname);
});
for(p of promises){
output((yield p));
}
output('completed');
}
runner(gen);

有了之前的例子,这里的的示例代码更容易写出来,其实就是利用Promise和Generator,我们的解也分为用普通的流程和用list的map和循环来写。
可以看出来这样的代码更为清晰易懂,比之前用promise解决可读性更胜一筹。

Observable

Events and promises

前端的代码中的并发,异步,我们之前也说过,核心问题是流程控制的管理。比如之前说过的promise,处理的是单个请求和单个回复的情况。
如果我们要处理一个消息流,也就是连续不断地同类event,该如何?Promise还能胜任吗?
而且我们在编程中大部分要出里的异步问题,是面向event stream的。在UI中的,用户的操作,比如点击;服务器发来的数据处理,都是面向event的。在这种情况下我们把promise套如,可能会有些问题。
举个例子,我的页面上可能有一些处理用户点击的代码。我想记录一些btn点击的log。我知道promise怎么创建,套用到这个情景中应该是这样的,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var p1 = new Promise(function(resolve, rejct){
$('#btn').click(function(evt){
var className = evt.target.className;
if(/foobar/.test(className)){
resolve(className);
}else{
reject();
}
});
});

p1.then(function(className){
console.log(className);
});

可是上面的代码有一个问题,就是promise只能resolve一次。我创建了一个promise,但是只能响应一次用户点击。我们之前介绍了那么多工具,promise,generator的高级工具,可是遇到这个最最常见的情景,好像不是很好使。我们该怎么办?

可能你会这么想,我反一下,把promise的创建搬到event的listener里边,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
$("#btn").click(function(evt){
var className = evt.target.className;
var p1 = new Promise(function(resolve,reject){
if(/foobar/.test(className)){
resolve(className);
}else{
reject();
}
});
p1.then(function(className){
console.log(className);
})
})

我们每次在click的callback中都创建一个新的promise,这样就可以每次都resolve了。这样看起来挺不错,可是,我们为什么要这样做?这与我们一开始的callback hell不是一样了吗?我们立即resolve了一个promise,并且调用了then。我们之前的代码还可以在一个地方设置好我的消息源,而在我程序的另外一个地方处理这个消息。现在全部都回到了一起,我们把两个不同的任务混在了一个地方。

其实问题在于我们的promise并不是特别适合于一个面向event的环境。我们需要更好地工具。我们需要把两个任务分开,也就是消息源的设置,以及消息的处理。

Observables

现在的Obserable还不是JavaScript原生支持的。也就是说ES6中没有这个东西,我们都需要用第三方的lib。但未来Observable很可能会成为JavaScript的一部分。现在外部有一个很好的Observable的lib,也就是Rxjs。

Concept

在Excel中,我们知道有计算单元格这个东西。就是说有些单元格的数据是通过计算其他源单元格的数据得到的。例如求和,求平均值,甚至更复杂的计算。当源数据发生改变的时候,这些计算单元数据也会相应的变化。假如我们的源数据被多个计算单元格依赖,那么就会触发所有这些计算单元格的更新。源数据单元的变化可能有好多次,变化的事件就像流一样。而依赖这个数据源的计算单元格都需要订阅这个变化,并且执行相应的步骤来更新自己的数据。

Observable或者Reactive Programming(响应式编程)就是类似的概念。Observable是一个事件的转换器,它连接在事件的源上边,每一次有一个新的事件发生,它就产生一个新的promise。重点在于,我可以在一个地方设置好这个转换器,而在程序的另一个地方甚至多个不同的地方,再定义如何处理这个事件流。我们不用和之前举得反例一样把事件的订阅初始化代码和处理代码放在同一个地方。

Rxjs例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var obsv = Rx.Observable.fromEvent(btn, "click");

obsv
.map(function mapper(evt){
return evt.target.className;
})
.filter(function filterer(className){
return /foobar/.test(className);
})
.distinctUntilChanged()
.subscribe(function(data){
var className = data[1];
console.log(className);
});

例子中的第一行我们创建了一个Observable,使用了Rx.Observable.fromEvent这个常用的工具。它可以把一个dom元素上的事件变成一个Observable。Rxjs还有很多强大的工具可以使用。
在程序的其他地方,我可以定义我想如何处理这个事件流。这里的map和filter等等,都是Observable的工具,用来对这个事件流进行一些转换和处理,也是十分易于理解使用。最后调用subscribe,来操作转换好的事件。我们就好像定义好了一系列的步骤来处理这个流过来的事件一样。我们把Observable甚至可以当做数组一样对待,数组其实也是一个数据流。

这里比较有意思的一个工具是distinctUntilChanged,这个是说加入现在连续来了5个event,都是hello,那么只有第一个可以进入下一步,其他4个都被过滤,因为重复了。然后来了连续的5个world,那么只有第一个world可以流入下一步。这是distinct的意思。那么接着,又来了5个hello,那么还是只有第一个hello可以通过。这是untilchanged的意思。

RxMarbles有一个很好的这些工具的图示化介绍。

Rxjs常用工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// From an event
Rx.Observable.fromEvent(document.querySelector('button'), 'click');

// From array of values
Rx.Observable.from([1,2,3]);

// From one or multiple values
Rx.Observable.of('foo', 'bar')

// Externally produce new events.
var myObservable = new Rx.Subject();
myObservable.subscribe(value => console.log(value));
myObservable.next('foo');

// Merge Observables
// Creates an output Observable which concurrently emits all values from every given input Observable.
var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2)

// Zip
// Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.
let age$ = Observable.of(27, 25, 29);
let name$ = Observable.of('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of(true, true, false);

Observable
.zip(age$,
name$,
isDev$,
(age, name, isDev) => ({ age, name, isDev }))
.subscribe(x => console.log(x));

Observable练习

这次是一个新的的练习。需求是对页面上的btn点击行为进行采样。注意不是debounce,同一个btn在短时间内点击多次,例如每一秒内,我只响应一次点击。
我在下面的codepen中,已经引用了jQeury和Rxjs。有兴趣的可以打开试着写一写。

有人可能会考虑用zip来解决这个问题,但是zip其实不能完成采样的任务。那样的话,我们的clicks的队列会不断地增长,而不是真正的采样。我在下边给出两份参考代码
Solution1:

Solution2:

可以看出来Rxjs中的工具还是非常的多和实用的。我这里是抛砖引玉,更多的内容都在官网上有。掌握或者了解Rxjs的各种工具的适用场景,我认为是成为一个合格的Reactive Programmer的基本。

我在刚刚接触Observable的时候,也是理解了好几次,特别是event stream。看官方的文档,资料,也看了几次。看多了用多了也就有一些感觉,代码的质量也就能提升。

我们的工具箱中又多了一个工具。没有任何一个工具是万金油。不同的场景有不同的合适的方法,对每个的特点都理解,不要用的太死板是我们的该做到的。

CSP (Communicating Sequential Processes)/ Channels

CSP的目的是用Channel来设计并发程序。和我们之前所介绍的各种方法一样。

CSP的提出在这里,说实话我也没怎么看过这篇文章,作者Hoare现在仍在更新CSP的理论。CSP的理念在Go语言和Clojure Script中使用的很多

Channel

那么Channel是什么?Channel有些像Stream,也类似于管道Pipe。如果了解Actor模型的话(Scala中大量使用的并发模型),Channel确实和Actor很类似。但Channel有一个重点在于,默认情况下他没有缓冲区,因此Channel自然有了这样一个特点就是,反向压力(Back Pressure)。举个生活中的例子,我拿着水管接在水龙头上边给外边的花草浇水,我打开洒水口的开关就行了;当我浇完了我直接把洒水口的开关关掉就行了,我并不需要告诉源头我要开还是要关,我只需要操作我手边的开关就可以了。这是反向的从消费者到生产者的一个交流,告诉上边的生产者说,我不需要水了。Channel的Back Presse,也就是说Channnel的send动作是阻塞的。只有当Channel在另一方,调用了取消息take后,才会解除阻塞,反之亦然。而Actor的send是异步的,也就是非阻塞的。

在我们的程序中,我们有这样的代码,我们的生产者和消费者之间没有直接的联系,因为我们需要这么做,以达到职责清晰,代码的局部性更好。我们需要生产者和消费者之间有一个交流,但是如果我们不想引入一个类似于全局变量的东西,告诉生产者需不需要继续生产,别给我发了。(Rxjs Observable中有Hot Cold)。
另外一种设计方式就是用我们的Channel,利用Back Pressure,可以做到,你不能往Channel放更多地东西,直到我准备好了可以继续取;你不能继续取东西,直到我可以再往里边放东西。

我们提到了阻塞,你可能会联想到JavaScript唯一有阻塞功能的工具,就是我们刚才讲过的Generator函数。确实如此,CSP的JavaScript版本需要用到Generator。

看点代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var ch = chan();

function *process1(){
yield put(ch, "hello"); // 1. put hello into ch and pause proc1 until someone take it from the ch
var msg = yield take(ch); // 2. resume proc1 because someone take hello out, and wait for someone to put something into the ch
console.log(msg); // 3. someone put something into ch to let proc1 to resume.
}

function *process2(){
var greeting = yield take(ch); // 1. pause proc2 until someone put something into ch
yield put(ch, greeting + " world"); // 2. resume proc2 because someone has put data in ch, and put data into ch, then pause proc2 again until someone have take that from ch
console.log("done"); // 3. resume proc2 because someone take the data from ch
}

// hello world
// done

chan, put, take我们先当做现有的函数来理解这段代码。我们用到了yield,因为我们需要阻塞或者说暂停我们的函数。 假设proc1和proc2在两个不同的线程运行,他们通过ch就可以协调消费者生产者的运行时交流与相互控制。

Blocking Channel

再看一个例子,我们用了一个csp的lib,里边用到了go函数,因为是模仿了Go语言的方式调用类似线程运行的功能。csp.timeout返回一个channel,而我们并不需要关心这个channel到底在哪,我们只需要知道过500ms他会在里边放一个消息,让我们的yield的地方继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var ch = chan();

csp.go(function*(){
while(true) {
yield csp.put(ch, Math.random()); // repeately put random number in ch, pause when blocking
}
})

csp.go(function*(){
while(true){
yield csp.take( csp.timeout(500) ); // wait 500ms
var num = yield csp.take(ch); // take data from ch
console.log(num);
}
})

下面的例子alts,功能比较像promise的race。

1
2
3
4
5
6
csp.go(function*(){
while(true){
var msg = yield csp.alts(ch1, ch2, ch3);
console.log(msg);
}
})

再来一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
csp.go(function*(){
var table = csp.chan();

csp.go(player, ["ping", table]); // setup ping 'proc'
csp.go(player, ["pong", table]); // setup pong 'proc'

yield csp.put(table, {hits: 0}); // start the game by put msg into channel table
yield csp.timeout(1000); // wait for 1000ms to close the channel table
table.close();
})

function* player(name, table) {
while(true) {
var ball = yield csp.take(table); // try to get the ball from table
if( ball === csp.CLOSED) {
console.log(name + ": table's gone");
return;
}
ball.hits += 1;
console.log(name + " " + ball.hits);
yield csp.timeout(100);
yield csp.put(table, ball); // put updated ball back to ch
}
}

Event Channel

我们在Observable有一个工具是fromEvent,我们这里用csp的方法设计一个自己的fromEvent。这里用了putAsync,我想从字面也能理解这个是非阻塞的put。我为什么用putAsnc呢,因为这个函数不是一个Generator,而是一个普通的JavaScript函数。putAyns会返回一个promise,当这个put成功是resolve。但是我这里并不关心她什么时候put成功。这里是不是很像我们在Observable中不停地调用next函数?csp与Observable的核心区别在于有Back Pressure。如果Channel数据没有被取走,生产者无法继续往队列里放数据。连续不停地调用putAsync,并不会覆盖Channel的数据,每一个putAsync都会返回promise,他们排着队等待往Channel里放数据。直到有人从Channel中开始取数据时,这个队列才会向前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function fromEvent(el, eventType){
var ch = csp.chan();
$(el).bind(eventType, function(evt){
csp.putAsync(ch, evt);
});
return ch;
}

csp.go(function*(){
var ch = fromEvent(el, "mousemove");
while(true){
var evt = yield csp.take(ch);
console.log(evt.clientX + "," + evt.clientY);
}
})

CSP练习

我认为下面这个练习的情景有些生硬,不是特别适合Channel的场景,没能完全体现出Channel的优势。而且我认为原来培训中的参考答案并不完全正确的。但是还是可以参考一下这个题目吧。需求跟Observable的btn采样是一样的。我给直接给出了我的代码。我们最后还是用到了shouldWrite变量来控制写,而且好像没有更好地办法。

这里我引用了js-csp。对于看过Rethinking的,我一直没有使用Trainer的ASQ库是因为我觉得他的lib东西太多了,什么都可以做,而且使用什么都要在前面加ASQ,看上去有些奇怪。不过他确实很厉害,自己写了这么多工具的实现。

大练习 A tale of three lists

Trainer的这个练习还是很赞的,给出地址git,有兴趣的可以clone下来,用我们讲过的各种工具(Callback, thunk, promise, generator, obserable, csp)都实现以下这个练习。还是很有挑战的。

小结

两篇文章介绍了各种JavaScript异步编程的工具。我的理解是,没有一个万金油可以解决所有的问题。如果你听身边有人说Observable可以解决一切异步编程的问题,或者说CSP可以搞定所有的异步编程设计,那么他一定是过分沉浸在自己的世界了,所谓生搬硬套。我们需要做的就是掌握这个工具箱里的所有工具,在适合的时候拿出来,写出漂亮的代码,解决这个问题。

最后推荐一些我写第二篇时看到的一些有关的好文章的连接

https://juejin.im/entry/56f480ccc4c9710051bffd2b
https://www.zhihu.com/question/26192499
http://lucasmreis.github.io/blog/quick-introduction-to-csp-in-javascript/

JavaScript异步编程1

发表于 2017-11-18

写在前面

最近看了Lynda上的一个课程叫做Rethinking Asynchronous Javascript。 在此写一写整理一下自己的心得。

异步与并发

其实谈到异步编程我们基本上会讲到并发(Parallel)和异步(Async)。从很高的角度来看,他们说的是同一个事情。然而他们在实际工作中一般各有所指。

从理念上看,并发指的是优化,资源同时的得到执行,加快了时间。比如玩过山车,如果每次只让一个人上车,那么其他人都要等待。如果一次上去30个人,那么这30个人就是同时享受到了服务。从计算机的角度看,那就是线程和多核,内核最终支持多少个线程并发,就是类似于过山车一次可以装多少个人上去。所以说一般谈到并发,讲的是资源的最大利用率和效率的问题,最终让请求或者任务得到快速的响应。

谈到异步,则是从编程的角度来看,其实也就是说的非阻塞。如果说到Javascript中我们最最基本的应该就是Callback了。通常我们会在一段代码中有一些需要XHR或者SetTimeout类似的工作,然后我们可以注册一个Callback,然后在等这个注册的函数在任务完成后执行。然而注册这个Callback并不会影响他之后代码的执行。

传统的Web服务器,例如Apache,是多线程的。每一个用户的请求都会编程一个线程,在服务器端执行,只有在服务器完成了用户请求后才可以返回。这样,如果访问的人越来越多,那么线程也会越来越多。所以并发和多线程在这种设计情景下,就是要最大限度的利用服务器的资源。

有关Javascript,我们都知道JS是单线程的执行的,因为它一开始设计得时候就是为了操作浏览器中的DOM元素,单线程也是最简单的设计。我们也都知道Android和iOS程序所有修改UI的操作都需要在主线程也就是UI线程进行。所以JS一定是顺序执行的。而所有的阻塞操作,都是通过回调的方式进行的。还有一个话题就是JS的Event Loop。这个放一放。
Nodejs的服务器是单线程的,因为用了Chrome V8的JS Engine。也是因为js的单线程特性,所以js的代码必须是异步的,也就是说是非阻塞的。如果包含例如文件读写,网络请求等,那么这些操作
程都会被这个线程接收,然后转到后台的MicroTask任务执行。线程只负责转发这个任务而不是真正执行。只有当任务完成后,这个线程又会得到消息,再把这个结果返回给用户。这样,这个线程所做的工作就比较简单。

Callback

Callback,不用多说,js最最原始的异步调用方式。每注册一次Callback会产生两片代码区域。他们的执行是异步的。看下边的code中的setTimeout注册了一个回调,一段代码在callback函数外边,包括了doSomething,setTimeout的调用和doOtherThing,另一段代码是setTimeout里的回调函数,console.log。第一段代码不知道第二段代码是否已经执行,何时执行。第二段代码也不会知道第一段代码执行到了哪里。

1
2
3
4
5
doSomething();
setTimeout(function(){
console.log("callback!");
}, 1000);
doOtherThings();

Callback Hell

下边的代码就是所谓的Callback Hell。

1
2
3
4
5
6
7
8
9
setTimeout(function(){
console.log("One");
setTimeout(function(){
console.log("Two");
setTimeout(function(){
console.log("Three");
}, 1000);
}, 1000);
}, 1000);

Callback Hell,大家都知道的一点叫Indentation Nesting。就是层数越多,缩进越多,代码越难看,越难以维护。
其实这个只是表象,我们甚至可以改写上边的代码,如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
function one(cb){
console.log("one");
setTimeout(cb, 1000);
}
function two(cb){
console.log("two");
setTimeout(cb, 1000);
}
function three(cb){
console.log("three");
}
one(function(){
two(three);
});

这样我们没有之前代码那样的缩进,但是这个代码还是看着很别扭。

一个模拟的实际问题

再看下边一个稍实际一点例子(之后会继续用这个例子)。
需求有三点:

  1. 并行的请求3个文件
  2. 尽快的输出文件内容,但是不要等所有请求都完成再去输出
  3. 输出是有顺序的,file1内容先输出,之后file2,最后输出file3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************
// The old-n-busted callback way

function getFile(file) {
fakeAjax(file,function(text){
// what do we do here?
});
}

// request all files at once in "parallel"
getFile("file1");
getFile("file2");
getFile("file3");

我们该怎么办?因为要满足并发,在代码最后三行我们同时调用了getFile。那么如何让结果及时输出并有序呢?那么一定是在fakeAjax里的回调函数中处理。而我们并不知道每一个请求到底要花多少时间才能返回,所以我们需要在回调函数中通过一些变量或者状态,知道目前这三个请求的状态,才能顺序输出。例如,如果我们file3最先返回了结果,但是file1或者file2还没有返回,那么我们并不能立即输出file3的内容,而要等到file1和file2的内容都已经输出了再输出。所以我们的回调函数中,需要知道几乎所有的事情。那么我们看看我们该怎么做:

  1. 我们需要定义输出顺序,也就是file1 > file2 > file3。
  2. 我们需要知道每一个文件请求结果,以及这个内容是否已经输出过

实现参考以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************
// The old-n-busted callback way

function getFile(file) {
fakeAjax(file,function(text){
fileReceived(file,text);
});
}

function fileReceived(file,text) {
// haven't received this text yet?
if (!responses[file]) {
responses[file] = text;
}

var files = ["file1","file2","file3"];

// loop through responses in order for rendering
for (var i=0; i<files.length; i++) {
// response received?
if (files[i] in responses) {
// response needs to be rendered?
if (responses[files[i]] !== true) {
output(responses[files[i]]);
responses[files[i]] = true;
}
}
// can't render yet
else {
// not complete!
return false;
}
}

output("Complete!");
}

// hold responses in whatever order they come back
var responses = {};

// request all files at once in "parallel"
getFile("file1");
getFile("file2");
getFile("file3");

可以看到我们的回调函数中的逻辑还是比较复杂的。 我们遍历了输出文件顺序的files数组,然后依次查看responses有没有返回信息,以及目前正在遍历到的file有没有被输出过。如果结果还没有返回,那么就退出,把这个输出的调用留到下一个回调函数触发的时候。如果返回了,那么还要看输出过了没,如果输出过了,那就过掉。
我们的回调函数中有了不少代码,跟这个回调本身该做的事情没有关系。这些代码,都是跟时间或者说状态有关的。输出的顺序,输出过的状态。而我们人脑的思维方式是习惯于顺序思考的。比如上面这个例子如果不考虑阻塞什么的,写伪代码的话我们大概会这么写

1
2
3
4
5
6
7
8
9
fetch( file1 );
fetch( file2 );
fetch( file3 );
var file1Result = getFileContent ("file1");
var file2Result = getFileContent ("file2");
var file3Result = getFileContent ("file3");
output(file1Result);
output(file2Result);
output(file3Result);

上面的伪代码,是我们一般喜欢的思考方式,代码读起来也很容易。
反观Callback Hell,我们把本该做纯粹输出的Callback函数加了很多他本不应该具备的功能,比如,把程序运行的控制逻辑放入了Callback函数中。造成了Non-Local, Non-Sequential的代码。课程中,把这个叫做Inversion Of Control。这里的IOC只是指这种我们刚才描述现象,而跟其他我们一般说起的IOC不想关,比如Java的一些设计模式,以及流行的框架Spring。

Thunk

我们再看一个叫做Thunk的模式。Thunk出现的非常早,比JavaScript语言出现要早很多。
阮一峰的博客里讲到了thunk
从同步的角度来看Thunk是一种不需要你传入任何参数的函数,不用传入任何状态,任何时候你调用他,你可以得到相同的结果。

1
2
3
4
5
6
7
8
9
function add(x, y){
return x + y;
}

var thunk = function(){
return add(11, 31);
}

thunk();//42

Thunk从上面的例子看出,我们创建了一个Thunk,在里边Hard Code了调用add函数的参数。只要调用这个Thunk,他都会返回相同的值42。在这个Thunk中,我们用它包裹了一个状态的集合,之后我们不管把这个thunk传到哪里,只要调用他就能得到一个确定的值。我并不需要传递任何状态的值,我只需要这个Wrapper,并且调用他就可以得到我需要的值。这其实也是Promise设计的重要基础之一。而Promise要更为的高级,复杂。
我们可以把thunk从同步的模式,转换到异步的模式。那么什么是一个异步的thunk?其实这也没有标准答案。按照之前我们同步版thunk的特点,我们的thunk是一个状态和值的wrapper。那么,比较直接的想法,一个异步的thunk,是我们不传入任何跟值或者状态有关的东西,而只需要把回调的函数传入。
1
2
3
4
5
6
7
8
9
10
11
12
13
function addAsync(x, y, cb){
setTimeout(function(){
cb(x + y);
}, 1000);
}

var thunk = function(cb){
addSync(11, 31, cb);
}

thunk(function(sum){
sum;//42
});

每一次调用thunk,把Callback传入,我们知道我们会得到相应的结果。外部的代码并不知道,也不需要关心thunk里的参数或者计算倒地是什么细节。从thunk内部来说,他里边可能有复杂的计算,或者ajax call;他甚至可以记录计算后的结果,当你第二次调用他的时候立即返回这个结果。但是外部的代码并不需要关心这些,只要你调用thunk,就可以期待一个值在Callback中返回。
创建一个异步thunk,就是创建了一个返回值的wrapper,而且这个值是多少,与调用的时间无关。而时间,是程序代码中最复杂的状态因子。管理好程序执行的时机,是最难的工作之一。
Thunk就是一个value的wrapper,我不管把他传到哪里,我想知道这个value是,传入Callback,都会得到结果。
这其实跟promise的概念很相似。理解了这一点也就很容易理解promise。promise更为的复杂。

Lazy Thunk, Active Thunk

我们之前的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
function addAsync(x, y, cb){
setTimeout(function(){
cb(x + y);
}, 1000);
}

var thunk = function(cb){
addSync(11, 31, cb);
}

thunk(function(sum){
sum;//42
});

只有在调用Thunk时,才回去请求addAsync函数。再联系到我们之前Callback最后那个题目的情景,如果我们要同时、并发去请求文件,这样的thunk是做不到的。我们起个名字,把这种叫Lazy Thunk。
那怎么实现我们的并发需求呢?Lazy Thunk似乎并不能满足我们的并发需求。我再把之前的例子贴过来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************

function getFile(file) {
// what do we do here?
}

// request all files at once in "parallel"
// ???

我们的getFile应该返回一个接受一个Callback参数的thunk,所以我们最后的代码应该有这样一段,前三个调用创建三个thunk,并且创建时就去请求,最后顺序输出结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
var th1 = getFile('file1');
var th2 = getFile('file2');
var th3 = getFile('file3');
th1(function(file1){
output(file1);
th2(function(file2){
output(file2);
th3(function(file3){
output(file3);
output('Compeleted!');
})
})
})

下面是重点getFile怎么实现。而要满足并发这个需求,我们必须要在getFile中,第一时间去调用fakeAjax。
1
2
3
4
5
6
7
8
function getFile(file) {
fakeAjax(file, function(response){
//Do something
});
return function(cb){
//Do other thing
}
}

下面有一个时机问题,我们的fakeAjax的回调函数执行时间,与thunk里传入的回调函数,倒地哪个先执行?

  1. fakeAjax中的cb先执行。
    那么我们需要把response存在一个local变量中,等thunk里的cb执行时,直接调用cb,传入这个变量存好的值就可以了
  2. thunk传入的cb先执行。
    这个看上去不是很直接,我们的返回值还没有,怎么调用这个cb?其实我们要做的事情跟前边的一样,只需要把这个cb存到local的变量中,等fakeAjax返回,再去执行这个cb,把返回的response传入。
    最后的参考代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    function getFile(file) {
    var content, fn;
    fakeAjax(file, function(response){
    if(fn){
    fn(response);
    }else{
    content = response;
    }
    });
    return function(cb){
    if(content) cb(content)
    else fn = cb;
    }
    }
    可能看上去还不大习惯,但是这个代码风格还是很实用的。这个就是Active Thunk。
    Thunk并不能解决我们之前在Callback中遇到的大部分缺陷,比如IOC,我们还是会有callback,callback hell的问题。但是这个模式比callback那个代码清晰许多,代码可读性,可维护性都增加了不少。
    这个用纯js代码实现的异步工具,十分的实用,利用这样的代码可以构建很多有用的工具方法。理解了这个,也就更容易理解promise。

Promise

Promise,大家肯定都知道了。Promise也成了ES6的标准,主流浏览器的都支持。可以参考MDN,Promise。
Promise中比较重要的一点是一个Promise只能Resolve一次。举个例子,你做了一个电商网站,专门卖电视。其中有下单,给用户信用卡扣款的流程。如果使用callback,可能的代码是这样的。其中trackCheckout返回一个listener,你注册一个时间监听comletion,之后完成扣款。但是这个方法,可能来自于一个别人写的Module,甚至有可能是一个第三方的lib。如果这个模块中有bug,同一个订单的completion在某些情况下被触发了两次,那么这样的代码下,会扣两次款。这样的callback形式的代码对这种bug特别敏感,要处理这样的情况也需要做很多的额外代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
functiion finish(){
chargeCreditCard(purchaseInfo);
showThankYouPage();
}

function error(err){
logStatsError(err);
finish();
}

var listener = trackCheckout(purchaseInfo);
listener.on('completion', finish);
listener.on('error', error);

如果使用Promise,让trackCheckout返回一个promise,利用promise只能resolve一次的特点,就不会有个问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function trackCheckout(purchaseInfo){
return new Promise(function(resolve, reject){
//call resolve when task complete
//call rejct when task encounter error
});
}

//

var prommise = trackCheckout(purchageInfo);

promise.then(
finish,
error
);

不管你在trackCheckout中调用resolve多少次,then只会执行一次。
Promise保证

  1. only resolve once
  2. either success or error
  3. messages are kept/ passed
  4. exceptions treat as error
  5. immutable once resolved

Promise可以理解成一个callback manager,让我们的callback调用更有保证,更清晰明白,

Flow Control

下面看看我们一般从外边资料学习到的promise是哪些内容
一般我们写的js代码做一些复杂的事情,比如同时发request,等得到结果了以后做什么事情。就跟我们之前做的callback,thunk的练习一样。这就是flow control。我们看看promise做我们的flow control

Chaining Promise

一个重要的特点,也是设计Promise API的最重要的点。就是Chaining。Promise A代表步骤A, Promise B代表步骤B,…

1
2
3
4
5
6
7
8
9
10
11
doFirstThing()
.then(function(){
return doSecondThing();
})
.then(function(){
return doThirdThing():
})
.then(
complete,
error
);

Promise不仅是flow control,也是data flow的体现。

练习

跟之前的例子一样,我们看看如何用promise完成输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************

function getFile(file) {
// what do we do here?
}

我们需要在getFile中立马发出ajax请求,并且返回一个promise。
我们需要按顺序输出,这里应该要用到chaining.如果用过promise这个代码应该很容易写出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************

function getFile(file) {
return new Promise(function(resolve){
fakeAjax(file,resolve);
});
}

// Request all files at once in
// "parallel" via `getFile(..)`.
var p1 = getFile("file1");
var p2 = getFile("file2");
var p3 = getFile("file3");

// Render as each one finishes,
// but only once previous rendering
// is done.
p1
.then(output)
.then(function(){
return p2;
})
.then(output)
.then(function(){
return p3;
})
.then(output)
.then(function(){
output("Complete!");
});

有一些问题

  1. 我们需要自己new 一个Promise吗?
    一般的Library都会把异步操作封装好,直接返回一个promise。但是我们有时候还是需要和上面的代码一样,做一个异步操作的封装,封装的有可能是一个callback或者其他的调用。
  2. 能不能把output写到function里面?
    可以,但是function应该有一个单一的功能,能分开的功能尽量分开
  3. output的chain后面的promise.then的传入参数是什么?
    传入一个新的promise,传入的参数是output的返回值。
  4. resolve是哪里来的?
    来自Promise库函数。
  5. catch逻辑是怎么样的?

Promise Hell

Promise一定要用对,不然很容易写成promise hell。如果你发现自己陷入了嵌套迭代很深的缩进时,说明你的promise代码很可能写错了。

1
2
3
4
5
6
7
8
9
10
11
p1
.then(function(text){
output(text);
p2.then(function(text){
output(text);
p3.then(function(text){
output(text);
output('completed!');
} )
})
});

上边的练习里边我们有三个文件要去请求,那么如果我们的请求的文件数目增加我们是不是要写很多的.then呢?有没有更好的办法?
可以用map和reduce方法来减少我们的重复代码。
我们可以把任意个file的list转换成promise的list,再用reduce函数一一处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
function fakeAjax(url,cb) {
var fake_responses = {
"file1": "The first text",
"file2": "The middle text",
"file3": "The last text"
};
var randomDelay = (Math.round(Math.random() * 1E4) % 8000) + 1000;

console.log("Requesting: " + url);

setTimeout(function(){
cb(fake_responses[url]);
},randomDelay);
}

function output(text) {
console.log(text);
}

// **************************************

function getFile(file) {
return new Promise(function(resolve){
fakeAjax(file,resolve);
});
}

// Request all files at once in
// "parallel" via `getFile(..)`.
//
// Render as each one finishes,
// but only once previous rendering
// is done.
["file1","file2","file3"]
.map(getFile)
.reduce(
function(chain,filePromise){
return chain
.then(function(){
return filePromise;
})
.then(output);
},
Promise.resolve() // fulfilled promise to start chain
)
.then(function() {
output("Complete!");
});

Promise 其他的API

Promise.all

如果任何一个里面的promise reject,那么all立刻reject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Promise.all([
doTask1a(),
doTask1b(),
doTask1c()
])
.then(function (results){
return doTask2(
Math.max(
results[0],
results[1],
results[2],
);
);
});

Promise.race

只要有一个promise完成,resolve或者reject

1
2
3
4
5
6
7
8
9
10
11
12
13
var p = trySomeAsyncThing();
Promise.race([
p,
new Promise(function(_, reject){
setTimeout(function(){
reject('timeout!');
}, 3000)
})
])
.then(
success,
error
);

123

Roger Guo

25 日志
12 标签
© 2019 Roger Guo
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4