""" Tests for the datasette.app.Datasette class """ import asyncio import dataclasses import hashlib import importlib import os import sqlite3 import time from datasette import Context from datasette.app import Datasette, Database, ResourcesSQL from datasette.database import DatasetteClosedError from datasette.resources import DatabaseResource from datasette.utils import PrefixedUrlString from itsdangerous import BadSignature import pytest @pytest.fixture def datasette(ds_client): return ds_client.ds def test_get_database(datasette): db = datasette.get_database("fixtures") assert "fixtures" == db.name with pytest.raises(KeyError): datasette.get_database("missing") def test_get_database_no_argument(datasette): # Returns the first available database: db = datasette.get_database() assert "fixtures" == db.name @pytest.mark.parametrize("value", ["hello", 123, {"key": "value"}]) @pytest.mark.parametrize("namespace", [None, "two"]) def test_sign_unsign(datasette, value, namespace): extra_args = [namespace] if namespace else [] signed = datasette.sign(value, *extra_args) assert value != signed assert value == datasette.unsign(signed, *extra_args) with pytest.raises(BadSignature): datasette.unsign(signed[:-1] + ("!" if signed[-1] != "!" else ":")) @pytest.mark.parametrize( "setting,expected", ( ("base_url", "/"), ("max_csv_mb", 100), ("allow_csv_stream", True), ), ) def test_datasette_setting(datasette, setting, expected): assert datasette.setting(setting) == expected def _setup_static_app_root(tmp_path, monkeypatch, filename, content): app_module = importlib.import_module("datasette.app") app_root = tmp_path / "app-root" static_path = app_root / "datasette" / "static" static_path.mkdir(parents=True) asset_path = static_path / filename asset_path.write_bytes(content) monkeypatch.setattr(app_module, "app_root", app_root) return asset_path @pytest.mark.asyncio async def test_static_template_function_hashes_core_asset(tmp_path, monkeypatch): _setup_static_app_root(tmp_path, monkeypatch, "demo.js", b"const demo = true;") ds = Datasette() template = ds.get_jinja_environment().from_string("{{ static('demo.js') }}") expected_hash = hashlib.sha256(b"const demo = true;").hexdigest()[:12] assert await template.render_async() == "/-/static/demo.js?_hash={}".format( expected_hash ) assert isinstance(ds.static("demo.js"), PrefixedUrlString) def test_static_hash_cached_when_cache_headers_enabled(tmp_path, monkeypatch): asset_path = _setup_static_app_root(tmp_path, monkeypatch, "demo.js", b"let a = 1;") ds = Datasette(cache_headers=True) first_url = ds.static("demo.js") asset_path.write_bytes(b"let a = 2;") assert ds.static("demo.js") == first_url def test_static_hash_recalculated_when_cache_headers_disabled(tmp_path, monkeypatch): asset_path = _setup_static_app_root(tmp_path, monkeypatch, "demo.js", b"let a = 1;") ds = Datasette(cache_headers=False) first_url = ds.static("demo.js") asset_path.write_bytes(b"let a = 2;") expected_hash = hashlib.sha256(b"let a = 2;").hexdigest()[:12] assert ds.static("demo.js") == "/-/static/demo.js?_hash={}".format(expected_hash) assert ds.static("demo.js") != first_url def test_static_hashes_mounted_static_file(tmp_path): static_path = tmp_path / "static-files" static_path.mkdir() asset_path = static_path / "styles.css" asset_path.write_bytes(b"body { color: black; }") ds = Datasette(static_mounts=[("assets", str(static_path))]) expected_hash = hashlib.sha256(b"body { color: black; }").hexdigest()[:12] assert ds.static("styles.css", mount="assets") == ( "/assets/styles.css?_hash={}".format(expected_hash) ) ds._settings["base_url"] = "/prefix/" assert ds.static("styles.css", mount="assets") == ( "/prefix/assets/styles.css?_hash={}".format(expected_hash) ) def test_static_hashes_plugin_static_file(tmp_path, monkeypatch): plugin_static_path = tmp_path / "plugin-static" plugin_static_path.mkdir() asset_path = plugin_static_path / "plugin.js" asset_path.write_bytes(b"console.log('plugin');") app_module = importlib.import_module("datasette.app") monkeypatch.setattr( app_module, "get_plugins", lambda: [ { "name": "datasette-cluster-map", "static_path": str(plugin_static_path), "templates_path": None, } ], ) ds = Datasette() expected_hash = hashlib.sha256(b"console.log('plugin');").hexdigest()[:12] assert ds.static("plugin.js", plugin="datasette_cluster_map") == ( "/-/static-plugins/datasette_cluster_map/plugin.js?_hash={}".format( expected_hash ) ) def test_static_rejects_plugin_and_mount(): ds = Datasette() with pytest.raises(ValueError): ds.static("styles.css", plugin="datasette_cluster_map", mount="assets") def test_static_rejects_path_traversal(tmp_path, monkeypatch): _setup_static_app_root(tmp_path, monkeypatch, "demo.js", b"") ds = Datasette() with pytest.raises(ValueError, match="cannot escape static root"): ds.static("../secret.js") @pytest.mark.asyncio async def test_datasette_constructor(): ds = Datasette() databases = (await ds.client.get("/-/databases.json")).json() assert databases == [ { "name": "_memory", "route": "_memory", "path": None, "size": 0, "is_mutable": False, "is_memory": True, "hash": None, } ] @pytest.mark.asyncio async def test_num_sql_threads_zero(): ds = Datasette([], memory=True, settings={"num_sql_threads": 0}) db = ds.add_database(Database(ds, memory_name="test_num_sql_threads_zero")) await db.execute_write("create table t(id integer primary key)") await db.execute_write("insert into t (id) values (1)") response = await ds.client.get("/-/threads.json") assert response.json() == {"num_threads": 0, "threads": []} response2 = await ds.client.get("/test_num_sql_threads_zero/t.json?_shape=array") assert response2.json() == [{"id": 1}] ROOT = {"id": "root"} ALLOW_ROOT = {"allow": {"id": "root"}} @pytest.mark.asyncio @pytest.mark.parametrize( "actor,config,action,resource,should_allow,expected_private", ( (None, ALLOW_ROOT, "view-instance", None, False, False), (ROOT, ALLOW_ROOT, "view-instance", None, True, True), ( None, {"databases": {"_memory": ALLOW_ROOT}}, "view-database", DatabaseResource(database="_memory"), False, False, ), ( ROOT, {"databases": {"_memory": ALLOW_ROOT}}, "view-database", DatabaseResource(database="_memory"), True, True, ), # Check private is false for non-protected instance check ( ROOT, {"allow": True}, "view-instance", None, True, False, ), ), ) async def test_datasette_check_visibility( actor, config, action, resource, should_allow, expected_private ): ds = Datasette([], memory=True, config=config) await ds.invoke_startup() visible, private = await ds.check_visibility( actor, action=action, resource=resource ) assert visible == should_allow assert private == expected_private @pytest.mark.asyncio async def test_datasette_render_template_no_request(): # https://github.com/simonw/datasette/issues/1849 ds = Datasette(memory=True) await ds.invoke_startup() rendered = await ds.render_template("error.html") assert "Error " in rendered @pytest.mark.asyncio async def test_datasette_render_template_with_dataclass(): @dataclasses.dataclass class ExampleContext(Context): title: str status: int error: str context = ExampleContext(title="Hello", status=200, error="Error message") ds = Datasette(memory=True) await ds.invoke_startup() rendered = await ds.render_template("error.html", context) assert "

