fix caching

*  break out cache into cache.py
*  break out cache tests into test_cache.py
*  fix broken cache tests
   *  replace comparisons against the non-existent assert_called_count
      attribute with self.assertEqual on call_count (see the sketch below)
   *  fix path for page caching test (was invalid)
   *  clean up test code
*  restructure generate_context in the Article and Page generators
   *  distinguish between valid/invalid files correctly and cache accordingly
*  use cPickle if available for increased performance
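
For context on the broken asserts: assert_called_count is not part of the
Mock API. With the mock versions of the time, accessing it merely created a
child mock and the == comparison result was discarded, so those tests could
never fail (newer unittest.mock releases raise AttributeError for unknown
assert_* attributes). A minimal sketch of the failure mode and the fix,
illustrative and not taken from the diff below:

    from unittest.mock import MagicMock

    mock = MagicMock()
    mock()  # one real call

    # Broken pattern from the old tests: the expression is evaluated and
    # its result thrown away, so the line "passes" for any call count.
    mock.call_count == 99

    # Fixed pattern, as applied in this commit (inside a TestCase it is
    # written as self.assertEqual(mock.call_count, 1)).
    assert mock.call_count == 1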
commit b7e6390f04
derwinlu, 2015-06-05 12:11:53 +02:00

6 changed files with 364 additions and 299 deletions

pelican/cache.py (new file, 140 lines)

@@ -0,0 +1,140 @@
from __future__ import unicode_literals

import hashlib
import logging
import os

try:
    import cPickle as pickle
except ImportError:
    import pickle

from pelican.utils import mkdir_p


logger = logging.getLogger(__name__)


class FileDataCacher(object):
    """Class that can cache data contained in files"""

    def __init__(self, settings, cache_name, caching_policy, load_policy):
        """Load the specified cache within CACHE_PATH in settings,
        but only if *load_policy* is True.
        May use gzip if GZIP_CACHE in settings is True.
        Sets the caching policy according to *caching_policy*.
        """
        self.settings = settings
        self._cache_path = os.path.join(self.settings['CACHE_PATH'],
                                        cache_name)
        self._cache_data_policy = caching_policy
        if self.settings['GZIP_CACHE']:
            import gzip
            self._cache_open = gzip.open
        else:
            self._cache_open = open
        if load_policy:
            try:
                with self._cache_open(self._cache_path, 'rb') as fhandle:
                    self._cache = pickle.load(fhandle)
            except (IOError, OSError) as err:
                logger.debug('Cannot load cache %s (this is normal on first '
                             'run). Proceeding with empty cache.\n%s',
                             self._cache_path, err)
                self._cache = {}
            except pickle.PickleError as err:
                logger.warning('Cannot unpickle cache %s, cache may be using '
                               'an incompatible protocol (see pelican '
                               'caching docs). '
                               'Proceeding with empty cache.\n%s',
                               self._cache_path, err)
                self._cache = {}
        else:
            self._cache = {}

    def cache_data(self, filename, data):
        """Cache data for the given file"""
        if self._cache_data_policy:
            self._cache[filename] = data

    def get_cached_data(self, filename, default=None):
        """Get cached data for the given file;
        if no data is cached, return the default object.
        """
        return self._cache.get(filename, default)

    def save_cache(self):
        """Save the updated cache"""
        if self._cache_data_policy:
            try:
                mkdir_p(self.settings['CACHE_PATH'])
                with self._cache_open(self._cache_path, 'wb') as fhandle:
                    pickle.dump(self._cache, fhandle)
            except (IOError, OSError, pickle.PicklingError) as err:
                logger.warning('Could not save cache %s\n ... %s',
                               self._cache_path, err)


class FileStampDataCacher(FileDataCacher):
    """Subclass that also caches the stamp of the file"""

    def __init__(self, settings, cache_name, caching_policy, load_policy):
        """This subclass additionally sets the filestamp function
        and the base path for filestamping operations.
        """
        super(FileStampDataCacher, self).__init__(settings, cache_name,
                                                  caching_policy,
                                                  load_policy)

        method = self.settings['CHECK_MODIFIED_METHOD']
        if method == 'mtime':
            self._filestamp_func = os.path.getmtime
        else:
            try:
                hash_func = getattr(hashlib, method)

                def filestamp_func(filename):
                    """Return the hash of the file contents"""
                    with open(filename, 'rb') as fhandle:
                        return hash_func(fhandle.read()).digest()

                self._filestamp_func = filestamp_func
            except AttributeError as err:
                logger.warning('Could not get hashing function\n\t%s', err)
                self._filestamp_func = None

    def cache_data(self, filename, data):
        """Cache the stamp and data for the given file"""
        stamp = self._get_file_stamp(filename)
        super(FileStampDataCacher, self).cache_data(filename, (stamp, data))

    def _get_file_stamp(self, filename):
        """Return a stamp for the given file, used to detect modification
        on the next build.
        Depending on CHECK_MODIFIED_METHOD,
        a float is returned for 'mtime',
        a hash for a function name in the hashlib module,
        or an empty bytes string otherwise.
        """
        try:
            return self._filestamp_func(filename)
        except (IOError, OSError, TypeError) as err:
            logger.warning('Cannot get modification stamp for %s\n\t%s',
                           filename, err)
            return b''

    def get_cached_data(self, filename, default=None):
        """Get the cached data for the given filename
        if the file has not been modified.
        If no record exists or the file has been modified, return default.
        Modification is checked by comparing the cached
        and the current file stamp.
        """
        stamp, data = super(FileStampDataCacher, self).get_cached_data(
            filename, (None, default))
        if stamp != self._get_file_stamp(filename):
            return default
        return data
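
A minimal usage sketch of the new cacher (the settings dict, file path, and
parse helper below are illustrative, not Pelican code; Pelican supplies the
settings and the readers):

    from pelican.cache import FileStampDataCacher

    settings = {
        'CACHE_PATH': 'cache',             # directory holding the pickle
        'GZIP_CACHE': False,               # True switches open() to gzip.open()
        'CHECK_MODIFIED_METHOD': 'mtime',  # or a hashlib name such as 'sha256'
    }

    cacher = FileStampDataCacher(settings, 'content', caching_policy=True,
                                 load_policy=True)

    def parse(path):
        # Hypothetical stand-in for a real reader.
        with open(path) as fhandle:
            return fhandle.read()

    source = 'content/article.md'
    data = cacher.get_cached_data(source)  # None on a miss or a stale stamp
    if data is None:
        data = parse(source)
        cacher.cache_data(source, data)    # stores (stamp, data)
    cacher.save_cache()                    # persist for the next run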

pelican/generators.py

@@ -17,11 +17,11 @@ from operator import attrgetter
 from jinja2 import (Environment, FileSystemLoader, PrefixLoader, ChoiceLoader,
                     BaseLoader, TemplateNotFound)
 
+from pelican.cache import FileStampDataCacher
 from pelican.contents import Article, Draft, Page, Static, is_valid_content
 from pelican.readers import Readers
 from pelican.utils import (copy, process_translations, mkdir_p, DateFormatter,
-                           FileStampDataCacher, python_2_unicode_compatible,
-                           posixize_path)
+                           python_2_unicode_compatible, posixize_path)
 from pelican import signals
@@ -493,10 +493,11 @@ class ArticlesGenerator(CachingGenerator):
         for f in self.get_files(
                 self.settings['ARTICLE_PATHS'],
                 exclude=self.settings['ARTICLE_EXCLUDES']):
-            article = self.get_cached_data(f, None)
-            if article is None:
+            article_or_draft = self.get_cached_data(f, None)
+            if article_or_draft is None:
+                #TODO needs overhaul, maybe nomad for read_file solution, unified behaviour
                 try:
-                    article = self.readers.read_file(
+                    article_or_draft = self.readers.read_file(
                         base_path=self.path, path=f, content_class=Article,
                         context=self.context,
                         preread_signal=signals.article_generator_preread,
@@ -509,29 +510,32 @@ class ArticlesGenerator(CachingGenerator):
                     self._add_failed_source_path(f)
                     continue
 
-                if not is_valid_content(article, f):
+                if not is_valid_content(article_or_draft, f):
                     self._add_failed_source_path(f)
                     continue
 
-                self.cache_data(f, article)
+                if article_or_draft.status.lower() == "published":
+                    all_articles.append(article_or_draft)
+                elif article_or_draft.status.lower() == "draft":
+                    article_or_draft = self.readers.read_file(
+                        base_path=self.path, path=f, content_class=Draft,
+                        context=self.context,
+                        preread_signal=signals.article_generator_preread,
+                        preread_sender=self,
+                        context_signal=signals.article_generator_context,
+                        context_sender=self)
+                    self.add_source_path(article_or_draft)
+                    all_drafts.append(article_or_draft)
+                else:
+                    logger.error("Unknown status '%s' for file %s, skipping it.",
+                                 article_or_draft.status, f)
+                    self._add_failed_source_path(f)
+                    continue
+
+                self.cache_data(f, article_or_draft)
 
-            self.add_source_path(article)
+            self.add_source_path(article_or_draft)
 
-            if article.status.lower() == "published":
-                all_articles.append(article)
-            elif article.status.lower() == "draft":
-                draft = self.readers.read_file(
-                    base_path=self.path, path=f, content_class=Draft,
-                    context=self.context,
-                    preread_signal=signals.article_generator_preread,
-                    preread_sender=self,
-                    context_signal=signals.article_generator_context,
-                    context_sender=self)
-                self.add_source_path(draft)
-                all_drafts.append(draft)
-            else:
-                logger.error("Unknown status '%s' for file %s, skipping it.",
-                             article.status, f)
 
         self.articles, self.translations = process_translations(all_articles,
                 order_by=self.settings['ARTICLE_ORDER_BY'])
@@ -613,18 +617,20 @@ class PagesGenerator(CachingGenerator):
                     self._add_failed_source_path(f)
                     continue
 
+                if page.status.lower() == "published":
+                    all_pages.append(page)
+                elif page.status.lower() == "hidden":
+                    hidden_pages.append(page)
+                else:
+                    logger.error("Unknown status '%s' for file %s, skipping it.",
+                                 page.status, f)
+                    self._add_failed_source_path(f)
+                    continue
+
                 self.cache_data(f, page)
 
             self.add_source_path(page)
 
-            if page.status.lower() == "published":
-                all_pages.append(page)
-            elif page.status.lower() == "hidden":
-                hidden_pages.append(page)
-            else:
-                logger.error("Unknown status '%s' for file %s, skipping it.",
-                             page.status, f)
-
         self.pages, self.translations = process_translations(all_pages,
                 order_by=self.settings['PAGE_ORDER_BY'])
         self.hidden_pages, self.hidden_translations = (
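
The shape of the restructure in both generators, reduced to a toy loop
(illustrative only; the helper names are hypothetical, not Pelican code):
validation now happens before the cache write, so a file that fails
validation is recorded as a failed source and never enters the cache:

    def process(files, cache, read_file, is_valid):
        done, failed = [], []
        for f in files:
            content = cache.get(f)
            if content is None:
                content = read_file(f)
                if not is_valid(content):
                    failed.append(f)  # recorded as a failed source ...
                    continue          # ... and never written to the cache
                cache[f] = content    # only valid content is cached
            done.append(content)
        return done, failed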

pelican/readers.py

@@ -24,8 +24,9 @@ except ImportError:
     from six.moves.html_parser import HTMLParser
 
 from pelican import signals
+from pelican.cache import FileStampDataCacher
 from pelican.contents import Page, Category, Tag, Author
-from pelican.utils import get_date, pelican_open, FileStampDataCacher, SafeDatetime, posixize_path
+from pelican.utils import get_date, pelican_open, SafeDatetime, posixize_path
 
 
 def strip_split(text, sep=','):

pelican/tests/test_cache.py (new file, 183 lines)

@@ -0,0 +1,183 @@
from __future__ import unicode_literals

import os
from codecs import open
try:
    from unittest.mock import MagicMock
except ImportError:
    try:
        from mock import MagicMock
    except ImportError:
        MagicMock = False
from shutil import rmtree
from tempfile import mkdtemp

from pelican.generators import ArticlesGenerator, PagesGenerator
from pelican.tests.support import unittest, get_settings

CUR_DIR = os.path.dirname(__file__)
CONTENT_DIR = os.path.join(CUR_DIR, 'content')


class TestCache(unittest.TestCase):

    def setUp(self):
        self.temp_cache = mkdtemp(prefix='pelican_cache.')

    def tearDown(self):
        rmtree(self.temp_cache)

    @unittest.skipUnless(MagicMock, 'Needs Mock module')
    def test_article_object_caching(self):
        """Test Article objects caching at the generator level"""
        settings = get_settings(filenames={})
        settings['CACHE_PATH'] = self.temp_cache
        settings['CONTENT_CACHING_LAYER'] = 'generator'
        settings['DEFAULT_DATE'] = (1970, 1, 1)
        settings['READERS'] = {'asc': None}

        generator = ArticlesGenerator(
            context=settings.copy(), settings=settings,
            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
        generator.generate_context()
        self.assertTrue(hasattr(generator, '_cache'))

        generator = ArticlesGenerator(
            context=settings.copy(), settings=settings,
            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
        generator.readers.read_file = MagicMock()
        generator.generate_context()
        # Three files are read again because they were not valid and
        # therefore never cached:
        # - article_with_comments.html
        # - article_with_null_attributes.html
        # - 2012-11-30_md_w_filename_meta#foo-bar.md
        self.assertEqual(generator.readers.read_file.call_count, 3)

    @unittest.skipUnless(MagicMock, 'Needs Mock module')
    def test_article_reader_content_caching(self):
        """Test raw article content caching at the reader level"""
        settings = get_settings(filenames={})
        settings['CACHE_PATH'] = self.temp_cache
        settings['READERS'] = {'asc': None}

        generator = ArticlesGenerator(
            context=settings.copy(), settings=settings,
            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
        generator.generate_context()
        self.assertTrue(hasattr(generator.readers, '_cache'))

        generator = ArticlesGenerator(
            context=settings.copy(), settings=settings,
            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
        readers = generator.readers.readers
        for reader in readers.values():
            reader.read = MagicMock()
        generator.generate_context()
        for reader in readers.values():
            self.assertEqual(reader.read.call_count, 0)

    @unittest.skipUnless(MagicMock, 'Needs Mock module')
    def test_article_ignore_cache(self):
        """Test that all articles are read again when not loading the cache,
        as in --ignore-cache or autoreload mode"""
        settings = get_settings(filenames={})
        settings['CACHE_PATH'] = self.temp_cache
        settings['READERS'] = {'asc': None}

        generator = ArticlesGenerator(
            context=settings.copy(), settings=settings,
            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
        generator.readers.read_file = MagicMock()
        generator.generate_context()
        self.assertTrue(hasattr(generator, '_cache_open'))
        orig_call_count = generator.readers.read_file.call_count

        settings['LOAD_CONTENT_CACHE'] = False
        generator = ArticlesGenerator(
            context=settings.copy(), settings=settings,
            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
        generator.readers.read_file = MagicMock()
        generator.generate_context()
        self.assertEqual(generator.readers.read_file.call_count,
                         orig_call_count)

    @unittest.skipUnless(MagicMock, 'Needs Mock module')
    def test_page_object_caching(self):
        """Test Page objects caching at the generator level"""
        settings = get_settings(filenames={})
        settings['CACHE_PATH'] = self.temp_cache
        settings['PAGE_PATHS'] = ['TestPages']
        settings['CONTENT_CACHING_LAYER'] = 'generator'
        settings['READERS'] = {'asc': None}

        generator = PagesGenerator(
            context=settings.copy(), settings=settings,
            path=CUR_DIR, theme=settings['THEME'], output_path=None)
        generator.generate_context()
        self.assertTrue(hasattr(generator, '_cache'))

        generator = PagesGenerator(
            context=settings.copy(), settings=settings,
            path=CUR_DIR, theme=settings['THEME'], output_path=None)
        generator.readers.read_file = MagicMock()
        generator.generate_context()
        # One file is read again because it was not valid and therefore
        # never cached:
        # - bad_page.rst
        self.assertEqual(generator.readers.read_file.call_count, 1)

    @unittest.skipUnless(MagicMock, 'Needs Mock module')
    def test_page_reader_content_caching(self):
        """Test raw page content caching at the reader level"""
        settings = get_settings(filenames={})
        settings['CACHE_PATH'] = self.temp_cache
        settings['PAGE_PATHS'] = ['TestPages']
        settings['READERS'] = {'asc': None}

        generator = PagesGenerator(
            context=settings.copy(), settings=settings,
            path=CUR_DIR, theme=settings['THEME'], output_path=None)
        generator.generate_context()
        self.assertTrue(hasattr(generator.readers, '_cache'))

        generator = PagesGenerator(
            context=settings.copy(), settings=settings,
            path=CUR_DIR, theme=settings['THEME'], output_path=None)
        readers = generator.readers.readers
        for reader in readers.values():
            reader.read = MagicMock()
        generator.generate_context()
        for reader in readers.values():
            self.assertEqual(reader.read.call_count, 0)

    @unittest.skipUnless(MagicMock, 'Needs Mock module')
    def test_page_ignore_cache(self):
        """Test that all pages are read again when not loading the cache,
        as in --ignore-cache or autoreload mode"""
        settings = get_settings(filenames={})
        settings['CACHE_PATH'] = self.temp_cache
        settings['PAGE_PATHS'] = ['TestPages']
        settings['READERS'] = {'asc': None}

        generator = PagesGenerator(
            context=settings.copy(), settings=settings,
            path=CUR_DIR, theme=settings['THEME'], output_path=None)
        generator.readers.read_file = MagicMock()
        generator.generate_context()
        self.assertTrue(hasattr(generator, '_cache_open'))
        orig_call_count = generator.readers.read_file.call_count

        settings['LOAD_CONTENT_CACHE'] = False
        generator = PagesGenerator(
            context=settings.copy(), settings=settings,
            path=CUR_DIR, theme=settings['THEME'], output_path=None)
        generator.readers.read_file = MagicMock()
        generator.generate_context()
        self.assertEqual(generator.readers.read_file.call_count,
                         orig_call_count)
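
The two layers exercised above are selected with Pelican's
CONTENT_CACHING_LAYER setting; a short sketch of the distinction (the
setting name and values are real, the comments summarize the tests):

    settings = {}  # stand-in for a Pelican settings dict

    # Generator level: fully processed Article/Page objects are cached, so
    # generate_context() skips read_file() entirely for unchanged files.
    settings['CONTENT_CACHING_LAYER'] = 'generator'

    # Reader level (the default): raw reader output is cached, so readers
    # skip re-parsing unchanged sources while content objects are rebuilt.
    settings['CONTENT_CACHING_LAYER'] = 'reader'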

pelican/tests/test_generators.py

@@ -135,7 +135,6 @@ class TestArticlesGenerator(unittest.TestCase):
         self.assertFalse(writer.write_feed.called)
 
     def test_generate_context(self):
-
         articles_expected = [
             ['Article title', 'published', 'Default', 'article'],
             ['Article with markdown and summary metadata multi', 'published',
@@ -174,7 +173,6 @@
         self.assertEqual(sorted(articles_expected), sorted(self.articles))
 
     def test_generate_categories(self):
-
         # test for name
         # categories are grouped by slug; if two categories have the same slug
         # but different names they will be grouped together, the first one in
@@ -192,7 +190,6 @@
         self.assertEqual(sorted(categories), sorted(categories_expected))
 
     def test_do_not_use_folder_as_category(self):
-
         settings = get_settings(filenames={})
         settings['DEFAULT_CATEGORY'] = 'Default'
         settings['DEFAULT_DATE'] = (1970, 1, 1)
@@ -355,75 +352,6 @@
         authors_expected = ['alexis-metaireau', 'first-author', 'second-author']
         self.assertEqual(sorted(authors), sorted(authors_expected))
 
-    @unittest.skipUnless(MagicMock, 'Needs Mock module')
-    def test_article_object_caching(self):
-        """Test Article objects caching at the generator level"""
-        settings = get_settings(filenames={})
-        settings['CACHE_PATH'] = self.temp_cache
-        settings['CONTENT_CACHING_LAYER'] = 'generator'
-        settings['READERS'] = {'asc': None}
-        generator = ArticlesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.generate_context()
-        self.assertTrue(hasattr(generator, '_cache'))
-
-        generator = ArticlesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.readers.read_file = MagicMock()
-        generator.generate_context()
-        generator.readers.read_file.assert_called_count == 0
-
-    @unittest.skipUnless(MagicMock, 'Needs Mock module')
-    def test_reader_content_caching(self):
-        """Test raw content caching at the reader level"""
-        settings = get_settings(filenames={})
-        settings['CACHE_PATH'] = self.temp_cache
-        settings['READERS'] = {'asc': None}
-        generator = ArticlesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.generate_context()
-        self.assertTrue(hasattr(generator.readers, '_cache'))
-
-        generator = ArticlesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        readers = generator.readers.readers
-        for reader in readers.values():
-            reader.read = MagicMock()
-        generator.generate_context()
-        for reader in readers.values():
-            reader.read.assert_called_count == 0
-
-    @unittest.skipUnless(MagicMock, 'Needs Mock module')
-    def test_ignore_cache(self):
-        """Test that all the articles are read again when not loading cache
-        used in --ignore-cache or autoreload mode"""
-        settings = get_settings(filenames={})
-        settings['CACHE_PATH'] = self.temp_cache
-        settings['READERS'] = {'asc': None}
-        generator = ArticlesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.readers.read_file = MagicMock()
-        generator.generate_context()
-        self.assertTrue(hasattr(generator, '_cache_open'))
-        orig_call_count = generator.readers.read_file.call_count
-
-        settings['LOAD_CONTENT_CACHE'] = False
-        generator = ArticlesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.readers.read_file = MagicMock()
-        generator.generate_context()
-        generator.readers.read_file.assert_called_count == orig_call_count
-
     def test_standard_metadata_in_default_metadata(self):
         settings = get_settings(filenames={})
         settings['CACHE_CONTENT'] = False
@@ -503,75 +431,6 @@
         self.assertEqual(sorted(pages_expected), sorted(pages))
         self.assertEqual(sorted(hidden_pages_expected), sorted(hidden_pages))
 
-    @unittest.skipUnless(MagicMock, 'Needs Mock module')
-    def test_page_object_caching(self):
-        """Test Page objects caching at the generator level"""
-        settings = get_settings(filenames={})
-        settings['CACHE_PATH'] = self.temp_cache
-        settings['CONTENT_CACHING_LAYER'] = 'generator'
-        settings['READERS'] = {'asc': None}
-        generator = PagesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.generate_context()
-        self.assertTrue(hasattr(generator, '_cache'))
-
-        generator = PagesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.readers.read_file = MagicMock()
-        generator.generate_context()
-        generator.readers.read_file.assert_called_count == 0
-
-    @unittest.skipUnless(MagicMock, 'Needs Mock module')
-    def test_reader_content_caching(self):
-        """Test raw content caching at the reader level"""
-        settings = get_settings(filenames={})
-        settings['CACHE_PATH'] = self.temp_cache
-        settings['READERS'] = {'asc': None}
-        generator = PagesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.generate_context()
-        self.assertTrue(hasattr(generator.readers, '_cache'))
-
-        generator = PagesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        readers = generator.readers.readers
-        for reader in readers.values():
-            reader.read = MagicMock()
-        generator.generate_context()
-        for reader in readers.values():
-            reader.read.assert_called_count == 0
-
-    @unittest.skipUnless(MagicMock, 'Needs Mock module')
-    def test_ignore_cache(self):
-        """Test that all the pages are read again when not loading cache
-        used in --ignore_cache or autoreload mode"""
-        settings = get_settings(filenames={})
-        settings['CACHE_PATH'] = self.temp_cache
-        settings['READERS'] = {'asc': None}
-        generator = PagesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.readers.read_file = MagicMock()
-        generator.generate_context()
-        self.assertTrue(hasattr(generator, '_cache_open'))
-        orig_call_count = generator.readers.read_file.call_count
-
-        settings['LOAD_CONTENT_CACHE'] = False
-        generator = PagesGenerator(
-            context=settings.copy(), settings=settings,
-            path=CONTENT_DIR, theme=settings['THEME'], output_path=None)
-        generator.readers.read_file = MagicMock()
-        generator.generate_context()
-        generator.readers.read_file.assert_called_count == orig_call_count
-
     def test_generate_sorted(self):
         settings = get_settings(filenames={})
         settings['PAGE_PATHS'] = ['TestPages']  # relative to CUR_DIR

pelican/utils.py

@@ -14,7 +14,6 @@ import shutil
 import sys
 import traceback
 import pickle
-import hashlib
 import datetime
 
 from collections import Hashable
@@ -627,129 +626,6 @@ def split_all(path):
     return components
 
 
-class FileDataCacher(object):
-    '''Class that can cache data contained in files'''
-
-    def __init__(self, settings, cache_name, caching_policy, load_policy):
-        '''Load the specified cache within CACHE_PATH in settings
-        only if *load_policy* is True,
-        May use gzip if GZIP_CACHE ins settings is True.
-        Sets caching policy according to *caching_policy*.
-        '''
-        self.settings = settings
-        self._cache_path = os.path.join(self.settings['CACHE_PATH'],
-                                        cache_name)
-        self._cache_data_policy = caching_policy
-        if self.settings['GZIP_CACHE']:
-            import gzip
-            self._cache_open = gzip.open
-        else:
-            self._cache_open = open
-        if load_policy:
-            try:
-                with self._cache_open(self._cache_path, 'rb') as fhandle:
-                    self._cache = pickle.load(fhandle)
-            except (IOError, OSError) as err:
-                logger.debug('Cannot load cache %s (this is normal on first '
-                             'run). Proceeding with empty cache.\n%s',
-                             self._cache_path, err)
-                self._cache = {}
-            except Exception as err:
-                logger.warning(('Cannot unpickle cache %s, cache may be using '
-                                'an incompatible protocol (see pelican caching docs). '
-                                'Proceeding with empty cache.\n%s'),
-                               self._cache_path, err)
-                self._cache = {}
-        else:
-            self._cache = {}
-
-    def cache_data(self, filename, data):
-        '''Cache data for given file'''
-        if self._cache_data_policy:
-            self._cache[filename] = data
-
-    def get_cached_data(self, filename, default=None):
-        '''Get cached data for the given file
-        if no data is cached, return the default object
-        '''
-        return self._cache.get(filename, default)
-
-    def save_cache(self):
-        '''Save the updated cache'''
-        if self._cache_data_policy:
-            try:
-                mkdir_p(self.settings['CACHE_PATH'])
-                with self._cache_open(self._cache_path, 'wb') as fhandle:
-                    pickle.dump(self._cache, fhandle)
-            except (IOError, OSError, pickle.PicklingError) as err:
-                logger.warning('Could not save cache %s\n ... %s',
-                               self._cache_path, err)
-
-
-class FileStampDataCacher(FileDataCacher):
-    '''Subclass that also caches the stamp of the file'''
-
-    def __init__(self, settings, cache_name, caching_policy, load_policy):
-        '''This sublcass additionally sets filestamp function
-        and base path for filestamping operations
-        '''
-        super(FileStampDataCacher, self).__init__(settings, cache_name,
-                                                  caching_policy,
-                                                  load_policy)
-
-        method = self.settings['CHECK_MODIFIED_METHOD']
-        if method == 'mtime':
-            self._filestamp_func = os.path.getmtime
-        else:
-            try:
-                hash_func = getattr(hashlib, method)
-
-                def filestamp_func(filename):
-                    '''return hash of file contents'''
-                    with open(filename, 'rb') as fhandle:
-                        return hash_func(fhandle.read()).digest()
-
-                self._filestamp_func = filestamp_func
-            except AttributeError as err:
-                logger.warning('Could not get hashing function\n\t%s', err)
-                self._filestamp_func = None
-
-    def cache_data(self, filename, data):
-        '''Cache stamp and data for the given file'''
-        stamp = self._get_file_stamp(filename)
-        super(FileStampDataCacher, self).cache_data(filename, (stamp, data))
-
-    def _get_file_stamp(self, filename):
-        '''Check if the given file has been modified
-        since the previous build.
-        depending on CHECK_MODIFIED_METHOD
-        a float may be returned for 'mtime',
-        a hash for a function name in the hashlib module
-        or an empty bytes string otherwise
-        '''
-        try:
-            return self._filestamp_func(filename)
-        except (IOError, OSError, TypeError) as err:
-            logger.warning('Cannot get modification stamp for %s\n\t%s',
-                           filename, err)
-            return b''
-
-    def get_cached_data(self, filename, default=None):
-        '''Get the cached data for the given filename
-        if the file has not been modified.
-        If no record exists or file has been modified, return default.
-        Modification is checked by comparing the cached
-        and current file stamp.
-        '''
-        stamp, data = super(FileStampDataCacher, self).get_cached_data(
-            filename, (None, default))
-        if stamp != self._get_file_stamp(filename):
-            return default
-        return data
-
-
 def is_selected_for_writing(settings, path):
     '''Check whether path is selected for writing
     according to the WRITE_SELECTED list