Compare commits

...

1 commit

Author SHA1 Message Date
Claude
56dd5fe510
Add database file watcher using PRAGMA data_version
Monitors all file-backed databases for changes by polling PRAGMA data_version
via a dedicated sqlite3 connection per database. Detected changes update
datasette._database_updated with the time.monotonic() timestamp. Watchers
start on ASGI startup and are cancelled on shutdown. The /-/databases endpoint
now includes a "last_updated" field for each database.

https://claude.ai/code/session_018W3THaecoPG4K8pFuthz62
2026-03-16 19:21:53 +00:00
2 changed files with 156 additions and 2 deletions

View file

@ -365,6 +365,8 @@ class Datasette:
self._refresh_schemas_lock = asyncio.Lock()
else:
raise
self._database_updated = {}
self._db_watcher_tasks = {}
self.crossdb = crossdb
self.nolock = nolock
if memory or crossdb or not self.files:
@ -700,6 +702,45 @@ class Datasette:
await await_me_maybe(hook)
self._startup_invoked = True
async def _start_database_watchers(self):
    """Create a watcher task for each database that has a path and no task yet.

    Databases whose ``path`` is falsy, and names already present in
    ``self._db_watcher_tasks``, are left alone, so calling this more than
    once does not create duplicate tasks.
    """
    running = self._db_watcher_tasks
    for db_name, database in self.databases.items():
        if not database.path or db_name in running:
            continue
        watcher = self._watch_database(db_name, database.path)
        running[db_name] = asyncio.create_task(watcher)
async def _watch_database(self, name, db_path, interval=1.0):
    """Poll ``PRAGMA data_version`` for one database and record changes.

    Opens a dedicated sqlite3 connection to ``db_path`` and, every
    ``interval`` seconds, reads ``PRAGMA data_version`` in the default
    executor. Whenever the value differs from the previous reading,
    ``self._database_updated[name]`` is set to ``time.monotonic()``.

    The coroutine runs until it is cancelled. Cancellation is propagated
    after cleanup so the task ends up in the cancelled state. Any other
    error raised while polling is logged and ends the watcher without
    raising. An error from ``sqlite3.connect`` itself propagates to the task.
    """
    import logging

    loop = asyncio.get_running_loop()
    conn = sqlite3.connect(db_path, isolation_level=None, check_same_thread=False)
    # The most recent executor future, kept so cleanup can wait for it.
    in_flight = None

    def read_data_version():
        return conn.execute("PRAGMA data_version").fetchone()[0]

    async def poll():
        nonlocal in_flight
        in_flight = loop.run_in_executor(None, read_data_version)
        # Shielded so that cancelling this task does not detach us from the
        # worker thread's result; the finally block below waits for it.
        return await asyncio.shield(in_flight)

    try:
        last = await poll()
        while True:
            await asyncio.sleep(interval)
            current = await poll()
            if current != last:
                last = current
                self._database_updated[name] = time.monotonic()
    except asyncio.CancelledError:
        # Normal shutdown path: let the task be marked as cancelled.
        raise
    except Exception:
        # Best-effort watcher: do not crash the app, but leave a trace
        # instead of dying silently.
        logging.getLogger(__name__).exception(
            "Database watcher for %r stopped unexpectedly", name
        )
    finally:
        try:
            # Wait for a query still running in the executor rather than
            # guessing with a fixed delay before closing the connection.
            if in_flight is not None and not in_flight.done():
                await asyncio.wait([in_flight])
        finally:
            # Always release the connection, even if we are cancelled again
            # while waiting above.
            conn.close()
async def _stop_database_watchers(self):
    """Cancel every registered watcher task, wait for them, then forget them.

    Results and exceptions from the tasks (including cancellation) are
    collected by ``asyncio.gather(..., return_exceptions=True)`` and
    discarded, so this does not raise because a watcher was cancelled.
    """
    registry = self._db_watcher_tasks
    for watcher in registry.values():
        watcher.cancel()
    pending = tuple(registry.values())
    await asyncio.gather(*pending, return_exceptions=True)
    registry.clear()
def sign(self, value, namespace="default"):
    """Return ``value`` dumped by a ``URLSafeSerializer`` keyed on ``self._secret``.

    ``namespace`` is passed as the serializer's second argument.
    """
    serializer = URLSafeSerializer(self._secret, namespace)
    return serializer.dumps(value)
@ -1558,6 +1599,7 @@ class Datasette:
"is_mutable": d.is_mutable,
"is_memory": d.is_memory,
"hash": d.hash,
"last_updated": self._database_updated.get(d.name),
}
for name, d in self.databases.items()
]
@ -2139,8 +2181,10 @@ class Datasette:
)
if self.setting("trace_debug"):
asgi = AsgiTracer(asgi)
asgi = AsgiLifespan(asgi)
asgi = AsgiRunOnFirstRequest(asgi, on_startup=[setup_db, self.invoke_startup])
asgi = AsgiLifespan(asgi, on_shutdown=[self._stop_database_watchers])
asgi = AsgiRunOnFirstRequest(
asgi, on_startup=[setup_db, self.invoke_startup, self._start_database_watchers]
)
for wrapper in pm.hook.asgi_wrapper(datasette=self):
asgi = wrapper(asgi)
return asgi

View file

