This commit is contained in:
Ryan de Kleer 2023-10-30 16:09:53 +01:00 committed by GitHub
commit a03acfea9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 1004 additions and 368 deletions

View file

@ -15,10 +15,7 @@ from collections.abc import Iterable
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# pelican.log has to be the first pelican module to be loaded
# because logging.setLoggerClass has to be called before logging.getLogger
from pelican.log import console
from pelican.log import init as init_logging
from pelican import log
from pelican.generators import (ArticlesGenerator, # noqa: I100
PagesGenerator, SourceFileGenerator,
StaticGenerator, TemplatePagesGenerator)
@ -164,15 +161,18 @@ class Pelican:
'draft page',
'draft pages')
console.print('Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.'
.format(
pluralized_articles,
pluralized_drafts,
pluralized_hidden_articles,
pluralized_pages,
pluralized_hidden_pages,
pluralized_draft_pages,
time.time() - start_time))
log.console.print(
'Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.'
.format(
pluralized_articles,
pluralized_drafts,
pluralized_hidden_articles,
pluralized_pages,
pluralized_hidden_pages,
pluralized_draft_pages,
time.time() - start_time
)
)
def _get_generator_classes(self):
discovered_generators = [
@ -229,13 +229,13 @@ class Pelican:
class PrintSettings(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
init_logging(name=__name__)
log.init(level=logging.WARNING)
try:
instance, settings = get_instance(namespace)
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
console.print_exception()
log.console.print_exception()
sys.exit(getattr(e, 'exitcode', 1))
if values:
@ -247,15 +247,17 @@ class PrintSettings(argparse.Action):
setting_format = '\n{}:\n{}'
else:
setting_format = '\n{}: {}'
console.print(setting_format.format(
log.console.print(setting_format.format(
setting,
pprint.pformat(settings[setting])))
else:
console.print('\n{} is not a recognized setting.'.format(setting))
log.console.print(
'\n{} is not a recognized setting.'.format(setting)
)
break
else:
# No argument was given to --print-settings, so print all settings
console.print(settings)
log.console.print(settings)
parser.exit()
@ -358,15 +360,17 @@ def parse_arguments(argv=None):
dest='selected_paths', default=None,
help='Comma separated list of selected paths to write')
parser.add_argument('--fatal', metavar='errors|warnings',
choices=('errors', 'warnings'), default='',
parser.add_argument('--fatal', choices=('errors', 'warnings'),
default=None,
# type conversion deferred (see: [FATAL] section below)
help=('Exit the program with non-zero status if any '
'errors/warnings encountered.'))
parser.add_argument('--logs-dedup-min-level', default='WARNING',
choices=('DEBUG', 'INFO', 'WARNING', 'ERROR'),
help=('Only enable log de-duplication for levels equal'
' to or above the specified value'))
metavar=('logging severity level'),
type=log.severity_from_name, dest='once_lvl',
help=('Enable log de-duplication for this severity'
' level and above.'))
parser.add_argument('-l', '--listen', dest='listen', action='store_true',
help='Serve content files via HTTP and port 8000.')
@ -390,6 +394,10 @@ def parse_arguments(argv=None):
args = parser.parse_args(argv)
# [FATAL] value type is converted *after* parsing, to get around argparse
# complaining that converted type does not match the "choices" param type.
args.fatal = log.severity_from_name(args.fatal)
if args.port is not None and not args.listen:
logger.warning('--port without --listen has no effect')
if args.bind is not None and not args.listen:
@ -424,6 +432,10 @@ def get_config(args):
config['PORT'] = args.port
if args.bind is not None:
config['BIND'] = args.bind
if args.fatal is not None:
config['LOG_FATAL'] = args.fatal
if args.once_lvl is not None:
config['LOG_ONCE_LEVEL'] = args.once_lvl
config['DEBUG'] = args.verbosity == logging.DEBUG
config.update(args.overrides)
@ -449,8 +461,11 @@ def get_instance(args):
def autoreload(args, excqueue=None):
console.print(' --- AutoReload Mode: Monitoring `content`, `theme` and'
' `settings` for changes. ---')
log.console.print(
' --- '
'AutoReload Mode: Monitoring `content`, `theme` and `settings` for changes.'
' ---'
)
pelican, settings = get_instance(args)
settings_file = os.path.abspath(args.settings)
while True:
@ -463,7 +478,7 @@ def autoreload(args, excqueue=None):
if settings_file in changed_files:
pelican, settings = get_instance(args)
console.print('\n-> Modified: {}. re-generating...'.format(
log.console.print('\n-> Modified: {}. re-generating...'.format(
', '.join(changed_files)))
except KeyboardInterrupt:
@ -485,9 +500,14 @@ def autoreload(args, excqueue=None):
def listen(server, port, output, excqueue=None):
# set logging level to at least "INFO" (so we can see the server requests)
if logger.level < logging.INFO:
logger.setLevel(logging.INFO)
# elevate logging to *at least* "INFO" level to see the server requests:
server_logger = logging.getLogger('pelican.server')
if server_logger.getEffectiveLevel() > logging.INFO:
logObj = server_logger
while logObj:
if logObj.level and logObj.level > logging.INFO:
logObj.setLevel(logging.INFO)
logObj = logObj.parent
RootedHTTPServer.allow_reuse_address = True
try:
@ -500,8 +520,8 @@ def listen(server, port, output, excqueue=None):
return
try:
console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format(
server, port))
log.console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format(
server, port))
httpd.serve_forever()
except Exception as e:
if excqueue is not None:
@ -517,15 +537,28 @@ def listen(server, port, output, excqueue=None):
def main(argv=None):
args = parse_arguments(argv)
logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level)
init_logging(level=args.verbosity, fatal=args.fatal,
name=__name__, logs_dedup_min_level=logs_dedup_min_level)
# logging: add rich handler & set verbosity
debug_mode = logging.DEBUG == args.verbosity
log.init(level=args.verbosity)
logger.debug('Pelican version: %s', __version__)
logger.debug('Python version: %s', sys.version.split()[0])
try:
pelican, settings = get_instance(args)
logger.info("Reading config...")
# temp handler to catch possible "fatal" logs while applying settings
with log.temp_handler(
logging.getLogger(),
log.AbortHandler(args.fatal or logging.CRITICAL)
):
pelican, settings = get_instance(args)
if debug_mode:
# LOG_FILTER is disabled in --debug
settings.pop('LOG_FILTER')
log.configure(settings)
if args.autoreload and args.listen:
excqueue = multiprocessing.Queue()
@ -551,13 +584,15 @@ def main(argv=None):
listen(settings.get('BIND'), settings.get('PORT'),
settings.get("OUTPUT_PATH"))
else:
with console.status("Generating..."):
with log.console.status("Generating..."):
pelican.run()
except KeyboardInterrupt:
logger.warning('Keyboard interrupt received. Exiting.')
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
if debug_mode: # include traceback:
log.AbortHandler.trim_traceback(e)
log.console.print_exception()
else:
log.console.print(e)
if args.verbosity == logging.DEBUG:
console.print_exception()
sys.exit(getattr(e, 'exitcode', 1))

View file

@ -1,142 +1,446 @@
import logging
from collections import defaultdict
from contextlib import contextmanager
from rich.console import Console
from rich.logging import RichHandler
__all__ = [
'init'
]
console = Console()
# handles messages logged from anywhere within the pelican namespace.
PKG_LOGGER = logging.getLogger('pelican')
def severity_from_name(string_val):
"""Convert string value to appropriate severity (AKA log level)."""
if not string_val:
return logging.NOTSET
level_name = string_val.upper().rstrip('S')
try:
return logging.getLevelNamesMapping()[level_name]
except AttributeError: # Python < 3.11
return logging._nameToLevel.copy()[level_name]
@contextmanager
def temp_filter(logger, flt):
"""
Context in which a custom filter is temporarily applied to logger.
"""
try:
logger.addFilter(flt)
yield flt
finally:
logger.removeFilter(flt)
@contextmanager
def temp_handler(logger, hnd):
"""
Context in which a custom handler is temporarily applied to logger.
"""
try:
logger.addHandler(hnd)
yield hnd
finally:
logger.removeHandler(hnd)
class AbortException(Exception):
def __init__(self, *args, record=None):
self.record = record
super().__init__(*args)
class AbortHandler(logging.Handler):
"""
Set severity level to trigger an AbortException (eg. to exit application).
Example::
# add a handler to raise an exception on logs of warning level or higher
root = logging.getLogger()
root.addHandler(AbortHandler(logging.WARNING))
root.info("Safe")
try:
root.warning("Critical!")
except AbortHandler.EXCEPTION_CLS as e:
print(e)
print('exiting...')
raise
"""
EXCEPTION_CLS = AbortException
_HANDLER_DEPTH = 3
@classmethod
def trim_traceback(cls, e):
# only trim if exception came from this handler...
if isinstance(e, cls.EXCEPTION_CLS):
# find bottom of tb ...
next_tb = e.__traceback__.tb_next
record = e.record
while next_tb:
# Seek traceback for logging call that triggered the handler
if (
next_tb.tb_frame.f_code.co_filename == record.pathname
and
next_tb.tb_lineno == record.lineno
):
next_tb.tb_next = None # remaining traceback is irrelevant
break
else:
next_tb = next_tb.tb_next
def emit(self, record):
# Fail on set severity level (or higher)
if (record.levelno >= self.level):
exc = AbortException(
'Aborting on log severity of {lvl!r} or greater.'.format(
lvl=logging.getLevelName(self.level)
),
record=record
)
raise exc
class LimitFilter(logging.Filter):
"""
Remove duplicates records, and limit the number of records in the same
group.
Modify logs using a generic message (`limit_msg`) after reaching threshold.
Groups are specified by the message to use when the number of records in
the same group hit the limit.
E.g.: log.warning(('43 is not the answer', 'More erroneous answers'))
Messages logged with the `limit_msg` keyword in `extra` are tracked as a
group; once the group has reached the threshold, the record is modified to
use the limit_msg instead. All subsequent log messages in the group are
ignored.
EG.::
inputs = list(range(43, 55))
for i in inputs:
logger.warning(
'%s is not the answer', i,
extra={
'limit_msg': 'Many erroneous answers: %s',
'limit_args': inputs
}
)
"""
LOGS_DEDUP_MIN_LEVEL = logging.WARNING
@classmethod
def get_filters(cls, logger):
"""Return a list of LimitFilters currently on logger."""
found = []
for x in logger.filters:
if isinstance(x, cls):
found.append(x)
return found
_ignore = set()
_raised_messages = set()
_threshold = 5
_group_count = defaultdict(int)
def __init__(self, threshold=5):
"""
:int threshold:
For messages logged with the `limit_msg` in extra, the limit_msg
will be used after the threshold is met.
Example::
if resource.is_missing:
logger.warning(
'The resource %s is missing', resource.name,
extra={'limit_msg': 'Other resources were missing'}
)
# if threshold=5, logs:
# WARNING: The resource prettiest_cat.jpg is missing
# WARNING: The resource best_cat_ever.jpg is missing
# WARNING: The resource cutest_cat.jpg is missing
# WARNING: The resource lolcat.jpg is missing
# WARNING: Other resources were missing
"""
super().__init__()
self.limit_threshold = None
if threshold is not None:
# react at, not after, threshold
self.limit_threshold = threshold - 1
self.group_count = defaultdict(int)
def filter(self, record):
# don't limit log messages for anything above "warning"
if record.levelno > self.LOGS_DEDUP_MIN_LEVEL:
# ``limit_msg`` handling
limit_template = record.__dict__.get('limit_msg', None)
if limit_template:
key = (record.levelno, limit_template)
counter = self.group_count[key]
if counter > self.limit_threshold:
return False # reject: threshold exceeded
elif counter == self.limit_threshold:
# change message to the "limit_msg"
record.msg = limit_template
record.args = record.__dict__.get('limit_args', ())
self.group_count[key] += 1
return True # accept
class OnceFilter(logging.Filter):
"""
Allow only the first occurance of matching log messages.
Similar to the "once" behavior in the ``warnings`` module.
Parameters
----------
level: int
Only enable log de-duplication for levels equal to or above this
value.
"""
def __init__(self, level=logging.NOTSET):
super().__init__()
self.level = level
self._ignored = set()
def ignore(self, level, message):
self._ignored.add((level, message))
def filter(self, record):
if record.levelno < self.level:
# Do not dedeuplicate messages of lower severity
return True
# extract group
group = record.__dict__.get('limit_msg', None)
group_args = record.__dict__.get('limit_args', ())
# ignore duplicates of resolved/formatted messages:
message_key = (record.levelno, record.getMessage()) # resolved msg
ignored = self._ignored
# ignore record if it was already raised
message_key = (record.levelno, record.getMessage())
if message_key in self._raised_messages:
return False
if message_key in ignored:
return False # skip messages already encountered
else:
self._raised_messages.add(message_key)
ignored.add(message_key) # continue, but mark to ignore next time
# ignore LOG_FILTER records by templates or messages
# when "debug" isn't enabled
logger_level = logging.getLogger().getEffectiveLevel()
if logger_level > logging.DEBUG:
template_key = (record.levelno, record.msg)
message_key = (record.levelno, record.getMessage())
if (template_key in self._ignore or message_key in self._ignore):
return False
# check if we went over threshold
if group:
key = (record.levelno, group)
self._group_count[key] += 1
if self._group_count[key] == self._threshold:
record.msg = group
record.args = group_args
elif self._group_count[key] > self._threshold:
return False
return True
class LimitLogger(logging.Logger):
class BlacklistFilter(logging.Filter):
"""Use to prevent messages of specific level & content from emitting."""
def __init__(self, item_pairs=()):
super().__init__()
self._ignored = set()
for lvl, msg in item_pairs:
self.ignore(lvl, msg)
@classmethod
def get_filters(cls, logger):
"""Return a list of BlacklistFilters currently on logger."""
found = []
for x in logger.filters:
if isinstance(x, cls):
found.append(x)
return found
def ignore(self, level, message):
self._ignored.add((level, message))
def filter(self, record):
if not self._ignored:
return True
if (
((record.levelno, record.msg) in self._ignored)
or
((record.levelno, record.getMessage()) in self._ignored)
):
return False # do not emit msg
return True
def init(
level=None, # init root log level
handlers=None, # root log handler
):
"""
A logger which adds LimitFilter automatically
Wrapper for logging.basicConfig, but with a rich.console default handler.
If the root logger has *not already been configured* (eg. has no handlers),
then the specified handler (default:rich.ConsoleHandler) will be added, and
the logging level will be set (Python defaults to WARNING level).
"""
limit_filter = LimitFilter()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.enable_filter()
def disable_filter(self):
self.removeFilter(LimitLogger.limit_filter)
def enable_filter(self):
self.addFilter(LimitLogger.limit_filter)
class FatalLogger(LimitLogger):
warnings_fatal = False
errors_fatal = False
def warning(self, *args, **kwargs):
super().warning(*args, **kwargs)
if FatalLogger.warnings_fatal:
raise RuntimeError('Warning encountered')
def error(self, *args, **kwargs):
super().error(*args, **kwargs)
if FatalLogger.errors_fatal:
raise RuntimeError('Error encountered')
logging.setLoggerClass(FatalLogger)
# force root logger to be of our preferred class
logging.getLogger().__class__ = FatalLogger
def init(level=None, fatal='', handler=RichHandler(console=console), name=None,
logs_dedup_min_level=None):
FatalLogger.warnings_fatal = fatal.startswith('warning')
FatalLogger.errors_fatal = bool(fatal)
LOG_FORMAT = "%(message)s"
if handlers is None: # default handler:
handlers = [RichHandler(console=console)]
logging.basicConfig(
level=level,
format=LOG_FORMAT,
datefmt="[%H:%M:%S]",
handlers=[handler]
format="%(message)s", # default format
datefmt="[%H:%M:%S]", # default date format
handlers=handlers
)
logger = logging.getLogger(name)
if level:
logger.setLevel(level)
if logs_dedup_min_level:
LimitFilter.LOGS_DEDUP_MIN_LEVEL = logs_dedup_min_level
def configure(settings):
"""
Apply logging settings, and return the configured logger.
Applicable settings
--------------------
LOG_LOGGER:
Default: ``"pelican"`` (PKG_LOGGER.name).
The name of the logger to apply filtering to.
LOG_LIMIT_THRESHOLD:
Default: ``5``.
The nth occurance at which a log message will be substituted with its
``limit_msg`` value.
LOG_ONCE_LEVEL:
Default: ``logging.WARNING``
The lowest severity at which log messages will be deduplicated.
LOG_FILTER:
An iterable of (severity, msg) tuples which will be silenced by the
logger.
LOG_FATAL:
The minimum severity at which logs should raise an ``AbortException``.
NOTE: this appends a (filtered) handler to the root logger; this
handler will *only* act on messages the LOG_LOGGER would propogate --
effectively still targeting *only* logs from the specified logger (and
its sub-loggers), despite being at the root level. This allows all
handlers to complete *before* triggering the ``AbortException``.
"""
cfg = dict( # config values that have defaults:
LOG_LOGGER=PKG_LOGGER.name,
LOG_LIMIT_THRESHOLD=5,
LOG_ONCE_LEVEL="WARNING"
)
cfg.update(settings.copy())
# validate & transform types/values: treat ``None`` as logging.NOTSET
if not isinstance(cfg.get('LOG_ONCE_LEVEL') or logging.NOTSET, int):
cfg['LOG_ONCE_LEVEL'] = severity_from_name(cfg['LOG_ONCE_LEVEL'])
if not isinstance(cfg.get('LOG_LOGGER'), logging.Logger):
cfg['LOG_LOGGER'] = logging.getLogger(cfg.get('LOG_LOGGER'))
# == FILTERING:
logger = set_filters(
logger=cfg.get('LOG_LOGGER'),
limit_threshold=cfg.get('LOG_LIMIT_THRESHOLD'),
once_lvl=cfg.get('LOG_ONCE_LEVEL'),
blacklist=cfg.get('LOG_FILTER')
)
# == HANDLING:
if cfg.get('LOG_FATAL'):
hnd = AbortHandler(cfg['LOG_FATAL'])
# Add to root logger so all other handlers can write/emit first.
root = logging.getLogger()
if logger is not root:
# handler only acts on records from specified logger (& children).
hnd.addFilter(logging.Filter(logger.name))
root.addHandler(hnd)
return logger
def log_warnings():
import warnings
logging.captureWarnings(True)
warnings.simplefilter("default", DeprecationWarning)
init(logging.DEBUG, name='py.warnings')
def set_filters(
logger=None, # logger of interest
limit_threshold=None, # LimitFilter: when to start using limit_msg
once_lvl=None, # OnceFilter: which severity to dedupe msgs at
blacklist=None # BlacklistFilter:
):
"""
Apply filtering on the logger (Default: ``PKG_LOGGER``).
Up to three filters are (optionally) applied:
``LimitFilter``:
Modifies log records using `limit_msg` in the `extras` dict. The
`limit_threshold` specifies how many records using the same
limit_msg are emitted before converting the message. See
``LimitFilter`` for an example.
``OnceFilter``:
De-duplicates messages of identical severity & content. Only
affects messages of once_lvl and higher.
``BlacklistFilter``:
Prevents log records of (severity, msg) from being emitted.
Returns
-------
The logger object the filters were applied to.
"""
if logger is None:
logger = PKG_LOGGER
if blacklist:
try:
flt = BlacklistFilter.get_filters(logger)[0]
except IndexError:
flt = BlacklistFilter()
logger.addFilter(flt)
for lvl, msg in blacklist:
flt.ignore(lvl, msg)
if once_lvl is not None:
# deduplicate this severity and above:
logger.addFilter(OnceFilter(once_lvl))
if limit_threshold is not None:
try:
flt = LimitFilter.get_filters(logger)[0].threshold = limit_threshold
except IndexError:
logger.addFilter(LimitFilter(limit_threshold))
logger.debug("{!r} : log filtering configured.".format(logger))
return logger
if __name__ == '__main__':
init(level=logging.DEBUG, name=__name__)
if __name__ == "__main__":
init(level=logging.INFO)
segue = "And now for something completely different: %s"
ignored_records = [
(logging.WARNING, "I'd like to have an argument, please."),
(logging.INFO, segue)
]
configure(
dict(
LOG_FATAL=logging.ERROR,
LOG_FILTER=ignored_records
)
)
logger = PKG_LOGGER
logger.info("hey")
logger.info("hey") # INFO < WARNING, not deduplicated
logger.log(*ignored_records[0]) # blacklisted
logger.log(*ignored_records[1]) # blacklisted
logger.info(segue, "a man with 3 buttocks.") # blacklisted
logger.warning("oof")
logger.warning("oof") # deduplicated
logger.warning("oof") # deduplicated
root_logger = logging.getLogger(__name__)
root_logger.debug('debug')
root_logger.info('info')
root_logger.warning('warning')
root_logger.error('error')
root_logger.critical('critical')
# on 5th occurance, limit_msg is used.
for i in range(1, 11):
logger.warning("watch out x%s!", i,
extra={"limit_msg": "[more watches were outed]"})
# on 5th occurance, limit_msg is used, formatting msg w/ limit_args
inputs = list(range(43, 55))
for i in inputs:
logger.warning(
'%s is not the answer', i,
extra={
'limit_msg': 'Many erroneous answers: %s',
'limit_args': inputs
}
)
# logger not from pkg, will not trip the AbortHandler
logging.getLogger("notpelican").error("that's not good...")
# logger from pkg: *will* trip the AbortHandler
logging.getLogger("pelican.log.example").error("x_x")
logger.critical("This line should not run.")

View file

@ -9,8 +9,6 @@ import sys
from os.path import isabs
from pathlib import Path
from pelican.log import LimitFilter
def load_source(name, path):
spec = importlib.util.spec_from_file_location(name, path)
@ -516,9 +514,18 @@ def configure_settings(settings):
raise Exception('You need to specify a path containing the content'
' (see pelican --help for more information)')
# specify the log messages to be ignored
log_filter = settings.get('LOG_FILTER', DEFAULT_CONFIG['LOG_FILTER'])
LimitFilter._ignore.update(set(log_filter))
# LOG_FILTER expects a list of (log_level, str) pairs.
blacklist = settings.get('LOG_FILTER', DEFAULT_CONFIG['LOG_FILTER'])
for n, (lvl, msg) in enumerate(blacklist):
if not isinstance(lvl, int):
raise ValueError(
"Invalid LOG_FILTER (item {n}): level={lvl}".format(**locals())
)
if not isinstance(msg, str):
raise ValueError(
"Invalid LOG_FILTER (item {n}): msg={msg!r}".format(**locals())
)
settings['LOG_FILTER'] = blacklist
# lookup the theme in "pelican/themes" if the given one doesn't exist
if not os.path.isdir(settings['THEME']):

View file

@ -1,15 +1,13 @@
import logging
import warnings
from pelican.log import log_warnings
# Direct Warnings to the "py.warnings" logger
logging.captureWarnings(True)
# redirect warnings module to use logging instead
log_warnings()
# setup warnings to log DeprecationWarning's and error on
# warnings in pelican's codebase
# enable DeprecationWarnings
warnings.simplefilter("default", DeprecationWarning)
# treat warnings in pelican's codebase as errors
warnings.filterwarnings("error", ".*", Warning, "pelican")
# Add a NullHandler to silence warning about no available handlers
logging.getLogger().addHandler(logging.NullHandler())
# Use: pytest --cli-log-level DEBUG for debug-level logging
logging.basicConfig(handlers=[logging.NullHandler()])

View file

@ -11,12 +11,13 @@ from io import StringIO
from logging.handlers import BufferingHandler
from shutil import rmtree
from tempfile import mkdtemp
from typing import Optional
from pelican.contents import Article
from pelican.readers import default_metadata
from pelican.settings import DEFAULT_CONFIG
__all__ = ['get_article', 'unittest', ]
__all__ = ['get_article', 'unittest', 'LogCountHandler']
@contextmanager
@ -26,7 +27,7 @@ def temporary_folder():
This allows to do something like this in tests:
>>> with temporary_folder() as d:
# do whatever you want
... pass # do whatever you want
"""
tempdir = mkdtemp()
try:
@ -76,8 +77,8 @@ def mute(returns_output=False):
execution, so be careful with what you apply it to.
>>> def numbers():
print "42"
print "1984"
... print("42")
... print("1984")
...
>>> numbers()
42
@ -193,37 +194,12 @@ def get_context(settings=None, **kwargs):
return context
class LogCountHandler(BufferingHandler):
"""Capturing and counting logged messages."""
def __init__(self, capacity=1000):
super().__init__(capacity)
def count_logs(self, msg=None, level=None):
return len([
rec
for rec
in self.buffer
if (msg is None or re.match(msg, rec.getMessage())) and
(level is None or rec.levelno == level)
])
def count_formatted_logs(self, msg=None, level=None):
return len([
rec
for rec
in self.buffer
if (msg is None or re.search(msg, self.format(rec))) and
(level is None or rec.levelno == level)
])
def diff_subproc(first, second):
"""
Return a subprocess that runs a diff on the two paths.
Check results with::
>>> proc = diff_subproc("first.txt","second.txt")
>>> out_stream, err_stream = proc.communicate()
>>> didCheckFail = proc.returnCode != 0
"""
@ -236,24 +212,103 @@ def diff_subproc(first, second):
)
class LoggedTestCase(unittest.TestCase):
"""A test case that captures log messages."""
class LogCountHandler(BufferingHandler):
"""Capturing and counting logged messages."""
def setUp(self):
super().setUp()
self._logcount_handler = LogCountHandler()
logging.getLogger().addHandler(self._logcount_handler)
def __init__(self, capacity=1000):
super().__init__(capacity)
def tearDown(self):
logging.getLogger().removeHandler(self._logcount_handler)
super().tearDown()
@classmethod
@contextmanager
def examine(cls, loggerObj):
"""
Context in which a logger's propagated messages can be examined.
def assertLogCountEqual(self, count=None, msg=None, **kwargs):
actual = self._logcount_handler.count_logs(msg=msg, **kwargs)
self.assertEqual(
actual, count,
msg='expected {} occurrences of {!r}, but found {}'.format(
count, msg, actual))
Yields
======
A handle to ``LogCountHandler.assert_count`` that has been added to the
specified logger for the duration of the context.
The yielded caller can be used to assert whether a certain number of
log messages have occurred within the context.
"""
hnd = cls()
try:
loggerObj.addHandler(hnd)
yield hnd.assert_count
finally:
loggerObj.removeHandler(hnd)
def assert_count(
self,
count: int,
msg: Optional[str] = None,
level: Optional[int] = None,
as_regex: bool = False
):
"""
Assert how often the specified messages have been handled.
Raises
-------
AssertionError
"""
occurances = self.count_logs(msg, level, as_regex)
if count != occurances:
report = 'Logged occurrence'
if msg is not None:
report += ' of {!r}'.format(msg)
if level is not None:
raise AssertionError(
' at {}'.format(logging.getLevelName(level))
)
raise AssertionError(
report + ': expected/found {}/{}'.format(count, occurances)
)
def match_record(
self,
pattern: re.Pattern,
record: logging.LogRecord,
level: Optional[int]
) -> Optional[re.Match]:
"""
Return regex object if pattern found in message at specified severity.
"""
if level is not None and level != record.levelno:
return None
# prefix pattern with "^" for re.match behavior
return pattern.search(record.getMessage())
def count_logs(
self,
msg: Optional[str],
level: Optional[int],
as_regex: bool = False
) -> int:
"""
Returns the number of times a message has been logged.
"""
if not msg:
if not level:
matched = self.buffer # all logged messages
else:
# all logged messages of matching severity level
matched = [rec for rec in self.buffer if rec.levelno == level]
else:
# all logged messages matching the regex and level
if not as_regex:
msg = re.escape(msg)
pattern = re.compile(msg)
matched = [
record for record in self.buffer
if self.match_record(pattern, record, level)
]
return len(matched)
class TestCaseWithCLocale(unittest.TestCase):

View file

@ -2,6 +2,7 @@ import datetime
import locale
import logging
import os.path
import unittest
from posixpath import join as posix_join
from sys import platform
@ -10,17 +11,15 @@ from jinja2.utils import generate_lorem_ipsum
from pelican.contents import Article, Author, Category, Page, Static
from pelican.plugins.signals import content_object_init
from pelican.settings import DEFAULT_CONFIG
from pelican.tests.support import (LoggedTestCase, get_context, get_settings,
unittest)
from pelican.tests.support import (LogCountHandler, get_context, get_settings)
from pelican.utils import (path_to_url, posixize_path, truncate_html_words)
# generate one paragraph, enclosed with <p>
TEST_CONTENT = str(generate_lorem_ipsum(n=1))
TEST_SUMMARY = generate_lorem_ipsum(n=1, html=False)
class TestBase(LoggedTestCase):
class TestBase(unittest.TestCase):
def setUp(self):
super().setUp()
@ -41,19 +40,9 @@ class TestBase(LoggedTestCase):
},
'source_path': '/path/to/file/foo.ext'
}
self._disable_limit_filter()
def tearDown(self):
locale.setlocale(locale.LC_ALL, self.old_locale)
self._enable_limit_filter()
def _disable_limit_filter(self):
from pelican.contents import logger
logger.disable_filter()
def _enable_limit_filter(self):
from pelican.contents import logger
logger.enable_filter()
def _copy_page_kwargs(self):
# make a deep copy of page_kwargs
@ -82,11 +71,15 @@ class TestPage(TestBase):
def test_mandatory_properties(self):
# If the title is not set, must throw an exception.
page = Page('content')
self.assertFalse(page._has_valid_mandatory_properties())
self.assertLogCountEqual(
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
page = Page('content')
self.assertFalse(page._has_valid_mandatory_properties())
count_msgs(
count=1,
msg="Skipping .*: could not find information about 'title'",
as_regex=True,
level=logging.ERROR)
page = Page('content', metadata={'title': 'foobar'})
self.assertTrue(page._has_valid_mandatory_properties())
@ -132,11 +125,16 @@ class TestPage(TestBase):
page_kwargs = self._copy_page_kwargs()
page = Page(**page_kwargs)
self.assertEqual(page.summary, TEST_SUMMARY)
self.assertEqual(page._get_summary(), TEST_SUMMARY)
self.assertLogCountEqual(
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
self.assertEqual(page._get_summary(), TEST_SUMMARY)
count_msgs(
count=1,
msg=r"_get_summary\(\) has been deprecated since 3\.6\.4\. "
"Use the summary decorator instead",
msg=(
"_get_summary() has been deprecated since 3.6.4. Use the"
" summary decorator instead"
),
level=logging.WARNING)
def test_slug(self):
@ -761,7 +759,7 @@ class TestArticle(TestBase):
self.assertTrue(article._has_valid_save_as())
class TestStatic(LoggedTestCase):
class TestStatic(unittest.TestCase):
def setUp(self):
super().setUp()
@ -986,30 +984,34 @@ class TestStatic(LoggedTestCase):
metadata={'title': 'fakepage'}, settings=self.settings,
source_path=os.path.join('dir', 'otherdir', 'fakepage.md'),
context=self.context)
content = page.get_content('')
self.assertEqual(content, html)
self.assertLogCountEqual(
count=1,
msg="Replacement Indicator 'unknown' not recognized, "
"skipping replacement",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
content = page.get_content('')
self.assertEqual(content, html)
count_msgs(
count=1,
msg="Replacement Indicator 'unknown' not recognized, "
"skipping replacement",
level=logging.WARNING)
def test_link_to_unknown_file(self):
"{filename} link to unknown file should trigger warning."
html = '<a href="{filename}foo">link</a>'
page = Page(content=html,
metadata={'title': 'fakepage'}, settings=self.settings,
source_path=os.path.join('dir', 'otherdir', 'fakepage.md'),
context=self.context)
content = page.get_content('')
self.assertEqual(content, html)
self.assertLogCountEqual(
count=1,
msg="Unable to find 'foo', skipping url replacement.",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
page = Page(content=html,
metadata={'title': 'fakepage'}, settings=self.settings,
source_path=os.path.join('dir', 'otherdir', 'fakepage.md'),
context=self.context)
content = page.get_content('')
self.assertEqual(content, html)
count_msgs(
count=1,
msg="Unable to find 'foo', skipping url replacement.",
level=logging.WARNING)
def test_index_link_syntax_with_spaces(self):
"""{index} link syntax triggers url replacement

View file

@ -1,82 +1,268 @@
import logging
import unittest
from collections import defaultdict
from contextlib import contextmanager
from pelican import log
from pelican.tests.support import LogCountHandler
class TestLog(unittest.TestCase):
LOG_LVLS = [
logging.DEBUG,
logging.INFO,
logging.WARNING,
logging.ERROR,
logging.CRITICAL
]
# Set this module's logging level to DEBUG to prevent
# tests' loggers effectiveLevels from reaching **root logger**,
# which we *don't* want to play with here.
logging.getLogger(__name__).setLevel(logging.DEBUG)
class _LoggingTestCase(unittest.TestCase):
"""Test Case w/ test-specific loggers, and contexts for temp filters."""
def setUp(self):
super().setUp()
self.logger = logging.getLogger(__name__)
self.handler = LogCountHandler()
self.logger.addHandler(self.handler)
def tearDown(self):
self._reset_limit_filter()
super().tearDown()
def _reset_limit_filter(self):
log.LimitFilter._ignore = set()
log.LimitFilter._raised_messages = set()
log.LimitFilter._threshold = 5
log.LimitFilter._group_count = defaultdict(int)
"Each test should use a unique logger and handle all levels."
self.logger = logging.getLogger(self.id())
self.logger.setLevel(logging.NOTSET) # log all levels
@contextmanager
def reset_logger(self):
try:
yield None
finally:
self._reset_limit_filter()
self.handler.flush()
def temp_filter(self, flt):
"""
Context in which a filter is temporarily applied to test's logger.
"""
with log.temp_filter(self.logger, flt):
yield flt
def test_log_filter(self):
def do_logging():
for i in range(5):
self.logger.warning('Log %s', i)
self.logger.warning('Another log %s', i)
# no filter
with self.reset_logger():
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
5)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
5)
# filter by template
with self.reset_logger():
log.LimitFilter._ignore.add((logging.WARNING, 'Log %s'))
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
0)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
5)
class TestLogLevel(unittest.TestCase):
# filter by exact message
with self.reset_logger():
log.LimitFilter._ignore.add((logging.WARNING, 'Log 3'))
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
4)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
5)
def test_severity_name(self):
self.assertEqual(log.severity_from_name('errors'), logging.ERROR)
self.assertEqual(log.severity_from_name('error'), logging.ERROR)
self.assertEqual(log.severity_from_name('WARNINGS'), logging.WARNING)
self.assertEqual(log.severity_from_name('info'), logging.INFO)
self.assertEqual(log.severity_from_name('INFO'), logging.INFO)
self.assertEqual(log.severity_from_name(''), logging.NOTSET)
self.assertEqual(log.severity_from_name('error'), logging.ERROR)
# filter by both
with self.reset_logger():
log.LimitFilter._ignore.add((logging.WARNING, 'Log 3'))
log.LimitFilter._ignore.add((logging.WARNING, 'Another log %s'))
do_logging()
self.assertEqual(
self.handler.count_logs('Log \\d', logging.WARNING),
4)
self.assertEqual(
self.handler.count_logs('Another log \\d', logging.WARNING),
0)
# add a custom level:
logging.addLevelName(5, "TRACE")
self.assertEqual(log.severity_from_name('trace'), 5)
with self.assertRaises(KeyError):
self.assertEqual(log.severity_from_name("wrong"), logging.NOTSET)
class TestFatal(_LoggingTestCase):
def test_levels(self):
logger = self.logger
with log.temp_handler(self.logger, log.AbortHandler(logging.ERROR)):
logger.debug("Calculating airspeed velocity of an unladen swallow...")
logger.info("Tis but a scratch.")
logger.warning("And now for something completely different.")
with self.assertRaises(log.AbortException):
logger.error("This is a late parrot!")
with self.assertRaises(log.AbortException):
logger.critical("Unexpected: Spanish Inquisition.")
class TestLogInit(_LoggingTestCase):
def test_alt_logger(self):
self.assertEqual(log.set_filters(self.logger), self.logger)
def test_default(self):
null_settings = dict(
LOG_LIMIT_THRESHOLD=None,
LOG_ONCE_LEVEL=None,
)
# assert returned logger is "pelican".
pkg_logger = logging.getLogger('pelican')
self.assertEqual(pkg_logger.filters, [])
self.assertEqual(pkg_logger, log.configure(null_settings))
# because no filtering settings supplied,
# no filters should have been added:
self.assertEqual(pkg_logger.filters, [])
# assert no pre-existing filters on test's logger:
self.assertEqual(log.LimitFilter.get_filters(self.logger), [])
logger = log.configure(dict(LOG_LOGGER=self.logger.name))
# assert returned logger has been set up with a LimitFilter
self.assertNotEqual(log.LimitFilter.get_filters(self.logger), [])
with LogCountHandler.examine(logger) as count_msgs:
msg = 'init with alt logger: %s' % logger
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # suppressed for WARNINGS and worse
count_msgs(
sum([1 if lvl >= logging.WARNING # dedupe warnings & above
else 2 # lvls below should not be deduped
for lvl in LOG_LVLS]),
msg
)
def test_dedupe_all(self):
logger = log.set_filters(self.logger, once_lvl=logging.NOTSET) # dedup all
with LogCountHandler.examine(logger) as count_msgs:
msg = 'init with alt logger: %s' % logger
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # poss. suppressed
count_msgs(len(LOG_LVLS), msg)
def test_dedupe_none(self):
logger = log.set_filters(self.logger, once_lvl=None)
with LogCountHandler.examine(logger) as count_msgs:
msg = 'init with alt logger: %s' % logger
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # do not suppress
count_msgs(len(LOG_LVLS * 2), msg)
class classTestOnceFilter(_LoggingTestCase):
def test_levels(self):
"""
Don't de-duplicate messages for levels below filter's value.
"""
logger = self.logger
msg = "levels_test"
with LogCountHandler.examine(logger) as count_msgs:
with self.temp_filter(log.OnceFilter()): # default=NOTSET: dedup all
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # copy should be suppressed
count_msgs(len(LOG_LVLS), msg)
with LogCountHandler.examine(logger) as count_msgs:
with self.temp_filter(log.OnceFilter(logging.WARNING)):
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg) # copy should be suppressed
count_msgs(
sum([1 if lvl >= logging.WARNING # dedupe warnings & above
else 2 # lvls below should not be deduped
for lvl in LOG_LVLS]),
msg
)
def test_deduplication(self):
"""Confirm that duplicate messages are ignored."""
logger = self.logger
# ensure counting is correct before adding filters
msg = "without filter"
with LogCountHandler.examine(logger) as count_msgs:
for level in LOG_LVLS:
logger.log(level, msg)
# count w/ level requirements
count_msgs(1, msg, level=logging.DEBUG)
count_msgs(1, msg, level=logging.INFO)
# count w/ no level requirqement
count_msgs(len(LOG_LVLS), msg)
# add filter, check count
with self.temp_filter(log.OnceFilter()):
msg = "with filter"
with LogCountHandler.examine(logger) as count_msgs:
for level in LOG_LVLS:
logger.log(level, msg)
logger.log(level, msg)
logger.log(level, msg)
# counting w/ level requirements
for level in LOG_LVLS:
count_msgs(1, msg, level=level)
# counting w/ no level requirement
count_msgs(len(LOG_LVLS), msg)
# check counting again after filter removed
msg = "removed filter"
with LogCountHandler.examine(logger) as count_msgs:
for level in LOG_LVLS:
logger.log(level, msg)
count_msgs(5, msg)
class TestLimitFilter(_LoggingTestCase):
def setUp(self):
super().setUp()
self.logger.setLevel(logging.DEBUG)
def test_get_filters(self):
"""Assert LimitFilters on a logger are collected."""
fltrs = [log.LimitFilter() for x in range(5)]
assert not self.logger.filters
for each in fltrs:
self.logger.addFilter(each)
self.assertEqual(fltrs, log.LimitFilter.get_filters(self.logger))
for each in fltrs:
self.logger.removeFilter(each)
assert not self.logger.filters
def test_threshold(self):
"""
Assert that the `limit_msg` activates after the threshold is reached.
"""
threshold = 4
with self.temp_filter(log.LimitFilter(threshold)):
msg = "Threshold: %s"
limits = {'limit_msg': "Threshold exceeded"}
with LogCountHandler.examine(self.logger) as count_msgs:
for i in range(1, threshold + 3):
self.logger.warning(msg, i, extra=limits)
# up to threshold
count_msgs(threshold - 1, "Threshold: [1-9+]", as_regex=True)
# exactly one "limit_msg"
count_msgs(1, limits['limit_msg'])
# total count should be == threshold, all in all.
count_msgs(threshold)
def test_limit_args(self):
"""
`limit_args` should be used in the `limit_msg`.
"""
threshold = 3
with self.temp_filter(log.LimitFilter(threshold)):
limits = {'limit_msg': "Threshold exceeded @ %s %s",
'limit_args': ('count of', threshold)}
with LogCountHandler.examine(self.logger) as count_msgs:
for i in range(threshold + 1):
self.logger.warning("Threshold: %s", i, extra=limits)
count_msgs(1, limits['limit_msg'] % limits['limit_args'])
def test_ignore(self):
"""Confirm blacklisted messages (eg. from config) are not emitted."""
blacklist = [
"SPAM SPAM SPAM",
"I'll have your spam. I love it."
]
with self.temp_filter(log.BlacklistFilter()) as flt:
for template in blacklist:
flt.ignore(logging.WARNING, template)
with LogCountHandler.examine(self.logger) as count_msgs:
count_msgs(0)
self.logger.warning(blacklist[0]) # ignored
self.logger.warning(blacklist[0]) # ignored
self.logger.warning("I don't like spam!") # 1
self.logger.warning("Sshh, dear, don't cause a fuss.") # 2
self.logger.warning(blacklist[1]) # ignored
self.logger.info(blacklist[1]) # 3 : diff. level (below)
self.logger.error(blacklist[1]) # 4 : diff. level (above)
count_msgs(4)

