「并发进阶」Future

「并发进阶」Future明天的你会感谢今天努力的你举手之劳,加个关注Future框架概述JDK中的Future框架实际就是Future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例:应用实例// demo 片段Futur

大家好,欢迎来到IT知识分享网。

明天的你会感谢今天努力的你

举手之劳,加个关注

Future框架概述

JDK中的Future框架实际就是Future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例:

应用实例

// demo 片段 FutureTask<String> future = new FutureTask<>(()->{ log.info("异步任务执行..."); Thread.sleep(2000); log.info("过来很久很久..."); return "异步任务完成"; }); log.info("启动异步任务..."); new Thread(future).start(); log.info("继续其他任务..."); Thread.sleep(1000); log.info("获取异步任务结果:{}",future.get());
[15:38:03,231 INFO ] [main] - 启动异步任务... [15:38:03,231 INFO ] [main] - 继续其他任务... [15:38:03,231 INFO ] [Thread-0] - 异步任务执行... [15:38:05,232 INFO ] [Thread-0] - 过了很久很久... [15:38:05,236 INFO ] [main] - 获取异步任务结果:异步任务完成

首先我们将要执行的任务包装成 Callable , 这里如果不需要返回值也可以使用 Runnable ; 然后构建 FutureTask 由一个线程启动,最后使用 Future.get() 获取异步任务结果;

Future运行逻辑

对于Future模式的流程图如下

「并发进阶」Future

「并发进阶」Future

对比上面的实例代码, 大家可能会发现有些不一样,因为在FutureTask 同事继承了 Runnable 和 Future 接口, 所以在提交任务后没有返回Future , 而是直接使用自身调用get; 下面我们就对源码进行分析;

public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); } public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; // 任务运行状态 private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; // 异步任务 /** The result to return or exception to throw from get() */ private Object outcome; //返回结果 non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; // 异步任务执行线程 /** Treiber stack of waiting threads */ private volatile WaitNode waiters; //等待异步结果的线程栈(通过Treiber stack 算法实现) public FutureTaks(Callable<V> callable){ if(callable == null){ throw new NullPointerException(); } this.callable = callable ; this.state = NEW; } public FutureTask(Runnable runnable, V result){ this.callable = Exceutors.callable(runnable,result); this.state = NEW ; } }

另外在代码中还可以看见有很多地方都是用了 CAS 来更新变量, 而JDK1.6 甚至使用了 AQS 来实现,其原因就是同一个 FutureTask 可以多个线程同时提交, 也可以多个线程同时获取; 所以代码中有很多的状态变量;

// FutureTask.state 取值 private static final int NEW = 0; // 初始化到结果返回前 private static final int COMPLETING = 1; // 结果赋值 private static final int NORMAL = 2; // 执行完毕 private static final int EXCEPTIONAL = 3; // 执行异常 private static final int CANCELLED = 4; // 任务取消 private static final int INTERRUPTING = 5; // 设置中断状态 private static final int INTERRUPTED = 6; // 任务中断

同时源码的注释中也详细给出了可能出现的状态转换:

  • NEW -> COMPLETING -> NORMAL //任务正常执行
  • NEW-> COMPLETING -> EXCEPTION // 任务执行异常
  • NEW -> CANCELLED // 任务取消
  • NEW -> INITERRUPTING -> INTERRUPTED // 任务中断

注意这里的 COMPLETING 状态是一个很微妙的状态, 正因为有他的存在才能实现无锁赋值; 大家先留意这个状态,然后代码中应该能体会到; 另外这里有一个变量需要注意, WaitNode ; 使用 Treiber stack 算法实现的无锁栈; 其中原理说明可以 参考下面的第三节:

public void run(){ if(state != NEW //确保任务执行完成后,不在重复执行 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())){// 确保只有一个线程执行 return ; } }

任务执行

public void run() { if (state != NEW || // 确保任务执行完成后,不再重复执行 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) // 确保只有一个线程执行 return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 设置异常结果 } if (ran) set(result); // 设置结果 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); // 确保中断状态已经设置 } }
// 设置异步任务结果 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保证结果只能设置一次 outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); // 唤醒等待线程 } }
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保证结果只能设置一次 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

任务取消

public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && // 只有在任务执行阶段才能取消 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, // 设置取消状态 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }

注意 cancel(false) 也就是仅取消, 并没有打断;异步任务会继续执行,只是这里首先设置了 FutureTask.state = CANCELLED, 所以最后在设置结果的时候会失败, UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING);

获取结果

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 阻塞等待 return report(s); } private V report(int s) throws ExecutionException { // 根据最后的状态返回结果 Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); // 移除等待节点 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 任务已完成 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 正在赋值,直接先出让线程 Thread.yield(); else if (q == null) // 任务还未完成需要等待 q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 使用 Treiber stack 算法 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }

Treiber stack

创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上, 同时还要维护数据的一致性

jcip-annotations.jar @ThreadSafe

@ThreadSafe public class ConcurrentStack <E> { AtomicReference<Node<E>> top = new AtomicReference<>(); private static class Node <E> { public final E item; public Node<E> next; public Node(E item) { this.item = item; } } public void push(E item) { Node<E> newHead = new Node<>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while (!top.compareAndSet(oldHead, newHead)); } public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) return null; newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; } }

总结

  • 总体来讲源码比较简单,因为本身只是一个Future 模式的实现
  • 但是其中的状态量的设置,还有很多无锁的处理方式,才是 FutureTask 带给我们的精华

「并发进阶」Future

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/85850.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信