@ -0,0 +1,110 @@
import asyncio
import sqlite3
import tempfile
import time
import pytest
import pytest_asyncio
from datasette.app import Datasette
@pytest_asyncio.fixture
async def ds_with_file_db():
    """Yield ``(ds, client, tmp_path)`` for a Datasette over a temp SQLite file.

    The file is created with a single table ``t``. Startup hooks are invoked
    and the database watchers are started before yielding. On teardown the
    watchers are stopped and the temporary file is removed.
    """
    # Local import: ``os`` is not among this module's top-level imports.
    import os

    tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
    tmp_path = tmp.name
    tmp.close()
    try:
        conn = sqlite3.connect(tmp_path)
        try:
            conn.execute("CREATE TABLE t (id integer primary key)")
        finally:
            conn.close()
        ds = Datasette(files=[tmp_path])
        client = ds.client
        # Trigger startup, then start the watchers explicitly
        await ds.invoke_startup()
        await ds._start_database_watchers()
        try:
            yield ds, client, tmp_path
        finally:
            await ds._stop_database_watchers()
    finally:
        # delete=False means nothing else removes the file; clean it up here.
        # Best-effort: a failed unlink must not mask a test failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
@pytest.mark.asyncio
async def test_database_updated_initially_empty(ds_with_file_db):
    """Before any write, the file database has no entry in ``_database_updated``."""
    ds, client, tmp_path = ds_with_file_db
    names = list(ds.databases.keys())
    db_name = names[0]
    # Same guard as the other tests in this module: checking "_memory" would
    # make the assertion below pass without testing the file database.
    if db_name == "_memory":
        db_name = names[1]
    assert db_name not in ds._database_updated
@pytest.mark.asyncio
async def test_database_updated_after_write(ds_with_file_db):
    """A write committed by another connection is recorded as a float timestamp."""
    ds, client, tmp_path = ds_with_file_db
    names = list(ds.databases.keys())
    db_name = names[0]
    if db_name == "_memory":
        db_name = names[1]
    # Write to the database
    conn = sqlite3.connect(tmp_path)
    try:
        conn.execute("INSERT INTO t (id) VALUES (1)")
        conn.commit()
    finally:
        conn.close()
    # The fixture starts the watcher with its default 1s interval. Poll with
    # a generous deadline instead of one fixed sleep, so a slow machine does
    # not cause a spurious failure and a fast one does not wait needlessly.
    deadline = time.monotonic() + 5.0
    while db_name not in ds._database_updated and time.monotonic() < deadline:
        await asyncio.sleep(0.05)
    assert db_name in ds._database_updated
    assert isinstance(ds._database_updated[db_name], float)
@pytest.mark.asyncio
async def test_databases_endpoint_includes_last_updated(ds_with_file_db):
    """``/-/databases.json`` reports ``last_updated``: None first, a float after a write."""
    ds, client, tmp_path = ds_with_file_db
    names = list(ds.databases.keys())
    db_name = names[0]
    if db_name == "_memory":
        db_name = names[1]
    # Before any writes, last_updated should be None
    response = await client.get("/-/databases.json")
    databases = response.json()
    file_db = [d for d in databases if d["name"] == db_name][0]
    assert file_db["last_updated"] is None
    # Write to the database
    conn = sqlite3.connect(tmp_path)
    try:
        conn.execute("INSERT INTO t (id) VALUES (1)")
        conn.commit()
    finally:
        conn.close()
    # Wait (bounded) for the watcher to notice the change rather than relying
    # on one fixed sleep that races the watcher's 1s polling interval.
    deadline = time.monotonic() + 5.0
    while db_name not in ds._database_updated and time.monotonic() < deadline:
        await asyncio.sleep(0.05)
    response = await client.get("/-/databases.json")
    databases = response.json()
    file_db = [d for d in databases if d["name"] == db_name][0]
    assert file_db["last_updated"] is not None
    assert isinstance(file_db["last_updated"], float)
@pytest.mark.asyncio
async def test_database_updated_timestamp_increases(ds_with_file_db):
    """A second write yields a strictly larger recorded timestamp than the first."""
    ds, client, tmp_path = ds_with_file_db
    names = list(ds.databases.keys())
    db_name = names[0]
    if db_name == "_memory":
        db_name = names[1]
    # First write
    conn = sqlite3.connect(tmp_path)
    try:
        conn.execute("INSERT INTO t (id) VALUES (1)")
        conn.commit()
    finally:
        conn.close()
    # Bounded polling instead of a fixed sleep: the watcher polls once per
    # second, so a single 1.5s sleep leaves little margin on a busy machine.
    deadline = time.monotonic() + 5.0
    while db_name not in ds._database_updated and time.monotonic() < deadline:
        await asyncio.sleep(0.05)
    # Explicit assertion so a missed detection fails clearly, not as KeyError.
    assert db_name in ds._database_updated
    first_ts = ds._database_updated[db_name]
    # Second write
    conn = sqlite3.connect(tmp_path)
    try:
        conn.execute("INSERT INTO t (id) VALUES (2)")
        conn.commit()
    finally:
        conn.close()
    deadline = time.monotonic() + 5.0
    while (
        ds._database_updated[db_name] == first_ts and time.monotonic() < deadline
    ):
        await asyncio.sleep(0.05)
    second_ts = ds._database_updated[db_name]
    assert second_ts > first_ts