From dabf8e4199cd4598697e538c495cc66aa429a262 Mon Sep 17 00:00:00 2001
From: Simon Willison
Date: Thu, 16 Apr 2026 20:08:46 -0700
Subject: [PATCH] Database.close() shuts down write thread and raises DatasetteClosedError

After this commit, Database.close() sends a sentinel to the write queue
so the background write thread exits cleanly, closes cached read/write
connections, and marks the instance closed. Subsequent calls to
execute*() raise DatasetteClosedError. close() remains idempotent and
one-way.

Refs #2692

Co-Authored-By: Claude Opus 4.7 (1M context)
---
Reviewer notes (text between the separator line above and the diffstat is
not part of the commit message):

* NOTE(review): close() sets self._closed before it queues _SHUTDOWN, and
  _check_not_closed() reads that flag without a lock. A writer that passed
  the guard just before close() could presumably enqueue its task behind
  the sentinel, where the write thread (which returns on _SHUTDOWN) would
  never pick it up. The enqueue path is outside this diff; confirm.
* close() swallows every Exception raised while closing connections. This
  looks like deliberate best-effort cleanup, but failures become invisible.
* delattr(connections, ...) runs in whichever thread calls close(), and
  "connections" is a threading.local(), so only that thread's cached
  attribute is dropped (the in-code comment says "we can reach").
* If the write thread is still alive after join(timeout=10), close() only
  writes a warning to stderr and carries on closing connections.
* The new tests call ds._internal_database.close() as their last statement,
  so it is skipped when an earlier assertion fails. They do not exercise
  execute_write_many(), execute_write_script() or execute_isolated_fn()
  after close().
* NOTE(review): line breaks and indentation in this patch were rebuilt from
  the hunk line counts and from Python/reST syntax. The indentation of the
  context lines "pass" (hunk at -164) and
  "task.reply_queue.sync_q.put(result)" (hunk at -366) is assumed; verify
  against datasette/database.py before applying.

 datasette/database.py            | 79 +++++++++++++++++++++++++++++++-
 docs/internals.rst               |  6 ++-
 tests/test_internals_database.py | 56 ++++++++++++++++++++++
 3 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/datasette/database.py b/datasette/database.py
index 7364ff7f..e3c4bfec 100644
--- a/datasette/database.py
+++ b/datasette/database.py
@@ -34,6 +34,13 @@ connections = threading.local()
 AttachedDatabase = namedtuple("AttachedDatabase", ("seq", "name", "file"))
 
 
+class DatasetteClosedError(RuntimeError):
+    """Raised when using a Datasette or Database instance after close()."""
+
+
+_SHUTDOWN = object()
+
+
 class Database:
     # For table counts stop at this many rows:
     count_limit = 10000
@@ -76,6 +83,7 @@ class Database:
         self._cached_table_counts = None
         self._write_thread = None
         self._write_queue = None
+        self._closed = False
         # These are used when in non-threaded mode:
         self._read_connection = None
         self._write_connection = None
@@ -84,6 +92,12 @@ class Database:
         if not is_temp_disk:
             self.mode = mode
 
+    def _check_not_closed(self):
+        if self._closed:
+            raise DatasetteClosedError(
+                "Database {!r} has been closed".format(self.name)
+            )
+
     @property
     def cached_table_counts(self):
         if self._cached_table_counts is not None:
@@ -149,9 +163,53 @@ class Database:
         return conn
 
     def close(self):
-        # Close all connections - useful to avoid running out of file handles in tests
+        """Release all resources held by this database.
+
+        Idempotent. After close() further calls to execute()/execute_fn()/
+        execute_write()/execute_write_fn() raise DatasetteClosedError.
+        """
+        if self._closed:
+            return
+        self._closed = True
+        # Shut down the write thread, if any, via a sentinel. The thread
+        # drains any writes already queued before the sentinel and then
+        # closes its own write connection and returns.
+        write_thread = self._write_thread
+        if write_thread is not None and self._write_queue is not None:
+            self._write_queue.put(_SHUTDOWN)
+            write_thread.join(timeout=10)
+            if write_thread.is_alive():
+                sys.stderr.write(
+                    "Datasette: write thread for {!r} did not exit within 10s\n".format(
+                        self.name
+                    )
+                )
+                sys.stderr.flush()
+        # Close anything still tracked in _all_file_connections
         for connection in self._all_file_connections:
-            connection.close()
+            try:
+                connection.close()
+            except Exception:
+                pass
+        self._all_file_connections = []
+        # Drop per-thread cached read connections we can reach
+        try:
+            delattr(connections, self._thread_local_id)
+        except AttributeError:
+            pass
+        # Close non-threaded-mode cached connections if still open
+        if self._read_connection is not None:
+            try:
+                self._read_connection.close()
+            except Exception:
+                pass
+            self._read_connection = None
+        if self._write_connection is not None:
+            try:
+                self._write_connection.close()
+            except Exception:
+                pass
+            self._write_connection = None
         if self.is_temp_disk:
             self._cleanup_temp_file()
 
@@ -164,6 +222,8 @@ class Database:
                     pass
 
     async def execute_write(self, sql, params=None, block=True, request=None):
+        self._check_not_closed()
+
         def _inner(conn):
             return conn.execute(sql, params or [])
 
@@ -172,6 +232,8 @@ class Database:
         return results
 
     async def execute_write_script(self, sql, block=True, request=None):
+        self._check_not_closed()
+
         def _inner(conn):
             return conn.executescript(sql)
 
@@ -182,6 +244,8 @@ class Database:
         return results
 
     async def execute_write_many(self, sql, params_seq, block=True, request=None):
