nucypher/tests/unit/test_logging.py

368 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from functools import partial
from io import StringIO
from json.encoder import py_encode_basestring_ascii
from unittest import mock
import pytest
from twisted.logger import Logger as TwistedLogger
from twisted.logger import (
LogLevel,
formatEvent,
globalLogPublisher,
jsonFileLogObserver,
)
from nucypher.utilities.emitters import StdoutEmitter
from nucypher.utilities.logging import (
GlobalLoggerSettings,
Logger,
observer_log_level_wrapper,
)
def naive_print_observer(event):
print(formatEvent(event), end="")
def get_json_observer_for_file(logfile):
def json_observer(event):
observer = jsonFileLogObserver(outFile=logfile)
return observer(event)
return json_observer
def expected_processing(string_with_curly_braces):
ascii_string = py_encode_basestring_ascii(string_with_curly_braces)[1:-1]
expected_output = Logger.escape_format_string(ascii_string)
return expected_output
# Any string without curly braces won't have any problem
ordinary_strings = (
"Because there's nothing worse in life than being ordinary.",
"🍌 🍌 🍌 terracotta 🍌 🍌 🍌 terracotta terracotta 🥧",
'"You can quote me on this"',
f"Some bytes: {b''.join(chr(i).encode() for i in range(1024) if chr(i) not in '{}')}"
)
# Strings that have curly braces but that appear in groups of even length are considered safe too,
# as curly braces are escaped this way, according to PEP 3101. Twisted will eat these just fine,
# but since they have curly braces, we will have to process them in our Logger.
quirky_strings = (
"{{}}", "{{hola}}", "{{{{}}}}", "foo{{}}",
)
# These are strings that are definitely going to cause trouble for Twisted Logger
freaky_format_strings = ( # Including the expected exception and error message
("{", ValueError, "Single '{' encountered in format string"),
("}", ValueError, "Single '}' encountered in format string"),
("foo}", ValueError, "Single '}' encountered in format string"),
("bar{", ValueError, "Single '{' encountered in format string"),
("}{", ValueError, "Single '}' encountered in format string"),
("{{}", ValueError, "Single '}' encountered in format string"),
("}}{", ValueError, "Single '{' encountered in format string"),
(f"{b'{'}", ValueError, "expected '}' before end of string"),
(f"{b'}'}", ValueError, "Single '}' encountered in format string"),
("{}", KeyError, ""),
("{}{", KeyError, ""),
("{}}", KeyError, ""),
("{{{}}}", KeyError, ""),
("{{{{{}}}}}", KeyError, ""),
("{bananas}", KeyError, "bananas"),
(str({'bananas': '🍌🍌🍌'}), KeyError, "bananas"),
(f"Some bytes: {b''.join(chr(i).encode() for i in range(1024))}", KeyError, "|")
)
# Embrace the quirky!
acceptable_strings = (*ordinary_strings, *quirky_strings)
def test_twisted_logger_doesnt_like_curly_braces(capsys):
twisted_logger = TwistedLogger('twisted', observer=naive_print_observer)
# Normal strings are logged normally
for string in acceptable_strings:
twisted_logger.info(string)
captured = capsys.readouterr()
assert string.format() == captured.out
assert not captured.err
# But curly braces are not
for string, _exception, exception_message in freaky_format_strings:
twisted_logger.info(string)
captured = capsys.readouterr()
assert string != captured.out
assert "Unable to format event" in captured.out
assert exception_message in captured.out
def test_twisted_json_logger_doesnt_like_curly_braces_either():
twisted_logger = TwistedLogger('twisted-json')
# Normal strings are logged normally
for string in acceptable_strings:
file = StringIO()
twisted_logger.observer = get_json_observer_for_file(file)
twisted_logger.info(string)
logged_event = file.getvalue()
assert '"log_level": {"name": "info"' in logged_event
assert f'"log_format": "{expected_processing(string.format())}"' in logged_event
# But curly braces are not
for string, exception, exception_message in freaky_format_strings:
file = StringIO()
twisted_logger.observer = get_json_observer_for_file(file)
with pytest.raises(exception, match=exception_message):
twisted_logger.info(string)
def test_but_nucypher_logger_is_cool_with_that(capsys):
nucypher_logger = Logger('nucypher-logger', observer=naive_print_observer)
# Normal strings are logged normally
for string in acceptable_strings:
nucypher_logger.info(string)
captured = capsys.readouterr()
assert string == captured.out
assert not captured.err
# And curly braces too!
for string, _exception, _exception_message in freaky_format_strings:
nucypher_logger.info(string)
captured = capsys.readouterr()
assert "Unable to format event" not in captured.out
assert not captured.err
assert string == captured.out
def test_even_nucypher_json_logger_is_cool():
nucypher_logger = Logger('nucypher-logger-json')
# Normal strings are logged normally
for string in acceptable_strings:
file = StringIO()
nucypher_logger.observer = get_json_observer_for_file(file)
nucypher_logger.info(string)
logged_event = file.getvalue()
assert '"log_level": {"name": "info"' in logged_event
assert f'"log_format": "{expected_processing(string)}"' in logged_event
# And curly braces too!
for string, _exception, _exception_message in freaky_format_strings:
file = StringIO()
nucypher_logger.observer = get_json_observer_for_file(file)
nucypher_logger.info(string)
logged_event = file.getvalue()
assert '"log_level": {"name": "info"' in logged_event
assert f'"log_format": "{expected_processing(string)}"' in logged_event
def test_pause_all_logging_while():
# get state beforehand
former_global_observers = dict(GlobalLoggerSettings._observers)
former_registered_observers = list(globalLogPublisher._observers)
with GlobalLoggerSettings.pause_all_logging_while():
# within context manager
assert len(GlobalLoggerSettings._observers) == 0
assert len(globalLogPublisher._observers) == 0
# exited context manager
assert former_global_observers == GlobalLoggerSettings._observers
assert former_registered_observers == globalLogPublisher._observers
@pytest.mark.parametrize("global_log_level", LogLevel._enumerants.values())
def test_log_level_adhered_to(global_log_level, mocker):
old_log_level = GlobalLoggerSettings.log_level.name
try:
GlobalLoggerSettings.set_log_level(global_log_level.name)
logger_observer = mocker.Mock()
logger = Logger("test-logger")
logger.observer = logger_observer
message = "People without self-doubt should never put themselves in a position of complete power" # - Chuck Rhoades (Billions)
num_logged_events = 0
for level in LogLevel._enumerants.values():
# call logger.<level>(message)
getattr(logger, level.name)(message)
if level >= global_log_level:
num_logged_events += 1
# else not logged
assert logger_observer.call_count == num_logged_events
finally:
GlobalLoggerSettings.set_log_level(old_log_level)
@pytest.mark.parametrize("global_log_level", LogLevel._enumerants.values())
def test_observer_log_level_wrapper_for_adhering_to_log_levels(
global_log_level, mocker
):
old_log_level = GlobalLoggerSettings.log_level.name
try:
GlobalLoggerSettings.set_log_level(global_log_level.name)
logger_observer = mocker.Mock()
logger = TwistedLogger("test-logger")
logger.observer = observer_log_level_wrapper(logger_observer)
message = "People without self-doubt should never put themselves in a position of complete power" # - Chuck Rhoades (Billions)
num_logged_events = 0
for level in LogLevel._enumerants.values():
# call logger.<level>(message)
getattr(logger, level.name)(message)
if level >= global_log_level:
num_logged_events += 1
# else not logged
assert logger_observer.call_count == num_logged_events
finally:
GlobalLoggerSettings.set_log_level(old_log_level)
@pytest.mark.parametrize("global_log_level", LogLevel._enumerants.values())
def test_global_observer_uses_wrapper_for_adhering_to_log_levels(
global_log_level, mocker
):
original_method = observer_log_level_wrapper
logger_observer = mocker.Mock()
def adjusted_call(observer):
# replace observer used with our Mock
return original_method(logger_observer)
old_log_level = GlobalLoggerSettings.log_level.name
try:
GlobalLoggerSettings.set_log_level(global_log_level.name)
with GlobalLoggerSettings.pause_all_logging_while():
# patch call to wrapper, to replace global observer with our own
mocker.patch(
"nucypher.utilities.logging.observer_log_level_wrapper",
side_effect=adjusted_call,
)
GlobalLoggerSettings.start_console_logging()
assert len(GlobalLoggerSettings._observers) == 1
logger = TwistedLogger("test-logger")
message = "People without self-doubt should never put themselves in a position of complete power" # - Chuck Rhoades (Billions)
num_logged_events = 0
for level in LogLevel._enumerants.values():
# call logger.<level>(message)
getattr(logger, level.name)(message)
if level >= global_log_level:
num_logged_events += 1
# else not logged
assert logger_observer.call_count == num_logged_events
finally:
GlobalLoggerSettings.set_log_level(old_log_level)
@pytest.mark.parametrize(
"start_logging_fn, stop_logging_fn, logging_type",
(
(
GlobalLoggerSettings.start_console_logging,
GlobalLoggerSettings.stop_console_logging,
GlobalLoggerSettings.LoggingType.CONSOLE,
),
(
GlobalLoggerSettings.start_text_file_logging,
GlobalLoggerSettings.stop_text_file_logging,
GlobalLoggerSettings.LoggingType.TEXT,
),
(
GlobalLoggerSettings.start_json_file_logging,
GlobalLoggerSettings.stop_json_file_logging,
GlobalLoggerSettings.LoggingType.JSON,
),
(
partial(GlobalLoggerSettings.start_sentry_logging, dsn="mock_dsn"),
GlobalLoggerSettings.stop_sentry_logging,
GlobalLoggerSettings.LoggingType.SENTRY,
),
),
)
@mock.patch("nucypher.utilities.logging.initialize_sentry", return_value=None)
def test_addition_removal_global_observers(
sentry_mock_init, start_logging_fn, stop_logging_fn, logging_type
):
with GlobalLoggerSettings.pause_all_logging_while():
# start logging
start_logging_fn()
assert len(globalLogPublisher._observers) == 1
assert len(GlobalLoggerSettings._observers) == 1
original_global_observer = GlobalLoggerSettings._observers[logging_type]
assert original_global_observer is not None
assert original_global_observer in globalLogPublisher._observers
# try starting again - noop
start_logging_fn()
assert len(globalLogPublisher._observers) == 1
assert original_global_observer in globalLogPublisher._observers
assert len(GlobalLoggerSettings._observers) == 1
assert GlobalLoggerSettings._observers[logging_type] == original_global_observer
# stop logging
stop_logging_fn()
assert len(globalLogPublisher._observers) == 0
assert len(GlobalLoggerSettings._observers) == 0
# try stopping again, when already stopped/removed - noop
stop_logging_fn()
def test_stdout_emitter_link_to_logging(mocker):
message = "Learn from the mistakes of others. You cant live long enough to make them all yourself." # - Eleanor Roosevelt
emit_fn = mocker.Mock()
with mocker.patch("nucypher.utilities.logging.Logger.emit", side_effect=emit_fn):
call_count = 0
assert emit_fn.call_count == call_count
emitter = StdoutEmitter()
# message(...)
for verbosity in [0, 1, 2]:
emitter.message(message=message, verbosity=verbosity)
call_count += 1
assert emit_fn.call_count == call_count
if verbosity < 2:
emit_fn.assert_called_with(LogLevel.info, message)
else:
emit_fn.assert_called_with(LogLevel.debug, message)
# echo(...)
for verbosity in [0, 1, 2]:
emitter.echo(message=message, verbosity=verbosity)
assert (
emit_fn.call_count == call_count
), "user interactions so no change in call count"
# error(...)
exception = ValueError(message)
emitter.error(exception)
call_count += 1
assert emit_fn.call_count == call_count
emit_fn.assert_called_with(LogLevel.error, str(exception))
# banner(...)
emitter.banner(banner=message)
assert (
emit_fn.call_count == call_count
), "just a banner so no change in call count"