Datasette.close() method, plus pytest plugin to automatically call it during tests

Refs #2693, #2692
This commit is contained in:
Simon Willison 2026-04-16 20:50:51 -07:00 committed by GitHub
commit 1cd53e1fc3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 556 additions and 27 deletions

108
datasette/_pytest_plugin.py Normal file
View file

@ -0,0 +1,108 @@
"""
Pytest plugin that automatically closes any Datasette instances constructed
during a pytest test, both in the test body and in function-scoped
fixtures. Instances constructed by session-, module-, class- or package-
scoped fixtures are left alone, because other tests in the session will
still want to use them.
Registered as a pytest11 entry point in pyproject.toml so that downstream
projects using Datasette get the same FD-safety net for their own tests.
Opt out by setting ``datasette_autoclose = false`` in pytest.ini (or the
equivalent ini file).
"""
from __future__ import annotations

import contextvars
import functools
import weakref

import pytest

from datasette.app import Datasette
# Tracking list for the test that is currently running. The value is a list
# of weak references to Datasette instances, or None when nothing is being
# tracked (the default), in which case new instances are not recorded.
_active_instances: contextvars.ContextVar[list | None] = contextvars.ContextVar(
    "datasette_active_instances", default=None
)

# Keep a handle on the real constructor so the wrapper can delegate to it.
_original_init = Datasette.__init__


@functools.wraps(_original_init)
def _tracking_init(self, *args, **kwargs):
    """Run the real ``Datasette.__init__`` and then record the new instance.

    The instance is appended, as a weak reference, to the list stored in
    ``_active_instances``. When that context variable holds ``None`` the
    instance is constructed normally and not recorded.

    ``functools.wraps`` copies the original constructor's metadata onto the
    wrapper so introspection of ``Datasette.__init__`` (signature, docstring)
    keeps reporting the real constructor rather than ``(*args, **kwargs)``.
    """
    _original_init(self, *args, **kwargs)
    instances = _active_instances.get()
    if instances is not None:
        # A weak reference means tracking never keeps an instance alive.
        instances.append(weakref.ref(self))


# Installed at import time so every Datasette() built afterwards is seen.
Datasette.__init__ = _tracking_init
def pytest_addoption(parser):
    """Register the ``datasette_autoclose`` ini setting.

    The setting is registered with the string default ``"true"``; the
    ``_enabled`` helper interprets the configured value.
    """
    description = (
        "Automatically close Datasette instances created inside test "
        "bodies and function-scoped fixtures (default: true)."
    )
    parser.addini("datasette_autoclose", help=description, default="true")
def _enabled(config) -> bool:
    """Return whether the ``datasette_autoclose`` ini setting is switched on.

    A real ``bool`` is returned unchanged. Any other value is converted to a
    string, stripped and lower-cased; only ``false``, ``0``, ``no`` and
    ``off`` disable the feature, so every other spelling leaves it enabled.
    """
    setting = config.getini("datasette_autoclose")
    if isinstance(setting, bool):
        return setting
    normalized = str(setting).strip().lower()
    return normalized not in ("false", "0", "no", "off")
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_protocol(item, nextitem):
    """Track Datasette instances across setup, call and teardown; close at end.

    When auto-close is disabled for ``item.config`` this wrapper simply
    yields. Otherwise it installs a fresh tracking list in
    ``_active_instances`` for the duration of the wrapped protocol, then
    closes every tracked instance that is still alive, newest first.

    A failing ``close()`` is reported as a ``PytestUnraisableExceptionWarning``
    on ``item`` rather than raised. The warnings are issued only after every
    instance has had its ``close()`` attempt, so that a warning which is
    itself raised (for example when warnings are configured as errors) cannot
    stop the remaining instances from being closed.
    """
    if not _enabled(item.config):
        yield
        return
    refs: list[weakref.ref] = []
    token = _active_instances.set(refs)
    try:
        yield
    finally:
        # Stop tracking before closing, restoring the previous value.
        _active_instances.reset(token)
        failures = []
        for ref in reversed(refs):
            ds = ref()
            if ds is None:
                # Already garbage-collected; nothing left to close.
                continue
            try:
                ds.close()
            except Exception as e:
                failures.append(e)
        # Report after the loop: item.warn() may raise, and that must not
        # leave later instances unclosed.
        for e in failures:
            item.warn(
                pytest.PytestUnraisableExceptionWarning(
                    f"Error closing Datasette instance: {e!r}"
                )
            )
