概述
多线程开发中有几个痛点:
- 主线程如何正确的关闭异步线程?
- 主线程怎么知道异步线程是否执行完成?
当然已经又很好的方案来解决这些问题了,这就是 Future 模式。
Future
Future 模式在请求发生时,会先产生一个 Future 对象给发出请求的客户,而真正的结果是由一个新线程执行,结果生成之后,将之设定至 Future 之中,而当客户端需要结果时,Future 也已经准备好,可以让客户提取使用。
一个简单的示范1 2 3 4 5 6 7 8 9 10 11 12 13
| public Future request() { final Future future = new Future();
new Thread() { public void run() { RealSubject subject = new RealSubject(); future.setRealSubject(subject); } }.start();
return future; }
|
JDK 里面也有对 Future 模式的实现,我们先来看看 Future
接口包含什么内容
1 2 3 4 5 6 7 8 9 10
| package java.util.concurrent;
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
|
支持的功能:
线程中 Future 模式
单线程使用1 2 3 4 5 6 7 8 9 10
| FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() { public Integer call() throws Exception { return new Random().nextInt(100); } }); new Thread(future).start();
Thread.sleep(5000);
System.out.println(future.get());
|
这里的 FutureTask
的继承关系
1 2 3 4 5
| public class FutureTask<V> implements RunnableFuture<V> {}
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
|
由于继承了 Runnable
和 Future
两个接口,所以可以被在线程中运行,同样具有 Future 功能,需要说明的是FutureTask
在 JDK 中是唯一一个实现 Future 的类。
线程池中 Future 模式
线程池使用1 2 3 4 5 6 7 8 9 10
| ExecutorService threadPool = Executors.newSingleThreadExecutor(); Future<Integer> future = threadPool.submit(new Callable<Integer>() { public Integer call() throws Exception { return new Random().nextInt(100); } });
Thread.sleep(5000);
System.out.println(future.get());
|
通过submit(Callable)
可以得到一个Future
,之前我们说过FutureTask
是 JDK 中是唯一的 Future 的实体类,所以submit
返回的Future
一定是一个FutureTask
。其内部其实是通过newTaskFor
方法来转换的。
1 2 3
| protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
|
ThreadPool 框架
这里我们来封装一下 JDK 中 Future 模式,使其更好用。
使用示例1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| List<ThreadTask<String>> results = new ArrayList<ThreadTask<String>>();
ThreadPool threadPool = new ThreadPool(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
for (int i = 1; i < 5; i++) { ThreadTask<String> threadTask = threadPool.submit(new Task(i));
threadTask.setOnTaskDoneListener(new OnTaskDoneListener() {
@Override public void onTaskDone(ThreadTask<?> task) { System.out.println("onTaskDone " + task.get()); } });
threadTask.setOnTaskCancelListener(new OnTaskCancelListener() {
@Override public void onTaskCancel(ThreadTask<?> task) { System.out.println(task + " onTaskCancel"); } });
results.add(threadTask); }
for (ThreadTask<String> res : results) { res.cancel(); System.out.println("result :" + res.get()); }
threadPool.shutdown();
|
实现源码:https://github.com/gavinliu/MThread