Rework log (setup and filtering) implementation

Overview
========
Motivated by issue #3037 (Ensure proper log line/filename in
warning/errors).

``pelican.log`` is no longer required to be early in the import order,
as the package no longer relies on a custom logging class.

Logging setup is now uncoupled so that format and filters can be applied
independently, which makes both setup and testing more flexible.

Behavior of commandline calls have not changed, except for some
modification to help texts.

``LOG_FATAL`` and ``LOG_ONCE_LEVEL`` are new configuration keys.

Summary of changes
==================

:pelican/__init__.py:

    * Parsing commandline:
        * Unify type-converstion from string to logging levels.
        * Tweak some help text in ``parse_arguments``.
    * Preserve ``log`` namespace throughout (for clarity).
    * FIX: ``pelican.listen`` verbosity: avoid assumption that the
      logger's level will be NOTSET.

:pelican/log.py:

    * FIX (#3037): warnings & errors now display the correct line numbers.
    * Split logging filters with focus on a specific aspect of behavior.
      This allows for more
        a) independence in testing and
        b) powerful logger configuration, with fewer side effects.
    * Isolate configuration *reading* from configuration *application*
      (see ``pelican/settings.py``).

:pelican/settings.py:

    * Remove ``pelican.log`` dependency.
    * Isolate configuration *reading* from configuration *application*;
      to remove dependency on a global state when modifying log filters.
    * Add validation for LOG_LIMIT values.

:pelican/tests/__init__.py:

    * Remove dependency on ``pelican.log`` module as test runners setup
      their own console handler anyway.
    * Only add ``NullHandler`` in absence of a configured logger.

:pelican/tests/support.py:

    * Remove ``LoggedTestCase`` class; tests requiring logging analysis
      use a contextmanager (``LogCountHandler.examine``) instead.

:pelican/tests/test_log.py:

    * Rewrite to reflect changes in pelican's logging module.
    * Increase code coverage.

:pelican/tests/test_contents.py:
:pelican/tests/test_pelican.py:
:pelican/tests/test_utils.py:

    * Import ``unittest`` directly, rather than from other modules.
    * Switch to using contextmanager (``LogCountHandler.examine``) for
      test-specific log inspection.
    * Remove dependency on a custom TestCase (used in logging analysis).
    * Remove code involved with temporarily disabling log filters --
      the filters in question are now only enabled in tests directly
      testing them (eg. in ``test_log.py``)

:pelican/tests/test_settings.py:

    * Add test for LOG_FILTER values from configs.
This commit is contained in:
Ryan de Kleer 2023-10-04 22:01:38 -07:00
commit b662bdbc50
10 changed files with 1004 additions and 368 deletions

View file

