datasette/tests/test_internals_database.py
Claude e34aac77a9
Add Database.request_connection() for per-request scoped connections
Opens a fresh sqlite3 connection scoped to an async-with block, intended
for short-lived per-request work such as installing a set_authorizer
callback derived from request.actor before running queries. The
connection is not pooled, not cached, and closed when the context exits.

- Database.request_connection(write=False, run_prepare_connection_hook=False)
  yields a RequestConnection with .connection, .execute() and .execute_fn()
- Datasette._prepare_connection gains run_plugin_hook=True kwarg; the
  prepare_connection plugin hook is skipped by default for these
  connections to keep them cheap
- Database.connect gains track=False to skip _all_file_connections
  tracking when the caller owns lifecycle
- Database.execute body extracted into _make_sql_operation so both
  pooled execute() and RequestConnection.execute() share it

https://claude.ai/code/session_01GdaNscbub6d2MPaUANrhJj
2026-05-19 02:07:18 +00:00

1078 lines
33 KiB
Python

"""
Tests for the datasette.database.Database class
"""
import asyncio
from types import SimpleNamespace
from datasette.app import Datasette
from datasette.database import Database, Results, MultipleValues
from datasette.database import DatasetteClosedError
from datasette.database import _deliver_write_result
from datasette.utils.sqlite import sqlite3, sqlite_version
from datasette.utils import Column
import pytest
import time
import uuid
@pytest.fixture
def db(app_client):
return app_client.ds.get_database("fixtures")
@pytest.mark.asyncio
async def test_execute(db):
results = await db.execute("select * from facetable")
assert isinstance(results, Results)
assert 15 == len(results)
@pytest.mark.asyncio
async def test_results_first(db):
assert None is (await db.execute("select * from facetable where pk > 100")).first()
results = await db.execute("select * from facetable")
row = results.first()
assert isinstance(row, sqlite3.Row)
@pytest.mark.asyncio
@pytest.mark.parametrize("expected", (True, False))
async def test_results_bool(db, expected):
where = "" if expected else "where pk = 0"
results = await db.execute("select * from facetable {}".format(where))
assert bool(results) is expected
@pytest.mark.asyncio
async def test_results_dicts(db):
results = await db.execute("select pk, name from roadside_attractions")
assert results.dicts() == [
{"pk": 1, "name": "The Mystery Spot"},
{"pk": 2, "name": "Winchester Mystery House"},
{"pk": 3, "name": "Burlingame Museum of PEZ Memorabilia"},
{"pk": 4, "name": "Bigfoot Discovery Museum"},
]
@pytest.mark.parametrize(
"query,expected",
[
("select 1", 1),
("select 1, 2", None),
("select 1 as num union select 2 as num", None),
],
)
@pytest.mark.asyncio
async def test_results_single_value(db, query, expected):
results = await db.execute(query)
if expected:
assert expected == results.single_value()
else:
with pytest.raises(MultipleValues):
results.single_value()
@pytest.mark.asyncio
async def test_execute_fn(db):
def get_1_plus_1(conn):
return conn.execute("select 1 + 1").fetchall()[0][0]
assert 2 == await db.execute_fn(get_1_plus_1)
@pytest.mark.asyncio
async def test_execute_fn_transaction_false():
datasette = Datasette(memory=True)
db = datasette.add_memory_database("test_execute_fn_transaction_false")
def run(conn):
try:
with conn:
conn.execute("create table foo (id integer primary key)")
conn.execute("insert into foo (id) values (44)")
# Table should exist
assert (
conn.execute(
'select count(*) from sqlite_master where name = "foo"'
).fetchone()[0]
== 1
)
assert conn.execute("select id from foo").fetchall()[0][0] == 44
raise ValueError("Cancel commit")
except ValueError:
pass
# Row should NOT exist
assert conn.execute("select count(*) from foo").fetchone()[0] == 0
await db.execute_write_fn(run, transaction=False)
@pytest.mark.parametrize(
"tables,exists",
(
(["facetable", "searchable", "tags", "searchable_tags"], True),
(["foo", "bar", "baz"], False),
),
)
@pytest.mark.asyncio
async def test_table_exists(db, tables, exists):
for table in tables:
actual = await db.table_exists(table)
assert exists == actual
@pytest.mark.parametrize(
"view,expected",
(
("not_a_view", False),
("paginated_view", True),
),
)
@pytest.mark.asyncio
async def test_view_exists(db, view, expected):
actual = await db.view_exists(view)
assert actual == expected
@pytest.mark.parametrize(
"table,expected",
(
(
"facetable",
[
"pk",
"created",
"planet_int",
"on_earth",
"state",
"_city_id",
"_neighborhood",
"tags",
"complex_array",
"distinct_some_null",
"n",
],
),
(
"sortable",
[
"pk1",
"pk2",
"content",
"sortable",
"sortable_with_nulls",
"sortable_with_nulls_2",
"text",
],
),
),
)
@pytest.mark.asyncio
async def test_table_columns(db, table, expected):
columns = await db.table_columns(table)
assert columns == expected
@pytest.mark.parametrize(
"table,expected",
(
(
"facetable",
[
Column(
cid=0,
name="pk",
type="integer",
notnull=0,
default_value=None,
is_pk=1,
hidden=0,
),
Column(
cid=1,
name="created",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=2,
name="planet_int",
type="integer",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=3,
name="on_earth",
type="integer",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=4,
name="state",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=5,
name="_city_id",
type="integer",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=6,
name="_neighborhood",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=7,
name="tags",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=8,
name="complex_array",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=9,
name="distinct_some_null",
type="",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=10,
name="n",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
],
),
(
"sortable",
[
Column(
cid=0,
name="pk1",
type="varchar(30)",
notnull=0,
default_value=None,
is_pk=1,
hidden=0,
),
Column(
cid=1,
name="pk2",
type="varchar(30)",
notnull=0,
default_value=None,
is_pk=2,
hidden=0,
),
Column(
cid=2,
name="content",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=3,
name="sortable",
type="integer",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=4,
name="sortable_with_nulls",
type="real",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=5,
name="sortable_with_nulls_2",
type="real",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
Column(
cid=6,
name="text",
type="text",
notnull=0,
default_value=None,
is_pk=0,
hidden=0,
),
],
),
),
)
@pytest.mark.asyncio
async def test_table_column_details(db, table, expected):
columns = await db.table_column_details(table)
# Convert "type" to lowercase before comparison
# https://github.com/simonw/datasette/issues/1647
compare_columns = [
Column(
c.cid, c.name, c.type.lower(), c.notnull, c.default_value, c.is_pk, c.hidden
)
for c in columns
]
assert compare_columns == expected
@pytest.mark.asyncio
async def test_get_all_foreign_keys(db):
all_foreign_keys = await db.get_all_foreign_keys()
assert all_foreign_keys["roadside_attraction_characteristics"] == {
"incoming": [],
"outgoing": [
{
"other_table": "attraction_characteristic",
"column": "characteristic_id",
"other_column": "pk",
},
{
"other_table": "roadside_attractions",
"column": "attraction_id",
"other_column": "pk",
},
],
}
assert all_foreign_keys["attraction_characteristic"] == {
"incoming": [
{
"other_table": "roadside_attraction_characteristics",
"column": "pk",
"other_column": "characteristic_id",
}
],
"outgoing": [],
}
assert all_foreign_keys["compound_primary_key"] == {
# No incoming because these are compound foreign keys, which we currently ignore
"incoming": [],
"outgoing": [],
}
assert all_foreign_keys["foreign_key_references"] == {
"incoming": [],
"outgoing": [
{
"other_table": "primary_key_multiple_columns",
"column": "foreign_key_with_no_label",
"other_column": "id",
},
{
"other_table": "simple_primary_key",
"column": "foreign_key_with_blank_label",
"other_column": "id",
},
{
"other_table": "simple_primary_key",
"column": "foreign_key_with_label",
"other_column": "id",
},
],
}
@pytest.mark.asyncio
async def test_table_names(db):
table_names = await db.table_names()
# Tables are sorted alphabetically by name
assert table_names == [
"123_starts_with_digits",
"Table With Space In Name",
"attraction_characteristic",
"binary_data",
"complex_foreign_keys",
"compound_primary_key",
"compound_three_primary_keys",
"custom_foreign_key_label",
"facet_cities",
"facetable",
"foreign_key_references",
"infinity",
"no_primary_key",
"primary_key_multiple_columns",
"primary_key_multiple_columns_explicit_label",
"roadside_attraction_characteristics",
"roadside_attractions",
"searchable",
"searchable_fts",
"searchable_fts_config",
"searchable_fts_data",
"searchable_fts_docsize",
"searchable_fts_idx",
"searchable_tags",
"select",
"simple_primary_key",
"sortable",
"table/with/slashes.csv",
"tags",
]
@pytest.mark.asyncio
async def test_view_names(db):
view_names = await db.view_names()
assert view_names == [
"paginated_view",
"simple_view",
"searchable_view",
"searchable_view_configured_by_metadata",
]
@pytest.mark.asyncio
async def test_execute_write_block_true(db):
await db.execute_write(
"update roadside_attractions set name = ? where pk = ?", ["Mystery!", 1]
)
rows = await db.execute("select name from roadside_attractions where pk = 1")
assert "Mystery!" == rows.rows[0][0]
@pytest.mark.asyncio
async def test_execute_write_block_false(db):
await db.execute_write(
"update roadside_attractions set name = ? where pk = ?",
["Mystery!", 1],
)
time.sleep(0.1)
rows = await db.execute("select name from roadside_attractions where pk = 1")
assert "Mystery!" == rows.rows[0][0]
@pytest.mark.asyncio
async def test_execute_write_script(db):
await db.execute_write_script(
"create table foo (id integer primary key); create table bar (id integer primary key);"
)
table_names = await db.table_names()
assert {"foo", "bar"}.issubset(table_names)
@pytest.mark.asyncio
async def test_execute_write_many(db):
await db.execute_write_script("create table foomany (id integer primary key)")
await db.execute_write_many(
"insert into foomany (id) values (?)", [(1,), (10,), (100,)]
)
result = await db.execute("select * from foomany")
assert [r[0] for r in result.rows] == [1, 10, 100]
@pytest.mark.asyncio
async def test_execute_write_has_correctly_prepared_connection(db):
# The sleep() function is only available if ds._prepare_connection() was called
await db.execute_write("select sleep(0.01)")
@pytest.mark.asyncio
async def test_execute_write_fn_block_false(db):
def write_fn(conn):
conn.execute("delete from roadside_attractions where pk = 1;")
row = conn.execute("select count(*) from roadside_attractions").fetchone()
return row[0]
task_id = await db.execute_write_fn(write_fn, block=False)
assert isinstance(task_id, uuid.UUID)
@pytest.mark.asyncio
async def test_execute_write_fn_block_true(db):
def write_fn(conn):
conn.execute("delete from roadside_attractions where pk = 1;")
row = conn.execute("select count(*) from roadside_attractions").fetchone()
return row[0]
new_count = await db.execute_write_fn(write_fn)
assert 3 == new_count
@pytest.mark.asyncio
async def test_execute_write_fn_exception(db):
def write_fn(conn):
assert False
with pytest.raises(AssertionError):
await db.execute_write_fn(write_fn)
@pytest.mark.asyncio
@pytest.mark.parametrize("param_name", ["conn", "connection", "db", "c"])
async def test_execute_write_fn_accepts_any_single_param_name(db, param_name):
# Plugins historically relied on the fact that the callback was invoked
# positionally, so any parameter name worked. Preserve that contract.
scope = {}
exec(
"def write_fn({0}):\n"
" return {0}.execute('select 1 + 1').fetchone()[0]".format(param_name),
scope,
)
write_fn = scope["write_fn"]
result = await db.execute_write_fn(write_fn)
assert result == 2
@pytest.mark.asyncio
async def test_execute_write_fn_with_track_event(db):
# When the callback declares track_event it still receives both args
# via dependency injection.
seen = []
def write_fn(conn, track_event):
seen.append(track_event)
return conn.execute("select 1 + 1").fetchone()[0]
result = await db.execute_write_fn(write_fn)
assert result == 2
assert len(seen) == 1 and callable(seen[0])
@pytest.mark.asyncio
@pytest.mark.timeout(1)
async def test_execute_write_fn_connection_exception(tmpdir, app_client):
path = str(tmpdir / "immutable.db")
conn = sqlite3.connect(path)
conn.execute("vacuum")
conn.close()
db = Database(app_client.ds, path=path, is_mutable=False)
app_client.ds.add_database(db, name="immutable-db")
def write_fn(conn):
assert False
with pytest.raises(AssertionError):
await db.execute_write_fn(write_fn)
app_client.ds.remove_database("immutable-db")
@pytest.mark.asyncio
async def test_deliver_write_result_leaves_done_future_alone():
loop = asyncio.get_running_loop()
reply_future = loop.create_future()
reply_future.set_result("original")
task = SimpleNamespace(loop=loop, reply_future=reply_future)
# The write thread can finish after the caller has stopped waiting for the
# result. Delivery should notice that the future is already resolved and
# leave the caller's outcome alone instead of raising InvalidStateError.
_deliver_write_result(task, "replacement", None)
await asyncio.sleep(0)
assert reply_future.result() == "original"
@pytest.mark.asyncio
async def test_deliver_write_result_ignores_closed_loop():
closed_loop = asyncio.new_event_loop()
closed_loop.close()
reply_future = asyncio.get_running_loop().create_future()
task = SimpleNamespace(loop=closed_loop, reply_future=reply_future)
# If the event loop that submitted the write has gone away, the write
# thread should drop the result rather than crash while reporting back to
# that closed loop.
_deliver_write_result(task, "result", None)
assert not reply_future.done()
def table_exists(conn, name):
return bool(
conn.execute(
"""
with all_tables as (
select name from sqlite_master where type = 'table'
union all
select name from temp.sqlite_master where type = 'table'
)
select 1 from all_tables where name = ?
""",
(name,),
).fetchall(),
)
def table_exists_checker(name):
def inner(conn):
return table_exists(conn, name)
return inner
@pytest.mark.asyncio
@pytest.mark.parametrize("disable_threads", (False, True))
async def test_execute_isolated(db, disable_threads):
if disable_threads:
ds = Datasette(memory=True, settings={"num_sql_threads": 0})
db = ds.add_database(Database(ds, memory_name="test_num_sql_threads_zero"))
# Create temporary table in write
await db.execute_write(
"create temporary table created_by_write (id integer primary key)"
)
# Should stay visible to write connection
assert await db.execute_write_fn(table_exists_checker("created_by_write"))
def create_shared_table(conn):
conn.execute("create table shared (id integer primary key)")
# And a temporary table that should not continue to exist
conn.execute(
"create temporary table created_by_isolated (id integer primary key)"
)
assert table_exists(conn, "created_by_isolated")
# Also confirm that created_by_write does not exist
return table_exists(conn, "created_by_write")
# shared should not exist
assert not await db.execute_fn(table_exists_checker("shared"))
# Create it using isolated
created_by_write_exists = await db.execute_isolated_fn(create_shared_table)
assert not created_by_write_exists
# shared SHOULD exist now
assert await db.execute_fn(table_exists_checker("shared"))
# created_by_isolated should not exist, even in write connection
assert not await db.execute_write_fn(table_exists_checker("created_by_isolated"))
# ... and a second call to isolated should not see that connection either
assert not await db.execute_isolated_fn(table_exists_checker("created_by_isolated"))
@pytest.mark.asyncio
async def test_mtime_ns(db):
assert isinstance(db.mtime_ns, int)
def test_mtime_ns_is_none_for_memory(app_client):
memory_db = Database(app_client.ds, is_memory=True)
assert memory_db.is_memory is True
assert None is memory_db.mtime_ns
def test_is_mutable(app_client):
assert Database(app_client.ds, is_memory=True).is_mutable is True
assert Database(app_client.ds, is_memory=True, is_mutable=True).is_mutable is True
assert Database(app_client.ds, is_memory=True, is_mutable=False).is_mutable is False
@pytest.mark.asyncio
async def test_attached_databases(app_client_two_attached_databases_crossdb_enabled):
database = app_client_two_attached_databases_crossdb_enabled.ds.get_database(
"_memory"
)
attached = await database.attached_databases()
assert {a.name for a in attached} == {"extra database", "fixtures"}
@pytest.mark.asyncio
async def test_database_memory_name(app_client):
ds = app_client.ds
foo1 = ds.add_database(Database(ds, memory_name="foo"))
foo2 = ds.add_memory_database("foo")
bar1 = ds.add_database(Database(ds, memory_name="bar"))
bar2 = ds.add_memory_database("bar")
for db in (foo1, foo2, bar1, bar2):
table_names = await db.table_names()
assert table_names == []
# Now create a table in foo
await foo1.execute_write("create table foo (t text)")
assert await foo1.table_names() == ["foo"]
assert await foo2.table_names() == ["foo"]
assert await bar1.table_names() == []
assert await bar2.table_names() == []
@pytest.mark.asyncio
async def test_in_memory_databases_forbid_writes(app_client):
ds = app_client.ds
db = ds.add_database(Database(ds, memory_name="test"))
with pytest.raises(sqlite3.OperationalError):
await db.execute("create table foo (t text)")
assert await db.table_names() == []
# Using db.execute_write() should work:
await db.execute_write("create table foo (t text)")
assert await db.table_names() == ["foo"]
def pragma_table_list_supported():
return sqlite_version()[1] >= 37
@pytest.mark.asyncio
@pytest.mark.skipif(
not pragma_table_list_supported(), reason="Requires PRAGMA table_list support"
)
async def test_hidden_tables(app_client):
ds = app_client.ds
db = ds.add_database(Database(ds, is_memory=True, is_mutable=True))
assert await db.hidden_table_names() == []
await db.execute("create virtual table f using fts5(a)")
assert await db.hidden_table_names() == [
"f_config",
"f_content",
"f_data",
"f_docsize",
"f_idx",
]
await db.execute("create virtual table r using rtree(id, amin, amax)")
assert await db.hidden_table_names() == [
"f_config",
"f_content",
"f_data",
"f_docsize",
"f_idx",
"r_node",
"r_parent",
"r_rowid",
]
await db.execute("create table _hideme(_)")
assert await db.hidden_table_names() == [
"_hideme",
"f_config",
"f_content",
"f_data",
"f_docsize",
"f_idx",
"r_node",
"r_parent",
"r_rowid",
]
# A fts virtual table with a content table should be hidden too
await db.execute("create virtual table f2_fts using fts5(a, content='f')")
assert await db.hidden_table_names() == [
"_hideme",
"f2_fts_config",
"f2_fts_data",
"f2_fts_docsize",
"f2_fts_idx",
"f_config",
"f_content",
"f_data",
"f_docsize",
"f_idx",
"r_node",
"r_parent",
"r_rowid",
"f2_fts",
]
@pytest.mark.asyncio
async def test_replace_database(tmpdir):
path1 = str(tmpdir / "data1.db")
(tmpdir / "two").mkdir()
path2 = str(tmpdir / "two" / "data1.db")
conn1 = sqlite3.connect(path1)
conn1.executescript("""
create table t (id integer primary key);
insert into t (id) values (1);
insert into t (id) values (2);
""")
conn1.close()
conn2 = sqlite3.connect(path2)
conn2.executescript("""
create table t (id integer primary key);
insert into t (id) values (1);
""")
conn2.close()
datasette = Datasette([path1])
db = datasette.get_database("data1")
count = (await db.execute("select count(*) from t")).first()[0]
assert count == 2
# Now replace that database
datasette.get_database("data1").close()
datasette.remove_database("data1")
datasette.add_database(Database(datasette, path2), "data1")
db2 = datasette.get_database("data1")
count = (await db2.execute("select count(*) from t")).first()[0]
assert count == 1
@pytest.mark.parametrize(
"kwargs,expected_repr",
[
({"is_memory": True}, "<Database: test_db (mutable, memory, size=0)>"),
({"memory_name": "my_mem"}, "<Database: test_db (mutable, memory, size=0)>"),
(
{"is_memory": True, "is_mutable": False},
"<Database: test_db (memory, size=0)>",
),
],
ids=["memory", "named_memory", "immutable_memory"],
)
def test_repr(app_client, kwargs, expected_repr):
db = Database(app_client.ds, **kwargs)
db.name = "test_db"
assert repr(db) == expected_repr
def test_repr_temp_disk(app_client):
db = Database(app_client.ds, is_temp_disk=True)
db.name = "test_db"
r = repr(db)
assert r.startswith("<Database: test_db (mutable, temp_disk, size=")
assert r.endswith(")>")
assert isinstance(db.size, int)
assert isinstance(db.mtime_ns, int)
db.close()
@pytest.mark.asyncio
async def test_database_close_shuts_down_write_thread(tmpdir):
path = str(tmpdir / "dbclose.db")
conn = sqlite3.connect(path)
conn.execute("create table t (id integer primary key)")
conn.close()
ds = Datasette([path])
db = ds.get_database("dbclose")
# Trigger write thread creation
await db.execute_write("insert into t (id) values (1)")
assert db._write_thread is not None
assert db._write_thread.is_alive()
db.close()
# Wait briefly for the thread to exit — the sentinel should cause it to return.
db._write_thread.join(timeout=5)
assert not db._write_thread.is_alive()
ds._internal_database.close()
@pytest.mark.asyncio
async def test_database_close_raises_on_further_use(tmpdir):
path = str(tmpdir / "closed.db")
conn = sqlite3.connect(path)
conn.execute("create table t (id integer primary key)")
conn.close()
ds = Datasette([path])
db = ds.get_database("closed")
await db.execute("select 1")
db.close()
with pytest.raises(DatasetteClosedError):
await db.execute("select 1")
with pytest.raises(DatasetteClosedError):
await db.execute_write("insert into t (id) values (1)")
with pytest.raises(DatasetteClosedError):
await db.execute_fn(lambda conn: conn.execute("select 1").fetchone())
with pytest.raises(DatasetteClosedError):
await db.execute_write_fn(lambda conn: conn.execute("select 1"))
ds._internal_database.close()
@pytest.mark.asyncio
async def test_database_close_is_idempotent(tmpdir):
path = str(tmpdir / "idemp.db")
conn = sqlite3.connect(path)
conn.execute("create table t (id integer primary key)")
conn.close()
ds = Datasette([path])
db = ds.get_database("idemp")
await db.execute_write("insert into t (id) values (1)")
db.close()
# Second call should be a no-op, not raise
db.close()
ds._internal_database.close()
@pytest.mark.asyncio
async def test_request_connection_basic(db):
async with db.request_connection() as conn:
results = await conn.execute("select 1 + 1")
assert results.single_value() == 2
@pytest.mark.asyncio
async def test_request_connection_exposes_raw_sqlite_connection(db):
async with db.request_connection() as conn:
assert isinstance(conn.connection, sqlite3.Connection)
# Direct execution on the raw connection works too
assert conn.connection.execute("select 1").fetchone()[0] == 1
@pytest.mark.asyncio
async def test_request_connection_fresh_each_call(db):
async with db.request_connection() as conn_a:
async with db.request_connection() as conn_b:
assert conn_a.connection is not conn_b.connection
@pytest.mark.asyncio
async def test_request_connection_closes_on_exit(db):
async with db.request_connection() as conn:
raw = conn.connection
await conn.execute("select 1")
with pytest.raises(sqlite3.ProgrammingError):
raw.execute("select 1")
@pytest.mark.asyncio
async def test_request_connection_readonly_by_default(db):
async with db.request_connection() as conn:
with pytest.raises(sqlite3.OperationalError):
await conn.execute(
"create table should_not_exist (id integer primary key)"
)
@pytest.mark.asyncio
async def test_request_connection_writable():
ds = Datasette(memory=True)
db = ds.add_memory_database("test_request_connection_writable")
async with db.request_connection(write=True) as conn:
await conn.execute_fn(
lambda c: c.execute("create table t (id integer primary key)")
)
await conn.execute_fn(lambda c: c.execute("insert into t (id) values (1)"))
row = await conn.execute_fn(
lambda c: c.execute("select id from t").fetchone()
)
assert row[0] == 1
@pytest.mark.asyncio
async def test_request_connection_authorizer_does_not_leak_to_pool(db):
def deny_all(action, *args):
return sqlite3.SQLITE_DENY
async with db.request_connection() as conn:
conn.connection.set_authorizer(deny_all)
with pytest.raises(sqlite3.DatabaseError):
await conn.execute("select 1")
# Plain pool execute should still succeed
assert (await db.execute("select 1")).single_value() == 1
@pytest.mark.asyncio
async def test_request_connection_skips_prepare_connection_hook_by_default(db):
# The `fixtures` db has my_plugin loaded, which would register
# convert_units via the prepare_connection hook. The per-request
# connection should NOT have that function available by default.
async with db.request_connection() as conn:
with pytest.raises(sqlite3.OperationalError):
await conn.execute("select convert_units(100, 'm', 'ft')")
@pytest.mark.asyncio
async def test_request_connection_runs_prepare_connection_hook_when_opted_in(db):
async with db.request_connection(run_prepare_connection_hook=True) as conn:
result = await conn.execute("select convert_units(100, 'm', 'ft')")
assert result.single_value() == pytest.approx(328.0839)
@pytest.mark.asyncio
async def test_request_connection_row_factory_applied(db):
async with db.request_connection() as conn:
row = (await conn.execute("select 1 as one")).first()
assert isinstance(row, sqlite3.Row)
assert row["one"] == 1
@pytest.mark.asyncio
async def test_request_connection_not_tracked_in_all_file_connections(db):
before = list(db._all_file_connections)
async with db.request_connection() as conn:
pass
assert db._all_file_connections == before
@pytest.mark.asyncio
async def test_request_connection_cleans_up_on_exception(db):
raw = None
with pytest.raises(RuntimeError, match="boom"):
async with db.request_connection() as conn:
raw = conn.connection
raise RuntimeError("boom")
with pytest.raises(sqlite3.ProgrammingError):
raw.execute("select 1")
@pytest.mark.asyncio
async def test_request_connection_execute_fn(db):
async with db.request_connection() as conn:
value = await conn.execute_fn(
lambda c: c.execute("select 41 + 1").fetchone()[0]
)
assert value == 42
@pytest.mark.asyncio
@pytest.mark.parametrize("disable_threads", (False, True))
async def test_request_connection_non_threaded(disable_threads):
if disable_threads:
ds = Datasette(memory=True, settings={"num_sql_threads": 0})
else:
ds = Datasette(memory=True)
db = ds.add_memory_database("test_request_connection_non_threaded")
async with db.request_connection() as conn:
assert (await conn.execute("select 1")).single_value() == 1
assert (
await conn.execute_fn(lambda c: c.execute("select 2").fetchone()[0])
) == 2
@pytest.mark.asyncio
async def test_request_connection_raises_after_database_closed(tmpdir):
path = str(tmpdir / "closed_req.db")
conn = sqlite3.connect(path)
conn.execute("create table t (id integer primary key)")
conn.close()
ds = Datasette([path])
db = ds.get_database("closed_req")
await db.execute("select 1")
db.close()
with pytest.raises(DatasetteClosedError):
async with db.request_connection() as conn:
pass
ds._internal_database.close()