From 56dd5fe51011288d5922fadd7d4214ea07691e94 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 16 Mar 2026 19:21:53 +0000 Subject: [PATCH] Add database file watcher using PRAGMA data_version Monitors all file-backed databases for changes by polling PRAGMA data_version via a dedicated sqlite3 connection per database. Detected changes update datasette._database_updated with the time.monotonic() timestamp. Watchers start on ASGI startup and are cancelled on shutdown. The /-/databases endpoint now includes a "last_updated" field for each database. https://claude.ai/code/session_018W3THaecoPG4K8pFuthz62 --- datasette/app.py | 48 +++++++++++++- tests/test_database_watcher.py | 110 +++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 tests/test_database_watcher.py diff --git a/datasette/app.py b/datasette/app.py index 2df6e4e8..4c152861 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -365,6 +365,8 @@ class Datasette: self._refresh_schemas_lock = asyncio.Lock() else: raise + self._database_updated = {} + self._db_watcher_tasks = {} self.crossdb = crossdb self.nolock = nolock if memory or crossdb or not self.files: @@ -700,6 +702,45 @@ class Datasette: await await_me_maybe(hook) self._startup_invoked = True + async def _start_database_watchers(self): + for name, db in self.databases.items(): + if db.path and name not in self._db_watcher_tasks: + self._db_watcher_tasks[name] = asyncio.create_task( + self._watch_database(name, db.path) + ) + + async def _watch_database(self, name, db_path, interval=1.0): + loop = asyncio.get_running_loop() + conn = sqlite3.connect( + db_path, isolation_level=None, check_same_thread=False + ) + try: + last = await loop.run_in_executor( + None, lambda: conn.execute("PRAGMA data_version").fetchone()[0] + ) + while True: + await asyncio.sleep(interval) + v = await loop.run_in_executor( + None, lambda: conn.execute("PRAGMA data_version").fetchone()[0] + ) + if v != last: + last = v + self._database_updated[name] = time.monotonic() + except (asyncio.CancelledError, Exception): + pass + finally: + # Small delay to let any in-flight executor work finish + await asyncio.sleep(0.1) + conn.close() + + async def _stop_database_watchers(self): + for task in self._db_watcher_tasks.values(): + task.cancel() + await asyncio.gather( + *self._db_watcher_tasks.values(), return_exceptions=True + ) + self._db_watcher_tasks.clear() + def sign(self, value, namespace="default"): return URLSafeSerializer(self._secret, namespace).dumps(value) @@ -1558,6 +1599,7 @@ class Datasette: "is_mutable": d.is_mutable, "is_memory": d.is_memory, "hash": d.hash, + "last_updated": self._database_updated.get(d.name), } for name, d in self.databases.items() ] @@ -2139,8 +2181,10 @@ class Datasette: ) if self.setting("trace_debug"): asgi = AsgiTracer(asgi) - asgi = AsgiLifespan(asgi) - asgi = AsgiRunOnFirstRequest(asgi, on_startup=[setup_db, self.invoke_startup]) + asgi = AsgiLifespan(asgi, on_shutdown=[self._stop_database_watchers]) + asgi = AsgiRunOnFirstRequest( + asgi, on_startup=[setup_db, self.invoke_startup, self._start_database_watchers] + ) for wrapper in pm.hook.asgi_wrapper(datasette=self): asgi = wrapper(asgi) return asgi diff --git a/tests/test_database_watcher.py b/tests/test_database_watcher.py new file mode 100644 index 00000000..b44b5c7a --- /dev/null +++ b/tests/test_database_watcher.py @@ -0,0 +1,110 @@ +import asyncio +import sqlite3 +import tempfile +import time + +import pytest +import pytest_asyncio + +from datasette.app import Datasette + + +@pytest_asyncio.fixture +async def ds_with_file_db(): + tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False) + tmp_path = tmp.name + tmp.close() + conn = sqlite3.connect(tmp_path) + conn.execute("CREATE TABLE t (id integer primary key)") + conn.close() + ds = Datasette(files=[tmp_path]) + client = ds.client + # Trigger startup (which starts the watchers) + await ds.invoke_startup() + await ds._start_database_watchers() + try: + yield ds, client, tmp_path + finally: + await ds._stop_database_watchers() + + +@pytest.mark.asyncio +async def test_database_updated_initially_empty(ds_with_file_db): + ds, client, tmp_path = ds_with_file_db + db_name = list(ds.databases.keys())[0] + assert db_name not in ds._database_updated + + +@pytest.mark.asyncio +async def test_database_updated_after_write(ds_with_file_db): + ds, client, tmp_path = ds_with_file_db + db_name = list(ds.databases.keys())[0] + if db_name == "_memory": + db_name = list(ds.databases.keys())[1] + + # Write to the database + conn = sqlite3.connect(tmp_path) + conn.execute("INSERT INTO t (id) VALUES (1)") + conn.commit() + conn.close() + + # Wait for the watcher to detect the change (interval is 1s by default) + await asyncio.sleep(1.5) + + assert db_name in ds._database_updated + assert isinstance(ds._database_updated[db_name], float) + + +@pytest.mark.asyncio +async def test_databases_endpoint_includes_last_updated(ds_with_file_db): + ds, client, tmp_path = ds_with_file_db + db_name = list(ds.databases.keys())[0] + if db_name == "_memory": + db_name = list(ds.databases.keys())[1] + + # Before any writes, last_updated should be None + response = await client.get("/-/databases.json") + databases = response.json() + file_db = [d for d in databases if d["name"] == db_name][0] + assert file_db["last_updated"] is None + + # Write to the database + conn = sqlite3.connect(tmp_path) + conn.execute("INSERT INTO t (id) VALUES (1)") + conn.commit() + conn.close() + + # Wait for the watcher to detect the change + await asyncio.sleep(1.5) + + response = await client.get("/-/databases.json") + databases = response.json() + file_db = [d for d in databases if d["name"] == db_name][0] + assert file_db["last_updated"] is not None + assert isinstance(file_db["last_updated"], float) + + +@pytest.mark.asyncio +async def test_database_updated_timestamp_increases(ds_with_file_db): + ds, client, tmp_path = ds_with_file_db + db_name = list(ds.databases.keys())[0] + if db_name == "_memory": + db_name = list(ds.databases.keys())[1] + + # First write + conn = sqlite3.connect(tmp_path) + conn.execute("INSERT INTO t (id) VALUES (1)") + conn.commit() + conn.close() + await asyncio.sleep(1.5) + first_ts = ds._database_updated[db_name] + + # Second write + conn = sqlite3.connect(tmp_path) + conn.execute("INSERT INTO t (id) VALUES (2)") + conn.commit() + conn.close() + await asyncio.sleep(1.5) + second_ts = ds._database_updated[db_name] + + assert second_ts > first_ts