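# Core Pelican module: the Pelican class that drives site generation, the
# argparse actions and helpers for the command-line interface, and the
# `main` entry point.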
import argparse
import importlib.metadata
import json
import logging
import multiprocessing
import os
import pprint
import sys
import time
import traceback
from collections.abc import Iterable

# Combines all paths to `pelican` package accessible from `sys.path`
# Makes it possible to install `pelican` and namespace plugins into different
# locations in the file system (e.g. pip with `-e` or `--user`)
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)

# pelican.log has to be the first pelican module to be loaded
# because logging.setLoggerClass has to be called before logging.getLogger
from pelican.log import console
from pelican.log import init as init_logging
from pelican.generators import (
    ArticlesGenerator,  # noqa: I100
    PagesGenerator,
    SourceFileGenerator,
    StaticGenerator,
    TemplatePagesGenerator,
)
from pelican.plugins import signals
from pelican.plugins._utils import get_plugin_name, load_plugins
from pelican.readers import Readers
from pelican.server import ComplexHTTPRequestHandler, RootedHTTPServer
from pelican.settings import read_settings
from pelican.utils import clean_output_dir, maybe_pluralize, wait_for_changes
from pelican.writers import Writer

try:
    __version__ = importlib.metadata.version("pelican")
except Exception:
    __version__ = "unknown"

DEFAULT_CONFIG_NAME = "pelicanconf.py"
logger = logging.getLogger(__name__)


class Pelican:
    def __init__(self, settings):
        """Pelican initialization

        Performs some checks on the environment before doing anything else.
        """

        # define the default settings
        self.settings = settings

        self.path = settings["PATH"]
        self.theme = settings["THEME"]
        self.output_path = settings["OUTPUT_PATH"]
        self.ignore_files = settings["IGNORE_FILES"]
        self.delete_outputdir = settings["DELETE_OUTPUT_DIRECTORY"]
        self.output_retention = settings["OUTPUT_RETENTION"]

        self.init_path()
        self.init_plugins()
        signals.initialized.send(self)

    def init_path(self):
        if not any(p in sys.path for p in ["", os.curdir]):
            logger.debug("Adding current directory to system path")
            sys.path.insert(0, "")

    def init_plugins(self):
        self.plugins = []
        for plugin in load_plugins(self.settings):
            name = get_plugin_name(plugin)
            logger.debug("Registering plugin `%s`", name)
            try:
                plugin.register()
                self.plugins.append(plugin)
            except Exception as e:
                logger.error("Cannot register plugin `%s`\n%s", name, e)

        self.settings["PLUGINS"] = [get_plugin_name(p) for p in self.plugins]

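    # Full site build: run every generator's context phase, refresh intersite
    # links, write the output, and print a summary of what was processed.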
    def run(self):
        """Run the generators and return"""
        start_time = time.time()

        context = self.settings.copy()
        # Share these among all the generators and content objects
        # They map source paths to Content objects or None
        context["generated_content"] = {}
        context["static_links"] = set()
        context["static_content"] = {}
        context["localsiteurl"] = self.settings["SITEURL"]

        generators = [
            cls(
                context=context,
                settings=self.settings,
                path=self.path,
                theme=self.theme,
                output_path=self.output_path,
            )
            for cls in self._get_generator_classes()
        ]

        # Delete the output directory if (1) the appropriate setting is True
        # and (2) that directory is not the parent of the source directory
        if self.delete_outputdir and os.path.commonpath(
            [os.path.realpath(self.output_path)]
        ) != os.path.commonpath(
            [os.path.realpath(self.output_path), os.path.realpath(self.path)]
        ):
            clean_output_dir(self.output_path, self.output_retention)

        for p in generators:
            if hasattr(p, "generate_context"):
                p.generate_context()

        for p in generators:
            if hasattr(p, "refresh_metadata_intersite_links"):
                p.refresh_metadata_intersite_links()

        signals.all_generators_finalized.send(generators)

        writer = self._get_writer()

        for p in generators:
            if hasattr(p, "generate_output"):
                p.generate_output(writer)

        signals.finalized.send(self)

        articles_generator = next(
            g for g in generators if isinstance(g, ArticlesGenerator)
        )
        pages_generator = next(g for g in generators if isinstance(g, PagesGenerator))

        pluralized_articles = maybe_pluralize(
            (len(articles_generator.articles) + len(articles_generator.translations)),
            "article",
            "articles",
        )
        pluralized_drafts = maybe_pluralize(
            (
                len(articles_generator.drafts)
                + len(articles_generator.drafts_translations)
            ),
            "draft",
            "drafts",
        )
        pluralized_hidden_articles = maybe_pluralize(
            (
                len(articles_generator.hidden_articles)
                + len(articles_generator.hidden_translations)
            ),
            "hidden article",
            "hidden articles",
        )
        pluralized_pages = maybe_pluralize(
            (len(pages_generator.pages) + len(pages_generator.translations)),
            "page",
            "pages",
        )
        pluralized_hidden_pages = maybe_pluralize(
            (
                len(pages_generator.hidden_pages)
                + len(pages_generator.hidden_translations)
            ),
            "hidden page",
            "hidden pages",
        )
        pluralized_draft_pages = maybe_pluralize(
            (
                len(pages_generator.draft_pages)
                + len(pages_generator.draft_translations)
            ),
            "draft page",
            "draft pages",
        )

        console.print(
            "Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.".format(
                pluralized_articles,
                pluralized_drafts,
                pluralized_hidden_articles,
                pluralized_pages,
                pluralized_hidden_pages,
                pluralized_draft_pages,
                time.time() - start_time,
            )
        )

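    # Generator discovery: built-in generators first, then any classes
    # returned by plugins via the `get_generators` signal, with
    # StaticGenerator appended last (see the comment inside the method).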
    def _get_generator_classes(self):
        discovered_generators = [
            (ArticlesGenerator, "internal"),
            (PagesGenerator, "internal"),
        ]

        if self.settings["TEMPLATE_PAGES"]:
            discovered_generators.append((TemplatePagesGenerator, "internal"))

        if self.settings["OUTPUT_SOURCES"]:
            discovered_generators.append((SourceFileGenerator, "internal"))

        for receiver, values in signals.get_generators.send(self):
            if not isinstance(values, Iterable):
                values = (values,)
            for generator in values:
                if generator is None:
                    continue  # plugin did not return a generator
                discovered_generators.append((generator, receiver.__module__))

        # StaticGenerator must run last, so it can identify files that
        # were skipped by the other generators, and so static files can
        # have their output paths overridden by the {attach} link syntax.
        discovered_generators.append((StaticGenerator, "internal"))

        generators = []

        for generator, origin in discovered_generators:
            if not isinstance(generator, type):
                logger.error("Generator %s (%s) cannot be loaded", generator, origin)
                continue

            logger.debug("Found generator: %s (%s)", generator.__name__, origin)
            generators.append(generator)

        return generators

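    # Plugins may supply a Writer subclass through the `get_writer` signal;
    # otherwise the default pelican.writers.Writer is used. Only the first
    # plugin-provided writer is honored.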
    def _get_writer(self):
        writers = [w for _, w in signals.get_writer.send(self) if isinstance(w, type)]
        num_writers = len(writers)

        if num_writers == 0:
            return Writer(self.output_path, settings=self.settings)

        if num_writers > 1:
            logger.warning("%s writers found, using only first one", num_writers)

        writer = writers[0]

        logger.debug("Found writer: %s (%s)", writer.__name__, writer.__module__)
        return writer(self.output_path, settings=self.settings)


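# argparse action backing --print-settings: loads the effective settings and
# prints either the requested keys or the whole settings dict, then exits.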
class PrintSettings(argparse.Action):
    def __call__(self, parser, namespace, values, option_string):
        init_logging(name=__name__)

        try:
            instance, settings = get_instance(namespace)
        except Exception as e:
            logger.critical("%s: %s", e.__class__.__name__, e)
            console.print_exception()
            sys.exit(getattr(e, "exitcode", 1))

        if values:
            # One or more arguments provided, so only print those settings
            for setting in values:
                if setting in settings:
                    # Only add newline between setting name and value if dict
                    if isinstance(settings[setting], (dict, tuple, list)):
                        setting_format = "\n{}:\n{}"
                    else:
                        setting_format = "\n{}: {}"
                    console.print(
                        setting_format.format(
                            setting, pprint.pformat(settings[setting])
                        )
                    )
                else:
                    console.print(f"\n{setting} is not a recognized setting.")
                    break
        else:
            # No argument was given to --print-settings, so print all settings
            console.print(settings)

        parser.exit()


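# argparse action backing -e / --extra-settings: each KEY=VALUE pair is split
# on the first "=" and VALUE is parsed as JSON, e.g.
#   pelican -e SITEURL='"https://example.com"' DELETE_OUTPUT_DIRECTORY=true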
class ParseOverrides(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        overrides = {}
        for item in values:
            try:
                k, v = item.split("=", 1)
            except ValueError:
                raise ValueError(
                    "Extra settings must be specified as KEY=VALUE pairs "
                    f"but you specified {item}"
                )
            try:
                overrides[k] = json.loads(v)
            except json.decoder.JSONDecodeError:
                raise ValueError(
                    f"Invalid JSON value: {v}. "
                    "Values specified via -e / --extra-settings flags "
                    "must be in JSON notation. "
                    "Use -e KEY='\"string\"' to specify a string value; "
                    "-e KEY=null to specify None; "
                    "-e KEY=false (or true) to specify False (or True)."
                )
        setattr(namespace, self.dest, overrides)


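# Command-line interface: defines all pelican CLI flags and returns the parsed
# argparse.Namespace.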
def parse_arguments(argv=None):
    parser = argparse.ArgumentParser(
        description="A tool to generate a static blog, "
        "with restructured text input files.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        dest="path",
        nargs="?",
        help="Path where to find the content files.",
        default=None,
    )

    parser.add_argument(
        "-t",
        "--theme-path",
        dest="theme",
        help="Path where to find the theme templates. If not "
        "specified, it will use the default one included with "
        "pelican.",
    )

    parser.add_argument(
        "-o",
        "--output",
        dest="output",
        help="Where to output the generated files. If not "
        "specified, a directory will be created, named "
        '"output" in the current path.',
    )

    parser.add_argument(
        "-s",
        "--settings",
        dest="settings",
        help="The settings of the application, this is "
        "automatically set to {} if a file exists with this "
        "name.".format(DEFAULT_CONFIG_NAME),
    )

    parser.add_argument(
        "-d",
        "--delete-output-directory",
        dest="delete_outputdir",
        action="store_true",
        default=None,
        help="Delete the output directory.",
    )

    parser.add_argument(
        "-v",
        "--verbose",
        action="store_const",
        const=logging.INFO,
        dest="verbosity",
        help="Show all messages.",
    )

    parser.add_argument(
        "-q",
        "--quiet",
        action="store_const",
        const=logging.CRITICAL,
        dest="verbosity",
        help="Show only critical errors.",
    )

    parser.add_argument(
        "-D",
        "--debug",
        action="store_const",
        const=logging.DEBUG,
        dest="verbosity",
        help="Show all messages, including debug messages.",
    )

    parser.add_argument(
        "--version",
        action="version",
        version=__version__,
        help="Print the pelican version and exit.",
    )

    parser.add_argument(
        "-r",
        "--autoreload",
        dest="autoreload",
        action="store_true",
        help="Relaunch pelican each time a modification occurs"
        " on the content files.",
    )

    parser.add_argument(
        "--print-settings",
        dest="print_settings",
        nargs="*",
        action=PrintSettings,
        metavar="SETTING_NAME",
        help="Print current configuration settings and exit. "
        "Append one or more setting name arguments to see the "
        "values for specific settings only.",
    )

    parser.add_argument(
        "--relative-urls",
        dest="relative_paths",
        action="store_true",
        help="Use relative urls in output, useful for site development",
    )

    parser.add_argument(
        "--cache-path",
        dest="cache_path",
        help=(
            "Directory in which to store cache files. "
            'If not specified, defaults to "cache".'
        ),
    )

    parser.add_argument(
        "--ignore-cache",
        action="store_true",
        dest="ignore_cache",
        help="Ignore content cache from previous runs by not loading cache files.",
    )

    parser.add_argument(
        "--fatal",
        metavar="errors|warnings",
        choices=("errors", "warnings"),
        default="",
        help=(
            "Exit the program with non-zero status if any "
            "errors/warnings encountered."
        ),
    )

    parser.add_argument(
        "--logs-dedup-min-level",
        default="WARNING",
        choices=("DEBUG", "INFO", "WARNING", "ERROR"),
        help=(
            "Only enable log de-duplication for levels equal"
            " to or above the specified value"
        ),
    )

    parser.add_argument(
        "-l",
        "--listen",
        dest="listen",
        action="store_true",
        help="Serve content files via HTTP and port 8000.",
    )

    parser.add_argument(
        "-p",
        "--port",
        dest="port",
        type=int,
        help="Port to serve HTTP files at. (default: 8000)",
    )

    parser.add_argument(
        "-b",
        "--bind",
        dest="bind",
        help="IP to bind to when serving files via HTTP (default: 127.0.0.1)",
    )

    parser.add_argument(
        "-e",
        "--extra-settings",
        dest="overrides",
        help="Specify one or more SETTING=VALUE pairs to "
        "override settings. VALUE must be in JSON notation: "
        "specify string values as SETTING='\"some string\"'; "
        "booleans as SETTING=true or SETTING=false; "
        "None as SETTING=null.",
        nargs="*",
        action=ParseOverrides,
        default={},
    )

    args = parser.parse_args(argv)

    if args.port is not None and not args.listen:
        logger.warning("--port without --listen has no effect")
    if args.bind is not None and not args.listen:
        logger.warning("--bind without --listen has no effect")

    return args


def get_config(args):
    """Builds a config dictionary based on supplied `args`."""
    config = {}
    if args.path:
        config["PATH"] = os.path.abspath(os.path.expanduser(args.path))
    if args.output:
        config["OUTPUT_PATH"] = os.path.abspath(os.path.expanduser(args.output))
    if args.theme:
        abstheme = os.path.abspath(os.path.expanduser(args.theme))
        config["THEME"] = abstheme if os.path.exists(abstheme) else args.theme
    if args.delete_outputdir is not None:
        config["DELETE_OUTPUT_DIRECTORY"] = args.delete_outputdir
    if args.ignore_cache:
        config["LOAD_CONTENT_CACHE"] = False
    if args.cache_path:
        config["CACHE_PATH"] = args.cache_path
    if args.relative_paths:
        config["RELATIVE_URLS"] = args.relative_paths
    if args.port is not None:
        config["PORT"] = args.port
    if args.bind is not None:
        config["BIND"] = args.bind
    config["DEBUG"] = args.verbosity == logging.DEBUG
    config.update(args.overrides)

    return config


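# Builds the Pelican instance: reads the settings file (falling back to
# pelicanconf.py in the current directory) and instantiates the class named by
# the PELICAN_CLASS setting.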
def get_instance(args):
    config_file = args.settings
    if config_file is None and os.path.isfile(DEFAULT_CONFIG_NAME):
        config_file = DEFAULT_CONFIG_NAME
        args.settings = DEFAULT_CONFIG_NAME

    settings = read_settings(config_file, override=get_config(args))

    cls = settings["PELICAN_CLASS"]
    if isinstance(cls, str):
        module, cls_name = cls.rsplit(".", 1)
        module = __import__(module)
        cls = getattr(module, cls_name)

    return cls(settings), settings


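# --autoreload loop: regenerate the site whenever content, theme, or settings
# change; the Pelican instance is rebuilt if the settings file itself changed.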
def autoreload(args, excqueue=None):
    console.print(
        " --- AutoReload Mode: Monitoring `content`, `theme` and"
        " `settings` for changes. ---"
    )
    pelican, settings = get_instance(args)
    settings_file = os.path.abspath(args.settings)
    while True:
        try:
            pelican.run()

            changed_files = wait_for_changes(args.settings, Readers, settings)
            changed_files = {c[1] for c in changed_files}

            if settings_file in changed_files:
                pelican, settings = get_instance(args)

            console.print(
                "\n-> Modified: {}. re-generating...".format(", ".join(changed_files))
            )

        except KeyboardInterrupt:
            if excqueue is not None:
                excqueue.put(None)
                return
            raise

        except Exception as e:
            if args.verbosity == logging.DEBUG:
                if excqueue is not None:
                    excqueue.put(traceback.format_exception_only(type(e), e)[-1])
                else:
                    raise
            logger.warning(
                'Caught exception:\n"%s".', e, exc_info=settings.get("DEBUG", False)
            )


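# Development HTTP server: serves the output directory via RootedHTTPServer.
# Errors are pushed onto excqueue when running alongside --autoreload.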
def listen(server, port, output, excqueue=None):
    # set logging level to at least "INFO" (so we can see the server requests)
    if logger.level < logging.INFO:
        logger.setLevel(logging.INFO)

    RootedHTTPServer.allow_reuse_address = True
    try:
        httpd = RootedHTTPServer(output, (server, port), ComplexHTTPRequestHandler)
    except OSError as e:
        logging.error("Could not listen on port %s, server %s.", port, server)
        if excqueue is not None:
            excqueue.put(traceback.format_exception_only(type(e), e)[-1])
        return

    try:
        console.print(f"Serving site at: http://{server}:{port} - Tap CTRL-C to stop")
        httpd.serve_forever()
    except Exception as e:
        if excqueue is not None:
            excqueue.put(traceback.format_exception_only(type(e), e)[-1])
            return

    except KeyboardInterrupt:
        httpd.socket.close()
        if excqueue is not None:
            return
        raise


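# CLI entry point. With both --autoreload and --listen, the rebuild loop and
# the HTTP server run in separate multiprocessing processes, and the first
# exception (or Ctrl-C) reported by either one shuts both down.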
def main(argv=None):
    args = parse_arguments(argv)
    logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level)
    init_logging(
        level=args.verbosity,
        fatal=args.fatal,
        name=__name__,
        logs_dedup_min_level=logs_dedup_min_level,
    )

    logger.debug("Pelican version: %s", __version__)
    logger.debug("Python version: %s", sys.version.split()[0])

    try:
        pelican, settings = get_instance(args)

        if args.autoreload and args.listen:
            excqueue = multiprocessing.Queue()
            p1 = multiprocessing.Process(target=autoreload, args=(args, excqueue))
            p2 = multiprocessing.Process(
                target=listen,
                args=(
                    settings.get("BIND"),
                    settings.get("PORT"),
                    settings.get("OUTPUT_PATH"),
                    excqueue,
                ),
            )
            try:
                p1.start()
                p2.start()
                exc = excqueue.get()
                if exc is not None:
                    logger.critical(exc)
            finally:
                p1.terminate()
                p2.terminate()
        elif args.autoreload:
            autoreload(args)
        elif args.listen:
            listen(
                settings.get("BIND"), settings.get("PORT"), settings.get("OUTPUT_PATH")
            )
        else:
            with console.status("Generating..."):
                pelican.run()
    except KeyboardInterrupt:
        logger.warning("Keyboard interrupt received. Exiting.")
    except Exception as e:
        logger.critical("%s: %s", e.__class__.__name__, e)

        if args.verbosity == logging.DEBUG:
            console.print_exception()
        sys.exit(getattr(e, "exitcode", 1))