ThreadPoolExecutor의 submit () 메서드 블록이 포화 상태 인 경우 만드는 방법은 무엇입니까? 만들고 싶습니다 .

ThreadPoolExecutor최대 크기에 도달하고 대기열이 가득 차면 새 작업을 추가하려고 할 때 submit()메서드가 차단 되도록 만들고 싶습니다 . 이를 위해 사용자 정의를 구현해야 RejectedExecutionHandler합니까 아니면 표준 Java 라이브러리를 사용하여이를 수행하는 기존 방법이 있습니까?



답변

방금 찾은 가능한 해결책 중 하나 :

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

다른 해결책이 있습니까? RejectedExecutionHandler이러한 상황을 처리하는 표준 방법처럼 보이기 때문에 기반으로하는 것을 선호 합니다.


답변

ThreadPoolExecutor 및 blockingQueue를 사용할 수 있습니다.

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}


답변

CallerRunsPolicy호출 스레드에서 거부 된 작업을 실행하는를 사용해야합니다 . 이렇게하면 해당 작업이 완료 될 때까지 실행 프로그램에 새 작업을 제출할 수 없습니다.이 시점에서 여유 풀 스레드가 생기거나 프로세스가 반복됩니다.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

문서에서 :

거부 된 작업

execute (java.lang.Runnable) 메서드에 제출 된 새 작업은 Executor가 종료 될 때와 Executor가 최대 스레드와 작업 대기열 용량 모두에 대해 유한 경계를 사용하고 포화 상태 일 때 거부됩니다. 두 경우 모두 execute 메소드는 RejectedExecutionHandler의 RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 메소드를 호출합니다. 4 개의 사전 정의 된 핸들러 정책이 제공됩니다.

  1. 기본 ThreadPoolExecutor.AbortPolicy에서 핸들러는 거부시 런타임 RejectedExecutionException을 발생시킵니다.
  2. ThreadPoolExecutor.CallerRunsPolicy에서 실행을 호출하는 스레드가 작업을 실행합니다. 이것은 새로운 작업이 제출되는 속도를 늦추는 간단한 피드백 제어 메커니즘을 제공합니다.
  3. ThreadPoolExecutor.DiscardPolicy에서 실행할 수없는 작업은 단순히 삭제됩니다.
  4. ThreadPoolExecutor.DiscardOldestPolicy에서 실행 프로그램이 종료되지 않으면 작업 대기열의 헤드에있는 작업이 삭제 된 다음 실행이 다시 시도됩니다 (다시 실패하여이 작업이 반복 될 수 있음).

또한 ThreadPoolExecutor생성자를 호출 할 때 ArrayBlockingQueue와 같은 제한된 큐를 사용해야합니다 . 그렇지 않으면 아무것도 거부되지 않습니다.

편집 : 귀하의 의견에 대한 응답으로 ArrayBlockingQueue의 크기를 스레드 풀의 최대 크기와 동일하게 설정하고 AbortPolicy를 사용하십시오.

편집 2 : 좋아, 당신이 무엇을 얻고 있는지 봅니다. 이건 어떨까요 :를 초과하지 않는지 beforeExecute()확인하기 위해 메서드를 재정의하고 getActiveCount()초과하지 않으면 getMaximumPoolSize()잠자고 다시 시도합니까?


답변

Hibernate BlockPolicy는 간단하고 원하는 것을 할 수 있습니다 :

참조 : Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}


답변

BoundedExecutor에서 위의 인용 답변을 연습 자바 동시성 실행 프로그램에 대한 무제한의 큐를 사용하거나 결합 세마포어는 큐의 크기보다 더 큰없는 경우에만 제대로 작동합니다. 세마포어는 제출 스레드와 풀의 스레드간에 공유되는 상태이므로 큐 크기 <바운드 <= (큐 크기 + 풀 크기) 인 경우에도 실행기를 포화시킬 수 있습니다.

사용 CallerRunsPolicy은 작업이 영원히 실행되지 않는 경우에만 유효합니다.이 경우 제출 스레드가 rejectedExecution영원히 유지되며 제출 스레드가 새 작업을 제출할 수 없기 때문에 작업을 실행하는 데 오랜 시간이 걸리는 경우에는 좋지 않습니다. 작업 자체를 실행하는 경우 다른 작업을 수행하십시오.

허용되지 않는 경우 작업을 제출하기 전에 실행자의 제한된 대기열 크기를 확인하는 것이 좋습니다. 대기열이 가득 찬 경우 다시 제출하기 전에 잠시 기다리십시오. 처리량은 저하되지만 제안 된 다른 많은 솔루션보다 더 간단한 솔루션이며 거부되는 작업이 없음을 보장합니다.


답변

나는 그것이 해킹이라는 것을 알고 있지만 내 생각에는 여기에서 제공되는 것들 사이의 가장 깨끗한 해킹 😉

ThreadPoolExecutor는 “put”대신 차단 대기열 “offer”를 사용하므로 차단 대기열의 “offer”동작을 재정의 할 수 있습니다.

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

나는 그것을 테스트했고 작동하는 것 같습니다. 시간 제한 정책을 구현하는 것은 독자의 연습으로 남겨집니다.


답변

다음 클래스는 ThreadPoolExecutor를 감싸고 Semaphore를 사용하여 차단하면 작업 대기열이 가득 찼습니다.

public final class BlockingExecutor {

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

이 래퍼 클래스는 Brian Goetz의 Java Concurrency in Practice 책에 제공된 솔루션을 기반으로합니다. 이 책의 솔루션은 두 개의 생성자 매개 변수 인 an Executor과 세마포에 사용되는 경계 만 사용합니다. 이것은 Fixpoint가 제공하는 답변에 나와 있습니다. 이 접근 방식에는 문제가 있습니다. 풀 스레드가 사용 중이고 큐가 꽉 찼지만 세마포어가 방금 허가를 해제 한 상태가 될 수 있습니다. ( semaphore.release()finally 블록에서). 이 상태에서 새 작업은 방금 해제 된 허가를 얻을 수 있지만 작업 대기열이 가득 차서 거부됩니다. 물론 이것은 당신이 원하는 것이 아닙니다. 이 경우 차단하고 싶습니다.

이를 해결하려면 JCiP가 명확하게 언급했듯이 제한되지 않은 대기열을 사용해야합니다. 세마포어는 가드 역할을하여 가상 큐 크기의 영향을줍니다. 이것은 유닛이 maxPoolSize + virtualQueueSize + maxPoolSize작업 을 포함 할 수 있다는 부작용이 있습니다 . 왜 그런 겁니까? 왜냐하면
semaphore.release()finally 블록에서. 모든 풀 스레드가 동시에이 문을 호출하면 maxPoolSize허용이 해제되어 동일한 수의 작업이 장치에 들어갈 수 있습니다. 제한된 대기열을 사용하는 경우 여전히 가득 차서 작업이 거부됩니다. 이제 이것은 풀 스레드가 거의 완료되었을 때만 발생한다는 것을 알고 있으므로 문제가되지 않습니다. 풀 스레드가 차단되지 않으므로 곧 대기열에서 작업을 가져옵니다.

그래도 제한된 대기열을 사용할 수 있습니다. 크기가 virtualQueueSize + maxPoolSize. 더 큰 크기는 쓸모가 없으며 세마포어는 더 많은 항목을 허용하지 않습니다. 크기가 작을수록 작업이 거부됩니다. 작업이 거부 될 가능성은 크기가 줄어들수록 증가합니다. 예를 들어, maxPoolSize = 2 및 virtualQueueSize = 5 인 제한된 실행기를 원한다고 가정합니다. 그런 다음 5 + 2 = 7 허용과 5 + 2 = 7의 실제 큐 크기로 세마포어를 가져옵니다. 유닛에있을 수있는 실제 작업 수는 2 + 5 + 2 = 9입니다. 실행기가 가득 차고 (대기열에 5 개 작업, 스레드 풀에 2 개, 따라서 0 개 허용 가능) 모든 풀 스레드가 허용을 해제하면 들어오는 작업이 정확히 2 개 허용을 가져올 수 있습니다.

이제 JCiP의 솔루션은 이러한 모든 제약 (제한되지 않은 대기열 또는 해당 수학 제한 등)을 적용하지 않기 때문에 사용하기가 다소 번거 롭습니다. 나는 이것이 이미 사용 가능한 부분을 기반으로 새 스레드 안전 클래스를 빌드 할 수있는 방법을 보여주는 좋은 예일 뿐이지 만 완전히 성장하고 재사용 가능한 클래스가 아니라고 생각합니다. 나는 후자가 저자의 의도라고 생각하지 않습니다.