forked from github/pelican
Drop duplicates logs. Allow for logs to be grouped, enforcing a maximum number of logs per group. Add the LOG_FILTER setting to ask from the configuration file to ignore some logs (of level up to warning).
144 lines
3.8 KiB
Python
144 lines
3.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import unicode_literals, print_function
|
|
|
|
__all__ = [
|
|
'init'
|
|
]
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
RESET_TERM = '\033[0;m'
|
|
|
|
COLOR_CODES = {
|
|
'red': 31,
|
|
'yellow': 33,
|
|
'cyan': 36,
|
|
'white': 37,
|
|
'bgred': 41,
|
|
'bggrey': 100,
|
|
}
|
|
|
|
|
|
def ansi(color, text):
|
|
"""Wrap text in an ansi escape sequence"""
|
|
code = COLOR_CODES[color]
|
|
return '\033[1;{0}m{1}{2}'.format(code, text, RESET_TERM)
|
|
|
|
|
|
class ANSIFormatter(logging.Formatter):
|
|
"""Convert a `logging.LogRecord' object into colored text, using ANSI
|
|
escape sequences.
|
|
|
|
"""
|
|
def format(self, record):
|
|
msg = record.getMessage()
|
|
if record.levelname == 'INFO':
|
|
return ansi('cyan', '-> ') + msg
|
|
elif record.levelname == 'WARNING':
|
|
return ansi('yellow', record.levelname) + ': ' + msg
|
|
elif record.levelname == 'ERROR':
|
|
return ansi('red', record.levelname) + ': ' + msg
|
|
elif record.levelname == 'CRITICAL':
|
|
return ansi('bgred', record.levelname) + ': ' + msg
|
|
elif record.levelname == 'DEBUG':
|
|
return ansi('bggrey', record.levelname) + ': ' + msg
|
|
else:
|
|
return ansi('white', record.levelname) + ': ' + msg
|
|
|
|
|
|
class TextFormatter(logging.Formatter):
|
|
"""
|
|
Convert a `logging.LogRecord' object into text.
|
|
"""
|
|
|
|
def format(self, record):
|
|
if not record.levelname or record.levelname == 'INFO':
|
|
return record.getMessage()
|
|
else:
|
|
return record.levelname + ': ' + record.getMessage()
|
|
|
|
|
|
class LimitFilter(logging.Filter):
|
|
"""
|
|
Remove duplicates records, and limit the number of records in the same
|
|
group.
|
|
|
|
Groups are specified by the message to use when the number of records in
|
|
the same group hit the limit.
|
|
E.g.: log.warning(('43 is not the answer', 'More erroneous answers'))
|
|
"""
|
|
|
|
ignore = set()
|
|
threshold = 5
|
|
group_count = defaultdict(int)
|
|
|
|
def filter(self, record):
|
|
# don't limit levels over warnings
|
|
if record.levelno > logging.WARN:
|
|
return record
|
|
# extract group
|
|
group = None
|
|
if len(record.msg) == 2:
|
|
record.msg, group = record.msg
|
|
# ignore record if it was already raised
|
|
# use .getMessage() and not .msg for string formatting
|
|
ignore_key = (record.levelno, record.getMessage())
|
|
to_ignore = ignore_key in LimitFilter.ignore
|
|
LimitFilter.ignore.add(ignore_key)
|
|
if to_ignore:
|
|
return False
|
|
# check if we went over threshold
|
|
if group:
|
|
key = (record.levelno, group)
|
|
LimitFilter.group_count[key] += 1
|
|
if LimitFilter.group_count[key] == LimitFilter.threshold:
|
|
record.msg = group
|
|
if LimitFilter.group_count[key] > LimitFilter.threshold:
|
|
return False
|
|
return record
|
|
|
|
|
|
class LimitLogger(logging.Logger):
|
|
"""
|
|
A logger which add LimitFilter automatically
|
|
"""
|
|
|
|
limit_filter = LimitFilter()
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(LimitLogger, self).__init__(*args, **kwargs)
|
|
self.addFilter(LimitLogger.limit_filter)
|
|
|
|
logging.setLoggerClass(LimitLogger)
|
|
|
|
|
|
def init(level=None, handler=logging.StreamHandler()):
|
|
|
|
logger = logging.getLogger()
|
|
|
|
if (os.isatty(sys.stdout.fileno())
|
|
and not sys.platform.startswith('win')):
|
|
fmt = ANSIFormatter()
|
|
else:
|
|
fmt = TextFormatter()
|
|
handler.setFormatter(fmt)
|
|
logger.addHandler(handler)
|
|
|
|
if level:
|
|
logger.setLevel(level)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
init(level=logging.DEBUG)
|
|
|
|
root_logger = logging.getLogger()
|
|
root_logger.debug('debug')
|
|
root_logger.info('info')
|
|
root_logger.warning('warning')
|
|
root_logger.error('error')
|
|
root_logger.critical('critical')
|