datasette/tests/test_restriction_sql.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

315 lines
11 KiB
Python
Raw Normal View History

import pytest
from datasette.app import Datasette
from datasette.permissions import PermissionSQL
from datasette.resources import TableResource
@pytest.mark.asyncio
async def test_multiple_restriction_sources_intersect():
"""
Test that when multiple plugins return restriction_sql, they are INTERSECTed.
This tests the case where both actor _r restrictions AND a plugin
provide restriction_sql - both must pass for access to be granted.
"""
from datasette import hookimpl
class RestrictivePlugin:
__name__ = "RestrictivePlugin"
@hookimpl
def permission_resources_sql(self, datasette, actor, action):
# Plugin adds additional restriction: only db1_multi_intersect allowed
if action == "view-table":
return PermissionSQL(
restriction_sql="SELECT 'db1_multi_intersect' AS parent, NULL AS child",
params={},
)
return None
plugin = RestrictivePlugin()
2025-11-13 10:31:03 -08:00
ds = Datasette()
await ds.invoke_startup()
ds.pm.register(plugin, name="restrictive_plugin")
try:
db1 = ds.add_memory_database("db1_multi_intersect")
db2 = ds.add_memory_database("db2_multi_intersect")
await db1.execute_write("CREATE TABLE t1 (id INTEGER)")
await db2.execute_write("CREATE TABLE t1 (id INTEGER)")
await ds._refresh_schemas() # Populate catalog tables
# Actor has restrictions allowing both databases
# But plugin only allows db1_multi_intersect
# INTERSECT means only db1_multi_intersect/t1 should pass
actor = {
"id": "user",
"_r": {"d": {"db1_multi_intersect": ["vt"], "db2_multi_intersect": ["vt"]}},
}
page = await ds.allowed_resources("view-table", actor)
resources = {(r.parent, r.child) for r in page.resources}
# Should only see db1_multi_intersect/t1 (intersection of actor restrictions and plugin restrictions)
assert ("db1_multi_intersect", "t1") in resources
assert ("db2_multi_intersect", "t1") not in resources
finally:
2025-11-13 10:31:03 -08:00
ds.pm.unregister(name="restrictive_plugin")
@pytest.mark.asyncio
async def test_restriction_sql_with_overlapping_databases_and_tables():
"""
Test actor with both database-level and table-level restrictions for same database.
When actor has:
- Database-level: db1_overlapping allowed (all tables)
- Table-level: db1_overlapping/t1 allowed
Both entries are UNION'd (OR'ed) within the actor's restrictions.
Database-level restriction allows ALL tables, so table-level is redundant.
"""
ds = Datasette()
await ds.invoke_startup()
db = ds.add_memory_database("db1_overlapping")
await db.execute_write("CREATE TABLE t1 (id INTEGER)")
await db.execute_write("CREATE TABLE t2 (id INTEGER)")
await ds._refresh_schemas()
# Actor has BOTH database-level (db1_overlapping all tables) AND table-level (db1_overlapping/t1 only)
actor = {
"id": "user",
"_r": {
"d": {
"db1_overlapping": ["vt"]
}, # Database-level: all tables in db1_overlapping
"r": {
"db1_overlapping": {"t1": ["vt"]}
}, # Table-level: only t1 in db1_overlapping
},
}
# Within actor restrictions, entries are UNION'd (OR'ed):
# - Database level allows: (db1_overlapping, NULL) → matches all tables via hierarchical matching
# - Table level allows: (db1_overlapping, t1) → redundant, already covered by database level
# Result: Both tables are allowed
page = await ds.allowed_resources("view-table", actor)
resources = {(r.parent, r.child) for r in page.resources}
assert ("db1_overlapping", "t1") in resources
# Database-level restriction allows all tables, so t2 is also allowed
assert ("db1_overlapping", "t2") in resources
@pytest.mark.asyncio
async def test_restriction_sql_empty_allowlist_query():
"""
Test the specific SQL query generated when action is not in allowlist.
actor_restrictions_sql() returns "SELECT NULL AS parent, NULL AS child WHERE 0"
Verify this produces an empty result set.
"""
ds = Datasette()
await ds.invoke_startup()
db = ds.add_memory_database("db1_empty_allowlist")
await db.execute_write("CREATE TABLE t1 (id INTEGER)")
await ds._refresh_schemas()
# Actor has restrictions but action not in allowlist
actor = {"id": "user", "_r": {"r": {"db1_empty_allowlist": {"t1": ["vt"]}}}}
# Try to view-database (only view-table is in allowlist)
page = await ds.allowed_resources("view-database", actor)
# Should be empty
assert len(page.resources) == 0
@pytest.mark.asyncio
async def test_restriction_sql_with_pagination():
"""
Test that restrictions work correctly with keyset pagination.
"""
ds = Datasette()
await ds.invoke_startup()
db = ds.add_memory_database("db1_pagination")
# Create many tables
for i in range(10):
await db.execute_write(f"CREATE TABLE t{i:02d} (id INTEGER)")
await ds._refresh_schemas()
# Actor restricted to only odd-numbered tables
restrictions = {"r": {"db1_pagination": {}}}
for i in range(10):
if i % 2 == 1: # Only odd tables
restrictions["r"]["db1_pagination"][f"t{i:02d}"] = ["vt"]
actor = {"id": "user", "_r": restrictions}
# Get first page with small limit
page1 = await ds.allowed_resources(
"view-table", actor, parent="db1_pagination", limit=2
)
assert len(page1.resources) == 2
assert page1.next is not None
# Get second page using next token
page2 = await ds.allowed_resources(
"view-table", actor, parent="db1_pagination", limit=2, next=page1.next
)
assert len(page2.resources) == 2
# Should have no overlap
page1_ids = {r.child for r in page1.resources}
page2_ids = {r.child for r in page2.resources}
assert page1_ids.isdisjoint(page2_ids)
# All should be odd-numbered tables
all_ids = page1_ids | page2_ids
for table_id in all_ids:
table_num = int(table_id[1:]) # Extract number from "t01", "t03", etc.
assert table_num % 2 == 1, f"Table {table_id} should be odd-numbered"
@pytest.mark.asyncio
async def test_also_requires_with_restrictions():
"""
Test that also_requires actions properly respect restrictions.
execute-sql requires view-database. With restrictions, both must pass.
"""
ds = Datasette()
await ds.invoke_startup()
ds.add_memory_database("db1_also_requires")
ds.add_memory_database("db2_also_requires")
await ds._refresh_schemas()
# Actor restricted to only db1_also_requires for view-database
# execute-sql requires view-database, so should only work on db1_also_requires
actor = {
"id": "user",
"_r": {
"d": {
"db1_also_requires": ["vd", "es"],
"db2_also_requires": [
"es"
], # They have execute-sql but not view-database
}
},
}
# db1_also_requires should allow execute-sql
result = await ds.allowed(
action="execute-sql",
resource=TableResource("db1_also_requires", None),
actor=actor,
)
assert result is True
# db2_also_requires should not (they have execute-sql but not view-database)
result = await ds.allowed(
action="execute-sql",
resource=TableResource("db2_also_requires", None),
actor=actor,
)
assert result is False
@pytest.mark.asyncio
async def test_restriction_abbreviations_and_full_names():
"""
Test that both abbreviations and full action names work in restrictions.
"""
ds = Datasette()
await ds.invoke_startup()
db = ds.add_memory_database("db1_abbrev")
await db.execute_write("CREATE TABLE t1 (id INTEGER)")
await ds._refresh_schemas()
# Test with abbreviation
actor_abbr = {"id": "user", "_r": {"r": {"db1_abbrev": {"t1": ["vt"]}}}}
result = await ds.allowed(
action="view-table",
resource=TableResource("db1_abbrev", "t1"),
actor=actor_abbr,
)
assert result is True
# Test with full name
actor_full = {"id": "user", "_r": {"r": {"db1_abbrev": {"t1": ["view-table"]}}}}
result = await ds.allowed(
action="view-table",
resource=TableResource("db1_abbrev", "t1"),
actor=actor_full,
)
assert result is True
# Test with mixed
actor_mixed = {"id": "user", "_r": {"d": {"db1_abbrev": ["view-database", "vt"]}}}
result = await ds.allowed(
action="view-table",
resource=TableResource("db1_abbrev", "t1"),
actor=actor_mixed,
)
assert result is True
@pytest.mark.asyncio
async def test_permission_resources_sql_multiple_restriction_sources_intersect():
"""
Test that when multiple plugins return restriction_sql, they are INTERSECTed.
This tests the case where both actor _r restrictions AND a plugin
provide restriction_sql - both must pass for access to be granted.
"""
from datasette import hookimpl
class RestrictivePlugin:
__name__ = "RestrictivePlugin"
@hookimpl
def permission_resources_sql(self, datasette, actor, action):
# Plugin adds additional restriction: only db1_multi_restrictions allowed
if action == "view-table":
return PermissionSQL(
restriction_sql="SELECT 'db1_multi_restrictions' AS parent, NULL AS child",
params={},
)
return None
plugin = RestrictivePlugin()
2025-11-13 10:31:03 -08:00
ds = Datasette()
await ds.invoke_startup()
ds.pm.register(plugin, name="restrictive_plugin")
try:
db1 = ds.add_memory_database("db1_multi_restrictions")
db2 = ds.add_memory_database("db2_multi_restrictions")
await db1.execute_write("CREATE TABLE t1 (id INTEGER)")
await db2.execute_write("CREATE TABLE t1 (id INTEGER)")
await ds._refresh_schemas() # Populate catalog tables
# Actor has restrictions allowing both databases
# But plugin only allows db1
# INTERSECT means only db1/t1 should pass
actor = {
"id": "user",
"_r": {
"d": {
"db1_multi_restrictions": ["vt"],
"db2_multi_restrictions": ["vt"],
}
},
}
page = await ds.allowed_resources("view-table", actor)
resources = {(r.parent, r.child) for r in page.resources}
# Should only see db1/t1 (intersection of actor restrictions and plugin restrictions)
assert ("db1_multi_restrictions", "t1") in resources
assert ("db2_multi_restrictions", "t1") not in resources
finally:
2025-11-13 10:31:03 -08:00
ds.pm.unregister(name="restrictive_plugin")