Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level multiprocessing-aware log, LOG = multiprocessing.get_logger(). Per the docs, this logger has process-shared locks so that you don't garble things up in sys.stderr (or whatever file handle) by having multiple processes writing to it simultaneously.
The issue I have is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?
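For reference, the multiprocessing-aware logger described above can be set up roughly like this (a minimal sketch; the worker function is my own illustration):

```python
import logging
import multiprocessing

def worker(i):
    # multiprocessing.get_logger() returns the module-level logger whose
    # handler serializes writes with a process-shared lock, so concurrent
    # processes do not interleave partial lines on sys.stderr.
    multiprocessing.get_logger().info("worker %d starting", i)

if __name__ == "__main__":
    # Attach a stderr handler to the multiprocessing logger.
    multiprocessing.log_to_stderr(logging.INFO)
    procs = [multiprocessing.Process(target=worker, args=(n,)) for n in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```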
Answer
The only way to deal with this non-intrusively is to:
- Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
- Your controller process can then do one of the following:
  - If using disk files: coalesce the log files at the end of the run, sorted by timestamp.
  - If using pipes (recommended): coalesce log entries on the fly from all pipes into a central log file (e.g., periodically select over the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log; repeat).
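The disk-file variant can be sketched with heapq.merge, which lazily merge-sorts already-sorted inputs. The function name and the timestamp-prefix convention here are my own illustration, not part of the answer above:

```python
import heapq

def merge_worker_logs(paths, out_path):
    # Assumes each worker wrote its own log file, one entry per line,
    # prefixed with a sortable timestamp (e.g. "2024-01-01 00:00:01 msg").
    # Each per-worker file is already in timestamp order, so a lazy
    # k-way merge yields the combined log in global timestamp order.
    files = [open(p) for p in paths]
    try:
        with open(out_path, "w") as out:
            out.writelines(heapq.merge(*files))
    finally:
        for f in files:
            f.close()
```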
Answer
I just wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.
(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)
Update: @javier now maintains this approach as a package available on PyPI: see multiprocessing-logging on PyPI, and on GitHub at https://github.com/jruere/multiprocessing-logging
Update: Implementation!
This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months, and the current version below works without issue.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified. Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
Answer
QueueHandler is native in Python 3.2+, and safely handles multiprocessing logging. It is easily replicated in earlier versions.
The Python docs have two complete examples: Logging to a single file from multiple processes.
For those using Python < 3.2, just copy QueueHandler into your own code from https://gist.github.com/vsajip/591589, or alternatively import logutils.
Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example of each is provided) picks those up and writes them all to a file, with no risk of corruption or garbling.
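The backport mentioned above can be wired up with a guarded import (a sketch; logutils is a separate package installed with pip install logutils):

```python
try:
    # Python >= 3.2 ships these in the standard library.
    from logging.handlers import QueueHandler, QueueListener
except ImportError:
    # On older interpreters, fall back to Vinay Sajip's logutils
    # package, which provides the same classes.
    from logutils.queue import QueueHandler, QueueListener
```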
Answer
Below is another solution with a focus on simplicity, for anyone else (like me) who gets here from Google. Logging should be easy! Python 3.2 or higher only.
import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random

def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i

def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)

def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q

def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
Answer
Yet another alternative might be the various non-file-based logging handlers in the logging package:
SocketHandler
DatagramHandler
SyslogHandler
(and others)
This way, you could easily have a logging daemon somewhere that you can write to safely and that handles the results correctly: for example, a simple socket server that just unpickles each message and emits it to its own rotating file handler.
The SyslogHandler would take care of this for you, too. Of course, you could use your own instance of syslog rather than the system one.
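The socket-server idea can be sketched as below. This is a stripped-down variant of the pattern in the Python logging cookbook; the class and function names are my own, and a real deployment would add error handling and a rotating file handler on the receiving side:

```python
import logging
import logging.handlers
import pickle
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Receives pickled log records over TCP and re-emits them locally."""

    def handle(self):
        while True:
            # SocketHandler sends each record as a 4-byte big-endian
            # length followed by a pickled dict of LogRecord attributes.
            header = self.connection.recv(4)
            if len(header) < 4:
                break
            slen = struct.unpack(">L", header)[0]
            data = self.connection.recv(slen)
            while len(data) < slen:
                data += self.connection.recv(slen - len(data))
            record = logging.makeLogRecord(pickle.loads(data))
            # Hand the record to this process's normally configured
            # logging (e.g. a single RotatingFileHandler).
            logging.getLogger(record.name).handle(record)

def run_log_server(host="localhost",
                   port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    # Each worker process then only needs:
    #   logging.getLogger().addHandler(
    #       logging.handlers.SocketHandler(host, port))
    server = socketserver.ThreadingTCPServer((host, port),
                                             LogRecordStreamHandler)
    server.serve_forever()
```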
Answer
Another variant, which keeps the logging and the queue-reading thread separate.
"""sample code for logging in subprocesses using multiprocessing
* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers. If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.
tested with Python 2.5 and 2.6 on Linux and Windows
"""
import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys
DEFAULT_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
class SubProcessLogHandler(logging.Handler):
"""handler used by subprocesses
It simply puts items on a Queue for the main process to log.
"""
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
self.queue.put(record)
class LogQueueReader(threading.Thread):
"""thread to write subprocesses log records to main process log
This thread reads the records written by subprocesses and writes them to
the handlers defined in the main process's handlers.
"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.daemon = True
def run(self):
"""read from the queue and write to the log handlers
The logging documentation says logging is thread safe, so there
shouldn't be contention between normal logging (from the main
process) and this thread.
Note that we're using the name of the original logger.
"""
# Thanks Mike for the error checking code.
while True:
try:
record = self.queue.get()
# get the logger for this record
logger = logging.getLogger(record.name)
logger.callHandlers(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
class LoggingProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def _setupLogger(self):
# create the logger to use.
logger = logging.getLogger('test.subprocess')
# The only handler desired is the SubProcessLogHandler. If any others
# exist, remove them. In this case, on Unix and Linux the StreamHandler
# will be inherited.
for handler in logger.handlers:
# just a check for my sanity
assert not isinstance(handler, SubProcessLogHandler)
logger.removeHandler(handler)
# add the handler
handler = SubProcessLogHandler(self.queue)
handler.setFormatter(formatter)
logger.addHandler(handler)
# On Windows, the level will not be inherited. Also, we could just
# set the level to log everything here and filter it in the main
# process handlers. For now, just set it from the global default.
logger.setLevel(DEFAULT_LEVEL)
self.logger = logger
def run(self):
self._setupLogger()
logger = self.logger
# and here goes the logging
p = multiprocessing.current_process()
logger.info('hello from process %s with pid %s' % (p.name, p.pid))
if __name__ == '__main__':
# queue used by the subprocess loggers
queue = multiprocessing.Queue()
# Just a normal logger
logger = logging.getLogger('test')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(DEFAULT_LEVEL)
logger.info('hello from the main process')
# This thread will read from the subprocesses and write to the main log's
# handlers.
log_queue_reader = LogQueueReader(queue)
log_queue_reader.start()
# create the processes.
for i in range(10):
p = LoggingProcess(queue)
p.start()
# The way I read the multiprocessing warning about Queue, joining a
# process before it has finished feeding the Queue can cause a deadlock.
# Also, Queue.empty() is not realiable, so just make sure all processes
# are finished.
# active_children joins subprocesses when they're finished.
while multiprocessing.active_children():
time.sleep(.1)
Answer
All of the current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:
- You can use any logging configuration you want
- Logging is done in a daemon thread
- Safe shutdown of the daemon by using a context manager
- Communication with the logging thread is done by multiprocessing.Queue
- In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
- New: format the traceback and message before sending to the queue, to prevent pickling errors
Code with a usage example and output can be found at the following Gist: https://gist.github.com/schlamar/7003737
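The patching step can be sketched roughly as follows. This is my own simplified illustration of the idea, not the actual code from the Gist, which adds the context manager and daemon-thread lifecycle:

```python
import logging
import multiprocessing

def patch_loggers_to_queue(queue):
    # Replace Logger.handle on the class itself, so every logger,
    # including instances created before the patch, sends its records
    # to the queue instead of its process-local handlers. Call this at
    # the start of each subprocess.
    def handle(self, record):
        queue.put(record)
    logging.Logger.handle = handle

def log_listener(queue):
    # Daemon-thread body in the main process: drain the queue and
    # dispatch each record through the normally configured logging.
    while True:
        record = queue.get()
        if record is None:  # sentinel to shut down
            break
        logging.getLogger(record.name).callHandlers(record)
```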