34 changes: 34 additions & 0 deletions loguru/_logger.py
@@ -1752,6 +1752,40 @@ def configure(self, *, handlers=None, levels=None, extra=None, patcher=None, act

        return [self.add(**params) for params in handlers]

    def reinstall(self):
        """Reinstall this logger's core onto the module-level logger of the current process.

        When using multiprocessing with the "spawn" start method, you can pass the logger as
        an argument to the target of ``multiprocessing.Process`` and call this method once in
        the child process. The module-level logger is then bound to the parent's core, so you
        don't need to pass the logger explicitly to every function called in that process.

        Examples
        --------
        >>> def subworker(logger):
        ...     logger.reinstall()
        ...     logger.info("Child")
        ...     deeper_subworker()

        >>> def deeper_subworker():
        ...     logger.info("Grandchild")

        >>> def test_process_spawn():
        ...     spawn_context = multiprocessing.get_context("spawn")
        ...     logger.add("file.log", context=spawn_context, enqueue=True, catch=False)
        ...
        ...     process = spawn_context.Process(target=subworker, args=(logger,))
        ...     process.start()
        ...     process.join()
        ...
        ...     assert process.exitcode == 0
        ...
        ...     logger.info("Main")
        ...     logger.remove()
        """
        from loguru import logger

        logger._core = self._core
    def _change_activation(self, name, status):
        if not (name is None or isinstance(name, str)):
            raise TypeError(
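Why this works: under the "spawn" start method the child is a fresh interpreter, so the `loguru.logger` it imports starts with a new core that knows nothing about the parent's handlers. Passing the parent's logger to the child pickles its core along with its enqueue-based handlers, and `reinstall()` simply copies that core onto the module-level logger (`logger._core = self._core`), so any child code that does `from loguru import logger` reaches the parent's sinks. Below is a minimal sketch of the pattern outside the test suite, assuming the `reinstall()` method added in this diff; the file name "app.log" and the helper names are illustrative, not part of the change.

import multiprocessing

from loguru import logger


def worker(parent_logger):
    # Rebind the module-level logger of this freshly spawned interpreter to the
    # core that was pickled along with the parent's logger.
    parent_logger.reinstall()
    helper()


def helper():
    # Uses the global logger, yet its record still reaches the parent's sink.
    logger.info("Logged from a helper that never received the logger")


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    # enqueue=True makes child processes send records through a queue instead of
    # writing to the sink directly, so they can safely share the parent's handler.
    logger.add("app.log", context=ctx, enqueue=True)

    process = ctx.Process(target=worker, args=(logger,))
    process.start()
    process.join()
    logger.remove()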
72 changes: 72 additions & 0 deletions tests/test_reinstall.py
@@ -0,0 +1,72 @@
import multiprocessing
import os

import pytest

from loguru import logger


@pytest.fixture
def fork_context():
    yield multiprocessing.get_context("fork")


@pytest.fixture
def spawn_context():
    yield multiprocessing.get_context("spawn")


class Writer:
    def __init__(self):
        self._output = ""

    def write(self, message):
        self._output += message

    def read(self):
        return self._output


def subworker(logger):
    logger.reinstall()
    logger.info("Child")
    deeper_subworker()


def deeper_subworker():
    logger.info("Grandchild")


@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
def test_process_fork(fork_context):
    writer = Writer()

    logger.add(writer, context=fork_context, format="{message}", enqueue=True, catch=False)

    process = fork_context.Process(target=subworker, args=(logger,))
    process.start()
    process.join()

    assert process.exitcode == 0

    logger.info("Main")
    logger.remove()

    assert writer.read() == "Child\nGrandchild\nMain\n"


def test_process_spawn(spawn_context):
    writer = Writer()

    logger.add(writer, context=spawn_context, format="{message}", enqueue=True, catch=False)

    process = spawn_context.Process(target=subworker, args=(logger,))
    process.start()
    process.join()

    assert process.exitcode == 0

    logger.info("Main")
    logger.remove()

    assert writer.read() == "Child\nGrandchild\nMain\n"
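For contrast, here is a hypothetical companion test (not part of this diff, reusing the Writer, fixtures, and deeper_subworker above) sketching what happens when the child skips reinstall(): the logger instance passed as an argument still reaches the parent's sink through the queue, but deeper_subworker() uses the child's own module-level logger, whose fresh core only has the default stderr handler, so "Grandchild" never arrives.

# Hypothetical test, not part of this diff: illustrates the behavior without reinstall().
def subworker_without_reinstall(logger):
    logger.info("Child")  # the parent's pickled logger: still goes through the queue
    deeper_subworker()  # the global logger: fresh core in the spawned child


def test_process_spawn_without_reinstall(spawn_context):
    writer = Writer()

    logger.add(writer, context=spawn_context, format="{message}", enqueue=True, catch=False)

    process = spawn_context.Process(target=subworker_without_reinstall, args=(logger,))
    process.start()
    process.join()

    assert process.exitcode == 0

    logger.info("Main")
    logger.remove()

    # "Grandchild" is missing because deeper_subworker() never saw the parent's core.
    assert writer.read() == "Child\nMain\n"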