# pelican-theme/pelican/__init__.py

import argparse
import importlib.metadata
import json
import logging
import multiprocessing
import os
import pprint
import sys
import time
import traceback
from collections.abc import Iterable
# Combines all paths to `pelican` package accessible from `sys.path`
# Makes it possible to install `pelican` and namespace plugins into different
# locations in the file system (e.g. pip with `-e` or `--user`)
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# pelican.log has to be the first pelican module to be loaded
# because logging.setLoggerClass has to be called before logging.getLogger
from pelican.log import console, DEFAULT_LOG_HANDLER # noqa: I001
from pelican.log import init as init_logging
from pelican.generators import (
ArticlesGenerator,
PagesGenerator,
SourceFileGenerator,
StaticGenerator,
TemplatePagesGenerator,
)
from pelican.plugins import signals
from pelican.plugins._utils import get_plugin_name, load_plugins
from pelican.readers import Readers
from pelican.server import ComplexHTTPRequestHandler, RootedHTTPServer
from pelican.settings import read_settings
from pelican.utils import clean_output_dir, maybe_pluralize, wait_for_changes
from pelican.writers import Writer
try:
__version__ = importlib.metadata.version("pelican")
except Exception:
__version__ = "unknown"
DEFAULT_CONFIG_NAME = "pelicanconf.py"
logger = logging.getLogger(__name__)
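
# Core build orchestrator: loads plugins, runs the content generators, and
# writes the rendered site to OUTPUT_PATH. A minimal programmatic use (a
# sketch, assuming a valid pelicanconf.py in the current directory):
#
#     from pelican import Pelican
#     from pelican.settings import read_settings
#
#     Pelican(read_settings("pelicanconf.py")).run()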
class Pelican:
def __init__(self, settings):
"""Pelican initialization
Performs some checks on the environment before doing anything else.
"""
# define the default settings
self.settings = settings
self.path = settings["PATH"]
self.theme = settings["THEME"]
self.output_path = settings["OUTPUT_PATH"]
self.ignore_files = settings["IGNORE_FILES"]
self.delete_outputdir = settings["DELETE_OUTPUT_DIRECTORY"]
self.output_retention = settings["OUTPUT_RETENTION"]
self.init_path()
self.init_plugins()
signals.initialized.send(self)
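
# Make the current working directory importable (if it is not already on
# sys.path) so that local settings and plugin modules can be found.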
def init_path(self):
if not any(p in sys.path for p in ["", os.curdir]):
logger.debug("Adding current directory to system path")
sys.path.insert(0, "")
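
# Load the plugins named in the settings and call their register() hooks;
# plugins that fail to register are logged and skipped.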
def init_plugins(self):
self.plugins = []
for plugin in load_plugins(self.settings):
name = get_plugin_name(plugin)
logger.debug("Registering plugin `%s`", name)
try:
plugin.register()
self.plugins.append(plugin)
except Exception as e:
logger.error(
"Cannot register plugin `%s`\n%s",
name,
e,
stacklevel=2,
)
if self.settings.get("DEBUG", False):
console.print_exception()
self.settings["PLUGINS"] = [get_plugin_name(p) for p in self.plugins]
def run(self):
"""Run the generators and return"""
start_time = time.time()
context = self.settings.copy()
# Share these among all the generators and content objects
# They map source paths to Content objects or None
context["generated_content"] = {}
context["static_links"] = set()
context["static_content"] = {}
context["localsiteurl"] = self.settings["SITEURL"]
generators = [
cls(
context=context,
settings=self.settings,
path=self.path,
theme=self.theme,
output_path=self.output_path,
)
for cls in self._get_generator_classes()
]
# Delete the output directory if (1) the appropriate setting is True
# and (2) that directory is not the parent of the source directory
if self.delete_outputdir and os.path.commonpath(
[os.path.realpath(self.output_path)]
) != os.path.commonpath(
[os.path.realpath(self.output_path), os.path.realpath(self.path)]
):
clean_output_dir(self.output_path, self.output_retention)
for p in generators:
if hasattr(p, "generate_context"):
p.generate_context()
# for plugins that create/edit the summary
logger.debug("Signal all_generators_finalized.send(<generators>)")
signals.all_generators_finalized.send(generators)
# update links in the summary, etc
for p in generators:
if hasattr(p, "refresh_metadata_intersite_links"):
p.refresh_metadata_intersite_links()
writer = self._get_writer()
for p in generators:
if hasattr(p, "generate_output"):
p.generate_output(writer)
signals.finalized.send(self)
articles_generator = next(
g for g in generators if isinstance(g, ArticlesGenerator)
)
pages_generator = next(g for g in generators if isinstance(g, PagesGenerator))
pluralized_articles = maybe_pluralize(
(len(articles_generator.articles) + len(articles_generator.translations)),
"article",
"articles",
)
pluralized_drafts = maybe_pluralize(
(
len(articles_generator.drafts)
+ len(articles_generator.drafts_translations)
),
"draft",
"drafts",
)
pluralized_hidden_articles = maybe_pluralize(
(
len(articles_generator.hidden_articles)
+ len(articles_generator.hidden_translations)
),
"hidden article",
"hidden articles",
)
pluralized_pages = maybe_pluralize(
(len(pages_generator.pages) + len(pages_generator.translations)),
"page",
"pages",
)
pluralized_hidden_pages = maybe_pluralize(
(
len(pages_generator.hidden_pages)
+ len(pages_generator.hidden_translations)
),
"hidden page",
"hidden pages",
)
pluralized_draft_pages = maybe_pluralize(
(
len(pages_generator.draft_pages)
+ len(pages_generator.draft_translations)
),
"draft page",
"draft pages",
)
console.print(
f"Done: Processed {pluralized_articles}, {pluralized_drafts}, {pluralized_hidden_articles}, {pluralized_pages}, {pluralized_hidden_pages} and {pluralized_draft_pages} in {time.time() - start_time:.2f} seconds."
)
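
# Assemble generator classes: the built-in generators, any classes supplied
# by plugins via the get_generators signal, and StaticGenerator last.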
def _get_generator_classes(self):
discovered_generators = [
(ArticlesGenerator, "internal"),
(PagesGenerator, "internal"),
]
if self.settings["TEMPLATE_PAGES"]:
discovered_generators.append((TemplatePagesGenerator, "internal"))
if self.settings["OUTPUT_SOURCES"]:
discovered_generators.append((SourceFileGenerator, "internal"))
for receiver, values in signals.get_generators.send(self):
if not isinstance(values, Iterable):
values = (values,)
for generator in values:
if generator is None:
continue # plugin did not return a generator
discovered_generators.append((generator, receiver.__module__))
# StaticGenerator must run last, so it can identify files that
# were skipped by the other generators, and so static files can
# have their output paths overridden by the {attach} link syntax.
discovered_generators.append((StaticGenerator, "internal"))
generators = []
for generator, origin in discovered_generators:
if not isinstance(generator, type):
logger.error("Generator %s (%s) cannot be loaded", generator, origin)
continue
logger.debug("Found generator: %s (%s)", generator.__name__, origin)
generators.append(generator)
return generators
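
# Use a writer class supplied by a plugin via the get_writer signal if one
# exists; otherwise fall back to the built-in Writer.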
def _get_writer(self):
writers = [w for _, w in signals.get_writer.send(self) if isinstance(w, type)]
num_writers = len(writers)
if num_writers == 0:
return Writer(self.output_path, settings=self.settings)
if num_writers > 1:
logger.warning("%s writers found, using only first one", num_writers)
writer = writers[0]
logger.debug("Found writer: %s (%s)", writer.__name__, writer.__module__)
return writer(self.output_path, settings=self.settings)
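
# argparse action for --print-settings: load the configuration, print the
# requested settings (or all of them), and exit.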
class PrintSettings(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
init_logging(name=__name__)
try:
instance, settings = get_instance(namespace)
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
console.print_exception()
sys.exit(getattr(e, "exitcode", 1))
if values:
# One or more arguments provided, so only print those settings
for setting in values:
if setting in settings:
# Put the value on its own line only for container types (dict, tuple, list)
if isinstance(settings[setting], (dict, tuple, list)):
setting_format = "\n{}:\n{}"
else:
setting_format = "\n{}: {}"
console.print(
setting_format.format(
setting, pprint.pformat(settings[setting])
)
)
else:
console.print(f"\n{setting} is not a recognized setting.")
break
else:
# No argument was given to --print-settings, so print all settings
console.print(settings)
parser.exit()
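
# argparse action for -e/--extra-settings: parse KEY=VALUE overrides where
# VALUE is JSON, e.g.  -e SITEURL='"https://example.com"' RELATIVE_URLS=false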
class ParseOverrides(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
overrides = {}
for item in values:
try:
k, v = item.split("=", 1)
except ValueError:
raise ValueError(
"Extra settings must be specified as KEY=VALUE pairs "
f"but you specified {item}"
)
try:
overrides[k] = json.loads(v)
except json.decoder.JSONDecodeError:
raise ValueError(
f"Invalid JSON value: {v}. "
"Values specified via -e / --extra-settings flags "
"must be in JSON notation. "
"Use -e KEY='\"string\"' to specify a string value; "
"-e KEY=null to specify None; "
"-e KEY=false (or true) to specify False (or True)."
)
setattr(namespace, self.dest, overrides)
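
# Build the command-line parser and return the parsed argparse namespace.
# A typical invocation (illustrative; all options are defined below):
#
#     pelican content -o output -s pelicanconf.py --autoreload --listen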
def parse_arguments(argv=None):
parser = argparse.ArgumentParser(
description="A tool to generate a static blog, "
" with restructured text input files.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
dest="path",
nargs="?",
help="Path where to find the content files.",
default=None,
)
parser.add_argument(
"-t",
"--theme-path",
dest="theme",
help="Path where to find the theme templates. If not "
"specified, it will use the default one included with "
"pelican.",
)
parser.add_argument(
"-o",
"--output",
dest="output",
help="Where to output the generated files. If not "
"specified, a directory will be created, named "
'"output" in the current path.',
)
parser.add_argument(
"-s",
"--settings",
dest="settings",
help="The settings of the application, this is "
f"automatically set to {DEFAULT_CONFIG_NAME} if a file exists with this "
"name.",
)
parser.add_argument(
"-d",
"--delete-output-directory",
dest="delete_outputdir",
action="store_true",
default=None,
help="Delete the output directory.",
)
parser.add_argument(
"-v",
"--verbose",
action="store_const",
const=logging.INFO,
dest="verbosity",
help="Show all messages.",
)
parser.add_argument(
"-q",
"--quiet",
action="store_const",
const=logging.CRITICAL,
dest="verbosity",
help="Show only critical errors.",
)
parser.add_argument(
"-D",
"--debug",
action="store_const",
const=logging.DEBUG,
dest="verbosity",
help="Show all messages, including debug messages.",
)
parser.add_argument(
"--version",
action="version",
version=__version__,
help="Print the pelican version and exit.",
)
parser.add_argument(
"-r",
"--autoreload",
dest="autoreload",
action="store_true",
help="Relaunch pelican each time a modification occurs"
" on the content files.",
)
parser.add_argument(
"--print-settings",
dest="print_settings",
nargs="*",
action=PrintSettings,
metavar="SETTING_NAME",
help="Print current configuration settings and exit. "
"Append one or more setting name arguments to see the "
"values for specific settings only.",
)
parser.add_argument(
"--relative-urls",
dest="relative_paths",
action="store_true",
help="Use relative urls in output, " "useful for site development",
)
parser.add_argument(
"--cache-path",
dest="cache_path",
help=(
"Directory in which to store cache files. "
'If not specified, defaults to "cache".'
),
)
parser.add_argument(
"--ignore-cache",
action="store_true",
dest="ignore_cache",
help="Ignore content cache " "from previous runs by not loading cache files.",
)
parser.add_argument(
"--fatal",
metavar="errors|warnings",
choices=("errors", "warnings"),
default="",
help=(
"Exit the program with non-zero status if any "
"errors/warnings encountered."
),
)
LOG_HANDLERS = {"plain": None, "rich": DEFAULT_LOG_HANDLER}
parser.add_argument(
"--log-handler",
default="rich",
choices=LOG_HANDLERS,
help=(
"Which handler to use to format log messages. "
"The `rich` handler prints output in columns."
),
)
parser.add_argument(
"--logs-dedup-min-level",
default="WARNING",
choices=("DEBUG", "INFO", "WARNING", "ERROR"),
help=(
"Only enable log de-duplication for levels equal"
" to or above the specified value"
),
)
parser.add_argument(
"-l",
"--listen",
dest="listen",
action="store_true",
help="Serve content files via HTTP and port 8000.",
)
parser.add_argument(
"-p",
"--port",
dest="port",
type=int,
help="Port to serve HTTP files at. (default: 8000)",
)
parser.add_argument(
"-b",
"--bind",
dest="bind",
help="IP to bind to when serving files via HTTP " "(default: 127.0.0.1)",
)
parser.add_argument(
"-e",
"--extra-settings",
dest="overrides",
help="Specify one or more SETTING=VALUE pairs to "
"override settings. VALUE must be in JSON notation: "
"specify string values as SETTING='\"some string\"'; "
"booleans as SETTING=true or SETTING=false; "
"None as SETTING=null.",
nargs="*",
action=ParseOverrides,
default={},
)
args = parser.parse_args(argv)
if args.port is not None and not args.listen:
logger.warning("--port without --listen has no effect")
if args.bind is not None and not args.listen:
logger.warning("--bind without --listen has no effect")
args.log_handler = LOG_HANDLERS[args.log_handler]
return args
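
# Note: most options are added to the dict only when explicitly passed on the
# command line, so argparse defaults do not clobber the settings file; DEBUG
# is always derived from the verbosity flags.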
def get_config(args):
"""Builds a config dictionary based on supplied `args`."""
config = {}
if args.path:
config["PATH"] = os.path.abspath(os.path.expanduser(args.path))
if args.output:
config["OUTPUT_PATH"] = os.path.abspath(os.path.expanduser(args.output))
if args.theme:
abstheme = os.path.abspath(os.path.expanduser(args.theme))
config["THEME"] = abstheme if os.path.exists(abstheme) else args.theme
if args.delete_outputdir is not None:
config["DELETE_OUTPUT_DIRECTORY"] = args.delete_outputdir
if args.ignore_cache:
config["LOAD_CONTENT_CACHE"] = False
if args.cache_path:
config["CACHE_PATH"] = args.cache_path
if args.relative_paths:
config["RELATIVE_URLS"] = args.relative_paths
if args.port is not None:
config["PORT"] = args.port
if args.bind is not None:
config["BIND"] = args.bind
config["DEBUG"] = args.verbosity == logging.DEBUG
config.update(args.overrides)
return config
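
# Resolve the settings file (falling back to pelicanconf.py when present),
# apply command-line overrides, and instantiate the configured PELICAN_CLASS.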
def get_instance(args):
config_file = args.settings
if config_file is None and os.path.isfile(DEFAULT_CONFIG_NAME):
config_file = DEFAULT_CONFIG_NAME
args.settings = DEFAULT_CONFIG_NAME
settings = read_settings(config_file, override=get_config(args))
cls = settings["PELICAN_CLASS"]
if isinstance(cls, str):
module, cls_name = cls.rsplit(".", 1)
module = __import__(module)
cls = getattr(module, cls_name)
return cls(settings), settings
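
# --autoreload loop: rebuild, then block until content, theme or settings
# change; when the settings file itself changes, reload the Pelican instance
# before the next build. excqueue is used when running as a subprocess
# alongside listen().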
def autoreload(args, excqueue=None):
console.print(
" --- AutoReload Mode: Monitoring `content`, `theme` and"
" `settings` for changes. ---"
)
pelican, settings = get_instance(args)
settings_file = os.path.abspath(args.settings)
while True:
try:
pelican.run()
changed_files = wait_for_changes(args.settings, Readers, settings)
changed_files = {c[1] for c in changed_files}
if settings_file in changed_files:
pelican, settings = get_instance(args)
console.print(
"\n-> Modified: {}. re-generating...".format(", ".join(changed_files))
)
except KeyboardInterrupt:
if excqueue is not None:
excqueue.put(None)
return
raise
except Exception as e:
if args.verbosity == logging.DEBUG:
if excqueue is not None:
excqueue.put(traceback.format_exception_only(type(e), e)[-1])
else:
raise
logger.warning(
'Caught exception:\n"%s".', e, exc_info=settings.get("DEBUG", False)
)
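
# Serve the output directory over HTTP with RootedHTTPServer until
# interrupted; failures are reported through excqueue when running as a
# subprocess alongside autoreload().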
def listen(server, port, output, excqueue=None):
# set logging level to at least "INFO" (so we can see the server requests)
if logger.level < logging.INFO:
logger.setLevel(logging.INFO)
RootedHTTPServer.allow_reuse_address = True
try:
httpd = RootedHTTPServer(output, (server, port), ComplexHTTPRequestHandler)
except OSError as e:
logging.error("Could not listen on port %s, server %s.", port, server)
if excqueue is not None:
excqueue.put(traceback.format_exception_only(type(e), e)[-1])
return
try:
console.print(f"Serving site at: http://{server}:{port} - Tap CTRL-C to stop")
httpd.serve_forever()
except Exception as e:
if excqueue is not None:
excqueue.put(traceback.format_exception_only(type(e), e)[-1])
return
except KeyboardInterrupt:
httpd.socket.close()
if excqueue is not None:
return
raise
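
# Console entry point: parse arguments, configure logging, and dispatch to
# autoreload and/or listen subprocesses or to a single one-off build.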
def main(argv=None):
args = parse_arguments(argv)
logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level)
init_logging(
level=args.verbosity,
fatal=args.fatal,
name=__name__,
handler=args.log_handler,
logs_dedup_min_level=logs_dedup_min_level,
)
logger.debug("Pelican version: %s", __version__)
logger.debug("Python version: %s", sys.version.split()[0])
try:
pelican, settings = get_instance(args)
if args.autoreload and args.listen:
excqueue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=autoreload, args=(args, excqueue))
p2 = multiprocessing.Process(
target=listen,
args=(
settings.get("BIND"),
settings.get("PORT"),
settings.get("OUTPUT_PATH"),
excqueue,
),
)
try:
p1.start()
p2.start()
exc = excqueue.get()
if exc is not None:
logger.critical(exc)
finally:
p1.terminate()
p2.terminate()
elif args.autoreload:
autoreload(args)
elif args.listen:
listen(
settings.get("BIND"), settings.get("PORT"), settings.get("OUTPUT_PATH")
)
else:
with console.status("Generating..."):
pelican.run()
except KeyboardInterrupt:
logger.warning("Keyboard interrupt received. Exiting.")
except Exception as e:
logger.critical("%s: %s", e.__class__.__name__, e)
if args.verbosity == logging.DEBUG:
console.print_exception()
sys.exit(getattr(e, "exitcode", 1))