스레드 풀과 함께 MDC를 사용하는 방법은 무엇입니까? 스레드에서 MDC.getCopyOfContextMap ()을

소프트웨어에서 MDC를 광범위하게 사용하여 웹 요청에 대한 세션 ID 및 사용자 이름과 같은 것을 추적합니다. 원래 스레드에서 실행하는 동안 제대로 작동합니다. 그러나 백그라운드에서 처리해야 할 것들이 많이 있습니다. 이를 위해 자체 롤링 된 비동기 실행 서비스와 함께 java.concurrent.ThreadPoolExecutorjava.util.Timer클래스를 사용합니다 . 이러한 모든 서비스는 자체 스레드 풀을 관리합니다.

이러한 환경에서 MDC를 사용하는 것에 대한 Logback의 설명서 는 다음과 같이 말합니다.

맵핑 된 진단 컨텍스트의 사본이 시작 스레드에서 작업자 스레드로 항상 상속 될 수는 없습니다. java.util.concurrent.Executors가 스레드 관리에 사용되는 경우입니다. 예를 들어, newCachedThreadPool 메소드는 ThreadPoolExecutor를 작성하고 다른 스레드 풀링 코드와 같이 복잡한 스레드 작성 로직을 가지고 있습니다.

이러한 경우, 실행 프로그램에 태스크를 제출하기 전에 원래 (마스터) 스레드에서 MDC.getCopyOfContextMap ()을 호출하는 것이 좋습니다. 작업이 첫 번째 작업으로 실행될 때 MDC.setContextMapValues ​​()를 호출하여 원래 MDC 값의 저장된 사본을 새 Executor 관리 스레드와 연관시켜야합니다.

이것은 좋을 것입니다. 그러나 이러한 호출을 추가하는 것을 잊어 버리는 것은 매우 쉽고, 너무 늦을 때까지 문제를 인식하는 쉬운 방법은 없습니다. Log4j의 유일한 표시는 로그에서 MDC 정보가 누락되고 Logback을 사용하면 오래된 MDC 정보를 얻는다는 것입니다 (트레드 풀의 스레드가 실행 된 첫 번째 작업에서 MDC를 상속하기 때문에). 둘 다 프로덕션 시스템에서 심각한 문제입니다.

나는 우리의 상황을 어떤 식 으로든 특별하게 보지 못했지만 웹 에서이 문제에 대해 많이 찾지 못했습니다. 분명히 이것은 많은 사람들이 부딪 치는 것이 아니므로 피할 수있는 방법이 있어야합니다. 우리 여기서 뭐하는거야?



답변

예, 이것은 내가 겪었던 일반적인 문제입니다. 설명 된대로 수동 설정과 같은 몇 가지 해결 방법이 있지만 이상적으로는

  • MDC를 일관되게 설정합니다.
  • MDC가 부정확하지만 모르는 버그를 피하십시오. 과
  • 당신이 스레드 풀을 사용하는 방법에 최소화 변경 (예를 들어, 서브 클래스 CallableMyCallable모든 곳에서, 또는 유사한 추함).

다음은이 세 가지 요구를 충족시키는 솔루션입니다. 코드는 설명이 필요합니다.

참고로 Guava ‘s MoreExecutors.listeningDecorator()를 사용하는 경우이 실행 프로그램을 작성하여 Guava ‘s 에 제공 할 수 있습니다 ListanableFuture.

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}


답변

우리는 비슷한 문제에 부딪쳤다. 새 스레드를 시작 / 중지하기 전에 필요한 MDC 호출을 작성하기 위해 ThreadPoolExecutor를 확장하고 before / afterExecute 메소드를 대체 할 수 있습니다.


답변

IMHO 최고의 솔루션은 다음과 같습니다.

  • 사용하다 ThreadPoolTaskExecutor
  • 당신의 자신을 구현 TaskDecorator
  • 그걸 써: executor.setTaskDecorator(new LoggingTaskDecorator());

데코레이터는 다음과 같습니다.

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}


답변

이것이 고정 스레드 풀 및 실행 프로그램으로 수행하는 방법입니다.

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

스레딩 부분에서 :

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});


답변

이전에 게시 된 솔루션과 마찬가지로, newTaskFor방법 RunnableCallable를 만들 때 인수를 마무리하기 위해 덮어 쓸 수 있습니다 (허용 솔루션을 참조) RunnableFuture.

참고 : 따라서 메소드 대신 executorServicesubmit메소드를 호출해야합니다 execute.

의 경우 ScheduledThreadPoolExecutordecorateTask방법 대신 덮어 쓸 것입니다.


답변

@Async주석을 사용하여 태스크를 실행하는 스프링 프레임 워크 관련 환경에서이 문제점에 직면 하는 경우 TaskDecorator 접근법 을 사용하여 태스크를 장식 할 수 있습니다 . 그것을하는 방법의 샘플은 여기에 제공됩니다 : https://moelholm.com/blog/2017/07/24/spring-43-using-a-taskdecorator-to-copy-mdc-data-to-async-threads

나는이 문제에 직면했으며 위의 기사는이 문제를 해결하는 데 도움이되었으므로 여기서 공유하고 있습니다.


답변

여기에있는 기존 답변과 유사한 또 다른 변형 ExecutorService은 대리자를 전달하고 전달하는 것입니다. 그런 다음 제네릭을 사용하면 통계를 얻으려는 경우 (다른 수정 방법을 사용하지 않는 한) 실제 대리인을 계속 노출시킬 수 있습니다.

참조 코드 :

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}