+        self._check_not_closed()
+
         def _inner(conn):
             count = 0
 
@@ -203,6 +267,7 @@ class Database:
         return results
 
     async def execute_isolated_fn(self, fn):
+        self._check_not_closed()
         # Open a new connection just for the duration of this function
         # blocking the write queue to avoid any writes occurring during it
         if self.ds.executor is None:
@@ -223,6 +288,7 @@ class Database:
         return await self._send_to_write_thread(fn, isolated_connection=True)
 
     async def execute_write_fn(self, fn, block=True, transaction=True, request=None):
+        self._check_not_closed()
         pending_events = []
 
         def track_event(event):
@@ -334,6 +400,13 @@ class Database:
             conn_exception = e
         while True:
             task = self._write_queue.get()
+            if task is _SHUTDOWN:
+                if conn is not None:
+                    try:
+                        conn.close()
+                    except Exception:
+                        pass
+                return
             if conn_exception is not None:
                 result = conn_exception
             else:
@@ -366,6 +439,7 @@ class Database:
             task.reply_queue.sync_q.put(result)
 
     async def execute_fn(self, fn):
+        self._check_not_closed()
         if self.ds.executor is None:
             # non-threaded mode
             if self._read_connection is None:
@@ -396,6 +470,7 @@ class Database:
         log_sql_errors=True,
     ):
         """Executes sql against db_name in a thread"""
+        self._check_not_closed()
         page_size = page_size or self.ds.page_size
 
         def sql_operation_in_thread(conn):
diff --git a/docs/internals.rst b/docs/internals.rst
index ba9d3131..53c20106 100644
--- a/docs/internals.rst
+++ b/docs/internals.rst
@@ -1830,7 +1830,11 @@ The return value of the function will be returned by this method. Any exceptions
 db.close()
 ----------
 
-Closes all of the open connections to file-backed databases. This is mainly intended to be used by large test suites, to avoid hitting limits on the number of open files.
+Release all resources held by this ``Database`` instance. This shuts down the background write thread (if one was started by a previous call to :ref:`database_execute_write_fn` or similar), closes the write connection, and closes any cached read connections.
+
+After ``db.close()`` has been called, any further call to :ref:`database_execute`, :ref:`database_execute_fn`, :ref:`database_execute_write`, :ref:`database_execute_write_fn`, :ref:`database_execute_write_many`, :ref:`database_execute_write_script` or :ref:`database_execute_isolated_fn` will raise a ``datasette.database.DatasetteClosedError`` exception.
+
+``close()`` is idempotent — calling it a second time is a no-op. It is one-way: a closed ``Database`` cannot be reopened.
 
 .. _internals_database_introspection:
 
diff --git a/tests/test_internals_database.py b/tests/test_internals_database.py
index 0d565d61..8ff74a83 100644
--- a/tests/test_internals_database.py
+++ b/tests/test_internals_database.py
@@ -4,6 +4,7 @@ Tests for the datasette.database.Database class
 
 from datasette.app import Datasette
 from datasette.database import Database, Results, MultipleValues
+from datasette.database import DatasetteClosedError
 from datasette.utils.sqlite import sqlite3, sqlite_version
 from datasette.utils import Column
 import pytest
@@ -833,3 +834,58 @@ def test_repr_temp_disk(app_client):
     assert isinstance(db.size, int)
     assert isinstance(db.mtime_ns, int)
     db.close()
+
+
+@pytest.mark.asyncio
+async def test_database_close_shuts_down_write_thread(tmpdir):
+    path = str(tmpdir / "dbclose.db")
+    conn = sqlite3.connect(path)
+    conn.execute("create table t (id integer primary key)")
+    conn.close()
+    ds = Datasette([path])
+    db = ds.get_database("dbclose")
+    # Trigger write thread creation
+    await db.execute_write("insert into t (id) values (1)")
+    assert db._write_thread is not None
+    assert db._write_thread.is_alive()
+    db.close()
+    # Wait briefly for the thread to exit — the sentinel should cause it to return.
+    db._write_thread.join(timeout=5)
+    assert not db._write_thread.is_alive()
+    ds._internal_database.close()
+
+
+@pytest.mark.asyncio
+async def test_database_close_raises_on_further_use(tmpdir):
+    path = str(tmpdir / "closed.db")
+    conn = sqlite3.connect(path)
+    conn.execute("create table t (id integer primary key)")
+    conn.close()
+    ds = Datasette([path])
+    db = ds.get_database("closed")
+    await db.execute("select 1")
+    db.close()
+    with pytest.raises(DatasetteClosedError):
+        await db.execute("select 1")
+    with pytest.raises(DatasetteClosedError):
+        await db.execute_write("insert into t (id) values (1)")
+    with pytest.raises(DatasetteClosedError):
+        await db.execute_fn(lambda conn: conn.execute("select 1").fetchone())
+    with pytest.raises(DatasetteClosedError):
+        await db.execute_write_fn(lambda conn: conn.execute("select 1"))
+    ds._internal_database.close()
+
+
+@pytest.mark.asyncio
+async def test_database_close_is_idempotent(tmpdir):
+    path = str(tmpdir / "idemp.db")
+    conn = sqlite3.connect(path)
+    conn.execute("create table t (id integer primary key)")
+    conn.close()
+    ds = Datasette([path])
+    db = ds.get_database("idemp")
+    await db.execute_write("insert into t (id) values (1)")
+    db.close()
+    # Second call should be a no-op, not raise
+    db.close()
+    ds._internal_database.close()