Transform actor restrictions into SQL permission rules

Actor restrictions (_r) now integrate with the SQL permission layer via
the permission_resources_sql() hook instead of acting as a post-filter.

This fixes the issue where allowed_resources() didn't respect restrictions,
causing incorrect database/table listings at /.json and /database.json
endpoints for restricted actors.

Key changes:
- Add _restriction_permission_rules() function to generate SQL rules from _r
- Restrictions create global DENY + specific ALLOW rules using allowlist
- Restrictions act as gating filter BEFORE config/root/default permissions
- Remove post-filter check from allowed() method (now redundant)
- Skip default allow rules when actor has restrictions
- Add comprehensive tests for restriction filtering behavior

The cascading permission logic (child → parent → global) ensures that
allowlisted resources override the global deny, while non-allowlisted
resources are blocked.

Closes #2534
This commit is contained in:
Simon Willison 2025-10-25 15:43:14 -07:00
commit fb9cd5c72c
3 changed files with 282 additions and 39 deletions

View file

@ -1290,25 +1290,6 @@ class Datasette:
child=resource.child,
)
# Check actor restrictions after SQL permissions
# If the SQL check says "yes" but actor has restrictions, verify action is allowed
if result and actor and "_r" in actor:
from datasette.default_permissions import restrictions_allow_action
# Convert Resource to old-style format for restrictions check
if resource.parent and resource.child:
old_style_resource = (resource.parent, resource.child)
elif resource.parent:
old_style_resource = resource.parent
else:
old_style_resource = None
# If restrictions don't allow this action, deny it
if not restrictions_allow_action(
self, actor["_r"], action, old_style_resource
):
result = False
# Log the permission check for debugging
self._permission_checks.append(
PermissionCheck(

View file

@ -9,6 +9,12 @@ import time
async def permission_resources_sql(datasette, actor, action):
rules: list[PermissionSQL] = []
# 1. FIRST: Actor restrictions (if present)
# These act as a gating filter - must pass through before other checks
restriction_rules = await _restriction_permission_rules(datasette, actor, action)
rules.extend(restriction_rules)
# 2. Root user permissions
# Root user with root_enabled gets all permissions at global level
# Config rules at more specific levels (database/table) can still override
if datasette.root_enabled and actor and actor.get("id") == "root":
@ -24,10 +30,11 @@ async def permission_resources_sql(datasette, actor, action):
)
)
# 3. Config-based permission rules
config_rules = await _config_permission_rules(datasette, actor, action)
rules.extend(config_rules)
# Check default_allow_sql setting for execute-sql action
# 4. Check default_allow_sql setting for execute-sql action
if action == "execute-sql" and not datasette.setting("default_allow_sql"):
# Return a deny rule for all databases
sql = "SELECT NULL AS parent, NULL AS child, 0 AS allow, 'default_allow_sql is false' AS reason"
@ -45,26 +52,31 @@ async def permission_resources_sql(datasette, actor, action):
return rules[0]
return rules
default_allow_actions = {
"view-instance",
"view-database",
"view-database-download",
"view-table",
"view-query",
"execute-sql",
}
if action in default_allow_actions:
reason = f"default allow for {action}".replace("'", "''")
sql = (
"SELECT NULL AS parent, NULL AS child, 1 AS allow, " f"'{reason}' AS reason"
)
rules.append(
PermissionSQL(
source="default_permissions",
sql=sql,
params={},
# 5. Default allow actions (ONLY if no restrictions)
# If actor has restrictions, they've already added their own deny/allow rules
has_restrictions = actor and "_r" in actor
if not has_restrictions:
default_allow_actions = {
"view-instance",
"view-database",
"view-database-download",
"view-table",
"view-query",
"execute-sql",
}
if action in default_allow_actions:
reason = f"default allow for {action}".replace("'", "''")
sql = (
"SELECT NULL AS parent, NULL AS child, 1 AS allow, "
f"'{reason}' AS reason"
)
rules.append(
PermissionSQL(
source="default_permissions",
sql=sql,
params={},
)
)
)
if not rules:
return None
@ -231,6 +243,122 @@ async def _config_permission_rules(datasette, actor, action) -> list[PermissionS
return [PermissionSQL(source="config_permissions", sql=sql, params=params)]
async def _restriction_permission_rules(
datasette, actor, action
) -> list[PermissionSQL]:
"""
Generate PermissionSQL rules from actor restrictions (_r key).
Actor restrictions define an allowlist. We implement this via:
1. Global DENY rule for the action (blocks everything by default)
2. Specific ALLOW rules for each allowlisted resource
The cascading logic (child parent global) ensures that:
- Allowlisted resources at child/parent level override global deny
- Non-allowlisted resources are blocked by global deny
This creates a gating filter that runs BEFORE normal permission checks.
Restrictions cannot be overridden by config - they gate what gets checked.
"""
if not actor or "_r" not in actor:
return []
restrictions = actor["_r"]
# Check if this action appears in restrictions (with abbreviations)
action_obj = datasette.actions.get(action)
action_checks = {action}
if action_obj and action_obj.abbr:
action_checks.add(action_obj.abbr)
# Check if this action is in the allowlist anywhere in restrictions
is_in_allowlist = False
global_actions = restrictions.get("a", [])
if action_checks.intersection(global_actions):
is_in_allowlist = True
if not is_in_allowlist:
for db_actions in restrictions.get("d", {}).values():
if action_checks.intersection(db_actions):
is_in_allowlist = True
break
if not is_in_allowlist:
for tables in restrictions.get("r", {}).values():
for table_actions in tables.values():
if action_checks.intersection(table_actions):
is_in_allowlist = True
break
if is_in_allowlist:
break
# If action not in allowlist at all, add global deny and return
if not is_in_allowlist:
sql = "SELECT NULL AS parent, NULL AS child, 0 AS allow, :deny_reason AS reason"
return [
PermissionSQL(
source="actor_restrictions",
sql=sql,
params={
"deny_reason": f"actor restrictions: {action} not in allowlist"
},
)
]
# Action IS in allowlist - build deny + specific allows
selects = []
params = {}
param_counter = 0
def add_row(parent, child, allow, reason):
"""Helper to add a parameterized SELECT statement"""
nonlocal param_counter
prefix = f"restr_{param_counter}"
param_counter += 1
selects.append(
f"SELECT :{prefix}_parent AS parent, :{prefix}_child AS child, "
f":{prefix}_allow AS allow, :{prefix}_reason AS reason"
)
params[f"{prefix}_parent"] = parent
params[f"{prefix}_child"] = child
params[f"{prefix}_allow"] = 1 if allow else 0
params[f"{prefix}_reason"] = reason
# If NOT globally allowed, add global deny as gatekeeper
is_globally_allowed = action_checks.intersection(global_actions)
if not is_globally_allowed:
add_row(None, None, 0, f"actor restrictions: {action} denied by default")
else:
# Globally allowed - add global allow
add_row(None, None, 1, f"actor restrictions: global {action}")
# Add database-level allows
db_restrictions = restrictions.get("d", {})
for db_name, db_actions in db_restrictions.items():
if action_checks.intersection(db_actions):
add_row(db_name, None, 1, f"actor restrictions: database {db_name}")
# Add resource/table-level allows
resource_restrictions = restrictions.get("r", {})
for db_name, tables in resource_restrictions.items():
for table_name, table_actions in tables.items():
if action_checks.intersection(table_actions):
add_row(
db_name,
table_name,
1,
f"actor restrictions: {db_name}/{table_name}",
)
if not selects:
return []
sql = "\nUNION ALL\n".join(selects)
return [PermissionSQL(source="actor_restrictions", sql=sql, params=params)]
def restrictions_allow_action(
datasette: "Datasette",
restrictions: dict,

View file

@ -40,6 +40,8 @@ async def perms_ds():
await one.execute_write("create view if not exists v1 as select * from t1")
await one.execute_write("create table if not exists t2 (id integer primary key)")
await two.execute_write("create table if not exists t1 (id integer primary key)")
# Trigger catalog refresh so allowed_resources() can be called
await ds.client.get("/")
return ds
@ -1308,3 +1310,135 @@ async def test_restrictions_allow_action(restrictions, action, resource, expecte
await ds.invoke_startup()
actual = restrictions_allow_action(ds, restrictions, action, resource)
assert actual == expected
@pytest.mark.asyncio
async def test_actor_restrictions_filters_allowed_resources(perms_ds):
"""Test that allowed_resources() respects actor restrictions - issue #2534"""
# Actor restricted to just perms_ds_one/t1
actor = {"id": "user", "_r": {"r": {"perms_ds_one": {"t1": ["vt"]}}}}
# Should only return t1
allowed_tables = await perms_ds.allowed_resources("view-table", actor)
assert len(allowed_tables) == 1
assert allowed_tables[0].parent == "perms_ds_one"
assert allowed_tables[0].child == "t1"
# Database listing should be empty (no view-database permission)
allowed_dbs = await perms_ds.allowed_resources("view-database", actor)
assert len(allowed_dbs) == 0
@pytest.mark.asyncio
async def test_actor_restrictions_database_level(perms_ds):
"""Test database-level restrictions allow all tables in database - issue #2534"""
actor = {"id": "user", "_r": {"d": {"perms_ds_one": ["vt"]}}}
allowed_tables = await perms_ds.allowed_resources(
"view-table", actor, parent="perms_ds_one"
)
# Should return all tables in perms_ds_one
table_names = {r.child for r in allowed_tables}
assert "t1" in table_names
assert "t2" in table_names
assert "v1" in table_names # views too
@pytest.mark.asyncio
async def test_actor_restrictions_global_level(perms_ds):
"""Test global-level restrictions allow all resources - issue #2534"""
actor = {"id": "user", "_r": {"a": ["vt"]}}
allowed_tables = await perms_ds.allowed_resources("view-table", actor)
# Should return all tables in all databases
assert len(allowed_tables) > 0
dbs = {r.parent for r in allowed_tables}
assert "perms_ds_one" in dbs
assert "perms_ds_two" in dbs
@pytest.mark.asyncio
async def test_restrictions_gate_before_config(perms_ds):
"""Test that restrictions act as gating filter before config permissions - issue #2534"""
from datasette.resources import TableResource
# Actor restricted to just t1 (not t2)
actor = {"id": "user", "_r": {"r": {"perms_ds_one": {"t1": ["vt"]}}}}
# Config doesn't matter - restrictions gate what's checked
# t2 is not in restriction allowlist, so should be DENIED
result = await perms_ds.allowed(
action="view-table",
resource=TableResource("perms_ds_one", "t2"),
actor=actor,
)
assert result is False
# t1 is in restrictions AND passes normal permission check - should be ALLOWED
result = await perms_ds.allowed(
action="view-table",
resource=TableResource("perms_ds_one", "t1"),
actor=actor,
)
assert result is True
@pytest.mark.asyncio
async def test_actor_restrictions_json_endpoints_show_filtered_listings(perms_ds):
"""Test that /.json and /db.json show correct filtered listings - issue #2534"""
actor = {"id": "user", "_r": {"r": {"perms_ds_one": {"t1": ["vt"]}}}}
cookies = {"ds_actor": perms_ds.client.actor_cookie(actor)}
# /.json should be 403 (no view-instance permission)
response = await perms_ds.client.get("/.json", cookies=cookies)
assert response.status_code == 403
# /perms_ds_one.json should be 403 (no view-database permission)
response = await perms_ds.client.get("/perms_ds_one.json", cookies=cookies)
assert response.status_code == 403
# /perms_ds_one/t1.json should be 200
response = await perms_ds.client.get("/perms_ds_one/t1.json", cookies=cookies)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_actor_restrictions_view_instance_only(perms_ds):
"""Test actor restricted to view-instance only - issue #2534"""
actor = {"id": "user", "_r": {"a": ["vi"]}}
cookies = {"ds_actor": perms_ds.client.actor_cookie(actor)}
# /.json should be 200 (has view-instance permission)
response = await perms_ds.client.get("/.json", cookies=cookies)
assert response.status_code == 200
# But no databases should be visible (no view-database permission)
data = response.json()
# The instance is visible but databases list should be empty or minimal
# Actually, let's check via allowed_resources
allowed_dbs = await perms_ds.allowed_resources("view-database", actor)
assert len(allowed_dbs) == 0
@pytest.mark.asyncio
async def test_actor_restrictions_empty_allowlist(perms_ds):
"""Test actor with empty restrictions allowlist denies everything - issue #2534"""
actor = {"id": "user", "_r": {}}
# No actions in allowlist, so everything should be denied
allowed_tables = await perms_ds.allowed_resources("view-table", actor)
assert len(allowed_tables) == 0
allowed_dbs = await perms_ds.allowed_resources("view-database", actor)
assert len(allowed_dbs) == 0
result = await perms_ds.allowed(action="view-instance", actor=actor)
assert result is False