WIP restrictions SQL mechanism, refs #2572

This commit is contained in:
Simon Willison 2025-11-03 08:46:42 -08:00
commit 2fd98f4422
7 changed files with 434 additions and 113 deletions

View file

@ -19,7 +19,7 @@ async def actor_restrictions_sql(datasette, actor, action):
return None return None
restrictions = actor.get("_r") if isinstance(actor, dict) else None restrictions = actor.get("_r") if isinstance(actor, dict) else None
if not restrictions: if restrictions is None:
return [] return []
# Check if this action appears in restrictions (with abbreviations) # Check if this action appears in restrictions (with abbreviations)
@ -28,91 +28,63 @@ async def actor_restrictions_sql(datasette, actor, action):
if action_obj and action_obj.abbr: if action_obj and action_obj.abbr:
action_checks.add(action_obj.abbr) action_checks.add(action_obj.abbr)
# Check if this action is in the allowlist anywhere in restrictions # Check if globally allowed in restrictions
is_in_allowlist = False
global_actions = restrictions.get("a", []) global_actions = restrictions.get("a", [])
if action_checks.intersection(global_actions): is_globally_allowed = action_checks.intersection(global_actions)
is_in_allowlist = True
if not is_in_allowlist: if is_globally_allowed:
for db_actions in restrictions.get("d", {}).values(): # Globally allowed - no restriction filtering needed
if action_checks.intersection(db_actions): return []
is_in_allowlist = True
break
if not is_in_allowlist: # Not globally allowed - build restriction_sql that lists allowlisted resources
for tables in restrictions.get("r", {}).values(): restriction_selects = []
for table_actions in tables.values(): restriction_params = {}
if action_checks.intersection(table_actions):
is_in_allowlist = True
break
if is_in_allowlist:
break
# If action not in allowlist at all, add global deny and return
if not is_in_allowlist:
sql = "SELECT NULL AS parent, NULL AS child, 0 AS allow, :actor_deny_reason AS reason"
return [
PermissionSQL(
sql=sql,
params={
"actor_deny_reason": f"actor restrictions: {action} not in allowlist"
},
)
]
# Action IS in allowlist - build deny + specific allows
selects = []
params = {}
param_counter = 0 param_counter = 0
def add_row(parent, child, allow, reason): # Add database-level allowlisted resources
"""Helper to add a parameterized SELECT statement."""
nonlocal param_counter
prefix = f"restr_{param_counter}"
param_counter += 1
selects.append(
f"SELECT :{prefix}_parent AS parent, :{prefix}_child AS child, "
f":{prefix}_allow AS allow, :{prefix}_reason AS reason"
)
params[f"{prefix}_parent"] = parent
params[f"{prefix}_child"] = child
params[f"{prefix}_allow"] = 1 if allow else 0
params[f"{prefix}_reason"] = reason
# If NOT globally allowed, add global deny as gatekeeper
is_globally_allowed = action_checks.intersection(global_actions)
if not is_globally_allowed:
add_row(None, None, 0, f"actor restrictions: {action} denied by default")
else:
# Globally allowed - add global allow
add_row(None, None, 1, f"actor restrictions: global {action}")
# Add database-level allows
db_restrictions = restrictions.get("d", {}) db_restrictions = restrictions.get("d", {})
for db_name, db_actions in db_restrictions.items(): for db_name, db_actions in db_restrictions.items():
if action_checks.intersection(db_actions): if action_checks.intersection(db_actions):
add_row(db_name, None, 1, f"actor restrictions: database {db_name}") prefix = f"restr_{param_counter}"
param_counter += 1
restriction_selects.append(
f"SELECT :{prefix}_parent AS parent, NULL AS child"
)
restriction_params[f"{prefix}_parent"] = db_name
# Add resource/table-level allows # Add table-level allowlisted resources
resource_restrictions = restrictions.get("r", {}) resource_restrictions = restrictions.get("r", {})
for db_name, tables in resource_restrictions.items(): for db_name, tables in resource_restrictions.items():
for table_name, table_actions in tables.items(): for table_name, table_actions in tables.items():
if action_checks.intersection(table_actions): if action_checks.intersection(table_actions):
add_row( prefix = f"restr_{param_counter}"
db_name, param_counter += 1
table_name, restriction_selects.append(
1, f"SELECT :{prefix}_parent AS parent, :{prefix}_child AS child"
f"actor restrictions: {db_name}/{table_name}",
) )
restriction_params[f"{prefix}_parent"] = db_name
restriction_params[f"{prefix}_child"] = table_name
if not selects: if not restriction_selects:
return [] # Action not in allowlist - return empty restriction (INTERSECT will return no results)
return [
PermissionSQL(
params={"deny": f"actor restrictions: {action} not in allowlist"},
restriction_sql="SELECT NULL AS parent, NULL AS child WHERE 0", # Empty set
)
]
sql = "\nUNION ALL\n".join(selects) # Build restriction SQL that returns allowed (parent, child) pairs
restriction_sql = "\nUNION ALL\n".join(restriction_selects)
return [PermissionSQL(sql=sql, params=params)] # Return restriction-only PermissionSQL (sql=None means no permission rules)
# The restriction_sql does the actual filtering via INTERSECT
return [
PermissionSQL(
params=restriction_params,
restriction_sql=restriction_sql,
)
]
@hookimpl(specname="permission_resources_sql") @hookimpl(specname="permission_resources_sql")
@ -375,13 +347,11 @@ async def default_allow_sql_check(datasette, actor, action):
@hookimpl(specname="permission_resources_sql") @hookimpl(specname="permission_resources_sql")
async def default_action_permissions_sql(datasette, actor, action): async def default_action_permissions_sql(datasette, actor, action):
"""Apply default allow rules for standard view/execute actions.""" """Apply default allow rules for standard view/execute actions.
# Only apply defaults if actor has no restrictions
# If actor has restrictions, they've already added their own deny/allow rules
has_restrictions = actor and "_r" in actor
if has_restrictions:
return None
With the INTERSECT-based restriction approach, these defaults are always generated
and then filtered by restriction_sql if the actor has restrictions.
"""
default_allow_actions = { default_allow_actions = {
"view-instance", "view-instance",
"view-database", "view-database",

View file

@ -138,13 +138,20 @@ class PermissionSQL:
child TEXT NULL, child TEXT NULL,
allow INTEGER, -- 1 allow, 0 deny allow INTEGER, -- 1 allow, 0 deny
reason TEXT reason TEXT
For restriction-only plugins, sql can be None and only restriction_sql is provided.
""" """
sql: str # SQL that SELECTs the 4 columns above sql: str | None = (
None # SQL that SELECTs the 4 columns above (can be None for restriction-only)
)
params: dict[str, Any] | None = ( params: dict[str, Any] | None = (
None # bound params for the SQL (values only; no ':' prefix) None # bound params for the SQL (values only; no ':' prefix)
) )
source: str | None = None # System will set this to the plugin name source: str | None = None # System will set this to the plugin name
restriction_sql: str | None = (
None # Optional SQL that returns (parent, child) for restriction filtering
)
@classmethod @classmethod
def allow(cls, reason: str, _allow: bool = True) -> "PermissionSQL": def allow(cls, reason: str, _allow: bool = True) -> "PermissionSQL":

View file

@ -157,8 +157,19 @@ async def _build_single_action_sql(
all_params = {} all_params = {}
rule_sqls = [] rule_sqls = []
restriction_sqls = []
for permission_sql in permission_sqls: for permission_sql in permission_sqls:
# Always collect params (even from restriction-only plugins)
all_params.update(permission_sql.params or {})
# Collect restriction SQL filters
if permission_sql.restriction_sql:
restriction_sqls.append(permission_sql.restriction_sql)
# Skip plugins that only provide restriction_sql (no permission rules)
if permission_sql.sql is None:
continue
rule_sqls.append( rule_sqls.append(
f""" f"""
SELECT parent, child, allow, reason, '{permission_sql.source}' AS source_plugin FROM ( SELECT parent, child, allow, reason, '{permission_sql.source}' AS source_plugin FROM (
@ -166,7 +177,6 @@ async def _build_single_action_sql(
) )
""".strip() """.strip()
) )
all_params.update(permission_sql.params or {})
# If no rules, return empty result (deny all) # If no rules, return empty result (deny all)
if not rule_sqls: if not rule_sqls:
@ -200,6 +210,9 @@ async def _build_single_action_sql(
anon_params = {} anon_params = {}
for permission_sql in anon_permission_sqls: for permission_sql in anon_permission_sqls:
# Skip plugins that only provide restriction_sql (no permission rules)
if permission_sql.sql is None:
continue
rewritten_sql = permission_sql.sql rewritten_sql = permission_sql.sql
for key, value in (permission_sql.params or {}).items(): for key, value in (permission_sql.params or {}).items():
anon_key = f"anon_{key}" anon_key = f"anon_{key}"
@ -360,6 +373,13 @@ async def _build_single_action_sql(
query_parts.append(")") query_parts.append(")")
# Add restriction list CTE if there are restrictions
if restriction_sqls:
restriction_intersect = "\nINTERSECT\n".join(restriction_sqls)
query_parts.extend(
[",", "restriction_list AS (", f" {restriction_intersect}", ")"]
)
# Final SELECT # Final SELECT
select_cols = "parent, child, reason" select_cols = "parent, child, reason"
if include_is_private: if include_is_private:
@ -369,6 +389,17 @@ async def _build_single_action_sql(
query_parts.append("FROM decisions") query_parts.append("FROM decisions")
query_parts.append("WHERE is_allowed = 1") query_parts.append("WHERE is_allowed = 1")
# Add restriction filter if there are restrictions
if restriction_sqls:
query_parts.append(
"""
AND EXISTS (
SELECT 1 FROM restriction_list r
WHERE (r.parent = decisions.parent OR r.parent IS NULL)
AND (r.child = decisions.child OR r.child IS NULL)
)"""
)
# Add parent filter if specified # Add parent filter if specified
if parent is not None: if parent is not None:
query_parts.append(" AND parent = :filter_parent") query_parts.append(" AND parent = :filter_parent")
@ -405,11 +436,24 @@ async def build_permission_rules_sql(
return ( return (
"SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0", "SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0",
{}, {},
[],
) )
union_parts = [] union_parts = []
all_params = {} all_params = {}
restriction_sqls = []
for permission_sql in permission_sqls: for permission_sql in permission_sqls:
all_params.update(permission_sql.params or {})
# Collect restriction SQL filters
if permission_sql.restriction_sql:
restriction_sqls.append(permission_sql.restriction_sql)
# Skip plugins that only provide restriction_sql (no permission rules)
if permission_sql.sql is None:
continue
union_parts.append( union_parts.append(
f""" f"""
SELECT parent, child, allow, reason, '{permission_sql.source}' AS source_plugin FROM ( SELECT parent, child, allow, reason, '{permission_sql.source}' AS source_plugin FROM (
@ -417,10 +461,9 @@ async def build_permission_rules_sql(
) )
""".strip() """.strip()
) )
all_params.update(permission_sql.params or {})
rules_union = " UNION ALL ".join(union_parts) rules_union = " UNION ALL ".join(union_parts)
return rules_union, all_params return rules_union, all_params, restriction_sqls
async def check_permission_for_resource( async def check_permission_for_resource(
@ -447,7 +490,9 @@ async def check_permission_for_resource(
This builds the cascading permission query and checks if the specific This builds the cascading permission query and checks if the specific
resource is in the allowed set. resource is in the allowed set.
""" """
rules_union, all_params = await build_permission_rules_sql(datasette, actor, action) rules_union, all_params, restriction_sqls = await build_permission_rules_sql(
datasette, actor, action
)
# If no rules (empty SQL), default deny # If no rules (empty SQL), default deny
if not rules_union: if not rules_union:
@ -457,43 +502,54 @@ async def check_permission_for_resource(
all_params["_check_parent"] = parent all_params["_check_parent"] = parent
all_params["_check_child"] = child all_params["_check_child"] = child
# If there are restriction filters, check if the resource passes them first
if restriction_sqls:
# Check if resource is in restriction allowlist
# Database-level restrictions (parent, NULL) should match all children (parent, *)
restriction_check = "\nINTERSECT\n".join(restriction_sqls)
restriction_query = f"""
WITH restriction_list AS (
{restriction_check}
)
SELECT EXISTS (
SELECT 1 FROM restriction_list
WHERE (parent = :_check_parent OR parent IS NULL)
AND (child = :_check_child OR child IS NULL)
) AS in_allowlist
"""
result = await datasette.get_internal_database().execute(
restriction_query, all_params
)
if result.rows and not result.rows[0][0]:
# Resource not in restriction allowlist - deny
return False
query = f""" query = f"""
WITH WITH
all_rules AS ( all_rules AS (
{rules_union} {rules_union}
), ),
child_lvl AS ( matched_rules AS (
SELECT SELECT ar.*,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, CASE
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow WHEN ar.child IS NOT NULL THEN 2 -- child-level (most specific)
WHEN ar.parent IS NOT NULL THEN 1 -- parent-level
ELSE 0 -- root/global
END AS depth
FROM all_rules ar FROM all_rules ar
WHERE ar.parent = :_check_parent AND ar.child = :_check_child WHERE (ar.parent IS NULL OR ar.parent = :_check_parent)
AND (ar.child IS NULL OR ar.child = :_check_child)
), ),
parent_lvl AS ( winner AS (
SELECT SELECT *
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, FROM matched_rules
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow ORDER BY
FROM all_rules ar depth DESC, -- specificity first (higher depth wins)
WHERE ar.parent = :_check_parent AND ar.child IS NULL CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow
), source_plugin -- stable tie-break
global_lvl AS ( LIMIT 1
SELECT
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow
FROM all_rules ar
WHERE ar.parent IS NULL AND ar.child IS NULL
) )
SELECT SELECT COALESCE((SELECT allow FROM winner), 0) AS is_allowed
CASE
WHEN cl.any_deny = 1 THEN 0
WHEN cl.any_allow = 1 THEN 1
WHEN pl.any_deny = 1 THEN 0
WHEN pl.any_allow = 1 THEN 1
WHEN gl.any_deny = 1 THEN 0
WHEN gl.any_allow = 1 THEN 1
ELSE 0
END AS is_allowed
FROM child_lvl cl, parent_lvl pl, global_lvl gl
""" """
# Execute the query against the internal database # Execute the query against the internal database

View file

@ -99,6 +99,10 @@ def build_rules_union(
# No namespacing - just use plugin params as-is # No namespacing - just use plugin params as-is
params.update(p.params or {}) params.update(p.params or {})
# Skip plugins that only provide restriction_sql (no permission rules)
if p.sql is None:
continue
parts.append( parts.append(
f""" f"""
SELECT parent, child, allow, reason, '{p.source}' AS source_plugin FROM ( SELECT parent, child, allow, reason, '{p.source}' AS source_plugin FROM (
@ -155,6 +159,8 @@ async def resolve_permissions_from_catalog(
- resource (rendered "/parent/child" or "/parent" or "/") - resource (rendered "/parent/child" or "/parent" or "/")
""" """
resolved_plugins: List[PermissionSQL] = [] resolved_plugins: List[PermissionSQL] = []
restriction_sqls: List[str] = []
for plugin in plugins: for plugin in plugins:
if callable(plugin) and not isinstance(plugin, PermissionSQL): if callable(plugin) and not isinstance(plugin, PermissionSQL):
resolved = plugin(action) # type: ignore[arg-type] resolved = plugin(action) # type: ignore[arg-type]
@ -164,6 +170,10 @@ async def resolve_permissions_from_catalog(
raise TypeError("Plugin providers must return PermissionSQL instances") raise TypeError("Plugin providers must return PermissionSQL instances")
resolved_plugins.append(resolved) resolved_plugins.append(resolved)
# Collect restriction SQL filters
if resolved.restriction_sql:
restriction_sqls.append(resolved.restriction_sql)
union_sql, rule_params = build_rules_union(actor, resolved_plugins) union_sql, rule_params = build_rules_union(actor, resolved_plugins)
all_params = { all_params = {
**(candidate_params or {}), **(candidate_params or {}),
@ -199,8 +209,8 @@ async def resolve_permissions_from_catalog(
PARTITION BY parent, child PARTITION BY parent, child
ORDER BY ORDER BY
depth DESC, -- specificity first depth DESC, -- specificity first
CASE WHEN allow=0 THEN 0 ELSE 1 END, -- deny over allow at same depth CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth
source_plugin -- stable tie-break source_plugin -- stable tie-break
) AS rn ) AS rn
FROM matched FROM matched
), ),
@ -228,6 +238,141 @@ async def resolve_permissions_from_catalog(
ORDER BY c.parent, c.child ORDER BY c.parent, c.child
""" """
# If there are restriction filters, wrap the query with INTERSECT
# This ensures only resources in the restriction allowlist are returned
if restriction_sqls:
# Start with the main query, but select only parent/child for the INTERSECT
main_query_for_intersect = f"""
WITH
cands AS (
{candidate_sql}
),
rules AS (
{union_sql}
),
matched AS (
SELECT
c.parent, c.child,
r.allow, r.reason, r.source_plugin,
CASE
WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific)
WHEN r.parent IS NOT NULL THEN 1 -- parent-level
ELSE 0 -- root/global
END AS depth
FROM cands c
JOIN rules r
ON (r.parent IS NULL OR r.parent = c.parent)
AND (r.child IS NULL OR r.child = c.child)
),
ranked AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY parent, child
ORDER BY
depth DESC, -- specificity first
CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth
source_plugin -- stable tie-break
) AS rn
FROM matched
),
winner AS (
SELECT parent, child,
allow, reason, source_plugin, depth
FROM ranked WHERE rn = 1
),
permitted_resources AS (
SELECT c.parent, c.child
FROM cands c
LEFT JOIN winner w
ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL))
AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL))
WHERE COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) = 1
)
SELECT parent, child FROM permitted_resources
"""
# Build restriction list with INTERSECT (all must match)
# Then filter to resources that match hierarchically
restriction_intersect = "\nINTERSECT\n".join(restriction_sqls)
# Combine: resources allowed by permissions AND in restriction allowlist
# Database-level restrictions (parent, NULL) should match all children (parent, *)
filtered_resources = f"""
WITH restriction_list AS (
{restriction_intersect}
),
permitted AS (
{main_query_for_intersect}
),
filtered AS (
SELECT p.parent, p.child
FROM permitted p
WHERE EXISTS (
SELECT 1 FROM restriction_list r
WHERE (r.parent = p.parent OR r.parent IS NULL)
AND (r.child = p.child OR r.child IS NULL)
)
)
"""
# Now join back to get full results for only the filtered resources
sql = f"""
{filtered_resources}
, cands AS (
{candidate_sql}
),
rules AS (
{union_sql}
),
matched AS (
SELECT
c.parent, c.child,
r.allow, r.reason, r.source_plugin,
CASE
WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific)
WHEN r.parent IS NOT NULL THEN 1 -- parent-level
ELSE 0 -- root/global
END AS depth
FROM cands c
JOIN rules r
ON (r.parent IS NULL OR r.parent = c.parent)
AND (r.child IS NULL OR r.child = c.child)
),
ranked AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY parent, child
ORDER BY
depth DESC, -- specificity first
CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth
source_plugin -- stable tie-break
) AS rn
FROM matched
),
winner AS (
SELECT parent, child,
allow, reason, source_plugin, depth
FROM ranked WHERE rn = 1
)
SELECT
c.parent, c.child,
COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow,
COALESCE(w.reason, CASE WHEN :implicit_deny THEN 'implicit deny' ELSE NULL END) AS reason,
w.source_plugin,
COALESCE(w.depth, -1) AS depth,
:action AS action,
CASE
WHEN c.parent IS NULL THEN '/'
WHEN c.child IS NULL THEN '/' || c.parent
ELSE '/' || c.parent || '/' || c.child
END AS resource
FROM filtered c
LEFT JOIN winner w
ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL))
AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL))
ORDER BY c.parent, c.child
"""
rows_iter: Iterable[sqlite3.Row] = await db.execute( rows_iter: Iterable[sqlite3.Row] = await db.execute(
sql, sql,
{**all_params, "implicit_deny": 1 if implicit_deny else 0}, {**all_params, "implicit_deny": 1 if implicit_deny else 0},

View file

@ -403,7 +403,7 @@ class PermissionRulesView(BaseView):
from datasette.utils.actions_sql import build_permission_rules_sql from datasette.utils.actions_sql import build_permission_rules_sql
union_sql, union_params = await build_permission_rules_sql( union_sql, union_params, restriction_sqls = await build_permission_rules_sql(
self.ds, actor, action self.ds, actor, action
) )
await self.ds.refresh_schemas() await self.ds.refresh_schemas()

View file

@ -0,0 +1,133 @@
"""
Test for actor restrictions bug with database-level config.
This test currently FAILS, demonstrating the bug where database-level
config allow blocks can bypass table-level restrictions.
"""
import pytest
from datasette.app import Datasette
from datasette.resources import TableResource
@pytest.mark.asyncio
async def test_table_restrictions_not_bypassed_by_database_level_config():
"""
Actor restrictions should act as hard limits that config cannot override.
BUG: When an actor has table-level restrictions (e.g., only table2 and table3)
but config has a database-level allow block, the database-level config rule
currently allows ALL tables, not just those in the restriction allowlist.
This test documents the expected behavior and will FAIL until the bug is fixed.
"""
# Config grants access at DATABASE level (not table level)
config = {
"databases": {
"test_db_rnbbdlc": {
"allow": {
"id": "user"
} # Database-level allow - grants access to all tables
}
}
}
ds = Datasette(config=config)
await ds.invoke_startup()
db = ds.add_memory_database("test_db_rnbbdlc")
await db.execute_write("create table table1 (id integer primary key)")
await db.execute_write("create table table2 (id integer primary key)")
await db.execute_write("create table table3 (id integer primary key)")
await db.execute_write("create table table4 (id integer primary key)")
# Actor restricted to ONLY table2 and table3
# Even though config allows the whole database, restrictions should limit access
actor = {
"id": "user",
"_r": {
"r": { # Resource-level (table-level) restrictions
"test_db_rnbbdlc": {
"table2": ["vt"], # vt = view-table abbreviation
"table3": ["vt"],
}
}
},
}
# table2 should be allowed (in restriction allowlist AND config allows)
result = await ds.allowed(
action="view-table",
resource=TableResource("test_db_rnbbdlc", "table2"),
actor=actor,
)
assert result is True, "table2 should be allowed - in restriction allowlist"
# table3 should be allowed (in restriction allowlist AND config allows)
result = await ds.allowed(
action="view-table",
resource=TableResource("test_db_rnbbdlc", "table3"),
actor=actor,
)
assert result is True, "table3 should be allowed - in restriction allowlist"
# table1 should be DENIED (NOT in restriction allowlist)
# Even though database-level config allows it, restrictions should deny it
result = await ds.allowed(
action="view-table",
resource=TableResource("test_db_rnbbdlc", "table1"),
actor=actor,
)
assert (
result is False
), "table1 should be DENIED - not in restriction allowlist, config cannot override"
# table4 should be DENIED (NOT in restriction allowlist)
# Even though database-level config allows it, restrictions should deny it
result = await ds.allowed(
action="view-table",
resource=TableResource("test_db_rnbbdlc", "table4"),
actor=actor,
)
assert (
result is False
), "table4 should be DENIED - not in restriction allowlist, config cannot override"
@pytest.mark.asyncio
async def test_database_restrictions_with_database_level_config():
"""
Verify that database-level restrictions work correctly with database-level config.
This should pass - it's testing the case where restriction granularity
matches config granularity.
"""
config = {
"databases": {"test_db_rwdl": {"allow": {"id": "user"}}} # Database-level allow
}
ds = Datasette(config=config)
await ds.invoke_startup()
db = ds.add_memory_database("test_db_rwdl")
await db.execute_write("create table table1 (id integer primary key)")
await db.execute_write("create table table2 (id integer primary key)")
# Actor has database-level restriction (all tables in test_db_rwdl)
actor = {
"id": "user",
"_r": {"d": {"test_db_rwdl": ["vt"]}}, # Database-level restrictions
}
# Both tables should be allowed (database-level restriction matches database-level config)
result = await ds.allowed(
action="view-table",
resource=TableResource("test_db_rwdl", "table1"),
actor=actor,
)
assert result is True, "table1 should be allowed"
result = await ds.allowed(
action="view-table",
resource=TableResource("test_db_rwdl", "table2"),
actor=actor,
)
assert result is True, "table2 should be allowed"

View file

@ -1630,6 +1630,16 @@ async def test_hook_register_actions_with_custom_resources():
reason="user2 granted view-document-collection" reason="user2 granted view-document-collection"
) )
# Default allow for view-document-collection (like other view-* actions)
if action == "view-document-collection":
return PermissionSQL.allow(
reason="default allow for view-document-collection"
)
# Default allow for view-document (like other view-* actions)
if action == "view-document":
return PermissionSQL.allow(reason="default allow for view-document")
# Register the plugin temporarily # Register the plugin temporarily
plugin = TestPlugin() plugin = TestPlugin()
pm.register(plugin, name="test_custom_resources_plugin") pm.register(plugin, name="test_custom_resources_plugin")