mirror of
https://github.com/simonw/datasette.git
synced 2026-05-27 20:36:17 +02:00
Closes: - #2709 The key behavior change: after close() starts, no new execute work can be submitted, but already-running execute work is allowed to finish before SQLite connections are closed.
346 lines
10 KiB
Python
346 lines
10 KiB
Python
"""
|
|
Tests for the datasette.app.Datasette class
|
|
"""
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
from datasette import Context
|
|
from datasette.app import Datasette, Database, ResourcesSQL
|
|
from datasette.database import DatasetteClosedError
|
|
from datasette.resources import DatabaseResource
|
|
from itsdangerous import BadSignature
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture
|
|
def datasette(ds_client):
|
|
return ds_client.ds
|
|
|
|
|
|
def test_get_database(datasette):
|
|
db = datasette.get_database("fixtures")
|
|
assert "fixtures" == db.name
|
|
with pytest.raises(KeyError):
|
|
datasette.get_database("missing")
|
|
|
|
|
|
def test_get_database_no_argument(datasette):
|
|
# Returns the first available database:
|
|
db = datasette.get_database()
|
|
assert "fixtures" == db.name
|
|
|
|
|
|
@pytest.mark.parametrize("value", ["hello", 123, {"key": "value"}])
|
|
@pytest.mark.parametrize("namespace", [None, "two"])
|
|
def test_sign_unsign(datasette, value, namespace):
|
|
extra_args = [namespace] if namespace else []
|
|
signed = datasette.sign(value, *extra_args)
|
|
assert value != signed
|
|
assert value == datasette.unsign(signed, *extra_args)
|
|
with pytest.raises(BadSignature):
|
|
datasette.unsign(signed[:-1] + ("!" if signed[-1] != "!" else ":"))
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"setting,expected",
|
|
(
|
|
("base_url", "/"),
|
|
("max_csv_mb", 100),
|
|
("allow_csv_stream", True),
|
|
),
|
|
)
|
|
def test_datasette_setting(datasette, setting, expected):
|
|
assert datasette.setting(setting) == expected
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_constructor():
|
|
ds = Datasette()
|
|
databases = (await ds.client.get("/-/databases.json")).json()
|
|
assert databases == [
|
|
{
|
|
"name": "_memory",
|
|
"route": "_memory",
|
|
"path": None,
|
|
"size": 0,
|
|
"is_mutable": False,
|
|
"is_memory": True,
|
|
"hash": None,
|
|
}
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_num_sql_threads_zero():
|
|
ds = Datasette([], memory=True, settings={"num_sql_threads": 0})
|
|
db = ds.add_database(Database(ds, memory_name="test_num_sql_threads_zero"))
|
|
await db.execute_write("create table t(id integer primary key)")
|
|
await db.execute_write("insert into t (id) values (1)")
|
|
response = await ds.client.get("/-/threads.json")
|
|
assert response.json() == {"num_threads": 0, "threads": []}
|
|
response2 = await ds.client.get("/test_num_sql_threads_zero/t.json?_shape=array")
|
|
assert response2.json() == [{"id": 1}]
|
|
|
|
|
|
ROOT = {"id": "root"}
|
|
ALLOW_ROOT = {"allow": {"id": "root"}}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"actor,config,action,resource,should_allow,expected_private",
|
|
(
|
|
(None, ALLOW_ROOT, "view-instance", None, False, False),
|
|
(ROOT, ALLOW_ROOT, "view-instance", None, True, True),
|
|
(
|
|
None,
|
|
{"databases": {"_memory": ALLOW_ROOT}},
|
|
"view-database",
|
|
DatabaseResource(database="_memory"),
|
|
False,
|
|
False,
|
|
),
|
|
(
|
|
ROOT,
|
|
{"databases": {"_memory": ALLOW_ROOT}},
|
|
"view-database",
|
|
DatabaseResource(database="_memory"),
|
|
True,
|
|
True,
|
|
),
|
|
# Check private is false for non-protected instance check
|
|
(
|
|
ROOT,
|
|
{"allow": True},
|
|
"view-instance",
|
|
None,
|
|
True,
|
|
False,
|
|
),
|
|
),
|
|
)
|
|
async def test_datasette_check_visibility(
|
|
actor, config, action, resource, should_allow, expected_private
|
|
):
|
|
ds = Datasette([], memory=True, config=config)
|
|
await ds.invoke_startup()
|
|
visible, private = await ds.check_visibility(
|
|
actor, action=action, resource=resource
|
|
)
|
|
assert visible == should_allow
|
|
assert private == expected_private
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_render_template_no_request():
|
|
# https://github.com/simonw/datasette/issues/1849
|
|
ds = Datasette(memory=True)
|
|
await ds.invoke_startup()
|
|
rendered = await ds.render_template("error.html")
|
|
assert "Error " in rendered
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_render_template_with_dataclass():
|
|
@dataclasses.dataclass
|
|
class ExampleContext(Context):
|
|
title: str
|
|
status: int
|
|
error: str
|
|
|
|
context = ExampleContext(title="Hello", status=200, error="Error message")
|
|
ds = Datasette(memory=True)
|
|
await ds.invoke_startup()
|
|
rendered = await ds.render_template("error.html", context)
|
|
assert "<h1>Hello</h1>" in rendered
|
|
assert "Error message" in rendered
|
|
|
|
|
|
def test_datasette_error_if_string_not_list(tmpdir):
|
|
# https://github.com/simonw/datasette/issues/1985
|
|
db_path = str(tmpdir / "data.db")
|
|
with pytest.raises(ValueError):
|
|
Datasette(db_path)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_action(ds_client):
|
|
ds = ds_client.ds
|
|
for name_or_abbr in (
|
|
"vi",
|
|
"view-instance",
|
|
"vt",
|
|
"view-table",
|
|
"sct",
|
|
"set-column-type",
|
|
):
|
|
action = ds.get_action(name_or_abbr)
|
|
if "-" in name_or_abbr:
|
|
assert action.name == name_or_abbr
|
|
else:
|
|
assert action.abbr == name_or_abbr
|
|
# And test None return for missing action
|
|
assert ds.get_action("missing-permission") is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_apply_metadata_json():
|
|
ds = Datasette(
|
|
metadata={
|
|
"databases": {
|
|
"legislators": {
|
|
"tables": {"offices": {"summary": "office address or sumtin"}},
|
|
"queries": {
|
|
"millennial_representatives": {
|
|
"summary": "Social media accounts for current legislators"
|
|
}
|
|
},
|
|
}
|
|
},
|
|
"weird_instance_value": {"nested": [1, 2, 3]},
|
|
},
|
|
)
|
|
await ds.invoke_startup()
|
|
assert (await ds.client.get("/")).status_code == 200
|
|
value = (await ds.get_instance_metadata()).get("weird_instance_value")
|
|
assert value == '{"nested": [1, 2, 3]}'
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_allowed_resources_sql(datasette):
|
|
result = await datasette.allowed_resources_sql(
|
|
action="view-table",
|
|
actor=None,
|
|
)
|
|
assert isinstance(result, ResourcesSQL)
|
|
assert "all_rules AS" in result.sql
|
|
assert result.params["action"] == "view-table"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_close_closes_all_databases_and_executor():
|
|
ds = Datasette(memory=True)
|
|
await ds.invoke_startup()
|
|
# Confirm internal DB has write machinery running
|
|
assert ds._internal_database._write_thread is not None
|
|
assert ds._internal_database._write_thread.is_alive()
|
|
temp_path = ds._internal_database.path
|
|
assert os.path.exists(temp_path)
|
|
executor = ds.executor
|
|
ds.close()
|
|
# Executor is shut down
|
|
assert executor._shutdown
|
|
# All attached Database instances are closed
|
|
for db in ds.databases.values():
|
|
assert db._closed
|
|
assert ds._internal_database._closed
|
|
# Temp internal DB file is unlinked
|
|
assert not os.path.exists(temp_path)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_close_is_idempotent():
|
|
ds = Datasette(memory=True)
|
|
await ds.invoke_startup()
|
|
ds.close()
|
|
# Second call should be a no-op
|
|
ds.close()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_close_raises_on_use():
|
|
ds = Datasette(memory=True)
|
|
await ds.invoke_startup()
|
|
ds.close()
|
|
with pytest.raises(DatasetteClosedError):
|
|
await ds.get_internal_database().execute("select 1")
|
|
|
|
|
|
async def _datasette_with_sleeping_execute(tmp_path, sleep_ms=200):
|
|
db_path = tmp_path / "data.db"
|
|
internal_path = tmp_path / "internal.db"
|
|
sqlite3.connect(db_path).close()
|
|
ds = Datasette([str(db_path)], internal=str(internal_path))
|
|
loop = asyncio.get_running_loop()
|
|
sql_started = asyncio.Event()
|
|
original_prepare_connection = ds._prepare_connection
|
|
|
|
def prepare_connection(conn, name):
|
|
original_prepare_connection(conn, name)
|
|
|
|
def sleep_ms(ms):
|
|
loop.call_soon_threadsafe(sql_started.set)
|
|
time.sleep(ms / 1000)
|
|
return ms
|
|
|
|
conn.create_function("sleep_ms", 1, sleep_ms)
|
|
|
|
ds._prepare_connection = prepare_connection
|
|
task = asyncio.create_task(
|
|
ds.get_database().execute(
|
|
f"select sleep_ms({sleep_ms})", custom_time_limit=1000
|
|
)
|
|
)
|
|
await asyncio.wait_for(sql_started.wait(), timeout=5)
|
|
return ds, task
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_close_waits_for_in_flight_execute(tmp_path):
|
|
ds, task = await _datasette_with_sleeping_execute(tmp_path)
|
|
ds.close()
|
|
results = await task
|
|
assert [tuple(row) for row in results.rows] == [(200,)]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_close_waits_for_cancelled_in_flight_execute(tmp_path):
|
|
ds, task = await _datasette_with_sleeping_execute(tmp_path)
|
|
task.cancel()
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await task
|
|
ds.close()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_asgi_lifespan_shutdown_closes_datasette():
|
|
ds = Datasette(memory=True)
|
|
app = ds.app()
|
|
# Drive an ASGI lifespan: startup, then shutdown.
|
|
messages_sent = []
|
|
inbox = [
|
|
{"type": "lifespan.startup"},
|
|
{"type": "lifespan.shutdown"},
|
|
]
|
|
|
|
async def receive():
|
|
return inbox.pop(0)
|
|
|
|
async def send(message):
|
|
messages_sent.append(message)
|
|
|
|
await app({"type": "lifespan"}, receive, send)
|
|
assert {"type": "lifespan.startup.complete"} in messages_sent
|
|
assert {"type": "lifespan.shutdown.complete"} in messages_sent
|
|
assert ds._closed
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_datasette_close_continues_past_db_error():
|
|
# If one Database raises during close(), the others still get closed.
|
|
ds = Datasette(memory=True)
|
|
await ds.invoke_startup()
|
|
|
|
class Boom(Database):
|
|
def close(self):
|
|
raise RuntimeError("boom")
|
|
|
|
ds.add_database(Boom(ds, is_memory=True), name="bad")
|
|
good = ds.add_database(Database(ds, is_memory=True), name="good")
|
|
with pytest.raises(RuntimeError, match="boom"):
|
|
ds.close()
|
|
assert good._closed
|
|
assert ds._internal_database._closed
|