From c0a87b809f562ed1c64c666ab1938f17dc3dc2b9 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Mon, 3 Nov 2025 14:07:03 -0800 Subject: [PATCH] Additional tests for restrict SQL Refs https://github.com/simonw/datasette/issues/2572#issuecomment-3482778412 --- datasette/utils/actions_sql.py | 11 +- datasette/utils/permissions.py | 6 +- tests/test_restriction_sql.py | 315 +++++++++++++++++++++++++++++++++ 3 files changed, 329 insertions(+), 3 deletions(-) create mode 100644 tests/test_restriction_sql.py diff --git a/datasette/utils/actions_sql.py b/datasette/utils/actions_sql.py index 22d42dbc..7121e2d0 100644 --- a/datasette/utils/actions_sql.py +++ b/datasette/utils/actions_sql.py @@ -375,7 +375,11 @@ async def _build_single_action_sql( # Add restriction list CTE if there are restrictions if restriction_sqls: - restriction_intersect = "\nINTERSECT\n".join(restriction_sqls) + # Wrap each restriction_sql in a subquery to avoid operator precedence issues + # with UNION ALL inside the restriction SQL statements + restriction_intersect = "\nINTERSECT\n".join( + f"SELECT * FROM ({sql})" for sql in restriction_sqls + ) query_parts.extend( [",", "restriction_list AS (", f" {restriction_intersect}", ")"] ) @@ -506,7 +510,10 @@ async def check_permission_for_resource( if restriction_sqls: # Check if resource is in restriction allowlist # Database-level restrictions (parent, NULL) should match all children (parent, *) - restriction_check = "\nINTERSECT\n".join(restriction_sqls) + # Wrap each restriction_sql in a subquery to avoid operator precedence issues + restriction_check = "\nINTERSECT\n".join( + f"SELECT * FROM ({sql})" for sql in restriction_sqls + ) restriction_query = f""" WITH restriction_list AS ( {restriction_check} diff --git a/datasette/utils/permissions.py b/datasette/utils/permissions.py index f951e2b3..58be53a3 100644 --- a/datasette/utils/permissions.py +++ b/datasette/utils/permissions.py @@ -293,7 +293,11 @@ async def resolve_permissions_from_catalog( # Build restriction list with INTERSECT (all must match) # Then filter to resources that match hierarchically - restriction_intersect = "\nINTERSECT\n".join(restriction_sqls) + # Wrap each restriction_sql in a subquery to avoid operator precedence issues + # with UNION ALL inside the restriction SQL statements + restriction_intersect = "\nINTERSECT\n".join( + f"SELECT * FROM ({sql})" for sql in restriction_sqls + ) # Combine: resources allowed by permissions AND in restriction allowlist # Database-level restrictions (parent, NULL) should match all children (parent, *) diff --git a/tests/test_restriction_sql.py b/tests/test_restriction_sql.py new file mode 100644 index 00000000..7d6d8a5a --- /dev/null +++ b/tests/test_restriction_sql.py @@ -0,0 +1,315 @@ +import pytest +from datasette.app import Datasette +from datasette.permissions import PermissionSQL +from datasette.resources import TableResource + + +@pytest.mark.asyncio +async def test_multiple_restriction_sources_intersect(): + """ + Test that when multiple plugins return restriction_sql, they are INTERSECTed. + + This tests the case where both actor _r restrictions AND a plugin + provide restriction_sql - both must pass for access to be granted. + """ + from datasette import hookimpl + from datasette.plugins import pm + + class RestrictivePlugin: + __name__ = "RestrictivePlugin" + + @hookimpl + def permission_resources_sql(self, datasette, actor, action): + # Plugin adds additional restriction: only db1_multi_intersect allowed + if action == "view-table": + return PermissionSQL( + restriction_sql="SELECT 'db1_multi_intersect' AS parent, NULL AS child", + params={}, + ) + return None + + plugin = RestrictivePlugin() + pm.register(plugin, name="restrictive_plugin") + + try: + ds = Datasette() + await ds.invoke_startup() + db1 = ds.add_memory_database("db1_multi_intersect") + db2 = ds.add_memory_database("db2_multi_intersect") + await db1.execute_write("CREATE TABLE t1 (id INTEGER)") + await db2.execute_write("CREATE TABLE t1 (id INTEGER)") + await ds._refresh_schemas() # Populate catalog tables + + # Actor has restrictions allowing both databases + # But plugin only allows db1_multi_intersect + # INTERSECT means only db1_multi_intersect/t1 should pass + actor = { + "id": "user", + "_r": {"d": {"db1_multi_intersect": ["vt"], "db2_multi_intersect": ["vt"]}}, + } + + page = await ds.allowed_resources("view-table", actor) + resources = {(r.parent, r.child) for r in page.resources} + + # Should only see db1_multi_intersect/t1 (intersection of actor restrictions and plugin restrictions) + assert ("db1_multi_intersect", "t1") in resources + assert ("db2_multi_intersect", "t1") not in resources + finally: + pm.unregister(name="restrictive_plugin") + + +@pytest.mark.asyncio +async def test_restriction_sql_with_overlapping_databases_and_tables(): + """ + Test actor with both database-level and table-level restrictions for same database. + + When actor has: + - Database-level: db1_overlapping allowed (all tables) + - Table-level: db1_overlapping/t1 allowed + + Both entries are UNION'd (OR'ed) within the actor's restrictions. + Database-level restriction allows ALL tables, so table-level is redundant. + """ + ds = Datasette() + await ds.invoke_startup() + db = ds.add_memory_database("db1_overlapping") + await db.execute_write("CREATE TABLE t1 (id INTEGER)") + await db.execute_write("CREATE TABLE t2 (id INTEGER)") + await ds._refresh_schemas() + + # Actor has BOTH database-level (db1_overlapping all tables) AND table-level (db1_overlapping/t1 only) + actor = { + "id": "user", + "_r": { + "d": { + "db1_overlapping": ["vt"] + }, # Database-level: all tables in db1_overlapping + "r": { + "db1_overlapping": {"t1": ["vt"]} + }, # Table-level: only t1 in db1_overlapping + }, + } + + # Within actor restrictions, entries are UNION'd (OR'ed): + # - Database level allows: (db1_overlapping, NULL) → matches all tables via hierarchical matching + # - Table level allows: (db1_overlapping, t1) → redundant, already covered by database level + # Result: Both tables are allowed + page = await ds.allowed_resources("view-table", actor) + resources = {(r.parent, r.child) for r in page.resources} + + assert ("db1_overlapping", "t1") in resources + # Database-level restriction allows all tables, so t2 is also allowed + assert ("db1_overlapping", "t2") in resources + + +@pytest.mark.asyncio +async def test_restriction_sql_empty_allowlist_query(): + """ + Test the specific SQL query generated when action is not in allowlist. + + actor_restrictions_sql() returns "SELECT NULL AS parent, NULL AS child WHERE 0" + Verify this produces an empty result set. + """ + ds = Datasette() + await ds.invoke_startup() + db = ds.add_memory_database("db1_empty_allowlist") + await db.execute_write("CREATE TABLE t1 (id INTEGER)") + await ds._refresh_schemas() + + # Actor has restrictions but action not in allowlist + actor = {"id": "user", "_r": {"r": {"db1_empty_allowlist": {"t1": ["vt"]}}}} + + # Try to view-database (only view-table is in allowlist) + page = await ds.allowed_resources("view-database", actor) + + # Should be empty + assert len(page.resources) == 0 + + +@pytest.mark.asyncio +async def test_restriction_sql_with_pagination(): + """ + Test that restrictions work correctly with keyset pagination. + """ + ds = Datasette() + await ds.invoke_startup() + db = ds.add_memory_database("db1_pagination") + + # Create many tables + for i in range(10): + await db.execute_write(f"CREATE TABLE t{i:02d} (id INTEGER)") + await ds._refresh_schemas() + + # Actor restricted to only odd-numbered tables + restrictions = {"r": {"db1_pagination": {}}} + for i in range(10): + if i % 2 == 1: # Only odd tables + restrictions["r"]["db1_pagination"][f"t{i:02d}"] = ["vt"] + + actor = {"id": "user", "_r": restrictions} + + # Get first page with small limit + page1 = await ds.allowed_resources( + "view-table", actor, parent="db1_pagination", limit=2 + ) + assert len(page1.resources) == 2 + assert page1.next is not None + + # Get second page using next token + page2 = await ds.allowed_resources( + "view-table", actor, parent="db1_pagination", limit=2, next=page1.next + ) + assert len(page2.resources) == 2 + + # Should have no overlap + page1_ids = {r.child for r in page1.resources} + page2_ids = {r.child for r in page2.resources} + assert page1_ids.isdisjoint(page2_ids) + + # All should be odd-numbered tables + all_ids = page1_ids | page2_ids + for table_id in all_ids: + table_num = int(table_id[1:]) # Extract number from "t01", "t03", etc. + assert table_num % 2 == 1, f"Table {table_id} should be odd-numbered" + + +@pytest.mark.asyncio +async def test_also_requires_with_restrictions(): + """ + Test that also_requires actions properly respect restrictions. + + execute-sql requires view-database. With restrictions, both must pass. + """ + ds = Datasette() + await ds.invoke_startup() + db1 = ds.add_memory_database("db1_also_requires") + db2 = ds.add_memory_database("db2_also_requires") + await ds._refresh_schemas() + + # Actor restricted to only db1_also_requires for view-database + # execute-sql requires view-database, so should only work on db1_also_requires + actor = { + "id": "user", + "_r": { + "d": { + "db1_also_requires": ["vd", "es"], + "db2_also_requires": [ + "es" + ], # They have execute-sql but not view-database + } + }, + } + + # db1_also_requires should allow execute-sql + result = await ds.allowed( + action="execute-sql", + resource=TableResource("db1_also_requires", None), + actor=actor, + ) + assert result is True + + # db2_also_requires should not (they have execute-sql but not view-database) + result = await ds.allowed( + action="execute-sql", + resource=TableResource("db2_also_requires", None), + actor=actor, + ) + assert result is False + + +@pytest.mark.asyncio +async def test_restriction_abbreviations_and_full_names(): + """ + Test that both abbreviations and full action names work in restrictions. + """ + ds = Datasette() + await ds.invoke_startup() + db = ds.add_memory_database("db1_abbrev") + await db.execute_write("CREATE TABLE t1 (id INTEGER)") + await ds._refresh_schemas() + + # Test with abbreviation + actor_abbr = {"id": "user", "_r": {"r": {"db1_abbrev": {"t1": ["vt"]}}}} + result = await ds.allowed( + action="view-table", + resource=TableResource("db1_abbrev", "t1"), + actor=actor_abbr, + ) + assert result is True + + # Test with full name + actor_full = {"id": "user", "_r": {"r": {"db1_abbrev": {"t1": ["view-table"]}}}} + result = await ds.allowed( + action="view-table", + resource=TableResource("db1_abbrev", "t1"), + actor=actor_full, + ) + assert result is True + + # Test with mixed + actor_mixed = {"id": "user", "_r": {"d": {"db1_abbrev": ["view-database", "vt"]}}} + result = await ds.allowed( + action="view-table", + resource=TableResource("db1_abbrev", "t1"), + actor=actor_mixed, + ) + assert result is True + + +@pytest.mark.asyncio +async def test_permission_resources_sql_multiple_restriction_sources_intersect(): + """ + Test that when multiple plugins return restriction_sql, they are INTERSECTed. + + This tests the case where both actor _r restrictions AND a plugin + provide restriction_sql - both must pass for access to be granted. + """ + from datasette import hookimpl + from datasette.plugins import pm + + class RestrictivePlugin: + __name__ = "RestrictivePlugin" + + @hookimpl + def permission_resources_sql(self, datasette, actor, action): + # Plugin adds additional restriction: only db1_multi_restrictions allowed + if action == "view-table": + return PermissionSQL( + restriction_sql="SELECT 'db1_multi_restrictions' AS parent, NULL AS child", + params={}, + ) + return None + + plugin = RestrictivePlugin() + pm.register(plugin, name="restrictive_plugin") + + try: + ds = Datasette() + await ds.invoke_startup() + db1 = ds.add_memory_database("db1_multi_restrictions") + db2 = ds.add_memory_database("db2_multi_restrictions") + await db1.execute_write("CREATE TABLE t1 (id INTEGER)") + await db2.execute_write("CREATE TABLE t1 (id INTEGER)") + await ds._refresh_schemas() # Populate catalog tables + + # Actor has restrictions allowing both databases + # But plugin only allows db1 + # INTERSECT means only db1/t1 should pass + actor = { + "id": "user", + "_r": { + "d": { + "db1_multi_restrictions": ["vt"], + "db2_multi_restrictions": ["vt"], + } + }, + } + + page = await ds.allowed_resources("view-table", actor) + resources = {(r.parent, r.child) for r in page.resources} + + # Should only see db1/t1 (intersection of actor restrictions and plugin restrictions) + assert ("db1_multi_restrictions", "t1") in resources + assert ("db2_multi_restrictions", "t1") not in resources + finally: + pm.unregister(name="restrictive_plugin")