From 3110faa0bab8cdeb4e4e042e87fefa434f64162f Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Sat, 16 May 2026 11:45:43 -0700 Subject: [PATCH] Replace Janus queue with asyncio.Future Closes #1752 AI generated patch explanation: https://gisthost.github.io/?e2b8d9c7666e988b5c003ff5e5ef3098 --- datasette/database.py | 117 +++++++++++++++++++------------ docs/changelog.rst | 8 +++ pyproject.toml | 1 - tests/test_internals_database.py | 34 +++++++++ tests/test_write_wrapper.py | 27 ++++++- 5 files changed, 140 insertions(+), 47 deletions(-) diff --git a/datasette/database.py b/datasette/database.py index 657adfa5..66d50ffa 100644 --- a/datasette/database.py +++ b/datasette/database.py @@ -4,7 +4,6 @@ from collections import namedtuple import inspect import os from pathlib import Path -import janus import queue import sqlite_utils import sys @@ -330,13 +329,16 @@ class Database: else: # For non-blocking writes, spawn a background task to # dispatch events after the write thread completes - task_id, reply_queue = result + task_id, reply_future = result async def _dispatch_events_after_write(): - write_result = await reply_queue.async_q.get() - if not isinstance(write_result, Exception): - for event in pending_events: - await self.ds.track_event(event) + try: + await reply_future + except Exception: + # if the write failed, don't emit success events + return + for event in pending_events: + await self.ds.track_event(event) asyncio.ensure_future(_dispatch_events_after_write()) result = task_id @@ -390,18 +392,15 @@ class Database: ) self._write_thread.start() task_id = uuid.uuid5(uuid.NAMESPACE_DNS, "datasette.io") - reply_queue = janus.Queue() + loop = asyncio.get_running_loop() + reply_future = loop.create_future() self._write_queue.put( - WriteTask(fn, task_id, reply_queue, isolated_connection, transaction) + WriteTask(fn, task_id, loop, reply_future, isolated_connection, transaction) ) if block: - result = await reply_queue.async_q.get() - if isinstance(result, Exception): - raise result - else: - return result + return await reply_future else: - return task_id, reply_queue + return task_id, reply_future def _execute_writes(self): # Infinite looping thread that protects the single write connection @@ -422,36 +421,37 @@ class Database: except Exception: pass return + exception = None + result = None if conn_exception is not None: - result = conn_exception + exception = conn_exception + elif task.isolated_connection: + isolated_connection = self.connect(write=True) + try: + result = task.fn(isolated_connection) + except Exception as e: + sys.stderr.write("{}\n".format(e)) + sys.stderr.flush() + exception = e + finally: + isolated_connection.close() + try: + self._all_file_connections.remove(isolated_connection) + except ValueError: + # Was probably a memory connection + pass else: - if task.isolated_connection: - isolated_connection = self.connect(write=True) - try: - result = task.fn(isolated_connection) - except Exception as e: - sys.stderr.write("{}\n".format(e)) - sys.stderr.flush() - result = e - finally: - isolated_connection.close() - try: - self._all_file_connections.remove(isolated_connection) - except ValueError: - # Was probably a memory connection - pass - else: - try: - if task.transaction: - with conn: - result = task.fn(conn) - else: + try: + if task.transaction: + with conn: result = task.fn(conn) - except Exception as e: - sys.stderr.write("{}\n".format(e)) - sys.stderr.flush() - result = e - task.reply_queue.sync_q.put(result) + else: + result = task.fn(conn) + except Exception as e: + sys.stderr.write("{}\n".format(e)) + sys.stderr.flush() + exception = e + _deliver_write_result(task, result, exception) async def execute_fn(self, fn): self._check_not_closed() @@ -892,16 +892,45 @@ def _apply_write_wrapper(fn, wrapper_factory, track_event): class WriteTask: - __slots__ = ("fn", "task_id", "reply_queue", "isolated_connection", "transaction") + __slots__ = ( + "fn", + "task_id", + "loop", + "reply_future", + "isolated_connection", + "transaction", + ) - def __init__(self, fn, task_id, reply_queue, isolated_connection, transaction): + def __init__( + self, fn, task_id, loop, reply_future, isolated_connection, transaction + ): self.fn = fn self.task_id = task_id - self.reply_queue = reply_queue + self.loop = loop + self.reply_future = reply_future self.isolated_connection = isolated_connection self.transaction = transaction +def _deliver_write_result(task, result, exception): + # Called from the write thread. Delivers the result back to the + # awaiting coroutine on its event loop via call_soon_threadsafe. + def _set(): + if task.reply_future.done(): + # Awaiter was cancelled; nothing to do. + return + if exception is not None: + task.reply_future.set_exception(exception) + else: + task.reply_future.set_result(result) + + try: + task.loop.call_soon_threadsafe(_set) + except RuntimeError: + # Event loop has been closed; the awaiter is gone. + pass + + class QueryInterrupted(Exception): def __init__(self, e, sql, params): self.e = e diff --git a/docs/changelog.rst b/docs/changelog.rst index 4f26066c..5b637797 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -4,6 +4,14 @@ Changelog ========= +.. _unreleased: + +Unreleased +---------- + +- Dropped Janus as a dependency, previously used to manage the write queue. This should not have any impact on plugin developers or end-users. (:issue:`1752`) + + .. _v1_0_a29: 1.0a29 (2026-05-12) diff --git a/pyproject.toml b/pyproject.toml index e6007afd..c50c720a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,6 @@ dependencies = [ "pluggy>=1.0", "uvicorn>=0.11", "aiofiles>=0.4", - "janus>=0.6.2", "PyYAML>=5.3", "mergedeep>=1.1.1", "itsdangerous>=1.1", diff --git a/tests/test_internals_database.py b/tests/test_internals_database.py index 8ff74a83..75ae8d39 100644 --- a/tests/test_internals_database.py +++ b/tests/test_internals_database.py @@ -2,9 +2,12 @@ Tests for the datasette.database.Database class """ +import asyncio +from types import SimpleNamespace from datasette.app import Datasette from datasette.database import Database, Results, MultipleValues from datasette.database import DatasetteClosedError +from datasette.database import _deliver_write_result from datasette.utils.sqlite import sqlite3, sqlite_version from datasette.utils import Column import pytest @@ -590,6 +593,37 @@ async def test_execute_write_fn_connection_exception(tmpdir, app_client): app_client.ds.remove_database("immutable-db") +@pytest.mark.asyncio +async def test_deliver_write_result_leaves_done_future_alone(): + loop = asyncio.get_running_loop() + reply_future = loop.create_future() + reply_future.set_result("original") + task = SimpleNamespace(loop=loop, reply_future=reply_future) + + # The write thread can finish after the caller has stopped waiting for the + # result. Delivery should notice that the future is already resolved and + # leave the caller's outcome alone instead of raising InvalidStateError. + _deliver_write_result(task, "replacement", None) + await asyncio.sleep(0) + + assert reply_future.result() == "original" + + +@pytest.mark.asyncio +async def test_deliver_write_result_ignores_closed_loop(): + closed_loop = asyncio.new_event_loop() + closed_loop.close() + reply_future = asyncio.get_running_loop().create_future() + task = SimpleNamespace(loop=closed_loop, reply_future=reply_future) + + # If the event loop that submitted the write has gone away, the write + # thread should drop the result rather than crash while reporting back to + # that closed loop. + _deliver_write_result(task, "result", None) + + assert not reply_future.done() + + def table_exists(conn, name): return bool( conn.execute( diff --git a/tests/test_write_wrapper.py b/tests/test_write_wrapper.py index 48c964b4..88ce5520 100644 --- a/tests/test_write_wrapper.py +++ b/tests/test_write_wrapper.py @@ -2,6 +2,7 @@ Tests for the write_wrapper plugin hook. """ +import asyncio from dataclasses import dataclass from datasette.app import Datasette from datasette.events import Event @@ -633,8 +634,6 @@ async def test_track_event_with_block_false(ds_with_event_tracking): assert task_id is not None # Give the background task time to complete - import asyncio - for _ in range(50): if ds._tracked_events: break @@ -644,6 +643,30 @@ async def test_track_event_with_block_false(ds_with_event_tracking): assert ds._tracked_events[0].message == "non-blocking" +@pytest.mark.asyncio +async def test_track_event_with_block_false_discarded_on_exception( + ds_with_event_tracking, +): + """Events queued by a non-blocking write are discarded if the write fails.""" + ds = ds_with_event_tracking + db = ds.get_database("test") + + def my_write(conn, track_event): + track_event(DummyEvent(actor=None, message="should not fire")) + raise ValueError("deliberate error") + + task_id = await db.execute_write_fn(my_write, block=False) + assert task_id is not None + + # A following blocking write proves the failed non-blocking task has + # completed; one more loop turn lets its event-dispatch task observe the + # exception and exit. + await db.execute_write_fn(lambda conn: conn.execute("select 1")) + await asyncio.sleep(0) + + assert ds._tracked_events == [] + + # --- Tests for RenameTableEvent detection ---