현재 Python 2.6 multiprocessing
모듈을 사용하여 여러 프로세스를 생성하는 프레임 워크에 중앙 모듈이 있습니다. 를 사용하기 때문에 multiprocessing
모듈 수준의 다중 처리 인식 로그가 LOG = multiprocessing.get_logger()
있습니다. 당 워드 프로세서 ,이 로거는 제대로 해석 것들에하지 않도록 프로세스 공유 잠금을 가지고 sys.stderr
동시에 그것을 쓰기 여러 프로세스를함으로써 (또는 무엇이든 핸들).
내가 가진 문제는 프레임 워크의 다른 모듈이 다중 처리를 인식하지 못한다는 것입니다. 내가 보는 방식으로,이 중앙 모듈에 대한 모든 종속성이 다중 처리 인식 로깅을 사용하도록해야합니다. 그것은 프레임 워크의 모든 클라이언트 에게는 물론 프레임 워크 내 에서 성가신 일입니다. 내가 생각하지 않는 대안이 있습니까?
답변
방해하지 않고 이것을 처리하는 유일한 방법은 다음과 같습니다.
- 각 작업자 프로세스를 생성하여 로그가 다른 파일 디스크립터 (디스크 또는 파이프) 로 이동하도록 하는 것이 가장 좋습니다. 모든 로그 항목의 타임 스탬프를 지정해야합니다.
- 그런 다음 컨트롤러 프로세스는 다음 중 하나 를 수행 할 수 있습니다 .
- 디스크 파일을 사용하는 경우 : 실행이 끝날 때 로그 파일을 타임 스탬프별로 정렬하여 병합 합니다.
- 파이프를 사용하는 경우 (권장) : 모든 파이프에서 중앙 로그 파일로 즉시 로그 항목을 병합합니다. (예 :
select
파이프의 파일 디스크립터에서 주기적 으로 사용 가능한 로그 항목에 대해 병합 정렬을 수행하고 중앙 로그로 플러시하십시오. 반복하십시오.)
답변
방금 파이프를 통해 모든 것을 부모 프로세스에 공급하는 로그 핸들러를 작성했습니다. 나는 그것을 10 분 동안 만 테스트했지만 꽤 잘 작동하는 것 같습니다.
( 참고 : 이 하드 코딩되어 RotatingFileHandler
내 유스 케이스입니다.)
업데이트 : @javier 지금 Pypi에 패키지로이 방법을 사용할 수 유지 – 볼 멀티 프로세싱 로깅 에서 Pypi에, github에 https://github.com/jruere/multiprocessing-logging을
업데이트 : 구현!
이제는 동시성을 올바르게 처리하기 위해 큐를 사용하고 오류를 올바르게 복구합니다. 나는 이것을 몇 달 동안 프로덕션 환경에서 사용 해 왔으며 아래의 현재 버전은 문제없이 작동합니다.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
class MultiProcessingLog(logging.Handler):
def __init__(self, name, mode, maxsize, rotate):
logging.Handler.__init__(self)
self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
self.queue = multiprocessing.Queue(-1)
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
while True:
try:
record = self.queue.get()
self._handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
def send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
dummy = self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
self._handler.close()
logging.Handler.close(self)
답변
QueueHandler
Python 3.2 이상에서 기본으로 제공되며 정확하게 수행합니다. 이전 버전에서 쉽게 복제됩니다.
파이썬 문서에는 두 가지 완전한 예제가 있습니다. 여러 프로세스에서 단일 파일에 로깅
Python <3.2를 사용하는 QueueHandler
사용자는 https://gist.github.com/vsajip/591589 에서 자신의 코드로 복사 하거나 import logutils를 사용하십시오 .
각 프로세스 (부모 프로세스 포함)는에 로깅을 Queue
기록한 다음 listener
스레드 또는 프로세스 (예 : 각 프로세스마다 하나의 예제가 제공됨)에서 해당 프로세스를 선택하여 파일에 기록합니다. 손상이나 손상의 위험이 없습니다.
답변
다음은 Google에서 온 다른 사람 (예 : 나)의 단순성에 중점을 둔 또 다른 솔루션입니다. 로깅이 쉬워야합니다! 3.2 이상 만.
import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random
def f(i):
time.sleep(random.uniform(.01, .05))
logging.info('function called with {} in worker thread.'.format(i))
time.sleep(random.uniform(.01, .05))
return i
def worker_init(q):
# all records from worker processes go to qh and then into q
qh = QueueHandler(q)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(qh)
def logger_init():
q = multiprocessing.Queue()
# this is the handler for all log records
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
# ql gets records from the queue and sends them to the handler
ql = QueueListener(q, handler)
ql.start()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# add the handler to the logger so records from this process are handled
logger.addHandler(handler)
return ql, q
def main():
q_listener, q = logger_init()
logging.info('hello from main thread')
pool = multiprocessing.Pool(4, worker_init, [q])
for result in pool.map(f, range(10)):
pass
pool.close()
pool.join()
q_listener.stop()
if __name__ == '__main__':
main()
답변
또 다른 대안은 logging
패키지의 파일 기반이 아닌 다양한 로깅 핸들러 일 수 있습니다 .
SocketHandler
DatagramHandler
SyslogHandler
(다른 사람)
이렇게하면 안전하게 쓸 수 있고 결과를 올바르게 처리 할 수있는 로깅 데몬을 쉽게 가질 수 있습니다. 예를 들어, 메시지를 피클 링하고 자체 회전 파일 핸들러로 내보내는 간단한 소켓 서버입니다.
은 SyslogHandler
너무, 당신이 알아서 것입니다. 물론 syslog
시스템 인스턴스가 아닌 자체 인스턴스를 사용할 수 있습니다 .
답변
로깅 및 큐 스레드를 별도로 유지하는 다른 변형.
"""sample code for logging in subprocesses using multiprocessing
* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers. If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.
tested with Python 2.5 and 2.6 on Linux and Windows
"""
import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys
DEFAULT_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
class SubProcessLogHandler(logging.Handler):
"""handler used by subprocesses
It simply puts items on a Queue for the main process to log.
"""
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
self.queue.put(record)
class LogQueueReader(threading.Thread):
"""thread to write subprocesses log records to main process log
This thread reads the records written by subprocesses and writes them to
the handlers defined in the main process's handlers.
"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.daemon = True
def run(self):
"""read from the queue and write to the log handlers
The logging documentation says logging is thread safe, so there
shouldn't be contention between normal logging (from the main
process) and this thread.
Note that we're using the name of the original logger.
"""
# Thanks Mike for the error checking code.
while True:
try:
record = self.queue.get()
# get the logger for this record
logger = logging.getLogger(record.name)
logger.callHandlers(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
class LoggingProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def _setupLogger(self):
# create the logger to use.
logger = logging.getLogger('test.subprocess')
# The only handler desired is the SubProcessLogHandler. If any others
# exist, remove them. In this case, on Unix and Linux the StreamHandler
# will be inherited.
for handler in logger.handlers:
# just a check for my sanity
assert not isinstance(handler, SubProcessLogHandler)
logger.removeHandler(handler)
# add the handler
handler = SubProcessLogHandler(self.queue)
handler.setFormatter(formatter)
logger.addHandler(handler)
# On Windows, the level will not be inherited. Also, we could just
# set the level to log everything here and filter it in the main
# process handlers. For now, just set it from the global default.
logger.setLevel(DEFAULT_LEVEL)
self.logger = logger
def run(self):
self._setupLogger()
logger = self.logger
# and here goes the logging
p = multiprocessing.current_process()
logger.info('hello from process %s with pid %s' % (p.name, p.pid))
if __name__ == '__main__':
# queue used by the subprocess loggers
queue = multiprocessing.Queue()
# Just a normal logger
logger = logging.getLogger('test')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(DEFAULT_LEVEL)
logger.info('hello from the main process')
# This thread will read from the subprocesses and write to the main log's
# handlers.
log_queue_reader = LogQueueReader(queue)
log_queue_reader.start()
# create the processes.
for i in range(10):
p = LoggingProcess(queue)
p.start()
# The way I read the multiprocessing warning about Queue, joining a
# process before it has finished feeding the Queue can cause a deadlock.
# Also, Queue.empty() is not realiable, so just make sure all processes
# are finished.
# active_children joins subprocesses when they're finished.
while multiprocessing.active_children():
time.sleep(.1)
답변
모든 현재 솔루션이 핸들러를 사용하여 로깅 구성에 너무 연결되어 있습니다. 내 솔루션에는 다음과 같은 아키텍처와 기능이 있습니다.
- 당신은 사용할 수 있습니다 하나를 당신이 원하는 로깅 구성
- 데몬 스레드에서 로깅이 수행됩니다.
- 컨텍스트 관리자를 사용하여 데몬의 안전한 종료
- 로깅 스레드와의 통신은 다음과 같이 수행됩니다.
multiprocessing.Queue
- 서브 프로세스
logging.Logger
(및 이미 정의 된 인스턴스)에서 모든 레코드를 큐에 보내도록 패치 됩니다. - 새로운 : 형식 추적 및 메시지 산세 오류를 방지하기 위해 큐에 보내기 전에
사용 예제 및 출력 코드는 다음 요점에서 찾을 수 있습니다. https://gist.github.com/schlamar/7003737