Merge pull request #1982 from adeverteuil/feature_check_static_modified

Add static file options: hard/symlink & only-when-modified
This commit is contained in:
Justin Mayer 2017-06-22 12:49:26 -07:00 committed by GitHub
commit d4435ea874
4 changed files with 261 additions and 26 deletions

View file

@ -2,6 +2,7 @@
from __future__ import print_function, unicode_literals
import calendar
import errno
import fnmatch
import logging
import os
@ -20,9 +21,8 @@ from pelican import signals
from pelican.cache import FileStampDataCacher
from pelican.contents import Article, Draft, Page, Static, is_valid_content
from pelican.readers import Readers
from pelican.utils import (DateFormatter, copy, copy_file_metadata, mkdir_p,
posixize_path, process_translations,
python_2_unicode_compatible)
from pelican.utils import (DateFormatter, copy, mkdir_p, posixize_path,
process_translations, python_2_unicode_compatible)
logger = logging.getLogger(__name__)
@ -682,21 +682,9 @@ class StaticGenerator(Generator):
def __init__(self, *args, **kwargs):
super(StaticGenerator, self).__init__(*args, **kwargs)
self.fallback_to_symlinks = False
signals.static_generator_init.send(self)
def _copy_paths(self, paths, source, destination, output_path,
final_path=None):
"""Copy all the paths from source to destination"""
for path in paths:
if final_path:
copy(os.path.join(source, path),
os.path.join(output_path, destination, final_path),
self.settings['IGNORE_FILES'])
else:
copy(os.path.join(source, path),
os.path.join(output_path, destination, path),
self.settings['IGNORE_FILES'])
def generate_context(self):
self.staticfiles = []
for f in self.get_files(self.settings['STATIC_PATHS'],
@ -724,13 +712,88 @@ class StaticGenerator(Generator):
self._copy_paths(self.settings['THEME_STATIC_PATHS'], self.theme,
self.settings['THEME_STATIC_DIR'], self.output_path,
os.curdir)
# copy all Static files
for sc in self.context['staticfiles']:
source_path = os.path.join(self.path, sc.source_path)
save_as = os.path.join(self.output_path, sc.save_as)
mkdir_p(os.path.dirname(save_as))
logger.info('Copying %s to %s', sc.source_path, sc.save_as)
copy_file_metadata(source_path, save_as)
if self._file_update_required(sc):
self._link_or_copy_staticfile(sc)
else:
logger.debug('%s is up to date, not copying', sc.source_path)
def _copy_paths(self, paths, source, destination, output_path,
final_path=None):
"""Copy all the paths from source to destination"""
for path in paths:
if final_path:
copy(os.path.join(source, path),
os.path.join(output_path, destination, final_path),
self.settings['IGNORE_FILES'])
else:
copy(os.path.join(source, path),
os.path.join(output_path, destination, path),
self.settings['IGNORE_FILES'])
def _file_update_required(self, staticfile):
source_path = os.path.join(self.path, staticfile.source_path)
save_as = os.path.join(self.output_path, staticfile.save_as)
if not os.path.exists(save_as):
return True
elif (self.settings['STATIC_CREATE_LINKS'] and
os.path.samefile(source_path, save_as)):
return False
elif (self.settings['STATIC_CREATE_LINKS'] and
os.path.realpath(save_as) == source_path):
return False
elif not self.settings['STATIC_CHECK_IF_MODIFIED']:
return True
else:
return self._source_is_newer(staticfile)
def _source_is_newer(self, staticfile):
source_path = os.path.join(self.path, staticfile.source_path)
save_as = os.path.join(self.output_path, staticfile.save_as)
s_mtime = os.path.getmtime(source_path)
d_mtime = os.path.getmtime(save_as)
return s_mtime > d_mtime
def _link_or_copy_staticfile(self, sc):
if self.settings['STATIC_CREATE_LINKS']:
self._link_staticfile(sc)
else:
self._copy_staticfile(sc)
def _copy_staticfile(self, sc):
source_path = os.path.join(self.path, sc.source_path)
save_as = os.path.join(self.output_path, sc.save_as)
self._mkdir(os.path.dirname(save_as))
copy(source_path, save_as)
logger.info('Copying %s to %s', sc.source_path, sc.save_as)
def _link_staticfile(self, sc):
source_path = os.path.join(self.path, sc.source_path)
save_as = os.path.join(self.output_path, sc.save_as)
self._mkdir(os.path.dirname(save_as))
try:
if os.path.lexists(save_as):
os.unlink(save_as)
logger.info('Linking %s and %s', sc.source_path, sc.save_as)
if self.fallback_to_symlinks:
os.symlink(source_path, save_as)
else:
os.link(source_path, save_as)
except OSError as err:
if err.errno == errno.EXDEV: # 18: Invalid cross-device link
logger.debug(
"Cross-device links not valid. "
"Creating symbolic links instead."
)
self.fallback_to_symlinks = True
self._link_staticfile(sc)
else:
raise err
def _mkdir(self, path):
if os.path.lexists(path) and not os.path.isdir(path):
os.unlink(path)
mkdir_p(path)
class SourceFileGenerator(Generator):

View file

@ -82,6 +82,8 @@ DEFAULT_CONFIG = {
'PAGE_LANG_SAVE_AS': posix_join('pages', '{slug}-{lang}.html'),
'STATIC_URL': '{path}',
'STATIC_SAVE_AS': '{path}',
'STATIC_CREATE_LINKS': False,
'STATIC_CHECK_IF_MODIFIED': False,
'CATEGORY_URL': 'category/{slug}.html',
'CATEGORY_SAVE_AS': posix_join('category', '{slug}.html'),
'TAG_URL': 'tag/{slug}.html',

View file

@ -5,7 +5,7 @@ import locale
import os
from codecs import open
from shutil import rmtree
from shutil import copy, rmtree
from tempfile import mkdtemp
from pelican.generators import (ArticlesGenerator, Generator, PagesGenerator,
@ -674,6 +674,30 @@ class TestStaticGenerator(unittest.TestCase):
def setUp(self):
self.content_path = os.path.join(CUR_DIR, 'mixed_content')
self.temp_content = mkdtemp(prefix='testcontent.')
self.temp_output = mkdtemp(prefix='testoutput.')
self.settings = get_settings()
self.settings['PATH'] = self.temp_content
self.settings['STATIC_PATHS'] = ["static"]
self.settings['OUTPUT_PATH'] = self.temp_output
os.mkdir(os.path.join(self.temp_content, "static"))
self.startfile = os.path.join(self.temp_content,
"static", "staticfile")
self.endfile = os.path.join(self.temp_output, "static", "staticfile")
self.generator = StaticGenerator(
context={'filenames': {}},
settings=self.settings,
path=self.temp_content,
theme="",
output_path=self.temp_output,
)
def tearDown(self):
rmtree(self.temp_content)
rmtree(self.temp_output)
def set_ancient_mtime(self, path, timestamp=1):
os.utime(path, (timestamp, timestamp))
def test_static_excludes(self):
"""Test that StaticGenerator respects STATIC_EXCLUDES.
@ -687,7 +711,7 @@ class TestStaticGenerator(unittest.TestCase):
StaticGenerator(
context=context, settings=settings,
path=settings['PATH'], output_path=None,
path=settings['PATH'], output_path=self.temp_output,
theme=settings['THEME']).generate_context()
staticnames = [os.path.basename(c.source_path)
@ -716,7 +740,7 @@ class TestStaticGenerator(unittest.TestCase):
for generator_class in (PagesGenerator, StaticGenerator):
generator_class(
context=context, settings=settings,
path=settings['PATH'], output_path=None,
path=settings['PATH'], output_path=self.temp_output,
theme=settings['THEME']).generate_context()
staticnames = [os.path.basename(c.source_path)
@ -733,7 +757,7 @@ class TestStaticGenerator(unittest.TestCase):
for generator_class in (PagesGenerator, StaticGenerator):
generator_class(
context=context, settings=settings,
path=settings['PATH'], output_path=None,
path=settings['PATH'], output_path=self.temp_output,
theme=settings['THEME']).generate_context()
staticnames = [os.path.basename(c.source_path)
@ -742,3 +766,135 @@ class TestStaticGenerator(unittest.TestCase):
self.assertTrue(
any(name.endswith(".md") for name in staticnames),
"STATIC_EXCLUDE_SOURCES=False failed to include a markdown file")
def test_copy_one_file(self):
with open(self.startfile, "w") as f:
f.write("staticcontent")
self.generator.generate_context()
self.generator.generate_output(None)
with open(self.endfile, "r") as f:
self.assertEqual(f.read(), "staticcontent")
@unittest.skipUnless(MagicMock, 'Needs Mock module')
def test_file_update_required_when_dest_does_not_exist(self):
staticfile = MagicMock()
staticfile.source_path = self.startfile
staticfile.save_as = self.endfile
with open(staticfile.source_path, "w") as f:
f.write("a")
update_required = self.generator._file_update_required(staticfile)
self.assertTrue(update_required)
@unittest.skipUnless(MagicMock, 'Needs Mock module')
def test_dest_and_source_mtimes_are_equal(self):
staticfile = MagicMock()
staticfile.source_path = self.startfile
staticfile.save_as = self.endfile
self.settings['STATIC_CHECK_IF_MODIFIED'] = True
with open(staticfile.source_path, "w") as f:
f.write("a")
os.mkdir(os.path.join(self.temp_output, "static"))
copy(staticfile.source_path, staticfile.save_as)
isnewer = self.generator._source_is_newer(staticfile)
self.assertFalse(isnewer)
@unittest.skipUnless(MagicMock, 'Needs Mock module')
def test_source_is_newer(self):
staticfile = MagicMock()
staticfile.source_path = self.startfile
staticfile.save_as = self.endfile
with open(staticfile.source_path, "w") as f:
f.write("a")
os.mkdir(os.path.join(self.temp_output, "static"))
copy(staticfile.source_path, staticfile.save_as)
self.set_ancient_mtime(staticfile.save_as)
isnewer = self.generator._source_is_newer(staticfile)
self.assertTrue(isnewer)
def test_skip_file_when_source_is_not_newer(self):
self.settings['STATIC_CHECK_IF_MODIFIED'] = True
with open(self.startfile, "w") as f:
f.write("staticcontent")
os.mkdir(os.path.join(self.temp_output, "static"))
with open(self.endfile, "w") as f:
f.write("staticcontent")
expected = os.path.getmtime(self.endfile)
self.set_ancient_mtime(self.startfile)
self.generator.generate_context()
self.generator.generate_output(None)
self.assertEqual(os.path.getmtime(self.endfile), expected)
def test_dont_link_by_default(self):
with open(self.startfile, "w") as f:
f.write("staticcontent")
self.generator.generate_context()
self.generator.generate_output(None)
self.assertFalse(os.path.samefile(self.startfile, self.endfile))
def test_output_file_is_linked_to_source(self):
self.settings['STATIC_CREATE_LINKS'] = True
with open(self.startfile, "w") as f:
f.write("staticcontent")
self.generator.generate_context()
self.generator.generate_output(None)
self.assertTrue(os.path.samefile(self.startfile, self.endfile))
def test_output_file_exists_and_is_newer(self):
self.settings['STATIC_CREATE_LINKS'] = True
with open(self.startfile, "w") as f:
f.write("staticcontent")
os.mkdir(os.path.join(self.temp_output, "static"))
with open(self.endfile, "w") as f:
f.write("othercontent")
self.generator.generate_context()
self.generator.generate_output(None)
self.assertTrue(os.path.samefile(self.startfile, self.endfile))
def test_can_symlink_when_hardlink_not_possible(self):
self.settings['STATIC_CREATE_LINKS'] = True
with open(self.startfile, "w") as f:
f.write("staticcontent")
os.mkdir(os.path.join(self.temp_output, "static"))
self.generator.fallback_to_symlinks = True
self.generator.generate_context()
self.generator.generate_output(None)
self.assertTrue(os.path.islink(self.endfile))
def test_existing_symlink_is_considered_up_to_date(self):
self.settings['STATIC_CREATE_LINKS'] = True
with open(self.startfile, "w") as f:
f.write("staticcontent")
os.mkdir(os.path.join(self.temp_output, "static"))
os.symlink(self.startfile, self.endfile)
staticfile = MagicMock()
staticfile.source_path = self.startfile
staticfile.save_as = self.endfile
requires_update = self.generator._file_update_required(staticfile)
self.assertFalse(requires_update)
def test_invalid_symlink_is_overwritten(self):
self.settings['STATIC_CREATE_LINKS'] = True
with open(self.startfile, "w") as f:
f.write("staticcontent")
os.mkdir(os.path.join(self.temp_output, "static"))
os.symlink("invalid", self.endfile)
staticfile = MagicMock()
staticfile.source_path = self.startfile
staticfile.save_as = self.endfile
requires_update = self.generator._file_update_required(staticfile)
self.assertTrue(requires_update)
self.generator.fallback_to_symlinks = True
self.generator.generate_context()
self.generator.generate_output(None)
self.assertEqual(os.path.realpath(self.endfile), self.startfile)
def test_delete_existing_file_before_mkdir(self):
with open(self.startfile, "w") as f:
f.write("staticcontent")
with open(os.path.join(self.temp_output, "static"), "w") as f:
f.write("This file should be a directory")
self.generator.generate_context()
self.generator.generate_output(None)
self.assertTrue(
os.path.isdir(os.path.join(self.temp_output, "static")))
self.assertTrue(os.path.isfile(self.endfile))