diff --git a/datasette/database.py b/datasette/database.py index 0a32442c..6cd5d11e 100644 --- a/datasette/database.py +++ b/datasette/database.py @@ -298,13 +298,14 @@ class Database: async def execute_isolated_fn(self, fn): self._check_not_closed() - # Open a new connection just for the duration of this function + # Open a new connection just for the duration of this function, # blocking the write queue to avoid any writes occurring during it - if self.ds.executor is None: - # non-threaded mode - isolated_connection = self.connect(write=True) + write = self.is_mutable + + def _run(): + isolated_connection = self.connect(write=write) try: - result = fn(isolated_connection) + return fn(isolated_connection) finally: isolated_connection.close() try: @@ -312,10 +313,18 @@ class Database: except ValueError: # Was probably a memory connection pass - return result - else: - # Threaded mode - send to write thread - return await self._send_to_write_thread(fn, isolated_connection=True) + + if self.ds.executor is None: + # non-threaded mode + return _run() + if not write: + # Immutable database - no writes can ever occur, so there is no + # write queue to block; run against a fresh read-only connection + return await asyncio.get_running_loop().run_in_executor( + self.ds.executor, _run + ) + # Threaded mode - send to write thread + return await self._send_to_write_thread(fn, isolated_connection=True) async def analyze_sql(self, sql, params=None) -> SQLAnalysis: self._check_not_closed() @@ -449,20 +458,21 @@ class Database: if conn_exception is not None: exception = conn_exception elif task.isolated_connection: - isolated_connection = self.connect(write=True) try: - result = task.fn(isolated_connection) + isolated_connection = self.connect(write=True) + try: + result = task.fn(isolated_connection) + finally: + isolated_connection.close() + try: + self._all_file_connections.remove(isolated_connection) + except ValueError: + # Was probably a memory connection + pass except Exception as e: sys.stderr.write("{}\n".format(e)) sys.stderr.flush() exception = e - finally: - isolated_connection.close() - try: - self._all_file_connections.remove(isolated_connection) - except ValueError: - # Was probably a memory connection - pass else: try: if task.transaction: diff --git a/tests/test_internals_database.py b/tests/test_internals_database.py index bb209649..bad4e8ca 100644 --- a/tests/test_internals_database.py +++ b/tests/test_internals_database.py @@ -863,6 +863,39 @@ async def test_execute_isolated(db, disable_threads): assert not await db.execute_isolated_fn(table_exists_checker("created_by_isolated")) +@pytest.mark.asyncio +async def test_execute_isolated_connect_failure_does_not_kill_write_thread(): + # A connect() failure for an isolated task should be returned to the + # caller as an exception, not crash the write thread + class ConnectError(Exception): + pass + + ds = Datasette(memory=True) + db = ds.add_memory_database("test_isolated_connect_failure") + # Start the write thread with a healthy dedicated write connection + await db.execute_write("create table dogs (id integer primary key)") + + original_connect = db.connect + + def broken_connect(write=False): + raise ConnectError("Could not connect") + + db.connect = broken_connect + try: + with pytest.raises(ConnectError): + await asyncio.wait_for(db.execute_isolated_fn(lambda conn: None), timeout=2) + finally: + db.connect = original_connect + + # Write thread should still be alive and processing tasks + assert db._write_thread.is_alive() + await db.execute_write("insert into dogs (id) values (1)") + count = await db.execute_isolated_fn( + lambda conn: conn.execute("select count(*) from dogs").fetchone()[0] + ) + assert count == 1 + + @pytest.mark.asyncio async def test_analyze_sql(): ds = Datasette(memory=True) diff --git a/tests/test_queries.py b/tests/test_queries.py index 6e9bcbdb..0354f73a 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -9,7 +9,7 @@ from datasette.app import Datasette from datasette.resources import DatabaseResource, QueryResource from datasette.stored_queries import StoredQuery, StoredQueryPage from datasette.utils.asgi import Forbidden -from datasette.utils.sqlite import supports_returning +from datasette.utils.sqlite import sqlite3, supports_returning requires_sqlite_returning = pytest.mark.skipif( not supports_returning(), reason="SQLite does not support RETURNING" @@ -593,6 +593,38 @@ async def test_query_store_api_creates_read_only_query(): assert data["query"]["owner_id"] == "root" +@pytest.mark.asyncio +async def test_query_store_api_creates_query_for_immutable_database(tmp_path): + db_path = tmp_path / "immutable.db" + conn = sqlite3.connect(str(db_path)) + conn.execute("create table dogs (id integer primary key, name text)") + conn.commit() + conn.close() + + ds = Datasette([], immutables=[str(db_path)], default_deny=True) + ds.root_enabled = True + await ds.invoke_startup() + + response = await ds.client.post( + "/immutable/-/queries/store", + actor={"id": "root"}, + json={ + "query": { + "name": "by_name", + "sql": "select * from dogs where name = :name", + } + }, + ) + + ds.close() + assert response.status_code == 201 + data = response.json() + assert data["ok"] is True + assert data["query"]["name"] == "by_name" + assert data["query"]["parameters"] == ["name"] + assert data["query"]["is_write"] is False + + @pytest.mark.asyncio async def test_query_list_and_definition_api(): ds = Datasette(memory=True)