任务执行
大多数并发应用程序都是围绕任务执行来构造的。任务,在理想状态下,是一些离散的工作单元。通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的并行结构来提升工作的并发性。
在线程中执行任务
当围绕任务执行来设计应用程序的结构时,第一步就是要找到清晰的任务边界。大多数服务器应用程序都提供一种自然的任务边界选择方式:以独立的客户请求为边界。Web服务器,邮件服务器,文件服务器,数据库服务器等。将独立的请求作为任务表边界,既可以实现任务的独立性,又可以实现合理的任务规模。
串行执行
1 | class SingleThreadWebServer { |
串行的处理请求,一般来说无法提供高吞吐率或快速的响应。服务器的资源利用率会非常低,多数时间都耗费在IO操作上。
显示的创建线程
1 | // Don't do this way |
不停创建线程的缺点
- 线程生命周期的开销非常高
- 资源消耗,大量的空闲线程会占用很多内存.
- 稳定性,OutOfMemoryError
Executor框架
Executor提供了一种标准的方法把任务提交与任务执行解耦开来。Executor基于生产者消费者模式,提交任务的操作是生产者,执行人物的消费者。1
2
3public interface Executor {
void execute(Runnable command);
}例,基于Executor的Web服务器
通过使用Executor,将请求处理任务的提交与任务的实际执行解耦。只要改变Executor的实现,便可改变任务的执行方式,策略和服务器的行为。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
}执行策略
不同的执行策略,是不同的资源管理方式,最好的策略要根据当时可用的计算资源以及对服务质量的需求而定。
通过限制并发线程数量,可以避免程序因为过多的请求而耗尽服务器资源,最终导致失败。或者由于资源短缺,发生竞争,而导致性能问题。
把任务提交,与执行策略分离开来,有助于在部署时选择与可用资源最匹配的策略,发挥最大效能。线程池
线程池,管理一组同类型工作线程的资源池。线程池与工作队列密切相关。工作队列保存了所有等待执行的任务。
java.util.concurrent.Executors中提供了常用的静态工厂方法,来创建线程池。
1 | newFixedThreadPool(int nThreads) |
其中前三个方法都是对java.util.concurrent.ThreadPoolExecutor这个类的包装使用,而ThreadPoolExecutor实现了ExecutorService接口。
Executor生命周期
Executor的生命周期有三种状态:运行,关闭,已终止。1
2
3
4
5
6
7
8public interface ExecutorService entends Executor {
void shutdown(); // Stop accept new request
List<Runnable> shutdownNow(); // Stop accept, and immediately stop all the executing threads
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // wait until it is terminated.
// ...
}
延迟任务与周期任务
通过使用ScheduledThreadPoolExecutor来执行延迟任务或者周期任务。
可利用并行性
大多数服务器应用程序,明显的任务边界是每个客户的请求。在单个客户请求中仍可以发掘并发的可能性。
例,串行页面渲染器
1 | public class SingleThreadRenderer { |
有结果的任务Callable与Future
Runnable的最大局限在于,他不能返回一个值,或者抛出受检查的异常。
Callable是一种更好的抽象。
Future表示一个任务的生命周期,可以判断任务是否完成或者已经取消。
ExecutorService中的submit方法会返回一个Future。或者可以显示的为某个指定的Runnable或者Callable实例化一个FutureTask,用newTaskFor方法或者其构造函数。1
2
3
4
5
6
7
8
9
10
11public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCanceleed();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}
提交Runnable或者Callable时,就是把Runnable从任务提交的线程,安全的发布到最终执行任务的线程,等待执行。
同样的在给Future的结果赋值时,也是把结果从计算线程,安全的发布到调用get方法的线程中。
例,使用Future实现页面渲染器
1 | public class FutureRenderer { |
上面的代码把渲染文字与下载图像分开,稍微提升了一点性能。但是问题在于,必须要等所有图像都下载完毕,Future才能完成,才可以渲染图像。
我们可以使用CompletionService来更好地利用并行资源。CompletionService把Executor和BlockingQueue的功能融合,可以通过submit来提交Callable,也可以使用take和poll方法,来获得已经完成的任务结果。
ExecutorCompletionService的实现也很简单。他需要一个BlockingQueue来保存结果。当提交任务时,我们把任务包装成一个叫QueueingFuture的类,这个类继承FutureTask,并且改写protected的方法done,在这方法中加入把结果放入BlockingQueue的逻辑。当一个计算任务结束时,就调用FutureTask的done方法。1
2
3
4
5
6
7
8private class QueueingFuture<V> extends FutureTask<V> {
QueueingFuture(Callable<V> c) { super(c); }
QueueingFuture(Runnable t, V r) { super(t, r); }
protected void done() {
completionQueue.add(this);
}
}
使用CompletionService的页面渲染器
1 | public class Renderer { |
取消与关闭
把任务和线程安全、快速、可靠的停止下来,不是一件容易的事情。
中断是一种协作机制,可以使一个线程种植另一个线程的当前工作。这种协作模式的好处在于,当需要停止时,它们会首先清除当前正在执行的工作,然后在结束。
任务取消
取消一个任务的原因有,
用户请求取消,例如用户点击了取消按钮;
有时间限制的操作,例如超时;
应用程序事件,例如算法中可能并行的去计算多个空间,当其中一个得出结果时,可以取消其他的分支;
错误,例如写入文件是空间不足会报错;
关闭,当程序要关闭时,需要处理正在运行和在等待的任务。
Java中没有一种安全的抢占式的停止方式来终止一个任务线程,而协作模式是更好的一种方式。
一个可取消的任务必须有它的取消策略。这个策略中详细的定义了其他代码如何取消该任务,任务在何时检查是否已经请求了取消,以及响应取消时会执行哪些操作。
一个原始的办法时任务在执行过程中不断的检查一个自定义变量,在被设置后退出。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class PrimeGeneraor implements Runnable {
"this") (
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigIntger.ONE;
while(!cancelled) {
p = p.nextProbalePrime();
synchronized(this) {
primes.add(p);
}
}
}
public void cancel() { cancelled = true; }
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}
中断
使用检查自定义变量的方法的问题在于,当执行一个阻塞方法时,例如阻塞队列put方法调用时队列满,就不会再有机会检查到这个标志位。
每个线程都有一个boolean类型的中断状态。当线程中断时,这个线程的中断状态将被设置为true。1
2
3
4
5public class Thread {
public void interrupt() { ... } // 中断目标线程
public boolean isInterrupted() { ... } // 返回目标线程是否中断
public static boolean interrupted() { ... } // 清除当前线程的中断状态
}
对中断操作的正确理解是,它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由下一个适合的时刻中断自己。
使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态。如果调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则一定要处理。
1 | class PrimeProducer extends Thread { |
中断策略
中断策略规定线程如何解释某个中断请求————当发现中断请求时,应该做哪些工作,那些工作单元对于终端来说是原子操作,以及以多块的速度来响应中断。
最合理的中断策略是某种形式的线程级取消操作或者服务级取消操作:尽快推出,在必要时清理,通知某个所有者该线程已经退出。
由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。
响应中断
当调用可中断的阻塞函数时,有两种使用的策略处理中断异常:
- 传递异常
- 恢复中断状态
如果不想传递或无法传递InterruptedException,那么需要寻找另一种方式来保存中断请求。一个方法就是通过再次调用interrupt来恢复中断状态。
只有实现了线程中断策略的代码才可以屏蔽中断请求。常规的任务和库函数代码中都不应屏蔽中断请求。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while(true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// 如果在这里恢复中断状态,会引起无限循环
}
}
} finally {
if(interrupted)
Thread.currentThread().interrupt(); // 在返回前恢复中断状态
}
}
例,计时运行
下面的代码,是一个任意的Runnable在指定时间内运行完成的例子。在调用这个timedRun方法的线程中运行这个任务,并安排了一个取消操作。1
2
3
4
5
6
7
8
9
10
11private static final ScheduledExecutorService cancelExec = ...;
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
r.run();
}
这个方法非常简单,但是破坏了规则,就是在中断线程前,应该了解他的中断策略。因为timedRun可以从任意一个线程中调用,它无法知道这个调用线程的中断策略。如果run方法在超时限制前完成,那么线程中断操作是什么代码我们就不得而知了。反过来,如果任务不响应中断,那么timedRun会在任务结束时才返回。会给调用者带来负面影响。
通过Future实现取消
将任务提交交给一个ExecutorService,并通过一个定时的get方法获得结果。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void timedRun(Runnable r, long timeout, Timeunit unit) throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 接下来任务将被取消
} catch (ExecutionException e) {
// 如果在任务中抛出了异常,那么重新抛出该异常
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经结束,那么执行取消操作也没有任何负面影响
// 如果任务正在执行,那么会被中断
task.cancel(true);
}
}
处理不可中断的阻塞
并非所有的可阻塞方法或者阻塞机制都能响应中断。如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置现成的中断状态,除此之外没有其他任何作用。
对于由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些县城,但要求我们必须知道线程阻塞的原因。
- 同步的Socket I/O。虽然InputStream和OutputStream中的read,write都不会响应中断,但是通过关闭底层的Socket,可以使线程抛出一个SocketException。
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class ReaderThread extends Thread {
private final Socket socket;
...
public void interrupt() {
try {
socket.close();
}
catch (IOException ignored) {}
finally {
super.interrupt();
}
}
} - Java.io 中的同步I/O。Channel会派出AsynchronousCloseException。
- 异步的Socket I/O,也是调用close或者wakeup方法。
- 获取某个锁。无法响应。但是通过Lock类中提供的lockInterruptibly方法,运行在等待一个锁时仍能响应中断。
停止基于线程的服务
包括了LogWrite,LogService,关闭ExecutorService,Poison Pill,等例子。
处理非正常的线程终止
任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对他的行为保持怀疑。1
2
3
4
5
6
7
8
9
10
11public void run() {
Throwable thrown = null;
try {
while(!isInterrupted())
runTask(getTaskFromQueue());
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}
未捕获异常处理,UncaughtExceptionHandler。
JVM关闭
Shutdown Hook
Daemon Thread的使用场景与局限
避免使用finalize
线程池的使用
在任务与执行策略之间的隐耦合
包括
- 依赖性任务。两个并行的任务之间有依赖。
- 使用线程封闭机制的任务。
- 对时间响应铭感的任务,GUI程序。
- 使用ThreadLocal的任务。线程池中不该使用。
线程池大小
根据CPU个数,目标使用率等参数决定。
ThreadPoolExecutor的设计。
GUI界面的程序
为什么GUI是单线程的
因为简单。对线程的GUI系统容易死锁。
当前的GUI框架使用了专门的事件分发线程(Event Dispath Thread, EDT)来处理GUI事件。也就是主线程。
GUI应用程序需要处理一些细粒度的时间,例如点击鼠标、按下键盘或定时器超时等。因为只有单个线程来处理所有的GUI任务,因此会采用串行处理的方式。串行任务处理的不利之处在于,如果某个任务执行的时间很长,那么其他任务必须等到该任务执行结束。
在事件线程中执行的任务必须尽快的把控制权交还给事件线程。
短时间的GIU任务和长时间的GUI任务,处理有区别。