파이썬에서 멀티 프로세싱을 사용하는 동안 어떻게 로그인해야합니까? 그것을 쓰기 여러 프로세스를함으로써 (또는

현재 Python 2.6 multiprocessing모듈을 사용하여 여러 프로세스를 생성하는 프레임 워크에 중앙 모듈이 있습니다. 를 사용하기 때문에 multiprocessing모듈 수준의 다중 처리 인식 로그가 LOG = multiprocessing.get_logger()있습니다. 당 워드 프로세서 ,이 로거는 제대로 해석 것들에하지 않도록 프로세스 공유 잠금을 가지고 sys.stderr동시에 그것을 쓰기 여러 프로세스를함으로써 (또는 무엇이든 핸들).

내가 가진 문제는 프레임 워크의 다른 모듈이 다중 처리를 인식하지 못한다는 것입니다. 내가 보는 방식으로,이 중앙 모듈에 대한 모든 종속성이 다중 처리 인식 로깅을 사용하도록해야합니다. 그것은 프레임 워크의 모든 클라이언트 에게는 물론 프레임 워크 에서 성가신 일입니다. 내가 생각하지 않는 대안이 있습니까?



답변

방해하지 않고 이것을 처리하는 유일한 방법은 다음과 같습니다.

  1. 각 작업자 프로세스를 생성하여 로그가 다른 파일 디스크립터 (디스크 또는 파이프) 로 이동하도록 하는 것이 가장 좋습니다. 모든 로그 항목의 타임 스탬프를 지정해야합니다.
  2. 그런 다음 컨트롤러 프로세스는 다음 중 하나 를 수행 할 수 있습니다 .
    • 디스크 파일을 사용하는 경우 : 실행이 끝날 때 로그 파일을 타임 스탬프별로 정렬하여 병합 합니다.
    • 파이프를 사용하는 경우 (권장) : 모든 파이프에서 중앙 로그 파일로 즉시 로그 항목을 병합합니다. (예 : select파이프의 파일 디스크립터에서 주기적 으로 사용 가능한 로그 항목에 대해 병합 정렬을 수행하고 중앙 로그로 플러시하십시오. 반복하십시오.)

답변

방금 파이프를 통해 모든 것을 부모 프로세스에 공급하는 로그 핸들러를 작성했습니다. 나는 그것을 10 분 동안 만 테스트했지만 꽤 잘 작동하는 것 같습니다.

( 참고 : 이 하드 코딩되어 RotatingFileHandler내 유스 케이스입니다.)


업데이트 : @javier 지금 Pypi에 패키지로이 방법을 사용할 수 유지 – 볼 멀티 프로세싱 로깅 에서 Pypi에, github에 https://github.com/jruere/multiprocessing-logging을


업데이트 : 구현!

이제는 동시성을 올바르게 처리하기 위해 큐를 사용하고 오류를 올바르게 복구합니다. 나는 이것을 몇 달 동안 프로덕션 환경에서 사용 해 왔으며 아래의 현재 버전은 문제없이 작동합니다.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)


답변

QueueHandlerPython 3.2 이상에서 기본으로 제공되며 정확하게 수행합니다. 이전 버전에서 쉽게 복제됩니다.

파이썬 문서에는 두 가지 완전한 예제가 있습니다. 여러 프로세스에서 단일 파일에 로깅

Python <3.2를 사용하는 QueueHandler사용자는 https://gist.github.com/vsajip/591589 에서 자신의 코드로 복사 하거나 import logutils를 사용하십시오 .

각 프로세스 (부모 프로세스 포함)는에 로깅을 Queue기록한 다음 listener스레드 또는 프로세스 (예 : 각 프로세스마다 하나의 예제가 제공됨)에서 해당 프로세스를 선택하여 파일에 기록합니다. 손상이나 손상의 위험이 없습니다.


답변

다음은 Google에서 온 다른 사람 (예 : 나)의 단순성에 중점을 둔 또 다른 솔루션입니다. 로깅이 쉬워야합니다! 3.2 이상 만.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()


답변

또 다른 대안은 logging패키지의 파일 기반이 아닌 다양한 로깅 핸들러 일 수 있습니다 .

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(다른 사람)

이렇게하면 안전하게 쓸 수 있고 결과를 올바르게 처리 할 수있는 로깅 데몬을 쉽게 가질 수 있습니다. 예를 들어, 메시지를 피클 링하고 자체 회전 파일 핸들러로 내보내는 간단한 소켓 서버입니다.

SyslogHandler너무, 당신이 알아서 것입니다. 물론 syslog시스템 인스턴스가 아닌 자체 인스턴스를 사용할 수 있습니다 .


답변

로깅 및 큐 스레드를 별도로 유지하는 다른 변형.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)


답변

모든 현재 솔루션이 핸들러를 사용하여 로깅 구성에 너무 연결되어 있습니다. 내 솔루션에는 다음과 같은 아키텍처와 기능이 있습니다.

  • 당신은 사용할 수 있습니다 하나를 당신이 원하는 로깅 구성
  • 데몬 스레드에서 로깅이 수행됩니다.
  • 컨텍스트 관리자를 사용하여 데몬의 안전한 종료
  • 로깅 스레드와의 통신은 다음과 같이 수행됩니다. multiprocessing.Queue
  • 서브 프로세스 logging.Logger(및 이미 정의 된 인스턴스)에서 모든 레코드를 큐에 보내도록 패치 됩니다.
  • 새로운 : 형식 추적 및 메시지 산세 오류를 방지하기 위해 큐에 보내기 전에

사용 예제 및 출력 코드는 다음 요점에서 찾을 수 있습니다. https://gist.github.com/schlamar/7003737