概述

多线程开发中有几个痛点:

  1. 主线程如何正确的关闭异步线程?
  2. 主线程怎么知道异步线程是否执行完成?

当然已经又很好的方案来解决这些问题了,这就是 Future 模式。

Future

Future 模式在请求发生时,会先产生一个 Future 对象给发出请求的客户,而真正的结果是由一个新线程执行,结果生成之后,将之设定至 Future 之中,而当客户端需要结果时,Future 也已经准备好,可以让客户提取使用。

一个简单的示范
1
2
3
4
5
6
7
8
9
10
11
12
13
public Future request() {
final Future future = new Future();

new Thread() {
public void run() {
// 下面这动作可能是耗时的
RealSubject subject = new RealSubject();
future.setRealSubject(subject);
}
}.start();

return future;
}

JDK 里面也有对 Future 模式的实现,我们先来看看 Future 接口包含什么内容

1
2
3
4
5
6
7
8
9
10
package java.util.concurrent;

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

支持的功能:

  • 关闭线程
  • 判断是否关闭
  • 判断是否完成
  • 获取结果

线程中 Future 模式

单线程使用
1
2
3
4
5
6
7
8
9
10
FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
});
new Thread(future).start();

Thread.sleep(5000);// do something

System.out.println(future.get());

这里的 FutureTask 的继承关系

1
2
3
4
5
public class FutureTask<V> implements RunnableFuture<V> {}

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

由于继承了 RunnableFuture 两个接口,所以可以被在线程中运行,同样具有 Future 功能,需要说明的是FutureTask在 JDK 中是唯一一个实现 Future 的类。

线程池中 Future 模式

线程池使用
1
2
3
4
5
6
7
8
9
10
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<Integer> future = threadPool.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
});

Thread.sleep(5000);// do something

System.out.println(future.get());

通过submit(Callable)可以得到一个Future,之前我们说过FutureTask是 JDK 中是唯一的 Future 的实体类,所以submit返回的Future一定是一个FutureTask。其内部其实是通过newTaskFor方法来转换的。

1
2
3
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

ThreadPool 框架

这里我们来封装一下 JDK 中 Future 模式,使其更好用。

  • 实现线程完成或关闭后的通知回调
使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
List<ThreadTask<String>> results = new ArrayList<ThreadTask<String>>();

ThreadPool threadPool = new ThreadPool(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

for (int i = 1; i < 5; i++) {
ThreadTask<String> threadTask = threadPool.submit(new Task(i));

threadTask.setOnTaskDoneListener(new OnTaskDoneListener() {

@Override
public void onTaskDone(ThreadTask<?> task) {
System.out.println("onTaskDone " + task.get());
}
});

threadTask.setOnTaskCancelListener(new OnTaskCancelListener() {

@Override
public void onTaskCancel(ThreadTask<?> task) {
System.out.println(task + " onTaskCancel");
}
});

results.add(threadTask);
}

for (ThreadTask<String> res : results) {
res.cancel();
System.out.println("result :" + res.get());
}

threadPool.shutdown();

实现源码:https://github.com/gavinliu/MThread