View file

@ -12,7 +12,7 @@ from pelican import Pelican
from pelican.generators import StaticGenerator
from pelican.settings import read_settings
from pelican.tests.support import (
LoggedTestCase,
LogCountHandler,
diff_subproc,
locale_available,
mute,
@ -20,8 +20,9 @@ from pelican.tests.support import (
)
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
SAMPLES_PATH = os.path.abspath(os.path.join(
CURRENT_DIR, os.pardir, os.pardir, 'samples'))
SAMPLES_PATH = os.path.abspath(
os.path.join(CURRENT_DIR, os.pardir, os.pardir, 'samples')
)
OUTPUT_PATH = os.path.abspath(os.path.join(CURRENT_DIR, 'output'))
INPUT_PATH = os.path.join(SAMPLES_PATH, "content")
@ -41,7 +42,7 @@ def recursiveDiff(dcmp):
return diff
class TestPelican(LoggedTestCase):
class TestPelican(unittest.TestCase):
# general functional testing for pelican. Basically, this test case tries
# to run pelican in different situations and see how it behaves
@ -88,7 +89,8 @@ class TestPelican(LoggedTestCase):
"StaticGenerator must be the last generator, but it isn't!")
self.assertIsInstance(
generator_classes, Sequence,
"_get_generator_classes() must return a Sequence to preserve order")
"_get_generator_classes() must return a Sequence to preserve order"
)
@skipIfNoExecutable(['git', '--version'])
def test_basic_generation_works(self):
@ -101,14 +103,19 @@ class TestPelican(LoggedTestCase):
'LOCALE': locale.normalize('en_US'),
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
mute(True)(pelican.run)()
count_msgs(
count=1,
msg="Unable to find .* skipping url replacement",
as_regex=True,
level=logging.WARNING
)
self.assertDirsEqual(
self.temp_path, os.path.join(OUTPUT_PATH, 'basic')
)
self.assertLogCountEqual(
count=1,
msg="Unable to find.*skipping url replacement",
level=logging.WARNING)
@skipIfNoExecutable(['git', '--version'])
def test_custom_generation_works(self):
@ -198,16 +205,16 @@ class TestPelican(LoggedTestCase):
],
'LOCALE': locale.normalize('en_US'),
})
pelican = Pelican(settings=settings)
logger = logging.getLogger()
orig_level = logger.getEffectiveLevel()
logger = logging.getLogger("pelican.writers")
logger.setLevel(logging.INFO)
mute(True)(pelican.run)()
logger.setLevel(orig_level)
self.assertLogCountEqual(
count=2,
msg="Writing .*",
level=logging.INFO)
with LogCountHandler.examine(logger) as count_msgs:
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
count_msgs(count=2,
msg="Writing .*",
as_regex=True,
level=logging.INFO)
def test_cyclic_intersite_links_no_warnings(self):
settings = read_settings(path=None, override={
@ -215,37 +222,46 @@ class TestPelican(LoggedTestCase):
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
# There are four different intersite links:
# - one pointing to the second article from first and third
# - one pointing to the first article from second and third
# - one pointing to the third article from first and second
# - one pointing to a nonexistent from each
# If everything goes well, only the warning about the nonexistent
# article should be printed. Only two articles are not sufficient,
# since the first will always have _context['generated_content'] empty
# (thus skipping the link resolving) and the second will always have it
# non-empty, containing the first, thus always succeeding.
self.assertLogCountEqual(
count=1,
msg="Unable to find '.*\\.rst', skipping url replacement.",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.contents")
) as count_msgs:
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
# There are four different intersite links:
# - one pointing to the second article from first and third
# - one pointing to the first article from second and third
# - one pointing to the third article from first and second
# - one pointing to a nonexistent from each If everything goes
# well, only the warning about the nonexistent article should be
# printed. Only two articles are not sufficient, since the first
# will always have _context['generated_content'] empty (thus
# skipping the link resolving) and the second will always have it
# non-empty, containing the first, thus always succeeding.
count_msgs(
count=3,
msg="Unable to find '.*[.]rst', skipping url replacement.",
as_regex=True,
level=logging.WARNING
)
def test_md_extensions_deprecation(self):
"""Test that a warning is issued if MD_EXTENSIONS is used"""
settings = read_settings(path=None, override={
'PATH': INPUT_PATH,
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
'MD_EXTENSIONS': {},
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
self.assertLogCountEqual(
count=1,
msg="MD_EXTENSIONS is deprecated use MARKDOWN instead.",
level=logging.WARNING)
with LogCountHandler.examine(
logging.getLogger("pelican.settings")
) as count_msgs:
settings = read_settings(path=None, override={
'PATH': INPUT_PATH,
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
'MD_EXTENSIONS': {},
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
count_msgs(
count=1,
msg="MD_EXTENSIONS is deprecated use MARKDOWN instead.",
level=logging.WARNING
)
def test_parse_errors(self):
# Verify that just an error is printed and the application doesn't
@ -255,12 +271,17 @@ class TestPelican(LoggedTestCase):
'OUTPUT_PATH': self.temp_path,
'CACHE_PATH': self.temp_cache,
})
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
self.assertLogCountEqual(
count=1,
msg="Could not process .*parse_error.rst",
level=logging.ERROR)
with LogCountHandler.examine(
logging.getLogger("pelican.generators")
) as count_msgs:
pelican = Pelican(settings=settings)
mute(True)(pelican.run)()
count_msgs(
count=1,
msg="Could not process .*parse_error.rst",
as_regex=True,
level=logging.ERROR
)
def test_module_load(self):
"""Test loading via python -m pelican --help displays the help"""

View file

@ -1,5 +1,6 @@
import copy
import locale
import logging
import os
from os.path import abspath, dirname, join
@ -16,6 +17,7 @@ class TestSettingsConfiguration(unittest.TestCase):
append new values to the settings (if any), and apply basic settings
optimizations.
"""
def setUp(self):
self.old_locale = locale.setlocale(locale.LC_ALL)
locale.setlocale(locale.LC_ALL, 'C')
@ -304,3 +306,19 @@ class TestSettingsConfiguration(unittest.TestCase):
[(r'C\+\+', 'cpp')] +
self.settings['SLUG_REGEX_SUBSTITUTIONS'])
self.assertNotIn('SLUG_SUBSTITUTIONS', settings)
def test_log_filter_setting(self):
read_settings(None, override=dict(
LOG_FILTER=[
(logging.INFO, "Ignored Info message."),
(logging.WARNING, "Ignored Warning message."),
(logging.ERROR, "Ignored Error message.")
]
))
with self.assertRaises(ValueError):
read_settings(None, override=dict(
LOG_FILTER=[
("Wrong order.", logging.ERROR)
]
))

View file

@ -2,6 +2,7 @@ import locale
import logging
import os
import shutil
import unittest
from datetime import timezone
from sys import platform
from tempfile import mkdtemp
@ -11,15 +12,19 @@ try:
except ModuleNotFoundError:
from backports.zoneinfo import ZoneInfo
from pelican import log
from pelican import utils
from pelican.generators import TemplatePagesGenerator
from pelican.settings import read_settings
from pelican.tests.support import (LoggedTestCase, get_article,
locale_available, unittest)
from pelican.tests.support import (
LogCountHandler,
get_article,
locale_available
)
from pelican.writers import Writer
class TestUtils(LoggedTestCase):
class TestUtils(unittest.TestCase):
_new_attribute = 'new_value'
def setUp(self):
@ -37,13 +42,17 @@ class TestUtils(LoggedTestCase):
return None
def test_deprecated_attribute(self):
value = self._old_attribute
self.assertEqual(value, self._new_attribute)
self.assertLogCountEqual(
count=1,
msg=('_old_attribute has been deprecated since 3.1.0 and will be '
'removed by version 4.1.3. Use _new_attribute instead'),
level=logging.WARNING)
with LogCountHandler.examine(log.PKG_LOGGER) as count_msgs:
value = self._old_attribute
self.assertEqual(value, self._new_attribute)
count_msgs(
count=1,
msg=(
'_old_attribute has been deprecated since 3.1.0 and will '
'be removed by version 4.1.3. Use _new_attribute instead'
),
level=logging.WARNING
)
def test_get_date(self):
# valid ones
@ -269,9 +278,10 @@ class TestUtils(LoggedTestCase):
'<p>' + 'word ' * 20 + '<span>marker</span></p>')
self.assertEqual(
utils.truncate_html_words(
'<span\nstyle="\n\n">' + 'word ' * 100 + '</span>', 20,
'<span>marker</span>'),
'<span\nstyle="\n\n">' + 'word ' * 20 + '<span>marker</span></span>')
'<span\nstyle="\n\n">' + 'word ' * 100 + '</span>', 20,
'<span>marker</span>'),
'<span\nstyle="\n\n">' + 'word ' * 20 + '<span>marker</span></span>'
)
self.assertEqual(
utils.truncate_html_words('<br>' + 'word ' * 100, 20,
'<span>marker</span>'),