How should I log while using multiprocessing in Python?

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware log: LOG = multiprocessing.get_logger(). As I understand the docs, this logger has process-shared locks so that you don't garble things up in sys.stderr (or whatever filehandle) by having multiple processes write to it simultaneously.

The issue I have is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make every dependency of this central module use multiprocessing-aware logging. That is annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?



Answer

The only way to deal with this non-intrusively is to:

  1. Spawn each worker process so that its log goes to a different file descriptor (to disk or to a pipe). Ideally, every log entry should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: coalesce the log files at the end of the run, sorted by timestamp.
    • If using pipes (recommended): coalesce log entries on the fly from all pipes into a central log file. (For example, periodically select from the pipes' file descriptors, merge-sort the available log entries, and flush them to the central log. Repeat.)
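
The disk-file variant can be sketched roughly as follows. This is only an illustration, assuming every log line starts with an ISO 8601 timestamp followed by a space and that each worker's file is already in time order; parse_timestamp and merge_logs are hypothetical names, not part of any library.

```python
import heapq
from datetime import datetime


def parse_timestamp(line):
    # Assumption: the line begins with an ISO 8601 timestamp, then a space.
    return datetime.fromisoformat(line.split(" ", 1)[0])


def merge_logs(*streams):
    """Lazily merge iterables of log lines that are each sorted by time."""
    return heapq.merge(*streams, key=parse_timestamp)
```

Open file objects can be passed directly as the streams, since iterating a file yields its lines; the merge is lazy, so large logs do not have to fit in memory.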

Answer

I just wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.

(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)


Update: @javier now maintains this approach as a package available on PyPI; see multiprocessing-logging on PyPI, with the source on GitHub at https://github.com/jruere/multiprocessing-logging


Update: Implementation!

This now uses a queue to handle concurrency correctly, and it also recovers from errors correctly. I've been using this in production for several months, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
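        if record.args:
            # getMessage() applies str(msg) % args, so non-string msg objects work too
            record.msg = record.getMessage()
            record.args = None
        if record.exc_info:
            # formatting caches the traceback text in record.exc_text,
            # so the unpicklable exc_info tuple can be dropped afterwards
            self.format(record)
            record.exc_info = None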

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

Answer

QueueHandler is native in Python 3.2+ and does exactly this. It is easily replicated in earlier versions.

The Python docs have two complete examples in the logging cookbook, under "Logging to a single file from multiple processes".

For those using Python < 3.2, just copy QueueHandler into your own code from https://gist.github.com/vsajip/591589, or alternatively import logutils.

Each process (including the parent process) puts its log records on the Queue, and then a listener thread or process (one example is provided for each) picks them up and writes them all to a file. There is no risk of corruption or garbling.


Answer

Below is another solution with a focus on simplicity, for anyone else (like me) who gets here from Google. Logging should be easy! Python 3.2 or higher only.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker process.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

Answer

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SysLogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you can write to safely and that handles the results correctly. For example, a simple socket server that just unpickles the message and emits it to its own rotating file handler.

The SysLogHandler would take care of this for you, too. Of course, you could use your own instance of syslog rather than the system one.


Answer

A variant of the others that keeps the logging and the queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import sys
import time
import traceback
import multiprocessing, threading, logging

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        # iterate over a copy: removeHandler() mutates logger.handlers
        for handler in list(logger.handlers):
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)

Answer

All current solutions are too coupled to the logging configuration because they use a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication with the logging thread is done through multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) is patched to send all records to the queue
  • New: the traceback and message are formatted before being sent to the queue, to prevent pickling errors

Code with a usage example and output can be found in the following Gist: https://gist.github.com/schlamar/7003737
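
The context-manager shutdown idea can be illustrated with the short sketch below. It is not the code from the Gist: it swaps in the standard library's QueueListener (Python 3.2+) for the custom daemon thread, and logging_thread and log_queue are hypothetical names.

```python
import contextlib
import logging
import logging.handlers
import multiprocessing


@contextlib.contextmanager
def logging_thread(*handlers, log_queue=None):
    """Run a background listener thread for the duration of a with-block."""
    if log_queue is None:
        log_queue = multiprocessing.Queue()
    listener = logging.handlers.QueueListener(log_queue, *handlers)
    listener.start()
    try:
        yield log_queue  # pass this queue to the worker processes
    finally:
        # Handles records queued before the stop sentinel, then joins the thread.
        listener.stop()
```

Inside the with-block, worker processes would receive the yielded queue and log through a QueueHandler attached to it; leaving the block stops the listener even if an exception is raised.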


이 글은 Python μΉ΄ν…Œκ³ λ¦¬λ‘œ λΆ„λ₯˜λ˜μ—ˆκ³  λ‹˜μ— μ˜ν•΄ 에 μž‘μ„±λμŠ΅λ‹ˆλ‹€.