forked from github/pelican
Get rid of the coerce_overrides() function. Add the ParseOverrides argparse.Action to parse overrides. Treat all extra settings values strictly as json values. Test overrides. Edit docs and cli help.
570 lines
22 KiB
Python
570 lines
22 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import pprint
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from collections.abc import Iterable
|
|
# Combines all paths to `pelican` package accessible from `sys.path`
|
|
# Makes it possible to install `pelican` and namespace plugins into different
|
|
# locations in the file system (e.g. pip with `-e` or `--user`)
|
|
from pkgutil import extend_path
|
|
__path__ = extend_path(__path__, __name__)
|
|
|
|
# pelican.log has to be the first pelican module to be loaded
|
|
# because logging.setLoggerClass has to be called before logging.getLogger
|
|
from pelican.log import console
|
|
from pelican.log import init as init_logging
|
|
from pelican.generators import (ArticlesGenerator, # noqa: I100
|
|
PagesGenerator, SourceFileGenerator,
|
|
StaticGenerator, TemplatePagesGenerator)
|
|
from pelican.plugins import signals
|
|
from pelican.plugins._utils import get_plugin_name, load_plugins
|
|
from pelican.readers import Readers
|
|
from pelican.server import ComplexHTTPRequestHandler, RootedHTTPServer
|
|
from pelican.settings import read_settings
|
|
from pelican.utils import (FileSystemWatcher, clean_output_dir, maybe_pluralize)
|
|
from pelican.writers import Writer
|
|
|
|
try:
|
|
__version__ = __import__('pkg_resources') \
|
|
.get_distribution('pelican').version
|
|
except Exception:
|
|
__version__ = "unknown"
|
|
|
|
DEFAULT_CONFIG_NAME = 'pelicanconf.py'
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Pelican:
|
|
|
|
def __init__(self, settings):
|
|
"""Pelican initialization
|
|
|
|
Performs some checks on the environment before doing anything else.
|
|
"""
|
|
|
|
# define the default settings
|
|
self.settings = settings
|
|
|
|
self.path = settings['PATH']
|
|
self.theme = settings['THEME']
|
|
self.output_path = settings['OUTPUT_PATH']
|
|
self.ignore_files = settings['IGNORE_FILES']
|
|
self.delete_outputdir = settings['DELETE_OUTPUT_DIRECTORY']
|
|
self.output_retention = settings['OUTPUT_RETENTION']
|
|
|
|
self.init_path()
|
|
self.init_plugins()
|
|
signals.initialized.send(self)
|
|
|
|
def init_path(self):
|
|
if not any(p in sys.path for p in ['', os.curdir]):
|
|
logger.debug("Adding current directory to system path")
|
|
sys.path.insert(0, '')
|
|
|
|
def init_plugins(self):
|
|
self.plugins = []
|
|
for plugin in load_plugins(self.settings):
|
|
name = get_plugin_name(plugin)
|
|
logger.debug('Registering plugin `%s`', name)
|
|
try:
|
|
plugin.register()
|
|
self.plugins.append(plugin)
|
|
except Exception as e:
|
|
logger.error('Cannot register plugin `%s`\n%s',
|
|
name, e)
|
|
|
|
self.settings['PLUGINS'] = [get_plugin_name(p) for p in self.plugins]
|
|
|
|
def run(self):
|
|
"""Run the generators and return"""
|
|
start_time = time.time()
|
|
|
|
context = self.settings.copy()
|
|
# Share these among all the generators and content objects
|
|
# They map source paths to Content objects or None
|
|
context['generated_content'] = {}
|
|
context['static_links'] = set()
|
|
context['static_content'] = {}
|
|
context['localsiteurl'] = self.settings['SITEURL']
|
|
|
|
generators = [
|
|
cls(
|
|
context=context,
|
|
settings=self.settings,
|
|
path=self.path,
|
|
theme=self.theme,
|
|
output_path=self.output_path,
|
|
) for cls in self._get_generator_classes()
|
|
]
|
|
|
|
# Delete the output directory if (1) the appropriate setting is True
|
|
# and (2) that directory is not the parent of the source directory
|
|
if (self.delete_outputdir
|
|
and os.path.commonpath([os.path.realpath(self.output_path)]) !=
|
|
os.path.commonpath([os.path.realpath(self.output_path),
|
|
os.path.realpath(self.path)])):
|
|
clean_output_dir(self.output_path, self.output_retention)
|
|
|
|
for p in generators:
|
|
if hasattr(p, 'generate_context'):
|
|
p.generate_context()
|
|
|
|
for p in generators:
|
|
if hasattr(p, 'refresh_metadata_intersite_links'):
|
|
p.refresh_metadata_intersite_links()
|
|
|
|
signals.all_generators_finalized.send(generators)
|
|
|
|
writer = self._get_writer()
|
|
|
|
for p in generators:
|
|
if hasattr(p, 'generate_output'):
|
|
p.generate_output(writer)
|
|
|
|
signals.finalized.send(self)
|
|
|
|
articles_generator = next(g for g in generators
|
|
if isinstance(g, ArticlesGenerator))
|
|
pages_generator = next(g for g in generators
|
|
if isinstance(g, PagesGenerator))
|
|
|
|
pluralized_articles = maybe_pluralize(
|
|
(len(articles_generator.articles) +
|
|
len(articles_generator.translations)),
|
|
'article',
|
|
'articles')
|
|
pluralized_drafts = maybe_pluralize(
|
|
(len(articles_generator.drafts) +
|
|
len(articles_generator.drafts_translations)),
|
|
'draft',
|
|
'drafts')
|
|
pluralized_hidden_articles = maybe_pluralize(
|
|
(len(articles_generator.hidden_articles) +
|
|
len(articles_generator.hidden_translations)),
|
|
'hidden article',
|
|
'hidden articles')
|
|
pluralized_pages = maybe_pluralize(
|
|
(len(pages_generator.pages) +
|
|
len(pages_generator.translations)),
|
|
'page',
|
|
'pages')
|
|
pluralized_hidden_pages = maybe_pluralize(
|
|
(len(pages_generator.hidden_pages) +
|
|
len(pages_generator.hidden_translations)),
|
|
'hidden page',
|
|
'hidden pages')
|
|
pluralized_draft_pages = maybe_pluralize(
|
|
(len(pages_generator.draft_pages) +
|
|
len(pages_generator.draft_translations)),
|
|
'draft page',
|
|
'draft pages')
|
|
|
|
console.print('Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.'
|
|
.format(
|
|
pluralized_articles,
|
|
pluralized_drafts,
|
|
pluralized_hidden_articles,
|
|
pluralized_pages,
|
|
pluralized_hidden_pages,
|
|
pluralized_draft_pages,
|
|
time.time() - start_time))
|
|
|
|
def _get_generator_classes(self):
|
|
discovered_generators = [
|
|
(ArticlesGenerator, "internal"),
|
|
(PagesGenerator, "internal")
|
|
]
|
|
|
|
if self.settings["TEMPLATE_PAGES"]:
|
|
discovered_generators.append((TemplatePagesGenerator, "internal"))
|
|
|
|
if self.settings["OUTPUT_SOURCES"]:
|
|
discovered_generators.append((SourceFileGenerator, "internal"))
|
|
|
|
for receiver, values in signals.get_generators.send(self):
|
|
if not isinstance(values, Iterable):
|
|
values = (values,)
|
|
for generator in values:
|
|
if generator is None:
|
|
continue # plugin did not return a generator
|
|
discovered_generators.append((generator, receiver.__module__))
|
|
|
|
# StaticGenerator must run last, so it can identify files that
|
|
# were skipped by the other generators, and so static files can
|
|
# have their output paths overridden by the {attach} link syntax.
|
|
discovered_generators.append((StaticGenerator, "internal"))
|
|
|
|
generators = []
|
|
|
|
for generator, origin in discovered_generators:
|
|
if not isinstance(generator, type):
|
|
logger.error("Generator %s (%s) cannot be loaded", generator, origin)
|
|
continue
|
|
|
|
logger.debug("Found generator: %s (%s)", generator.__name__, origin)
|
|
generators.append(generator)
|
|
|
|
return generators
|
|
|
|
def _get_writer(self):
|
|
writers = [w for _, w in signals.get_writer.send(self) if isinstance(w, type)]
|
|
num_writers = len(writers)
|
|
|
|
if num_writers == 0:
|
|
return Writer(self.output_path, settings=self.settings)
|
|
|
|
if num_writers > 1:
|
|
logger.warning("%s writers found, using only first one", num_writers)
|
|
|
|
writer = writers[0]
|
|
|
|
logger.debug("Found writer: %s (%s)", writer.__name__, writer.__module__)
|
|
return writer(self.output_path, settings=self.settings)
|
|
|
|
|
|
class PrintSettings(argparse.Action):
|
|
def __call__(self, parser, namespace, values, option_string):
|
|
init_logging(name=__name__)
|
|
|
|
try:
|
|
instance, settings = get_instance(namespace)
|
|
except Exception as e:
|
|
logger.critical("%s: %s", e.__class__.__name__, e)
|
|
console.print_exception()
|
|
sys.exit(getattr(e, 'exitcode', 1))
|
|
|
|
if values:
|
|
# One or more arguments provided, so only print those settings
|
|
for setting in values:
|
|
if setting in settings:
|
|
# Only add newline between setting name and value if dict
|
|
if isinstance(settings[setting], (dict, tuple, list)):
|
|
setting_format = '\n{}:\n{}'
|
|
else:
|
|
setting_format = '\n{}: {}'
|
|
console.print(setting_format.format(
|
|
setting,
|
|
pprint.pformat(settings[setting])))
|
|
else:
|
|
console.print('\n{} is not a recognized setting.'.format(setting))
|
|
break
|
|
else:
|
|
# No argument was given to --print-settings, so print all settings
|
|
console.print(settings)
|
|
|
|
parser.exit()
|
|
|
|
|
|
class ParseOverrides(argparse.Action):
|
|
def __call__(self, parser, namespace, values, option_string=None):
|
|
overrides = {}
|
|
for item in values:
|
|
try:
|
|
k, v = item.split("=", 1)
|
|
except ValueError:
|
|
raise ValueError(
|
|
'Extra settings must be specified as KEY=VALUE pairs '
|
|
f'but you specified {item}'
|
|
)
|
|
try:
|
|
overrides[k] = json.loads(v)
|
|
except json.decoder.JSONDecodeError:
|
|
raise ValueError(
|
|
f'Invalid JSON value: {v}. '
|
|
'Values specified via -e / --extra-settings flags '
|
|
'must be in JSON notation. '
|
|
'Use -e KEY=\'"string"\' to specify a string value; '
|
|
'-e KEY=null to specify None; '
|
|
'-e KEY=false (or true) to specify False (or True).'
|
|
)
|
|
setattr(namespace, self.dest, overrides)
|
|
|
|
|
|
def parse_arguments(argv=None):
|
|
parser = argparse.ArgumentParser(
|
|
description='A tool to generate a static blog, '
|
|
' with restructured text input files.',
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
|
)
|
|
|
|
parser.add_argument(dest='path', nargs='?',
|
|
help='Path where to find the content files.',
|
|
default=None)
|
|
|
|
parser.add_argument('-t', '--theme-path', dest='theme',
|
|
help='Path where to find the theme templates. If not '
|
|
'specified, it will use the default one included with '
|
|
'pelican.')
|
|
|
|
parser.add_argument('-o', '--output', dest='output',
|
|
help='Where to output the generated files. If not '
|
|
'specified, a directory will be created, named '
|
|
'"output" in the current path.')
|
|
|
|
parser.add_argument('-s', '--settings', dest='settings',
|
|
help='The settings of the application, this is '
|
|
'automatically set to {} if a file exists with this '
|
|
'name.'.format(DEFAULT_CONFIG_NAME))
|
|
|
|
parser.add_argument('-d', '--delete-output-directory',
|
|
dest='delete_outputdir', action='store_true',
|
|
default=None, help='Delete the output directory.')
|
|
|
|
parser.add_argument('-v', '--verbose', action='store_const',
|
|
const=logging.INFO, dest='verbosity',
|
|
help='Show all messages.')
|
|
|
|
parser.add_argument('-q', '--quiet', action='store_const',
|
|
const=logging.CRITICAL, dest='verbosity',
|
|
help='Show only critical errors.')
|
|
|
|
parser.add_argument('-D', '--debug', action='store_const',
|
|
const=logging.DEBUG, dest='verbosity',
|
|
help='Show all messages, including debug messages.')
|
|
|
|
parser.add_argument('--version', action='version', version=__version__,
|
|
help='Print the pelican version and exit.')
|
|
|
|
parser.add_argument('-r', '--autoreload', dest='autoreload',
|
|
action='store_true',
|
|
help='Relaunch pelican each time a modification occurs'
|
|
' on the content files.')
|
|
|
|
parser.add_argument('--print-settings', dest='print_settings', nargs='*',
|
|
action=PrintSettings, metavar='SETTING_NAME',
|
|
help='Print current configuration settings and exit. '
|
|
'Append one or more setting name arguments to see the '
|
|
'values for specific settings only.')
|
|
|
|
parser.add_argument('--relative-urls', dest='relative_paths',
|
|
action='store_true',
|
|
help='Use relative urls in output, '
|
|
'useful for site development')
|
|
|
|
parser.add_argument('--cache-path', dest='cache_path',
|
|
help=('Directory in which to store cache files. '
|
|
'If not specified, defaults to "cache".'))
|
|
|
|
parser.add_argument('--ignore-cache', action='store_true',
|
|
dest='ignore_cache', help='Ignore content cache '
|
|
'from previous runs by not loading cache files.')
|
|
|
|
parser.add_argument('-w', '--write-selected', type=str,
|
|
dest='selected_paths', default=None,
|
|
help='Comma separated list of selected paths to write')
|
|
|
|
parser.add_argument('--fatal', metavar='errors|warnings',
|
|
choices=('errors', 'warnings'), default='',
|
|
help=('Exit the program with non-zero status if any '
|
|
'errors/warnings encountered.'))
|
|
|
|
parser.add_argument('--logs-dedup-min-level', default='WARNING',
|
|
choices=('DEBUG', 'INFO', 'WARNING', 'ERROR'),
|
|
help=('Only enable log de-duplication for levels equal'
|
|
' to or above the specified value'))
|
|
|
|
parser.add_argument('-l', '--listen', dest='listen', action='store_true',
|
|
help='Serve content files via HTTP and port 8000.')
|
|
|
|
parser.add_argument('-p', '--port', dest='port', type=int,
|
|
help='Port to serve HTTP files at. (default: 8000)')
|
|
|
|
parser.add_argument('-b', '--bind', dest='bind',
|
|
help='IP to bind to when serving files via HTTP '
|
|
'(default: 127.0.0.1)')
|
|
|
|
parser.add_argument('-e', '--extra-settings', dest='overrides',
|
|
help='Specify one or more SETTING=VALUE pairs to '
|
|
'override settings. VALUE must be in JSON notation: '
|
|
'specify string values as SETTING=\'"some string"\'; '
|
|
'booleans as SETTING=true or SETTING=false; '
|
|
'None as SETTING=null.',
|
|
nargs='*',
|
|
action=ParseOverrides,
|
|
default={})
|
|
|
|
args = parser.parse_args(argv)
|
|
|
|
if args.port is not None and not args.listen:
|
|
logger.warning('--port without --listen has no effect')
|
|
if args.bind is not None and not args.listen:
|
|
logger.warning('--bind without --listen has no effect')
|
|
|
|
return args
|
|
|
|
|
|
def get_config(args):
|
|
"""Builds a config dictionary based on supplied `args`.
|
|
"""
|
|
config = {}
|
|
if args.path:
|
|
config['PATH'] = os.path.abspath(os.path.expanduser(args.path))
|
|
if args.output:
|
|
config['OUTPUT_PATH'] = \
|
|
os.path.abspath(os.path.expanduser(args.output))
|
|
if args.theme:
|
|
abstheme = os.path.abspath(os.path.expanduser(args.theme))
|
|
config['THEME'] = abstheme if os.path.exists(abstheme) else args.theme
|
|
if args.delete_outputdir is not None:
|
|
config['DELETE_OUTPUT_DIRECTORY'] = args.delete_outputdir
|
|
if args.ignore_cache:
|
|
config['LOAD_CONTENT_CACHE'] = False
|
|
if args.cache_path:
|
|
config['CACHE_PATH'] = args.cache_path
|
|
if args.selected_paths:
|
|
config['WRITE_SELECTED'] = args.selected_paths.split(',')
|
|
if args.relative_paths:
|
|
config['RELATIVE_URLS'] = args.relative_paths
|
|
if args.port is not None:
|
|
config['PORT'] = args.port
|
|
if args.bind is not None:
|
|
config['BIND'] = args.bind
|
|
config['DEBUG'] = args.verbosity == logging.DEBUG
|
|
config.update(args.overrides)
|
|
|
|
return config
|
|
|
|
|
|
def get_instance(args):
|
|
|
|
config_file = args.settings
|
|
if config_file is None and os.path.isfile(DEFAULT_CONFIG_NAME):
|
|
config_file = DEFAULT_CONFIG_NAME
|
|
args.settings = DEFAULT_CONFIG_NAME
|
|
|
|
settings = read_settings(config_file, override=get_config(args))
|
|
|
|
cls = settings['PELICAN_CLASS']
|
|
if isinstance(cls, str):
|
|
module, cls_name = cls.rsplit('.', 1)
|
|
module = __import__(module)
|
|
cls = getattr(module, cls_name)
|
|
|
|
return cls(settings), settings
|
|
|
|
|
|
def autoreload(args, excqueue=None):
|
|
console.print(' --- AutoReload Mode: Monitoring `content`, `theme` and'
|
|
' `settings` for changes. ---')
|
|
pelican, settings = get_instance(args)
|
|
watcher = FileSystemWatcher(args.settings, Readers, settings)
|
|
sleep = False
|
|
while True:
|
|
try:
|
|
# Don't sleep first time, but sleep afterwards to reduce cpu load
|
|
if sleep:
|
|
time.sleep(0.5)
|
|
else:
|
|
sleep = True
|
|
|
|
modified = watcher.check()
|
|
|
|
if modified['settings']:
|
|
pelican, settings = get_instance(args)
|
|
watcher.update_watchers(settings)
|
|
|
|
if any(modified.values()):
|
|
console.print('\n-> Modified: {}. re-generating...'.format(
|
|
', '.join(k for k, v in modified.items() if v)))
|
|
pelican.run()
|
|
|
|
except KeyboardInterrupt:
|
|
if excqueue is not None:
|
|
excqueue.put(None)
|
|
return
|
|
raise
|
|
|
|
except Exception as e:
|
|
if (args.verbosity == logging.DEBUG):
|
|
if excqueue is not None:
|
|
excqueue.put(
|
|
traceback.format_exception_only(type(e), e)[-1])
|
|
else:
|
|
raise
|
|
logger.warning(
|
|
'Caught exception:\n"%s".', e,
|
|
exc_info=settings.get('DEBUG', False))
|
|
|
|
|
|
def listen(server, port, output, excqueue=None):
|
|
# set logging level to at least "INFO" (so we can see the server requests)
|
|
if logger.level < logging.INFO:
|
|
logger.setLevel(logging.INFO)
|
|
|
|
RootedHTTPServer.allow_reuse_address = True
|
|
try:
|
|
httpd = RootedHTTPServer(
|
|
output, (server, port), ComplexHTTPRequestHandler)
|
|
except OSError as e:
|
|
logging.error("Could not listen on port %s, server %s.", port, server)
|
|
if excqueue is not None:
|
|
excqueue.put(traceback.format_exception_only(type(e), e)[-1])
|
|
return
|
|
|
|
try:
|
|
console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format(
|
|
server, port))
|
|
httpd.serve_forever()
|
|
except Exception as e:
|
|
if excqueue is not None:
|
|
excqueue.put(traceback.format_exception_only(type(e), e)[-1])
|
|
return
|
|
|
|
except KeyboardInterrupt:
|
|
httpd.socket.close()
|
|
if excqueue is not None:
|
|
return
|
|
raise
|
|
|
|
|
|
def main(argv=None):
|
|
args = parse_arguments(argv)
|
|
logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level)
|
|
init_logging(level=args.verbosity, fatal=args.fatal,
|
|
name=__name__, logs_dedup_min_level=logs_dedup_min_level)
|
|
|
|
logger.debug('Pelican version: %s', __version__)
|
|
logger.debug('Python version: %s', sys.version.split()[0])
|
|
|
|
try:
|
|
pelican, settings = get_instance(args)
|
|
|
|
if args.autoreload and args.listen:
|
|
excqueue = multiprocessing.Queue()
|
|
p1 = multiprocessing.Process(
|
|
target=autoreload,
|
|
args=(args, excqueue))
|
|
p2 = multiprocessing.Process(
|
|
target=listen,
|
|
args=(settings.get('BIND'), settings.get('PORT'),
|
|
settings.get("OUTPUT_PATH"), excqueue))
|
|
p1.start()
|
|
p2.start()
|
|
exc = excqueue.get()
|
|
p1.terminate()
|
|
p2.terminate()
|
|
if exc is not None:
|
|
logger.critical(exc)
|
|
elif args.autoreload:
|
|
autoreload(args)
|
|
elif args.listen:
|
|
listen(settings.get('BIND'), settings.get('PORT'),
|
|
settings.get("OUTPUT_PATH"))
|
|
else:
|
|
watcher = FileSystemWatcher(args.settings, Readers, settings)
|
|
watcher.check()
|
|
with console.status("Generating..."):
|
|
pelican.run()
|
|
except KeyboardInterrupt:
|
|
logger.warning('Keyboard interrupt received. Exiting.')
|
|
except Exception as e:
|
|
logger.critical("%s: %s", e.__class__.__name__, e)
|
|
|
|
if args.verbosity == logging.DEBUG:
|
|
console.print_exception()
|
|
sys.exit(getattr(e, 'exitcode', 1))
|