From cd838daef4d066e584b047164d8e2a5e96909511 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Thu, 28 May 2026 15:22:21 -0700 Subject: [PATCH] Refactor tests a bit --- tests/test_queries.py | 449 +++++++++++++++++++++--------------------- 1 file changed, 225 insertions(+), 224 deletions(-) diff --git a/tests/test_queries.py b/tests/test_queries.py index 9c3ebcc8..216cb211 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -1700,8 +1700,22 @@ async def test_execute_write_post_requires_database_and_table_permissions(): assert (await db.execute("select name from dogs")).first()[0] == "Cleo" +@pytest.mark.parametrize( + "database_name, sql", + ( + ( + "execute_write_insert_or_replace", + "insert or replace into users(id, email) values (3, 'b@example.com')", + ), + ( + "execute_write_update_or_replace", + "update or replace users set email = 'b@example.com' where id = 1", + ), + ), + ids=("insert-or-replace", "update-or-replace"), +) @pytest.mark.asyncio -async def test_execute_write_insert_or_replace_requires_delete_row_permission(): +async def test_execute_write_replace_requires_delete_row_permission(database_name, sql): ds = Datasette( memory=True, default_deny=True, @@ -1725,7 +1739,7 @@ async def test_execute_write_insert_or_replace_requires_delete_row_permission(): } }, ) - db = ds.add_memory_database("execute_write_insert_or_replace", name="data") + db = ds.add_memory_database(database_name, name="data") await db.execute_write( "create table users (id integer primary key, email text unique)" ) @@ -1738,64 +1752,7 @@ async def test_execute_write_insert_or_replace_requires_delete_row_permission(): denied_response = await ds.client.post( "/data/-/execute-write", actor={"id": "writer"}, - json={ - "sql": ( - "insert or replace into users(id, email) " "values (3, 'b@example.com')" - ) - }, - ) - - assert denied_response.status_code == 403 - assert denied_response.json()["errors"] == [ - "Permission denied: need delete-row on data/users" - ] - assert (await db.execute("select id, email from users order by id")).dicts() == [ - {"id": 1, "email": "a@example.com"}, - {"id": 2, "email": "b@example.com"}, - ] - - -@pytest.mark.asyncio -async def test_execute_write_update_or_replace_requires_delete_row_permission(): - ds = Datasette( - memory=True, - default_deny=True, - config={ - "databases": { - "data": { - "permissions": { - "view-database": {"id": "writer"}, - "execute-write-sql": {"id": "writer"}, - }, - "tables": { - "users": { - "permissions": { - "insert-row": {"id": "writer"}, - "update-row": {"id": "writer"}, - "view-table": {"id": "writer"}, - } - } - }, - } - } - }, - ) - db = ds.add_memory_database("execute_write_update_or_replace", name="data") - await db.execute_write( - "create table users (id integer primary key, email text unique)" - ) - await db.execute_write( - "insert into users (id, email) values " - "(1, 'a@example.com'), (2, 'b@example.com')" - ) - await ds.invoke_startup() - - denied_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "writer"}, - json={ - "sql": "update or replace users set email = 'b@example.com' where id = 1" - }, + json={"sql": sql}, ) assert denied_response.status_code == 403 @@ -2262,74 +2219,71 @@ async def test_trusted_stored_write_query_skips_vacuum_filtering(): assert response.json()["ok"] is True +@pytest.mark.parametrize( + ( + "database_name", + "setup_sqls", + "write_sql", + "expected_error", + "verification_sql", + "expected_count", + ), + ( + ( + "execute_write_virtual_table_control", + ( + "create virtual table docs using fts5(title, body, content='')", + "insert into docs(rowid, title, body) values (1, 'hello', 'world')", + ), + "insert into docs(docs) values('delete-all')", + "Writes to virtual tables are not allowed in user-supplied SQL", + "select count(*) from docs where docs match 'hello'", + 1, + ), + ( + "execute_write_virtual_table_insert", + ("create virtual table docs using fts5(title, body)",), + "insert into docs(rowid, title, body) values (1, 'a', 'b')", + "Writes to virtual tables are not allowed in user-supplied SQL", + "select count(*) from docs", + 0, + ), + ( + "execute_write_shadow_table_insert", + ("create virtual table docs using fts5(title, body)",), + "insert into docs_config(k, v) values ('x', 1)", + "Writes to shadow tables are not allowed in user-supplied SQL", + "select count(*) from docs_config", + 1, + ), + ), + ids=("control-insert", "virtual-table", "shadow-table"), +) @pytest.mark.asyncio -async def test_execute_write_rejects_virtual_table_control_insert(): +async def test_execute_write_rejects_virtual_and_shadow_table_writes( + database_name, + setup_sqls, + write_sql, + expected_error, + verification_sql, + expected_count, +): ds = Datasette(memory=True, default_deny=True) ds.root_enabled = True - db = ds.add_memory_database("execute_write_virtual_table_control", name="data") - await db.execute_write(""" - create virtual table docs using fts5(title, body, content='') - """) - await db.execute_write(""" - insert into docs(rowid, title, body) values (1, 'hello', 'world') - """) + db = ds.add_memory_database(database_name, name="data") + for setup_sql in setup_sqls: + await db.execute_write(setup_sql) await ds.invoke_startup() denied_response = await ds.client.post( "/data/-/execute-write", actor={"id": "root"}, - json={"sql": "insert into docs(docs) values('delete-all')"}, + json={"sql": write_sql}, ) assert denied_response.status_code == 403 - assert denied_response.json()["errors"] == [ - "Writes to virtual tables are not allowed in user-supplied SQL" - ] - assert ( - await db.execute("select count(*) from docs where docs match 'hello'") - ).first()[0] == 1 - - -@pytest.mark.asyncio -async def test_execute_write_rejects_regular_virtual_table_insert(): - ds = Datasette(memory=True, default_deny=True) - ds.root_enabled = True - db = ds.add_memory_database("execute_write_virtual_table_insert", name="data") - await db.execute_write("create virtual table docs using fts5(title, body)") - await ds.invoke_startup() - - denied_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "root"}, - json={"sql": "insert into docs(rowid, title, body) values (1, 'a', 'b')"}, - ) - - assert denied_response.status_code == 403 - assert denied_response.json()["errors"] == [ - "Writes to virtual tables are not allowed in user-supplied SQL" - ] - assert (await db.execute("select count(*) from docs")).first()[0] == 0 - - -@pytest.mark.asyncio -async def test_execute_write_rejects_shadow_table_insert(): - ds = Datasette(memory=True, default_deny=True) - ds.root_enabled = True - db = ds.add_memory_database("execute_write_shadow_table_insert", name="data") - await db.execute_write("create virtual table docs using fts5(title, body)") - await ds.invoke_startup() - - denied_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "root"}, - json={"sql": "insert into docs_config(k, v) values ('x', 1)"}, - ) - - assert denied_response.status_code == 403 - assert denied_response.json()["errors"] == [ - "Writes to shadow tables are not allowed in user-supplied SQL" - ] - assert (await db.execute("select count(*) from docs_config")).first()[0] == 1 + assert denied_response.json()["errors"] == [expected_error] + assert (await db.execute(verification_sql)).first()[0] == expected_count @pytest.mark.asyncio @@ -2482,8 +2436,69 @@ async def test_execute_write_create_table_uses_create_table_permission(): assert not await db.table_exists("should_not_exist") +@pytest.mark.parametrize( + ( + "database_name", + "allowed_actor", + "allowed_sql", + "denied_sql", + "expected_error", + "setup_sqls", + "expected_state", + ), + ( + ( + "execute_write_alter_table", + "alterer", + "alter table dogs add column age integer", + "alter table cats add column age integer", + "Permission denied: need alter-table on data/cats", + (), + "alter-table", + ), + ( + "execute_write_create_index", + "alterer", + "create index idx_dogs_name on dogs(name)", + "create index idx_cats_name on cats(name)", + "Permission denied: need alter-table on data/cats", + (), + "create-index", + ), + ( + "execute_write_drop_index", + "alterer", + "drop index idx_dogs_name", + "drop index idx_cats_name", + "Permission denied: need alter-table on data/cats", + ( + "create index idx_dogs_name on dogs(name)", + "create index idx_cats_name on cats(name)", + ), + "drop-index", + ), + ( + "execute_write_drop_table", + "dropper", + "drop table dogs", + "drop table cats", + "Permission denied: need drop-table on data/cats", + (), + "drop-table", + ), + ), + ids=("alter-table", "create-index", "drop-index", "drop-table"), +) @pytest.mark.asyncio -async def test_execute_write_alter_and_drop_table_use_schema_permissions(): +async def test_execute_write_schema_operations_use_schema_permissions( + database_name, + allowed_actor, + allowed_sql, + denied_sql, + expected_error, + setup_sqls, + expected_state, +): ds = Datasette( memory=True, default_deny=True, @@ -2513,73 +2528,53 @@ async def test_execute_write_alter_and_drop_table_use_schema_permissions(): }, }, ) - db = ds.add_memory_database("execute_write_alter_drop_table", name="data") + db = ds.add_memory_database(database_name, name="data") await db.execute_write("create table dogs (id integer primary key, name text)") await db.execute_write("create table cats (id integer primary key, name text)") + for setup_sql in setup_sqls: + await db.execute_write(setup_sql) await ds.invoke_startup() - alter_allowed_response = await ds.client.post( + async def index_exists(index_name): + row = ( + await db.execute( + "select 1 from sqlite_master where type = 'index' and name = ?", + [index_name], + ) + ).first() + return row is not None + + allowed_response = await ds.client.post( "/data/-/execute-write", - actor={"id": "alterer"}, - json={"sql": "alter table dogs add column age integer"}, + actor={"id": allowed_actor}, + json={"sql": allowed_sql}, ) - alter_row_permission_response = await ds.client.post( + denied_response = await ds.client.post( "/data/-/execute-write", actor={"id": "row-writer"}, - json={"sql": "alter table cats add column age integer"}, + json={"sql": denied_sql}, ) - assert alter_allowed_response.status_code == 200 - assert "age" in [column.name for column in await db.table_column_details("dogs")] - assert alter_row_permission_response.status_code == 403 - assert alter_row_permission_response.json()["errors"] == [ - "Permission denied: need alter-table on data/cats" - ] - assert "age" not in [ - column.name for column in await db.table_column_details("cats") - ] + assert allowed_response.status_code == 200 + assert denied_response.status_code == 403 + assert denied_response.json()["errors"] == [expected_error] - create_index_allowed_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "alterer"}, - json={"sql": "create index idx_dogs_name on dogs(name)"}, - ) - create_index_row_permission_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "row-writer"}, - json={"sql": "create index idx_cats_name on cats(name)"}, - ) - drop_index_allowed_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "alterer"}, - json={"sql": "drop index idx_dogs_name"}, - ) - - assert create_index_allowed_response.status_code == 200 - assert create_index_row_permission_response.status_code == 403 - assert create_index_row_permission_response.json()["errors"] == [ - "Permission denied: need alter-table on data/cats" - ] - assert drop_index_allowed_response.status_code == 200 - - drop_allowed_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "dropper"}, - json={"sql": "drop table dogs"}, - ) - drop_row_permission_response = await ds.client.post( - "/data/-/execute-write", - actor={"id": "row-writer"}, - json={"sql": "drop table cats"}, - ) - - assert drop_allowed_response.status_code == 200 - assert not await db.table_exists("dogs") - assert drop_row_permission_response.status_code == 403 - assert drop_row_permission_response.json()["errors"] == [ - "Permission denied: need drop-table on data/cats" - ] - assert await db.table_exists("cats") + if expected_state == "alter-table": + assert "age" in [ + column.name for column in await db.table_column_details("dogs") + ] + assert "age" not in [ + column.name for column in await db.table_column_details("cats") + ] + elif expected_state == "create-index": + assert await index_exists("idx_dogs_name") + assert not await index_exists("idx_cats_name") + elif expected_state == "drop-index": + assert not await index_exists("idx_dogs_name") + assert await index_exists("idx_cats_name") + elif expected_state == "drop-table": + assert not await db.table_exists("dogs") + assert await db.table_exists("cats") @pytest.mark.asyncio @@ -2644,8 +2639,9 @@ async def test_execute_write_post_rejects_read_only_sql(): ] +@pytest.mark.parametrize("action", ("view-query", "update-query", "delete-query")) @pytest.mark.asyncio -async def test_query_owner_gets_update_delete_and_writable_view_defaults(): +async def test_query_owner_gets_update_delete_and_writable_view_defaults(action): ds = Datasette(memory=True, default_deny=True) ds.add_memory_database("query_owner_defaults", name="data") await ds.invoke_startup() @@ -2658,21 +2654,35 @@ async def test_query_owner_gets_update_delete_and_writable_view_defaults(): owner_id="alice", ) - for action in ("view-query", "update-query", "delete-query"): - assert await ds.allowed( - action=action, - resource=QueryResource("data", "insert_dog"), - actor={"id": "alice"}, - ) - assert not await ds.allowed( - action=action, - resource=QueryResource("data", "insert_dog"), - actor={"id": "bob"}, - ) + assert await ds.allowed( + action=action, + resource=QueryResource("data", "insert_dog"), + actor={"id": "alice"}, + ) + assert not await ds.allowed( + action=action, + resource=QueryResource("data", "insert_dog"), + actor={"id": "bob"}, + ) +@pytest.mark.parametrize( + "action, path_suffix, request_json, expected_public_title", + ( + ( + "update-query", + "-/update", + {"update": {"title": "Bob can edit public queries"}}, + "Bob can edit public queries", + ), + ("delete-query", "-/delete", {}, None), + ), + ids=("update-query", "delete-query"), +) @pytest.mark.asyncio -async def test_private_query_restricts_broad_update_delete_permissions(): +async def test_private_query_restricts_broad_update_delete_permissions( + action, path_suffix, request_json, expected_public_title +): ds = Datasette( memory=True, default_deny=True, @@ -2706,50 +2716,41 @@ async def test_private_query_restricts_broad_update_delete_permissions(): owner_id="alice", ) - for action in ("update-query", "delete-query"): - assert await ds.allowed( - action=action, - resource=QueryResource("data", "alice_private"), - actor={"id": "alice"}, - ) - assert not await ds.allowed( - action=action, - resource=QueryResource("data", "alice_private"), - actor={"id": "bob"}, - ) - assert await ds.allowed( - action=action, - resource=QueryResource("data", "alice_public"), - actor={"id": "bob"}, - ) - - private_update_response = await ds.client.post( - "/data/alice_private/-/update", - actor={"id": "bob"}, - json={"update": {"title": "Nope"}}, + assert await ds.allowed( + action=action, + resource=QueryResource("data", "alice_private"), + actor={"id": "alice"}, ) - private_delete_response = await ds.client.post( - "/data/alice_private/-/delete", + assert not await ds.allowed( + action=action, + resource=QueryResource("data", "alice_private"), actor={"id": "bob"}, - json={}, ) - public_update_response = await ds.client.post( - "/data/alice_public/-/update", + assert await ds.allowed( + action=action, + resource=QueryResource("data", "alice_public"), actor={"id": "bob"}, - json={"update": {"title": "Bob can edit public queries"}}, - ) - public_delete_response = await ds.client.post( - "/data/alice_public/-/delete", - actor={"id": "bob"}, - json={}, ) - assert private_update_response.status_code == 403 - assert private_delete_response.status_code == 403 - assert public_update_response.status_code == 200 - assert public_delete_response.status_code == 200 + private_response = await ds.client.post( + "/data/alice_private/{}".format(path_suffix), + actor={"id": "bob"}, + json=request_json, + ) + public_response = await ds.client.post( + "/data/alice_public/{}".format(path_suffix), + actor={"id": "bob"}, + json=request_json, + ) + + assert private_response.status_code == 403 + assert public_response.status_code == 200 assert await ds.get_query("data", "alice_private") is not None - assert await ds.get_query("data", "alice_public") is None + public_query = await ds.get_query("data", "alice_public") + if expected_public_title is None: + assert public_query is None + else: + assert public_query.title == expected_public_title @pytest.mark.asyncio