1
0
Fork 0
forked from github/pelican

Use watchfiles as a file watching backend

This doesn't use polling unless absolutely necessarily, making it more efficient. It also reduces the amount of first-party code required, and simplifies working out which files are being watched.
This commit is contained in:
Jake Howard 2023-06-21 22:01:38 +01:00 committed by Deniz Turgut
commit 61ca47c519
No known key found for this signature in database
GPG key ID: 87B7168D7AB3ED2F
4 changed files with 32 additions and 260 deletions

View file

@ -24,6 +24,8 @@ except ModuleNotFoundError:
from backports.zoneinfo import ZoneInfo
from markupsafe import Markup
import watchfiles
logger = logging.getLogger(__name__)
@ -755,167 +757,31 @@ def order_content(content_list, order_by='slug'):
return content_list
class FileSystemWatcher:
def __init__(self, settings_file, reader_class, settings=None):
self.watchers = {
'settings': FileSystemWatcher.file_watcher(settings_file)
}
def wait_for_changes(settings_file, reader_class, settings):
new_extensions = set(reader_class(settings).extensions)
content_path = settings.get('PATH', '')
theme_path = settings.get('THEME', '')
ignore_files = set(settings.get('IGNORE_FILES', []))
self.settings = None
self.reader_class = reader_class
self._extensions = None
self._content_path = None
self._theme_path = None
self._ignore_files = None
watching_paths = [
settings_file,
theme_path,
content_path,
]
if settings is not None:
self.update_watchers(settings)
watching_paths.extend(
os.path.join(content_path, path) for path in settings.get('STATIC_PATHS', [])
)
def update_watchers(self, settings):
new_extensions = set(self.reader_class(settings).extensions)
new_content_path = settings.get('PATH', '')
new_theme_path = settings.get('THEME', '')
new_ignore_files = set(settings.get('IGNORE_FILES', []))
watching_paths = [os.path.abspath(p) for p in watching_paths if p and os.path.exists(p)]
extensions_changed = new_extensions != self._extensions
content_changed = new_content_path != self._content_path
theme_changed = new_theme_path != self._theme_path
ignore_changed = new_ignore_files != self._ignore_files
# Refresh content watcher if related settings changed
if extensions_changed or content_changed or ignore_changed:
self.add_watcher('content',
new_content_path,
new_extensions,
new_ignore_files)
# Refresh theme watcher if related settings changed
if theme_changed or ignore_changed:
self.add_watcher('theme',
new_theme_path,
[''],
new_ignore_files)
# Watch STATIC_PATHS
old_static_watchers = set(key
for key in self.watchers
if key.startswith('[static]'))
for path in settings.get('STATIC_PATHS', []):
key = '[static]{}'.format(path)
if ignore_changed or (key not in self.watchers):
self.add_watcher(
key,
os.path.join(new_content_path, path),
[''],
new_ignore_files)
if key in old_static_watchers:
old_static_watchers.remove(key)
# cleanup removed static watchers
for key in old_static_watchers:
del self.watchers[key]
# update values
self.settings = settings
self._extensions = new_extensions
self._content_path = new_content_path
self._theme_path = new_theme_path
self._ignore_files = new_ignore_files
def check(self):
'''return a key:watcher_status dict for all watchers'''
result = {key: next(watcher) for key, watcher in self.watchers.items()}
# Various warnings
if result.get('content') is None:
reader_descs = sorted(
{
' | %s (%s)' % (type(r).__name__, ', '.join(r.file_extensions))
for r in self.reader_class(self.settings).readers.values()
if r.enabled
}
)
logger.warning(
'No valid files found in content for the active readers:\n'
+ '\n'.join(reader_descs))
if result.get('theme') is None:
logger.warning('Empty theme folder. Using `basic` theme.')
return result
def add_watcher(self, key, path, extensions=[''], ignores=[]):
watcher = self.get_watcher(path, extensions, ignores)
if watcher is not None:
self.watchers[key] = watcher
def get_watcher(self, path, extensions=[''], ignores=[]):
'''return a watcher depending on path type (file or folder)'''
if not os.path.exists(path):
logger.warning("Watched path does not exist: %s", path)
return None
if os.path.isdir(path):
return self.folder_watcher(path, extensions, ignores)
else:
return self.file_watcher(path)
@staticmethod
def folder_watcher(path, extensions, ignores=[]):
'''Generator for monitoring a folder for modifications.
Returns a boolean indicating if files are changed since last check.
Returns None if there are no matching files in the folder'''
def file_times(path):
'''Return `mtime` for each file in path'''
for root, dirs, files in os.walk(path, followlinks=True):
dirs[:] = [x for x in dirs if not x.startswith(os.curdir)]
for f in files:
valid_extension = f.endswith(tuple(extensions))
file_ignored = any(
fnmatch.fnmatch(f, ignore) for ignore in ignores
)
if valid_extension and not file_ignored:
try:
yield os.stat(os.path.join(root, f)).st_mtime
except OSError as e:
logger.warning('Caught Exception: %s', e)
LAST_MTIME = 0
while True:
try:
mtime = max(file_times(path))
if mtime > LAST_MTIME:
LAST_MTIME = mtime
yield True
except ValueError:
yield None
else:
yield False
@staticmethod
def file_watcher(path):
'''Generator for monitoring a file for modifications'''
LAST_MTIME = 0
while True:
if path:
try:
mtime = os.stat(path).st_mtime
except OSError as e:
logger.warning('Caught Exception: %s', e)
continue
if mtime > LAST_MTIME:
LAST_MTIME = mtime
yield True
else:
yield False
else:
yield None
return next(watchfiles.watch(
*watching_paths,
watch_filter=watchfiles.DefaultFilter(
ignore_entity_patterns=[fnmatch.translate(pattern) for pattern in ignore_files]
),
rust_timeout=0
))
def set_date_tzinfo(d, tz_name=None):