From 1fa23f4a42e95c5212672306fdeffd7b2d9662f2 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Feb 2026 01:52:08 +0000 Subject: [PATCH] Add consolidation proposal for cascading logic duplication Detailed design for extracting build_cascading_ctes(), collect_permission_rules(), and build_restriction_filter() to replace three separate implementations with one shared SQL builder. Includes migration plan and handles the include_is_private complication. https://claude.ai/code/session_013EkyroQKPhcjdMbpHc9g4X --- permissions-notes.md | 390 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 390 insertions(+) diff --git a/permissions-notes.md b/permissions-notes.md index 3b61a06b..00695f08 100644 --- a/permissions-notes.md +++ b/permissions-notes.md @@ -335,3 +335,393 @@ Refactor to avoid duplicating the entire CTE chain when restrictions are present ### P4: Add deny reason to permission check logging (Issue 12) Return `(allowed, reason)` tuples from `check_permission_for_resource()`. + +--- + +## Proposal: Consolidating the Cascading Logic (Issues 2 + 3) + +### The problem + +The cascading logic ("child > parent > global; deny beats allow at each level") is implemented three times in two files: + +| # | Function | File | Used by | SQL pattern | +|---|----------|------|---------|-------------| +| 1 | `_build_single_action_sql()` | `actions_sql.py:246-384` | `allowed_resources()` (production) | 3 separate CTEs (`child_lvl`, `parent_lvl`, `global_lvl`) with `LEFT JOIN` + `GROUP BY` + `MAX()`, then CASE cascade | +| 2 | `check_permission_for_resource()` | `actions_sql.py:555-587` | `allowed()` (production) | `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY depth DESC, ...)` + `LIMIT 1` | +| 3 | `resolve_permissions_from_catalog()` | `permissions.py:197-391` | Tests only | Same `ROW_NUMBER()` as #2, but with the entire CTE chain **tripled** when restrictions are present | + +These three implementations must all agree on the resolution semantics. A logic change (e.g., adding a priority tier) would need to be replicated in all three places. + +### Why three exist + +Each serves a different purpose with different requirements: + +- **#1 (bulk resources)**: Needs to evaluate every `(parent, child)` in the `base` CTE. Can't use `ROW_NUMBER()` as easily because it needs the per-resource aggregates available for the `include_is_private` anonymous pass too. Outputs `reason` as JSON array and `is_private`. +- **#2 (single resource)**: Only checks one `(parent, child)`. Much simpler — just filter matching rules, rank, pick winner. Returns boolean. +- **#3 (test utility)**: Returns full resolution details (allow, reason, source_plugin, depth) for every candidate. Used in tests to verify the cascading logic itself. + +### Proposed design: One SQL builder, three callers + +Introduce a single function `build_cascading_ctes()` that generates the shared CTE fragment, then each caller wraps it with its own `SELECT` and extras. + +#### Step 1: Extract `build_rules_union_from_permission_sqls()` + +Both production paths (#1 and #2) already have nearly identical code to iterate over `PermissionSQL` objects, collect params, collect `restriction_sqls`, and build the UNION ALL. Factor this into a single shared function: + +```python +# In actions_sql.py (or a new shared module) + +@dataclass +class CollectedRules: + """Result of collecting PermissionSQL objects into SQL fragments.""" + rules_union: str # UNION ALL of all rule SELECTs + params: dict[str, Any] # All collected params + restriction_sqls: list[str] # restriction_sql fragments + +def collect_permission_rules( + permission_sqls: list[PermissionSQL], +) -> CollectedRules | None: + """ + Iterate PermissionSQL objects, build the UNION ALL, collect params + and restriction_sqls. Returns None if no rule SQL was found. + """ + rule_parts = [] + all_params = {} + restriction_sqls = [] + + for i, psql in enumerate(permission_sqls): + all_params.update(psql.params or {}) + if psql.restriction_sql: + restriction_sqls.append(psql.restriction_sql) + if psql.sql is None: + continue + # Parameterize source_plugin instead of interpolating (fixes Issue 5) + source_key = f"_src_{i}" + all_params[source_key] = psql.source + rule_parts.append( + f"SELECT parent, child, allow, reason, :{source_key} AS source_plugin" + f" FROM ({psql.sql})" + ) + + if not rule_parts: + return None + + return CollectedRules( + rules_union=" UNION ALL ".join(rule_parts), + params=all_params, + restriction_sqls=restriction_sqls, + ) +``` + +This already fixes **Issue 5** (`source_plugin` injection) as a side effect. + +#### Step 2: Extract `build_cascading_ctes()` + +The core cascading logic — given a `base` CTE and an `all_rules` CTE, produce a `decisions` CTE — can be expressed as a single function that returns CTE SQL fragments: + +```python +def build_cascading_ctes( + *, + rules_alias: str = "all_rules", + base_alias: str = "base", + include_reasons: bool = False, +) -> str: + """ + Return CTE SQL for child_lvl, parent_lvl, global_lvl, decisions. + + Expects the caller to already have defined CTEs named `base_alias` + (with columns: parent, child) and `rules_alias` (with columns: + parent, child, allow, reason, source_plugin). + + The output `decisions` CTE has columns: + parent, child, is_allowed, reason + Where `reason` is either a json_group_array (include_reasons=True) + or the single winning reason text. + """ + # The three level CTEs + level_ctes = [] + for level_name, join_condition in [ + ("child_lvl", f"ar.parent = b.parent AND ar.child = b.child"), + ("parent_lvl", f"ar.parent = b.parent AND ar.child IS NULL"), + ("global_lvl", f"ar.parent IS NULL AND ar.child IS NULL"), + ]: + reason_cols = "" + if include_reasons: + reason_cols = ( + ",\n json_group_array(CASE WHEN ar.allow = 0 " + "THEN ar.source_plugin || ': ' || ar.reason END) AS deny_reasons" + ",\n json_group_array(CASE WHEN ar.allow = 1 " + "THEN ar.source_plugin || ': ' || ar.reason END) AS allow_reasons" + ) + level_ctes.append(f"""{level_name} AS ( + SELECT b.parent, b.child, + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow{reason_cols} + FROM {base_alias} b + LEFT JOIN {rules_alias} ar ON {join_condition} + GROUP BY b.parent, b.child +)""") + + # The decisions CTE + if include_reasons: + reason_case = """ + CASE + WHEN cl.any_deny = 1 THEN cl.deny_reasons + WHEN cl.any_allow = 1 THEN cl.allow_reasons + WHEN pl.any_deny = 1 THEN pl.deny_reasons + WHEN pl.any_allow = 1 THEN pl.allow_reasons + WHEN gl.any_deny = 1 THEN gl.deny_reasons + WHEN gl.any_allow = 1 THEN gl.allow_reasons + ELSE '[]' + END AS reason""" + else: + reason_case = "'implicit deny' AS reason" # simple placeholder + + null_safe_join = ( + "b.parent = {a}.parent AND " + "(b.child = {a}.child OR (b.child IS NULL AND {a}.child IS NULL))" + ) + + decisions_cte = f"""decisions AS ( + SELECT + b.parent, b.child, + CASE + WHEN cl.any_deny = 1 THEN 0 + WHEN cl.any_allow = 1 THEN 1 + WHEN pl.any_deny = 1 THEN 0 + WHEN pl.any_allow = 1 THEN 1 + WHEN gl.any_deny = 1 THEN 0 + WHEN gl.any_allow = 1 THEN 1 + ELSE 0 + END AS is_allowed, + {reason_case} + FROM {base_alias} b + JOIN child_lvl cl ON {null_safe_join.format(a='cl')} + JOIN parent_lvl pl ON {null_safe_join.format(a='pl')} + JOIN global_lvl gl ON {null_safe_join.format(a='gl')} +)""" + + return ",\n".join(level_ctes) + ",\n" + decisions_cte +``` + +#### Step 3: Extract `build_restriction_filter()` + +Restriction handling is also duplicated. A single function can generate the restriction CTE and WHERE clause: + +```python +def build_restriction_filter(restriction_sqls: list[str]) -> tuple[str, str]: + """ + Returns (cte_sql, where_clause) for restriction filtering. + + cte_sql: ", restriction_list AS (...)" to append to WITH block + where_clause: "AND EXISTS (...)" to append to WHERE + """ + restriction_intersect = "\nINTERSECT\n".join( + f"SELECT * FROM ({sql})" for sql in restriction_sqls + ) + cte_sql = f",\nrestriction_list AS (\n {restriction_intersect}\n)" + where_clause = """ + AND EXISTS ( + SELECT 1 FROM restriction_list r + WHERE (r.parent = decisions.parent OR r.parent IS NULL) + AND (r.child = decisions.child OR r.child IS NULL) + )""" + return cte_sql, where_clause +``` + +#### Step 4: Rewrite the three callers + +**`_build_single_action_sql()` (bulk resources)** + +```python +async def _build_single_action_sql(datasette, actor, action, *, parent=None, + include_is_private=False): + action_obj = datasette.actions.get(action) + base_resources_sql = await action_obj.resource_class.resources_sql(datasette) + + permission_sqls = await gather_permission_sql_from_hooks(...) + if permission_sqls is SKIP_PERMISSION_CHECKS: + return ... # early return unchanged + + collected = collect_permission_rules(permission_sqls) + if collected is None: + return ... # empty result unchanged + + all_params = collected.params + + # Build WITH clause + cte_parts = [ + f"WITH\nbase AS (\n {base_resources_sql}\n)", + f"all_rules AS (\n {collected.rules_union}\n)", + ] + + # Anonymous rules for is_private (if needed) + if include_is_private: + anon_collected = ... # same anon logic as before, but using collect_permission_rules + cte_parts.append(f"anon_rules AS (\n {anon_collected.rules_union}\n)") + + # Core cascading logic — ONE call + cte_parts.append(build_cascading_ctes(include_reasons=True)) + + if include_is_private: + cte_parts.append(build_cascading_ctes( + rules_alias="anon_rules", + base_alias="base", + # Use different CTE names to avoid collision: + # This variant would need a prefix parameter, e.g. prefix="anon_" + )) + # ... or simpler: call a second time with aliased names + + # Restriction filter + restriction_cte = "" + restriction_where = "" + if collected.restriction_sqls: + restriction_cte, restriction_where = build_restriction_filter( + collected.restriction_sqls + ) + + # Final SELECT + select_cols = "parent, child, reason" + if include_is_private: + select_cols += ", is_private" + + query = ( + ",\n".join(cte_parts) + restriction_cte + + f"\nSELECT {select_cols}\nFROM decisions\nWHERE is_allowed = 1" + + restriction_where + + (f"\n AND parent = :filter_parent" if parent else "") + + "\nORDER BY parent, child" + ) + return query, all_params +``` + +**`check_permission_for_resource()` (single resource)** + +This can now be rewritten to use the same `build_cascading_ctes()` with a single-row `base`: + +```python +async def check_permission_for_resource(*, datasette, actor, action, parent, child): + rules_union, all_params, restriction_sqls = await build_permission_rules_sql( + datasette, actor, action + ) + if not rules_union: + return False + + all_params["_check_parent"] = parent + all_params["_check_child"] = child + + # Check restrictions first (unchanged fast-path) + if restriction_sqls: + ... # existing restriction check, unchanged + + # Use the shared cascading logic with a single-row base + base_sql = "SELECT :_check_parent AS parent, :_check_child AS child" + cascade = build_cascading_ctes() + + query = f""" +WITH +base AS ({base_sql}), +all_rules AS ({rules_union}), +{cascade} +SELECT COALESCE((SELECT is_allowed FROM decisions), 0) AS is_allowed +""" + result = await datasette.get_internal_database().execute(query, all_params) + return bool(result.rows[0][0]) if result.rows else False +``` + +This replaces the current depth/ROW_NUMBER approach with the same `child_lvl`/`parent_lvl`/`global_lvl` pattern, ensuring identical semantics. + +**`resolve_permissions_from_catalog()` (test utility)** + +This becomes a thin wrapper too. Since it's test-only, the main benefit is eliminating 250 lines of duplicated SQL: + +```python +async def resolve_permissions_from_catalog(db, actor, plugins, action, + candidate_sql, candidate_params=None, + *, implicit_deny=True): + # Resolve plugins (existing code, unchanged) + resolved_plugins, restriction_sqls = ... + + union_sql, rule_params = build_rules_union(actor, resolved_plugins) + all_params = {**(candidate_params or {}), **rule_params, "action": action} + + cascade = build_cascading_ctes( + include_reasons=True, + base_alias="cands", + rules_alias="rules", + ) + + # One query, no duplication + restriction_cte = "" + restriction_where = "" + if restriction_sqls: + restriction_cte, restriction_where = build_restriction_filter(restriction_sqls) + + sql = f""" + WITH + cands AS ({candidate_sql}), + rules AS ({union_sql}), + {cascade} + {restriction_cte} + SELECT + c.parent, c.child, + COALESCE(d.is_allowed, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow, + d.reason, :action AS action, + ... + FROM cands c + LEFT JOIN decisions d ON c.parent = d.parent AND c.child = d.child + {restriction_where} + ORDER BY c.parent, c.child + """ + + rows = await db.execute(sql, {**all_params, "implicit_deny": ...}) + return [dict(r) for r in rows] +``` + +This eliminates the 135-line SQL triplication entirely. + +### The `include_is_private` complication + +The `include_is_private` path is the one wrinkle. It needs to run the cascading logic twice: once for the real actor and once for anonymous. The current code duplicates all three level CTEs with `anon_` prefixes. + +With the shared builder, we'd need `build_cascading_ctes()` to accept a `prefix` parameter so it can generate `anon_child_lvl`, `anon_parent_lvl`, etc.: + +```python +def build_cascading_ctes(*, rules_alias="all_rules", base_alias="base", + include_reasons=False, prefix=""): + # Use prefix for all CTE names: + # f"{prefix}child_lvl", f"{prefix}parent_lvl", etc. +``` + +Then the caller does: + +```python +ctes = build_cascading_ctes(include_reasons=True) # -> child_lvl, decisions +ctes += build_cascading_ctes(rules_alias="anon_rules", # -> anon_child_lvl, anon_decisions + prefix="anon_", + include_reasons=False) +``` + +And the final SELECT joins both `decisions` and `anon_decisions`. + +### Impact summary + +| Before | After | +|--------|-------| +| `_build_single_action_sql`: ~180 lines of CTE construction | ~40 lines + shared builder | +| `check_permission_for_resource`: ~35 lines of cascading SQL | ~10 lines + shared builder | +| `resolve_permissions_from_catalog`: ~250 lines, SQL tripled when restrictions present | ~30 lines + shared builder | +| `source_plugin` interpolated unsafely in 3 places | Parameterized in `collect_permission_rules()` | +| `build_rules_union` in `permissions.py` (test-only duplicate) | Replaced by shared `collect_permission_rules()` | + +Total: ~465 lines of SQL-building code reduced to ~80 lines of callers + ~80 lines of shared builders. Three implementations of cascading logic become one. + +### Migration plan + +1. Add `collect_permission_rules()` and `build_cascading_ctes()` and `build_restriction_filter()` to `actions_sql.py` (or a new `datasette/utils/permission_sql_builder.py`) +2. Rewrite `check_permission_for_resource()` to use the shared builder +3. Rewrite `_build_single_action_sql()` to use the shared builder (including `include_is_private` prefix support) +4. Rewrite `resolve_permissions_from_catalog()` to use the shared builder +5. Delete `build_rules_union()` from `permissions.py` +6. Run the full test suite — all 263 tests must still pass since behavior is unchanged +7. Verify via `?_trace=1` that the generated SQL is correct and equivalently performant