@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef, request):
    """Exempt instances created by non-function-scoped fixtures.

    Session-, module-, class- and package-scoped fixtures produce Datasette
    instances that must survive beyond the current test, because other tests
    in the session will still use them. The tracking list is snapshotted
    before the fixture runs; once setup has finished, any reference added in
    the meantime is removed again unless the fixture is function-scoped, so
    those instances are not closed at test teardown.

    Does nothing when no tracking list is active.
    """
    tracked = _active_instances.get()
    if tracked is None:
        yield
        return
    # Identity of the reference objects present before this fixture ran.
    known_ids = set(map(id, tracked))
    yield
    if fixturedef.scope == "function":
        # Function-scoped fixtures stay tracked and are closed with the test.
        return
    added_during_setup = [ref for ref in tracked if id(ref) not in known_ids]
    for ref in added_during_setup:
        try:
            tracked.remove(ref)
        except ValueError:
            pass

View file

@ -326,6 +326,7 @@ class Datasette:
default_deny=False,
):
self._startup_invoked = False
self._closed = False
assert config_dir is None or isinstance(
config_dir, Path
), "config_dir= should be a pathlib.Path"
@ -834,6 +835,33 @@ class Datasette:
new_databases.pop(name)
self.databases = new_databases
def close(self):
    """Release all resources held by this Datasette instance.

    Calls ``close()`` on every attached Database and then on the internal
    database, and shuts down the executor if there is one. Idempotent and
    one-way: once ``_closed`` is set, later calls return immediately.

    A failure in one step does not stop the remaining steps. After all of
    them have been attempted, the first exception encountered is re-raised.
    """
    if self._closed:
        return
    # Mark as closed up front so a failure part-way through is never retried.
    self._closed = True
    errors = []
    for database in [*self.databases.values(), self._internal_database]:
        try:
            database.close()
        except Exception as exc:
            errors.append(exc)
    if self.executor is not None:
        try:
            self.executor.shutdown(wait=True, cancel_futures=True)
        except Exception as exc:
            errors.append(exc)
    if errors:
        raise errors[0]
def setting(self, key):
return self._settings.get(key, None)
@ -2310,10 +2338,13 @@ class Datasette:
if not database.is_mutable:
await database.table_counts(limit=60 * 60 * 1000)
async def _close_on_shutdown():
self.close()
asgi = CrossOriginProtectionMiddleware(DatasetteRouter(self, routes), self)
if self.setting("trace_debug"):
asgi = AsgiTracer(asgi)
asgi = AsgiLifespan(asgi)
asgi = AsgiLifespan(asgi, on_shutdown=[_close_on_shutdown])
asgi = AsgiRunOnFirstRequest(asgi, on_startup=[setup_db, self.invoke_startup])
for wrapper in pm.hook.asgi_wrapper(datasette=self):
asgi = wrapper(asgi)

View file

@ -34,6 +34,13 @@ connections = threading.local()
AttachedDatabase = namedtuple("AttachedDatabase", ("seq", "name", "file"))
class DatasetteClosedError(RuntimeError):
    """Signals that a Datasette or Database instance was used after close()."""


# Unique marker object put on a database's write queue by close(); the write
# thread compares each task against it by identity and exits when it sees it.
_SHUTDOWN = object()
class Database:
# For table counts stop at this many rows:
count_limit = 10000
@ -76,6 +83,7 @@ class Database:
self._cached_table_counts = None
self._write_thread = None
self._write_queue = None
self._closed = False
# These are used when in non-threaded mode:
self._read_connection = None
self._write_connection = None
@ -84,6 +92,12 @@ class Database:
if not is_temp_disk:
self.mode = mode
def _check_not_closed(self):
    """Raise DatasetteClosedError if this database has already been closed.

    Returns None when the database is still open.
    """
    if not self._closed:
        return
    message = "Database {!r} has been closed".format(self.name)
    raise DatasetteClosedError(message)
@property
def cached_table_counts(self):
if self._cached_table_counts is not None:
@ -149,9 +163,53 @@ class Database:
return conn
def close(self):
# Close all connections - useful to avoid running out of file handles in tests
"""Release all resources held by this database.
Idempotent. After close() further calls to execute()/execute_fn()/
execute_write()/execute_write_fn() raise DatasetteClosedError.
"""
if self._closed:
return
self._closed = True
# Shut down the write thread, if any, via a sentinel. The thread
# drains any writes already queued before the sentinel and then
# closes its own write connection and returns.
write_thread = self._write_thread
if write_thread is not None and self._write_queue is not None:
self._write_queue.put(_SHUTDOWN)
write_thread.join(timeout=10)
if write_thread.is_alive():
sys.stderr.write(
"Datasette: write thread for {!r} did not exit within 10s\n".format(
self.name
)
)
sys.stderr.flush()
# Close anything still tracked in _all_file_connections
for connection in self._all_file_connections:
connection.close()
try:
connection.close()
except Exception:
pass
self._all_file_connections = []
# Drop per-thread cached read connections we can reach
try:
delattr(connections, self._thread_local_id)
except AttributeError:
pass
# Close non-threaded-mode cached connections if still open
if self._read_connection is not None:
try:
self._read_connection.close()
except Exception:
pass
self._read_connection = None
if self._write_connection is not None:
try:
self._write_connection.close()
except Exception:
pass
self._write_connection = None
if self.is_temp_disk:
self._cleanup_temp_file()
@ -164,6 +222,8 @@ class Database:
pass
async def execute_write(self, sql, params=None, block=True, request=None):
self._check_not_closed()
def _inner(conn):
return conn.execute(sql, params or [])
@ -172,6 +232,8 @@ class Database:
return results
async def execute_write_script(self, sql, block=True, request=None):
self._check_not_closed()
def _inner(conn):
return conn.executescript(sql)
@ -182,6 +244,8 @@ class Database:
return results
async def execute_write_many(self, sql, params_seq, block=True, request=None):
self._check_not_closed()
def _inner(conn):
count = 0
@ -203,6 +267,7 @@ class Database:
return results
async def execute_isolated_fn(self, fn):
self._check_not_closed()
# Open a new connection just for the duration of this function
# blocking the write queue to avoid any writes occurring during it
if self.ds.executor is None:
@ -223,6 +288,7 @@ class Database:
return await self._send_to_write_thread(fn, isolated_connection=True)
async def execute_write_fn(self, fn, block=True, transaction=True, request=None):
self._check_not_closed()
pending_events = []
def track_event(event):
@ -334,6 +400,13 @@ class Database:
conn_exception = e
while True:
task = self._write_queue.get()
if task is _SHUTDOWN:
if conn is not None:
try:
conn.close()
except Exception:
pass
return
if conn_exception is not None:
result = conn_exception
else:
@ -366,6 +439,7 @@ class Database:
task.reply_queue.sync_q.put(result)
async def execute_fn(self, fn):
self._check_not_closed()
if self.ds.executor is None:
# non-threaded mode
if self._read_connection is None:
@ -396,6 +470,7 @@ class Database:
log_sql_errors=True,
):
"""Executes sql against db_name in a thread"""
self._check_not_closed()
page_size = page_size or self.ds.page_size
def sql_operation_in_thread(conn):

View file

@ -1079,6 +1079,19 @@ The ``name`` and ``route`` parameters are optional and work the same way as they
This removes a database that has been previously added. ``name=`` is the unique name of that database.
.. _datasette_close:
.close()
--------
Release all resources held by this ``Datasette`` instance. This calls :ref:`database_close` on every attached database (including the internal database), shuts down the thread pool executor used to run SQL queries, and unlinks the temporary file used to back the internal database if one was created.
``close()`` is synchronous, idempotent and one-way: after a call to ``close()`` any attempt to use the Datasette instance to execute SQL will raise a ``datasette.database.DatasetteClosedError`` exception. A closed ``Datasette`` cannot be reopened — callers that need a fresh instance should construct a new one.
If a call to ``Database.close()`` on one of the attached databases raises an exception, ``Datasette.close()`` will continue trying to close the remaining databases and will re-raise the first exception after every database has been processed.
When Datasette is being served over ASGI the ``close()`` method is wired up to the lifespan shutdown event, so resources are released cleanly on ``SIGTERM`` / ``SIGINT``.
.. _datasette_track_event:
await .track_event(event)
@ -1830,7 +1843,11 @@ The return value of the function will be returned by this method. Any exceptions
db.close()
----------
Closes all of the open connections to file-backed databases. This is mainly intended to be used by large test suites, to avoid hitting limits on the number of open files.
Release all resources held by this ``Database`` instance. This shuts down the background write thread (if one was started by a previous call to :ref:`database_execute_write_fn` or similar), closes the write connection, and closes any cached read connections.
After ``db.close()`` has been called, any further call to :ref:`database_execute`, :ref:`database_execute_fn`, :ref:`database_execute_write`, :ref:`database_execute_write_fn`, :ref:`database_execute_write_many`, :ref:`database_execute_write_script` or :ref:`database_execute_isolated_fn` will raise a ``datasette.database.DatasetteClosedError`` exception.
``close()`` is idempotent — calling it a second time is a no-op. It is one-way: a closed ``Database`` cannot be reopened.
.. _internals_database_introspection:

View file

