From 224084facc377e94a66c78a201c0045e6b3cfb8a Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Fri, 24 Oct 2025 13:53:43 -0700 Subject: [PATCH] Make allowed() and check_permission_for_resource keyword-only, add default resource - Made allowed() accept resource=None with InstanceResource() as default - Made both functions keyword-argument only - Added logging to _permission_checks for debug endpoints - Fixed check_permission_for_resource to handle empty params correctly - Created build_permission_rules_sql() helper function for debug views --- datasette/app.py | 282 +++++++++------------------------ datasette/utils/actions_sql.py | 63 +++++--- 2 files changed, 113 insertions(+), 232 deletions(-) diff --git a/datasette/app.py b/datasette/app.py index 79cbb8f0..9348a611 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -116,7 +116,7 @@ from .plugins import pm, DEFAULT_PLUGINS, get_plugins from .version import __version__ from .permissions import PermissionSQL -from .utils.permissions import build_rules_union +from .resources import InstanceResource, DatabaseResource, TableResource app_root = Path(__file__).parent.parent @@ -1000,14 +1000,14 @@ class Datasette: if request: actor = request.actor # Top-level link - if await self.permission_allowed(actor=actor, action="view-instance"): + if await self.allowed(action="view-instance", actor=actor): crumbs.append({"href": self.urls.instance(), "label": "home"}) # Database link if database: - if await self.permission_allowed( - actor=actor, + if await self.allowed( action="view-database", - resource=database, + resource=DatabaseResource(database=database), + actor=actor, ): crumbs.append( { @@ -1018,10 +1018,10 @@ class Datasette: # Table link if table: assert database, "table= requires database=" - if await self.permission_allowed( - actor=actor, + if await self.allowed( action="view-table", - resource=(database, table), + resource=TableResource(database=database, table=table), + actor=actor, ): crumbs.append( { @@ -1048,194 +1048,6 @@ class Datasette: for hook in pm.hook.track_event(datasette=self, event=event): await await_me_maybe(hook) - async def permission_allowed( - self, actor, action, resource=None, *, default=DEFAULT_NOT_SET - ): - """Check permissions using the permissions_allowed plugin hook""" - result = None - # Use default from registered permission, if available - if default is DEFAULT_NOT_SET and action in self.permissions: - default = self.permissions[action].default - opinions = [] - # Every plugin is consulted for their opinion - for check in pm.hook.permission_allowed( - datasette=self, - actor=actor, - action=action, - resource=resource, - ): - check = await await_me_maybe(check) - if check is not None: - opinions.append(check) - - result = None - # If any plugin said False it's false - the veto rule - if any(not r for r in opinions): - result = False - elif any(r for r in opinions): - # Otherwise, if any plugin said True it's true - result = True - - used_default = False - if result is None: - # No plugin expressed an opinion, so use the default - result = default - used_default = True - self._permission_checks.append( - { - "when": datetime.datetime.now(datetime.timezone.utc).isoformat(), - "actor": actor, - "action": action, - "resource": resource, - "used_default": used_default, - "result": result, - } - ) - return result - - async def _build_permission_rules_sql( - self, actor: dict | None, action: str - ) -> tuple[str, dict]: - """Combine permission_resources_sql PermissionSQL blocks into a UNION query. - - Returns a (sql, params) tuple suitable for execution against SQLite. - Internal helper for permission_allowed_2. - """ - plugin_blocks: List[PermissionSQL] = [] - for block in pm.hook.permission_resources_sql( - datasette=self, - actor=actor, - action=action, - ): - block = await await_me_maybe(block) - if block is None: - continue - if isinstance(block, (list, tuple)): - candidates = block - else: - candidates = [block] - for candidate in candidates: - if candidate is None: - continue - plugin_blocks.append(candidate) - - sql, params = build_rules_union( - actor=actor, - plugins=plugin_blocks, - ) - return sql, params - - async def permission_allowed_2( - self, actor, action, resource=None, *, default=DEFAULT_NOT_SET - ): - """Permission check backed by permission_resources_sql rules.""" - - if default is DEFAULT_NOT_SET and action in self.permissions: - default = self.permissions[action].default - - if isinstance(actor, dict) or actor is None: - actor_dict = actor - else: - actor_dict = {"id": actor} - actor_id = actor_dict.get("id") if actor_dict else None - - candidate_parent = None - candidate_child = None - if isinstance(resource, str): - candidate_parent = resource - elif isinstance(resource, (tuple, list)) and len(resource) == 2: - candidate_parent, candidate_child = resource - elif resource is not None: - raise TypeError("resource must be None, str, or (parent, child) tuple") - - union_sql, union_params = await self._build_permission_rules_sql( - actor_dict, action - ) - - query = f""" - WITH rules AS ( - {union_sql} - ), - candidate AS ( - SELECT :cand_parent AS parent, :cand_child AS child - ), - matched AS ( - SELECT - r.allow, - r.reason, - r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 - WHEN r.parent IS NOT NULL THEN 1 - ELSE 0 - END AS depth - FROM rules r - JOIN candidate c - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - ORDER BY - depth DESC, - CASE WHEN allow = 0 THEN 0 ELSE 1 END, - source_plugin - ) AS rn - FROM matched - ), - winner AS ( - SELECT allow, reason, source_plugin, depth - FROM ranked - WHERE rn = 1 - ) - SELECT allow, reason, source_plugin, depth FROM winner - """ - - params = { - **union_params, - "cand_parent": candidate_parent, - "cand_child": candidate_child, - } - - rows = await self.get_internal_database().execute(query, params) - row = rows.first() - - reason = None - source_plugin = None - depth = None - used_default = False - - if row is None: - result = default - used_default = True - else: - allow = row["allow"] - reason = row["reason"] - source_plugin = row["source_plugin"] - depth = row["depth"] - if allow is None: - result = default - used_default = True - else: - result = bool(allow) - - self._permission_checks.append( - { - "when": datetime.datetime.now(datetime.timezone.utc).isoformat(), - "actor": actor, - "action": action, - "resource": resource, - "used_default": used_default, - "result": result, - "reason": reason, - "source_plugin": source_plugin, - "depth": depth, - } - ) - - return result - async def ensure_permissions( self, actor: dict, @@ -1250,26 +1062,37 @@ class Datasette: for permission in permissions: if isinstance(permission, str): action = permission - resource = None + resource_obj = None elif isinstance(permission, (tuple, list)) and len(permission) == 2: action, resource = permission + # Convert old-style resource to Resource object + if isinstance(resource, str): + resource_obj = DatabaseResource(database=resource) + elif isinstance(resource, (tuple, list)) and len(resource) == 2: + resource_obj = TableResource(database=resource[0], table=resource[1]) + else: + resource_obj = None else: assert ( False ), "permission should be string or tuple of two items: {}".format( repr(permission) ) - ok = await self.permission_allowed( - actor, - action, - resource=resource, - default=None, + ok = await self.allowed( + action=action, + resource=resource_obj, + actor=actor, ) - if ok is not None: - if ok: - return - else: - raise Forbidden(action) + if ok: + return + # If we got here, none of the permissions were granted + # Raise Forbidden with the first action + first_permission = permissions[0] + if isinstance(first_permission, str): + first_action = first_permission + else: + first_action = first_permission[0] + raise Forbidden(first_action) async def check_visibility( self, @@ -1441,7 +1264,7 @@ class Datasette: self, *, action: str, - resource: "Resource", + resource: "Resource" = None, actor: dict | None = None, ) -> bool: """ @@ -1450,6 +1273,8 @@ class Datasette: Uses SQL to check permission for a single resource without fetching all resources. This is efficient - it does NOT call allowed_resources() and check membership. + If resource is not provided, defaults to InstanceResource() for instance-level actions. + Example: from datasette.resources import TableResource can_view = await datasette.allowed( @@ -1457,13 +1282,50 @@ class Datasette: resource=TableResource(database="analytics", table="users"), actor=actor ) + + # For instance-level actions, resource can be omitted: + can_debug = await datasette.allowed(action="permissions-debug", actor=actor) """ from datasette.utils.actions_sql import check_permission_for_resource + from datasette.resources import InstanceResource + import datetime - return await check_permission_for_resource( - self, actor, action, resource.parent, resource.child + if resource is None: + resource = InstanceResource() + + result = await check_permission_for_resource( + datasette=self, + actor=actor, + action=action, + parent=resource.parent, + child=resource.child, ) + # Log the permission check for debugging + # Convert Resource to old-style format for backward compatibility with debug tools + if resource.parent and resource.child: + old_style_resource = (resource.parent, resource.child) + elif resource.parent: + old_style_resource = resource.parent + else: + old_style_resource = None + + self._permission_checks.append( + { + "when": datetime.datetime.now(datetime.timezone.utc).isoformat(), + "actor": actor, + "action": action, + "resource": old_style_resource, + "used_default": False, # New system doesn't use defaults in the same way + "result": result, + "reason": None, # Not tracked in new system + "source_plugin": None, # Not tracked in new system + "depth": None, # Not tracked in new system + } + ) + + return result + async def execute( self, db_name, diff --git a/datasette/utils/actions_sql.py b/datasette/utils/actions_sql.py index 2eef2ce1..24643e43 100644 --- a/datasette/utils/actions_sql.py +++ b/datasette/utils/actions_sql.py @@ -415,28 +415,15 @@ async def _build_single_action_sql( return query, all_params -async def check_permission_for_resource( - datasette: "Datasette", - actor: dict | None, - action: str, - parent: str | None, - child: str | None, -) -> bool: +async def build_permission_rules_sql( + datasette: "Datasette", actor: dict | None, action: str +) -> tuple[str, dict]: """ - Check if an actor has permission for a specific action on a specific resource. - - Args: - datasette: The Datasette instance - actor: The actor dict (or None) - action: The action name - parent: The parent resource identifier (e.g., database name, or None) - child: The child resource identifier (e.g., table name, or None) + Build the UNION SQL and params for all permission rules for a given actor and action. Returns: - True if the actor is allowed, False otherwise - - This builds the cascading permission query and checks if the specific - resource is in the allowed set. + A tuple of (sql, params) where sql is a UNION ALL query that returns + (parent, child, allow, reason, source_plugin) rows. """ # Get the Action object action_obj = datasette.actions.get(action) @@ -460,12 +447,44 @@ async def check_permission_for_resource( rule_sqls.extend(sqls) all_params.update(params) - # If no rules, default deny + # Build the UNION query if not rule_sqls: - return False + # Return empty result set + return "SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0", {} - # Build a simplified query that just checks for this one resource rules_union = " UNION ALL ".join(rule_sqls) + return rules_union, all_params + + +async def check_permission_for_resource( + *, + datasette: "Datasette", + actor: dict | None, + action: str, + parent: str | None, + child: str | None, +) -> bool: + """ + Check if an actor has permission for a specific action on a specific resource. + + Args: + datasette: The Datasette instance + actor: The actor dict (or None) + action: The action name + parent: The parent resource identifier (e.g., database name, or None) + child: The child resource identifier (e.g., table name, or None) + + Returns: + True if the actor is allowed, False otherwise + + This builds the cascading permission query and checks if the specific + resource is in the allowed set. + """ + rules_union, all_params = await build_permission_rules_sql(datasette, actor, action) + + # If no rules (empty SQL), default deny + if not rules_union or rules_union.endswith("WHERE 0"): + return False # Add parameters for the resource we're checking all_params["_check_parent"] = parent