@ -15,10 +15,7 @@ from collections.abc import Iterable
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# pelican.log has to be the first pelican module to be loaded
# because logging.setLoggerClass has to be called before logging.getLogger
from pelican.log import console
from pelican.log import init as init_logging
from pelican import log
from pelican.generators import (ArticlesGenerator, # noqa: I100
PagesGenerator, SourceFileGenerator,
StaticGenerator, TemplatePagesGenerator)
@ -164,15 +161,18 @@ class Pelican:
'draft page',
'draft pages')
console.print('Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.'
.format(
pluralized_articles,
pluralized_drafts,
pluralized_hidden_articles,
pluralized_pages,
pluralized_hidden_pages,
pluralized_draft_pages,
time.time() - start_time))
log.console.print(
'Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.'
.format(
pluralized_articles,
pluralized_drafts,
pluralized_hidden_articles,
pluralized_pages,
pluralized_hidden_pages,
pluralized_draft_pages,
time.time() - start_time
)
)
def _get_generator_classes(self):
discovered_generators = [
@ -229,13 +229,13 @@ class Pelican:
class PrintSettings(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
init_logging(name=__name__)
log.init(level=logging.WARNING)
try:
instance, settings = get_instance(namespace)
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
console.print_exception()
log.console.print_exception()
sys.exit(getattr(e, 'exitcode', 1))
if values:
@ -247,15 +247,17 @@ class PrintSettings(argparse.Action):
setting_format = '\n{}:\n{}'
else:
setting_format = '\n{}: {}'
console.print(setting_format.format(
log.console.print(setting_format.format(
setting,
pprint.pformat(settings[setting])))
else:
console.print('\n{} is not a recognized setting.'.format(setting))
log.console.print(
'\n{} is not a recognized setting.'.format(setting)
)
break
else:
# No argument was given to --print-settings, so print all settings
console.print(settings)
log.console.print(settings)
parser.exit()
@ -358,15 +360,17 @@ def parse_arguments(argv=None):
dest='selected_paths', default=None,
help='Comma separated list of selected paths to write')
parser.add_argument('--fatal', metavar='errors|warnings',
choices=('errors', 'warnings'), default='',
parser.add_argument('--fatal', choices=('errors', 'warnings'),
default=None,
# type conversion deferred (see: [FATAL] section below)
help=('Exit the program with non-zero status if any '
'errors/warnings encountered.'))
parser.add_argument('--logs-dedup-min-level', default='WARNING',
choices=('DEBUG', 'INFO', 'WARNING', 'ERROR'),
help=('Only enable log de-duplication for levels equal'
' to or above the specified value'))
metavar=('logging severity level'),
type=log.severity_from_name, dest='once_lvl',
help=('Enable log de-duplication for this severity'
' level and above.'))
parser.add_argument('-l', '--listen', dest='listen', action='store_true',
help='Serve content files via HTTP and port 8000.')
@ -390,6 +394,10 @@ def parse_arguments(argv=None):
args = parser.parse_args(argv)
# [FATAL] value type is converted *after* parsing, to get around argparse
# complaining that converted type does not match the "choices" param type.
args.fatal = log.severity_from_name(args.fatal)
if args.port is not None and not args.listen:
logger.warning('--port without --listen has no effect')
if args.bind is not None and not args.listen:
@ -424,6 +432,10 @@ def get_config(args):
config['PORT'] = args.port
if args.bind is not None:
config['BIND'] = args.bind
if args.fatal is not None:
config['LOG_FATAL'] = args.fatal
if args.once_lvl is not None:
config['LOG_ONCE_LEVEL'] = args.once_lvl
config['DEBUG'] = args.verbosity == logging.DEBUG
config.update(args.overrides)
@ -449,8 +461,11 @@ def get_instance(args):
def autoreload(args, excqueue=None):
console.print(' --- AutoReload Mode: Monitoring `content`, `theme` and'
' `settings` for changes. ---')
log.console.print(
' --- '
'AutoReload Mode: Monitoring `content`, `theme` and `settings` for changes.'
' ---'
)
pelican, settings = get_instance(args)
settings_file = os.path.abspath(args.settings)
while True:
@ -463,7 +478,7 @@ def autoreload(args, excqueue=None):
if settings_file in changed_files:
pelican, settings = get_instance(args)
console.print('\n-> Modified: {}. re-generating...'.format(
log.console.print('\n-> Modified: {}. re-generating...'.format(
', '.join(changed_files)))
except KeyboardInterrupt:
@ -485,9 +500,14 @@ def autoreload(args, excqueue=None):
def listen(server, port, output, excqueue=None):
# set logging level to at least "INFO" (so we can see the server requests)
if logger.level < logging.INFO:
logger.setLevel(logging.INFO)
# elevate logging to *at least* "INFO" level to see the server requests:
server_logger = logging.getLogger('pelican.server')
if server_logger.getEffectiveLevel() > logging.INFO:
logObj = server_logger
while logObj:
if logObj.level and logObj.level > logging.INFO:
logObj.setLevel(logging.INFO)
logObj = logObj.parent
RootedHTTPServer.allow_reuse_address = True
try:
@ -500,8 +520,8 @@ def listen(server, port, output, excqueue=None):
return
try:
console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format(
server, port))
log.console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format(
server, port))
httpd.serve_forever()
except Exception as e:
if excqueue is not None:
@ -517,15 +537,28 @@ def listen(server, port, output, excqueue=None):
def main(argv=None):
args = parse_arguments(argv)
logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level)
init_logging(level=args.verbosity, fatal=args.fatal,
name=__name__, logs_dedup_min_level=logs_dedup_min_level)
# logging: add rich handler & set verbosity
debug_mode = logging.DEBUG == args.verbosity
log.init(level=args.verbosity)
logger.debug('Pelican version: %s', __version__)
logger.debug('Python version: %s', sys.version.split()[0])
try:
pelican, settings = get_instance(args)
logger.info("Reading config...")
# temp handler to catch possible "fatal" logs while applying settings
with log.temp_handler(
logging.getLogger(),
log.AbortHandler(args.fatal or logging.CRITICAL)
):
pelican, settings = get_instance(args)
if debug_mode:
# LOG_FILTER is disabled in --debug
settings.pop('LOG_FILTER')
log.configure(settings)
if args.autoreload and args.listen:
excqueue = multiprocessing.Queue()
@ -551,13 +584,15 @@ def main(argv=None):
listen(settings.get('BIND'), settings.get('PORT'),
settings.get("OUTPUT_PATH"))
else:
with console.status("Generating..."):
with log.console.status("Generating..."):
pelican.run()
except KeyboardInterrupt:
logger.warning('Keyboard interrupt received. Exiting.')
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
if debug_mode: # include traceback:
log.AbortHandler.trim_traceback(e)
log.console.print_exception()
else:
log.console.print(e)
if args.verbosity == logging.DEBUG:
console.print_exception()
sys.exit(getattr(e, 'exitcode', 1))

View file

@ -1,142 +1,446 @@
import logging
from collections import defaultdict
from contextlib import contextmanager
from rich.console import Console
from rich.logging import RichHandler
__all__ = [
'init'
]
console = Console()
# handles messages logged from anywhere within the pelican namespace.
PKG_LOGGER = logging.getLogger('pelican')
def severity_from_name(string_val):
"""Convert string value to appropriate severity (AKA log level)."""
if not string_val:
return logging.NOTSET
level_name = string_val.upper().rstrip('S')
try:
return logging.getLevelNamesMapping()[level_name]
except AttributeError: # Python < 3.11
return logging._nameToLevel.copy()[level_name]
@contextmanager
def temp_filter(logger, flt):
"""
Context in which a custom filter is temporarily applied to logger.
"""
try:
logger.addFilter(flt)
yield flt
finally:
logger.removeFilter(flt)
@contextmanager
def temp_handler(logger, hnd):
"""
Context in which a custom handler is temporarily applied to logger.
"""
try:
logger.addHandler(hnd)
yield hnd
finally:
logger.removeHandler(hnd)
class AbortException(Exception):
def __init__(self, *args, record=None):
self.record = record
super().__init__(*args)
class AbortHandler(logging.Handler):
"""
Set severity level to trigger an AbortException (eg. to exit application).
Example::
# add a handler to raise an exception on logs of warning level or higher
root = logging.getLogger()
root.addHandler(AbortHandler(logging.WARNING))
root.info("Safe")
try:
root.warning("Critical!")
except AbortHandler.EXCEPTION_CLS as e:
print(e)
print('exiting...')
raise
"""
EXCEPTION_CLS = AbortException
_HANDLER_DEPTH = 3
@classmethod
def trim_traceback(cls, e):
# only trim if exception came from this handler...
if isinstance(e, cls.EXCEPTION_CLS):
# find bottom of tb ...
next_tb = e.__traceback__.tb_next
record = e.record
while next_tb:
# Seek traceback for logging call that triggered the handler
if (
next_tb.tb_frame.f_code.co_filename == record.pathname
and
next_tb.tb_lineno == record.lineno
):
next_tb.tb_next = None # remaining traceback is irrelevant
break
else:
next_tb = next_tb.tb_next
def emit(self, record):
# Fail on set severity level (or higher)
if (record.levelno >= self.level):
exc = AbortException(
'Aborting on log severity of {lvl!r} or greater.'.format(
lvl=logging.getLevelName(self.level)
),
record=record
)
raise exc
class LimitFilter(logging.Filter):
"""
Remove duplicates records, and limit the number of records in the same
group.
Modify logs using a generic message (`limit_msg`) after reaching threshold.
Groups are specified by the message to use when the number of records in
the same group hit the limit.
E.g.: log.warning(('43 is not the answer', 'More erroneous answers'))
Messages logged with the `limit_msg` keyword in `extra` are tracked as a
group; once the group has reached the threshold, the record is modified to
use the limit_msg instead. All subsequent log messages in the group are
ignored.
EG.::
inputs = list(range(43, 55))
for i in inputs:
logger.warning(
'%s is not the answer', i,
extra={
'limit_msg': 'Many erroneous answers: %s',
'limit_args': inputs
}
)
"""
LOGS_DEDUP_MIN_LEVEL = logging.WARNING
@classmethod
def get_filters(cls, logger):
"""Return a list of LimitFilters currently on logger."""
found = []
for x in logger.filters:
if isinstance(x, cls):
found.append(x)
return found
_ignore = set()
_raised_messages = set()
_threshold = 5
_group_count = defaultdict(int)
def __init__(self, threshold=5):
"""
:int threshold:
For messages logged with the `limit_msg` in extra, the limit_msg
will be used after the threshold is met.
Example::
if resource.is_missing:
logger.warning(
'The resource %s is missing', resource.name,
extra={'limit_msg': 'Other resources were missing'}
)
# if threshold=5, logs:
# WARNING: The resource prettiest_cat.jpg is missing
# WARNING: The resource best_cat_ever.jpg is missing
# WARNING: The resource cutest_cat.jpg is missing
# WARNING: The resource lolcat.jpg is missing
# WARNING: Other resources were missing
"""
super().__init__()
self.limit_threshold = None
if threshold is not None:
# react at, not after, threshold
self.limit_threshold = threshold - 1
self.group_count = defaultdict(int)
def filter(self, record):
# don't limit log messages for anything above "warning"
if record.levelno > self.LOGS_DEDUP_MIN_LEVEL:
# ``limit_msg`` handling
limit_template = record.__dict__.get('limit_msg', None)
if limit_template:
key = (record.levelno, limit_template)
counter = self.group_count[key]
if counter > self.limit_threshold:
return False # reject: threshold exceeded
elif counter == self.limit_threshold:
# change message to the "limit_msg"
record.msg = limit_template
record.args = record.__dict__.get('limit_args', ())
self.group_count[key] += 1
return True # accept
class OnceFilter(logging.Filter):
"""
Allow only the first occurance of matching log messages.
Similar to the "once" behavior in the ``warnings`` module.
Parameters
----------
level: int
Only enable log de-duplication for levels equal to or above this
value.
"""
def __init__(self, level=logging.NOTSET):
super().__init__()
self.level = level
self._ignored = set()
def ignore(self, level, message):
self._ignored.add((level, message))
def filter(self, record):
if record.levelno < self.level:
# Do not dedeuplicate messages of lower severity
return True
# extract group
group = record.__dict__.get('limit_msg', None)
group_args = record.__dict__.get('limit_args', ())
# ignore duplicates of resolved/formatted messages:
message_key = (record.levelno, record.getMessage()) # resolved msg
ignored = self._ignored
# ignore record if it was already raised
message_key = (record.levelno, record.getMessage())
if message_key in self._raised_messages:
return False
if message_key in ignored:
return False # skip messages already encountered
else:
self._raised_messages.add(message_key)
ignored.add(message_key) # continue, but mark to ignore next time
# ignore LOG_FILTER records by templates or messages
# when "debug" isn't enabled
logger_level = logging.getLogger().getEffectiveLevel()
if logger_level > logging.DEBUG:
template_key = (record.levelno, record.msg)
message_key = (record.levelno, record.getMessage())
if (template_key in self._ignore or message_key in self._ignore):
return False
# check if we went over threshold
if group:
key = (record.levelno, group)
self._group_count[key] += 1
if self._group_count[key] == self._threshold:
record.msg = group
record.args = group_args
elif self._group_count[key] > self._threshold:
return False
return True
class LimitLogger(logging.Logger):
class BlacklistFilter(logging.Filter):
"""Use to prevent messages of specific level & content from emitting."""
def __init__(self, item_pairs=()):
super().__init__()
self._ignored = set()
for lvl, msg in item_pairs:
self.ignore(lvl, msg)
@classmethod
def get_filters(cls, logger):
"""Return a list of BlacklistFilters currently on logger."""
found = []
for x in logger.filters:
if isinstance(x, cls):
found.append(x)
return found
def ignore(self, level, message):
self._ignored.add((level, message))
def filter(self, record):
if not self._ignored:
return True
if (
((record.levelno, record.msg) in self._ignored)
or
((record.levelno, record.getMessage()) in self._ignored)
):
return False # do not emit msg
return True
def init(
level=None, # init root log level
handlers=None, # root log handler
):
"""
A logger which adds LimitFilter automatically
Wrapper for logging.basicConfig, but with a rich.console default handler.
If the root logger has *not already been configured* (eg. has no handlers),
then the specified handler (default:rich.ConsoleHandler) will be added, and
the logging level will be set (Python defaults to WARNING level).
"""
limit_filter = LimitFilter()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.enable_filter()
def disable_filter(self):
self.removeFilter(LimitLogger.limit_filter)
def enable_filter(self):
self.addFilter(LimitLogger.limit_filter)
class FatalLogger(LimitLogger):
warnings_fatal = False
errors_fatal = False
def warning(self, *args, **kwargs):
super().warning(*args, **kwargs)
if FatalLogger.warnings_fatal:
raise RuntimeError('Warning encountered')
def error(self, *args, **kwargs):
super().error(*args, **kwargs)
if FatalLogger.errors_fatal:
raise RuntimeError('Error encountered')
logging.setLoggerClass(FatalLogger)
# force root logger to be of our preferred class
logging.getLogger().__class__ = FatalLogger
def init(level=None, fatal='', handler=RichHandler(console=console), name=None,
logs_dedup_min_level=None):
FatalLogger.warnings_fatal = fatal.startswith('warning')
FatalLogger.errors_fatal = bool(fatal)
LOG_FORMAT = "%(message)s"
if handlers is None: # default handler:
handlers = [RichHandler(console=console)]
logging.basicConfig(
level=level,
format=LOG_FORMAT,
datefmt="[%H:%M:%S]",
handlers=[handler]
format="%(message)s", # default format
datefmt="[%H:%M:%S]", # default date format
handlers=handlers
)
logger = logging.getLogger(name)
if level:
logger.setLevel(level)
if logs_dedup_min_level:
LimitFilter.LOGS_DEDUP_MIN_LEVEL = logs_dedup_min_level
def configure(settings):
"""
Apply logging settings, and return the configured logger.
Applicable settings
--------------------
LOG_LOGGER:
Default: ``"pelican"`` (PKG_LOGGER.name).
The name of the logger to apply filtering to.
LOG_LIMIT_THRESHOLD:
Default: ``5``.
The nth occurance at which a log message will be substituted with its
``limit_msg`` value.
LOG_ONCE_LEVEL:
Default: ``logging.WARNING``
The lowest severity at which log messages will be deduplicated.
LOG_FILTER:
An iterable of (severity, msg) tuples which will be silenced by the
logger.
LOG_FATAL:
The minimum severity at which logs should raise an ``AbortException``.
NOTE: this appends a (filtered) handler to the root logger; this
handler will *only* act on messages the LOG_LOGGER would propogate --
effectively still targeting *only* logs from the specified logger (and
its sub-loggers), despite being at the root level. This allows all
handlers to complete *before* triggering the ``AbortException``.
"""
cfg = dict( # config values that have defaults:
LOG_LOGGER=PKG_LOGGER.name,
LOG_LIMIT_THRESHOLD=5,
LOG_ONCE_LEVEL="WARNING"
)
cfg.update(settings.copy())
# validate & transform types/values: treat ``None`` as logging.NOTSET
if not isinstance(cfg.get('LOG_ONCE_LEVEL') or logging.NOTSET, int):
cfg['LOG_ONCE_LEVEL'] = severity_from_name(cfg['LOG_ONCE_LEVEL'])
if not isinstance(cfg.get('LOG_LOGGER'), logging.Logger):
cfg['LOG_LOGGER'] = logging.getLogger(cfg.get('LOG_LOGGER'))
# == FILTERING:
logger = set_filters(
logger=cfg.get('LOG_LOGGER'),
limit_threshold=cfg.get('LOG_LIMIT_THRESHOLD'),
once_lvl=cfg.get('LOG_ONCE_LEVEL'),
blacklist=cfg.get('LOG_FILTER')
)
# == HANDLING:
if cfg.get('LOG_FATAL'):
hnd = AbortHandler(cfg['LOG_FATAL'])
# Add to root logger so all other handlers can write/emit first.
root = logging.getLogger()
if logger is not root:
# handler only acts on records from specified logger (& children).
hnd.addFilter(logging.Filter(logger.name))
root.addHandler(hnd)
return logger
def log_warnings():
import warnings
logging.captureWarnings(True)
warnings.simplefilter("default", DeprecationWarning)
init(logging.DEBUG, name='py.warnings')
def set_filters(
logger=None, # logger of interest
limit_threshold=None, # LimitFilter: when to start using limit_msg
once_lvl=None, # OnceFilter: which severity to dedupe msgs at
blacklist=None # BlacklistFilter:
):
"""
Apply filtering on the logger (Default: ``PKG_LOGGER``).
Up to three filters are (optionally) applied:
``LimitFilter``:
Modifies log records using `limit_msg` in the `extras` dict. The
`limit_threshold` specifies how many records using the same
limit_msg are emitted before converting the message. See
``LimitFilter`` for an example.
``OnceFilter``:
De-duplicates messages of identical severity & content. Only
affects messages of once_lvl and higher.
``BlacklistFilter``:
Prevents log records of (severity, msg) from being emitted.
Returns
-------
The logger object the filters were applied to.
"""
if logger is None:
logger = PKG_LOGGER
if blacklist:
try:
flt = BlacklistFilter.get_filters(logger)[0]
except IndexError:
flt = BlacklistFilter()
logger.addFilter(flt)
for lvl, msg in blacklist:
flt.ignore(lvl, msg)
if once_lvl is not None:
# deduplicate this severity and above:
logger.addFilter(OnceFilter(once_lvl))
if limit_threshold is not None:
try:
flt = LimitFilter.get_filters(logger)[0].threshold = limit_threshold
except IndexError:
logger.addFilter(LimitFilter(limit_threshold))
logger.debug("{!r} : log filtering configured.".format(logger))
return logger
if __name__ == '__main__':
init(level=logging.DEBUG, name=__name__)
if __name__ == "__main__":
init(level=logging.INFO)
segue = "And now for something completely different: %s"
ignored_records = [
(logging.WARNING, "I'd like to have an argument, please."),
(logging.INFO, segue)
]
configure(
dict(
LOG_FATAL=logging.ERROR,
LOG_FILTER=ignored_records
)
)
logger = PKG_LOGGER
logger.info("hey")
logger.info("hey") # INFO < WARNING, not deduplicated
logger.log(*ignored_records[0]) # blacklisted
logger.log(*ignored_records[1]) # blacklisted
logger.info(segue, "a man with 3 buttocks.") # blacklisted
logger.warning("oof")
logger.warning("oof") # deduplicated
logger.warning("oof") # deduplicated
root_logger = logging.getLogger(__name__)
root_logger.debug('debug')
root_logger.info('info')
root_logger.warning('warning')
root_logger.error('error')
root_logger.critical('critical')
# on 5th occurance, limit_msg is used.
for i in range(1, 11):
logger.warning("watch out x%s!", i,
extra={"limit_msg": "[more watches were outed]"})
# on 5th occurance, limit_msg is used, formatting msg w/ limit_args
inputs = list(range(43, 55))
for i in inputs:
logger.warning(
'%s is not the answer', i,
extra={
'limit_msg': 'Many erroneous answers: %s',
'limit_args': inputs
}
)
# logger not from pkg, will not trip the AbortHandler
logging.getLogger("notpelican").error("that's not good...")
# logger from pkg: *will* trip the AbortHandler
logging.getLogger("pelican.log.example").error("x_x")
logger.critical("This line should not run.")

View file

@ -9,8 +9,6 @@ import sys
from os.path import isabs
from pathlib import Path
from pelican.log import LimitFilter
def load_source(name, path):
spec = importlib.util.spec_from_file_location(name, path)
@ -516,9 +514,18 @@ def configure_settings(settings):
raise Exception('You need to specify a path containing the content'
' (see pelican --help for more information)')
# specify the log messages to be ignored
log_filter = settings.get('LOG_FILTER', DEFAULT_CONFIG['LOG_FILTER'])
LimitFilter._ignore.update(set(log_filter))
# LOG_FILTER expects a list of (log_level, str) pairs.
blacklist = settings.get('LOG_FILTER', DEFAULT_CONFIG['LOG_FILTER'])
for n, (lvl, msg) in enumerate(blacklist):
if not isinstance(lvl, int):
raise ValueError(
"Invalid LOG_FILTER (item {n}): level={lvl}".format(**locals())
)
if not isinstance(msg, str):
raise ValueError(
"Invalid LOG_FILTER (item {n}): msg={msg!r}".format(**locals())
)
settings['LOG_FILTER'] = blacklist
# lookup the theme in "pelican/themes" if the given one doesn't exist
if not os.path.isdir(settings['THEME']):

View file

@ -1,15 +1,13 @@
import logging
import warnings
from pelican.log import log_warnings
# Direct Warnings to the "py.warnings" logger
logging.captureWarnings(True)
# redirect warnings module to use logging instead
log_warnings()
# setup warnings to log DeprecationWarning's and error on
# warnings in pelican's codebase
# enable DeprecationWarnings
warnings.simplefilter("default", DeprecationWarning)
# treat warnings in pelican's codebase as errors
warnings.filterwarnings("error", ".*", Warning, "pelican")
# Add a NullHandler to silence warning about no available handlers
logging.getLogger().addHandler(logging.NullHandler())
# Use: pytest --cli-log-level DEBUG for debug-level logging
logging.basicConfig(handlers=[logging.NullHandler()])

View file

@ -11,12 +11,13 @@ from io import StringIO
from logging.handlers import BufferingHandler
from shutil import rmtree
from tempfile import mkdtemp
from typing import Optional
from pelican.contents import Article
from pelican.readers import default_metadata
from pelican.settings import DEFAULT_CONFIG
__all__ = ['get_article', 'unittest', ]
__all__ = ['get_article', 'unittest', 'LogCountHandler']
@contextmanager
@ -26,7 +27,7 @@ def temporary_folder():
This allows to do something like this in tests:
>>> with temporary_folder() as d:
# do whatever you want
... pass # do whatever you want
"""
tempdir = mkdtemp()
try:
@ -76,8 +77,8 @@ def mute(returns_output=False):
execution, so be careful with what you apply it to.
>>> def numbers():
print "42"
print "1984"
... print("42")
... print("1984")
...
>>> numbers()
42
@ -193,37 +194,12 @@ def get_context(settings=None, **kwargs):
return context
class LogCountHandler(BufferingHandler):
"""Capturing and counting logged messages."""
def __init__(self, capacity=1000):
super().__init__(capacity)
def count_logs(self, msg=None, level=None):
return len([
rec
for rec
in self.buffer
if (msg is None or re.match(msg, rec.getMessage())) and
(level is None or rec.levelno == level)
])
def count_formatted_logs(self, msg=None, level=None):
return len([
rec
for rec
in self.buffer
if (msg is None or re.search(msg, self.format(rec))) and
(level is None or rec.levelno == level)
])
def diff_subproc(first, second):
"""
Return a subprocess that runs a diff on the two paths.
Check results with::
>>> proc = diff_subproc("first.txt","second.txt")
>>> out_stream, err_stream = proc.communicate()
>>> didCheckFail = proc.returnCode != 0
"""
@ -236,24 +212,103 @@ def diff_subproc(first, second):
)
class LoggedTestCase(unittest.TestCase):
"""A test case that captures log messages."""
class LogCountHandler(BufferingHandler):
"""Capturing and counting logged messages."""
def setUp(self):
super().setUp()
self._logcount_handler = LogCountHandler()
logging.getLogger().addHandler(self._logcount_handler)
def __init__(self, capacity=1000):
super().__init__(capacity)
def tearDown(self):
logging.getLogger().removeHandler(self._logcount_handler)
super().tearDown()
@classmethod
@contextmanager
def examine(cls, loggerObj):
"""
Context in which a logger's propagated messages can be examined.
def assertLogCountEqual(self, count=None, msg=None, **kwargs):
actual = self._logcount_handler.count_logs(msg=msg, **kwargs)
self.assertEqual(
actual, count,
msg='expected {} occurrences of {!r}, but found {}'.format(
count, msg, actual))
Yields
======
A handle to ``LogCountHandler.assert_count`` that has been added to the
specified logger for the duration of the context.
The yielded caller can be used to assert whether a certain number of
log messages have occurred within the context.
"""
hnd = cls()
try:
loggerObj.addHandler(hnd)
yield hnd.assert_count
finally:
loggerObj.removeHandler(hnd)
def assert_count(
self,
count: int,
msg: Optional[str] = None,
level: Optional[int] = None,
as_regex: bool = False
):
"""
Assert how often the specified messages have been handled.
Raises
-------
AssertionError
"""
occurances = self.count_logs(msg, level, as_regex)
if count != occurances:
report = 'Logged occurrence'
if msg is not None:
report += ' of {!r}'.format(msg)
if level is not None:
raise AssertionError(
' at {}'.format(logging.getLevelName(level))
)
raise AssertionError(
report + ': expected/found {}/{}'.format(count, occurances)
)
def match_record(
self,
pattern: re.Pattern,
record: logging.LogRecord,
level: Optional[int]
) -> Optional[re.Match]:
"""
Return regex object if pattern found in message at specified severity.
"""
if level is not None and level != record.levelno:
return None
# prefix pattern with "^" for re.match behavior
return pattern.search(record.getMessage())
def count_logs(
self,
msg: Optional[str],
level: Optional[int],
as_regex: bool = False
) -> int:
"""
Returns the number of times a message has been logged.
"""
if not msg:
if not level:
matched = self.buffer # all logged messages
else:
# all logged messages of matching severity level
matched = [rec for rec in self.buffer if rec.levelno == level]
else:
# all logged messages matching the regex and level
if not as_regex:
msg = re.escape(msg)
pattern = re.compile(msg)
matched = [
record for record in self.buffer
if self.match_record(pattern, record, level)
]
return len(matched)
class TestCaseWithCLocale(unittest.TestCase):

View file

@ -2,6 +2,7 @@ import datetime
import locale
import logging
import os.path
import unittest
from posixpath import join as posix_join
from sys import platform
@ -10,17 +11,15 @@ from jinja2.utils import generate_lorem_ipsum
from pelican.contents import Article, Author, Category, Page, Static
from pelican.plugins.signals import content_object_init
from pelican.settings import DEFAULT_CONFIG
from pelican.tests.support import (LoggedTestCase, get_context, get_settings,
unittest)
from pelican.tests.support import (LogCountHandler, get_context, get_settings)
from pelican.utils import (path_to_url, posixize_path, truncate_html_words)
# generate one paragraph, enclosed with <p>
TEST_CONTENT = str(generate_lorem_ipsum(n=1))
TEST_SUMMARY = generate_lorem_ipsum(n=1, html=False)
class TestBase(LoggedTestCase):
class TestBase(unittest.TestCase):
def setUp(self):
super().setUp()
@ -41,19 +40,9 @@ class TestBase(LoggedTestCase):
},
'source_path': '/path/to/file/foo.ext'
}
self._disable_limit_filter()
def tearDown(self):
locale.setlocale(locale.LC_ALL, self.old_locale)
self._enable_limit_filter()
def _disable_limit_filter(self):
from pelican.contents import logger
logger.disable_filter()
def _enable_limit_filter(self):
from pelican.contents import logger
logger.enable_filter()
def _copy_page_kwargs(self):
# make a deep copy of page_kwargs
@ -82,11 +71,15 @@ class TestPage(TestBase):
def test_mandatory_properties(self):
# If the title is not set, must throw an exception.
page = Page('content')
self.assertFalse(page._has_valid_mandatory_properties())
self.assertLogCountEqual(
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
page = Page('content')
self.assertFalse(page._has_valid_mandatory_properties())
count_msgs(
count=1,
msg="Skipping .*: could not find information about 'title'",
as_regex=True,
level=logging.ERROR)
page = Page('content', metadata={'title': 'foobar'})
self.assertTrue(page._has_valid_mandatory_properties())
@ -132,11 +125,16 @@ class TestPage(TestBase):
page_kwargs = self._copy_page_kwargs()
page = Page(**page_kwargs)
self.assertEqual(page.summary, TEST_SUMMARY)
self.assertEqual(page._get_summary(), TEST_SUMMARY)
self.assertLogCountEqual(
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
self.assertEqual(page._get_summary(), TEST_SUMMARY)
count_msgs(
count=1,
msg=r"_get_summary\(\) has been deprecated since 3\.6\.4\. "
"Use the summary decorator instead",
msg=(
"_get_summary() has been deprecated since 3.6.4. Use the"
" summary decorator instead"
),
level=logging.WARNING)
def test_slug(self):
@ -761,7 +759,7 @@ class TestArticle(TestBase):
self.assertTrue(article._has_valid_save_as())
class TestStatic(LoggedTestCase):
class TestStatic(unittest.TestCase):
def setUp(self):
super().setUp()
@ -986,30 +984,34 @@ class TestStatic(LoggedTestCase):
metadata={'title': 'fakepage'}, settings=self.settings,
source_path=os.path.join('dir', 'otherdir', 'fakepage.md'),
context=self.context)
content = page.get_content('')
self.assertEqual(content, html)
self.assertLogCountEqual(
count=1,
msg="Replacement Indicator 'unknown' not recognized, "
"skipping replacement",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
content = page.get_content('')
self.assertEqual(content, html)
count_msgs(
count=1,
msg="Replacement Indicator 'unknown' not recognized, "
"skipping replacement",
level=logging.WARNING)
def test_link_to_unknown_file(self):
"{filename} link to unknown file should trigger warning."
html = '<a href="{filename}foo">link</a>'
page = Page(content=html,
metadata={'title': 'fakepage'}, settings=self.settings,
source_path=os.path.join('dir', 'otherdir', 'fakepage.md'),
context=self.context)
content = page.get_content('')
self.assertEqual(content, html)
self.assertLogCountEqual(
count=1,
msg="Unable to find 'foo', skipping url replacement.",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
page = Page(content=html,
metadata={'title': 'fakepage'}, settings=self.settings,
source_path=os.path.join('dir', 'otherdir', 'fakepage.md'),
context=self.context)
content = page.get_content('')
self.assertEqual(content, html)
count_msgs(
count=1,
msg="Unable to find 'foo', skipping url replacement.",
level=logging.WARNING)
def test_index_link_syntax_with_spaces(self):
"""{index} link syntax triggers url replacement

View file

@ -1,82 +1,268 @@
import logging
import unittest
from collections import defaultdict
from contextlib import contextmanager
from pelican import log
from pelican.tests.support import LogCountHandler
class TestLog(unittest.TestCase):
LOG_LVLS = [
logging.DEBUG,
logging.INFO,
logging.WARNING,
logging.ERROR,
logging.CRITICAL
]
# Set this module's logging level to DEBUG to prevent
# tests' loggers effectiveLevels from reaching **root logger**,
# which we *don't* want to play with here.
logging.getLogger(__name__).setLevel(logging.DEBUG)
class _LoggingTestCase(unittest.TestCase):
"""Test Case w/ test-specific loggers, and contexts for temp filters."""
def setUp(self):
super().setUp()
self.logger = logging.getLogger(__name__)
self.handler = LogCountHandler()
self.logger.addHandler(self.handler)
def tearDown(self):
self._reset_limit_filter()
super().tearDown()
def _reset_limit_filter(self):
log.LimitFilter._ignore = set()
log.LimitFilter._raised_messages = set()
log.LimitFilter._threshold = 5
log.LimitFilter._group_count = defaultdict(int)
"Each test should use a unique logger and handle all levels."
self.logger = logging.getLogger(self.id())
self.logger.setLevel(logging.NOTSET) # log all levels
@contextmanager
def reset_logger(self):
try:
yield None
finally:
self._reset_limit_filter()
self.handler.flush()
def temp_filter(self, flt):
"""
Context in which a filter is temporarily applied to test's logger.
"""
with log.temp_filter(self.logger, flt):
yield flt
def test_log_filter(self):
def do_logging():
for i in range(5):
self.logger.warning('Log %s', i)
self.logger.warning('Another log %s', i)
# no filter
with self.reset_logger():
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
5)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
5)
# filter by template
with self.reset_logger():
log.LimitFilter._ignore.add((logging.WARNING, 'Log %s'))
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
0)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
5)
class TestLogLevel(unittest.TestCase):
# filter by exact message
with self.reset_logger():
log.LimitFilter._ignore.add((logging.WARNING, 'Log 3'))
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
4)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
5)
def test_severity_name(self):
self.assertEqual(log.severity_from_name('errors'), logging.ERROR)
self.assertEqual(log.severity_from_name('error'), logging.ERROR)
self.assertEqual(log.severity_from_name('WARNINGS'), logging.WARNING)
self.assertEqual(log.severity_from_name('info'), logging.INFO)
self.assertEqual(log.severity_from_name('INFO'), logging.INFO)
self.assertEqual(log.severity_from_name(''), logging.NOTSET)
self.assertEqual(log.severity_from_name('error'), logging.ERROR)
# filter by both
with self.reset_logger():
log.LimitFilter._ignore.add((logging.WARNING, 'Log 3'))
log.LimitFilter._ignore.add((logging.WARNING, 'Another log %s'))
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
4)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
0)
# add a custom level:
logging.addLevelName(5, "TRACE")
self.assertEqual(log.severity_from_name('trace'), 5)
with self.assertRaises(KeyError):
self.assertEqual(log.severity_from_name("wrong"), logging.NOTSET)
class TestFatal(_LoggingTestCase):
def test_levels(self):
logger = self.logger
with log.temp_handler(self.logger, log.AbortHandler(logging.ERROR)):
logger.debug("Calculating airspeed velocity of an unladen swallow...")
logger.info("Tis but a scratch.")
logger.warning("And now for something completely different.")
with self.assertRaises(log.AbortException):
logger.error("This is a late parrot!")
with self.assertRaises(log.AbortException):
logger.critical("Unexpected: Spanish Inquisition.")
class TestLogInit(_LoggingTestCase):
def test_alt_logger(self):
self.assertEqual(log.set_filters(self.logger), self.logger)
def test_default(self):
null_settings = dict(
LOG_LIMIT_THRESHOLD=None,
LOG_ONCE_LEVEL=None,
)
# assert returned logger is "pelican".
pkg_logger = logging.getLogger('pelican')
self.assertEqual(pkg_logger.filters, [])
self.assertEqual(pkg_logger, log.configure(null_settings))
# because no filtering settings supplied,
# no filters should have been added:
self.assertEqual(pkg_logger.filters, [])
# assert no pre-existing filters on test's logger:
self.assertEqual(log.LimitFilter.get_filters(self.logger), [])
logger = log.configure(dict(LOG_LOGGER=self.logger.name))
# assert returned logger has been set up with a LimitFilter
self.assertNotEqual(log.LimitFilter.get_filters(self.logger), [])
with LogCountHandler.examine(logger) as count_msgs:
msg = 'init with alt logger: %s' % logger
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # suppressed for WARNINGS and worse
count_msgs(
sum([1 if lvl >= logging.WARNING # dedupe warnings & above
else 2 # lvls below should not be deduped
for lvl in LOG_LVLS]),
msg
)
def test_dedupe_all(self):
logger = log.set_filters(self.logger, once_lvl=logging.NOTSET) # dedup all
with LogCountHandler.examine(logger) as count_msgs:
msg = 'init with alt logger: %s' % logger
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # poss. suppressed
count_msgs(len(LOG_LVLS), msg)
def test_dedupe_none(self):
logger = log.set_filters(self.logger, once_lvl=None)
with LogCountHandler.examine(logger) as count_msgs:
msg = 'init with alt logger: %s' % logger
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # do not suppress
count_msgs(len(LOG_LVLS * 2), msg)
class classTestOnceFilter(_LoggingTestCase):
def test_levels(self):
"""
Don't de-duplicate messages for levels below filter's value.
"""
logger = self.logger
msg = "levels_test"
with LogCountHandler.examine(logger) as count_msgs:
with self.temp_filter(log.OnceFilter()): # default=NOTSET: dedup all
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # copy should be suppressed
count_msgs(len(LOG_LVLS), msg)
with LogCountHandler.examine(logger) as count_msgs:
with self.temp_filter(log.OnceFilter(logging.WARNING)):
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # copy should be suppressed
count_msgs(
sum([1 if lvl >= logging.WARNING # dedupe warnings & above
else 2 # lvls below should not be deduped
for lvl in LOG_LVLS]),
msg
)
def test_deduplication(self):
"""Confirm that duplicate messages are ignored."""
logger = self.logger
# ensure counting is correct before adding filters
msg = "without filter"
with LogCountHandler.examine(logger) as count_msgs:
for level in LOG_LVLS:
logger.log(level, msg)
# count w/ level requirements
count_msgs(1, msg, level=logging.DEBUG)
count_msgs(1, msg, level=logging.INFO)
# count w/ no level requirqement
count_msgs(len(LOG_LVLS), msg)
# add filter, check count
with self.temp_filter(log.OnceFilter()):
msg = "with filter"
with LogCountHandler.examine(logger) as count_msgs:
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg)
logger.log(level, msg)
# counting w/ level requirements
for level in LOG_LVLS:
count_msgs(1, msg, level=level)
# counting w/ no level requirement
count_msgs(len(LOG_LVLS), msg)
# check counting again after filter removed
msg = "removed filter"
with LogCountHandler.examine(logger) as count_msgs:
for level in LOG_LVLS:
logger.log(level, msg)
count_msgs(5, msg)
class TestLimitFilter(_LoggingTestCase):
def setUp(self):
super().setUp()
self.logger.setLevel(logging.DEBUG)
def test_get_filters(self):
"""Assert LimitFilters on a logger are collected."""
fltrs = [log.LimitFilter() for x in range(5)]
assert not self.logger.filters
for each in fltrs:
self.logger.addFilter(each)
self.assertEqual(fltrs, log.LimitFilter.get_filters(self.logger))
for each in fltrs:
self.logger.removeFilter(each)
assert not self.logger.filters
def test_threshold(self):
"""
Assert that the `limit_msg` activates after the threshold is reached.
"""
threshold = 4
with self.temp_filter(log.LimitFilter(threshold)):
msg = "Threshold: %s"
limits = {'limit_msg': "Threshold exceeded"}
with LogCountHandler.examine(self.logger) as count_msgs:
for i in range(1, threshold + 3):
self.logger.warning(msg, i, extra=limits)
# up to threshold
count_msgs(threshold - 1, "Threshold: [1-9+]", as_regex=True)
# exactly one "limit_msg"
count_msgs(1, limits['limit_msg'])
# total count should be == threshold, all in all.
count_msgs(threshold)
def test_limit_args(self):
"""
`limit_args` should be used in the `limit_msg`.
"""
threshold = 3
with self.temp_filter(log.LimitFilter(threshold)):
limits = {'limit_msg': "Threshold exceeded @ %s %s",
'limit_args': ('count of', threshold)}
with LogCountHandler.examine(self.logger) as count_msgs:
for i in range(threshold + 1):
self.logger.warning("Threshold: %s", i, extra=limits)
count_msgs(1, limits['limit_msg'] % limits['limit_args'])
def test_ignore(self):
"""Confirm blacklisted messages (eg. from config) are not emitted."""
blacklist = [
"SPAM SPAM SPAM",
"I'll have your spam. I love it."
]
with self.temp_filter(log.BlacklistFilter()) as flt:
for template in blacklist:
flt.ignore(logging.WARNING, template)
with LogCountHandler.examine(self.logger) as count_msgs:
count_msgs(0)
self.logger.warning(blacklist[0]) # ignored
self.logger.warning(blacklist[0]) # ignored
self.logger.warning("I don't like spam!") # 1
self.logger.warning("Sshh, dear, don't cause a fuss.") # 2
self.logger.warning(blacklist[1]) # ignored
self.logger.info(blacklist[1]) # 3 : diff. level (below)
self.logger.error(blacklist[1]) # 4 : diff. level (above)
count_msgs(4)

