基础
Java
Java
  • 基础知识
    • Java 语言的特点
    • Java 基础
      • 语法基础
      • 类型
      • 泛型
      • 注解
      • 异常
      • 反射机制
      • Java 容器
    • Java IO
      • 基础IO
      • NIO
    • Java 并发
      • Java 内存模型
        • 主内存与工作内存
        • 对于 volatile 型变量的特殊规则
        • long 和 double 的非原子性协定
        • 原子性、可见性与有序性
        • 先行发生(Happens-Before)原则
      • 线程
        • 状态转换
        • 线程安全性
          • 对象的共享
            • 可见性
            • 线程封闭
            • 不可变性
            • 安全发布
          • 在现有的线程安全类中添加功能
        • 线程池
          • Executor 框架
          • ExecutorService
          • Executors
          • Future
          • CompletionService
          • 设置线程池的大小
          • ThreadPoolExecutor
      • 线程安全的容器
        • 同步容器类
        • 并发容器
          • ConcurrentHashMap
          • CopyOnWriteArrayList
          • BlockingQueue
            • 串行线程封闭
            • 双端队列与工作密取
      • 任务取消
        • 自定义的取消标志
        • 线程中断
        • 通过 Future 来实现取消
      • 条件队列
        • 内置条件队列
        • 显式的 Condition 对象
      • JUC 中的 AQS
        • AbstractQueuedSynchronizer
        • ReentrantLock
        • ReentrantReadWriteLock
        • Semaphore
        • CountDownLatch
      • 原子变量
        • CAS
        • 原子变量类
        • ABA 问题
        • 非阻塞算法
          • 非阻塞的栈
          • 非阻塞的链表(X)
    • Java 虚拟机
      • JVM 的运行机制
      • 类加载器
      • 运行时数据区
        • JVM 的内存区域
        • 永久代与元空间
        • OutOfMemoryError
      • Java 中的 4 种引用类型
      • 垃圾收集(GC)
        • 如何确定垃圾
        • 垃圾回收算法
        • 垃圾收集器
          • Serial 收集器
          • ParNew 收集器
          • Parallel Scavenge 收集器
          • Serial Old 收集器
          • Parallel Old 收集器
          • CMS 收集器
          • Garbage First 收集器
  • Group 1
    • JDK 与 JRE
    • JVM默认配置
    • java与HTTPS
    • 构建高效且可伸缩的结果缓存
    • 基础补充
      • 在 Switch 中使用 String
      • 为什么 Java 语言不支持多重继承?
      • 为什么在重写 equals 方法的时候需要重写 hashCode 方法
      • 为什么 String 要设计为不可变的?
      • 移位运算符
      • SPI 机制
      • 为何 HashMap 不是线程安全的
      • Class.forName() 和ClassLoader.loadClass() 区别
      • synchronized 关键字
    • 零拷贝
    • Java中的锁优化技术
      • 自旋锁与自适应自旋
      • 锁消除
      • 锁粗化
      • 轻量级锁
      • 偏向锁
    • Arthas
    • Thread.sleep()、Object.wait()、Condition.await()、LockSupport.park()
由 GitBook 提供支持
在本页
  1. 基础知识
  2. Java 并发
  3. 线程
  4. 线程池

CompletionService

如果向 Executor 提交了一组计算任务,并且希望在计算完成后获得结果,可以使用 CompletionService 来完成。

CompletionService 将 Executor 和 BlockingQueue 的功能融合在一起。可以将 Callable 任务提交给它来执行,然后使用类似于队列操作的 take 和 poll 等方法来获得已完成的结果,而这些结果会在完成时将被封装为 Future。

ExecutorCompletionService 实现了 CompletionService,并将计算部分委托给一个 Executor。

ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }
    
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
 
    .....   
} 
   /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        
        protected void done() { 
            completionQueue.add(task); 
        }
    }
FutureTask 中 run 方法调用了 done 方法
public class FutureTask<V> implements RunnableFuture<V> {
	
	public void run() {
        ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            ...
        }
    }
    
    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            ...
            finishCompletion();
        }
    }
    
    private void finishCompletion() {
        ...

        done();

        callable = null;        // to reduce footprint
    }
    
}

ExecutorCompletionService 的实现非常简单:

  • 在构造函数中创建一个 BlockingQueue 来保存计算完成的结果。

  • 当计算完成时,调用 FutureTask 中的 done 方法。

  • 当提交某个任务时,该任务将首先包装为一个 QueueingFuture,这是 FutureTask 的一个子类,然后再改写子类的 done 方法,并将结果放入 BlockingQueue 中。

上一页Future下一页设置线程池的大小

最后更新于9个月前