mirror of
https://github.com/simonw/datasette.git
synced 2026-05-28 04:46:18 +02:00
Compare commits
1 commit
main
...
claude/add
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
56dd5fe510 |
2 changed files with 156 additions and 2 deletions
|
|
@ -365,6 +365,8 @@ class Datasette:
|
|||
self._refresh_schemas_lock = asyncio.Lock()
|
||||
else:
|
||||
raise
|
||||
self._database_updated = {}
|
||||
self._db_watcher_tasks = {}
|
||||
self.crossdb = crossdb
|
||||
self.nolock = nolock
|
||||
if memory or crossdb or not self.files:
|
||||
|
|
@ -700,6 +702,45 @@ class Datasette:
|
|||
await await_me_maybe(hook)
|
||||
self._startup_invoked = True
|
||||
|
||||
async def _start_database_watchers(self):
|
||||
for name, db in self.databases.items():
|
||||
if db.path and name not in self._db_watcher_tasks:
|
||||
self._db_watcher_tasks[name] = asyncio.create_task(
|
||||
self._watch_database(name, db.path)
|
||||
)
|
||||
|
||||
async def _watch_database(self, name, db_path, interval=1.0):
|
||||
loop = asyncio.get_running_loop()
|
||||
conn = sqlite3.connect(
|
||||
db_path, isolation_level=None, check_same_thread=False
|
||||
)
|
||||
try:
|
||||
last = await loop.run_in_executor(
|
||||
None, lambda: conn.execute("PRAGMA data_version").fetchone()[0]
|
||||
)
|
||||
while True:
|
||||
await asyncio.sleep(interval)
|
||||
v = await loop.run_in_executor(
|
||||
None, lambda: conn.execute("PRAGMA data_version").fetchone()[0]
|
||||
)
|
||||
if v != last:
|
||||
last = v
|
||||
self._database_updated[name] = time.monotonic()
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
finally:
|
||||
# Small delay to let any in-flight executor work finish
|
||||
await asyncio.sleep(0.1)
|
||||
conn.close()
|
||||
|
||||
async def _stop_database_watchers(self):
|
||||
for task in self._db_watcher_tasks.values():
|
||||
task.cancel()
|
||||
await asyncio.gather(
|
||||
*self._db_watcher_tasks.values(), return_exceptions=True
|
||||
)
|
||||
self._db_watcher_tasks.clear()
|
||||
|
||||
def sign(self, value, namespace="default"):
|
||||
return URLSafeSerializer(self._secret, namespace).dumps(value)
|
||||
|
||||
|
|
@ -1558,6 +1599,7 @@ class Datasette:
|
|||
"is_mutable": d.is_mutable,
|
||||
"is_memory": d.is_memory,
|
||||
"hash": d.hash,
|
||||
"last_updated": self._database_updated.get(d.name),
|
||||
}
|
||||
for name, d in self.databases.items()
|
||||
]
|
||||
|
|
@ -2139,8 +2181,10 @@ class Datasette:
|
|||
)
|
||||
if self.setting("trace_debug"):
|
||||
asgi = AsgiTracer(asgi)
|
||||
asgi = AsgiLifespan(asgi)
|
||||
asgi = AsgiRunOnFirstRequest(asgi, on_startup=[setup_db, self.invoke_startup])
|
||||
asgi = AsgiLifespan(asgi, on_shutdown=[self._stop_database_watchers])
|
||||
asgi = AsgiRunOnFirstRequest(
|
||||
asgi, on_startup=[setup_db, self.invoke_startup, self._start_database_watchers]
|
||||
)
|
||||
for wrapper in pm.hook.asgi_wrapper(datasette=self):
|
||||
asgi = wrapper(asgi)
|
||||
return asgi
|
||||
|
|
|
|||
110
tests/test_database_watcher.py
Normal file
110
tests/test_database_watcher.py
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
import asyncio
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from datasette.app import Datasette
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def ds_with_file_db():
|
||||
tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
|
||||
tmp_path = tmp.name
|
||||
tmp.close()
|
||||
conn = sqlite3.connect(tmp_path)
|
||||
conn.execute("CREATE TABLE t (id integer primary key)")
|
||||
conn.close()
|
||||
ds = Datasette(files=[tmp_path])
|
||||
client = ds.client
|
||||
# Trigger startup (which starts the watchers)
|
||||
await ds.invoke_startup()
|
||||
await ds._start_database_watchers()
|
||||
try:
|
||||
yield ds, client, tmp_path
|
||||
finally:
|
||||
await ds._stop_database_watchers()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_database_updated_initially_empty(ds_with_file_db):
|
||||
ds, client, tmp_path = ds_with_file_db
|
||||
db_name = list(ds.databases.keys())[0]
|
||||
assert db_name not in ds._database_updated
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_database_updated_after_write(ds_with_file_db):
|
||||
ds, client, tmp_path = ds_with_file_db
|
||||
db_name = list(ds.databases.keys())[0]
|
||||
if db_name == "_memory":
|
||||
db_name = list(ds.databases.keys())[1]
|
||||
|
||||
# Write to the database
|
||||
conn = sqlite3.connect(tmp_path)
|
||||
conn.execute("INSERT INTO t (id) VALUES (1)")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Wait for the watcher to detect the change (interval is 1s by default)
|
||||
await asyncio.sleep(1.5)
|
||||
|
||||
assert db_name in ds._database_updated
|
||||
assert isinstance(ds._database_updated[db_name], float)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_databases_endpoint_includes_last_updated(ds_with_file_db):
|
||||
ds, client, tmp_path = ds_with_file_db
|
||||
db_name = list(ds.databases.keys())[0]
|
||||
if db_name == "_memory":
|
||||
db_name = list(ds.databases.keys())[1]
|
||||
|
||||
# Before any writes, last_updated should be None
|
||||
response = await client.get("/-/databases.json")
|
||||
databases = response.json()
|
||||
file_db = [d for d in databases if d["name"] == db_name][0]
|
||||
assert file_db["last_updated"] is None
|
||||
|
||||
# Write to the database
|
||||
conn = sqlite3.connect(tmp_path)
|
||||
conn.execute("INSERT INTO t (id) VALUES (1)")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Wait for the watcher to detect the change
|
||||
await asyncio.sleep(1.5)
|
||||
|
||||
response = await client.get("/-/databases.json")
|
||||
databases = response.json()
|
||||
file_db = [d for d in databases if d["name"] == db_name][0]
|
||||
assert file_db["last_updated"] is not None
|
||||
assert isinstance(file_db["last_updated"], float)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_database_updated_timestamp_increases(ds_with_file_db):
|
||||
ds, client, tmp_path = ds_with_file_db
|
||||
db_name = list(ds.databases.keys())[0]
|
||||
if db_name == "_memory":
|
||||
db_name = list(ds.databases.keys())[1]
|
||||
|
||||
# First write
|
||||
conn = sqlite3.connect(tmp_path)
|
||||
conn.execute("INSERT INTO t (id) VALUES (1)")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
await asyncio.sleep(1.5)
|
||||
first_ts = ds._database_updated[db_name]
|
||||
|
||||
# Second write
|
||||
conn = sqlite3.connect(tmp_path)
|
||||
conn.execute("INSERT INTO t (id) VALUES (2)")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
await asyncio.sleep(1.5)
|
||||
second_ts = ds._database_updated[db_name]
|
||||
|
||||
assert second_ts > first_ts
|
||||
Loading…
Add table
Add a link
Reference in a new issue