View file

@ -12,7 +12,7 @@ from pelican import Pelican
from pelican.generators import StaticGenerator
from pelican.settings import read_settings
from pelican.tests.support import (
LoggedTestCase,
LogCountHandler,
diff_subproc,
locale_available,
mute,
@ -20,8 +20,9 @@ from pelican.tests.support import (
)
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
SAMPLES_PATH = os.path.abspath(os.path.join(
CURRENT_DIR, os.pardir, os.pardir, 'samples'))
SAMPLES_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, os.pardir, os.pardir, 'samples')
)
OUTPUT_PATH = os.path.abspath(os.path.join(CURRENT_DIR, 'output'))
INPUT_PATH = os.path.join(SAMPLES_PATH, "content")
@ -41,7 +42,7 @@ def recursiveDiff(dcmp):
return diff
class TestPelican(LoggedTestCase):
class TestPelican(unittest.TestCase):
# general functional testing for pelican. Basically, this test case tries
# to run pelican in different situations and see how it behaves
@ -88,7 +89,8 @@ class TestPelican(LoggedTestCase):
"StaticGenerator must be the last generator, but it isn't!")
self.assertIsInstance(
generator_classes, Sequence,
"_get_generator_classes() must return a Sequence to preserve order")
"_get_generator_classes() must return a Sequence to preserve order"
)
@skipIfNoExecutable(['git', '--version'])
def test_basic_generation_works(self):
@ -101,14 +103,19 @@ class TestPelican(LoggedTestCase):
'LOCALE': locale.normalize('en_US'),
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
mute(True)(pelican.run)()
count_msgs(
count=1,
msg="Unable to find .* skipping url replacement",
as_regex=True,
level=logging.WARNING
)
self.assertDirsEqual(
self.temp_path, os.path.join(OUTPUT_PATH, 'basic')
)
self.assertLogCountEqual(
count=1,
msg="Unable to find.*skipping url replacement",
level=logging.WARNING)
@skipIfNoExecutable(['git', '--version'])
def test_custom_generation_works(self):
@ -198,16 +205,16 @@ class TestPelican(LoggedTestCase):
],
'LOCALE': locale.normalize('en_US'),
})
pelican = Pelican(settings=settings)
logger = logging.getLogger()
orig_level = logger.getEffectiveLevel()
logger = logging.getLogger("pelican.writers")
logger.setLevel(logging.INFO)
mute(True)(pelican.run)()
logger.setLevel(orig_level)
self.assertLogCountEqual(
count=2,
msg="Writing .*",
level=logging.INFO)
with LogCountHandler.examine(logger) as count_msgs:
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
count_msgs(count=2,
msg="Writing .*",
as_regex=True,
level=logging.INFO)
def test_cyclic_intersite_links_no_warnings(self):
settings = read_settings(path=None, override={
@ -215,37 +222,46 @@ class TestPelican(LoggedTestCase):
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
# There are four different intersite links:
# - one pointing to the second article from first and third
# - one pointing to the first article from second and third
# - one pointing to the third article from first and second
# - one pointing to a nonexistent from each
# If everything goes well, only the warning about the nonexistent
# article should be printed. Only two articles are not sufficient,
# since the first will always have _context['generated_content'] empty
# (thus skipping the link resolving) and the second will always have it
# non-empty, containing the first, thus always succeeding.
self.assertLogCountEqual(
count=1,
msg="Unable to find '.*\\.rst', skipping url replacement.",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
# There are four different intersite links:
# - one pointing to the second article from first and third
# - one pointing to the first article from second and third
# - one pointing to the third article from first and second
# - one pointing to a nonexistent from each If everything goes
# well, only the warning about the nonexistent article should be
# printed. Only two articles are not sufficient, since the first
# will always have _context['generated_content'] empty (thus
# skipping the link resolving) and the second will always have it
# non-empty, containing the first, thus always succeeding.
count_msgs(
count=3,
msg="Unable to find '.*[.]rst', skipping url replacement.",
as_regex=True,
level=logging.WARNING
)
def test_md_extensions_deprecation(self):
"""Test that a warning is issued if MD_EXTENSIONS is used"""
settings = read_settings(path=None, override={
'PATH': INPUT_PATH,
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
'MD_EXTENSIONS': {},
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
self.assertLogCountEqual(
count=1,
msg="MD_EXTENSIONS is deprecated use MARKDOWN instead.",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.settings")
) as count_msgs:
settings = read_settings(path=None, override={
'PATH': INPUT_PATH,
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
'MD_EXTENSIONS': {},
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
count_msgs(
count=1,
msg="MD_EXTENSIONS is deprecated use MARKDOWN instead.",
level=logging.WARNING
)
def test_parse_errors(self):
# Verify that just an error is printed and the application doesn't
@ -255,12 +271,17 @@ class TestPelican(LoggedTestCase):
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
self.assertLogCountEqual(
count=1,
msg="Could not process .*parse_error.rst",
level=logging.ERROR)
with LogCountHandler.examine(
logging.getLogger("pelican.generators")
) as count_msgs:
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
count_msgs(
count=1,
msg="Could not process .*parse_error.rst",
as_regex=True,
level=logging.ERROR
)
def test_module_load(self):
"""Test loading via python -m pelican --help displays the help"""

View file

@ -1,5 +1,6 @@
import copy
import locale
import logging
import os
from os.path import abspath, dirname, join
@ -16,6 +17,7 @@ class TestSettingsConfiguration(unittest.TestCase):
append new values to the settings (if any), and apply basic settings
optimizations.
"""
def setUp(self):
self.old_locale = locale.setlocale(locale.LC_ALL)
locale.setlocale(locale.LC_ALL, 'C')
@ -304,3 +306,19 @@ class TestSettingsConfiguration(unittest.TestCase):
[(r'C\+\+', 'cpp')] +
self.settings['SLUG_REGEX_SUBSTITUTIONS'])
self.assertNotIn('SLUG_SUBSTITUTIONS', settings)
def test_log_filter_setting(self):
read_settings(None, override=dict(
LOG_FILTER=[
(logging.INFO, "Ignored Info message."),
(logging.WARNING, "Ignored Warning message."),
(logging.ERROR, "Ignored Error message.")
]
))
with self.assertRaises(ValueError):
read_settings(None, override=dict(
LOG_FILTER=[
("Wrong order.", logging.ERROR)
]
))

View file

@ -2,6 +2,7 @@ import locale
import logging
import os
import shutil
import unittest
from datetime import timezone
from sys import platform
from tempfile import mkdtemp
@ -11,15 +12,19 @@ try:
except ModuleNotFoundError:
from backports.zoneinfo import ZoneInfo
from pelican import log
from pelican import utils
from pelican.generators import TemplatePagesGenerator
from pelican.settings import read_settings
from pelican.tests.support import (LoggedTestCase, get_article,
locale_available, unittest)
from pelican.tests.support import (
LogCountHandler,
get_article,
locale_available
)
from pelican.writers import Writer
class TestUtils(LoggedTestCase):
class TestUtils(unittest.TestCase):
_new_attribute = 'new_value'
def setUp(self):
@ -37,13 +42,17 @@ class TestUtils(LoggedTestCase):
return None
def test_deprecated_attribute(self):
value = self._old_attribute
self.assertEqual(value, self._new_attribute)
self.assertLogCountEqual(
count=1,
msg=('_old_attribute has been deprecated since 3.1.0 and will be '
'removed by version 4.1.3. Use _new_attribute instead'),
level=logging.WARNING)
with LogCountHandler.examine(log.PKG_LOGGER) as count_msgs:
value = self._old_attribute
self.assertEqual(value, self._new_attribute)
count_msgs(
count=1,
msg=(
'_old_attribute has been deprecated since 3.1.0 and will '
'be removed by version 4.1.3. Use _new_attribute instead'
),
level=logging.WARNING
)
def test_get_date(self):
# valid ones
@ -269,9 +278,10 @@ class TestUtils(LoggedTestCase):
'<p>' + 'word ' * 20 + '<span>marker</span></p>')
self.assertEqual(
utils.truncate_html_words(
'<span\nstyle="\n\n">' + 'word ' * 100 + '</span>', 20,
'<span>marker</span>'),
'<span\nstyle="\n\n">' + 'word ' * 20 + '<span>marker</span></span>')
'<span\nstyle="\n\n">' + 'word ' * 100 + '</span>', 20,
'<span>marker</span>'),
'<span\nstyle="\n\n">' + 'word ' * 20 + '<span>marker</span></span>'
)
self.assertEqual(
utils.truncate_html_words('<br>' + 'word ' * 100, 20,
'<span>marker</span>'),