1.FutureTask 在研究AsyncTask源代码的时候发现它的内部使用了FutureTask、Future、Callable类,然后就学习了一下FutureTask类。
FutureTask是一个用来执行异步任务的类,同时当程序执行完成之后还会返回运算的结果。 我们之前经常使用Thread+Runnable来执行异步任务的,但是使用这种方式不能获取到执行的结果。FutureTask解决了这个问题。
2.FutureTask用法 我们使用FutureTask来实现一个模拟一个非常简单的事情,使用Thread.sleep()来模拟执行耗时的操作工作,然后将执行完成的结果返回出来,然后打印出来。 public class FutureTaskActivity extends Activity implements View.OnClickListener { … //创建一个实现了Callable接口并且在call()方法中做耗时操作的类 class WorkTask implements Callable< Integer> { @Override public Integer call() throws Exception { //使用线程sleep来模拟耗时的操作 Thread.sleep(5000); //将执行的结果返回出去 return 1000; } } … private void executeTask() { //创建一个实现了Callable的类的对象,并且当作参数传入到FutureTask中 WorkTask workTask = new WorkTask(); FutureTask< Integer> futureTask = new FutureTask< Integer>(workTask) { @Override protected void done() { try { //该方法是在线程运行结束后回调的,然后获取call方法中返回的结果 int result = get(); Log.i(“LOH”, “result…” + result); Thread.currentThread().getName(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }; //将FutureTask作为参数传入到Thread函数中执行 Thread thread = new Thread(futureTask); //启动线程执行任务 thread.start(); } } 其实FutureTask的使用跟Runnable的使用差不多的,只是多出了一个用来实现Callable接口的类作为参数传入到 FutureTask中,并且以前的那种方法不能监听线程执行完成的。 以前在使用线程的时候都知道只有三种方式来执行异步任务: ①使用new Thread(new Runnable()).start()的方法来执行异步任务,也就是说将实现Runnable接口的类当作参数传入到Thread 类中,然后使用thread.start来启动线程执行 ②通过继承Thread类并且重写该类的 run 方法,在该方法中执行耗时的操作,同样也是使用 Thread.start()来启动线程执行 ③使用线程池来执行实现了 Runnable 接口的类,这样子也可以达到执行异步任务的目的。
通过对上面的这些总结我们可以知道了要想实现异步任务的话就必须实现 Runnable() 接口才行的,所以我们也可以非常肯定的断定FutureTask也是实现了该接口的,不然就无法执行异步任务的。
3.源码解析 首先我们通过一个类的关系图来看看这几个类之间的关系图: FutureTask实现了RunnableFuture接口,而RunnableFuture实现了Runnable接口和Future接口,所以FutureTask可以当作任务在线程池中执行,也可以当作参数传入Thread中进行启动任务,在创建FutureTask对象的时候需要传入一个Callable接口的实现类,从上面可以看到执行FutureTask任务同样也是在 run方法中的。 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); private static final long STATE; private static final long RUNNER; private static final long WAITERS; static { try { //通过Unsafe来获取字段state相对于本对象内存地址的偏移地址 STATE = U.objectFieldOffset( FutureTask.class.getDeclaredField(“state”)); //获取字段runner在内存中的偏移地址 RUNNER = U.objectFieldOffset( FutureTask.class.getDeclaredField(“runner”)); //获取字段waiters在内存中的偏移地址 WAITERS = U.objectFieldOffset( FutureTask.class.getDeclaredField(“waiters”)); } catch (ReflectiveOperationException e) { throw new Error(e); } Class<?> ensureLoaded = LockSupport.class; } 在创建FutureTask对象之前,有一个非常重要的类sun.misc.Unsafe。上面静态代码块中的功能主要是获取该对象的字段在内存中的偏移地址,获取这些偏移地址的作用是为了直接操作内存中某个变量的变量值做准备的,防止多线程并发带来的问题。在学习C或者是C++的时候知道,如果知道了某个对象的某个字段的内存地址话,那我们就可以直接通过地址的方式来更新该字段的内存值了。 public FutureTask(Callable< V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
由于FutureTask是个Runnable,一旦线程池或者子线程执行,优先执行 run()方法,那么,看看它的源码: public class FutureTask< V> implements RunnableFuture< V> { … public void run() { //如果当前线程的状态不是原始状态,或者当前线程已经赋值成功,则不用重复执行该逻辑 if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { //将传进来的callable对象赋值给一个临时变量 Callable< V> c = callable; //判断传入进来的callable对象不为空并且线程状态也是新建的 if (c != null && state == NEW) { V result; boolean ran; try { //原来我们总是说的call()方法原来是在run方法中执行的,然后call()返回一个泛型类型的返回值 ,这种通过实现接口的方法在我们平时中是很常见的吧. result = c.call(); ran = true; //在定义call方法的时候抛出异常,然后这里捕捉异常进行处理,因为我们在call方法中写代码难免会有异常问题的 } catch (Throwable ex) { result = null; ran = false; setException(ex); } //如果call方法中不跑出异常的话,则通过set()方法将结果保存起来,该set()方法其实也是Future接口定义的方法 if (ran) set(result); } } finally { runner = null; int s = state; //如果当前的状态不是 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } … } state刚开始值为NEW,如果不等于NEW,说明已经被重新赋值了。成员变量runner的值为null,把当前所在线程赋值给runner, 如果成功,则返回 true;如果runner不为null, 或者赋值失败,返回false,则说明已经赋过值了,不用重复操作。 call()方法就是在run()方法中执行的。 boolean ran是判断Callable中执行耗时操作是否有异常,是否成功的标识。如果异常了,result会置空,ran = false; 然后执行 setException(ex);操作,看看代码 : protected void setException(Throwable t) { if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) { outcome = t; U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state finishCompletion(); } } 第一行是把成员变量state的值由NEW变为COMPLETING,然后把异常赋值给outcome,赋值成功后,把state的值,由COMPLETING变为EXCEPTION,然后执行finishCompletion()方法,此方法是一个循环执行。 如果 result = c.call(); 代码中没有异常,正常执行,就会调用set方法。 protected void set(V v) { //state的值由NEW变成COMOLETING if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) { //最后将结果赋值给成员变量 outcome outcome = v; //更新state状态为NORMAL U.putOrderedInt(this, STATE, NORMAL); finishCompletion(); } } boolean compareAndSwapInt()方法是Unsafe中的本地方法,主要作用是用于在多线程中并发的时候修改和读取某个值,其主要原理:根据传入的期望的数据跟内存中的数据进行对比,如果期望的数据跟内存中的数据相同的话,说明该变量的值没有被其他的线程修改过,同时将我们需要更改的新数据替换内存中的数据,修改成功之后并且返回true,表示的是修改新数据成功了。相反如果有其他线程修改了内存中的则放弃更新新数据,并且返回true。它有三个参数:object 表示更改数据的对象;offset 表示对象上字段的偏移地址;expectedValue 表示期望的数据,该数据的作用是用于跟内存中的数据进行比较,如果两者不相等的话说明内存中的数据被其他线程修改过;newValue表示更新的值。最后如果更新某个内存地址的值成功的话,则返回true,否则返回false。这个也是我们平时用到的多线程并发的原理基石理论:CAS(Compare and Swap, 翻译成比较并交换)。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //根据寻找内存地址的方式来修改属性在内存中的值,将该waiters对象置为null if (U.compareAndSwapObject(this, WAITERS, q, null)) { for ( ; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //回调Future接口的方法,标识程序正常的结束了,我们需要重写该方法. done(); //特别需要注意的是需要将该接口变量置为空,防止出现因为引用问题导致内存泄漏 callable = null; // to reduce footprint, } 当执行到该方法的时候,就标识程序正常的运行结束了,首先会将所有等待的线程全部唤醒,因为在执行FutureTask任务的时候调用get()方法是阻塞的,因为call()方法都还没有执行完成,这个时候你是获取不到任何结果的,所以会将当前调用get()方法的线程阻塞等待,直到调用finishCompletion()方法来解除线程阻塞,最后调用done()方法,这个时候我们就可以在该结束方法中执行我们想要的逻辑了;从代码中我们可以看出done()方法其实也还是运行在子线程的,所以我们不可以在done()方法中更新UI,还是需要Handler来发送消息的。 在线程全部执行结束之后,我们就可以在done()方法通过调用get()方法来获取最后执行的结果了,也就是刚刚在set()方法中看到的outcome的值。 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } 之前在调用set()方法时通过寻址(根据内存地址的偏移量)的方式修改过了state的值为NORMAL了,所以NORMAL大于COMPLETING,最后直接调用report()方法,最后直接通过return x 来返回结果。 private V report(int s) throws ExecutionException { Object x = outcome; if (s==NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 我们通过调用Future接口的isDone()来判断程序是否结束,可以直接根据state的状态判断是否是新创建的,该类的线程有7中不同的状态,主要状态切换成其中的一种我们就可以说程序结束了。 private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
public boolean isDone() { return state != NEW; } 只要状态值大于CANCELLED(4),也就是用户主动调用cancel()方法,不管是主动中断线程还是其他的方式都属于取消的操作的。 public boolean isCancelled() { return state >= CANCELLED; } 当程序里面的FutureTask未执行完成的时候get()方法会一直阻塞调用该方法的线程,直到FutureTask里面的任务执行完才会解除阻塞。所以get()方法是一个阻塞式的去获取结果的,从上面的get()方法的代码中我们可以得出 当状态还是NEW的时候,会调用awaitDone(false ,0)方法。 private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; for ( ; ; ) { int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s==COMPLETING) // We may have already promised (via isDone) that we are done, so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } else if (q == null) { if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) queued = U.compareAndSwapObject(this, WAITERS, q.next = waiters, q); else if (timed) { … } else LockSupport.park(this); } } 该方法有个无限循环直到状态值大于COMPLETING才返回一个状态值,我们在线程未执行完成的时候调用了get()方法,可以看到首先会创建一个WaitNode对象,然后通过Unsafe类来更新成员变量waiter的值为 q,然后再次循环最后会进入LockSupport.park(this) 分支,该函数主要是获取许可阻塞当前的线程,直到程序执行结束之后,调用LockSupport.unpark(this)来释放阻塞。所以如果我们在主线程中直接调用get()方法来获取结果的话则很有可能导致ANR,直到程序结束之后才会释放阻塞的,正确的用法就是在done()方法里面调用get()来获取执行的结果的。
我们平时在使用AsyncTask的时候有一个cancel()方法来取消当前执行的任务,我们之前也说了AsyncTask的本质其实也是使用了FutureTask来实现的。其实它的cancel()方法也是调用FutureTask的取消方法的,下面看看取消的原理: //如果返回值为true的话表示取消成功,否则为取消失败了 public boolean cancel(boolean mayInterruptIfRunning) { //首先判断当前的状态是否是NEW,然后再通过Unsafe类去更新内存中state字段的值为cancel。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { //如果以上的状态值设置成功的话,则判断是否设置中断运行 if (mayInterruptIfRunning) { try { Thread t = runner; //直接通过调用Thread的中断方法来强制中断当前运行的线程 if (t != null) t.interrupt(); } finally { // final state //最后修改当前状态state的值为 INTERRUPTED 为中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //最后解锁所有被阻塞的线程 finishCompletion(); } return true; } 第一行代码,是一个校验,如果state值为NEW, 根据mayInterruptIfRunning是否为true把state值由NEW变为INTERRUPTING或者CANCELLED,如果都成功了,则继续执行下一步,否则,终止,返回失败。比如已经执行完耗时操作,state值已经变成了COMPLETING,此时就不能取消了,直接返回false,表示失败;如果还没开始执行,或者执行了,但还没执行完,还在耗时操作之中,则根据 mayInterruptIfRunning 值,state 值由NEW变为INTERRUPTING或CANCELLED,此时,即使执行了run()方法,set(v)时也会有if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING))这个校验,此时state为INTERRUPTING或者CANCELLED,就不会继续赋值了。 继续往下看, 如果mayInterruptIfRunning为true,则会获取到耗时操作的线程,如果该线程已经执行,说明线程存在,不为空,则把它强制终止,然后会把state值由INTERRUPTING变为INTERRUPTED,表示已经终止,INTERRUPTING的意思是终止进行中。然后会执行 finishCompletion(); 方法。 我们在取消任务的时候可以设置强制中断线程运行,只要调用cancel(true) 就行了,有时候我们调用cancel(false)并不能立刻停止线程执行完成的,因为这个时候程序在run()方法中已经执行过了状态(state)值判断的话,这个时候就直接执行call()方法了,但是call()方法也没有执行完成,如果这个时候我们去取消的话, 因为我们知道取消的原理就是使用Unsafe类去修改内存中的state的值,但是这个时候设置已经来不急了。
虽然我们调用了cancel(false)方法去取消任务的,但是很多的时候还是不能马上终止任务执行,最后线程还是会继续执行的,但是到了set()方法的时候,这里会有一个状态值的判断的。之前我们已经介绍了线程并发的基石CAS,首先我们使用Unsafe类去比较state状态值是否发生了变化,如果state的值被其他的线程修改了,则不会调用done()方法了。 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { … } }
|