Hello

" in rendered assert "Error message" in rendered def test_datasette_error_if_string_not_list(tmpdir): # https://github.com/simonw/datasette/issues/1985 db_path = str(tmpdir / "data.db") with pytest.raises(ValueError): Datasette(db_path) @pytest.mark.asyncio async def test_get_action(ds_client): ds = ds_client.ds for name_or_abbr in ( "vi", "view-instance", "vt", "view-table", "sct", "set-column-type", ): action = ds.get_action(name_or_abbr) if "-" in name_or_abbr: assert action.name == name_or_abbr else: assert action.abbr == name_or_abbr # And test None return for missing action assert ds.get_action("missing-permission") is None @pytest.mark.asyncio async def test_apply_metadata_json(): ds = Datasette( metadata={ "databases": { "legislators": { "tables": {"offices": {"summary": "office address or sumtin"}}, "queries": { "millennial_representatives": { "summary": "Social media accounts for current legislators" } }, } }, "weird_instance_value": {"nested": [1, 2, 3]}, }, ) await ds.invoke_startup() assert (await ds.client.get("/")).status_code == 200 value = (await ds.get_instance_metadata()).get("weird_instance_value") assert value == '{"nested": [1, 2, 3]}' @pytest.mark.asyncio async def test_allowed_resources_sql(datasette): result = await datasette.allowed_resources_sql( action="view-table", actor=None, ) assert isinstance(result, ResourcesSQL) assert "all_rules AS" in result.sql assert result.params["action"] == "view-table" @pytest.mark.asyncio async def test_datasette_close_closes_all_databases_and_executor(): ds = Datasette(memory=True) await ds.invoke_startup() # Confirm internal DB has write machinery running assert ds._internal_database._write_thread is not None assert ds._internal_database._write_thread.is_alive() temp_path = ds._internal_database.path assert os.path.exists(temp_path) executor = ds.executor ds.close() # Executor is shut down assert executor._shutdown # All attached Database instances are closed for db in ds.databases.values(): assert db._closed assert ds._internal_database._closed # Temp internal DB file is unlinked assert not os.path.exists(temp_path) @pytest.mark.asyncio async def test_datasette_close_is_idempotent(): ds = Datasette(memory=True) await ds.invoke_startup() ds.close() # Second call should be a no-op ds.close() @pytest.mark.asyncio async def test_datasette_close_raises_on_use(): ds = Datasette(memory=True) await ds.invoke_startup() ds.close() with pytest.raises(DatasetteClosedError): await ds.get_internal_database().execute("select 1") async def _datasette_with_sleeping_execute(tmp_path, sleep_ms=200): db_path = tmp_path / "data.db" internal_path = tmp_path / "internal.db" sqlite3.connect(db_path).close() ds = Datasette([str(db_path)], internal=str(internal_path)) loop = asyncio.get_running_loop() sql_started = asyncio.Event() original_prepare_connection = ds._prepare_connection def prepare_connection(conn, name): original_prepare_connection(conn, name) def sleep_ms(ms): loop.call_soon_threadsafe(sql_started.set) time.sleep(ms / 1000) return ms conn.create_function("sleep_ms", 1, sleep_ms) ds._prepare_connection = prepare_connection task = asyncio.create_task( ds.get_database().execute( f"select sleep_ms({sleep_ms})", custom_time_limit=1000 ) ) await asyncio.wait_for(sql_started.wait(), timeout=5) return ds, task @pytest.mark.asyncio async def test_datasette_close_waits_for_in_flight_execute(tmp_path): ds, task = await _datasette_with_sleeping_execute(tmp_path) ds.close() results = await task assert [tuple(row) for row in results.rows] == [(200,)] @pytest.mark.asyncio async def test_datasette_close_waits_for_cancelled_in_flight_execute(tmp_path): ds, task = await _datasette_with_sleeping_execute(tmp_path) task.cancel() with pytest.raises(asyncio.CancelledError): await task ds.close() @pytest.mark.asyncio async def test_asgi_lifespan_shutdown_closes_datasette(): ds = Datasette(memory=True) app = ds.app() # Drive an ASGI lifespan: startup, then shutdown. messages_sent = [] inbox = [ {"type": "lifespan.startup"}, {"type": "lifespan.shutdown"}, ] async def receive(): return inbox.pop(0) async def send(message): messages_sent.append(message) await app({"type": "lifespan"}, receive, send) assert {"type": "lifespan.startup.complete"} in messages_sent assert {"type": "lifespan.shutdown.complete"} in messages_sent assert ds._closed @pytest.mark.asyncio async def test_datasette_close_continues_past_db_error(): # If one Database raises during close(), the others still get closed. ds = Datasette(memory=True) await ds.invoke_startup() class Boom(Database): def close(self): raise RuntimeError("boom") ds.add_database(Boom(ds, is_memory=True), name="bad") good = ds.add_database(Database(ds, is_memory=True), name="good") with pytest.raises(RuntimeError, match="boom"): ds.close() assert good._closed assert ds._internal_database._closed @pytest.mark.asyncio async def test_datasette_render_template_dataclass_values_not_deep_copied(): # display_rows can contain values like sqlite3.Row that cannot be # deep-copied, so render_template must convert Context dataclasses # shallowly - https://github.com/simonw/datasette/issues/2127 class RefusesDeepCopy: def __deepcopy__(self, memo): raise RuntimeError("deepcopy not supported") def __str__(self): return "shallow-copied-value" @dataclasses.dataclass class ExampleContext(Context): title: str status: int error: RefusesDeepCopy context = ExampleContext(title="Hello", status=200, error=RefusesDeepCopy()) ds = Datasette(memory=True) await ds.invoke_startup() rendered = await ds.render_template("error.html", context) assert "shallow-copied-value" in rendered