@ -82,6 +82,31 @@ This method registers any :ref:`plugin_hook_startup` or :ref:`plugin_hook_prepar
If you are using ``await datasette.client.get()`` and similar methods then you don't need to worry about this - Datasette automatically calls ``invoke_startup()`` the first time it handles a request.
.. _testing_plugins_autoclose:
Automatic cleanup of Datasette instances
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Installing Datasette also installs a small pytest plugin that automatically calls :ref:`datasette_close` on any ``Datasette()`` instance constructed during a test. This helps prevent large test suites from running out of file descriptors or leaking background threads from the hundreds of instances they may build up across a session.
The plugin closes:
- Instances created in the body of a test function.
- Instances created inside **function-scoped** pytest fixtures (the default scope — ``@pytest.fixture`` with no ``scope=`` argument, or ``scope="function"``).
The plugin deliberately does **not** close:
- Instances created inside higher-scoped fixtures (``scope="session"``, ``"module"``, ``"class"`` or ``"package"``). Those fixtures are typically designed to produce a single ``Datasette`` that is shared across many tests, and closing it automatically would break the tests that run after the first.
In practice this means downstream projects rarely need to call ``ds.close()`` themselves — function-scoped fixtures and inline test code are both covered automatically, while long-lived shared fixtures keep working as before.
If you need to opt out of this behavior, add the following to your ``pytest.ini`` (or equivalent):
.. code-block:: ini
[pytest]
datasette_autoclose = false
.. _testing_datasette_client:
Using datasette.client in tests

View file

@ -54,6 +54,9 @@ CI = "https://github.com/simonw/datasette/actions?query=workflow%3ATest"
[project.scripts]
datasette = "datasette.cli:cli"
[project.entry-points.pytest11]
datasette = "datasette._pytest_plugin"
[dependency-groups]
dev = [
"pytest>=9",
@ -77,6 +80,7 @@ dev = [
"myst-parser",
"sphinx-markdown-builder",
"ruamel.yaml",
"psutil>=5.9",
]
[project.optional-dependencies]

View file

@ -28,8 +28,6 @@ UNDOCUMENTED_PERMISSIONS = {
"view_document",
}
_ds_client = None
def wait_until_responds(url, timeout=5.0, client=httpx, **kwargs):
start = time.time()
@ -53,17 +51,13 @@ def bare_ds():
return Datasette(memory=True)
@pytest_asyncio.fixture
@pytest_asyncio.fixture(scope="session")
async def ds_client():
from datasette.app import Datasette
from datasette.database import Database
from .fixtures import CONFIG, METADATA, PLUGINS_DIR
import secrets
global _ds_client
if _ds_client is not None:
return _ds_client
ds = Datasette(
metadata=METADATA,
config=CONFIG,
@ -95,8 +89,7 @@ async def ds_client():
await db.execute_write_fn(prepare)
await ds.invoke_startup()
_ds_client = ds.client
return _ds_client
return ds.client
def pytest_report_header(config):

View file

@ -22,12 +22,7 @@ def ds_write(tmp_path_factory):
ds = Datasette([db_path], immutables=[db_path_immutable])
ds.root_enabled = True
yield ds
# Close both setup connections plus any Datasette-managed connections.
db1.close()
db2.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
ds.close()
def write_token(ds, actor_id="root", permissions=None):

View file

@ -52,10 +52,7 @@ def ds_ct(tmp_path_factory):
)
ds.root_enabled = True
yield ds
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
ds.close()
@pytest.fixture
@ -95,10 +92,7 @@ def ds_ct_editor_permission(tmp_path_factory):
)
ds.root_enabled = True
yield ds
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
ds.close()
def write_token(ds, actor_id="root", permissions=None):

View file

@ -23,6 +23,7 @@ async def datasette_with_plugin():
yield datasette
finally:
datasette.pm.unregister(name="undo")
datasette.close()
# -- end datasette_with_plugin_fixture --

56
tests/test_fd_leak.py Normal file
View file

@ -0,0 +1,56 @@
"""
Regression test for https://github.com/simonw/datasette/issues/2692:
confirm that creating and closing Datasette instances in a loop does not
leak open file descriptors.
Each Datasette() with an is_temp_disk internal DB opens a temp file and a
write thread with its own SQLite connection. Without Datasette.close()
nothing unwinds this state, and a large pytest run exhausts the process
FD limit.
"""
import asyncio
import threading
import pytest
try:
import psutil
except ImportError: # pragma: no cover
psutil = None
from datasette.app import Datasette
def _count_open_files():
    """Return the number of entries psutil lists as open files for this process."""
    current_process = psutil.Process()
    return len(current_process.open_files())
def _count_threads():
    """Return the number of threads ``threading`` currently reports as alive."""
    alive_now = threading.active_count()
    return alive_now
@pytest.mark.skipif(psutil is None, reason="psutil not installed")
def test_close_releases_file_descriptors():
    """Build and close 50 Datasette instances and check nothing accumulates.

    Open-file and thread counts are sampled after a single warm-up cycle and
    again after the loop; each may grow by at most 2.
    """

    def build_and_close():
        instance = Datasette(memory=True)
        asyncio.run(instance.invoke_startup())
        instance.close()

    # Warm-up so Python/library caches don't skew the baseline
    build_and_close()
    baseline_fds = _count_open_files()
    baseline_threads = _count_threads()

    for _ in range(50):
        build_and_close()

    after_fds = _count_open_files()
    after_threads = _count_threads()
    fd_growth = after_fds - baseline_fds
    thread_growth = after_threads - baseline_threads
    assert (
        fd_growth <= 2
    ), f"Leaked FDs: baseline={baseline_fds}, after=50 iterations={after_fds}"
    assert (
        thread_growth <= 2
    ), f"Leaked threads: baseline={baseline_threads}, after={after_threads}"

View file

@ -4,6 +4,7 @@ Tests for the datasette.database.Database class
from datasette.app import Datasette
from datasette.database import Database, Results, MultipleValues
from datasette.database import DatasetteClosedError
from datasette.utils.sqlite import sqlite3, sqlite_version
from datasette.utils import Column
import pytest
@ -833,3 +834,58 @@ def test_repr_temp_disk(app_client):
assert isinstance(db.size, int)
assert isinstance(db.mtime_ns, int)
db.close()
@pytest.mark.asyncio
async def test_database_close_shuts_down_write_thread(tmpdir):
    """Database.close() must make a running write thread exit."""
    db_file = str(tmpdir / "dbclose.db")
    setup_conn = sqlite3.connect(db_file)
    setup_conn.execute("create table t (id integer primary key)")
    setup_conn.close()

    datasette = Datasette([db_file])
    database = datasette.get_database("dbclose")
    # The first write is what starts the write thread.
    await database.execute_write("insert into t (id) values (1)")
    assert database._write_thread is not None
    assert database._write_thread.is_alive()

    database.close()
    # Allow a short grace period; the shutdown sentinel should end the thread.
    database._write_thread.join(timeout=5)
    assert not database._write_thread.is_alive()
    datasette._internal_database.close()
@pytest.mark.asyncio
async def test_database_close_raises_on_further_use(tmpdir):
    """Every execute* entry point must raise DatasetteClosedError after close()."""
    db_file = str(tmpdir / "closed.db")
    setup_conn = sqlite3.connect(db_file)
    setup_conn.execute("create table t (id integer primary key)")
    setup_conn.close()

    datasette = Datasette([db_file])
    database = datasette.get_database("closed")
    # Sanity check: the database is usable before it is closed.
    await database.execute("select 1")
    database.close()

    # Each callable builds its coroutine inside the pytest.raises block.
    attempts = (
        lambda: database.execute("select 1"),
        lambda: database.execute_write("insert into t (id) values (1)"),
        lambda: database.execute_fn(
            lambda conn: conn.execute("select 1").fetchone()
        ),
        lambda: database.execute_write_fn(lambda conn: conn.execute("select 1")),
    )
    for attempt in attempts:
        with pytest.raises(DatasetteClosedError):
            await attempt()
    datasette._internal_database.close()
@pytest.mark.asyncio
async def test_database_close_is_idempotent(tmpdir):
    """Calling Database.close() twice must not raise."""
    db_file = str(tmpdir / "idemp.db")
    setup_conn = sqlite3.connect(db_file)
    setup_conn.execute("create table t (id integer primary key)")
    setup_conn.close()

    datasette = Datasette([db_file])
    database = datasette.get_database("idemp")
    await database.execute_write("insert into t (id) values (1)")
    # The second close() is expected to be a silent no-op.
    for _ in range(2):
        database.close()
    datasette._internal_database.close()

View file

@ -3,8 +3,10 @@ Tests for the datasette.app.Datasette class
"""
import dataclasses
import os
from datasette import Context
from datasette.app import Datasette, Database, ResourcesSQL
from datasette.database import DatasetteClosedError
from datasette.resources import DatabaseResource
from itsdangerous import BadSignature
import pytest
@ -213,3 +215,83 @@ async def test_allowed_resources_sql(datasette):
assert isinstance(result, ResourcesSQL)
assert "all_rules AS" in result.sql
assert result.params["action"] == "view-table"
@pytest.mark.asyncio
async def test_datasette_close_closes_all_databases_and_executor():
    """Datasette.close() closes every database, the executor and the temp file."""
    datasette = Datasette(memory=True)
    await datasette.invoke_startup()
    internal = datasette._internal_database
    # Confirm the internal DB has write machinery running
    assert internal._write_thread is not None
    assert internal._write_thread.is_alive()
    internal_path = internal.path
    assert os.path.exists(internal_path)
    pool = datasette.executor

    datasette.close()

    # Executor is shut down
    assert pool._shutdown
    # All attached Database instances are closed
    for attached in datasette.databases.values():
        assert attached._closed
    assert datasette._internal_database._closed
    # Temp internal DB file is unlinked
    assert not os.path.exists(internal_path)
@pytest.mark.asyncio
async def test_datasette_close_is_idempotent():
    """Calling Datasette.close() twice must not raise."""
    datasette = Datasette(memory=True)
    await datasette.invoke_startup()
    # The second close() is expected to be a silent no-op.
    for _ in range(2):
        datasette.close()
@pytest.mark.asyncio
async def test_datasette_close_raises_on_use():
    """Querying the internal database after close() raises DatasetteClosedError."""
    datasette = Datasette(memory=True)
    await datasette.invoke_startup()
    datasette.close()
    with pytest.raises(DatasetteClosedError):
        internal = datasette.get_internal_database()
        await internal.execute("select 1")
@pytest.mark.asyncio
async def test_asgi_lifespan_shutdown_closes_datasette():
    """Driving an ASGI lifespan through shutdown must close the Datasette."""
    datasette = Datasette(memory=True)
    asgi_app = datasette.app()

    # Scripted lifespan conversation: startup first, then shutdown.
    incoming = [
        {"type": "lifespan.startup"},
        {"type": "lifespan.shutdown"},
    ]
    outgoing = []

    async def receive():
        return incoming.pop(0)

    async def send(message):
        outgoing.append(message)

    await asgi_app({"type": "lifespan"}, receive, send)

    assert {"type": "lifespan.startup.complete"} in outgoing
    assert {"type": "lifespan.shutdown.complete"} in outgoing
    assert datasette._closed
@pytest.mark.asyncio
async def test_datasette_close_continues_past_db_error():
    """One database failing in close() must not stop the others being closed."""
    datasette = Datasette(memory=True)
    await datasette.invoke_startup()

    class Boom(Database):
        def close(self):
            raise RuntimeError("boom")

    failing = Boom(datasette, is_memory=True)
    datasette.add_database(failing, name="bad")
    healthy = datasette.add_database(
        Database(datasette, is_memory=True), name="good"
    )

    with pytest.raises(RuntimeError, match="boom"):
        datasette.close()

    assert healthy._closed
    assert datasette._internal_database._closed

View file

@ -0,0 +1,91 @@
"""
Tests for datasette._pytest_plugin, the pytest plugin that auto-closes
Datasette instances constructed inside test bodies.
These tests drive a real pytest session in a subprocess so the plugin
operates exactly as it would for a downstream consumer.
"""
import subprocess
import sys
import textwrap
from pathlib import Path
REPO_ROOT = Path(__file__).parent.parent
def _run_pytest(tmp_path: Path) -> subprocess.CompletedProcess:
    """Run ``pytest -v`` on *tmp_path* in a child interpreter and return the result.

    The child uses the current interpreter (``sys.executable``), runs with
    *tmp_path* as its working directory, and has stdout and stderr captured
    as text.
    """
    target = str(tmp_path)
    command = [sys.executable, "-m", "pytest", "-v", target]
    return subprocess.run(command, cwd=target, capture_output=True, text=True)
def test_auto_close_of_instances_made_in_test_body(tmp_path):
    """An instance built inside a test body is closed once that test finishes.

    The generated module holds two ordered tests: ``test_a`` builds a
    Datasette() and stashes a hard reference to it, and ``test_b`` asserts
    that the stashed instance was closed by the plugin.
    """
    sample_source = textwrap.dedent("""
        from datasette.app import Datasette
        _stash = {}
        def test_a():
            ds = Datasette(memory=True)
            _stash["ds"] = ds
            assert ds._closed is False
        def test_b():
            assert _stash["ds"]._closed is True
        """)
    (tmp_path / "test_sample.py").write_text(sample_source)
    outcome = _run_pytest(tmp_path)
    assert outcome.returncode == 0, outcome.stdout + outcome.stderr
def test_fixture_scoped_instance_is_not_closed(tmp_path):
    """A module-scoped fixture instance must survive across tests in the module."""
    fixture_source = textwrap.dedent("""
        import pytest
        from datasette.app import Datasette
        @pytest.fixture(scope="module")
        def ds():
            return Datasette(memory=True)
        def test_first(ds):
            assert ds._closed is False
        def test_second(ds):
            # Still alive because the plugin only tracks instances
            # constructed during pytest_runtest_call, not during fixture
            # setup.
            assert ds._closed is False
        """)
    (tmp_path / "test_fixture.py").write_text(fixture_source)
    outcome = _run_pytest(tmp_path)
    assert outcome.returncode == 0, outcome.stdout + outcome.stderr
def test_opt_out_via_ini(tmp_path):
    """Setting ``datasette_autoclose = false`` must leave instances untouched."""
    ini_text = textwrap.dedent("""
        [pytest]
        datasette_autoclose = false
        """).strip()
    (tmp_path / "pytest.ini").write_text(ini_text)
    optout_source = textwrap.dedent("""
        from datasette.app import Datasette
        _stash = {}
        def test_a():
            ds = Datasette(memory=True)
            _stash["ds"] = ds
        def test_b():
            # Opt-out: plugin must not have closed it.
            assert _stash["ds"]._closed is False
            _stash["ds"].close()
        """)
    (tmp_path / "test_optout.py").write_text(optout_source)
    outcome = _run_pytest(tmp_path)
    assert outcome.returncode == 0, outcome.stdout + outcome.stderr

View file

@ -505,6 +505,7 @@ def ds_with_event_tracking(tmp_path):
ds.track_event = recording_track_event
yield ds
ds.close()
@pytest.mark.asyncio