Why do multiprocessing.Process() and concurrent.futures.ProcessPoolExecutor() deliver different log output via logging.handlers.QueueHandler()?

I am trying to make my logging compatible with parallel processing via concurrent.futures.ProcessPoolExecutor.

I am following this example.

Here is the code from the example (with modifications in worker_process() and main()):

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
import concurrent.futures
# Next two import lines for this demo only
from random import choice, random
import time
import os

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The rotated file size is set large enough here that all of the demo output lands in a
# single file.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 3000000, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    old_record = ''
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break

            if record != old_record:
                logger = logging.getLogger(record.name)
                logger.handle(record)  # No level or filter logic applied - just do it!
            old_record = record
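            # (Note: logging.LogRecord defines no __eq__, so the comparison
            # above falls back to object identity; each queue.get() unpickles
            # a fresh object, so it never compares equal to old_record.)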
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']


# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)
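    # Note: addHandler() runs unconditionally here, so if the same process
    # calls this configurer more than once, the root logger ends up with
    # several QueueHandlers and each record is enqueued once per handler.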

# This is the worker process top-level loop, which logs a single event after a
# random intervening delay before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer, k):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    time.sleep(random())
    logger = logging.getLogger(choice(LOGGERS))
    level = choice(LEVELS)
    message = f'Message #{k}'
    logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create the workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main(mode):
    n = 30
    queue = multiprocessing.Manager().Queue(-1)
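    # A Manager().Queue is used (rather than multiprocessing.Queue) because a
    # managed queue proxy can be passed to executor.submit(); plain
    # multiprocessing queues can only be shared through inheritance.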
    listener = multiprocessing.Process(target=listener_process, args=(queue, listener_configurer))
    listener.start()

    if mode == 'multiprocessing':
        workers = []
        for i in range(n):
            worker = multiprocessing.Process(target=worker_process, args=(queue, worker_configurer, i))
            workers.append(worker)
            worker.start()
        for w in workers:
            w.join()
    elif mode == 'concurrent.futures':
        with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
            for i in range(n):
                executor.submit(worker_process, queue, worker_configurer, i)

    queue.put_nowait(None)
    listener.join()


if __name__ == '__main__':
    if os.path.exists('mptest.log'): os.unlink('mptest.log')
    main(mode='multiprocessing')

If I set mode='multiprocessing', the resulting log file contains 30 lines of text, exactly as I expect.

However, if I set mode='concurrent.futures', the resulting log file contains anywhere from 30 to 60 lines of text, and the count varies from run to run.

Do you have any idea why this happens and how to fix it?
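My current guess: multiprocessing.Process gives every worker a brand-new process, so configurer(queue) runs exactly once per process, whereas ProcessPoolExecutor may reuse a worker process for several submitted tasks, so worker_process() can run configurer(queue) repeatedly in the same process and stack extra QueueHandlers on the root logger; each extra handler emits one more copy of every record, which would fit the 30-to-60-line range. Below is a minimal, untested sketch of the workaround I am considering. It moves the logging setup into the executor's initializer/initargs parameters (real ProcessPoolExecutor parameters since Python 3.7) so it runs once per pool process; pool_configurer and pool_task are names I made up, and the sketch reuses logging, choice, LEVELS, LOGGERS, queue, and n from the code above.

# Sketch of a possible fix (untested): configure logging once per pool process
# via ProcessPoolExecutor's initializer/initargs instead of inside each task.
def pool_configurer(queue):
    root = logging.getLogger()
    # Guard so a second call in the same process can't attach a duplicate handler.
    if not any(isinstance(h, logging.handlers.QueueHandler) for h in root.handlers):
        root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.DEBUG)

def pool_task(k):
    # Same body as worker_process, minus the per-task configurer(queue) call.
    logger = logging.getLogger(choice(LOGGERS))
    logger.log(choice(LEVELS), f'Message #{k}')

with concurrent.futures.ProcessPoolExecutor(max_workers=n,
                                            initializer=pool_configurer,
                                            initargs=(queue,)) as executor:
    for i in range(n):
        executor.submit(pool_task, i)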
