1
0
Fork 0
forked from github/pelican

Apply code style to project via: ruff format .

This commit is contained in:
Chris Rose 2023-10-29 22:18:29 +01:00 committed by Justin Mayer
commit cabdb26cee
41 changed files with 6505 additions and 5163 deletions

View file

@ -9,19 +9,25 @@ import sys
import time
import traceback
from collections.abc import Iterable
# Combines all paths to `pelican` package accessible from `sys.path`
# Makes it possible to install `pelican` and namespace plugins into different
# locations in the file system (e.g. pip with `-e` or `--user`)
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# pelican.log has to be the first pelican module to be loaded
# because logging.setLoggerClass has to be called before logging.getLogger
from pelican.log import console
from pelican.log import init as init_logging
from pelican.generators import (ArticlesGenerator, # noqa: I100
PagesGenerator, SourceFileGenerator,
StaticGenerator, TemplatePagesGenerator)
from pelican.generators import (
ArticlesGenerator, # noqa: I100
PagesGenerator,
SourceFileGenerator,
StaticGenerator,
TemplatePagesGenerator,
)
from pelican.plugins import signals
from pelican.plugins._utils import get_plugin_name, load_plugins
from pelican.readers import Readers
@ -35,12 +41,11 @@ try:
except Exception:
__version__ = "unknown"
DEFAULT_CONFIG_NAME = 'pelicanconf.py'
DEFAULT_CONFIG_NAME = "pelicanconf.py"
logger = logging.getLogger(__name__)
class Pelican:
def __init__(self, settings):
"""Pelican initialization
@ -50,35 +55,34 @@ class Pelican:
# define the default settings
self.settings = settings
self.path = settings['PATH']
self.theme = settings['THEME']
self.output_path = settings['OUTPUT_PATH']
self.ignore_files = settings['IGNORE_FILES']
self.delete_outputdir = settings['DELETE_OUTPUT_DIRECTORY']
self.output_retention = settings['OUTPUT_RETENTION']
self.path = settings["PATH"]
self.theme = settings["THEME"]
self.output_path = settings["OUTPUT_PATH"]
self.ignore_files = settings["IGNORE_FILES"]
self.delete_outputdir = settings["DELETE_OUTPUT_DIRECTORY"]
self.output_retention = settings["OUTPUT_RETENTION"]
self.init_path()
self.init_plugins()
signals.initialized.send(self)
def init_path(self):
if not any(p in sys.path for p in ['', os.curdir]):
if not any(p in sys.path for p in ["", os.curdir]):
logger.debug("Adding current directory to system path")
sys.path.insert(0, '')
sys.path.insert(0, "")
def init_plugins(self):
self.plugins = []
for plugin in load_plugins(self.settings):
name = get_plugin_name(plugin)
logger.debug('Registering plugin `%s`', name)
logger.debug("Registering plugin `%s`", name)
try:
plugin.register()
self.plugins.append(plugin)
except Exception as e:
logger.error('Cannot register plugin `%s`\n%s',
name, e)
logger.error("Cannot register plugin `%s`\n%s", name, e)
self.settings['PLUGINS'] = [get_plugin_name(p) for p in self.plugins]
self.settings["PLUGINS"] = [get_plugin_name(p) for p in self.plugins]
def run(self):
"""Run the generators and return"""
@ -87,10 +91,10 @@ class Pelican:
context = self.settings.copy()
# Share these among all the generators and content objects
# They map source paths to Content objects or None
context['generated_content'] = {}
context['static_links'] = set()
context['static_content'] = {}
context['localsiteurl'] = self.settings['SITEURL']
context["generated_content"] = {}
context["static_links"] = set()
context["static_content"] = {}
context["localsiteurl"] = self.settings["SITEURL"]
generators = [
cls(
@ -99,23 +103,25 @@ class Pelican:
path=self.path,
theme=self.theme,
output_path=self.output_path,
) for cls in self._get_generator_classes()
)
for cls in self._get_generator_classes()
]
# Delete the output directory if (1) the appropriate setting is True
# and (2) that directory is not the parent of the source directory
if (self.delete_outputdir
and os.path.commonpath([os.path.realpath(self.output_path)]) !=
os.path.commonpath([os.path.realpath(self.output_path),
os.path.realpath(self.path)])):
if self.delete_outputdir and os.path.commonpath(
[os.path.realpath(self.output_path)]
) != os.path.commonpath(
[os.path.realpath(self.output_path), os.path.realpath(self.path)]
):
clean_output_dir(self.output_path, self.output_retention)
for p in generators:
if hasattr(p, 'generate_context'):
if hasattr(p, "generate_context"):
p.generate_context()
for p in generators:
if hasattr(p, 'refresh_metadata_intersite_links'):
if hasattr(p, "refresh_metadata_intersite_links"):
p.refresh_metadata_intersite_links()
signals.all_generators_finalized.send(generators)
@ -123,61 +129,75 @@ class Pelican:
writer = self._get_writer()
for p in generators:
if hasattr(p, 'generate_output'):
if hasattr(p, "generate_output"):
p.generate_output(writer)
signals.finalized.send(self)
articles_generator = next(g for g in generators
if isinstance(g, ArticlesGenerator))
pages_generator = next(g for g in generators
if isinstance(g, PagesGenerator))
articles_generator = next(
g for g in generators if isinstance(g, ArticlesGenerator)
)
pages_generator = next(g for g in generators if isinstance(g, PagesGenerator))
pluralized_articles = maybe_pluralize(
(len(articles_generator.articles) +
len(articles_generator.translations)),
'article',
'articles')
(len(articles_generator.articles) + len(articles_generator.translations)),
"article",
"articles",
)
pluralized_drafts = maybe_pluralize(
(len(articles_generator.drafts) +
len(articles_generator.drafts_translations)),
'draft',
'drafts')
(
len(articles_generator.drafts)
+ len(articles_generator.drafts_translations)
),
"draft",
"drafts",
)
pluralized_hidden_articles = maybe_pluralize(
(len(articles_generator.hidden_articles) +
len(articles_generator.hidden_translations)),
'hidden article',
'hidden articles')
(
len(articles_generator.hidden_articles)
+ len(articles_generator.hidden_translations)
),
"hidden article",
"hidden articles",
)
pluralized_pages = maybe_pluralize(
(len(pages_generator.pages) +
len(pages_generator.translations)),
'page',
'pages')
(len(pages_generator.pages) + len(pages_generator.translations)),
"page",
"pages",
)
pluralized_hidden_pages = maybe_pluralize(
(len(pages_generator.hidden_pages) +
len(pages_generator.hidden_translations)),
'hidden page',
'hidden pages')
(
len(pages_generator.hidden_pages)
+ len(pages_generator.hidden_translations)
),
"hidden page",
"hidden pages",
)
pluralized_draft_pages = maybe_pluralize(
(len(pages_generator.draft_pages) +
len(pages_generator.draft_translations)),
'draft page',
'draft pages')
(
len(pages_generator.draft_pages)
+ len(pages_generator.draft_translations)
),
"draft page",
"draft pages",
)
console.print('Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.'
.format(
pluralized_articles,
pluralized_drafts,
pluralized_hidden_articles,
pluralized_pages,
pluralized_hidden_pages,
pluralized_draft_pages,
time.time() - start_time))
console.print(
"Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.".format(
pluralized_articles,
pluralized_drafts,
pluralized_hidden_articles,
pluralized_pages,
pluralized_hidden_pages,
pluralized_draft_pages,
time.time() - start_time,
)
)
def _get_generator_classes(self):
discovered_generators = [
(ArticlesGenerator, "internal"),
(PagesGenerator, "internal")
(PagesGenerator, "internal"),
]
if self.settings["TEMPLATE_PAGES"]:
@ -236,7 +256,7 @@ class PrintSettings(argparse.Action):
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
console.print_exception()
sys.exit(getattr(e, 'exitcode', 1))
sys.exit(getattr(e, "exitcode", 1))
if values:
# One or more arguments provided, so only print those settings
@ -244,14 +264,16 @@ class PrintSettings(argparse.Action):
if setting in settings:
# Only add newline between setting name and value if dict
if isinstance(settings[setting], (dict, tuple, list)):
setting_format = '\n{}:\n{}'
setting_format = "\n{}:\n{}"
else:
setting_format = '\n{}: {}'
console.print(setting_format.format(
setting,
pprint.pformat(settings[setting])))
setting_format = "\n{}: {}"
console.print(
setting_format.format(
setting, pprint.pformat(settings[setting])
)
)
else:
console.print('\n{} is not a recognized setting.'.format(setting))
console.print("\n{} is not a recognized setting.".format(setting))
break
else:
# No argument was given to --print-settings, so print all settings
@ -268,170 +290,258 @@ class ParseOverrides(argparse.Action):
k, v = item.split("=", 1)
except ValueError:
raise ValueError(
'Extra settings must be specified as KEY=VALUE pairs '
f'but you specified {item}'
"Extra settings must be specified as KEY=VALUE pairs "
f"but you specified {item}"
)
try:
overrides[k] = json.loads(v)
except json.decoder.JSONDecodeError:
raise ValueError(
f'Invalid JSON value: {v}. '
'Values specified via -e / --extra-settings flags '
'must be in JSON notation. '
'Use -e KEY=\'"string"\' to specify a string value; '
'-e KEY=null to specify None; '
'-e KEY=false (or true) to specify False (or True).'
f"Invalid JSON value: {v}. "
"Values specified via -e / --extra-settings flags "
"must be in JSON notation. "
"Use -e KEY='\"string\"' to specify a string value; "
"-e KEY=null to specify None; "
"-e KEY=false (or true) to specify False (or True)."
)
setattr(namespace, self.dest, overrides)
def parse_arguments(argv=None):
parser = argparse.ArgumentParser(
description='A tool to generate a static blog, '
' with restructured text input files.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
description="A tool to generate a static blog, "
" with restructured text input files.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(dest='path', nargs='?',
help='Path where to find the content files.',
default=None)
parser.add_argument(
dest="path",
nargs="?",
help="Path where to find the content files.",
default=None,
)
parser.add_argument('-t', '--theme-path', dest='theme',
help='Path where to find the theme templates. If not '
'specified, it will use the default one included with '
'pelican.')
parser.add_argument(
"-t",
"--theme-path",
dest="theme",
help="Path where to find the theme templates. If not "
"specified, it will use the default one included with "
"pelican.",
)
parser.add_argument('-o', '--output', dest='output',
help='Where to output the generated files. If not '
'specified, a directory will be created, named '
'"output" in the current path.')
parser.add_argument(
"-o",
"--output",
dest="output",
help="Where to output the generated files. If not "
"specified, a directory will be created, named "
'"output" in the current path.',
)
parser.add_argument('-s', '--settings', dest='settings',
help='The settings of the application, this is '
'automatically set to {} if a file exists with this '
'name.'.format(DEFAULT_CONFIG_NAME))
parser.add_argument(
"-s",
"--settings",
dest="settings",
help="The settings of the application, this is "
"automatically set to {} if a file exists with this "
"name.".format(DEFAULT_CONFIG_NAME),
)
parser.add_argument('-d', '--delete-output-directory',
dest='delete_outputdir', action='store_true',
default=None, help='Delete the output directory.')
parser.add_argument(
"-d",
"--delete-output-directory",
dest="delete_outputdir",
action="store_true",
default=None,
help="Delete the output directory.",
)
parser.add_argument('-v', '--verbose', action='store_const',
const=logging.INFO, dest='verbosity',
help='Show all messages.')
parser.add_argument(
"-v",
"--verbose",
action="store_const",
const=logging.INFO,
dest="verbosity",
help="Show all messages.",
)
parser.add_argument('-q', '--quiet', action='store_const',
const=logging.CRITICAL, dest='verbosity',
help='Show only critical errors.')
parser.add_argument(
"-q",
"--quiet",
action="store_const",
const=logging.CRITICAL,
dest="verbosity",
help="Show only critical errors.",
)
parser.add_argument('-D', '--debug', action='store_const',
const=logging.DEBUG, dest='verbosity',
help='Show all messages, including debug messages.')
parser.add_argument(
"-D",
"--debug",
action="store_const",
const=logging.DEBUG,
dest="verbosity",
help="Show all messages, including debug messages.",
)
parser.add_argument('--version', action='version', version=__version__,
help='Print the pelican version and exit.')
parser.add_argument(
"--version",
action="version",
version=__version__,
help="Print the pelican version and exit.",
)
parser.add_argument('-r', '--autoreload', dest='autoreload',
action='store_true',
help='Relaunch pelican each time a modification occurs'
' on the content files.')
parser.add_argument(
"-r",
"--autoreload",
dest="autoreload",
action="store_true",
help="Relaunch pelican each time a modification occurs"
" on the content files.",
)
parser.add_argument('--print-settings', dest='print_settings', nargs='*',
action=PrintSettings, metavar='SETTING_NAME',
help='Print current configuration settings and exit. '
'Append one or more setting name arguments to see the '
'values for specific settings only.')
parser.add_argument(
"--print-settings",
dest="print_settings",
nargs="*",
action=PrintSettings,
metavar="SETTING_NAME",
help="Print current configuration settings and exit. "
"Append one or more setting name arguments to see the "
"values for specific settings only.",
)
parser.add_argument('--relative-urls', dest='relative_paths',
action='store_true',
help='Use relative urls in output, '
'useful for site development')
parser.add_argument(
"--relative-urls",
dest="relative_paths",
action="store_true",
help="Use relative urls in output, " "useful for site development",
)
parser.add_argument('--cache-path', dest='cache_path',
help=('Directory in which to store cache files. '
'If not specified, defaults to "cache".'))
parser.add_argument(
"--cache-path",
dest="cache_path",
help=(
"Directory in which to store cache files. "
'If not specified, defaults to "cache".'
),
)
parser.add_argument('--ignore-cache', action='store_true',
dest='ignore_cache', help='Ignore content cache '
'from previous runs by not loading cache files.')
parser.add_argument(
"--ignore-cache",
action="store_true",
dest="ignore_cache",
help="Ignore content cache " "from previous runs by not loading cache files.",
)
parser.add_argument('-w', '--write-selected', type=str,
dest='selected_paths', default=None,
help='Comma separated list of selected paths to write')
parser.add_argument(
"-w",
"--write-selected",
type=str,
dest="selected_paths",
default=None,
help="Comma separated list of selected paths to write",
)
parser.add_argument('--fatal', metavar='errors|warnings',
choices=('errors', 'warnings'), default='',
help=('Exit the program with non-zero status if any '
'errors/warnings encountered.'))
parser.add_argument(
"--fatal",
metavar="errors|warnings",
choices=("errors", "warnings"),
default="",
help=(
"Exit the program with non-zero status if any "
"errors/warnings encountered."
),
)
parser.add_argument('--logs-dedup-min-level', default='WARNING',
choices=('DEBUG', 'INFO', 'WARNING', 'ERROR'),
help=('Only enable log de-duplication for levels equal'
' to or above the specified value'))
parser.add_argument(
"--logs-dedup-min-level",
default="WARNING",
choices=("DEBUG", "INFO", "WARNING", "ERROR"),
help=(
"Only enable log de-duplication for levels equal"
" to or above the specified value"
),
)
parser.add_argument('-l', '--listen', dest='listen', action='store_true',
help='Serve content files via HTTP and port 8000.')
parser.add_argument(
"-l",
"--listen",
dest="listen",
action="store_true",
help="Serve content files via HTTP and port 8000.",
)
parser.add_argument('-p', '--port', dest='port', type=int,
help='Port to serve HTTP files at. (default: 8000)')
parser.add_argument(
"-p",
"--port",
dest="port",
type=int,
help="Port to serve HTTP files at. (default: 8000)",
)
parser.add_argument('-b', '--bind', dest='bind',
help='IP to bind to when serving files via HTTP '
'(default: 127.0.0.1)')
parser.add_argument(
"-b",
"--bind",
dest="bind",
help="IP to bind to when serving files via HTTP " "(default: 127.0.0.1)",
)
parser.add_argument('-e', '--extra-settings', dest='overrides',
help='Specify one or more SETTING=VALUE pairs to '
'override settings. VALUE must be in JSON notation: '
'specify string values as SETTING=\'"some string"\'; '
'booleans as SETTING=true or SETTING=false; '
'None as SETTING=null.',
nargs='*',
action=ParseOverrides,
default={})
parser.add_argument(
"-e",
"--extra-settings",
dest="overrides",
help="Specify one or more SETTING=VALUE pairs to "
"override settings. VALUE must be in JSON notation: "
"specify string values as SETTING='\"some string\"'; "
"booleans as SETTING=true or SETTING=false; "
"None as SETTING=null.",
nargs="*",
action=ParseOverrides,
default={},
)
args = parser.parse_args(argv)
if args.port is not None and not args.listen:
logger.warning('--port without --listen has no effect')
logger.warning("--port without --listen has no effect")
if args.bind is not None and not args.listen:
logger.warning('--bind without --listen has no effect')
logger.warning("--bind without --listen has no effect")
return args
def get_config(args):
"""Builds a config dictionary based on supplied `args`.
"""
"""Builds a config dictionary based on supplied `args`."""
config = {}
if args.path:
config['PATH'] = os.path.abspath(os.path.expanduser(args.path))
config["PATH"] = os.path.abspath(os.path.expanduser(args.path))
if args.output:
config['OUTPUT_PATH'] = \
os.path.abspath(os.path.expanduser(args.output))
config["OUTPUT_PATH"] = os.path.abspath(os.path.expanduser(args.output))
if args.theme:
abstheme = os.path.abspath(os.path.expanduser(args.theme))
config['THEME'] = abstheme if os.path.exists(abstheme) else args.theme
config["THEME"] = abstheme if os.path.exists(abstheme) else args.theme
if args.delete_outputdir is not None:
config['DELETE_OUTPUT_DIRECTORY'] = args.delete_outputdir
config["DELETE_OUTPUT_DIRECTORY"] = args.delete_outputdir
if args.ignore_cache:
config['LOAD_CONTENT_CACHE'] = False
config["LOAD_CONTENT_CACHE"] = False
if args.cache_path:
config['CACHE_PATH'] = args.cache_path
config["CACHE_PATH"] = args.cache_path
if args.selected_paths:
config['WRITE_SELECTED'] = args.selected_paths.split(',')
config["WRITE_SELECTED"] = args.selected_paths.split(",")
if args.relative_paths:
config['RELATIVE_URLS'] = args.relative_paths
config["RELATIVE_URLS"] = args.relative_paths
if args.port is not None:
config['PORT'] = args.port
config["PORT"] = args.port
if args.bind is not None:
config['BIND'] = args.bind
config['DEBUG'] = args.verbosity == logging.DEBUG
config["BIND"] = args.bind
config["DEBUG"] = args.verbosity == logging.DEBUG
config.update(args.overrides)
return config
def get_instance(args):
config_file = args.settings
if config_file is None and os.path.isfile(DEFAULT_CONFIG_NAME):
config_file = DEFAULT_CONFIG_NAME
@ -439,9 +549,9 @@ def get_instance(args):
settings = read_settings(config_file, override=get_config(args))
cls = settings['PELICAN_CLASS']
cls = settings["PELICAN_CLASS"]
if isinstance(cls, str):
module, cls_name = cls.rsplit('.', 1)
module, cls_name = cls.rsplit(".", 1)
module = __import__(module)
cls = getattr(module, cls_name)
@ -449,8 +559,10 @@ def get_instance(args):
def autoreload(args, excqueue=None):
console.print(' --- AutoReload Mode: Monitoring `content`, `theme` and'
' `settings` for changes. ---')
console.print(
" --- AutoReload Mode: Monitoring `content`, `theme` and"
" `settings` for changes. ---"
)
pelican, settings = get_instance(args)
settings_file = os.path.abspath(args.settings)
while True:
@ -463,8 +575,9 @@ def autoreload(args, excqueue=None):
if settings_file in changed_files:
pelican, settings = get_instance(args)
console.print('\n-> Modified: {}. re-generating...'.format(
', '.join(changed_files)))
console.print(
"\n-> Modified: {}. re-generating...".format(", ".join(changed_files))
)
except KeyboardInterrupt:
if excqueue is not None:
@ -473,15 +586,14 @@ def autoreload(args, excqueue=None):
raise
except Exception as e:
if (args.verbosity == logging.DEBUG):
if args.verbosity == logging.DEBUG:
if excqueue is not None:
excqueue.put(
traceback.format_exception_only(type(e), e)[-1])
excqueue.put(traceback.format_exception_only(type(e), e)[-1])
else:
raise
logger.warning(
'Caught exception:\n"%s".', e,
exc_info=settings.get('DEBUG', False))
'Caught exception:\n"%s".', e, exc_info=settings.get("DEBUG", False)
)
def listen(server, port, output, excqueue=None):
@ -491,8 +603,7 @@ def listen(server, port, output, excqueue=None):
RootedHTTPServer.allow_reuse_address = True
try:
httpd = RootedHTTPServer(
output, (server, port), ComplexHTTPRequestHandler)
httpd = RootedHTTPServer(output, (server, port), ComplexHTTPRequestHandler)
except OSError as e:
logging.error("Could not listen on port %s, server %s.", port, server)
if excqueue is not None:
@ -500,8 +611,9 @@ def listen(server, port, output, excqueue=None):
return
try:
console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format(
server, port))
console.print(
"Serving site at: http://{}:{} - Tap CTRL-C to stop".format(server, port)
)
httpd.serve_forever()
except Exception as e:
if excqueue is not None:
@ -518,24 +630,31 @@ def listen(server, port, output, excqueue=None):
def main(argv=None):
args = parse_arguments(argv)
logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level)
init_logging(level=args.verbosity, fatal=args.fatal,
name=__name__, logs_dedup_min_level=logs_dedup_min_level)
init_logging(
level=args.verbosity,
fatal=args.fatal,
name=__name__,
logs_dedup_min_level=logs_dedup_min_level,
)
logger.debug('Pelican version: %s', __version__)
logger.debug('Python version: %s', sys.version.split()[0])
logger.debug("Pelican version: %s", __version__)
logger.debug("Python version: %s", sys.version.split()[0])
try:
pelican, settings = get_instance(args)
if args.autoreload and args.listen:
excqueue = multiprocessing.Queue()
p1 = multiprocessing.Process(
target=autoreload,
args=(args, excqueue))
p1 = multiprocessing.Process(target=autoreload, args=(args, excqueue))
p2 = multiprocessing.Process(
target=listen,
args=(settings.get('BIND'), settings.get('PORT'),
settings.get("OUTPUT_PATH"), excqueue))
args=(
settings.get("BIND"),
settings.get("PORT"),
settings.get("OUTPUT_PATH"),
excqueue,
),
)
try:
p1.start()
p2.start()
@ -548,16 +667,17 @@ def main(argv=None):
elif args.autoreload:
autoreload(args)
elif args.listen:
listen(settings.get('BIND'), settings.get('PORT'),
settings.get("OUTPUT_PATH"))
listen(
settings.get("BIND"), settings.get("PORT"), settings.get("OUTPUT_PATH")
)
else:
with console.status("Generating..."):
pelican.run()
except KeyboardInterrupt:
logger.warning('Keyboard interrupt received. Exiting.')
logger.warning("Keyboard interrupt received. Exiting.")
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
if args.verbosity == logging.DEBUG:
console.print_exception()
sys.exit(getattr(e, 'exitcode', 1))
sys.exit(getattr(e, "exitcode", 1))