mirror of
https://github.com/simonw/datasette.git
synced 2026-05-28 04:46:18 +02:00
Database.close() shuts down write thread and raises DatasetteClosedError
After this commit, Database.close() sends a sentinel to the write queue so the background write thread exits cleanly, closes cached read/write connections, and marks the instance closed. Subsequent calls to execute*() raise DatasetteClosedError. close() remains idempotent and one-way. Refs #2692 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
ade0ef8a60
commit
dabf8e4199
3 changed files with 138 additions and 3 deletions
|
|
@ -34,6 +34,13 @@ connections = threading.local()
|
|||
AttachedDatabase = namedtuple("AttachedDatabase", ("seq", "name", "file"))
|
||||
|
||||
|
||||
class DatasetteClosedError(RuntimeError):
|
||||
"""Raised when using a Datasette or Database instance after close()."""
|
||||
|
||||
|
||||
_SHUTDOWN = object()
|
||||
|
||||
|
||||
class Database:
|
||||
# For table counts stop at this many rows:
|
||||
count_limit = 10000
|
||||
|
|
@ -76,6 +83,7 @@ class Database:
|
|||
self._cached_table_counts = None
|
||||
self._write_thread = None
|
||||
self._write_queue = None
|
||||
self._closed = False
|
||||
# These are used when in non-threaded mode:
|
||||
self._read_connection = None
|
||||
self._write_connection = None
|
||||
|
|
@ -84,6 +92,12 @@ class Database:
|
|||
if not is_temp_disk:
|
||||
self.mode = mode
|
||||
|
||||
def _check_not_closed(self):
|
||||
if self._closed:
|
||||
raise DatasetteClosedError(
|
||||
"Database {!r} has been closed".format(self.name)
|
||||
)
|
||||
|
||||
@property
|
||||
def cached_table_counts(self):
|
||||
if self._cached_table_counts is not None:
|
||||
|
|
@ -149,9 +163,53 @@ class Database:
|
|||
return conn
|
||||
|
||||
def close(self):
|
||||
# Close all connections - useful to avoid running out of file handles in tests
|
||||
"""Release all resources held by this database.
|
||||
|
||||
Idempotent. After close() further calls to execute()/execute_fn()/
|
||||
execute_write()/execute_write_fn() raise DatasetteClosedError.
|
||||
"""
|
||||
if self._closed:
|
||||
return
|
||||
self._closed = True
|
||||
# Shut down the write thread, if any, via a sentinel. The thread
|
||||
# drains any writes already queued before the sentinel and then
|
||||
# closes its own write connection and returns.
|
||||
write_thread = self._write_thread
|
||||
if write_thread is not None and self._write_queue is not None:
|
||||
self._write_queue.put(_SHUTDOWN)
|
||||
write_thread.join(timeout=10)
|
||||
if write_thread.is_alive():
|
||||
sys.stderr.write(
|
||||
"Datasette: write thread for {!r} did not exit within 10s\n".format(
|
||||
self.name
|
||||
)
|
||||
)
|
||||
sys.stderr.flush()
|
||||
# Close anything still tracked in _all_file_connections
|
||||
for connection in self._all_file_connections:
|
||||
connection.close()
|
||||
try:
|
||||
connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._all_file_connections = []
|
||||
# Drop per-thread cached read connections we can reach
|
||||
try:
|
||||
delattr(connections, self._thread_local_id)
|
||||
except AttributeError:
|
||||
pass
|
||||
# Close non-threaded-mode cached connections if still open
|
||||
if self._read_connection is not None:
|
||||
try:
|
||||
self._read_connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._read_connection = None
|
||||
if self._write_connection is not None:
|
||||
try:
|
||||
self._write_connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._write_connection = None
|
||||
if self.is_temp_disk:
|
||||
self._cleanup_temp_file()
|
||||
|
||||
|
|
@ -164,6 +222,8 @@ class Database:
|
|||
pass
|
||||
|
||||
async def execute_write(self, sql, params=None, block=True, request=None):
|
||||
self._check_not_closed()
|
||||
|
||||
def _inner(conn):
|
||||
return conn.execute(sql, params or [])
|
||||
|
||||
|
|
@ -172,6 +232,8 @@ class Database:
|
|||
return results
|
||||
|
||||
async def execute_write_script(self, sql, block=True, request=None):
|
||||
self._check_not_closed()
|
||||
|
||||
def _inner(conn):
|
||||
return conn.executescript(sql)
|
||||
|
||||
|
|
@ -182,6 +244,8 @@ class Database:
|
|||
return results
|
||||
|
||||
async def execute_write_many(self, sql, params_seq, block=True, request=None):
|
||||
self._check_not_closed()
|
||||
|
||||
def _inner(conn):
|
||||
count = 0
|
||||
|
||||
|
|
@ -203,6 +267,7 @@ class Database:
|
|||
return results
|
||||
|
||||
async def execute_isolated_fn(self, fn):
|
||||
self._check_not_closed()
|
||||
# Open a new connection just for the duration of this function
|
||||
# blocking the write queue to avoid any writes occurring during it
|
||||
if self.ds.executor is None:
|
||||
|
|
@ -223,6 +288,7 @@ class Database:
|
|||
return await self._send_to_write_thread(fn, isolated_connection=True)
|
||||
|
||||
async def execute_write_fn(self, fn, block=True, transaction=True, request=None):
|
||||
self._check_not_closed()
|
||||
pending_events = []
|
||||
|
||||
def track_event(event):
|
||||
|
|
@ -334,6 +400,13 @@ class Database:
|
|||
conn_exception = e
|
||||
while True:
|
||||
task = self._write_queue.get()
|
||||
if task is _SHUTDOWN:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
if conn_exception is not None:
|
||||
result = conn_exception
|
||||
else:
|
||||
|
|
@ -366,6 +439,7 @@ class Database:
|
|||
task.reply_queue.sync_q.put(result)
|
||||
|
||||
async def execute_fn(self, fn):
|
||||
self._check_not_closed()
|
||||
if self.ds.executor is None:
|
||||
# non-threaded mode
|
||||
if self._read_connection is None:
|
||||
|
|
@ -396,6 +470,7 @@ class Database:
|
|||
log_sql_errors=True,
|
||||
):
|
||||
"""Executes sql against db_name in a thread"""
|
||||
self._check_not_closed()
|
||||
page_size = page_size or self.ds.page_size
|
||||
|
||||
def sql_operation_in_thread(conn):
|
||||
|
|
|
|||
|
|
@ -1830,7 +1830,11 @@ The return value of the function will be returned by this method. Any exceptions
|
|||
db.close()
|
||||
----------
|
||||
|
||||
Closes all of the open connections to file-backed databases. This is mainly intended to be used by large test suites, to avoid hitting limits on the number of open files.
|
||||
Release all resources held by this ``Database`` instance. This shuts down the background write thread (if one was started by a previous call to :ref:`database_execute_write_fn` or similar), closes the write connection, and closes any cached read connections.
|
||||
|
||||
After ``db.close()`` has been called, any further call to :ref:`database_execute`, :ref:`database_execute_fn`, :ref:`database_execute_write`, :ref:`database_execute_write_fn`, :ref:`database_execute_write_many`, :ref:`database_execute_write_script` or :ref:`database_execute_isolated_fn` will raise a ``datasette.database.DatasetteClosedError`` exception.
|
||||
|
||||
``close()`` is idempotent — calling it a second time is a no-op. It is one-way: a closed ``Database`` cannot be reopened.
|
||||
|
||||
.. _internals_database_introspection:
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ Tests for the datasette.database.Database class
|
|||
|
||||
from datasette.app import Datasette
|
||||
from datasette.database import Database, Results, MultipleValues
|
||||
from datasette.database import DatasetteClosedError
|
||||
from datasette.utils.sqlite import sqlite3, sqlite_version
|
||||
from datasette.utils import Column
|
||||
import pytest
|
||||
|
|
@ -833,3 +834,58 @@ def test_repr_temp_disk(app_client):
|
|||
assert isinstance(db.size, int)
|
||||
assert isinstance(db.mtime_ns, int)
|
||||
db.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_database_close_shuts_down_write_thread(tmpdir):
|
||||
path = str(tmpdir / "dbclose.db")
|
||||
conn = sqlite3.connect(path)
|
||||
conn.execute("create table t (id integer primary key)")
|
||||
conn.close()
|
||||
ds = Datasette([path])
|
||||
db = ds.get_database("dbclose")
|
||||
# Trigger write thread creation
|
||||
await db.execute_write("insert into t (id) values (1)")
|
||||
assert db._write_thread is not None
|
||||
assert db._write_thread.is_alive()
|
||||
db.close()
|
||||
# Wait briefly for the thread to exit — the sentinel should cause it to return.
|
||||
db._write_thread.join(timeout=5)
|
||||
assert not db._write_thread.is_alive()
|
||||
ds._internal_database.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_database_close_raises_on_further_use(tmpdir):
|
||||
path = str(tmpdir / "closed.db")
|
||||
conn = sqlite3.connect(path)
|
||||
conn.execute("create table t (id integer primary key)")
|
||||
conn.close()
|
||||
ds = Datasette([path])
|
||||
db = ds.get_database("closed")
|
||||
await db.execute("select 1")
|
||||
db.close()
|
||||
with pytest.raises(DatasetteClosedError):
|
||||
await db.execute("select 1")
|
||||
with pytest.raises(DatasetteClosedError):
|
||||
await db.execute_write("insert into t (id) values (1)")
|
||||
with pytest.raises(DatasetteClosedError):
|
||||
await db.execute_fn(lambda conn: conn.execute("select 1").fetchone())
|
||||
with pytest.raises(DatasetteClosedError):
|
||||
await db.execute_write_fn(lambda conn: conn.execute("select 1"))
|
||||
ds._internal_database.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_database_close_is_idempotent(tmpdir):
|
||||
path = str(tmpdir / "idemp.db")
|
||||
conn = sqlite3.connect(path)
|
||||
conn.execute("create table t (id integer primary key)")
|
||||
conn.close()
|
||||
ds = Datasette([path])
|
||||
db = ds.get_database("idemp")
|
||||
await db.execute_write("insert into t (id) values (1)")
|
||||
db.close()
|
||||
# Second call should be a no-op, not raise
|
||||
db.close()
|
||||
ds._internal_database.close()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue