From 79879b834a07a1a31f17c7cb737962d790109de8 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Thu, 23 Oct 2025 16:08:56 -0700 Subject: [PATCH] Address PR #2515 review comments - Add URL to sqlite-permissions-poc in module docstring - Replace Optional with | None for modern Python syntax - Add Datasette type annotations - Add SQL comment explaining cascading permission logic - Refactor duplicated plugin result processing into helper function --- datasette/utils/actions_sql.py | 81 +++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/datasette/utils/actions_sql.py b/datasette/utils/actions_sql.py index c0d55d08..77aab236 100644 --- a/datasette/utils/actions_sql.py +++ b/datasette/utils/actions_sql.py @@ -2,7 +2,9 @@ SQL query builder for hierarchical permission checking. This module implements a cascading permission system based on the pattern -from the sqlite-permissions-poc. It builds SQL queries that: +from https://github.com/simonw/research/tree/main/sqlite-permissions-poc + +It builds SQL queries that: 1. Start with all resources of a given type (from resource_type.resources_sql()) 2. Gather permission rules from plugins (via permission_resources_sql hook) @@ -19,14 +21,46 @@ The core pattern is: - Across levels, child beats parent beats global """ -from typing import Optional +from typing import TYPE_CHECKING + from datasette.plugins import pm from datasette.utils import await_me_maybe from datasette.permissions import PermissionSQL +if TYPE_CHECKING: + from datasette.app import Datasette + + +def _process_permission_results(results) -> tuple[list[str], dict]: + """ + Process plugin permission results into SQL fragments and parameters. + + Args: + results: Results from permission_resources_sql hook (may be list or single PermissionSQL) + + Returns: + A tuple of (list of SQL strings, dict of parameters) + """ + rule_sqls = [] + all_params = {} + + if results is None: + return rule_sqls, all_params + + if isinstance(results, list): + for plugin_sql in results: + if isinstance(plugin_sql, PermissionSQL): + rule_sqls.append(plugin_sql.sql) + all_params.update(plugin_sql.params) + elif isinstance(results, PermissionSQL): + rule_sqls.append(results.sql) + all_params.update(results.params) + + return rule_sqls, all_params + async def build_allowed_resources_sql( - datasette, + datasette: "Datasette", actor: dict | None, action: str, ) -> tuple[str, dict]: @@ -76,16 +110,9 @@ async def build_allowed_resources_sql( for result in rule_results: result = await await_me_maybe(result) - if result is None: - continue - if isinstance(result, list): - for plugin_sql in result: - if isinstance(plugin_sql, PermissionSQL): - rule_sqls.append(plugin_sql.sql) - all_params.update(plugin_sql.params) - elif isinstance(result, PermissionSQL): - rule_sqls.append(result.sql) - all_params.update(result.params) + sqls, params = _process_permission_results(result) + rule_sqls.extend(sqls) + all_params.update(params) # If no rules, return empty result (deny all) if not rule_sqls: @@ -135,6 +162,15 @@ global_lvl AS ( decisions AS ( SELECT b.parent, b.child, + -- Cascading permission logic: child → parent → global, DENY beats ALLOW at each level + -- Priority order: + -- 1. Child-level deny (most specific, blocks access) + -- 2. Child-level allow (most specific, grants access) + -- 3. Parent-level deny (intermediate, blocks access) + -- 4. Parent-level allow (intermediate, grants access) + -- 5. Global-level deny (least specific, blocks access) + -- 6. Global-level allow (least specific, grants access) + -- 7. Default deny (no rules match) CASE WHEN cl.any_deny = 1 THEN 0 WHEN cl.any_allow = 1 THEN 1 @@ -167,11 +203,11 @@ ORDER BY parent, child async def check_permission_for_resource( - datasette, + datasette: "Datasette", actor: dict | None, action: str, - parent: Optional[str], - child: Optional[str], + parent: str | None, + child: str | None, ) -> bool: """ Check if an actor has permission for a specific action on a specific resource. @@ -207,16 +243,9 @@ async def check_permission_for_resource( for result in rule_results: result = await await_me_maybe(result) - if result is None: - continue - if isinstance(result, list): - for plugin_sql in result: - if isinstance(plugin_sql, PermissionSQL): - rule_sqls.append(plugin_sql.sql) - all_params.update(plugin_sql.params) - elif isinstance(result, PermissionSQL): - rule_sqls.append(result.sql) - all_params.update(result.params) + sqls, params = _process_permission_results(result) + rule_sqls.extend(sqls) + all_params.update(params) # If no rules, default deny if not rule_sqls: